首页 > 代码库 > Rabbitmq.md

Rabbitmq.md

RabbitMQ介绍

什么是RabbitMQ

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一exchange即可,剩下的消息分发工作由RabbitMQ完成。

单向解耦

“Producer”--
           |
           |----->"RabbitMQ Clusters" ---> “Consumer”
"Producer"--

双向解耦(如:RPC)

“Producer1”-->
           |
           |<----->"RabbitMQ Clusters" <---> “Consumer2&Producer2”
"Consumer1"<--

概念和特性

交换机(exchange)

1.接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanout

  • direct:转发消息到routigKey指定的队列
  • topic:按规则转发消息(最灵活)
  • headers:基于首部进行路由
  • fanout:转发消息到所有绑定队列

2.如果交换机上(Exchange)和(Queue)是多对多的关系。

3.topic类型交换器通过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。、

支持表达式:
    *.1.geewu  #只要包含1.geewu就可以匹配相关信息。这个是topic,性能最慢
    hello   #这个是direct,性能最好。

4.因为交换器是在RabbitMQ是一个实际存在的实体,不能被改变。只能删除之后,重新创建。

5.交换器的属性:

  • 持久性:如果启用,交换器将会在server重启前都有效。(对应Duration属性,持久化)
  • 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。(创建时候设置,如果不设置不会自动删除)。
  • 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。(不会自动创建)

队列(queue)

1.队列是RabbitMQ内部对象,存储消息。相同属性的queue可以重复定义。

2.临时队列。channel.queueDeclare(),有时不需要指定队列的名字,并希望断开连接时删除队列。

队列的属性:

  • 持久性:如果启用,队列将会在server重启前都有效。
  • 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
  • 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
  • 排他性:如果启用,队列只能被声明它的消费者使用。

消息传递

  • 消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,可以动态的增加消费者以提高消息的处理能力。
  • 为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。 channel.basic_qos(prefetch_count=1) 注意:效率非常低,不能使用客户端缓存。

消息有14个属性,最常用的几种:

  • deliveryMode:持久化属性
  • contentType:编码
  • replyTo:指定一个回调队列
  • correlationId:消息id

在client代码中,send方法时候,可以设置mandatory和immediate。设置mandatory:发送到交换器并且还未投递到队列(没有绑定器存在)得到通知。设置immediate:没有消费者能够立即处理的时候得到通知。这些投递保障机制,保证了消息可靠性。

在client代码中,send方法时候persistent属性为true。数据就会被保存到队列中,但是必须Exchange,Queue,Client三者都设置为存储状态。

高可用性(HA)

1.消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。

channel.basicConsume(queuename, noAck=false, consumer);

2.消息和队列的持久化。定义队列时可以指定队列的持久化属性(问:持久化队列如何删除?) channel.queueDeclare(queuename, durable=true, false, false, null); 发送消息时可以指定消息持久化属性:这样,即使RabbitMQ服务器重启,也不会丢失队列和消息。

channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());

3.publisher confirms 提供批量确认消息的方法。

4.master/slave机制,配合Mirrored Queue。Mirrored Queue通过policy和rabbitmqctl设置可以实现。具体可以参考Rabbitmq官方文档。在Mirrored Queue下,无论Producer和Consumer连接那个RabbitMq服务器,都跟连接同一个RabbitMQ上,消费和生产数据会被同步。注意:Mirrored Queue会严重的消耗性能,性能会下降到原来的1/5。当一个slave重新加入mirrored-queue时,如果queue是durable的,则会被清空。
(通过命令行或管理插件可以查看哪个slave是同步的:

rabbitmqctl list_queues name slave_pids synchronised_slave_pids)

集群(cluster)

1.不支持跨网段,因为RabbitMQ底层是Erlang,会导致脑裂(Slave Node感觉Master Node死掉了,主Master Node觉得Slave2 Node死掉了,结果数据无法复制,系统逻辑出现问题)(如需支持,需要shovel或federation插件)
2.可以随意的动态增加或减少、启动或停止节点,允许节点故障。(但是数据同步会造成Queue服务暂停,所有的Producer和Consumer都被终止)
3.集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。
4.集群的配置可以通过命令行,也可以通过配置文件,命令行优先。

