首页 > 代码库 > hornetq 入门(1)

hornetq 入门(1)

HornetQ 版本 2.4.0.Final,需要 JDK 7 及以上

Hornetq官网

Hornetq2.1中文手册 

step1.启动服务端

  1.1准备配置文件(配置说明参考官网手册)

  hornetq-configuration.xml

  

<?xml version="1.0" encoding="UTF-8"?>
<!-- Core HornetQ server configuration (hornetq-configuration.xml). -->
<configuration xmlns="urn:hornetq"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    <!--
        Disabled broadcast group, kept for reference.
        NOTE(review): 255.255.255.0 looks like a netmask rather than a
        multicast group address; confirm the value before enabling this.

        <broadcast-groups> <broadcast-group name="my-broadcast-group">
        <local-bind-address>192.168.0.215</local-bind-address>
        <local-bind-port>11212</local-bind-port>
        <group-address>255.255.255.0</group-address>
        <group-port>9876</group-port>
        <broadcast-period>2000</broadcast-period> </broadcast-group>
        </broadcast-groups>
    -->
    <name>HornetQ.main.config</name>

    <bindings-directory>F:/hornetq/data/messaging/bindings</bindings-directory>

    <large-messages-directory>F:/hornetq/data/messaging/largemessages</large-messages-directory>

    <paging-directory>F:/hornetq/data/messaging/paging</paging-directory>

    <!-- Journal: messages are persisted to files in this directory -->
    <journal-directory>F:/hornetq/journal</journal-directory>
    <journal-min-files>10</journal-min-files>
    <!-- ID cache size -->
    <id-cache-size>9000</id-cache-size>
    <jmx-management-enabled>true</jmx-management-enabled>
    <!-- Message counters -->
    <message-counter-enabled>true</message-counter-enabled>
    <!-- keep history for a week -->
    <message-counter-max-day-history>7</message-counter-max-day-history>
    <!-- sample the queues every minute (60000ms) -->
    <message-counter-sample-period>60000</message-counter-sample-period>
    <persistence-enabled>true</persistence-enabled>
    <!--
        Disabled: management notifications would be delivered to the address
        below. The original note mentioned a JMS topic "notificationsTopic",
        but the configured address is the JMS queue notificationsQueue.

    <management-notification-address>jms.queue.notificationsQueue</management-notification-address>
    -->

    <!-- Without these two settings the server reports a security warning -->
    <cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
    <cluster-password>test65525</cluster-password>

    <!-- Connectors -->
    <connectors>
        <connector name="connector-netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="use-nio" value="true"/>
            <param key="host" value="localhost"/>
            <param key="port" value="11212"/>
        </connector>
        <!-- SSL connector -->
        <connector name="netty-ssl-connector">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="localhost"/>
            <param key="port" value="5500"/>
            <param key="ssl-enabled" value="true"/>
            <param key="key-store-path" value="F:/ssl/keystore"/>
            <param key="key-store-password" value="test"/>
        </connector>
    </connectors>

    <!-- Acceptors -->
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="use-nio" value="true"/>
            <param key="host" value="0.0.0.0,127.0.0.1,localhost"/>
            <param key="port" value="11212"/>
        </acceptor>
        <!-- SSL acceptor (pairs with netty-ssl-connector above) -->
        <acceptor name="netty-ssl-acceptor">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="localhost"/>
            <param key="port" value="5500"/>
            <param key="ssl-enabled" value="true"/>
            <param key="key-store-path" value="F:/ssl/keystore"/>
            <param key="key-store-password" value="test"/>
            <param key="trust-store-path" value="F:/ssl/truststore"/>
            <param key="trust-store-password" value="test"/>
        </acceptor>
    </acceptors>

    <!-- Other config -->
    <address-settings>
        <address-setting match="jms.queue.#">
            <redelivery-delay>5000</redelivery-delay>
            <!-- Author's note: without this, remote queue lookup fails -->
            <expiry-address>jms.queue.expiryQueue</expiry-address>
            <!-- Author's note: without this, remote queue lookup fails -->
            <last-value-queue>true</last-value-queue>
            <max-size-bytes>100000</max-size-bytes>
            <page-size-bytes>20000</page-size-bytes>
            <redistribution-delay>0</redistribution-delay>
            <address-full-policy>PAGE</address-full-policy>
            <!-- Dead letter address -->
            <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
            <dead-letter-address>jms.queue.deadLetterQueue</dead-letter-address>
            <max-delivery-attempts>3</max-delivery-attempts>
        </address-setting>
    </address-settings>

    <security-settings>
        <!-- Permissions granted to the guest role on every JMS queue address -->
        <security-setting match="jms.queue.#">
            <permission type="createDurableQueue" roles="guest"/>
            <permission type="deleteDurableQueue" roles="guest"/>
            <permission type="createNonDurableQueue" roles="guest"/>
            <permission type="deleteNonDurableQueue" roles="guest"/>
            <permission type="consume" roles="guest"/>
            <permission type="send" roles="guest"/>
        </security-setting>
    </security-settings>
