首页 > 代码库 > RabbitMQ指南(Java)

RabbitMQ指南(Java)

一、Hello World

1、基本概念介绍

RabbitMQ是一个消息代理(或者说消息队列),它的主要意图很明显,就是接收和转发消息。你可以把它想象成一个邮局:当你把一封邮件放入邮箱,邮递员会帮你把邮件送到收件人的手上。在这里,RabbitMQ就好比一个邮箱、邮局或者邮递员。

RabbitMQ和邮局的主要区别在于,RabbitMQ不是处理邮件,而是接收、存储和将消息以二进制的方式转发出去。

在这里,我们先说明一些RabbitMQ中涉及到的术语。

  • 生产者(Producer)。生产表示只负责发送的意思,一个只负责发送消息的程序称为一个生产者,我们通过一个P来表示一个生产者,如下图:
    技术分享

  • 队列(Queue)。队列就好比一个邮箱,它在RabbitMQ的内部。虽然消息在RabbitMQ和程序之间传递,但是它们是存储在队列中的。一个队列没有大小的限制,你想存储多少条消息就存储多少条,它的本质是一个无限大的缓冲区。任何生产者都可以往一个队列里发送消息,同样的,任何消费者也可以从一个队列那接收到消息。我们用下图来表示一个队列,队列上面的文字表示这个队列的名字:
    技术分享

  • 消费者(Consumer)。接收和发送的过程很类似,一个消费者程序通常是等待别人发送消息给它。我们通过一个C来表示一个消费者,如下图:
    技术分享

注意一点,消费者、生产者和消息队列可以不用运行在同一台机器上。实际上,在大多数的应用程序中,它们并不是在同一台机器上运行的。

2、”Hello World”

在这一小节中,我们将编写两个Java程序,一个作为生产者,发送一条简单的消息;另一个作为消费者,接收并打印出接收到的消息。在这里,我们先不讨论Java API的具体细节,而是先编写出一个可运行的“Hello World”程序。

在下图中,“P”表示一个生产者,”C”表示一个消费者,中间的矩形表示一个队列。

技术分享

RabbitMQ会涉及许多协议,其中AMQP协议是一个开放式的、通用的消息协议。RabbitMQ支持很多编程语言,我们这里通过RabbitMQ提供的Java客户端来进行演示。请自行下载、安装RabbitMQ和相关jar包。

(1)发送

技术分享

在下面的代码中,我们用Send来表示一个发送端,用Recv来表示一个接收端。发送端会先连接RabbitMQ,并发送一个简单的消息后关闭。

对于Send而言,我们需要导入一些相关的类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel;

首先,先建立一个类,并为队列起一个名:

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) 
        throws Exception {
        ...
    }
}

然后我们创建一个connection:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

connection是一个抽象的sockect连接,在连接时,我们需要注意一下登录认证信息。在这里我们连接到本地机器,所以填写上“localhost”。如果我们想要连接到其他机器上,只需要填写上其他机器的IP地址即可。

接下来,我们创建一个channel,大多数的API都可以通过这个channel来获取。

如果要发送消息,我们必须先声明一个队列,然后我们才可以把消息发送到这个队列里去:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");

声明一个队列后,仅当队列不存在时,队列才会被创建。另外,消息的内容是一个字节数组,所以我们需要先对消息进行编码。最后,我们需要关闭连接。

channel.close();
connection.close();

以上就是Send.java类的所有代码

发送端运行不起来

如果你是第一次使用RabbitMQ,而且没有看到打印的“Sent”信息,你可能会奇怪是哪里出问题了。这里有可能是RabbitMQ在启动时没有足够的磁盘空间(默认情况下最少需要1Gb的空间),所以它会拒绝接收任何消息。通过查看日志文件可以确定是否是由这个原因所造成的。如果有必要,我们也可以通过修改配置文件中的disk_free_limit来降低大小的限制。

(2)接收

上面的代码是发送端的。我们的接收端是从RabbitMQ那获取消息,所以并不是像发送端那样发送一个简单的消息,而是需要一直监听获取消息,并打印输出。

技术分享

对于接收端而言,它需要导入如下的类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumer是一个实现了Consumer接口的类,我们可以使用它来获取发送过来的消息。

跟Send一样,我们先创建一个connection和channel,并声明一个接收消息的队列。注意一点,这里的队列名要和Send的相匹配。

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv)
        throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        ...
    }
}

