首页 > 代码库 > supervisor(二)event

supervisor(二)event

    supervisor的event机制其实,就是一个监控/通知的框架。抛开这个机制实现的过程来说的话,event其实就是一串数据,这串数据里面有head和body两部分。咱们先弄清楚event数据结构,咱们才能做后续的处理。先看看header长啥样的吧

ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54

来说说上面的这个header每一项,都是什么?

ver:表示event协议的版本,目前是3.0

server:表示supervisor的标识符,也就是咱们上一篇中[supervisord]块中的identifier选项中的东西

        默认为supervisor

serial:这个东西是每个event的序列号,supervisord在运行过程中,发送的第一个event的序列号就是

        1,接下来的event依次类推

pool:这个是你的listener的pool的名字,一般你的listener只启动一个进程的的话,其实也就没有         pool的概念了。名字就是[eventlistener:theeventlistenername]这个东西

poolserial:上面的serial是supervisord给每个event的编号。 而poolserial则是,eventpool给发送

             到我这个pool过来的event编的号

eventname:这个是event的类型名称,这个后面说。

len:这个长度,表示的是header后面的body部分的长度。header之后,我们会取len长度的内容作为        body。

    好,说完了header,咱们就该说说body部分的数据结构了。body的数据结构,其实是和event的具体类型相关的,不同的event的类型,header的结构都一样,但是body的结构大多就不一样了。

关于event类型,咱们就不展开说了,因为太多了,具体大伙可以去参阅一下官网。其实搞会一个,其他也都一个样。

咱们这里说说待会一个要用到的类型就OK了,啥类型呢?

是PROCESS_STATE_EXITED

看着这名字,大伙差不多也就知道它是干什么的了。PROCESS_STATE_EXITED其实就是,当supervisord管理的子进程退出的时候,supervisord就会产生PROCESS_STATE_EXITED这么个event。

来看看PROCESS_STATE_EXITED长啥样吧,header咱们前面说过了,都一样。来看看body部分

processname:cat groupname:cat from_state:RUNNING expected:0 pid:2766

来说说具体含义

processname:就是进程名字,这里名字不是我们实际进程的名字,而是咱们[program:x]配置成的名字

groupname:组名,这个一个样

from_state:这个是,我们的进程退出前的状态是什么状态

expected:这个咱们前面也讲过,默认情况下exitcodes是0和2,也就是说0和2是expected。其它的退出

          码,也就是unexpected了

pid:这个大伙想必都知道。

    OK,说到了这里,我们知道了event的产生,然后给我们的listener这么一种结构的数据。

现在我们有数据了,就看咱们怎么去处理这些数据了,这个过程就仁者见仁,智者见智了。我们可以利用接收的数据,加工后,进行报警,等等操作。

    处理数据之前,咱们还得要来了解一下,listener和supervisord之间的通信过程

    在这里我们首先要搞清楚,event的发起方和接收方。

    event的发起方是supervisord进程,接收方是一个叫listener的东西,listener怎么配置,上一篇参数详解里面已经写的很清楚了,大伙可以去参考下,这里就不赘述了。其实listener和program一样,都是supervisord的子进程。两者的在配置上,很多选项也都一样。

    其实,event还有另外一个过程,我们的program也就是我们要管理的进程,也可以发送event,进而和supervisord主动通信。不过program程序一般都是程序员们搞,咱们搞运维的就不管他们的事情了

OK,看看event协议。

协议其实很简单。

  1. 当supervisord启动的时候,如果我们的listener配置为autostart=true的话,listener就会作为supervisor的子进程被启动。

  2. listener被启动之后,会向自己的stdout写一个"READY"的消息,此时父进程也就是supervisord读取到这条消息后,会认为listener处于就绪状态。

  3. listener处于就绪状态后,当supervisord产生的event在listener的配置的可接受的events中时,supervisord就会把该event发送给该listener。  

  4. listener接收到event后,我们就可以根据event的head,body里面的数据,做一些列的处理了。我们根据event的内容,判断,提取,报警等等操作。

  5. 该干的活都干完之后,listener需要向自己的stdout写一个消息"RESULT\nOK",supervisord接受到这条消息后。就知道listener处理event完毕了。


