首页 > 代码库 > 基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?

基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?

简单介绍:

说明:  此模块是一个跨平台的PY库和SHELL工具,可以监视文件系统事件(增加/删除/修改)


快速安装:

pip install --upgrade watchdog

日志记录:

event_handler = LoggingEventHandler() -> event_handler

说明: 创建一个日志处理句柄,其实LoggingEventHandler是继承自FileSystemEventHandler类,只是重写了增删查改的回调函数,直接调用logging模块写到对应logging配置的目标


说明: 此模块为我们实现了一个watchdog.events.LoggingEventHandler类,可直接配合logging模块,可以简单记录增删查改


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    logging.basicConfig(level=logging.DEBUG,
                        format=‘%(asctime)s - %(message)s‘,
                        datefmt=‘%Y-%m-%d %H:%M:%S‘)
    logging.info(‘start watching.‘)
    event_handler = LoggingEventHandler()
    watcher = Observer()
    watcher.schedule(event_handler=event_handler, path=‘.‘, recursive=True)
    watcher.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt, e:
        watcher.stop()
    watcher.join()

说明: LoggingEventHandler直接调用logging.info写日志,而logging又是如此强大的支持线程安全的日志模块,所以可以有机的结合实现更加强大的功能,watchdog非常简单,首先from watchdog.observers import Observer导入Observer类,然后实例化后调用schedule只用传递三个参数,第一个参数就是实例处理句柄,第二个参数是要监控的地址,默认并不递归监控,只有指定recursive=True时才会递归检测,至于线程对象的start/stop/join什么意思我就不多说了~对了,上面的那个for循环主要是为了捕捉Ctrl+C异常,调用watch.stop()让线程正常退出~


回调处理:

event_handler = FileSystemEventHandler() -> event_handler

说明: 由于FileSystemEventHandler是基类,并没有具体实现on_any_event/on_created/on_deleted/on_modified/on_moved方法,所以通常并不会直接实例化作为事件处理对象,而是自定义一个类继承它然后去实现那些回调函数


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
from watchdog.events import FileSystemEventHandler
# 说明: 导入其它模块
def check_modification(func):
    def wrapper(self, event):
        print u‘‘‘Event Statics:
        事件类型: {}
        是否目录: {}
        文件路径: {}
        ‘‘‘.format(event.event_type, event.is_directory, event.src_path)
    return wrapper
class CustomerHandler(FileSystemEventHandler):
    @check_modification
    def on_created(self, event):
        pass
    @check_modification
    def on_deleted(self, event):
        pass
    @check_modification
    def on_modified(self, event):
        pass
    @check_modification
    def on_moved(self, event):
        pass
def start_watching(event_handler, path=‘.‘, recursive=True):
    from watchdog.observers import Observer
    watcher = Observer()
    watcher.schedule(event_handler=event_handler, path=path, recursive=recursive)
    watcher.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt, e:
        watcher.stop()
if __name__ == ‘__main__‘:
    event_handler = CustomerHandler()
    start_watching(event_handler=event_handler)

说明: 如上自定义一个继承自FileSystemEventHandler的类,并且实现了增删查该的方法,所有方法默认都有一个event参数,为事件对象,为了方便直接定义了个修饰器,输出event对象的三个常用的属性,event.event_type, event.is_directory, event.src_path


最佳实践:

技术分享

1. 玩过FLASK的人都知道在DEBUG模式下,对PY文件的修改会自从重启整个程序,避免调试手动重启的麻烦,昨天刚好接到一个需求,希望写一个简单的插件系统,支持动态加载,我们都直到一旦程序启动再向插件目录增/删/查/改插件,程序是无感知的,为了实现类似FLASK重载效果,让插件式监控更加智能,于是学习了下FLASK源码实现,并自己手写了一个简化版的自动重载装饰器,直接看源码吧~

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import os
import sys
import time
import threading
import subprocess
# 说明: 导入其它模块


# 说明: 基类监视
class BaseReloaderLoop(object):
    def __init__(self, watch_files, interval):
        self.watch_files = set(os.path.abspath(f) for f in watch_files)
        self.interval = interval

    def monitor(self):
        pass

    def rerun_with_autoreload(self):
        while True:
            envs = os.environ.copy()
            args = [sys.executable] + sys.argv
            envs.update({‘APP_AUTORELOAD‘: ‘True‘})
            # 说明: 阻塞版的POPEN
            subprocess.call(args, env=envs)