</configuration>

hornetq-jms.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- JMS resources (connection factory and queues) deployed on the HornetQ server. -->
<configuration xmlns="urn:hornetq"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

    <connection-factory name="ConnectionFactory">
        <connectors>
            <!-- Must match a connector name declared under <connectors> in hornetq-configuration.xml -->
            <connector-ref connector-name="connector-netty"/>
        </connectors>
        <!-- JNDI names under which this factory is bound -->
        <entries>
            <entry name="ConnectionFactory"/>
            <entry name="/ConnectionFactory"/>
            <entry name="XAConnectionFactory"/>
            <entry name="/XAConnectionFactory"/>
            <entry name="java:/ConnectionFactory"/>
            <entry name="java:/XAConnectionFactory"/>
        </entries>

        <retry-interval>1000</retry-interval>
        <retry-interval-multiplier>1.5</retry-interval-multiplier>
        <max-retry-interval>60000</max-retry-interval>
        <reconnect-attempts>1000</reconnect-attempts>
        <confirmation-window-size>1048576</confirmation-window-size>
    </connection-factory>

    <!-- JMS addresses -->
    <queue name="notificationsQueue">
        <entry name="/queue/notificationsQueue"/>
    </queue>
    <queue name="testQueue">
        <entry name="/queue/testQueue"/>
        <!-- String literals in a selector use plain ASCII single quotes;
             the typographic quotes in the original are not valid selector syntax. -->
        <selector string="color='red'"/>
        <durable>true</durable>
    </queue>
    <!-- the dead letter queue where dead messages will be sent -->
    <queue name="deadLetterQueue">
        <entry name="/queue/deadLetterQueue"/>
    </queue>
    <!-- Target of the expiry-address (jms.queue.expiryQueue) set in hornetq-configuration.xml;
         without a queue bound to that address, expired messages have nowhere to go. -->
    <queue name="expiryQueue">
        <entry name="/queue/expiryQueue"/>
    </queue>

</configuration>

 1.2 启动hornetq服务

 

/**
 * Boots an embedded HornetQ server: loads the file configuration, creates the
 * core server, starts the standalone naming (JNP) server bound to 0.0.0.0,
 * then starts the JMS server manager and the core server.
 * Any exception is printed and swallowed.
 */
public static void startHornetqServer(){
        try {
            // Core configuration (hornetq-configuration.xml)
            FileConfiguration fileConfig = new FileConfiguration();
            fileConfig.start();

            // Core server built from that configuration
            HornetQServer coreServer = HornetQServers.newHornetQServer(fileConfig);

            // JNP naming server, listening on all interfaces
            StandaloneNamingServer namingServer = new StandaloneNamingServer(coreServer);
            namingServer.setBindAddress("0.0.0.0");
            namingServer.setRmiBindAddress("0.0.0.0");
            namingServer.start();

            // JMS layer (hornetq-jms.xml), kept in the jmsServer field
            jmsServer = new JMSServerManagerImpl(coreServer);
            jmsServer.start();

            // Start the core server, then report whether the JMS layer is up
            coreServer.start();
            System.out.println(jmsServer.isStarted());
        } catch (Exception startupError) {
            startupError.printStackTrace();
        }
    }

 

step2.发送消息客户端

   

/**
     * Sender client: looks up the queue "/queue/notificationsQueue" and the
     * "ConnectionFactory" via JNDI (jnp://127.0.0.1:1099), then sends one text
     * message and one bytes message in a single transaction.
     *
     * Any exception is printed; the connection and the JNDI context are
     * released in all cases.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Context ctx = null;
        Connection conn = null;
        try {
            Properties prop = new Properties();
            prop.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099");
            prop.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            prop.setProperty(Context.SECURITY_PRINCIPAL,"guest");
            prop.setProperty(Context.SECURITY_CREDENTIALS, "guest");

            ctx = new InitialContext(prop);
            System.out.println("+++++++1111ssssssss");
            // Look up the target destination
            Destination destination = (Destination)ctx.lookup("/queue/notificationsQueue");
            System.out.println("+++++++2222"+destination);

            // Look up the connection factory; the JNDI name matches
            // <connection-factory name="ConnectionFactory"> in hornetq-jms.xml
            ConnectionFactory factory = (ConnectionFactory)ctx.lookup("ConnectionFactory");
            System.out.println("+++++++3333"+factory);
            // Obtain a connection from the factory
            conn = factory.createConnection();
            System.out.println("+++++++4444"+conn);
            conn.start();

            // Transacted session: sends only take effect once commit() is called
            javax.jms.Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);

            // Producer bound to the looked-up destination
            MessageProducer producer = session.createProducer(destination);
            TextMessage msg = session.createTextMessage("ffffffffffffffffffffffffffffffffffffffffffffff小心呈现出");
            BytesMessage byteMessage=session.createBytesMessage();
            byteMessage.writeBytes("testddddddddd".getBytes("utf-8"));
            producer.send(msg);
            producer.send(byteMessage);
            // Without this commit, closing the session rolls the transaction
            // back and neither message is delivered.
            session.commit();
            System.out.println("send over !!!!!");
            session.close();
            conn.close();
            System.out.println("send down===");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Closing an already-closed JMS connection is a no-op, so this is
            // safe on the success path and releases the connection on failure.
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
            if (ctx != null) {
                try {
                    ctx.close();
                } catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
        }

    }

 

step3.接收消息客户端

/**
 * Creates a receiver that listens on every queue named in destinationJNDI.
 *
 * Looks up "ConnectionFactory" via getJNDIRemoteObj, opens one connection and
 * one non-transacted AUTO_ACKNOWLEDGE session, and registers a ReceiveMessage
 * listener on a consumer for each destination that resolves. Names that do
 * not resolve are skipped. The connection, session, destination and producer
 * fields are assigned here; destination and producer end up holding the last
 * resolved queue and its consumer.
 *
 * @param destinationJNDI JNDI names of the queues to consume from
 */
