首页 > 代码库 > YII使用beanstalk队列

YII使用beanstalk队列

转载于:http://blog.csdn.net/yao970953039/article/details/41821387

1.系统centos 我是直接使用yum install beanstalk安装的

2.下载beanstalk的php扩展包 放在extensions

 

[php] view plaincopy在CODE上查看代码片派生到我的代码片
  1. //控制器往队列里面塞任务  @author yao  
  2. Yii::import(‘application.extensions.SendSmsBeanstalk‘);  
  3.   
  4. class AController extends CController{  
  5.   
  6.     public function actionIndex(){  
  7.         for($i=0;$i<1000;$i++){  
  8.             SendSmsBeanstalk::sendSms(‘sendsms_‘.$i);  
  9.         }  
  10.     }  
  11.   
  12.     public function actionA(){  
  13.         SendSmsBeanstalk::handleMessage();  
  14.     }  
  15. }  
[php] view plaincopy在CODE上查看代码片派生到我的代码片
  1. </pre><pre code_snippet_id="546591" snippet_file_name="blog_20141209_4_7698245" name="code" class="php">//发送短信  @author yao  
  2. Yii::import(‘application.extensions.beanstalk.*‘);  
  3. class SendSmsBeanstalk extends CComponent{  
  4.   
  5.     //建立发短信任务  
  6.     public static function sendSms($body){  
  7.         //实例化beanstalk  
  8.         $beanstalk = new Socket_Beanstalk();  
  9.         if (!$beanstalk->connect()) {  
  10.             exit(current($beanstalk->errors()));  
  11.         }  
  12.         //选择使用的tube  
  13.         $beanstalk->useTube(‘hube_send_smss‘);  
  14.   
  15.   
  16.         //往tube中增加数据  
  17.         $put = $beanstalk->put(  
  18.             23, // 任务的优先级.  
  19.             0, // 不等待直接放到ready队列中.  
  20.             60, // 处理任务的时间.  
  21.             $body // 任务内容  
  22.         );  
  23.         if (!$put) {  
  24.             return false;   
  25.         }  
  26.         $beanstalk->disconnect();  
  27.     }  
  28.   
  29.   
  30.     //处理消息  
  31.     public static function handleMessage(){  
  32.         //实例化beanstalk  
  33.         $beanstalk = new Socket_Beanstalk();  
  34.   
  35.         if (!$beanstalk->connect()) {  
  36.             exit(current($beanstalk->errors()));  
  37.         }  
  38.   
  39.         $beanstalk->useTube(‘hube_send_smss‘);  
  40.         //设置要监听的tube  
  41.         $beanstalk->watch(‘hube_send_smss‘);  
  42.         //取消对默认tube的监听,可以省略  
  43.         $beanstalk->ignore(‘default‘);  
  44.   
  45.   
  46.   
  47.         while($job = $beanstalk->reserve(2)){//这里我写了单个任务只执行2秒。防止超时。本地测试的时候 没写超时会一直执行下去,哪怕队列里面已经没有任务  
  48.             //处理任务  
  49.             $result = $job[‘body‘];  
  50.             echo $job[‘id‘];echo ‘<br>‘;//打印任务ID  
  51.             if ($result) {<span style="font-family: Arial, Helvetica, sans-serif;">//这里可以写逻辑 todo</span>  
[php] view plaincopy在CODE上查看代码片派生到我的代码片
  1.             //删除任务  
  2.                 $beanstalk->delete($job[‘id‘]);  
  3.             } else {  
  4.             //休眠任务  
  5.                 $beanstalk->bury($job[‘id‘],‘‘);  
  6.             }  
  7.   
  8.         }  
  9.         $beanstalk->disconnect();  
  10.     }  

 

 

下方提供我找到的队列PHP版本core,不能上传附件。代码在下面

 

[php] view plaincopy在CODE上查看代码片派生到我的代码片
  1. /** 
  2.  * beanstalk: A minimalistic PHP beanstalk client. 
  3.  * 
  4.  * Copyright (c) 2009-2012 David Persson 
  5.  * 
  6.  * Distributed under the terms of the MIT License. 
  7.  * Redistributions of files must retain the above copyright notice. 
  8.  * 
  9.  * @copyright  2009-2012 David Persson <nperson@gmx.de> 
  10.  * @license    http://www.opensource.org/licenses/mit-license.php The MIT License 
  11.  * @link       http://github.com/davidpersson/beanstalk 
  12.  */  
  13.   
  14. /** 
  15.  * An interface to the beanstalk queue service. Implements the beanstalk 
  16.  * protocol spec 1.2. Where appropriate the documentation from the protcol has 
  17.  * been added to the docblocks in this class. 
  18.  * 
  19.  * @link https://github.com/kr/beanstalkd/blob/master/doc/protocol.txt 
  20.  */  
  21. class Socket_Beanstalk {  
  22.   
  23.     /** 
  24.      * Holds a boolean indicating whether a connection to the server is 
  25.      * currently established or not. 
  26.      * 
  27.      * @var boolean 
  28.      */  
  29.     public $connected = false;  
  30.   
  31.     /** 
  32.      * Holds configuration values. 
  33.      * 
  34.      * @var array 
  35.      */  
  36.     protected $_config = array();  
  37.   
  38.     /** 
  39.      * The current connection resource handle (if any). 
  40.      * 
  41.      * @var resource 
  42.      */  
  43.     protected $_connection;  
  44.   
  45.     /** 
  46.      * Generated errors. Will hold a maximum of 200 error messages at any time 
  47.      * to prevent pilling up messages and using more and more memory. This is 
  48.      * especially important if this class is used in long-running workers. 
  49.      * 
  50.      * @see Socket_Beanstalk::errors() 
  51.      * @see Socket_Beanstalk::_error() 
  52.      * @var array 
  53.      */  
  54.     protected $_errors = array();  
  55.   
  56.     /** 
  57.      * Constructor. 
  58.      * 
  59.      * @param array $config An array of configuration values: 
  60.      *        - `‘persistent‘`  Whether to make the connection persistent or 
  61.      *                          not, defaults to `true` as the FAQ recommends 
  62.      *                          persistent connections. 
  63.      *        - `‘host‘`        The beanstalk server hostname or IP address to 
  64.      *                          connect to, defaults to `127.0.0.1`. 
  65.      *        - `‘port‘`        The port of the server to connect to, defaults 
  66.      *                          to `11300`. 
  67.      *        - `‘timeout‘`     Timeout in seconds when establishing the 
  68.      *                          connection, defaults to `1`. 
  69.      * @return void 
  70.      */  
  71.     public function __construct(array $config = array()) {  
  72.         $defaults = array(  
  73.             ‘persistent‘ => true,  
  74.             ‘host‘ => ‘127.0.0.1‘,  
  75.             ‘port‘ => 11300,  
  76.             ‘timeout‘ => 1  
  77.         );  
  78.         $this->_config = $config + $defaults;  
  79.     }  
  80.   
  81.     /** 
  82.      * Destructor, disconnects from the server. 
  83.      * 
  84.      * @return void 
  85.      */  
  86.     public function __destruct() {  
  87.         $this->disconnect();  
  88.     }  
  89.   
  90.     /** 
  91.      * Initiates a socket connection to the beanstalk server. The resulting 
  92.      * stream will not have any timeout set on it. Which means it can wait an 
  93.      * unlimited amount of time until a packet becomes available. This is 
  94.      * required for doing blocking reads. 
  95.      * 
  96.      * @see Socket_Beanstalk::$_connection 
  97.      * @see Socket_Beanstalk::reserve() 
  98.      * @return boolean `true` if the connection was established, `false` otherwise. 
  99.      */  
  100.     public function connect() {  
  101.         if (isset($this->_connection)) {  
  102.             $this->disconnect();  
  103.         }  
  104.   
  105.         $function = $this->_config[‘persistent‘] ? ‘pfsockopen‘ : ‘fsockopen‘;  
  106.         $params = array($this->_config[‘host‘], $this->_config[‘port‘], &$errNum, &$errStr);  
  107.   
  108.         if ($this->_config[‘timeout‘]) {  
  109.             $params[] = $this->_config[‘timeout‘];  
  110.         }  
  111.         $this->_connection = @call_user_func_array($function, $params);  
  112.   
  113.         if (!empty($errNum) || !empty($errStr)) {  
  114.             $this->_error("{$errNum}: {$errStr}");  
  115.         }  
  116.   
  117.         $this->connected = is_resource($this->_connection);  
  118.   
  119.         if ($this->connected) {  
  120.             stream_set_timeout($this->_connection, -1);  
  121.         }  
  122.         return $this->connected;  
  123.     }  
  124.   
  125.     /** 
  126.      * Closes the connection to the beanstalk server. 
  127.      * 
  128.      * @return boolean `true` if diconnecting was successful. 
  129.      */  
  130.     public function disconnect() {  
  131.         if (!is_resource($this->_connection)) {  
  132.             $this->connected = false;  
  133.         } else {  
  134.             $this->connected = !fclose($this->_connection);  
  135.   
  136.             if (!$this->connected) {  
  137.                 $this->_connection = null;  
  138.             }  
  139.         }  
  140.         return !$this->connected;  
  141.     }  
  142.   
  143.     /** 
  144.      * Returns collected error messages. 
  145.      * 
  146.      * @return array An array of error messages. 
  147.      */  
  148.     public function errors() {  
  149.         return $this->_errors;  
  150.     }  
  151.   
  152.     /** 
  153.      * Pushes an error message to `Beanstalk::$_errors`. Ensures 
  154.      * that at any point there are not more than 200 messages. 
  155.      * 
  156.      * @param string $message The error message. 
  157.      * @return void 
  158.      */  
  159.     protected function _error($message) {  
  160.         if (count($this->_errors) >= 200) {  
  161.             array_shift($this->_errors);  
  162.         }  
  163.         array_push($this->_errors, $message);  
  164.     }  
  165.   
  166.     /** 
  167.      * Writes a packet to the socket. Prior to writing to the socket will check 
  168.      * for availability of the connection. 
  169.      * 
  170.      * @param string $data 
  171.      * @return integer|boolean number of written bytes or `false` on error. 
  172.      */  
  173.     protected function _write($data) {  
  174.         if (!$this->connected && !$this->connect()) {  
  175.             return false;  
  176.         }  
  177.   
  178.         $data .= "\r\n";  
  179.         return fwrite($this->_connection, $data, strlen($data));  
  180.     }  
  181.   
  182.     /** 
  183.      * Reads a packet from the socket. Prior to reading from the socket will 
  184.      * check for availability of the connection. 
  185.      * 
  186.      * @param int $length Number of bytes to read. 
  187.      * @return string|boolean Data or `false` on error. 
  188.      */  
  189.     protected function _read($length = null) {  
  190.         if (!$this->connected && !$this->connect()) {  
  191.             return false;  
  192.         }  
  193.         if ($length) {  
  194.             if (feof($this->_connection)) {  
  195.                 return false;  
  196.             }  
  197.             $data = fread($this->_connection, $length + 2);  
  198.             $meta = stream_get_meta_data($this->_connection);  
  199.   
  200.             if ($meta[‘timed_out‘]) {  
  201.                 $this->_error(‘Connection timed out.‘);  
  202.                 return false;  
  203.             }  
  204.             $packet = rtrim($data, "\r\n");  
  205.         } else {  
  206.             $packet = stream_get_line($this->_connection, 16384, "\r\n");  
  207.         }  
  208.         return $packet;  
  209.     }  
  210.   
  211.     /* Producer Commands */  
  212.   
  213.     /** 
  214.      * The `put` command is for any process that wants to insert a job into the queue. 
  215.      * 
  216.      * @param integer $pri Jobs with smaller priority values will be scheduled 
  217.      *        before jobs with larger priorities. The most urgent priority is 
  218.      *        0; the least urgent priority is 4294967295. 
  219.      * @param integer $delay Seconds to wait before putting the job in the 
  220.      *        ready queue.  The job will be in the "delayed" state during this time. 
  221.      * @param integer $ttr Time to run - Number of seconds to allow a worker to 
  222.      *        run this job.  The minimum ttr is 1. 
  223.      * @param string $data The job body. 
  224.      * @return integer|boolean `false` on error otherwise an integer indicating 
  225.      *         the job id. 
  226.      */  
  227.     public function put($pri, $delay, $ttr, $data) {  
  228.         $this->_write(sprintf(‘put %d %d %d %d‘, $pri, $delay, $ttr, strlen($data)));  
  229.         $this->_write($data);  
  230.         $status = strtok($this->_read(), ‘ ‘);  
  231.   
  232.         switch ($status) {  
  233.             case ‘INSERTED‘:  
  234.             case ‘BURIED‘:  
  235.                 return (integer)strtok(‘ ‘); // job id  
  236.             case ‘EXPECTED_CRLF‘:  
  237.             case ‘JOB_TOO_BIG‘:  
  238.             default:  
  239.                 $this->_error($status);  
  240.                 return false;  
  241.         }  
  242.     }  
  243.   
  244.     /** 
  245.      * The `use` command is for producers. Subsequent put commands will put jobs into 
  246.      * the tube specified by this command. If no use command has been issued, jobs 
  247.      * will be put into the tube named `default`. 
  248.      * 
  249.      * Please note that while obviously this method should better be named 
  250.      * `use` it is not. This is because `use` is a reserved keyword in PHP. 
  251.      * 
  252.      * @param string $tube A name at most 200 bytes. It specifies the tube to 
  253.      *        use.  If the tube does not exist, it will be created. 
  254.      * @return string|boolean `false` on error otherwise the name of the tube. 
  255.      */  
  256.     public function choose($tube) {  
  257.         $this->_write(sprintf(‘use %s‘, $tube));  
  258.         $status = strtok($this->_read(), ‘ ‘);  
  259.   
  260.         switch ($status) {  
  261.             case ‘USING‘:  
  262.                 return strtok(‘ ‘);  
  263.             default:  
  264.                 $this->_error($status);  
  265.                 return false;  
  266.         }  
  267.     }  
  268.   
  269.     /** 
  270.      * Alias for choose. 
  271.      * 
  272.      * @see Socket_Beanstalk::choose() 
  273.      * @param string $tube 
  274.      * @return string|boolean 
  275.      */  
  276.     public function useTube($tube) {  
  277.         return $this->choose($tube);  
  278.     }  
  279.   
  280.     /* Worker Commands */  
  281.   
  282.     /** 
  283.      * Reserve a job (with a timeout) 
  284.      * 
  285.      * @param integer $timeout If given specifies number of seconds to wait for 
  286.      *        a job. 0 returns immediately. 
  287.      * @return array|false `false` on error otherwise an array holding job id 
  288.      *         and body. 
  289.      */  
  290.     public function reserve($timeout = null) {  
  291.         if (isset($timeout)) {  
  292.             $this->_write(sprintf(‘reserve-with-timeout %d‘, $timeout));  
  293.         } else {  
  294.             $this->_write(‘reserve‘);  
  295.         }  
  296.         $status = strtok($this->_read(), ‘ ‘);  
  297.   
  298.         switch ($status) {  
  299.             case ‘RESERVED‘:  
  300.                 return array(  
  301.                     ‘id‘ => (integer)strtok(‘ ‘),  
  302.                     ‘body‘ => $this->_read((integer)strtok(‘ ‘))  
  303.                 );  
  304.             case ‘DEADLINE_SOON‘:  
  305.             case ‘TIMED_OUT‘:  
  306.             default:  
  307.                 $this->_error($status);  
  308.                 return false;  
  309.         }  
  310.     }  
  311.   
  312.     /** 
  313.      * Removes a job from the server entirely. 
  314.      * 
  315.      * @param integer $id The id of the job. 
  316.      * @return boolean `false` on error, `true` on success. 
  317.      */  
  318.     public function delete($id) {  
  319.         $this->_write(sprintf(‘delete %d‘, $id));  
  320.         $status = $this->_read();  
  321.   
  322.         switch ($status) {  
  323.             case ‘DELETED‘:  
  324.                 return true;  
  325.             case ‘NOT_FOUND‘:  
  326.             default:  
  327.                 $this->_error($status);  
  328.                 return false;  
  329.         }  
  330.     }  
  331.   
  332.     /** 
  333.      * Puts a reserved job back into the ready queue. 
  334.      * 
  335.      * @param integer $id The id of the job. 
  336.      * @param integer $pri Priority to assign to the job. 
  337.      * @param integer $delay Number of seconds to wait before putting the job in the ready queue. 
  338.      * @return boolean `false` on error, `true` on success. 
  339.      */  
  340.     public function release($id, $pri, $delay) {  
  341.         $this->_write(sprintf(‘release %d %d %d‘, $id, $pri, $delay));  
  342.         $status = $this->_read();  
  343.   
  344.         switch ($status) {  
  345.             case ‘RELEASED‘:  
  346.             case ‘BURIED‘:  
  347.                 return true;  
  348.             case ‘NOT_FOUND‘:  
  349.             default:  
  350.                 $this->_error($status);  
  351.                 return false;  
  352.         }  
  353.     }  
  354.   
  355.     /** 
  356.      * Puts a job into the `buried` state Buried jobs are put into a FIFO 
  357.      * linked list and will not be touched until a client kicks them. 
  358.      * 
  359.      * @param integer $id The id of the job. 
  360.      * @param integer $pri *New* priority to assign to the job. 
  361.      * @return boolean `false` on error, `true` on success. 
  362.      */  
  363.     public function bury($id, $pri) {  
  364.         $this->_write(sprintf(‘bury %d %d‘, $id, $pri));  
  365.         $status = $this->_read();  
  366.   
  367.         switch ($status) {  
  368.             case ‘BURIED‘:  
  369.                 return true;  
  370.             case ‘NOT_FOUND‘:  
  371.             default:  
  372.                 $this->_error($status);  
  373.                 return false;  
  374.         }  
  375.     }  
  376.   
  377.     /** 
  378.      * Allows a worker to request more time to work on a job 
  379.      * 
  380.      * @param integer $id The id of the job. 
  381.      * @return boolean `false` on error, `true` on success. 
  382.      */  
  383.     public function touch($id) {  
  384.         $this->_write(sprintf(‘touch %d‘, $id));  
  385.         $status = $this->_read();  
  386.   
  387.         switch ($status) {  
  388.             case ‘TOUCHED‘:  
  389.                 return true;  
  390.             case ‘NOT_TOUCHED‘:  
  391.             default:  
  392.                 $this->_error($status);  
  393.                 return false;  
  394.         }  
  395.     }  
  396.   
  397.     /** 
  398.      * Adds the named tube to the watch list for the current 
  399.      * connection. 
  400.      * 
  401.      * @param string $tube Name of tube to watch. 
  402.      * @return integer|boolean `false` on error otherwise number of tubes in watch list. 
  403.      */  
  404.     public function watch($tube) {  
  405.         $this->_write(sprintf(‘watch %s‘, $tube));  
  406.         $status = strtok($this->_read(), ‘ ‘);  
  407.   
  408.         switch ($status) {  
  409.             case ‘WATCHING‘:  
  410.                 return (integer)strtok(‘ ‘);  
  411.             default:  
  412.                 $this->_error($status);  
  413.                 return false;  
  414.         }  
  415.     }  
  416.   
  417.     /** 
  418.      * Remove the named tube from the watch list. 
  419.      * 
  420.      * @param string $tube Name of tube to ignore. 
  421.      * @return integer|boolean `false` on error otherwise number of tubes in watch list. 
  422.      */  
  423.     public function ignore($tube) {  
  424.         $this->_write(sprintf(‘ignore %s‘, $tube));  
  425.         $status = strtok($this->_read(), ‘ ‘);  
  426.   
  427.         switch ($status) {  
  428.             case ‘WATCHING‘:  
  429.                 return (integer)strtok(‘ ‘);  
  430.             case ‘NOT_IGNORED‘:  
  431.             default:  
  432.                 $this->_error($status);  
  433.                 return false;  
  434.         }  
  435.     }  
  436.   
  437.     /* Other Commands */  
  438.   
  439.     /** 
  440.      * Inspect a job by its id. 
  441.      * 
  442.      * @param integer $id The id of the job. 
  443.      * @return string|boolean `false` on error otherwise the body of the job. 
  444.      */  
  445.     public function peek($id) {  
  446.         $this->_write(sprintf(‘peek %d‘, $id));  
  447.         return $this->_peekRead();  
  448.     }  
  449.   
  450.     /** 
  451.      * Inspect the next ready job. 
  452.      * 
  453.      * @return string|boolean `false` on error otherwise the body of the job. 
  454.      */  
  455.     public function peekReady() {  
  456.         $this->_write(‘peek-ready‘);  
  457.         return $this->_peekRead();  
  458.     }  
  459.   
  460.     /** 
  461.      * Inspect the job with the shortest delay left. 
  462.      * 
  463.      * @return string|boolean `false` on error otherwise the body of the job. 
  464.      */  
  465.     public function peekDelayed() {  
  466.         $this->_write(‘peek-delayed‘);  
  467.         return $this->_peekRead();  
  468.     }  
  469.   
  470.     /** 
  471.      * Inspect the next job in the list of buried jobs. 
  472.      * 
  473.      * @return string|boolean `false` on error otherwise the body of the job. 
  474.      */  
  475.     public function peekBuried() {  
  476.         $this->_write(‘peek-buried‘);  
  477.         return $this->_peekRead();  
  478.     }  
  479.   
  480.     /** 
  481.      * Handles response for all peek methods. 
  482.      * 
  483.      * @return string|boolean `false` on error otherwise the body of the job. 
  484.      */  
  485.     protected function _peekRead() {  
  486.         $status = strtok($this->_read(), ‘ ‘);  
  487.   
  488.         switch ($status) {  
  489.             case ‘FOUND‘:  
  490.                 return array(  
  491.                     ‘id‘ => (integer)strtok(‘ ‘),  
  492.                     ‘body‘ => $this->_read((integer)strtok(‘ ‘))  
  493.                 );  
  494.             case ‘NOT_FOUND‘:  
  495.             default:  
  496.                 $this->_error($status);  
  497.                 return false;  
  498.         }  
  499.     }  
  500.   
  501.     /** 
  502.      * Moves jobs into the ready queue (applies to the current tube). 
  503.      * 
  504.      * If there are buried jobs those get kicked only otherwise 
  505.      * delayed jobs get kicked. 
  506.      * 
  507.      * @param integer $bound Upper bound on the number of jobs to kick. 
  508.      * @return integer|boolean False on error otherwise number of job kicked. 
  509.      */  
  510.     public function kick($bound) {  
  511.         $this->_write(sprintf(‘kick %d‘, $bound));  
  512.         $status = strtok($this->_read(), ‘ ‘);  
  513.   
  514.         switch ($status) {  
  515.             case ‘KICKED‘:  
  516.                 return (integer)strtok(‘ ‘);  
  517.             default:  
  518.                 $this->_error($status);  
  519.                 return false;  
  520.         }  
  521.     }  
  522.   
  523.     /* Stats Commands */  
  524.   
  525.     /** 
  526.      * Gives statistical information about the specified job if it exists. 
  527.      * 
  528.      * @param integer $id The job id 
  529.      * @return string|boolean `false` on error otherwise a string with a yaml formatted dictionary 
  530.      */  
  531.     public function statsJob($id) {  
  532.         $this->_write(sprintf(‘stats-job %d‘, $id));  
  533.         return $this->_statsRead();  
  534.     }  
  535.   
  536.     /** 
  537.      * Gives statistical information about the specified tube if it exists. 
  538.      * 
  539.      * @param string $tube Name of the tube. 
  540.      * @return string|boolean `false` on error otherwise a string with a yaml formatted dictionary. 
  541.      */  
  542.     public function statsTube($tube) {  
  543.         $this->_write(sprintf(‘stats-tube %s‘, $tube));  
  544.         return $this->_statsRead();  
  545.     }  
  546.   
  547.     /** 
  548.      * Gives statistical information about the system as a whole. 
  549.      * 
  550.      * @return string|boolean `false` on error otherwise a string with a yaml formatted dictionary. 
  551.      */  
  552.     public function stats() {  
  553.         $this->_write(‘stats‘);  
  554.         return $this->_statsRead();  
  555.     }  
  556.   
  557.     /** 
  558.      * Returns a list of all existing tubes. 
  559.      * 
  560.      * @return string|boolean `false` on error otherwise a string with a yaml formatted list. 
  561.      */  
  562.     public function listTubes() {  
  563.         $this->_write(‘list-tubes‘);  
  564.         return $this->_statsRead();  
  565.     }  
  566.   
  567.     /** 
  568.      * Returns the tube currently being used by the producer. 
  569.      * 
  570.      * @return string|boolean `false` on error otherwise a string with the name of the tube. 
  571.      */  
  572.     public function listTubeUsed() {  
  573.         $this->_write(‘list-tube-used‘);  
  574.         $status = strtok($this->_read(), ‘ ‘);  
  575.   
  576.         switch ($status) {  
  577.             case ‘USING‘:  
  578.                 return strtok(‘ ‘);  
  579.             default:  
  580.                 $this->_error($status);  
  581.                 return false;  
  582.         }  
  583.     }  
  584.   
  585.     /** 
  586.      * Alias for listTubeUsed. 
  587.      * 
  588.      * @see Socket_Beanstalk::listTubeUsed() 
  589.      * @return string|boolean `false` on error otherwise a string with the name of the tube. 
  590.      */  
  591.     public function listTubeChosen() {  
  592.         return $this->listTubeUsed();  
  593.     }  
  594.   
  595.     /** 
  596.      * Returns a list of tubes currently being watched by the worker. 
  597.      * 
  598.      * @return string|boolean `false` on error otherwise a string with a yaml formatted list. 
  599.      */  
  600.     public function listTubesWatched() {  
  601.         $this->_write(‘list-tubes-watched‘);  
  602.         return $this->_statsRead();  
  603.     }  
  604.   
  605.     /** 
  606.      * Handles responses for all stat methods. 
  607.      * 
  608.      * @param boolean $decode Whether to decode data before returning it or not. Default is `true`. 
  609.      * @return array|string|boolean `false` on error otherwise statistical data. 
  610.      */  
  611.     protected function _statsRead($decode = true) {  
  612.         $status = strtok($this->_read(), ‘ ‘);  
  613.   
  614.         switch ($status) {  
  615.             case ‘OK‘:  
  616.                 $data = $this->_read((integer)strtok(‘ ‘));  
  617.                 return $decode ? $this->_decode($data) : $data;  
  618.             default:  
  619.                 $this->_error($status);  
  620.                 return false;  
  621.         }  
  622.     }  
  623.   
  624.     /** 
  625.      * Decodes YAML data. This is a super naive decoder which just works on a 
  626.      * subset of YAML which is commonly returned by beanstalk. 
  627.      * 
  628.      * @param string $data The data in YAML format, can be either a list or a dictionary. 
  629.      * @return array An (associative) array of the converted data. 
  630.      */  
  631.     protected function _decode($data) {  
  632.         $data = array_slice(explode("\n", $data), 1);  
  633.         $result = array();  
  634.   
  635.         foreach ($data as $key => $value) {  
  636.             if ($value[0] === ‘-‘) {  
  637.                 $value = ltrim($value, ‘- ‘);  
  638.             } elseif (strpos($value, ‘:‘) !== false) {  
  639.                 list($key, $value) = explode(‘:‘, $value);  
  640.                 $value = ltrim($value, ‘ ‘);  
  641.             }  
  642.             if (is_numeric($value)) {  
  643.                 $value = (integer) $value == $value ? (integer) $value : (float) $value;  
  644.             }  
  645.             $result[$key] = $value;  
  646.         }  
  647.         return $result;  
  648.     }  
  649. }  


 

安装成功是上面这样子

YII使用beanstalk队列