RabbitMQ学习(三)订阅/发布
2024-10-24 18:19:39 210人阅读
RabbitMQ学习(三)订阅/发布
1.RabbitMQ模型
前面所学都只用到了生产者、队列、消费者。如上图所示,其实生产者并不直接将信息传输到队列中,在生产者和队列中间有一个交换机(Exchange),我们之前没有使用到交换机是应为我们没有配置交换机,使用了默认的交换机。
有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)
这里我们使用扇形交换机做一个简单的广播模型:一个生产者和多个消费者接受相同消息;
生产者代码:
public class Productor {
public static void main(String[]
args) throws
IOException,
TimeoutException {
//配置rabbitmq服务器地址
ConnectionFactory
factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("starktan");
factory.setPassword("starktan");
factory.setVirtualHost("/");
//建立连接和通道
Connection
connection = factory.newConnection();
Channel channel =
connection.createChannel();
//声明一个扇形交换机
channel.exchangeDeclare("fanout",
BuiltinExchangeType.FANOUT);
System.out.println("发送信息!");
String message = "WorkQueue
Message number " + System.currentTimeMillis();
channel.basicPublish("fanout", "", true, null,
message.getBytes());
channel.close();
connection.close();
}
}
|
消费者代码:
public class Consumer {
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
//创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 4; i++) {
final int cur = i;
service.submit(new Runnable() {
Channel channel = connection.createChannel();
String queryname = channel.queueDeclare().getQueue();
public void run() {
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
try {
channel.queueBind(queryname,"fanout","");
channel.basicConsume(queryname,consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("线程 " + cur + " 获取到消息 " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
|
运行效果:
RabbitMQ学习(三)订阅/发布
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉:
投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。