首页 > 代码库 > 46.ActiveMQ开篇(Hello World、安全认证、Connection、Session、MessageProducer、MessageConsumer)

46.ActiveMQ开篇(Hello World、安全认证、Connection、Session、MessageProducer、MessageConsumer)

一、背景介绍

  CORBA\DCOM\RMI等RPC中间件技术已经广泛应用于各个领域,但是面对规模和复杂度都越来越高的分布式系统,这些技术慢慢显现出局限性:

  • 同步通信:客户发出调用后,必须等待服务完成处理并返回结果后才能继续执行;
  • 客户端和服务端的生命周期密切耦合;客户进程和服务对象进行必须都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常。
  • 点对点通信:客户的一次调用只发送给一个单独的目标对象。

  面向消息的中间件较好地解决了上面的问题(Message Oriented Middleware,MOM)。发送者再发送消息给消息服务器消息服务器将消息存放到若干队列中,在合适的时候再将消息转发给接受者。这种模式下,发送和结束是异步的,发送者无需等待;二者的生命周期未必相同;发送消息的时候接受者不一定运行,接收消息的时候发送者也不一定运行;一对多通信:对一个消息可以有多个接受者。

  java消息服务(JMS)定义了java中访问消息中间件的接口。JMS只是接口,并没有实现。实现JMS的接口的消息中间件成文JMS Provider,已有的MOM系统包括Apache的ActiveMQ、以及阿里的RocketMQ、IBM的MQSeries、Microsoft的MSMQ和BEA的MessageQ、RabbitMQ等待,他们基本都遵循JMS规范。

二、JMS术语

JMS 实现JMS接口的消息中间件

Provider(MessageProvider):生产者

Consumer(MessageConsumer):消费者

PTP:Point to Point ,即点对点的消息模型;

pub/sub : publish/subscribe,发布/订阅的消息模型;

Queue:队列目标;

Topic:主题目标;

ConnectionFactory:连接工厂,JMS用它创建连接;

Connection:JMS客户端到JMS Provider的连接;

Destination:消息的目的地;

Session:会话,一个法师或者接收消息的线程;

 

Message 接口(消息):

  是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须要有):包含用于识别和为消息寻找路由的设置。
  • 一组消息属性:包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过路桥(消息选择器)。
  • 一个消息体:允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)

Session 接口(会话):

  表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个地接收。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文保存一组消息,知道事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话运行用户创建消息生产者来发送消息,创建消费者来接收消息。

 

三、ActiveMQ简介

  ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。

  ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然扮演着重要角色,可以说ActiveMQ在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断地提升版本,80%以上的业务我们用ActiveMq都能满足,当然后续如天猫淘宝这种大型的电商网站,尤其是双十一这种特殊事件,ActiveMQ需要进行很复杂的优化源码以及架构设计才能完成,我们只会会学习一个更强大的分布式消息中间件,RocketMQ,可以说ActiveMQ是和谐,是基础,所以也必须要掌握好。

四、ActiveMQ的使用

4.1 环境搭建

  去官网下载:http://activemq.apache.org,解压放到一个地方,我放在D:\Programe_Files下,然后直接启动bin下的wrapper.exe即可启动监控台,在浏览器中访问localhost:8186,用户名admin/admin 即可访问。

技术分享

4.2 ActiveMQ Hello World

  首先写一个简单的Hello World实例,感受一下ActiveMQ,需要实现接受者和发送者两部分代码。

  1. 建立COnnectionFactory工厂对象,需要填入用户名密码以及要连接的地址,默认端口为“tcp://localhost:61616”
  2. 通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的Start方法开启连接。
  3. 通过Connection对象创建Session会话,用于接收消息,参数配置1为是否启动事务,参数配置2为签收哦模式,一般我们设置自动签收。
  4. 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue;在Pub/Sub模式中,Destination被称作Topic(即主题)。可以使用多个Queue和Topic
  5. 需要通过Session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer
  6. 使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)
  7. 使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法接收数据,最后莫忘关闭Connection

