首页 > 代码库 > RabbitMQ学习(五).NET Client之Topics

RabbitMQ学习(五).NET Client之Topics

  • 5 Topics

    Receiving messages based on a pattern

    Python | Java | Ruby | PHP| C#

转载请注明出处:jiq?钦‘s technical Blog

Topics

(using the .NET client)


前面的教程我们已经加强了我们的日志系统,我们将只具备无脑广播的 fanout 交换机替换为了可以选择性地接收日志消息的 direct 交换机。

尽管使用 direct 交换机加强了我们的系统, 但是仍然是受限的 --- 无法基于多个标准对消息进行路由!!!

比如我们可能不仅仅只想针对于日志消息的严重程度进行消息的订阅,我们还想同时针对日志消息的来源 (auth/cron/kern...)等主题进行订阅。即我们可能又想要订阅‘cron‘发出来的关键性的错误日志消息,还想订阅来自于‘kern‘的所有的日志消息。这个时候我们需要使用更为复杂的 topic 交换机。

Topic exchange(Topic交换机)

发给 topic 交换机的消息的 routing key 不能是随意的,必须遵循这样的格式: 点号‘.‘分割的单词列表,单词可以是任意的。

一些有效的 routing key像:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". 数目为多少没有限制,但是不能超过255字节。

接收方的 binding key也要遵循同样的格式,topic 交换机背后的逻辑和 direct 交换机其实类似的,同样是将具备特定 routing key 的消息发送到所有的绑定了匹配的 binding key 的队列,不过对于binding key却很不一样:

  • * (star) 代表一个单词.
  • # (hash) 代表0个或者多个单词.

用一个例子来解释:

在这个例子中我们将要描述动物的消息。

消息的routing key由三个单词组成,第一个单词描述动物的速度,第二个单词描述肤色,第三个单词描述物种: "<speed>.<colour>.<species>".

我们在接收方给topic交换机创建三个绑定队列: 队列Q1的binding key是 "*.orange.*" ,队列Q2 的binding key是 "*.*.rabbit" 和 "lazy.#".

这些绑定可以概括如下:

  • Q1队列是对所有的肤色为橘色的动物感兴趣.
  • Q2队列不但对所有兔子感兴趣,还对所有速度超慢的动物也感兴趣.

一个routing key为 "quick.orange.rabbit" 的消息将会被推送到Q1和Q2两个队列. routing key为 "lazy.orange.elephant" 的消息也会被推送到两个队列. "quick.orange.fox" 只会被推送到第一个队列, "lazy.brown.fox" 只会被推送到第二个队列. "lazy.pink.rabbit" 只会被推送到第二个队列一次,尽管它匹配了两个binding key,"quick.brown.fox" 因为不匹配任何binding key将会被丢弃.

发送routing key为一个或者四个单词的消息会怎么样呢?比如 "orange" or "quick.orange.male.rabbit"? 会被丢弃,因为没有任何binding key去匹配.

不过 "lazy.orange.male.rabbit", 尽管有四个单词,但是仍然会匹配最后一个binding key而被推送到队列Q2.

Topic exchange

Topic exchange is powerful and can behave like other exchanges.

When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key - like in fanout exchange.

When special characters "*" (star) and "#" (hash) aren‘t used in bindings, the topic exchange will behave just like a direct one.

Putting it all together(代码总览)

我们将在我们的日志系统中引入 topic 交换机,消息的 routing key 设定为两个单词: "<facility>.<severity>".

代码和前面的几乎一样,EmitLogTopic.cs的代码如下:

class EmitLogTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("topic_logs", "topic");

                var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("topic_logs", routingKey, null, body);
                Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message);
            }
        }
    }
}

ReceiveLogsTopic.cs的代码如下:

class ReceiveLogsTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("topic_logs", "topic");
                var queueName = channel.QueueDeclare();

                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [binding_key...]",Environment.GetCommandLineArgs()[0]);
                    Environment.ExitCode = 1;
                    return;
                }

                foreach (var bindingKey in args)
                {
                    channel.QueueBind(queueName, "topic_logs", bindingKey);
                }

                Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);

                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘",routingKey, message);
                }
            }
        }
    }
}

Run the following examples:

To receive all the logs:

$ ReceiveLogsTopic.exe "#"

To receive all logs from the facility "kern":

$ ReceiveLogsTopic.exe "kern.*"

Or if you want to hear only about "critical" logs:

$ ReceiveLogsTopic.exe "*.critical"

You can create multiple bindings:

$ ReceiveLogsTopic.exe "kern.*" "*.critical"

And to emit a log with a routing key "kern.critical" type:

$ EmitLogTopic.exe "kern.critical" "A critical kernel error"

Have fun playing with these programs. Note that the code doesn‘t make any assumption about the routing or binding keys, you may want to play with more than two routing key parameters.

Some teasers:

  • Will "*" binding catch a message sent with an empty routing key?
  • Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
  • How different is "a.*.#" from "a.#"?

(Full source code for EmitLogTopic.cs and ReceiveLogsTopic.cs)

Next, find out how to do a round trip message as a remote procedure call in tutorial 6

RabbitMQ学习(五).NET Client之Topics