首页 > 代码库 > [转]PHP利用Gearman来处理并行多进程问题

[转]PHP利用Gearman来处理并行多进程问题

From : http://www.yuansir-web.com/2013/11/25/php%E5%88%A9%E7%94%A8gearman%E6%9D%A5%E5%A4%84%E7%90%86%E5%B9%B6%E8%A1%8C%E5%A4%9A%E8%BF%9B%E7%A8%8B%E9%97%AE%E9%A2%98/

最近工作中开发的一套系统,其中很多都是需要操作多服务器的,比如需要同时发布数据到2000个服务器上,或者同时向2000个服务器拉取数据。刚开始的解决方案就是单纯用PHP的curl_multi的方式并发处理请求,而且参考了淘宝技术博客的《Rolling cURL: PHP并发最佳实践》,但是由于网络和数据以及各个服务器等等的一些情况导致这种并发处理的响应时间很慢,因为在并发请求的过程中还包括记录日志,处理数据等逻辑,等待处理结果并返回,所以也不能友好的满足后台操作的体验。

现在重新设计一种方案,利Gearman来实现并发的需求。通过Client将请求发送到Gearman的Jobs,在每个Work中来再来进行curl_multi和数据处理和日志等一些操作,同时用Supervisor来监控Gearman以及Works的进程,这样可以实现一个并行的多进程和负载均衡的方案。

Gearman可以做什么

  • 异步处理:图片处理,订单处理,批量邮件/通知之类的
  • 要求高CPU或内存的处理:大容量的数据处理,MapReduce运算,日志聚集,视频编码
  • 分布式和并行的处理
  • 定时处理:增量更新,数据复制
  • 限制速率的FIFO处理
  • 分布式的系统监控任务

 

Gearman工作原理
使用Gearman的应用通常有三部分组成:一个Client、一个Worker、一个 任务服务器。 Client的作用是提出一个 Job 任务 交给 Job Server 任务服务器。Job Server 会去寻找一个 合适的 Worker 来完成这项任务。Worker 执行由 Client 发送过来的 Job,并且将结果通过 Job Server 返回给 Client。Gearman 提供了 Client 和 Worker 的 API,利用这些API 应用可以同 Gearman Job Server来进行通信。Gearman 内部 Client 和 Worker 之间的通信都是通过 TCP 连接来进行的。

stackGearman可以将工作的负载分担到不同的机器中。

cluster

安装配置
我只是记录下我安装配置的过程,我在Ubuntu和CentOS中都试了下。
CentOS YUM 安装

1 rpm -ivh http://dl.iuscommunity.org/pub/ius/stable/Redhat/6/x86_64/epel-release-6-5.noarch.rpm
2 yum install -y gearmand

Ubuntu apt 安装

1 apt-get install gearman

源码编译

1 yum install uuid-devel libuuid libuuid-devel uuid boost-devel libevent libevent-devel
2 wget -c https://launchpad.net/gearmand/1.2/1.1.7/+download/gearmand-1.1.7.tar.gz
3 tar zxvf gearmand-1.1.7.tar.gz
4 ./configure --prefix=/usr/local/gearmand
5 make && make install

安装好以后启动

1 gearmand -d

加上-d参数是表示后台运行,你可以gearmand -h 来查看其它的选项,启动的时候带上其它配置参数

1 /usr/sbin/gearmand --pid-file=/var/run/gearman/gearmand.pid --user=gearman --daemon --log-file=/var/log/gearman-job-server/gearman.log --listen=127.0.0.1

安装PHP Gearman扩展
我都是用pcel来安装的,你也可以下载源码包来编译安装,但是记得要先安装libgearmanre2c,不然扩展编译安装会出错。

01 pecl install gearman #不成功并提示版本问题可以试试 pecl install gearman-1.0.3,默认好像是1.1.2
02 1
03 编译安装也很简单
04 1
05 wget -c http://pecl.php.net/get/gearman-1.1.1.tgz
06 tar zxvf gearman-1.1.1.tgz
07 phpize
08 ./configure
09 make && make install
10 echo "extension=gearman.so" >> /etc/php.ini

