首页 > 代码库 > RabbitMQ系列之二:work queue

RabbitMQ系列之二:work queue

server端代码:

 1 package com.example.workqueue; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.MessageProperties; 9 10 public class Send {11 12     public static void main(String[] args) throws IOException {13         14         // 队列名称15         String queueName = "task_queue";16         17         ConnectionFactory factory = new ConnectionFactory();18         19         //远程服务器ip,如果在本地测试可以改成localhost20         factory.setHost("121.40.151.120");21         22         //不是在本地测试,用户名和密码必填23         factory.setUsername("rabbitmqname");24         factory.setPassword("rabbitmqpwd");25         26         Connection conn = factory.newConnection();27         Channel channel = conn.createChannel();28         29         boolean durable = true;  30         31         /** 32          * 参数说明:33          * queue:队列名称34          * durable:队列数据是否可以持久化,true:是,false:否。也就是服务重启后队列数据是否依然存在35          * exclusive:是否为某一个队列的专用连接36          * autoDelete:当队列不再被使用也就是没有消费者的时候是否自动删除37          * arguments:其它参数,比如队列存活时间38         */39         channel.queueDeclare(queueName, durable, false, false, null);40         41         String[] strs = new String[] { "First message." }; 42         String message = getMessage(strs);  43         44         /** 45          * 参数说明:46          * exchange:默认的exchange就是"",是direct类型的,47          *             任何发往到默认exchange的消息都会被路由到routingKey的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。48          * routingKey:指定接收消息的队列49          * props:其它属性,比如消息路由头信息,持久化信息50          * body:消息内容51         */  52         channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  53         54         System.out.println("[" + message + "]");  55           56         // 最后,我们关闭channel和连接,释放资源。  57         channel.close();  58         conn.close(); 59     }60     61     private static String getMessage(String[] strings) {  62         if (strings.length < 1) {  63             return "Hello World!";  64         }  65         return joinStrings(strings, " ");  66     }  67   68     private static String joinStrings(String[] strings, String delimiter) {  69         int length = strings.length;  70         if (length == 0) {  71             return "";  72         }  73         StringBuilder words = new StringBuilder(strings[0]);  74         for (int i = 1; i < length; i++) {  75             words.append(delimiter).append(strings[i]);  76         }  77         return words.toString();  78     } 79 80 }

 

client端代码:

 1 package com.example.workqueue; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer;10 import com.rabbitmq.client.ShutdownSignalException;11 12 public class Recv {13 14     public static void main(String[] args) throws IOException, ShutdownSignalException, 15         ConsumerCancelledException, InterruptedException {16         17         // 队列名称18         String queueName = "task_queue";19         20         ConnectionFactory factory = new ConnectionFactory(); 21         22         factory.setHost("121.40.151.120");23         factory.setUsername("rabbitmqname");24         factory.setPassword("rabbitmqpwd");25         26         Connection connection = factory.newConnection();  27         Channel channel = connection.createChannel();  28         29         // 表示在同一时间不要给一个Rev一个以上的消息(只能是一个),也就是说不要将一个新的消息分发给Rev直到它处理完了并且返回了前一个消息的通知标志(acknowledged)30         channel.basicQos(1);31           32         //与服务端一致33         channel.queueDeclare(queueName, true, false, false, null);  34         35         System.out.println("CRTL+C");  36           37         // QueueingConsumer:用来缓存服务端推送给我们的消息。  38         QueueingConsumer consumer = new QueueingConsumer(channel);  39         40         boolean autoAck = false;41         /** 42          * 参数说明:43          * queue:队列名称44          * autoAck:是否自动应答,true:消息一旦被消费者消费,服务端就知道该消息已经投递,从而从队列中将消息剔除;45          *                      false:需要在消费端显示调用channel.basicAck()方法通知服务端,如果没用显示调用,消息将进入46          *                            unacknowledged状态,当前消费者连接断开后该消息变成ready状态重新进入队列。47          * callback:具体消费者类48         */49         channel.basicConsume(queueName, autoAck, consumer);  50           51         while (true) {  52             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  53             String message = new String(delivery.getBody());  54             System.out.println("[" + message + "]");  55             doWork(message);56             System.out.println("r[done]");57             58             /**59              * 显示调用通知服务端该消息已经消费并返回了acknowledged60              * true:通知所有相同tag的untracked,false:只通知当前一个61              */62             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 63         } 64     }65 66     private static void doWork(String message) throws InterruptedException {67         for (char ch : message.toCharArray()) {  68             if (ch == ‘.‘) {  69                 Thread.sleep(1000);  70             }  71         }72     }73 74 }

 

RabbitMQ系列之二:work queue