安装与使用

安装服务

由于RabbitMQ主要依赖Erlang所以如果手动安装的话这个需要注意,在下面的的实验中是用CentOS 7系统的yum来进行安装。在安装前需要配置好epel源。

yum install -y rabbitmq-server

然而安装后需要注意一个问题就是需要做下主机名的解析,如果不做则会导致RabbitMQ无法启动,配置如下:

主机1:

echo 192.168.100.51 study-1 >> /etc/hosts

主机2:

echo 192.168.100.52 study-2 >> /etc/hosts

启动服务:

systemctl start rabbitmq-server.service

启动图形界面

# rabbitmq-plugins enable rabbitmq_management
# systemctl restart rabbitmq-server.service

通过上面的操作后在浏览器输入主机的IP:15672,例如:http://192.168.100.51:15672/默认用户为guest密码是guest。

用户管理

[root@study-1 ~]# rabbitmqctl add_user admin password
Creating user "admin" ...
...done.
[root@study-1 ~]# rabbitmqctl list_users
Listing users ...
admin	[]
guest	[administrator]
...done.
[root@study-1 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
...done.

权限管理

[root@study-1 ~]# rabbitmqctl list_user_permissions admin
Listing permissions for user "admin" ...
...done.
[root@study-1 ~]# rabbitmqctl list_user_permissions guest
Listing permissions for user "guest" ...
/	.*	.*	.*
...done.
[root@study-1 ~]# rabbitmqctl list_permissions 
Listing permissions in vhost "/" ...
guest	.*	.*	.*
...done.
[root@study-1 ~]# rabbitmqctl set_permissions -p /study1 admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/study1" ...
...done.
[root@study-1 ~]# rabbitmqctl list_permissions -p /study1
Listing permissions in vhost "/study1" ...
admin	.*	.*	.*
...done.

虚拟主机管理

[root@study-1 ~]# rabbitmqctl add_vhost /study1
Creating vhost "/study1" ...
...done.
[root@study-1 ~]# rabbitmqctl add_vhost /study1/lesson1
Creating vhost "/study1/lesson1" ...
...done.
[root@study-1 ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
/study1
/study1/lesson1
...done.
[root@study-1 ~]# rabbitmqctl delete_vhost /study1/lesson1
Deleting vhost "/study1/lesson1" ...
...done.
[root@study-1 ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
/study1
...done.

其他

broker查看

]# rabbitmqctl status

环境变量查看

]# rabbitmqctl environment

设定内存的高水位标记:

]# rabbitmqctl set_vm_memory_high_watermark <fraction>

关闭指定的连接

]# rabbitmqctl close_connection <connectionpid> <explanation>

HA环境搭建

配置之前需要将study-1主机上的/var/lib/rabbitmq/.erlang.cookie文件复制到study-2上。

[root@study-2 ~]# rabbitmqctl stop_app
Stopping node ‘rabbit@study-2‘ ...
...done.
[root@study-2 ~]# rabbitmqctl join_cluster rabbit@study-1
Clustering node ‘rabbit@study-2‘ with ‘rabbit@study-1‘ ...
...done.
[root@study-2 ~]# rabbitmqctl cluster_status
Cluster status of node ‘rabbit@study-2‘ ...
[{nodes,[{disc,[‘rabbit@study-1‘,‘rabbit@study-2‘]}]}]
...done.
[root@study-2 ~]# rabbitmqctl start_app
Starting node ‘rabbit@study-2‘ ...
...done.
[root@study-2 ~]# rabbitmqctl cluster_status
Cluster status of node ‘rabbit@study-2‘ ...
[{nodes,[{disc,[‘rabbit@study-1‘,‘rabbit@study-2‘]}]},
 {running_nodes,[‘rabbit@study-1‘,‘rabbit@study-2‘]},
 {cluster_name,<<"rabbit@study-1">>},
 {partitions,[]}]
...done.

通过上面的步骤就已经将study-2加入到study-1中,然后我们还可以在前端使用HAProxy设置一个调度器,将请求调度至后端的RabbitMQ集群中。

Rabbitmq.md