首页 > 代码库 > RabbitMQ (消息队列)专题学习02 Hello World

RabbitMQ (消息队列)专题学习02 Hello World

一、概述

RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处。

RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。

在专题学习一中我们已经简单提到了一些概念,在此我们更为深入的学习下RabbitMQ相关的专有名词。

1、生产(Producing):意思就是发送,发送消息的程序就是一个生产者(Producer),我们一般使用P来标示,如下图-1所示:

-1

2、队列(queue)就是邮箱的名称,消息通过你的应用程序和RabbitMQ服务器进行传递,它们能够存储在队列(queue)中,队列(queue)没有任何限制,你要存储多少消息都可以---queue基本上是一个无限的缓冲区,而且多个生产者(Producers)能够把消息发送给同一个队列,同样多个消费者(consumer)也能够从同一个队列(queue)中获取数据,如下图-2所示表示的就是一个队列:

-2

3、消费(consuming),它和获取消息是一个意思,一个消费者(sonsumer)就是一个等待获取消息的程序,我们通常用C来表示,如下图-3所示:

-3


二、实现Hello World

由于RabbitMQ支持多种语言,诸如Java、Python、Ruby、PHP、C#等等,实现了对每种语言的Client接口,RabbitMQ自己充当服务器,所以在专题一的架构中可以看出,不管发送消息的生产者和接收消息的消费者使用何种语言,都充当客户端的角色,故在此我们使用Java语言的客户端(消息发送者和消息接受者)。

在这部分学习中我们将实现两个Java程序:一个是发送单个消息的生产者,一个是接收消息并在控制台打印消息的消费者,我们将忽略一些JavaAPI的细节,把所有的精力都放在将要开始实现生产者和消费者上---它就是一个关于Hello World的消息队列的实现。

在下图-4中,P是消息的生产者,C是消息的消费者(接收者),中间部分就是队列(queue)---rabbitMQ用来保存消息的缓冲区域。

-4

大致过程:生产者(Producer)把消息发送到一个名为”hello“的队列中,消费者(Consumer)从这个队列中获取消息。

注意:在编程的时候要让客户端和服务器的AMQP的版本一致,不一致会报错,如果不一致主要去找对应版本的客户端即可,这样就不用再费劲去重装服务器端了。

2.1、发送消息

-5

下面实现调用消息发送者发送消息和消息接收者接收消息,发送者首先将建立与RabbitMQ的连接,发送一条消息,然后退出。

Send.java中,我们首先需要导入如下类文件:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
发送消息的过程如下:

1、创建连接(Connection)

2、创建通道(Channel),声明队列(当队列存在时就获取队列,不存在时就创建队列)。

3、发送消息

完成的发送者代码清单如下:

package com.xuz.send;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender01 {
	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		//RabbitMQ-Server安装在本机,所以直接用127.0.0.1
		factory.setHost("127.0.0.1");
		//创建一个连接
		Connection conn = factory.newConnection();
		//创建一个通信通道
		Channel channel = conn.createChannel();
		//定义Queue名称
		String queueName = "hello";
		//为Channel定义queue的属性,queueName为queue名称
		channel.queueDeclare(queueName, false, false,false,null);
		String msg = "Hello World!xuzheng test!";
		//发送消息
		channel.basicPublish("", queueName, null, msg.getBytes());
		System.out.println("send message["+msg+"] to "+queueName+" success!");
		//关闭通道
		channel.close();
		//关闭连接
		conn.close();
	}
}

2.2、接收消息

-6

接收消息的过程如下:

1、创建连接

2、创建通道、声明队列(还得在声明一次,原因在后面解释)

3、创建QueueConsumer用于缓存消息

4、循环接收消息(无消息阻塞,有消息处理)。

注意:声明队列的机制是:当队列存在时就获取队列,不存在时就创建队列,接收者可能比发送者更早创建,这时它需要保证存在一个队列,它能够从中取出消息。

完整接收者的代码如下:

package com.xuz.recv;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Recv01 {
	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		String queueName = "hello";
		channel.queueDeclare(queueName, false, false, false, null);
		//以上部分和sender一样
		//配置好获取消息得方式
		QueueingConsumer consumer =  new QueueingConsumer(channel);
		channel.basicConsume(queueName, true,consumer);
		//循环获取消息
		while(true){
			//获取消息,如果没有消息,这一步将会一直阻塞
			Delivery delivery = consumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.out.println("received message["+msg+"] from "+queueName);
		}
	}
}	

说明:在上述代码中我们还是要连接RabbitMQ服务器,连接代码和发送端代码是一样的,创建通道也是一样的,我们需要确认队列是否存在,使用queue_declare创建一个队列,我们可以运行这个命令很多次,但是只有一个队列会被创建。

channel.queueDeclare(queueName, false, false, false, null);

你也许要问为什么重复声明了队列---我们已经在前面的代码中声明了它,如果我们确定了队列是否已经存在的,那么我们可以不这么做,比如先运行Sender01.java程序,可是我们并不确定哪个程序先运行,这种情况的话在程序中重复声明是好的做法。

factory.setHost("127.0.0.1");
表示我们的发送端和接收端都在安装RabbitMQ服务器的本机运行。

运行结果如下:

发送端:

接收端:

至此RabbitMQ入门程序HelloWorld消息队列实现完毕!

源码下载:

RabbitMQ之HelloWorld源码