RabbitMQ学习(二)工作队列
2024-10-24 12:38:38 210人阅读
1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者。
2.实际操作:
这里使用一个生产者产生多条数据提供给3个消费者
生产者代码:
public class Producter {
//队列名称
private final
static String QUEUE_NAME = "Work_Queue";
public static void main(String[]
args) throws
IOException,
TimeoutException {
//配置rabbitmq服务器地址
ConnectionFactory
factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("starktan");
factory.setPassword("starktan");
factory.setVirtualHost("/");
//建立连接和通道
Connection
connection = factory.newConnection();
Channel channel =
connection.createChannel();
//声明队列,可以手动在mq中创建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//写入10条数据(直接循环写入)
for (int i = 0; i < 10; i++) {
System.out.println("发送第" + i + "信息!");
String message = "WorkQueue
Message number " + i + " " + System.currentTimeMillis();
channel.basicPublish("", QUEUE_NAME, true, null,
message.getBytes());
}
channel.close();
connection.close();
}
}
|
消费者代码
public class Consumer {
//队列名称
private final static String QUEUE_NAME = "Work_Queue";
public static void main(String[] args) throws IOException,
InterruptedException, TimeoutException {
//创建连接和通道
ConnectionFactory factory =
new
ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
ExecutorService service =
Executors.newFixedThreadPool(10);
for(int i=0;i<3;i++){
final int cur = i;
service.submit(new Runnable() {
Channel channel = connection.createChannel();
public void run() {
//创建队列消费者
QueueingConsumer consumer =
new
QueueingConsumer(channel);
//指定消费队列
try {
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery
delivery = consumer.nextDelivery();
String
message = new
String(delivery.getBody());
System.out.println("线程 "+cur+" 获取到消息 " + message + "开始处理");
Thread.sleep(1000*(cur+5)*2);
System.out.println("线程 "+cur+" "+message + "处理完成");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
} |
运行效果:(消费者循环调度)
3.消息确认:
在处理一个比较耗时的任务的时候,如果消费者在中途崩溃掉,则对应的这条数据就丢失了,为了避免消息丢失的情况,RabbitMQ提供了消息确认
使用两个消费者进行演示,调用方法
public void getConsum() throws IOException, TimeoutException, InterruptedException {
//创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列,且改为手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 获取到消息 " + message + "开始处理");
try{
Thread.sleep(10000);
}catch (InterruptedException e){}
finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag()
, false);
}
System.out.println( message + "处理完成");
}
}
|
手动关掉一个消费者
消息被另一个消费者继续进行处理;
4.公平调度:
channel.basicQos(1);//保证一次只分发一个
5.持久化: 保证当RabbitMQ服务器崩溃关机也不会造成消息丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
第二个参数改为true
RabbitMQ学习(二)工作队列
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉:
投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。