在这里我们同样声明了一个队列,因为我们可能在启动发送端之前就先启动了接收端,在我们开始接收消息之前,我们要先确认队列是否存在。

我们告诉RabbitMQ通过这个队列给我们发送消息,因此RabbitMQ会异步的给我们推送消息,我们需要提供一个回调对象用来缓存消息,直到我们准备使用它。这就是DefaultConsumer所做的事情。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received ‘" + message + "‘");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

以上就是Recv.java类的所有代码

3、代码整合

编译并运行以上程序,你会看到接收端将会收到并打印出来自发送端的“Hello World!”消息。

你可以在类路径下编译这些文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

为了运行它们,你需要rabbitma-client.jar和它的依赖文件。在一个终端运行发送者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然后运行接收者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

在windows环境中,我们使用分号代替冒号来分隔classpath上的选项。

Hint

你可以设置一个环境变量到classpath中:

 $ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
 $ java -cp $CP Send
在windows上:
 > set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
 > java -cp %CP% Send

4、个人补充

(1)登录认证

在实际使用中,RabbitMQ并不是简单的通过指定一个IP就可以进行连接的,它还需要指定端口号、用户名和密码。就像是这样:

factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");

当没有明确指明登录认证信息的时候,就会使用默认值来进行登录,以上都是默认的认证信息。

另外,我们也可以通过设置URI来进行连接,URI的格式如下:

amqp://username:password@host:port

factory.setUri("amqp://guest:guest@localhost:5672");

二、工作队列

技术分享
在第一部分中,我们的程序结构非常简单。在接下来,我们将会创建一个工作队列,向多个消费者分发任务。

1、准备

在之前的程序中,我们发送了一个包含“Hello World!”的消息,现在我们发送一些字符串用来表示复杂的任务。在这里,因为我们没有真正意义上的任务,比如调整图片的大小或者渲染pdf文件,所以我们通过Thread.sleep()函数来模拟程序的处理时间。我们用字符串中的句号来代表它的复杂度,每一个句号表示要花费一秒的工作时间。例如,一个“Hello…”的字符串就表示它需要3秒钟的处理时间。

我们会稍微修改我们之前程序中的Send.java代码,使其允许发送包含任意内容的消息。这个程序将会定时向队列中发送任务,我们把它命名为NewTask.java:

String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");

一些帮助我们从命令行的参数中获取消息的函数:

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

我们的Recv.java程序也需要作出一些修改:它需要从队列中获取消息,并统计消息中有多少个“.”,然后sleep相应的时间。我们把它取名为Worker.java:

final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received ‘" + message + "‘");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
        }
    }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == ‘.‘) Thread.sleep(1000);
    }
}

编译以上程序:

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

2、轮询分发

使用任务队列的优势之一是任务的并行处理。如果现在积压了一大堆任务,我们仅需要添加更多的消费者即可,这是很容易扩展的。

首先,我们尝试同时运行两个消费者实例,他们都会从队列里去获取消息,如下。

你需要打开三个控制台,其中两个用来运行消费者程序,分别称为C1和C2

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C

第三个控制台用来运行生产者程序。一旦你开启了消费者程序,就可以启动生产者了:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

让我们看看消费者得到了什么消息:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘First message.‘
 [x] Received ‘Third message...‘
 [x] Received ‘Fifth message.....‘
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘Second message..‘
 [x] Received ‘Fourth message....‘

默认情况下,RabbitMQ会一直把消息发送给下一个消费者,平均情况下,每个消费者获得的消息是一样多的。这种分发方式叫做轮询,你可以尝试运行更多的消费者。

3、消息确认

处理一个任务可能需要花费数秒时间,你可能会好奇如果一个消费者执行了一个长任务,并且在完成处理前就挂了的情况下会发送什么事。就拿我们当前的代码来说,一旦RabbitMQ将消息传递给消费者,消息就会从内存中删除。在这种情况下,如果你强行关闭正在运行的消费者,那么它正在处理的消息就会丢失。那些发送给这个消费者但还没有开始处理的消息也会一并丢失。

但是,我们并不想丢失任何消息。实际上,如果一个消费者挂了,我们更希望将消息传递给其他的消费者消费。

为了保证消息不会丢失,RabbitMQ支持消息确认(acknowledgments)机制。Ack是由消费者发送的,用来告诉RabbitMQ这个消息已经接收到并处理完成,可以从内存中删除它了。

