首页 > 代码库 > RabbitMQ 笔记

RabbitMQ 笔记

1:发送

// Sender: connects to the broker, declares a direct exchange and a queue bound to it,
// then publishes one persistent UTF-8 message using the routing key.
Uri uri = new Uri("amqp://10.0.4.85:5672/");

ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "abc";
factory.Password = "abcdef";
factory.VirtualHost = "dnt_mq";
factory.RequestedHeartbeat = 0; // 0 disables heartbeats
factory.Endpoint = new AmqpTcpEndpoint(uri);

string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "Q1";
string queueName = "MyFirstQueue";

using (IConnection conn = factory.CreateConnection())
{
    using (IModel ch = conn.CreateModel())
    {
        if (exchangeType != null)
        {
            ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);

            // durable = true, exclusive = false, autoDelete = false.
            // An exclusive / auto-delete queue is removed as soon as this connection is
            // closed at the end of the using block, which would discard the message that
            // was just published. These flags also match the consumer's declaration of
            // the same queue, so the two declarations do not conflict.
            ch.QueueDeclare(queueName, true, false, false, null);
            ch.QueueBind(queueName, exchange, routingKey);
        }

        string jsonStr = "abc";//JsonConvert.SerializeObject(requestMsg);
        byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);

        IBasicProperties properties = ch.CreateBasicProperties();
        properties.DeliveryMode = 2; // 2 = persistent message

        ch.BasicPublish(exchange, routingKey, properties, bytes);

        /*
         * Other routing variants (Java client syntax):
         *
         * channel.queueBind("queueName", "exchangeName", "routingKey.*");
         * channel.basicPublish("exchangeName", "routingKey.one", properties, bytes);
         *
         * channel.exchangeDeclare("exchangeName", "fanout"); //direct fanout topic
         * channel.basicPublish("exchangeName", "", properties, bytes);
         */
    }
}

2:接收
// Receiver: connects to the broker, consumes "MyFirstQueue" with manual acknowledgement,
// prints each message body as UTF-8 text, and acknowledges it. Any exception is written
// to the console; the program then waits for Enter before continuing.
try
{
    ConnectionFactory factory = new ConnectionFactory();
    // NOTE(review): host, port and credentials below are placeholders; fill them in
    // before running. The sender snippet also sets VirtualHost = "dnt_mq" — presumably
    // the same virtual host is needed here; confirm.
    factory.HostName = "";
    factory.Port = 456;
    factory.UserName = "";
    factory.Password = "";

    using (IConnection conn = factory.CreateConnection())
    {
        using (IModel channel = conn.CreateModel())
        {
            // Declare a durable queue on the broker; declaring an existing queue with
            // the same name does not create it again.
            channel.QueueDeclare("MyFirstQueue", true, false, false, null);

            // Prefetch count 1: while a delivered message is still unacknowledged,
            // the client is not sent the next one.
            channel.BasicQos(0, 1, false);

            Console.WriteLine("Listening...");

            // Create a consumer for the queue.
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
            // Start consuming with noAck = false, i.e. the application acknowledges explicitly.
            channel.BasicConsume("MyFirstQueue", false, consumer);

            while (true)
            {
                // Blocking call: waits for the next delivery from the queue.
                BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                byte[] bytes = ea.Body;
                string str = Encoding.UTF8.GetString(bytes);
                //RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);

                // Print the decoded payload (previously an empty string was printed
                // and the decoded text was never used).
                Console.WriteLine("HandleMsg:" + str);

                // Acknowledge this single delivery.
                channel.BasicAck(ea.DeliveryTag, false);
            }
        }
    }
}
catch (Exception e1)
{
    Console.WriteLine(e1.ToString());
}
Console.ReadLine();

}

RabbitMQ 笔记