PHP接口函数
Gearman提供很多完善的扩展函数,包括GearmanClient,GearmanJob,GearmanTask,GearmanWorker,具体可以查看PHP官方手册.
这是官方提供的Example其中的一个,相当与一个并发的分发任务处理的例子
gearman_client.php

01 <?php
02  
03 $client = new GearmanClient();
04 $client->addServer();
05  
06 // initialize the results of our 3 "query results" here
07 $userInfo = $friends = $posts = null;
08  
09 // This sets up what gearman will callback to as tasks are returned to us.
10 // The $context helps us know which function is being returned so we can
11 // handle it correctly.
12 $client->setCompleteCallback(function(GearmanTask $task, $context) use (&$userInfo, &$friends, &$posts) {
13 switch ($context)
14 {
15 case ‘lookup_user‘:
16 $userInfo = $task->data();
17 break;
18 case ‘baconate‘:
19 $friends = $task->data();
20 break;
21 case ‘get_latest_posts_by‘:
22 $posts = $task->data();
23 break;
24 }
25 });
26  
27 // Here we queue up multiple tasks to be execute in *as much* parallelism as gearmand can give us
28 $client->addTask(‘lookup_user‘, ‘joe@joe.com‘, ‘lookup_user‘);
29 $client->addTask(‘baconate‘, ‘joe@joe.com‘, ‘baconate‘);
30 $client->addTask(‘get_latest_posts_by‘, ‘joe@joe.com‘, ‘get_latest_posts_by‘);
31  
32 echo "Fetching...\n";
33 $start = microtime(true);
34 $client->runTasks();
35 $totaltime = number_format(microtime(true) - $start, 2);
36  
37 echo "Got user info in: $totaltime seconds:\n";
38 var_dump($userInfo, $friends, $posts);

gearman_work.php

01 <?php
02  
03 $worker = new GearmanWorker();
04 $worker->addServer();
05  
06 $worker->addFunction(‘lookup_user‘, function(GearmanJob $job) {
07 // normally you‘d so some very safe type checking and query binding to a database here.
08 // ...and we‘re gonna fake that.
09 sleep(3);
10 return ‘The user requested (‘ . $job->workload() . ‘) is 7 feet tall and awesome‘;
11 });
12  
13 $worker->addFunction(‘baconate‘, function(GearmanJob $job) {
14 sleep(3);
15 return ‘The user (‘ . $job->workload() . ‘) is 1 degree away from Kevin Bacon‘;
16 });
17  
18 $worker->addFunction(‘get_latest_posts_by‘, function(GearmanJob $job) {
19 sleep(3);
20 return ‘The user (‘ . $job->workload() . ‘) has no posts, sorry!‘;
21 });
22  
23 while ($worker->work());

我在3个终端中都执行了gearman_work.php

1 ryan@ryan-lamp:~$ ps aux | grep gearman* | grep -v grep
2 gearman 1504 0.0 0.1 60536 1264 ? Ssl 11:06 0:00 /usr/sbin/gearmand --pid-file=/var/run/gearman/gearmand.pid --user=gearman --daemon --log-file=/var/log/gearman-job-server/gearman.log --listen=127.0.0.1
3 ryan 2992 0.0 0.8 43340 9036 pts/0 S+ 14:05 0:00 php /var/www/gearmand_work.php
4 ryan 3713 0.0 0.8 43340 9036 pts/1 S+ 14:05 0:00 php /var/www/gearmand_work.php
5 ryan 3715 0.0 0.8 43340 9036 pts/2 S+ 14:05 0:00 php /var/www/gearmand_work.php

来查看下执行gearman_work.php的结果shell

1 Fetching...
2 Got user info in: 3.03 seconds:
3 string(59) "The user requested (joe@joe.com) is 7 feet tall and awesome"
4 string(56) "The user (joe@joe.com) is 1 degree away from Kevin Bacon"
5 string(43) "The user (joe@joe.com) has no posts, sorry!"

看到上面的3.03 seconds,说明client请求过去的任务被并行分发执行了。
在实际的生产环境中,为了监测gearmand和work的进程没有被意外退出,我们可以借助Supervisor这个工具,下次我再单独来写个Supervisor的笔记。