首页 > 代码库 > 分布式计算之异步计算(Gearman示例)
分布式计算之异步计算(Gearman示例)
1、异步计算
分布式计算听起来有点高大上,如果说异步计算,估计了解的人多了。我们在日常的工作和生活中,一般都能遇到或者用到异步计算。
比如年底要做很多的报表,领导把需要的报表安排下来,我和我的团队去做统计。为了不耽误领导的时间,不需要领导站在我们屁股后面亲自督战。对领导来说,这个就是一个简单的异步计算模型了。
我们的团队在统计的时候,数据量很多,系统要运行很久,我们也没必要一直看进度条转圈圈,可以去做一下别的工作,或者吃点零食,这个也是异步计算。
负责统计的系统,也没有必要一直转圈圈,把任务在后台运行,前端还可以执行其他的查询,这个也是异步计算。
一个事情,就牵出来一大堆异步计算,看来生活中,真的是满常见的。:)
异步计算实现了事务的计算的转移,但是并不能降低事务本身的计算时间。
2、异步计算工具:Gearman
注:Gearman强悍的地方就是它可以支持不同语言直接的交互,client端是一种语言,worker端可以是另外一种语言!中间的交互具体转化细节都封装在jobserver中处理。
Gearman是一个用来把工作委派给其他机器、分布式的调用更适合做某项工作的机器、并发的做某项工作在多个调用间做负载均衡、或用来在调用其它语言的函数的系统。
一个Gearman请求的处理过程涉及三个角色:Client -> Job -> Worker。
Client:请求的发起者,可以是 C,PHP,Perl,MySQL UDF 等等。
Job:请求的调度者,用来负责协调把 Client发出的请求转发给合适的 Work。
Worker:请求的处理者,可以是 C,PHP,Perl等等。
因为 Client,Worker并不限制用一样的语言,所以有利于多语言多系统之间的集成。
甚至我们通过增加更多的 Worker,可以很方便的实现应用程序的异步计算。
3、Gearman样例
下面是例子是官网上面的字符串反转,供需要用的时候参考。
worker.php + client.java或worker.java +client.php
1. 环境:JDK1.7.0_25
所需jar包:java-gearman-service-0.6.6.jar, slf4j-api-1.6.4.jar,slf4j-simple-1.6.4.jar。可以直接到网上去下载。
2.代码:
Java语言
client端代码:
package com.gearman.demo;
import org.gearman.Gearman;
import org.gearman.GearmanClient;
import org.gearman.GearmanJobEvent;
import org.gearman.GearmanJobReturn;
import org.gearman.GearmanServer;
public class EchoClient {
public static void main(String... args) throws InterruptedException {
// 创建一个Gearman实例
Gearman gearman = Gearman.createGearman();
// 创建一个Gearman client
GearmanClient client = gearman.createGearmanClient();
/*
* 创建一个jobserver
*
* Parameter 1: job server的IP地址 Parameter 2: job server监听的端口
*
* job server收到client的job,并将其分发给注册worker
*/
GearmanServer server = gearman.createGearmanServer(
EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
// 告诉客户端,提交工作时它可以连接到该服务器
client.addServer(server);
/*
* 向job server提交工作
*
* Parameter 1: gearman function名字 Parameter 2:传送给job server和worker的数据
*
* GearmanJobReturn返回job发热结果
*/
GearmanJobReturn jobReturn = client.submitJob(
EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes());
// 遍历作业事件,直到我们打到最后文件
while (!jobReturn.isEOF()) {
// 下一个作业事件
GearmanJobEvent event = jobReturn.poll();
switch (event.getEventType()) {
case GEARMAN_JOB_SUCCESS: // job执行成功
System.out.println(new String(event.getData()));
break;
case GEARMAN_SUBMIT_FAIL: // job提交失败
case GEARMAN_JOB_FAIL: // job执行失败
System.err.println(event.getEventType() + ": "
+ new String(event.getData()));
default:
}
}
// 关闭
gearman.shutdown();
}
}
worker端代码:
package com.gearman.demo;
import org.gearman.Gearman;
import org.gearman.GearmanFunction;
import org.gearman.GearmanFunctionCallback;
import org.gearman.GearmanServer;
import org.gearman.GearmanWorker;
public class EchoWorker implements GearmanFunction {
// function name
public static final String ECHO_FUNCTION_NAME = "reverse";
// job server地址
public static final String ECHO_HOST = "10.10.115.23";
// job server监听的端口
public static final int ECHO_PORT = 4730;
public static void main(String... args) {
// 创建一个Gearman实例
Gearman gearman = Gearman.createGearman();
/*
* 创建一个jobserver
*
* Parameter 1: job server的IP地址 Parameter 2: job server监听的端口
*
* job server收到client的job,并将其分发给注册worker
*/
GearmanServer server = gearman.createGearmanServer(
EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
// 创建一个Gearman的worker
GearmanWorker worker = gearman.createGearmanWorker();
// 告诉工人如何执行工作(主要实现了GearmanFunction接口)
worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
// worker连接服务器
worker.addServer(server);
}
@Override
public byte[] work(String function, byte[] data,
GearmanFunctionCallback callback) throws Exception {
// work方法实现了GearmanFunction接口中的work方法,本实例中进行了字符串的反写
if (data != null) {
String str = new String(data);
StringBuffer sb = new StringBuffer(str);
return sb.reverse().toString().getBytes();
} else {
return "未接收到data".getBytes();
}
}
}
*ECHO_HOST = "192.168.1.12"为安装了Gearman并开启geramand服务的主机地址
*int ECHO_PORT = 4730默认端口为4730
例如log4j.properties文件内容如下:
log4j.rootLogger=debug, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=firestorm.log
log4j.appender.R.MaxFileSize=100KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
log4j.logger.com.codefutures=DEBUG
PHP语言
client端:
<?php
$client = new GearmanClient();
$client->addServer();
print $client->do("reverse","Hello World!");
print "\n";
?>
worker端:
<?php
$worker= new GearmanWorker();
$worker->addServer();
$worker->addFunction("reverse","my_reverse_function");
echo "Starting worker ...";
while($worker->work());
function my_reverse_function($job)
{
return strrev($job->workload());
}
?>
3.测试
注:这里我将php文件放在:/home/user/projects/文件夹下,另外,*.java的文件在eclipse中新建的工程里面。
测试1:worker.php + EchoClient.java
$php worker.php
然后,再在eclipse中运行:EchoClient.java
eclipse console输出结果:
INFO - [10.10.115.23:4730] :Connected
INFO - [10.10.115.23:4730] : OUT :SUBMIT_JOB
INFO - [10.10.115.23:4730] : IN :JOB_CREATED
INFO - [10.10.115.23:4730] : IN :WORK_COMPLETE
!dlroW olleH
INFO - [10.10.115.23:4730] : Disconnected
运行成功!!
测试2:EchoWorker.java + client.php
先在eclipse中运行,EchoWorker.java
然后,再在命令行下输入:php client.php
命令行下输出结果:
!dlroW olleH
运行成功!!
分布式计算之异步计算(Gearman示例)