首页 > 代码库 > 基础入门_Python-模块和包.运维开发中__import__动态导入最佳实践?

基础入门_Python-模块和包.运维开发中__import__动态导入最佳实践?

常规导入:


import module_name[,module1,...] 
from module_name import [*|child[,child1,...]
from module_name import [*|child[,child1,...] as alias_name

注意: 导入语句可出现在程序任意位置,自定义包要实现from module_name import *的效果则此模块必须在__init__.py实现__all__ = [‘module_1‘, ‘module_2‘]


加载一次:

说明: 多次重复使用import语句时,不会重新加载模块,而是把该模块的内存地址给引用到本地环境变量


==> x.py <==
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# 51CTOBG: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import os
# 说明: 导入其它模块
print ‘os in x.py‘, id(os)
==> y.py <==
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# 51CTOBG: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import x
import os
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    print ‘os in y.py‘, id(os)
    import x


重新加载:

说明: 对已经加载的模块进行重新加载,一般用于原模块有变化等特殊情况,reload前该模块必须已经import过,但是需要注意的是已经使用的实例还会使用旧模块,而新产生的实例才会使用新模块,reload之后还是原来的内存地址


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# 51CTOBG: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import sys
try:
    sys.setdefaultencoding(‘utf-8‘)
except Exception, e:
    print e
    reload(sys)
    sys.setdefaultencoding(‘utf-8‘)
    print sys.getdefaultencoding()
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    pass

说明: 很多人不名为为何要reload()一下sys才能使用setdefaultencoding设置编码,其实是因为解释器初始化时预先执行了/usr/lib64/python2.7/site.py,而在其554行代码中del sys.setdefaultencoding删除了此方法,其实你import sys只是指向了那个被删除了setdefaultencoding属性的sys模块地址,所以需要重新reload一下还原此方法


相对导入:

说明:  PY通过模块名中的点来判断是否属于包,属于哪个包,当你使用from ..xx import oo,其中的点表示包结构中的层次,如果模块名为__main__表示它不属于任何包,所以此时模块名应该不包含点,否则会导致relative-import in non-package错误,也就是说包含相对导入的文件无法作为入口文件,但是可通过python -m来当作模块载入


from ..libs.database import Redis
from ..libs.alarm import alarm_template
from ..libs.alarm.api import weixin_notify


绝对导入:

说明: 绝对导入也叫完全导入,2.x版本必须使用from __future__ import absolute_import打开此机制,而3.x则将其作为默认机制


from x.y.z import o


动态导入:

说明: __import__其实就是import的内部实现,通常用于动态加载,如插件式监控系统中只知道插件名如何执行插件内的代码?此时就可以通过动态加载来实现获取插件内的函数然后去调用

__import__(module_name[, globals[, locals[, fromlist]]])  -> object

说明: module_name为模块名,但是需要注意的是如果module_name包含子模块如x.y,则默认会返回x对象,如果要返回y对象需要设置fromlist列表,来实现from x import y的效果.当然要实现动态导入含有专门的imp和importlib模块.可以学习一下~


应用场景:

1. zabbix/nagios等监控系统主动监控都必须手工配置Agent,即使自定义插件亦是如此,如果要实现一个自动检测插件自动调用插件自动上报数据的监控系统要如何实现哪?

技术分享

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# 51CTOBG: http://my.51CTOBG.net/pydevops/
# Purpose:
#
"""
# 说明: 兼容绝对导入
from __future__ import absolute_import
# 说明: 导入公共模块
import json
import time
import threading
# 说明: 导入其他模块
from ..libs.database import Redis
from ..libs.alarm import alarm_template
from ..libs.alarm.api import weixin_notify


# 说明: 客户端监控类
class MonitorClient(object):
    def __init__(self, redis, agent, info, error):
        self.info = info
        self.error = error
        self.redis = Redis(db=redis[‘db‘],
                           host=redis[‘host‘],
                           port=redis[‘port‘],
                           password=redis[‘password‘])
        self.agent_host = agent[‘host‘]
        self.redis_host = redis[‘host‘]
        self.clientconf = self._get_climconf()
        self.pubchannel = redis[‘publish‘] or ‘xmdevops_channel‘

        self.info.info(‘update key#climconf::%s val#%s‘ % (self.agent_host, self.clientconf))

    def start(self):
        self._plugins_handler()

    def _get_climconf(self):
        redis_key = ‘climconf::%s‘ % (self.agent_host,)
        while True:
            redis_val = self.redis.get(redis_key)
            if not redis_val:
                message = ‘getval key#%s with nothing,5 seconds try again‘ % (redis_key,)
                self.info.info(message)
                self._report_exception(redis_key, message)
                time.sleep(5)
                continue
            try:
                conf_dict = json.loads(redis_val)
            except TypeError, e:
                message = ‘unpack key#%s val#%s with error %s‘ % (redis_key, redis_val, e)
                self.error.error(message)
                self._report_exception(redis_key, message)
                time.sleep(5)
                continue
            break
        return conf_dict

    def _plugins_handler(self):
        while True:
            for service_name, plugin_info in self.clientconf.iteritems():
                if len(plugin_info) < 4:
                    self.clientconf[service_name].append(0)
                plugin_name, check_interval, add_data, last_runtime = plugin_info
                if time.time() - last_runtime > check_interval:
                    self.clientconf[service_name][-1] = time.time()
                    self.info.info(‘plugin key#%s val#%s is called‘ % (service_name, plugin_info))
                    cur_thread = threading.Thread(
                        target=self._plugins_called, args=(service_name, plugin_name, add_data))
                    cur_thread.start()
            time.sleep(1)

            old_clientconf = self.clientconf
            self.clientconf = self._get_climconf()
            for trigger_key, trigger_val in self.clientconf.iteritems():
                if trigger_key in old_clientconf:
                    self.clientconf[trigger_key].append(old_clientconf[trigger_key][-1])

    def _plugins_called(self, service_name, plugin_name, add_data):
        plugin_path = ‘app.%s.%s‘ % (‘plugins‘, service_name)
        try:
            plugin_mods = __import__(plugin_path, fromlist=[service_name])
        except ValueError, e:
            message = ‘import key#%s val#%s with error %s‘ % (plugin_path, e)
            self.error.error(message)
            self._report_exception(plugin_path, message)
            return
        try:
            plugin_func = getattr(plugin_mods, plugin_name)
        except AttributeError, e:
            message = ‘plugin key#%s val#%s not exists‘ % (plugin_mods, plugin_name)
            self.error.error(message)
            self._report_exception(plugin_func, message)
            return
        plugin_data = plugin_func(add_data)
        report_data = {
            ‘host‘: self.agent_host,
            ‘data‘: plugin_data,
            ‘service‘: service_name
        }
        data = json.dumps(report_data)
        self.info.info(‘publish key#%s val#%s‘ % (service_name, data))
        self.redis.publish(self.pubchannel, data)

    def _report_exception(self, errors, details):
        message = alarm_template % (
            self.agent_host, ‘critical‘, errors,
            time.strftime(‘%H:%M:%S‘, time.localtime()), details)
        results = weixin_notify(message)
        if results:
            self.error.error(results)

说明: 如上就是一个自己写的基于redis的全自动化微型监控框架部分核心代码,首先读取网页端下发下来的监控配置,然后利用线程通过__import__动态调用插件中的入口监控函数,然后将执行结果上报到对应区域的redis,server端再处理阀值数据等等,可以作为一个非常好的学习案例

本文出自 “ζ自动化运维开发之路ζ” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1857506

基础入门_Python-模块和包.运维开发中__import__动态导入最佳实践?