好,来看看例子吧

#!/usr/bin/env python
#coding:utf-8

import sys
import os
import subprocess
#childutils这个模块是supervisor的一个模型,可以方便我们处理event消息。。。当然我们也可以自己按照协议,用任何语言来写listener,只不过用childutils更加简便罢了
from supervisor import childutils
from optparse import OptionParser
import socket
import fcntl
import struct

__doc__ = "\033[32m%s,捕获PROCESS_STATE_EXITED事件类型,当异常退出时触发报警\033[0m" % sys.argv[0]

def write_stdout(s):
    sys.stdout.write(s)
    sys.stdout.flush()
#定义异常,没啥大用其实
class CallError(Exception):
    def __init__(self,value):
        self.value = value
    def __str__(self):
        return repr(self.value)
#定义处理event的类
class ProcessesMonitor():
    def __init__(self):
        self.stdin = sys.stdin
        self.stdout = sys.stdout

    def runforever(self):
        #定义一个无限循环,可以循环处理event,当然也可以不用循环,把listener的autorestart#配置为true,处理完一次event就让该listener退出,然后supervisord重启该listener,这样listen#er就可以处理新的event了
        while 1:
            #下面这个东西,是向stdout发送"READY",然后就阻塞在这里,一直等到有event发过来
            #headers,payload分别是接收到的header和body的内容
            headers, payload = childutils.listener.wait(self.stdin, self.stdout)
            #判断event是否是咱们需要的,不是的话,向stdout写入"RESULT\NOK",并跳过当前
            #循环的剩余部分
            if not headers[‘eventname‘] == ‘PROCESS_STATE_EXITED‘:
                childutils.listener.ok(self.stdout)
                continue

            pheaders,pdata = childutils.eventdata(payload+‘\n‘)
            #判读event是否是expected是否是expected的,expected的话为1,否则为0
            #这里的判断是过滤掉expected的event
            if int(pheaders[‘expected‘]):
                childutils.listener.ok(self.stdout)
                continue

            ip = self.get_ip(‘eth0‘)
            #构造报警信息结构
            msg = "[Host:%s][Process:%s][pid:%s][exited unexpectedly fromstate:%s]" % (ip,pheaders[‘processname‘],pheaders[‘pid‘],pheaders[‘from_state‘])
            #调用报警接口,这个接口是我们公司自己开发的,大伙不能用的,要换成自己的接口
            subprocess.call("/usr/local/bin/alert.py -m ‘%s‘" % msg,shell=True)
            #stdout写入"RESULT\nOK",并进入下一次循环
            childutils.listener.ok(self.stdout)


    ‘‘‘def check_user(self):
        userName = os.environ[‘USER‘]
        if userName != ‘root‘:
            try:
                raise MyError(‘must be run by root!‘)
            except MyError as e:
                write_stderr( "Error occurred,value:%s\n" % e.value)
                sys.exit(255)‘‘‘

    def get_ip(self,ifname):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        inet = fcntl.ioctl(s.fileno(), 0x8915, struct.pack(‘256s‘, ifname[:15]))
        ret = socket.inet_ntoa(inet[20:24])
        return ret


def main():
    parser = OptionParser()
    if len(sys.argv) == 2:
        if sys.argv[1] == ‘-h‘ or sys.argv[1] == ‘--help‘:
            print __doc__
            sys.exit(0)
    #(options, args) = parser.parse_args()
    #下面这个,表示只有supervisord才能调用该listener,否则退出
    if not ‘SUPERVISOR_SERVER_URL‘ in os.environ:
        try:
            raise CallError("%s must be run as a supervisor event" % sys.argv[0])
        except CallError as e:
            write_stderr("Error occurred,value: %s\n" % e.value)

        return

    prog = ProcessesMonitor()
    prog.runforever()

if __name__ == ‘__main__‘:
    main()

差不多就这些了,其他常用的event类型,已经listener的三种状态,已经怎么转换的。大伙可以去官网上看看

本文出自 “小城运维” 博客,请务必保留此出处http://lixcto.blog.51cto.com/4834175/1540169