首页 > 代码库 > tornado 异步调用系统命令和非阻塞线程池

tornado 异步调用系统命令和非阻塞线程池

项目中异步调用 ping 和 nmap 实现对目标 ip 和所在网关的探测

Subprocess.STREAM 不用担心进程返回数据过大造成的死锁, Subprocess.PIPE 会有这个问题.

import tornado.gen
from tornado.process import Subprocess


@tornado.gen.coroutine
def run_command(command):
    """run command"""
    process = Subprocess(
        [command],
        stdout=Subprocess.STREAM,
        stderr=Subprocess.STREAM,
        shell=True
    )
    out, err = yield [process.stdout.read_until_close(), process.stderr.read_until_close()]
    raise tornado.gen.Return((out, err))


class NmapHandler(tornado.web.RequestHandler):
    """handle nmap check request"""
    @tornado.gen.coroutine
    def get(self):
        ip = self.get_argument("ip", None)
        if not ip:
            self.write(json.dumps({}))
            raise tornado.gen.Return(None)

        nmap_resp, _ = yield run_command(nmap % ip)

        self.write(json.dumps(
            {
                "ip": ip,
                "nmap_resp": nmap_resp
            }
        ))

 

使用非阻塞线程池, 调用 paramiko 来分发检测任务.

from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor


class FailureHandler(tornado.web.RequestHandler):
    """handle server check request"""
    executor = ThreadPoolExecutor(100)

    @run_on_executor
    def get(self):
        ip = self.get_argument("ip", None)
        if not ip:
            self.write(json.dumps({}))
            raise tornado.gen.Return(None)

        resp = distributer(ip)
        if resp:
            resp = 0
        else:
            resp = 1

        self.write(json.dumps(
            {
                "ip": ip,
                "failure_rslt": resp
            }
        ))

 

tornado 异步调用系统命令和非阻塞线程池