首页 > 代码库 > rabbitMq创建和获取消息

rabbitMq创建和获取消息

package com.yunda.inter.preload.contextinit;

import net.sf.json.JSONObject;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.yunda.inter.shipmentAcceptor.bean.ShipmentData;
import com.yunda.inter.shipmentCheck.service.ShipmentCheckService;
import com.yunda.inter.sign.service.SignService;
import com.yunda.inter.util.CommUtil;
import com.yunda.inter.util.QueueUtil;
import com.yunda.inter.util.StringUtil;


/**
 * 启动预加载信息类
 *@author Administrator
 */
public class ContextLoaderSpringListener implements ApplicationListener<ContextRefreshedEvent>{
    
    private static Log logger = LogFactory.getLog(ContextLoaderSpringListener.class);
    @Autowired
    private ShipmentCheckService shipmentCheckService;

    //当spring容器初始化完成后就会执行该方法。
    public void onApplicationEvent(ContextRefreshedEvent event) {
        logger.debug("ConfigLoadListener init......");
        try {
            //创建一个频道
            Channel channel = QueueUtil.getConnection().createChannel();
            boolean durable = true;
            //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
            channel.queueDeclare(QueueUtil.getQueueName(), durable, false, false, null);

            //创建队列消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定消费队列
            //TODO:并发测试MQ,ack?
            channel.basicConsume(QueueUtil.getQueueName(), false/*打开应答机制*/, consumer);
            while (true) {
                //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                byte[] body = delivery.getBody();
                try {                    
                    String str=new String(body,"UTF-8");
                    JSONObject j = JSONObject.fromObject(str);
                    String shipmentId = j.getString("shipmentId");
                    String vehicleId = j.getString("vehicleId");
                    int planLineType = j.getInt("planLineType");
                
                    shipmentCheckService.check(shipmentId,vehicleId,planLineType);
                } catch (RuntimeException e) {
                    logger.error("货运单数据校验出现异常:", e);
                    logger.error("Source package:"+ CommUtil.getEncodeData(body));
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            logger.error("货运单存储器出现异常:", e);
        }
    }
    

}

 

    private void storeInQueue(byte[] dst) throws IOException, TimeoutException {
        Channel channel = QueueUtil.getConnection().createChannel();
        channel.queueDeclare(QueueUtil.getQueueName(), /*持久存储*/false, false, false, null);
        channel.basicPublish("", QueueUtil.getQueueName(), null, dst);
        channel.close();
    }

 

rabbitMq创建和获取消息