首页 > 代码库 > rabbitmq延迟任务的处理

rabbitmq延迟任务的处理

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:

技术分享

 

php代码如下:

/**
 * 死信
 * 创建交换机1、交换机2、队列1、队列2
 * 交换机1绑定队列1、交换机2绑定队列2
 * 其中交换机1为死信交换机,队列1处理消息的ttl,队列1没有消费者
 *
 * 由于队列1没有消费者,所以它里面的消息过期后会变成死信,再定义规则,让死信进入交换机2,交换机2再把消息路由到队列2
 * 我们的客户端只需要消费队列2即可
 * 
 * 需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
 */
public function actionDeadletter(){
    $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
    $channel = $connection->channel();
    
    // 死信交换机和队列的设置
    $channel->exchange_declare(‘dead_exchange‘, ‘direct‘, false, true, false);
    $channel->queue_declare(‘queue1‘, false, true, false, false, false, [
            ‘x-dead-letter-exchange‘ => [‘S‘, ‘normal_exchange‘], // 死信被转发到哪个交换机
            ‘x-dead-letter-routing-key‘ => [‘S‘, ‘normal_routingkey‘] // 死信路由
    ]);
    $channel->queue_bind(‘queue1‘, ‘dead_exchange‘, ‘dead_routingkey‘);
    
    // 正常交换机和队列的设置
    $channel->exchange_declare(‘normal_exchange‘, ‘direct‘, false, true, false);
    $channel->queue_declare(‘queue2‘, false, true, false, false, false);
    $channel->queue_bind(‘queue2‘, ‘normal_exchange‘, ‘normal_routingkey‘);
    
    $msg = new AMQPMessage(‘hello world‘, [
            ‘delivery_mode‘ => 2,
            ‘expiration‘ => 10*1000 //毫秒
    ]);
    
    $channel->basic_publish($msg, ‘dead_exchange‘, ‘dead_routingkey‘);
    
    echo " [x] Sent ‘Hello World!‘\n";
    $channel->close();
    $connection->close();
}

 运行程序,打开rabbitmq的web管理界面,可以看到消息先进入队列1,当消息过期后会自动进入队列2

参考:https://stackoverflow.com/questions/21942063/how-to-delay-php-amqplib

参考:http://www.cnblogs.com/haoxinyue/p/6613706.html

rabbitmq延迟任务的处理