如果一个消费者没有发送ack就挂了,RabbitMQ会认为这个消息没有处理完成并将消息重新入队。如果这时有其他消费者在运行,它将会把这个消息发送给另一个消费者。通过这种方式,即使消费者挂了,也可以确保消息不会丢失。

在这里不存在超时的概念,只有在消费者挂了的情况下,RabbitMQ才会重发消息,否则就会一直等待消息的处理,即使需要花费很长的时间来处理。

消息确认机制默认情况下是开启的。现在我们要关闭它,改为手动提交确认信号。当处理完一个任务后,我们将手动提交。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received ‘" + message + "‘");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

通过这段代码,我们可以保证即使你通过Ctrl+C来关闭一个消费者,也不会丢失任何消息。那些没有发送确认信号的消息将会很快被重发。

忘记发送确认信号

忘记写basicAck是一个很普遍的错误,但是这会产生严重的后果。当你的客户端退出后,所有的消息将会被重新发送,RabbitMQ会越来越占内存,因为它不会删除那些没有发送确认信号的消息。

想要调试这种类型的错误,你可以使用rabbitmqctl打印出messages_unacknowledged属性:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

4、消息持久化

我们已经学习了如何保证在消费者挂了的情况下,消息也不会丢失。但是如果RabbitMQ挂了,我们的消息仍然会丢失。

当RabbitMQ退出或崩溃时,它将会丢失所有队列和消息,除非你让它不要这么做。通过两个方面可以保证消息不会丢失:对消息和队列进行持久化处理。

首先,我们需要保证RabbitMQ不会丢失我们的队列。为此,我们需要将一个队列声明为一个持久化的队列:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然这个代码是正确的,但是它却不会执行,因为我们已经定义了一个非持久化的hello队列。RabbitMQ不允许使用不同的参数去重新定义一个已经存在的队列,如果你强行这样做,它将会返回一个错误。有一个快速的解决方案,就是重新声明一个不同名字的队列,比如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

需要在生产者和消费者的代码中对queuqDeclare进行修改。

至此,我们可以保证即使RabbitMQ重启,task_queue队列也不会丢失。现在,我们需要对消息进行持久化,通过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN即可。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

注意:

消息的持久化并不能完全保证消息不会丢失,虽然它会告诉RabbitMQ把消息保存在硬盘上,但是从接收消息到保存消息之间,还是需要一定的时间的。同样,RabbitMQ没有对每个消息做fsync(2)——消息仅仅存在于缓存中,并没有真正的写入硬盘。所以这个持久化并不是健壮的,但是对于简单的工作队列来说已经完全足够了。如果你需要更强大的持久化的话,你可以考虑使用publisher confirms机制。

5、公平分发

你可能会注意到,分发的过程并不是我们所希望的那样。例如在某一情况下有两个消费者,RabbitMQ默认将序号为奇数的消息发送给第一个消费者,序号为偶数的消息发送给另一个消费者,当序号为奇数的消息都是一些很耗时的任务,而序号为偶数的消息都是一些小任务,那么就会造成第一个消费者一直处于忙碌状态,而另一个消费者处理完毕后会处于空等待的状态。RabbitMQ并不会在意这些事情,它只会一直均匀的分发消息。

这种情况的发生是因为RabbitMQ只负责分发消息,它并不关心一个消费者返回了多少个确认信号(即处理了多少条消息),它只是盲目的将第n条消息往第n个消费者发送罢了。

技术分享

为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount = 1。它将会告诉RabbitMQ不要同时给一个消费者超过一个任务,换句话说,就是在一个消费者发送确认信息之前,不要再给它发送新消息了。取而代之的是将消息发送给下一个空闲的消费者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列的大小

如果所有的消费者都处于繁忙中,你的队列很快就会被占满。你需要注意这件事,并且添加更多的消费者或者通过其他策略来解决他。

6、代码整合

最终的NewTask.java如下:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) 
                      throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = getMessage(argv);

        channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
        System.out.println(" [x] Sent ‘" + message + "‘");

        channel.close();
        connection.close();
    }      
    //...
}

Worker.java的代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received ‘" + message + "‘");   
            doWork(message); 
            System.out.println(" [x] Done" );

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
    //...
}

三、发布和订阅

在之前的示例中,我们创建了一个工作队列,在这之前,我们都假设每一个消息都准确的发送到一个消费者那里。在接下来,我们将做一些完全不同的事情——将一个消息发送到多个消费者,这种模式被称为发布和订阅模式。

