首页 > 代码库 > RabbitMQ系列之二:work queue
RabbitMQ系列之二:work queue
server端代码:
1 package com.example.workqueue; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.MessageProperties; 9 10 public class Send {11 12 public static void main(String[] args) throws IOException {13 14 // 队列名称15 String queueName = "task_queue";16 17 ConnectionFactory factory = new ConnectionFactory();18 19 //远程服务器ip,如果在本地测试可以改成localhost20 factory.setHost("121.40.151.120");21 22 //不是在本地测试,用户名和密码必填23 factory.setUsername("rabbitmqname");24 factory.setPassword("rabbitmqpwd");25 26 Connection conn = factory.newConnection();27 Channel channel = conn.createChannel();28 29 boolean durable = true; 30 31 /** 32 * 参数说明:33 * queue:队列名称34 * durable:队列数据是否可以持久化,true:是,false:否。也就是服务重启后队列数据是否依然存在35 * exclusive:是否为某一个队列的专用连接36 * autoDelete:当队列不再被使用也就是没有消费者的时候是否自动删除37 * arguments:其它参数,比如队列存活时间38 */39 channel.queueDeclare(queueName, durable, false, false, null);40 41 String[] strs = new String[] { "First message." }; 42 String message = getMessage(strs); 43 44 /** 45 * 参数说明:46 * exchange:默认的exchange就是"",是direct类型的,47 * 任何发往到默认exchange的消息都会被路由到routingKey的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。48 * routingKey:指定接收消息的队列49 * props:其它属性,比如消息路由头信息,持久化信息50 * body:消息内容51 */ 52 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 53 54 System.out.println("[" + message + "]"); 55 56 // 最后,我们关闭channel和连接,释放资源。 57 channel.close(); 58 conn.close(); 59 }60 61 private static String getMessage(String[] strings) { 62 if (strings.length < 1) { 63 return "Hello World!"; 64 } 65 return joinStrings(strings, " "); 66 } 67 68 private static String joinStrings(String[] strings, String delimiter) { 69 int length = strings.length; 70 if (length == 0) { 71 return ""; 72 } 73 StringBuilder words = new StringBuilder(strings[0]); 74 for (int i = 1; i < length; i++) { 75 words.append(delimiter).append(strings[i]); 76 } 77 return words.toString(); 78 } 79 80 }
client端代码:
1 package com.example.workqueue; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer;10 import com.rabbitmq.client.ShutdownSignalException;11 12 public class Recv {13 14 public static void main(String[] args) throws IOException, ShutdownSignalException, 15 ConsumerCancelledException, InterruptedException {16 17 // 队列名称18 String queueName = "task_queue";19 20 ConnectionFactory factory = new ConnectionFactory(); 21 22 factory.setHost("121.40.151.120");23 factory.setUsername("rabbitmqname");24 factory.setPassword("rabbitmqpwd");25 26 Connection connection = factory.newConnection(); 27 Channel channel = connection.createChannel(); 28 29 // 表示在同一时间不要给一个Rev一个以上的消息(只能是一个),也就是说不要将一个新的消息分发给Rev直到它处理完了并且返回了前一个消息的通知标志(acknowledged)30 channel.basicQos(1);31 32 //与服务端一致33 channel.queueDeclare(queueName, true, false, false, null); 34 35 System.out.println("CRTL+C"); 36 37 // QueueingConsumer:用来缓存服务端推送给我们的消息。 38 QueueingConsumer consumer = new QueueingConsumer(channel); 39 40 boolean autoAck = false;41 /** 42 * 参数说明:43 * queue:队列名称44 * autoAck:是否自动应答,true:消息一旦被消费者消费,服务端就知道该消息已经投递,从而从队列中将消息剔除;45 * false:需要在消费端显示调用channel.basicAck()方法通知服务端,如果没用显示调用,消息将进入46 * unacknowledged状态,当前消费者连接断开后该消息变成ready状态重新进入队列。47 * callback:具体消费者类48 */49 channel.basicConsume(queueName, autoAck, consumer); 50 51 while (true) { 52 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 53 String message = new String(delivery.getBody()); 54 System.out.println("[" + message + "]"); 55 doWork(message);56 System.out.println("r[done]");57 58 /**59 * 显示调用通知服务端该消息已经消费并返回了acknowledged60 * true:通知所有相同tag的untracked,false:只通知当前一个61 */62 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 63 } 64 }65 66 private static void doWork(String message) throws InterruptedException {67 for (char ch : message.toCharArray()) { 68 if (ch == ‘.‘) { 69 Thread.sleep(1000); 70 } 71 }72 }73 74 }
RabbitMQ系列之二:work queue
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。