首页 > 代码库 > 基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?
基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?
简单介绍:
说明: 此模块是一个跨平台的PY库和SHELL工具,可以监视文件系统事件(增加/删除/修改)
快速安装:
pip install --upgrade watchdog
日志记录:
event_handler = LoggingEventHandler() -> event_handler
说明: 创建一个日志处理句柄,其实LoggingEventHandler是继承自FileSystemEventHandler类,只是重写了增删查改的回调函数,直接调用logging模块写到对应logging配置的目标
说明: 此模块为我们实现了一个watchdog.events.LoggingEventHandler类,可直接配合logging模块,可以简单记录增删查改
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import time import logging from watchdog.observers import Observer from watchdog.events import LoggingEventHandler # 说明: 导入其它模块 if __name__ == ‘__main__‘: logging.basicConfig(level=logging.DEBUG, format=‘%(asctime)s - %(message)s‘, datefmt=‘%Y-%m-%d %H:%M:%S‘) logging.info(‘start watching.‘) event_handler = LoggingEventHandler() watcher = Observer() watcher.schedule(event_handler=event_handler, path=‘.‘, recursive=True) watcher.start() try: while True: time.sleep(1) except KeyboardInterrupt, e: watcher.stop() watcher.join()
说明: LoggingEventHandler直接调用logging.info写日志,而logging又是如此强大的支持线程安全的日志模块,所以可以有机的结合实现更加强大的功能,watchdog非常简单,首先from watchdog.observers import Observer导入Observer类,然后实例化后调用schedule只用传递三个参数,第一个参数就是实例处理句柄,第二个参数是要监控的地址,默认并不递归监控,只有指定recursive=True时才会递归检测,至于线程对象的start/stop/join什么意思我就不多说了~对了,上面的那个for循环主要是为了捕捉Ctrl+C异常,调用watch.stop()让线程正常退出~
回调处理:
event_handler = FileSystemEventHandler() -> event_handler
说明: 由于FileSystemEventHandler是基类,并没有具体实现on_any_event/on_created/on_deleted/on_modified/on_moved方法,所以通常并不会直接实例化作为事件处理对象,而是自定义一个类继承它然后去实现那些回调函数
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import time from watchdog.events import FileSystemEventHandler # 说明: 导入其它模块 def check_modification(func): def wrapper(self, event): print u‘‘‘Event Statics: 事件类型: {} 是否目录: {} 文件路径: {} ‘‘‘.format(event.event_type, event.is_directory, event.src_path) return wrapper class CustomerHandler(FileSystemEventHandler): @check_modification def on_created(self, event): pass @check_modification def on_deleted(self, event): pass @check_modification def on_modified(self, event): pass @check_modification def on_moved(self, event): pass def start_watching(event_handler, path=‘.‘, recursive=True): from watchdog.observers import Observer watcher = Observer() watcher.schedule(event_handler=event_handler, path=path, recursive=recursive) watcher.start() try: while True: time.sleep(1) except KeyboardInterrupt, e: watcher.stop() if __name__ == ‘__main__‘: event_handler = CustomerHandler() start_watching(event_handler=event_handler)
说明: 如上自定义一个继承自FileSystemEventHandler的类,并且实现了增删查该的方法,所有方法默认都有一个event参数,为事件对象,为了方便直接定义了个修饰器,输出event对象的三个常用的属性,event.event_type, event.is_directory, event.src_path
最佳实践:
1. 玩过FLASK的人都知道在DEBUG模式下,对PY文件的修改会自从重启整个程序,避免调试手动重启的麻烦,昨天刚好接到一个需求,希望写一个简单的插件系统,支持动态加载,我们都直到一旦程序启动再向插件目录增/删/查/改插件,程序是无感知的,为了实现类似FLASK重载效果,让插件式监控更加智能,于是学习了下FLASK源码实现,并自己手写了一个简化版的自动重载装饰器,直接看源码吧~
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import os import sys import time import threading import subprocess # 说明: 导入其它模块 # 说明: 基类监视 class BaseReloaderLoop(object): def __init__(self, watch_files, interval): self.watch_files = set(os.path.abspath(f) for f in watch_files) self.interval = interval def monitor(self): pass def rerun_with_autoreload(self): while True: envs = os.environ.copy() args = [sys.executable] + sys.argv envs.update({‘APP_AUTORELOAD‘: ‘True‘}) # 说明: 阻塞版的POPEN subprocess.call(args, env=envs) # 说明: 默认监视 class StatReloaderLoop(BaseReloaderLoop): def __init__(self, *args, **kwargs): super(StatReloaderLoop, self).__init__(*args, **kwargs) # 说明: WATCHDOG class WatchdogReloaderLoop(BaseReloaderLoop): def __init__(self, *args, **kwargs): super(WatchdogReloaderLoop, self).__init__(*args, **kwargs) self.scheduling_flag = True from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler def stop_scheduling(): self.scheduling_flag = False class _EventHandler(FileSystemEventHandler): def __init__(self): super(_EventHandler, self).__init__() def on_any_event(self, event): stop_scheduling() self.event_handler = _EventHandler() self.monitor_class = Observer def monitor(self): watcher = self.monitor_class() watcher.start() while self.scheduling_flag: for path in self.watch_files: try: watcher.schedule(event_handler=self.event_handler, path=path, recursive=True) except (OSError, WindowsError), e: # 说明: 异常处理 pass time.sleep(self.interval) reloader_loop = { ‘stat‘: StatReloaderLoop, ‘watchdog‘: WatchdogReloaderLoop, } try: __import__(‘watchdog.observers‘) except ImportError, e: reloader_loop[‘auto‘] = reloader_loop[‘status‘] else: reloader_loop[‘auto‘] = reloader_loop[‘watchdog‘] # 说明: 装饰函数 def run_with_autoreload(watch_files=None, interval=1, rtype=‘auto‘): """Decorator for run with autoreloader. :param watch_files: file path :type watch_files: list :param interval: check interval :type interval: int :param rtype: reload type :type rtype: str :return: None :rtype: None """ def decorator(func): def wrapper(*args, **kwargs): reloader = reloader_loop[rtype](watch_files, interval) isreload = os.environ.get(‘APP_AUTORELOAD‘, ‘False‘) if isreload == ‘True‘: cur_thread = threading.Thread(target=func, args=args, kwargs=kwargs) cur_thread.setDaemon(True) cur_thread.start() reloader.monitor() else: reloader.rerun_with_autoreload() return wrapper return decorator
说明: 使用方法很简单直接在你的主程序入口函数上@run_with_autoreload(..., ..., ...)支持设置监视多个目录,设置监视间隔,说下整个思路吧,首先在当前环境获取APP_AUTORELOAD的值是否为True(默认其实都是没有配置也无需配置的),如果是则是由表示由子进程启动,否则调用rerun_with_autoreload(),这个方法主要是设置环境变量APP_AUTORELOAD为True且利用subprocess.Call在新的环境envs中调用我们命令行中输入的命令以子进程形式重新执行,子进程执行时由于APP_AUTORELOAD已经为True,所以会以开启一个线程执行我们修饰的主函数,但是注意,它设置了cur_thread.setDaemon(True)也就是说一旦子进程结束,它不管有没有执行完毕都会退出,而reloader.monitor()则担任了cur_thread.join()的阻塞作用,而内部watchdog监控到任何事件都会修改self.scheduling_flag的值,正好reloader.monitor()就依赖于此值进行循环的,所以一旦事件发送就停止循环,而此时reloader.rerun_with_autoreload()使得生成新的子进程接管整个应用,这样就达到了自动重载的功能
简单调用:
import time from wrappers.autoreload import run_with_autoreload @run_with_autoreload(watch_files=[‘./img‘, ‘./css‘], interval=1, rtype=‘auto‘) def main(): while True: print ‘=> {}‘.format(time.time()) time.sleep(1) if __name__ == ‘__main__‘: print ‘found notice: app start at {}.‘.format(time.time()) main()
说明: 程序的入口函数大家按照自己的应用来,如上只是简单演示,我监视的是当前目录下的img/css目录,你可以尝试在当前目录建立img/css目录然后启动程序然后在img/css中添加/删除/修改文件,测试整个应用程序有没有重新加载~
本文出自 “@湖北@白头发” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1867597
基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?