首页 > 代码库 > swoole 异步队列

swoole 异步队列

<?php
// 消费者,文件名《server.php》

class Server {
    private $serv;
    
    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            ‘worker_num‘ => 1,        // 一般设置为服务器CPU数的1-4倍
            ‘daemonize‘ => false,     // 以非守护进程执行(一般项目中填 true)
            ‘max_request‘ => 9999,    // 处理这么多请求之后,重启worker进程,防止内存泄漏
            ‘task_worker_num‘ => 4,   // task进程的数量
       //   ‘log_file‘ => ‘/data/log/swoole.log‘ ,     //日志
        ));

        $this->serv->on(‘Receive‘, array($this, ‘onReceive‘));
        // bind callback
        $this->serv->on(‘Task‘, array($this, ‘onTask‘));
        $this->serv->on(‘Finish‘, array($this, ‘onFinish‘));
        $this->serv->start();
    }
    
    /**
    * 接收到数据时回调此函数,发生在worker进程中
    * $server,swoole_server对象
    * $fd,TCP客户端连接的文件描述符
    * $from_id,TCP连接所在的Reactor线程ID
    * $data,收到的数据内容,可能是文本或者二进制内容
    */
    public function onReceive($serv, $fd, $from_id, $data ) {
        echo PHP_EOL."=========== onReceive ============".PHP_EOL;
        echo "Get Message From Client {$fd}:{$data}".PHP_EOL;
        $serv->task( $data );
    }
    
    /**
    * 在task_worker进程内被调用worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌
这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task * $task_id 是任务ID,由swoole扩展内自动生成,用于区分不同的任务$task_id 和 $src_worker_id 组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同 * $src_worker_id 来自于哪个worker进程 * $data 是任务的内容
*/ public function onTask($serv, $task_id, $from_id, $data) { $array = json_decode( $data , true ); echo "=========== onTask ============".PHP_EOL; echo var_export($array, 1).PHP_EOL; return $array; // 返回执行结果到worker进程,低版本的swoole要写成 $serv->finish($array); } /** * 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程 * $task_id 是任务的ID * $data 是任务处理的结果内容,也就是 onTask()函数中的 return值 */ public function onFinish($serv, $task_id, $data) { echo "=========== onFinish ============".PHP_EOL; echo "Task {$task_id} finish !".PHP_EOL; echo var_export($data, 1).PHP_EOL; } } $server = new Server(); ############################ 华丽的分割线 ############################ <?php // 生产者,文件名《client.php》 class Client { private $client; public function __construct() { $this->client = new swoole_client(SWOOLE_SOCK_TCP); } public function connect() { if( !$this->client->connect(‘127.0.0.1‘, 9501 , 1) ) { echo "Connect Error"; } $data = array( ‘url‘ => ‘http://funsion.cnblogs.com‘, ‘timestamp‘ => date(‘Y-m-d H:i:s‘), ); $json_data = json_encode($data); $this->client->send($json_data); } } $client = new Client(); $client->connect(); ############################ 华丽的分割线 ############################ 先执行 php server.php (执行一次,会以守护进程运行) 然后再执行 php client.php (可以执行多次,每次执行会产生新的任务)

 

swoole 异步队列