# 说明: 默认监视
class StatReloaderLoop(BaseReloaderLoop):
    def __init__(self, *args, **kwargs):
        super(StatReloaderLoop, self).__init__(*args, **kwargs)


# 说明: WATCHDOG
class WatchdogReloaderLoop(BaseReloaderLoop):
    def __init__(self, *args, **kwargs):
        super(WatchdogReloaderLoop, self).__init__(*args, **kwargs)
        self.scheduling_flag = True
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler

        def stop_scheduling():
            self.scheduling_flag = False

        class _EventHandler(FileSystemEventHandler):
            def __init__(self):
                super(_EventHandler, self).__init__()

            def on_any_event(self, event):
                stop_scheduling()

        self.event_handler = _EventHandler()
        self.monitor_class = Observer

    def monitor(self):
        watcher = self.monitor_class()
        watcher.start()

        while self.scheduling_flag:
            for path in self.watch_files:
                try:
                    watcher.schedule(event_handler=self.event_handler, path=path, recursive=True)
                except (OSError, WindowsError), e:
                    # 说明: 异常处理
                    pass
            time.sleep(self.interval)


reloader_loop = {
    ‘stat‘: StatReloaderLoop,
    ‘watchdog‘: WatchdogReloaderLoop,
}
try:
    __import__(‘watchdog.observers‘)
except ImportError, e:
    reloader_loop[‘auto‘] = reloader_loop[‘status‘]
else:
    reloader_loop[‘auto‘] = reloader_loop[‘watchdog‘]


# 说明: 装饰函数
def run_with_autoreload(watch_files=None, interval=1, rtype=‘auto‘):
    """Decorator for run with autoreloader.
    :param watch_files: file path
    :type watch_files: list
    :param interval: check interval
    :type interval: int
    :param rtype: reload type
    :type rtype: str
    :return: None
    :rtype: None
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            reloader = reloader_loop[rtype](watch_files, interval)
            isreload = os.environ.get(‘APP_AUTORELOAD‘, ‘False‘)
            if isreload == ‘True‘:
                cur_thread = threading.Thread(target=func, args=args, kwargs=kwargs)
                cur_thread.setDaemon(True)
                cur_thread.start()
                reloader.monitor()
            else:
                reloader.rerun_with_autoreload()
        return wrapper
    return decorator

说明: 使用方法很简单直接在你的主程序入口函数上@run_with_autoreload(..., ..., ...)支持设置监视多个目录,设置监视间隔,说下整个思路吧,首先在当前环境获取APP_AUTORELOAD的值是否为True(默认其实都是没有配置也无需配置的),如果是则是由表示由子进程启动,否则调用rerun_with_autoreload(),这个方法主要是设置环境变量APP_AUTORELOAD为True且利用subprocess.Call在新的环境envs中调用我们命令行中输入的命令以子进程形式重新执行,子进程执行时由于APP_AUTORELOAD已经为True,所以会以开启一个线程执行我们修饰的主函数,但是注意,它设置了cur_thread.setDaemon(True)也就是说一旦子进程结束,它不管有没有执行完毕都会退出,而reloader.monitor()则担任了cur_thread.join()的阻塞作用,而内部watchdog监控到任何事件都会修改self.scheduling_flag的值,正好reloader.monitor()就依赖于此值进行循环的,所以一旦事件发送就停止循环,而此时reloader.rerun_with_autoreload()使得生成新的子进程接管整个应用,这样就达到了自动重载的功能


简单调用:

import time
from wrappers.autoreload import run_with_autoreload


@run_with_autoreload(watch_files=[‘./img‘, ‘./css‘], interval=1, rtype=‘auto‘)
def main():
    while True:
        print ‘=> {}‘.format(time.time())
        time.sleep(1)

if __name__ == ‘__main__‘:
    print ‘found notice: app start at {}.‘.format(time.time())
    main()

说明: 程序的入口函数大家按照自己的应用来,如上只是简单演示,我监视的是当前目录下的img/css目录,你可以尝试在当前目录建立img/css目录然后启动程序然后在img/css中添加/删除/修改文件,测试整个应用程序有没有重新加载~

本文出自 “@湖北@白头发” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1867597

基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?