首页 > 代码库 > Publish/Subscribe
Publish/Subscribe
在RabbitMQ中,producer只能将消息发送到一个exchange
中。要理解exchange也非常简单,它一边负责接收producer发送的消息, 另一边将消息推送到queue中。exchange必须清楚的知道在收到消息之后该如何进行下一步的处理,比如是否应该将这条消息发送到某个queue中? 还是应该发送到多个queue中?还是应该直接丢弃这条消息等等。
RabbitMQ中的exchange类型有这么几种:direct
,topic
,headers
以及fanout
。这一小节将会主要介绍最后一种类型——fanout
。 使用RabbitMQ的client来创建一个fanout
类型的exchange。fanout类型的exchange非常简单,从名字也可以猜测出来,它会向所有的queue广播所有收到的消息。这正是我们的log系统需要的。
package com.rabbitmq.www.publish_subscribe; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; private final static String HOST_ADDR = "172.18.112.102"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_ADDR); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //定义exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); for(int i=0;i<=10;i++){ String message = "helloworld"+i; //信息发送给申明的exchange channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent ‘" + message ); } channel.close(); connection.close(); } }
package com.rabbitmq.www.publish_subscribe; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; private final static String HOST_ADDR = "172.18.112.102"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_ADDR); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //服务器为创建随机queue提供了一个无参数的queueDeclare()方来来创建一个非持久化的、独有的并且是自动删除的已命名的queue。 String queueName = channel.queueDeclare().getQueue(); //现在我们需要告诉exchange将消息发送到我们的queue中。 这种exchange和queue的关系称为绑定(binding)。 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received ‘" + message + "‘"); } }; channel.basicConsume(queueName, true, consumer); } }
Publish/Subscribe
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。