首页 > 代码库 > ActiveMQ伪分布式部署
ActiveMQ伪分布式部署
控制台 | 服务接口 | 集群通讯接口 | |
mq1 | 8161 | 51511 | 61601 |
mq2 | 8162 | 51512 | 61602 |
mq3 | 8163 | 51513 | 61603 |
服务接口没有使用默认的61611,是因为activemq默认还会使用61613,61614等端口;
3、修改activemq配置
a) 安装activemq,本文使用Activemq版本为5.14.0;(前提条件,电脑已安装java JDK,不然启动时会提示)
解压文件到任意目录,双击bin\win64中的activemq.bat(根据电脑操作系统位数),在浏览器里输入“http://127.0.0.1:8161/admin/”,默认账号密码都是“admin”.如图:
至此,activemq单机模式安装成功。
b) 修改配置文件activemq.xml,路径为conf/activemq.xml
1、broker(所有activemq的brokerName必须一致,才能加入同一个集群)
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
2、配置levelDB,在<persistenceAdapter>节点内,将原来的kahaDB注释掉,添加<replicatedLevelDB>节点以及内容,其中的61601可修改
bind:集群间通讯的ip和端口
zkAddress:ZooKeeper地址,多个可用,逗号分隔(下面有安装教程)
hostname:主机名,直接设置本机IP地址即可
zkPath:zkPath目录(自定义),可在ZooInspetor中进行查看
<persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://127.0.0.1:61601" zkAddress="127.0.0.1:2181" hostname="10.0.10.238" sync="local_disk" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>
3、配置服务接口,在<transportConnectors>节点内,仅修改红色方框标注的地方(51511)
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:51511?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
4、配置控制台端口,conf\jetty.xml文件中,在id="jettyPort"的<bean>节点内,仅修改红色方框标注的地方(8161)
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean>
5、在E盘中新建文件夹activemq,将整个activemq的安装目录复制成三个,并命名mq1,mq2,mq3;并修改mq2和mq3中的配置文件(按如上方法),更改其服务接口为“51512”,“51513”和控制台端口“8162”,“8163”以及集群通信接
口“61602”,“61603”。
四、Zookeeper单机模式安装
1.下载zookeeper3.4.9,地址: ,并解压至刚新建的activemq文件夹中,前提是确保以及安装java JDK
2.双击bin目录下的zkServer.cmd启动
3.打开一个CMD窗口,输入命令“netstat -ano|findstr "2181" 查看zookeeper是否启动成功,如图端口2181已经打开,说明启动成功
4.点击打开的CMD窗口,Ctrl+C关闭,关闭后修改配置文件,在解压出的zookeeper-3.4.9文件夹中新建data文件夹,并在data文件夹中新建log和data文件夹,后面会用到。
5.Zookeeper 的配置文件在 conf 目录下,这个目录下有 zoo.cfg 和 log4j.properties, zoo.cfg的配置文件项如下:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=E:/activemq/zookeeper-3.4.9/data/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
dataLogDir=E:/activemq/zookeeper-3.4.9/data/log
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
各项含义:
tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)
初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端
连接失败。总的时间长度就是 5*2000=10 秒
syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒
dataDir:顾名思义就是 Zookeeper 保存数据的目录。
dataLogDir:默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里
clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
6.配置完成后,双机bin目录下的zkServer.cmd启动
7.打开一个CMD窗口,输入命令“netstat -ano|findstr "2181" 查看zookeeper是否启动成功
五、测试ActiveMQ伪分布式是否安装成功
1. 先启动Zookeeper
2. 分别启动三个activemq,
mq1:
mq2和mq3的cmd窗口会一直更新数据,尝试连接
浏览器中输入http://127.0.0.1:8161/admin/(此时master为mq1)
此时可以看到作为master的是端口为8161的activemq即mq1;当关闭mq1的命令窗口,此时mq2变为master,mq3变为slave
再次查看http://127.0.0.1:8161/admin/ ,可以看到服务已经关闭了;而此时,zookeeper已经将master切换到了mq2上了,可以查看mq2的地址http://127.0.0.1:8162/admin/
至此,伪分布式activemq的部署已经完成了。
六、Java测试代码(Spring整合ActiveMQ)
1. springJMSConfig.xml,在web.xml文件中调用
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd"> <!-- mq连接工厂 --> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="http://www.mamicode.com/failover:(tcp://127.0.0.1:51511,tcp://127.0.0.1:51512,tcp://127.0.0.1:51513)" /> <!-- 异步发送消息 --> <property name="useAsyncSend" value="http://www.mamicode.com/true" /> <!-- 队列消费者配置预取值 --> <property name="prefetchPolicy"> <bean class="org.apache.activemq.ActiveMQPrefetchPolicy"> <property name="queuePrefetch" value="http://www.mamicode.com/2" /> </bean> </property> <property name="userName" value="http://www.mamicode.com/admin" /> <property name="password" value="http://www.mamicode.com/admin" /> </bean> <!-- 连接池 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory" ref="amqConnectionFactory" /> </bean> <!-- 定义连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> </bean> <!-- 主题目的地--> <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="http://www.mamicode.com/testXBQ_Topic66" /> </bean> <!-- 配置JMS模板,Spring提供的JMS工具类,发送、接收消息--> <bean id="jmsTemplateNotice" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="pooledConnectionFactory" /> <property name="defaultDestination" ref="defaultDestination" /> <!-- true是订阅 --> <property name="pubSubDomain" value="http://www.mamicode.com/true" /> <!-- 加入事物回滚 --> <property name="sessionTransacted" value="http://www.mamicode.com/true" /> </bean> </beans>
java中调用发送,打开ActiveMQ控制台,查看消息数量即可。
package com.hyzn.fw.mqsend; import javax.annotation.Resource; import javax.jms.Destination; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; /** * @ClassName: TopicSender * @Description: TODO 发布主题MQ * @author xbq * @version 1.0 * @date 2017-2-13 下午4:16:22 */ @Component public class TopicSender { @Resource private JmsTemplate jmsTemplateNotice; public void sendMqMessage(Destination destination, String message){ if(null == destination){ // 为空采用 destination = this.jmsTemplateNotice.getDefaultDestination(); } this.jmsTemplateNotice.setDefaultDestination(destination); //发送mq消息 this.jmsTemplateNotice.convertAndSend(message); System.out.println("消息推送完毕!!"); } }
ActiveMQ伪分布式部署