为了说明这个模式,我们将会创建一个简单的日志系统,它由两部分程序组成,一个是发送日志消息,另一个是接收并打印日志消息。

在我们的日志系统中,每一个运行的接收程序都会获取一个消息的拷贝副本。通过这种方式,我们可以让一个消费者把日志记录到硬盘中,同时可以让另一个消费者把日志输出到屏幕上。

在本质上,发送日志消息相当于广播到所有接收者。

1、交换机

在之前,我们都是直接从一个队列中发送或获取消息。现在是时候介绍RabbitMQ中的full messaging模型了。

让我们快速复习下在先前部分中我们所学的知识:

  • 一个发送消息的生产者是一个用户程序。
  • 一个存储消息的队列是一个缓冲区。
  • 一个接收消息的消费者是一个用户程序。

在RabbitMQ的消息模型中,核心思想是生产者不直接将消息发送给队列。实际上,生产者甚至完全不知道消息会被发送到哪个队列中。

相反,生产者只能将消息发送到交换机上。交换机是一个非常简单的东西,它一边从生产者那里接收消息,一边向队列推送消息。交换机必须确切的知道它想要把消息发送给哪些接收者。例如是否发送到一个特定的队列中?还是发送给很多个队列?或者是把消息丢弃等等。这些东西都通过交换机的类型来规定。

技术分享

交换机的类型包括:direct, topic, headers和fanout。我们先关注fanout。让我们先创建一个这种类型的交换机,并称呼它为logs:

channel.exchangeDeclare("logs", "fanout");

fanout类型的交换机非常简单,通过它的名字,你可能已经猜出它的用处了。它将会以广播的方式把接收到的消息发送到它所知道的队列中去,这个正是我们所需要的。

交换机列表

通过使用rabbitmqctl命令,我们可以列出服务器中的所有交换机:

$
sudo rabbitmqctl list_exchanges Listing exchanges ...
        direct amq.direct      direct amq.fanout      fanout amq.headers     headers amq.match       headers amq.rabbitmq.log      
topic amq.rabbitmq.trace      topic amq.topic       topic logs   
fanout ...done.

在这个列表里,有一些以amq.*开头的交换机和默认(没有名字)的交换机。这些都是默认创建的,而且你不太可能会使用到它们。

匿名交换机

在之前的部分中,我们对交换机毫无概念,但仍然能将消息发送到队列中,那是因为我们使用了默认的交换机,通过使用空串(”“)来标识它。

回想一下之前是如何发送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

这里的第一个参数就是交换机的名字。空串表示它是默认交换机或者是匿名交换机:如果routingKey存在的话,消息将通过routingKey路由到特定的队列中去。

现在,我们定义自己命名的交换机:

channel.basicPublish( "logs", "", null, message.getBytes());

2、临时队列

你可能会想起我们之前使用的队列是有特定的名称的(hello和task_queue)。对于我们来说,为一个队列命名是非常有必要的,我们需要指定多个消费者到同一个队列获取消息。当你想在多个生产者和消费者之间共用一个队列,那么为队列命名就非常重要了。

但是对于我们目前来说还不需要。我们想要监听所有日志消息,而不是它的一个子集。同样,我们只会对最新的消息感兴趣,而不是旧消息。为了解决这个问题,我们需要做两件事。

第一,无论什么时候连接RabbitMQ,我们都需要一个新的空队列。为了这样做,我们会创建一个随机的名字,或者直接让服务器给我们一个随机的名字。

第二,一旦我们与消费者断开,队列应该被自动删除。

在Java中,当我们使用无参的queueDeclare(),它将会为我们创建一个非持久化的、专用的、自动删除、带随机名字的队列:

String queueName = channel.queueDeclare().getQueue();

在这里,队列名queueName的值是一个随机产生的字符串,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg。

3、绑定

技术分享

我们已经创建一个fanout类型的交换机和一个队列。现在我们需要告诉交换机发送消息到我们的队列里。在这里,把交换机和队列之间的关系称为绑定。

channel.queueBind(queueName, "logs", "");

从现在开始,logs交换机将会把消息发送到我们的队列中去。

绑定列表

你可以通过使用rabbitmqctl list_bindings来列出所有绑定。

4、代码整合

技术分享

