首页 > 代码库 > 聊聊returner(二)
聊聊returner(二)
关于returner的基础使用请参考returner文章。
继续上面的话题,这里编写c/s端来采集数据。
继续下面话题之前,你需要了解event、returner、zmq协议框架。
步骤:
1、在syndic上运行客户端程序,用来收集数据,其实就是master-minion架构。
2、收集的数据首先写入本地log中,其次发送到顶级master端。
3、顶级master运行服务端程序,用来接收数据,并写入本地数据库。
4、确保数据不丢失,采用zmq协议框架,使用REQ/REP套接字。
先说说客户端:
1、使用event去过滤事件,包含new_job和ret事件。
2、关于new_job的信息使用单线程发送。
3、关于ret事件使用多线程发送。
client.py 程序如下
#!/usr/bin/env python # coding: utf-8 import sys import threading import salt.utils.event import msgpack import zmq import config class JobProcess(object): ‘‘‘ return new job‘s jid and id to master ‘‘‘ def __init__(self,jid,minions): self.jid = jid self.minions = minions # a list that include lots of minion id def run(self): context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://%s:%d" % (config.server,config.server_port)) # use msgpack convert data for id in self.minions: data = {‘jid‘:self.jid,‘id‘:id} packed_data = msgpack.packb(data) socket.send(packed_data) message = socket.recv() socket.close() context.term() class ResultThread(threading.Thread): ‘‘‘ return the minions‘s result to master ‘‘‘ def __init__(self,fun_args,jid,result,success,fun,id): self.fun_args = fun_args # a list self.jid = jid self.result = result # not return self.success = success # flag self.fun = fun self.id = id super(ResultThread,self).__init__() def run(self): context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://%s:%d" % (config.server,config.server_port)) data = {‘fun_args‘:self.fun_args,‘jid‘:self.jid, ‘success‘:self.success,‘fun‘:self.fun,‘id‘:self.id} self.write_to_log(data) packed_data = msgpack.packb(data) socket.send(packed_data) message = socket.recv() socket.close() context.term() # write data to local log # if send failed, we can check the log to check the exe result def write_to_log(self,data): log_file = config.access_log with open(log_file,‘a+‘) as fp: fp.writelines([str(data),‘\n‘]) def filter_events(): ‘‘‘ filter the return result ‘‘‘ sock_dir = config.sock_dir event = salt.utils.event.MasterEvent(sock_dir) try: for eachevent in event.iter_events(full=True): ret = eachevent[‘data‘] # get the minions‘ return result if ‘salt/job/‘ in eachevent[‘tag‘]: if ret.has_key(‘id‘) and ret.has_key(‘return‘): # Igonre saltutil.find_job event if ret[‘fun‘] == "saltutil.find_job": continue # use thread handle data result_thd = ResultThread(ret[‘fun_args‘],ret[‘jid‘], ret[‘return‘],ret[‘success‘],ret[‘fun‘],ret[‘id‘]) result_thd.start() # get new job‘s info: jid and minions elif eachevent[‘tag‘] == ‘new_job‘: # Igonre saltutil.find_job event if ret[‘fun‘] == "saltutil.find_job": continue # handle data job_pro = JobProcess(ret[‘jid‘],ret[‘minions‘]) job_pro.run() # other event else: pass except BaseException,e: if str(e): err = str(e) else: err = ‘Terminated‘ print err with open(config.error_log,‘a+‘) as fp: fp.writelines([err,‘\n‘]) sys.exit(2) def main(): filter_events() if __name__ == ‘__main__‘: main()
配置文件config.py如下
#!/usr/bin/env python # coding: utf-8 # listen server ip and port server = ‘192.168.110.150‘ server_port = 10000 # the mysql server info host = ‘localhost‘ user = ‘salt‘ passwd = ‘salt‘ db = ‘salt‘ port = 3306 # the master local sock dir sock_dir = ‘/opt/app/salt/run/master‘ # log access_log = ‘/tmp/salt_access.log‘ error_log = ‘/tmp/salt_error.log‘
filter_events函数是使用event过滤new_job和ret事件的。new_job事件采用JobProcess类处理,使用单进程。ret事件采用ResultThread类处理,使用多线程。
这里有三个东西解释下:
第一个就是为什么采用zmq协议框架。首先是不用考虑服务端和客户端的启动顺序,其次就是server挂了,客户端运行,只要启动server,原先的数据并不会丢失。如果使用传统socket,server挂了,就需要重启客户端再启动服务端,数据会丢失。
第二个就是为什么需要采集new_job信息,我们知道new_job包含jid和有关的minions,我们先把这些数据入库,然后根据ret信息将jid+id对应记录更新。但是有些minion没有启动的话,是没有ret信息的,如果此时不先把new_job信息入库,那没有返回ret信息的minion就不能统计到。
第三个就是为什么new_job采用单线程而ret采用多线程发送。你想想,如果new_job也是用多线程,server端挂了,那么此时在client端会堆积很多线程,会导致"Too many files open",从而导致client被迫退出。而如果采用单线程发送的话,server挂了,那么此时客户端会阻塞在recv上,就不会堆积很多线程了。
服务端:
1、接受客户端的数据
2、new_job信息采用单线程写入数据库,ret信息使用多线程写入数据库
3、确保new_job信息先写入,ret信息用于更新记录
4、根据信息是否有success属性判断信息类别
server.py如下
#!/usr/bin/env python # coding: utf-8 import zmq import msgpack import MySQLdb import threading from contextlib import contextmanager from sys import exit import config @contextmanager def _get_serv(commit=False): ‘‘‘ Return a mysql cursor ‘‘‘ conn = MySQLdb.connect(host=config.host, user=config.user, passwd=config.passwd, db=config.db,port=config.port) cursor = conn.cursor() try: yield cursor except MySQLdb.DatabaseError as err: error, = err.args sys.stderr.write(error.message) cursor.execute("ROLLBACK") raise err else: if commit: cursor.execute("COMMIT") else: cursor.execute("ROLLBACK") finally: conn.close() class HandleProcess(object): ‘‘‘ insert jid and id to mysql ‘‘‘ def __init__(self,data): self.data = data def run(self): with _get_serv(commit=True) as cur: sql = "select jid,id from salt_returns where id=%s and jid=%s" num = cur.execute(sql,(self.data[‘id‘],self.data[‘jid‘])) if num: pass else: sql = "insert into salt_returns(id,jid) values(%s,%s)" cur.execute(sql,(self.data[‘id‘],self.data[‘jid‘])) class HandleThread(threading.Thread): ‘‘‘ update the result to mysql ‘‘‘ def __init__(self,data): self.data = data super(HandleThread,self).__init__() def run(self): with _get_serv(commit=True) as cur: sql = "select jid,id from salt_returns where id=%s and jid=%s" num = cur.execute(sql,(self.data[‘id‘],self.data[‘jid‘])) # the fun_args is a list ,need convert to str fun_args = str(self.data[‘fun_args‘]) # if jid and id is exist # then update the data if num: sql = "update salt_returns set fun_args=%s,success=%s,fun=%s where id=%s and jid=%s" cur.execute(sql,(fun_args,self.data[‘success‘], self.data[‘fun‘],self.data[‘id‘],self.data[‘jid‘])) def handle_data(message): unpack_data = msgpack.unpackb(message) if unpack_data.has_key(‘success‘): thd = HandleThread(unpack_data) thd.start() else: pro = HandleProcess(unpack_data) pro.run() def main(): context = zmq.Context() socket = context.socket(zmq.REP) try: socket.bind("tcp://%s:%d" % (config.server, config.server_port)) except zmq.error.ZMQError,msg: print ‘Bind failed:‘ + str(msg) with open(config.error_log,‘a+‘) as fp: fp.writelines([‘Bind failed: ‘,str(msg),‘\n‘]) socket.close() context.term() exit(1) while True: try: message = socket.recv() handle_data(message) socket.send(‘OK‘) except BaseException,e: if str(e): err = str(e) else: err = ‘Terminated‘ print err with open(config.error_log,‘a+‘) as fp: fp.writelines([err,‘\n‘]) socket.close() context.term() exit(2) if __name__ == "__main__": main()
这个结构可以用于大型的master-syndic-minion框架,只要稍微修改下数据的写入就行。
以后这个会用于web界面的执行结果统计。
本文出自 “fly天地” 博客,请务必保留此出处http://liuping0906.blog.51cto.com/2516248/1533289