首页 > 代码库 > RabbitMQ学习(二)工作队列

RabbitMQ学习(二)工作队列

 

1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者。

2.实际操作:

       这里使用一个生产者产生多条数据提供给3个消费者

       生产者代码:

public class Producter {
    //队列名称
   
private final static String QUEUE_NAME = "Work_Queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //配置rabbitmq服务器地址
       
ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("starktan");
        factory.setPassword("starktan");
        factory.setVirtualHost("/");
        //建立连接和通道
       
Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,可以手动在mq中创建
       
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //写入10条数据(直接循环写入)
       
for (int i = 0; i < 10; i++) {
            System.out.println("发送第" + i + "信息!");
            String message = "WorkQueue Message number " + i + " " + System.currentTimeMillis();
            channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者代码

public class Consumer {
    //队列名称
   
private final static String QUEUE_NAME = "Work_Queue";
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        //创建连接和通道
       
ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        ExecutorService service = Executors.newFixedThreadPool(10);
        for(int i=0;i<3;i++){
            final int cur = i;
            service.submit(new Runnable() {
                Channel channel = connection.createChannel();
                public void run() {
                    //创建队列消费者
                   
QueueingConsumer consumer = new QueueingConsumer(channel);
                    //指定消费队列
                   
try {
                        channel.basicConsume(QUEUE_NAME, true, consumer);
                        while (true)
                        {
                            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                           
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            String message = new String(delivery.getBody());
                            System.out.println("线程 "+cur+" 获取到消息 " + message + "开始处理");
                            Thread.sleep(1000*(cur+5)*2);
                            System.out.println("线程 "+cur+" "+message + "处理完成");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        service.shutdown();
    }
}


技术分享运行效果:(消费者循环调度)

 

3.消息确认:

       在处理一个比较耗时的任务的时候,如果消费者在中途崩溃掉,则对应的这条数据就丢失了,为了避免消息丢失的情况,RabbitMQ提供了消息确认

       使用两个消费者进行演示,调用方法

public void getConsum() throws IOException, TimeoutException, InterruptedException {
    //创建连接和通道
   
ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
   
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //创建队列消费者
   
QueueingConsumer consumer = new QueueingConsumer(channel);
    //指定消费队列,且改为手动确认
   
channel.basicConsume(QUEUE_NAME, false, consumer);
    while (true)
    {
       
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" 获取到消息 " + message + "开始处理");
        try{
            Thread.sleep(10000);
        }catch (InterruptedException e){}
        finally {
         channel.basicAck(delivery.getEnvelope().getDeliveryTag()
                    , false);
        }
        System.out.println( message + "处理完成");
    }
}

技术分享

手动关掉一个消费者

 技术分享

消息被另一个消费者继续进行处理;

4.公平调度:

        channel.basicQos(1);//保证一次只分发一个

5.持久化: 保证当RabbitMQ服务器崩溃关机也不会造成消息丢失

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

第二个参数改为true

RabbitMQ学习(二)工作队列