首页 > 代码库 > gearman 分布式系统学习 python

gearman 分布式系统学习 python

前段时间遇到了这样一个需求

某一客户端向一个服务器提交任务

服务器再将分发下去由对于的工作人员来完成

前辈告诉我用gearman搭建一个分布式系统。gearman有三个部分,client、service和worker

client:提交任务

service:分配任务

worker:执行任务

它可以实现的效果就是一台机器上搭建好了服务器

另外可以多台机器作为客户端,可以一起提交任务,然后服务器会用个队列来存起来

然后就是起多台机器作为worker去服务器要任务

把每个步骤都分布到了不同的主机上,这就是典型的分布式系统。

(感觉这个这就是一个加强版的多线程机制,一个线程提交任务到队列,起多个线程去队列中去。就是一个网游一个是单机)

 

详细介绍可以看官网Gearman,官网上还有各种语言的例子

配置过程的话跟着网上来就可以,前辈说linux就是配置特别麻烦,等配置好了之后,用起来就方便了

 

gearman client

 提交任务用法很简单,以下是用例

gm_client = gearman.GearmanClient([‘localhost:4730‘])    gm_client.submit_job(GRARMAN_TASK_NAME, data, priority=gearman.PRIORITY_HIGH, background=True)  

记得先引包 import gearman

[‘localhost:4730‘] 就是gearman服务器的位置,端口默认是4730

然后client有一个submit_job的方法,下面有该函数的源码,有一堆参数意思就和名字一样。如background 参数就是提交后台任务False就是等待返回结果,适用于大量提交任务。

其中最重要的参数 就是 task  在测试代码中我填写是 GRARMAN_TASK_NAME 。这个是任务的唯一标识,就是可以有多个client提交任务,但服务器怎么识别任务呢,就是靠这参数,当然worker也是靠这个参数识别。(我第一次写的时候把这个参数写成了“echo”结果提交了一坨任务,但我自己的worker只收到几个,我调试了好久才发现。。。)

gearman.client.submit_job 源码

 

 

1     def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):2         """Submit a single job to any gearman server"""3         job_info = dict(task=task, data=http://www.mamicode.com/data, unique=unique, priority=priority)4         completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout)5         return gearman.util.unlist(completed_job_list)

 

gearman worker

worker里面的话,除了GRARMAN_TASK_NAME  和 [‘localhost:4730‘]值得注意意外,还有一个地方就是每个worker都需要注册一下自己要做的任务,就是下面代码中的

gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)

注册的名字需要和对应client中的一致,后面一个参数是一个方法,写的格式也是代码中的那样,相当于就是拿到了任务怎么干,这个怎么干的过程就写到方法里面

 

 

gm_worker = gearman.GearmanWorker([localhost:4730])    def task_listener_reverse(gearman_worker, gearman_job):          #gearman_job 就是client端传过来的数据     print "这里是想要干的事"         return gearman_job.data[::-1]#返回数据的逆序 #GRARMAN_TASK_NAME  这个名字需要是任务的唯一标识gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)

 

 

 


 

GearmanAdminClient

今天有一个需求

得到gearman服务上有多少个job,又有多少worker正在工作

然后根据job和worker的数量进行一些相应的调整工作

突然发现gearman中GearmanAdminClient有以下两个方法,瞬间完成任务

 

def get_workers(self):    """Retrieves a list of workers and reports what tasks they‘re operating on"""    self.establish_admin_connection()    self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_WORKERS)    return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_WORKERS)def get_status(self):    """Retrieves a list of all registered tasks and reports how many items/workers are in the queue"""    self.establish_admin_connection()    self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_STATUS)    return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_STATUS)

测试代码

    ad_client=gearman.GearmanAdminClient([‘localhost:4730‘])        list=ad_client.get_workers()    for row in list:        print row          print "\n"        list=ad_client.get_status()    for row in list:        print row

部分结果展示

{‘file_descriptor‘: ‘34‘, ‘tasks‘: (), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘}{‘file_descriptor‘: ‘50‘, ‘tasks‘: (‘resize‘, ‘like‘, ‘dislike‘), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘}{‘file_descriptor‘: ‘46‘, ‘tasks‘: (‘resize‘, ‘like‘, ‘dislike‘), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘}{‘file_descriptor‘: ‘59‘, ‘tasks‘: (‘add_phone_info‘,), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘}{‘file_descriptor‘: ‘55‘, ‘tasks‘: (‘add_phone_info‘,), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘}{‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘apkcrawler‘, ‘queued‘: 22028}{‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘reverse‘, ‘queued‘: 0}{‘workers‘: 1, ‘running‘: 0, ‘task‘: ‘echo‘, ‘queued‘: 0}{‘workers‘: 10, ‘running‘: 0, ‘task‘: ‘add_phone_info‘, ‘queued‘: 0}{‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘write_hbase‘, ‘queued‘: 0}{‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘write_amazon‘, ‘queued‘: 0}{‘workers‘: 10, ‘running‘: 0, ‘task‘: ‘dislike‘, ‘queued‘: 0}

  

gearman 分布式系统学习 python