生产者程序负责发送日志消息,看起来跟以前的代码没有什么区别。最重要的改变是,现在我们把消息发送到我们的logs交换机中,而不是匿名交换机。在发送时我们需要指定一个routingKey,但是在使用fanout类型的交换机时,routingKey的值会被忽略。这是我们的EmitLog.java程序:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent ‘" + message + "‘");

        channel.close();
        connection.close();
    }
    //...
}

正如你所看到的,在建立一个connection之后,我们声明了一个交换机。这个步骤是必须的,因为RabbitMQ禁止把消息发送到一个不存在的交换机。

如果交换机上没有绑定任何队列的话,消息将会被丢弃。但是这个对我们来说是可以接受的,如果没有消费者监听,我们可以安全的丢弃消息。

ReceiveLogs.java的代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received ‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

像之前那样编译程序,我已经编译完成了。

$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java

如果你想要把日志保存到文件中,你只需要打开控制台并输入:

$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

如果你希望在屏幕中看到日志,新建一个终端并运行:

$ java -cp .:rabbitmq-client.jar ReceiveLogs

当然,为了发送日志,输入:

$ java -cp .:rabbitmq-client.jar EmitLog

使用rabbitmqctl list_bindings,你可以验证这代码确实创建和绑定了我们想要的队列。随着两个ReceiveLogs.java程序的运行,你可以看到如下的信息:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这个结果简单明了:数据从logs交换机发送到服务器安排的两个队列中去,那正是我们所期望的。

四、路由

在上一节,我们建立了一个简单的日志系统,我们可以将日志信息广播到多个接收者那里去。

接下来,我们将要实现只订阅部分消息。例如,我们只将error类型的日志信息保存到硬盘中去,当然依旧在控制台中打印出所有的日志信息。

1、绑定

在之前的例子中,我们已经建立了绑定,你可以回顾一下代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一个交换机跟一个队列之间的绑定,可以理解为:就是这个队列只对这个交换机发送过来的消息感兴趣。

绑定还有另外的一个routingKey参数,为了避免跟basic_publish的参数搞混,我们把它称为binding key。以下是我们通过一个key创建的一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

这个binding key的意义取决于交换机的类型,我们之前使用的fanout类型的交换机是会忽略它的值的。

2、direct交换类型

我们之前的日志系统是将所有消息广播到所有消费者那里去。我们想要扩展它,让它能根据消息的严重性来进行过滤。例如,我们只把error类型的错误日志记录到硬盘中去,而不是将硬盘空间浪费在warning或info类型的日志上。

我们使用的fanout类型的交换机没有太多的灵活性,它只能无脑的进行广播。

所以我们将会使用direct类型的交换机来替换它。direct类型交换机的路由规则很简单——它只将消息发送到那些binding key和routing key完全匹配的队列中去。

为了说明清楚,我们看一下下面的结构图:

技术分享

在这里,我们可以看到direct类型的交换机X绑定了两个队列。第一个队列通过一个orange关键字来绑定,第二个队列绑定了两个关键字,分别为black和green。

通过这样的绑定,当交换器接收到routing key为orange的消息的时候,就会发送到队列Q1中去;当接收到 routing key为black或green的消息的时候,就会发送到队列Q2中去;其他类型的消息则会被丢弃。

3、多重绑定

技术分享

用一个binding key绑定多个队列是合法的,在上面的例子中,我们还可以使用black将X和Q1绑定起来。在这里,direct类型的交换机就跟fanout类型的交换机一样,会把消息发送到所有的匹配的队列中去。一个routing key为black的消息将会被发送到Q1和Q2中去。

4、发送日志

我们将会在我们的日志系统中使用这种模型,使用direct类型的交换机来替换fanout类型的交换机。我们可以把routing key作为日志消息的严重级别,通过这种方式,接收程序就可以选择对应的级别进行接收。总之,让我们先看一下如何发送日志:

首先,我们需要创建一个交换机:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

在这里为了简化,我们先约定日志的级别分为info、warning和error三种。

5、订阅

接收消息的部分跟之前的程序没有什么不同。唯一的区别就是我们要为队列创建它感兴趣的binding key。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

6、代码整合

技术分享

EmitLogDirect.java的代码:

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.java的代码

import com.rabbitmq.client.*;
import java.io.IOException;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

像之前那样编译它们。为了方便,当我们运行实例时,我们使用一个$CP的环境变量来表示类的路径。

