首页 > 代码库 > RabbitMQ 概念与Java例子

RabbitMQ 概念与Java例子

RabbitMQ简介
目前RabbitMQ是AMQP 0-9-1(高级消息队列协议)的一个实现,使用Erlang语言编写,利用了Erlang的分布式特性。
概念介绍:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。


AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:
1. 客户端连接到消息队列服务器,打开一个channel。
2. 客户端声明一个exchange,并设置相关属性。
3. 客户端声明一个queue,并设置相关属性。
4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
5. 客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
Exchange路由消息的集中类型:
名称
默认的预先定义exchange名字
作用描述
Direct exchange
(Empty string) and amq.direct
根据Binding指定的Routing Key,将符合Key的消息发送到Binding的Queue
Fanout exchange
amq.fanout
将同一个message发送到所有同该Exchange bingding的queue
Topic exchange
amq.topic
根据Binding指定的Routing Key,Exchange对key进行模式匹配后路由到相应的Queue,模式匹配时符号”#”匹配一个或多个词,符号”*”匹配正好一个词。
Headers exchange
amq.match (and amq.headers in RabbitMQ)
同direct exchange类似,不同之处是不再使用Routing Key路由,而是使用headers(message attributes)进行匹配路由到指定Queue。

参考:
http://www.choudan.net/2013/07/25/OpenStack-RabbitMQ(%E4%B8%80).html
http://stephansun.iteye.com/blog/1452853
http://www.diggerplus.org/archives/3110
http://backend.blog.163.com/blog/static/202294126201322563245975/
http://lynnkong.iteye.com/blog/1699684

特性:
1. broker的持久化:exchange和queue声明为durable时,exchange和queue的配置会在服务端磁盘保存起来,这样在服务停掉重启后,exchange和queue以及其相应的binding等配置不会丢失;
2. message的持久化:当message的deliver mode attribute(message properties)设置为2时,每个未被消费的message将被保存在磁盘中,在服务重启后仍能保存。
message在文件中的保存参考:http://my.oschina.net/hncscwc/blog/182083
3. cluster:RabbitMQ支持多个nodes(每个nodes是一个RabbitMQ实例)组成一个cluster,访问cluster中的任意一个node的效果是相同的,也就是说任何一个message都可以在任意一个nodes上生产和消费(生产或消费的message会在nodes间中转)。
4. mirrored-queue:RabbitMQ在cluster的基础上,支持同一个queue的message同时存储在多个nodes上,这样当部分节点失效时,可保证message和broker的配置不丢失。


安装与配置
1.安装erlang
sudo apt-get install tk tcl unixODBC erlang
sudo vim /etc/profile
添加export PATH=$PATH:/usr/lib/erlang/bin/

2.安装rabbitmq
sudo apt-get install rabbitmq-server

sudo vim /etc/profile 添加export PATH=$PATH:/usr/lib/rabbitmq/bin
source /etc/profile

rabbitmq的基本配置(端口等)参考:http://my.oschina.net/hncscwc/blog/302339

3.用户与权限
在正式应用之前,我们先在RabbitMQ里创建一个vhost,加一个用户,并设置该用户的权限。使用rabbitmqctl客户端工具,在根目录下创建”/mq_test”这个vhost:
rabbitmqctl add_vhost /mq_test
创建一个用户名”test”,设置密码”test123″:
rabbitmqctl add_user test test123
设置pyh用户对/pyhtest这个vhost拥有全部权限:
rabbitmqctl set_permissions -p /mq_test test “.*” “.*” “.*”
后面三个”*”代表pyh用户拥有对/pyhtest的配置、写、读全部权限

参考:http://my.oschina.net/hncscwc/blog/262246

4.配置开启web管理插件
cat <<EOF>> /etc/rabbitmq/enabled_plugins
[rabbitmq_management].
EOF
可以通过http://localhost:15672/ 查看运行情况

5.启动
使用root权限运行rabbitmq-server 或使用/etc/init.d/rabbitmq-server start|restart|stop

 

6.在一台机器上启动多个节点(模拟集群)
vim start_rabbitmq_cluster.sh

#!/bin/bash

if [[ $# != 1 ]]
then
echo "Usage: $0 process_num"
exit 1
fi

HOST_NAME=`hostname`
START_NUM=0
PROCESS_NUM=$1
END_NUM=$(( START_NUM + PROCESS_NUM - 1))

RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME="rabbit" RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" rabbitmq-server -detached

for (( i=$((START_NUM+1)); i<=$END_NUM; i++ ))
do
RABBITMQ_PROT=$(( i + 5672 ))
MANAGE_PORT=$(( i + 15672 ))
NODE_NAME="rabbit_$i"
echo $RABBITMQ_PROT
echo $MANAGE_PORT
echo $NODE_NAME
RABBITMQ_NODE_PORT=$RABBITMQ_PROT RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,$MANAGE_PORT}]" RABBITMQ_NODENAME=$NODE_NAME rabbitmq-server -detached
sleep 3

rabbitmqctl -n $NODE_NAME stop_app
rabbitmqctl -n $NODE_NAME reset
echo "join cluster"
rabbitmqctl -n $NODE_NAME join_cluster rabbit@$HOST_NAME
rabbitmqctl -n $NODE_NAME start_app
done

rabbitmqctl cluster_status -n rabbit


chmod a+x start_rabbitmq_cluster.sh
start_rabbitmq_cluster.sh 3
启动后可以通过rabbitmqctl -n rabbit cluster_status查看集群节点配置情况,或者在web管理页面中查看

7.配置network partion时的处理方式

cat<<EOF>> /usr/local/rabbitmq/rabbitmq_server-3.1.0/etc/rabbitmq/rabbitmq.conf
[
{rabbit, [{cluster_partition_handling, pause_minority}]}
].
EOF
参考:http://my.oschina.net/hncscwc/blog/174417


Java代码示例
Producer
Consumer

RabbitMQ 概念与Java例子