首页 > 代码库 > python第十一天-----补:线程池
python第十一天-----补:线程池
低版本:
1 #!/usr/bin/env python 2 import threading 3 import time 4 import queue 5 6 7 class TreadPool: 8 """ 9 将线程加入到队列中作为资源去完成任务10 优点:简单好写容易理解11 缺点:太尼玛多了.....12 """13 def __init__(self, maxsize):14 self.maxsize = maxsize15 self._q = queue.Queue(maxsize)16 for i in range(maxsize):17 self._q.put(threading.Thread)18 19 def get_thread(self):20 return self._q.get()21 22 def add_thread(self):23 self._q.put(threading.Thread)24 25 26 def task(arg, p):27 print(arg)28 time.sleep(1)29 p.add_thread()30 31 pool = TreadPool(5)32 33 for i in range(100):34 t = pool.get_thread()35 obj = t(target=task, args=(i, pool))36 obj.start()
高级版本:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import queue # 队列模块 5 import threading # 线程模块 6 import contextlib # 上下文模块 7 import time # 时间模块 8 9 StopEvent = object() # 创建一个停止时所需要用到的对象 10 11 12 class ThreadPool(object): 13 """ 14 线程池(用于放置任务,将任务作为队列中元素让线程去取得,可以复用线程减少开销) 15 """ 16 def __init__(self, max_num, max_task_num=None): 17 """ 18 构造方法 19 :param max_num: 20 :param max_task_num:所创建的队列内最大支持的任务个数 21 """ 22 if max_task_num: 23 self.q = queue.Queue(max_task_num) # 指定队列任务数量则创建有限队列 24 else: 25 self.q = queue.Queue() # 未指定队列任务数量则创建无限队列 26 self.max_num = max_num # 每次使用的最大线程个数 27 self.cancel = False # 任务取消,默认False,用于线程停止的判断 28 self.terminal = False # 任务终止,默认False,用于线程池终止的判断 29 self.generate_list = [] # 定义一个已生成任务列表 30 self.free_list = [] # 定义一个空闲任务列表 31 32 def run(self, func, args, callback=None): 33 """ 34 线程池执行一个任务方法 35 :param func: 传递进来的任务函数 36 :param args: 任务函数使用的参数 37 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数 38 1、任务函数执行状态; 39 2、任务函数返回值(默认为None,即:不执行回调函数) 40 :return: 如果线程池已经终止,则返回True否则None 41 """ 42 if self.cancel: # 如果条件为真则不会继续执行 43 return 44 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 45 self.generate_thread() # 如果现有空闲列表无元素并且已生成任务列表内 46 # 元素个数小于队列支持的最大数量则创建一个线程 47 w = (func, args, callback,) # 具体任务 48 self.q.put(w) # 将任务放入队列当中 49 50 def generate_thread(self): 51 """ 52 创建一个线程方法 53 """ 54 t = threading.Thread(target=self.call) # 调用线程类创建一个线程,参数传递self.call方法 55 t.start() # 线程开始 56 57 def call(self): 58 """ 59 循环去获取任务函数并执行任务函数 60 """ 61 current_thread = threading.currentThread() # 创建当前任务 62 self.generate_list.append(current_thread) # 生成任务列表添加当前任务 63 64 event = self.q.get() # 事件获取 65 while event != StopEvent: # 当前事件不是停止时执行 66 67 func, arguments, callback = event # 任务具体函数,参数获取 68 try: 69 result = func(*arguments) # 结果为任务处理的出的结果 70 success = True # 任务处理成功 71 except Exception as e: 72 success = False # 任务处理失败 73 result = None # 结果为None 74 75 if callback is not None: # 回调不为空 76 try: 77 callback(success, result) # 将刚才执行结果返回 78 except Exception as e: 79 pass 80 81 with self.worker_state(self.free_list, current_thread): 82 if self.terminal: # 如果线程池已经被终止 83 event = StopEvent # 事件变为空任务 84 else: 85 event = self.q.get() # 事件为正常任务 86 else: 87 88 self.generate_list.remove(current_thread) # 生成任务列表移除当前任务 89 90 def close(self): 91 """ 92 执行完所有的任务后,所有线程停止 93 """ 94 self.cancel = True # 线程停止,判定条件变为真 95 full_size = len(self.generate_list) # 获取还有几个在执行任务的线程 96 while full_size: # 向队列中添加相应个数的空任务 97 self.q.put(StopEvent) 98 full_size -= 1 99 100 def terminate(self):101 """102 无论是否还有任务,终止线程103 """104 self.terminal = True # 线程池关闭,判定条件变为真105 106 while self.generate_list: # 当还有线程存在时放置空任务107 self.q.put(StopEvent)108 109 self.q.queue.clear() # 将队列中所有任务清空110 111 @contextlib.contextmanager112 def worker_state(self, state_list, worker_thread):113 """114 用于记录线程中正在等待的线程数115 """116 state_list.append(worker_thread) # 等待状态列表中添加正在等待的线程数117 try:118 yield119 finally:120 state_list.remove(worker_thread) # 移除正在等待的线程数121 122 123 124 # How to use125 126 127 pool = ThreadPool(5) # 创建一个每次支持5线程的线程池128 129 def callback(status, result):130 # status, execute action status131 # result, execute action return value132 pass133 134 135 def action(i): # 任务函数136 print(i)137 138 for i in range(30): # 使用线程池执行30次任务139 ret = pool.run(action, (i,), callback)140 141 time.sleep(1) # 1秒等待142 print(len(pool.generate_list), len(pool.free_list)) # 打印线程池内当前任务个数及空任务个数143 pool.close() # 线程停止144 pool.terminate() # 线程池终止
python第十一天-----补:线程池
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。