首页 > 代码库 > Python 学习笔记 - 线程池

Python 学习笔记 - 线程池

前面我们学校里如何创建多线程,当我们接到一个新的请求时,会创建一个线程,执行完毕之后又销毁掉这个线程。对于一些数目巨大,但是单个快速执行的任务,每个任务真正执行消耗的时间和线程创建销毁的时间可能都差不多。这样一来,线程的效率浪费的比较严重。因此可以考虑使用线程池的技术,预先创建一些空闲的线程,当他们接收具体任务的时候,就去直接执行了,执行完了也不销毁,接着执行下一个任务。


Python里面,因为暂时还没有功能完备的线程池,因此这部分功能需要自己实现。


实现的基本原理是创建一个队列,把填充的任务数据一个个地塞进去;然后创建一个线程池,线程池里面的线程不断地读取队列里面的任务数据,执行任务,直到所有的任务都完成。


下面是我写的模拟Fabric批量远程操作的部分代码,paramiko模块可以让我远程的进行ssh连接;对于每个连接,我尝试使用了线程池。



#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author Yuan Li
"""
 本程序模拟Fabric,远程的批量进行SSH连接,可以执行下载,上传和shell命令执行。
 远程命令的执行,使用了线程池的技术,因为执行的时间比较少,而线程本身执行的时间占的比重比较大;
 对于下载和上传,因为本身就是比较消耗时间的操作,因此每个连接单独使用了线程创建和销毁,因为时间比较久,线程的时间可以忽略了
"""
import threading
import queue
import time
import paramiko
import os
#找到相对路径
parent_path = os.path.abspath(os.pardir)
db_path=os.path.join(parent_path,‘db‘)
#一个管理类,基本思路是把任务和相关的参数填充到队列(任务池)中,然后创建一个进程池,里面的进程循环地读取任务池里面的内容,任何执行其中的内容,直到所有任务全部实现。
class workmanager(object):
    #构造函数
    def __init__(self,cmd,username,password,work_num=1000,thread_num=2,):
        """
        :param cmd:远程命令
        :param username: 用户名
        :param password: 密码
        :param work_num: 任务池(队列大小)
        :param thread_num: 线程池大小
        """
        self.cmd=cmd
        self.work_num=work_num
        self.thread_num=thread_num
        self.queue=queue.Queue()
        self.threads=[]
        self.init_task(work_num,cmd,username,password)
        self.init_threadpool(thread_num)
    #初始化任务池
    def init_task(self,num,inp,username,password):
        for i in range(num):
            self.add_job(do_job,i,inp,username,password)
    #添加任务到任务池
    def add_job(self,job,*args):
        #填充任务到任务池,每一个任务是一个元祖(任务,参数列表)
        self.queue.put((job,list(args)))
    #初始化线程池
    def init_threadpool(self,num):
        for i in range(num):
            self.threads.append(work(self.queue))
    #等待挂起主线程
    def wait_allcomplete(self):
        for item in self.threads:
            if item.isAlive():
                item.join()
#线程类,每个线程循环地去任务池取任务
class work(threading.Thread):
    def __init__(self,que):
        super(work, self).__init__()
        self.queue=que
        self.start()
    def run(self):
        while True:
            try:
                #当任务池为空的时候,强制报错,退出
                do,args=self.queue.get(block=False)
                # print(do,args)
                do(args[0],args[1],args[2],args[3])
                #确保队列里面的任务都完成了
                self.queue.task_done()
            except:
                break
#初始化的一个主机组,测试用的
hosts=[‘anoble-ise‘,‘bberry-ise‘,‘blackbr-ise‘,‘jlau-ise‘,‘kwood-ise‘,‘marwa-ise‘,‘smaroo-ise‘,‘psekarwin-ise‘,‘spare2-ise‘]
#远程连接SSH并且执行命令
def do_job(args,inp,username,password):
    """
    :param args: hosts列表的索引
    :param inp: 远程命令
    :param username: 用户名
    :param password: 密码
    :return:
    """
    # time.sleep(0.1)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hosts[args], 22, username, password)
    # 执行命令测试
    stdin, stdout, stderr = ssh.exec_command(inp)
    for line in stdout.readlines():
        print(line.strip())
    print(("\x1b[5;19;32m  %s \x1b[0m" % hosts[args]).center(40,‘*‘))
    print("\n")
#选择主机组
username=""
password=""
#入口文件
def display():
    global hosts,username,password
    msg="""
    欢迎使用Fabric模拟程序,您可以执行以下操作
    1.显示主机组
    2.批量执行远程命令
    3.批量上传
    4.批量下载
    5.输入管理员账号
    6.退出
    """
    while True:
        print(msg)
        inpt=input("请输入选项")
        #输出主机组的相关信息
        if inpt==‘1‘:pass
        #远程批量操作
        elif inpt==‘2‘:
            # username=input("用户名")
            # password=input("密码")
            if not username:
                print("请先配置登录账号信息")
            else:
                while True:
                    inp = input("输入指令(q返回上级目录)\n>>>")
                    if inp ==‘q‘:break
                    if not inp:
                        print("不能输入空命令")
                    else:
                        start = time.time()
                        #指定命令,用户名,密码,任务池(队列)的大小,和线程的个数)
                        work_manager = workmanager(inp,username,password, len(hosts), 2)
                        work_manager.wait_allcomplete()
                        end = time.time()
                        print("Cost time is %s" % (end - start))
        #创建批量上传的多线程
        elif inpt==‘3‘:
            pass
        #创建批量下载的多线程
        elif inpt==‘4‘:
            pass
        elif inpt==‘5‘:
            username = input("用户名")
            password = input("密码")
        elif inpt==‘6‘:
            exit("退出程序")
        else:
            print("无效输入,请重试")
if __name__ == ‘__main__‘:
    display()


本文出自 “麻婆豆腐” 博客,请务必保留此出处http://beanxyz.blog.51cto.com/5570417/1868230

Python 学习笔记 - 线程池