首页 > 代码库 > RabbitMQ学习(二).NET Client之Work Queues

RabbitMQ学习(二).NET Client之Work Queues

  • 2 Work queues

    Distributing tasks among workers

    Python | Java | Ruby | PHP| C#

转载请注明出处:jiq?钦‘s technical Blog

Work Queues

(using the .NET Client)

前面已经介绍过了如何编写程序去发送消息到命名队列,以及从命名队列接收消息。

在这个部分我们将创建一个工作队列(Work Queue),用于将耗时任务(time-consuming tasks)分发给多个工作者(workers).

引入工作队列最主要的目的是为了避免需要直接去运行资源密集型任务(resource-intensive task),还不得不等待其执行完成。

我们不需要亲自调度将被执行的这些任务,只需要将任务包装成消息,然后将其发送到队列,异步运行(running in the background)的工作者进程会取出这些任务来执行,如果你有多个工作者,那么这些任务会在他们之间共享。

工作队列这个概念在web应用中特别有用处,因为在一个很短的HTTP请求窗口内,我们不可能能够处理完一个很复杂的耗时任务。

Preparation(准备)

将前面Send/Receive例子中的Send.cs简单修改一下,允许从命令行输入任意要发送的消息。然后设置deliveryMode为2,即Persistent模式。

这个程序的作用就是将任务发送到工作队列,暂且将这个程序叫做NewTask.cs:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;

channel.BasicPublish("", "hello", properties, body);

其中从命令行获取输入的消息的函数如下:

private static string GetMessage(string[] args)
{
  return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}


老的Receive.cs代码也需要变化,在收到工作队列中的消息后将消息中的每个点"."都解析为sleep一秒,假装正在执行耗时任务,暂且将这个程序叫做Worker.cs:

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer);

Console.WriteLine(" [*] Waiting for messages. " +
                  "To exit press CTRL+C");
while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    int dots = message.Split(‘.‘).Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine(" [x] Done");
}

按照上一个教程讲的方式对他们进行编译:

$ csc /r:"RabbitMQ.Client.dll" NewTask.cs
$ csc /r:"RabbitMQ.Client.dll" Worker.cs

Round-robin dispatching(轮询调度 --- 简单负载均衡)

使用工作队列的其中一大优势就是能够很容易地平衡好工作任务的分配,假如现在已经有了很多积压的工作,这个时候可以很轻易地加入更多的workers进去,即可以很容易地横向拓展。 

首先让我们同时运行两个Workers.cs程序作为消费者(任务执行者):

shell1$ Worker.exe
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ Worker.exe
Worker
 [*] Waiting for messages. To exit press CTRL+C

然后打开第三个终端,运行NewTask.cs程序,发送新的任务:

shell3$ NewTask.exe First message.
shell3$ NewTask.exe Second message..
shell3$ NewTask.exe Third message...
shell3$ NewTask.exe Fourth message....
shell3$ NewTask.exe Fifth message.....

这个时候你就可以看到任务消息被均衡地分配到两个Worker了:

shell1$ Worker.exe
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘First message.‘
 [x] Received ‘Third message...‘
 [x] Received ‘Fifth message.....‘
shell2$ Worker.exe
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘Second message..‘
 [x] Received ‘Fourth message....‘

默认地RabbitMQ将会依次按顺序(in sequence)发送每一个消息给下一个consumer,平均来说每个consumer都会得到相同数目的消息,这种分发消息的方式就叫做轮训调度(round-robin),你可以试试更多的Worker看看效果。

译者注: “轮训调度”这个特性是RabbitMQ的队列所默认具备的,你不需要针对队列进行任何属性设置,只要你有多个consumer同时从一个队列接收消息,那么RabbitMQ就会以轮训调度的方式均匀地将消息分发给这些consumer,同时你可以无缝地ADD进更多的针对这个队列的consumer。

Message acknowledgment(消息确认)

执行一项任务一般会花费比较长的时间,你肯定想要知道一个开始了一项长时间的任务之后的工作者进程之后发生了什么,有没有任务执行到一般就挂掉了。

针对我们上面的代码,一旦RabbitMQ将消息递送出去给consumer之后就会将消息在内存中移除掉。--- 译者注:可见RabbitMQ存储消息默认是存储在内存中,机器挂掉之后默认会丢失。

这个时候,假如你kill掉等待接收这个消息的worker,那么这个等待被处理的消息就会丢失掉,从而这个消息就会永久地丢失掉,但是它还没有被成功处理。

我们当然不想丢失任何消息了,最好的情况是一旦一个worker死掉不能成功处理消息了,那么这个消息可以被RabbitMQ递送给另一个worker!!!


为了确保一个消息永远不会丢失,RabbitMQ提供了消息确认机制。consumer可以将一个ack(nowledgement)发送回RabbitMQ告诉他消息已经被成功接收并处理了,RabbitMQ你可以将它删除了。

