首页 > 代码库 > 初识RabbitMQ系列之一HelloWorld

初识RabbitMQ系列之一HelloWorld

Server端代码:

 1 package com.helloworld; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8  9 public class Send {10     /** 发送目的地队列名称 */11     private final static String QUEUE_NAME = "hello";  12     13     public static void main(String[] args) throws IOException {  14         ConnectionFactory factory = new ConnectionFactory();  15         factory.setHost("121.40.151.134");  16         factory.setUsername("rabbitName");17         factory.setPassword("rabbitPWD");18         Connection connection = factory.newConnection();  19         Channel channel = connection.createChannel();  20         channel.queueDeclare(QUEUE_NAME, false, false, false, null);  21         String message = "Hello World!";  22         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  23         System.out.println("[" + message + "]");  24         channel.close();  25         connection.close();  26     } 27 }

 

Client端代码:

 1 package com.helloworld; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer;10 import com.rabbitmq.client.ShutdownSignalException;11 12 public class Recv {13     /** 接收队列名称,如果是一个不存在的队列名称将会报错 */14     private final static String QUEUE_NAME = "hello";  15     16     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {  17         ConnectionFactory factory = new ConnectionFactory();  18         factory.setHost("121.40.151.134");  19         factory.setUsername("rabbitName");20         factory.setPassword("rabbitPWD");21         Connection connection = factory.newConnection();  22         Channel channel = connection.createChannel();  23         channel.queueDeclare(QUEUE_NAME, false, false, false, null);  24         System.out.println("CRTL+C");  25         QueueingConsumer consumer = new QueueingConsumer(channel);  26         channel.basicConsume(QUEUE_NAME, true, consumer);  27           28         while (true) {  29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  30             String message = new String(delivery.getBody());  31             System.out.println("[" + message + "]");  32         }  33     }  34 }

RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。

channel.queueDeclare:第一个参数:队列名字,第二个参数:队列是否可持久化即重启后该队列是否依然存在,第三个参数:该队列是否是独占的即客户端连接上来时它就占用整个连接,第四个参数:是否自动销毁即当这个队列不再被使用的时候即没有消费者对接上来时自动删除,第五个参数:其他参数如TTL(队列存活时间)等。

channel.basicConsume:第一个参数:队列名字,第二个参数:是否自动应答,如果为真,消息一旦被消费者收到,服务端就知道该消息已经投递,从而从队列中将消息剔除,否则,需要在消费者端手工调用channel.basicAck()方法通知服务端,如果没有调用,消息将会进入unacknowledged状态,并且当消费者连接断开后变成ready状态重新进入队列,第三个参数,具体消费者类。


 

初识RabbitMQ系列之一HelloWorld