发送:

 1 package com;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MessageProducer;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 public class Sender {
15     
16     public static void main(String[]args) throws JMSException{
17         ConnectionFactory factory = new ActiveMQConnectionFactory(
18                 ActiveMQConnectionFactory.DEFAULT_USER,
19                 ActiveMQConnectionFactory.DEFAULT_PASSWORD,
20                 "tcp://localhost:61616"
21                 );
22         
23         Connection conn = factory.createConnection();
24         conn.start();
25         
26         Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
27         
28         Destination dest = session.createQueue("queue1");
29         
30         MessageProducer messProducer = session.createProducer(dest);
31         
32         messProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);;
33         
34         TextMessage mess = session.createTextMessage();
35         for(int i=0;i<5;i++){
36 
37             mess.setText("消息内容"+i);
38             
39             messProducer.send(mess);
40         }
41         
42         if(conn != null)conn.close();
43     }
44     
45 }

 

 技术分享

 

 

接收:

package com;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[]args) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
                );
        
        Connection conn = factory.createConnection();
        conn.start();
        
        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        
        Destination dest = session.createQueue("queue1");
        
        MessageConsumer messConsumer = session.createConsumer(dest);
        
        while(true){
            TextMessage mess = (TextMessage) messConsumer.receive();
            System.out.println(mess.getText());
        }
        
//        if(conn != null)conn.close();
    }
}

 

技术分享

 

4.3 启动安全认证

在acitvemq.xml中的 broker 下加上如下代码:

        <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="sgm" password="sgm" groups="users,admins" />
                </users>
            </simpleAuthenticationPlugin>
        </plugins>

 

 

重启ActiveMQ,再运行4.2中的 sender 发生异常:

技术分享

可见在第24行尝试连接的时候就发生错误了。所以ActiveMQConnectionFactory.DEFAULT_USER要改成"sgm",密码也同样修改。receiver做同样修改。

 

4.4 Connection的使用

  在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,他是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,Connection的目的是“利用JMS提供者封装开发的连接”。以及表示“客户端与服务例程之间的开放TCP/IP套接字”。该文档还指出Connection应该是进行客户端身份校验的地方。

  当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或者多个Session。

  当一个程序执行完成后,必须关闭Connection,否则ActiveMQ不能释放资源,关闭一个Connection同样也关闭了Session、MessageProducer和MessageConsumer。

Connection createConnection();

Connection createConnection(String userName,String password);

 

大多数开源框架不需要自己优化了,能使用好API其实就是最好的优化了。

 

4.5 Session的使用

  一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或者多个Session,才能进一步执行其他操作。Session是一个发送或者接收消息的线程,可以使用Session创建MessageProducer,MessageConsumer和Message。

  Session可以被事务化,也可以不用事务,通常,可以通过向Connection上的创建方法传一个布尔值进行设置。

  Session createSession(boolean transacted,int acknowledgeMode);

  其中 transacted为是否使用事务的标识,acknowledgeMode为签收方式。

  结束事务有两种方式:提交或者回滚,当一个事务提交,消息被处理。事务中有一个步骤失败,事务就回滚,这个事务中的已经执行的动作将被撤销。在发送消息最后也必须要使用session.commit()方法提交事务。

  签收模式有三种:

  1. session.AUTO_ACKNOWLEDGE当消费端从receive或者onMessage成功返回时,Session自动签收客户端的这条消息。
  2. Session.CLIENT_ACNOWLEDGE消费端通过调用消息(Message)的acknoledge方法签收消息(mess.acknowledge())。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有消费已消费消息的收条。
  3. Session.DUPS_OK_ACKNOWLEDGE此选项指示Session不必确保对传送消息的签收,它可能引起消息的重复,但是降低了Session的开销,所以只有消费端能容忍重复消息才能使用。

   工作中一般使用手动签收的方式

4.6 MessageProducer

 ---

 

 技术分享

 

4.7 MessageConsumer

 ---

 

技术分享

 

46.ActiveMQ开篇(Hello World、安全认证、Connection、Session、MessageProducer、MessageConsumer)