假如consumer挂掉了没有发回ack,那么RabbitMQ就会知道了这个消息没有被成功处理,就会将其重新递送给一个新的consumer,这样就保证了即时在workers经常挂掉的情况下消息也不会丢失。

There aren‘t any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It‘s fine even if processing a message takes a very, very long time.

(译者注:这句话没能理解。。。我在想要是一个worker的任务是持久化一个消息,RabbitMQ将这个消息发送给这个worker之后,worker没有挂掉,但是worker和RabbitMQ之间的连接断开了,worker没有将确认发回给RabbitMQ,但是正常把消息存储下来了,这时RabbitMQ没有收到确认,就会将消息发送给另外一个worker将其存储下来,那么这个消息岂不被存储了两次???)


消息确认默认是打开的,上面的例子中我们通过设置channel的BasicConsume的第二个参数 noAck=true来将消息确认机制关闭掉了,你可以打开:

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", false, consumer);

    while (true)
    {
        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        //...
        channel.BasicAck(ea.DeliveryTag, false);
    }

这样做以后,我们能够保证即使你在worker正在处理一个消息的时候将其kill掉了,消息仍然不会丢失,在worker死掉之后不久所有的未确认的消息都会被重新递送。

Forgotten acknowledgment

It‘s a common mistake to miss the BasicAck. It‘s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won‘t be able to release any unacked messages.

In order to debug this kind of mistake you can use rabbitmqctl to print themessages_unacknowledged field:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

Message durability(消息持久化)

我们已经知道了即使一个consumer挂掉,消息也不会丢失可以正确地被处理,但是一旦RabbitMQ服务器挂掉了,消息仍然会丢失!!!

当RabbitMQ退出或者崩溃,它就会忘掉自己所拥有的队列以及消息,除非我们告诉它不要忘记。要做到这点我们需要将队列和消息两者均标记为持久化的。

(1)首先标记队列是持久化的方式是:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

这样做会报错!为什么呢?因为前面已经创建了一个叫做hello的队列,它是非持久化的,RabbitMQ不允许用不同的参数重新定义一个存在的队列,我们需要重新取一个名字比如 task_queue:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

注意这个queueDeclare 这个设置持久化的操作需要在生产者和消费者两边都做!


(2)接下来我们需要通过调用 IBasicProperties.SetPersistent 这个方法传递参数true来标记消息也是具备持久化特性的:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

Note on message persistence

Marking messages as persistent doesn‘t fully guarantee that a message won‘t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn‘t saved it yet. Also, RabbitMQ doesn‘t do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren‘t strong, but it‘s more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.

Fair dispatch(公平调度)

你可能已经观察到了现有的调度方式仍然没有按照我们期望的方式进行。

例如在有两个worker的情况下,如果所有的奇数消息都是比较大的,然后所有的偶数消息都是很小的,那么就会导致一个worker比较忙,一个worker比较闲。然而RabbitMQ还是不知道这些事情,仍旧按照均匀的方式分配任务。

这种情况之所以会发生是因为RabbitMQ在消息一进入到队列中就开始进行调度,它并不关心consumer发回来的确认消息的数量,仅仅是盲目地按照第几个给谁第几个给谁的自认为公平的方式进行调度。

为了避免这种情况,我们可以调用channel的 basicQos 方法设置 prefetchCount 参数为1来告诉RabbitMQ不要将超过一个的消息给同一个worker。

换句话说,就是如果一个worker还在处理一个消息没有发回确认,那么就不要给他调度新的消息,而是调度给那些没有在忙的worker。

译者注:如果这样,我觉得这个特性要慎用,想想如果在一个场景下面一种消息非常大,处理时间非常之久,而且worker相对较少,那么RabbitMQ在worker还在处理消息的时候又不敢将新的消息发送给他,一旦这种消息的生产者生产消息的速度较快,这势必会造成RabbitMQ服务器队列上积压太多的这种消息,极度浪费服务器的内存。

channel.BasicQos(0, 1, false);

Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

Putting it all together(代码总览)

NewTask.cs 的完整的源码如下:

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("task_queue", true, false, false, null);

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);

                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);

                channel.BasicPublish("", "task_queue", properties, body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
        }
    }

  private static string GetMessage(string[] args)
  {
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
  }
}

(NewTask.cs source)

Worker.cs的完整的源码如下:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

class Worker
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("task_queue", true, false, false, null);

                channel.BasicQos(0, 1, false);
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("task_queue", false, consumer);

                Console.WriteLine(" [*] Waiting for messages. " +
                                  "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    int dots = message.Split(‘.‘).Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine(" [x] Done");

                    channel.BasicAck(ea.DeliveryTag, false);
                }
            }
        }
    }
}

(Worker.cs source)

Using message acknowledgments and BasicQos you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.

关于更多的 IModel 方法以及 IBasicProperties的信息,可以参考 RabbitMQ .NET client API reference online.

Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.



RabbitMQ学习(二).NET Client之Work Queues