如果你只想保存warning和error类型的日志到文件里去,你可以打开控制台,并输入:

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想要在屏幕中看到所有的日志信息,你可以打开一个新的终端并做如下操作:

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

另外,例如想要发送error类型的日志信息,可以输入:

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent ‘error‘:‘Run. Run. Or it will explode.‘

五、topic类型

在上一部分中,我们改进了我们的日志系统,使用direct类型的交换机代替fanout类型的交换机,得到了一个具有选择性的日志接收系统。

虽然使用了direct类型的交换机改进了我们的系统,但是它依然不完善,它不能根据多个条件进行路由。

在我们的日志系统中,我们可能不仅想要得到各种级别的日志,还想要得到日志的发送源。你可能从syslog unix tool了解过这个概念,它基于severity(info/warn/crit…) 和facility (auth/cron/kern…)来路由日志信息。

那将会给我们带来更多的灵活性,因为我们可能只需要监听那些来自cron的critical日志,而不是所有的kern日志。

为了在我们的日志系统中实现此功能,我们需要使用更加复杂的类型——topic类型的交换机

1、topic类型的交换机

发送到topic类型的交换机的消息不能是随意的routing key,它必须是一个通过点分隔的字符串列表。字符串的内容什么都可以,但是我们一般会给它一个有意义的名字。例如一些合法的routing key就像这样:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。Routing key中你想要写多少个单词都可以,它的长度上线是255个字节。

Binding key也是一样。Topic类型的交换机的逻辑跟direct类型的非常相似,它也是把消息发送到与routing key匹配的队列中去。然而,它的binding key却有两种非常特殊的用法:

  • *(星号)能代替任何一个单词。
  • #(井号)能代替0个或任意多个单词。

下面这个例子很容易理解:

技术分享

在这个例子中,我们发送的消息都是用来描述动物的。每个消息的routing key都包含三个单词,第一个单词用来描述动物的速度,第二个用来表示颜色,第三个用来表示物种:”..”。

我们为交换机和队列创建三个绑定关系,队列Q1通过”*.orange.*”来绑定,队列Q2通过 “*.*.rabbit”和”lazy.#”来绑定。

这些绑定关系可以概括为:

  • Q1只对橙色的动物感兴趣。
  • Q2想要了解兔子和那些行动缓慢的动物。

一个routing key为”quick.orange.rabbit”的消息将会被发送到所有队列。带有”lazy.orange.elephant”的消息也会被发往所有队列中去。而对于”quick.orange.fox”来说,它只会被发送到Q1队列中,”lazy.brown.fox”只会被发送到Q2队列中。而”lazy.pink.rabbit”只会被发送到Q2队列中一次,尽管它匹配了Q2队列的两个routing key。对于”quick.brown.fox”来说,没有routing key和它匹配,所以它会被丢弃。

如果我们不遵守之前的约定,发送一条只带一个单词或四个单词的消息,例如”orange”或”quick.orange.male.rabbit”,会发生什么事呢?好吧,这些消息都会因为没有找到匹配routing key而被丢弃。

另一方面,对于”lazy.orange.male.rabbit”来说,虽然它包含四个单词,但是它却和最后一个routing key(”lazy.#”)匹配,所以它会被发送到Q2队列中去。

Topic类型的交换机

Topic类型的交换机的功能十分强大,可以媲美其他类型的交换机。

当一个队列的binding key为一个“#”,它将接收所有消息,就像fanout类型的交换机一样。

当binding key中没有包含“*”或“#”字符的话,topic类型的交换机就相当于direct类型的交换机。

2、代码整合

我们将在我们的日志系统中使用topic类型的交换机。我们假设日志消息的routing key通过两个单词组成,就像”.”。

代码几乎跟先前的一样。

EmitLogTopic.java的代码如下:

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java的代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

接收所有日志类型:

$ java -cp $CP ReceiveLogsTopic "#"

从”kern”那接收日志信息:

$ java -cp $CP ReceiveLogsTopic "kern.*"

或者只接收“critical”类型的日志:

$ java -cp $CP ReceiveLogsTopic "*.critical"

你还可以建立多个绑定关系:

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

或者直接发送routing key为”kern.critical”类型的日志:

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
<p><b>一些有趣的问题:</b></p>

- "*"会匹配到routing key为空的消息吗?
-#.*”会匹配到“. .”吗?或者只匹配一个单词吗?
- “a.*.#”和“a.#“有什么不同?
<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    RabbitMQ指南(Java)