public MessageReceive(String ...destinationJNDI){

            QueueConnectionFactory factory=(QueueConnectionFactory)getJNDIRemoteObj("ConnectionFactory");
            if(factory==null)
                return;
            try {
                connection = factory.createConnection();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                for (String jndiName : destinationJNDI) {
                    destination = (Queue) getJNDIRemoteObj(jndiName);
                    if(destination==null)
                        continue;
                    producer = session.createConsumer(destination);
                    // Receive messages asynchronously
                    producer.setMessageListener(new ReceiveMessage());
                }
                // Start delivery only after every consumer and listener is in
                // place: once a started connection has a session with a
                // listener, that session must not be used from this thread.
                connection.start();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
/**
 * Looks up an object by name in the JNP naming service at jnp://127.0.0.1:1099,
 * authenticating as guest/guest.
 *
 * A fresh InitialContext is created for each call and closed before returning.
 *
 * @param jndiName JNDI name to resolve
 * @return the bound object, or null if the context cannot be created or the
 *         lookup fails (the NamingException is printed)
 */
public static Object getJNDIRemoteObj(String jndiName) {
        Context context = null;
        try {
            Properties prop = new Properties();
            prop.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099");
            prop.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            prop.setProperty(Context.SECURITY_PRINCIPAL,"guest");
            prop.setProperty(Context.SECURITY_CREDENTIALS, "guest");
            context = new InitialContext(prop);
            return context.lookup(jndiName);
        } catch (NamingException e) {
            e.printStackTrace();
        } finally {
            // Release the per-call context so repeated lookups do not leak it
            if (context != null) {
                try {
                    context.close();
                } catch (NamingException closeError) {
                    closeError.printStackTrace();
                }
            }
        }
        return null;
    }
public class ReceiveMessage implements MessageListener {

    /**
     * Prints each received message to standard output.
     *
     * Text messages and bytes messages (decoded as UTF-8) are printed prefixed
     * with the address of the destination they arrived on. For a
     * HornetQObjectMessage whose payload is an ErrorMessageBO, the message
     * content is printed to standard error. A JMSException is printed and
     * swallowed.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        // DateFormat.getDateTimeInstance() is the documented replacement for
        // the deprecated Date.toLocaleString() and yields the same text.
        System.out.println("Received notification:"
                + java.text.DateFormat.getDateTimeInstance().format(new Date()));
        try {
//            Debug aid: dump all message properties.
//            Enumeration propertyNames = message.getPropertyNames();
//            while (propertyNames.hasMoreElements())
//            {
//               String propertyName = (String)propertyNames.nextElement();
//               System.err.format("  %s: %s\n", propertyName, message.getObjectProperty(propertyName));
//            }
            HornetQDestination des = (HornetQDestination) message.getJMSDestination();
            // Guard against a message that carries no destination header
            String address = (des == null) ? null : des.getAddress();

            if (message instanceof TextMessage) {
                TextMessage mesg = (TextMessage) message;
                System.out.println(address + "==received:" + mesg.getText());
            } else if (message instanceof BytesMessage) {
                BytesMessage mesg = (BytesMessage) message;
                ByteArrayOutputStream out = new ByteArrayOutputStream((int) mesg.getBodyLength());
                byte[] chunk = new byte[2048];
                int read;
                // readBytes returns -1 once the end of the body is reached
                while ((read = mesg.readBytes(chunk)) != -1) {
                    out.write(chunk, 0, read);
                }
                try {
                    // Print the destination address, as the text branch does
                    // (the original printed des.getClass() here).
                    System.out.println(address + "==received:" + out.toString("utf-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }

            if (message instanceof HornetQObjectMessage) {
                HornetQObjectMessage object = (HornetQObjectMessage) message;
                Object msgObj = object.getObject();
                if (msgObj instanceof ErrorMessageBO) {
                    ErrorMessageBO messageBO = (ErrorMessageBO) msgObj;
                    System.err.println("error:==>" + messageBO.getMessageContent());
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
        System.out.println("----------end--------------");

    }