首页 > 代码库 > python-线程池的两种实现方式 【转载】

python-线程池的两种实现方式 【转载】

技术分享
  #!/usr/bin/env python
# -*- coding:utf-8 -*-
"""A small thread pool that starts worker threads lazily, up to a fixed limit.

Tasks are ``(func, args, callback)`` tuples placed on a shared queue.  Each
worker thread loops taking tasks off the queue until it receives the
``StopEvent`` sentinel or notices that the pool has been terminated.
"""

import contextlib
import logging
import queue
import threading
import time

logger = logging.getLogger(__name__)

# Sentinel placed on the task queue to tell one worker thread to exit.
StopEvent = object()


class ThreadPool(object):
    """Thread pool whose workers are created on demand by :meth:`run`.

    Attributes:
        q: Task queue (bounded when ``max_task_num`` is given).
        max_num: Maximum number of worker threads.
        cancel: Set by :meth:`close`; new tasks are rejected once true.
        terminal: Set by :meth:`terminate`; workers stop after their current
            task once true.
        generate_list: Worker threads that have been created and not yet
            exited.
        free_list: Worker threads currently waiting for their next task.
    """

    def __init__(self, max_num, max_task_num=None):
        """Create the pool.

        :param max_num: maximum number of worker threads.
        :param max_task_num: optional capacity of the task queue; when falsy
            the queue is unbounded.
        """
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """Submit one task to the pool.

        :param func: the task function.
        :param args: positional arguments for ``func`` (an iterable).
        :param callback: optional function called after the task finishes,
            successfully or not, as ``callback(success, result)`` where
            ``success`` is a bool and ``result`` is the task's return value
            (``None`` when the task raised).
        :return: ``True`` if the pool has been closed or terminated and the
            task was rejected, otherwise ``None``.
        """
        if self.cancel or self.terminal:
            return True
        # Only grow the pool when nobody is idle and the limit allows it.
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback))

    def generate_thread(self):
        """Create and start one worker thread.

        The thread is registered in ``generate_list`` *before* it starts so
        that the size check in :meth:`run` sees it immediately; registering
        from inside the worker lets a fast producer exceed ``max_num``.
        """
        worker = threading.Thread(target=self.call)
        self.generate_list.append(worker)
        try:
            worker.start()
        except RuntimeError:
            # The thread never ran, so it will never deregister itself.
            self.generate_list.remove(worker)
            raise

    def call(self):
        """Worker loop: fetch tasks and execute them until told to stop."""
        current_thread = threading.current_thread()
        # Normally already registered by generate_thread(); this keeps a
        # direct call from some other thread working as well.
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        try:
            event = self.q.get()
            while event is not StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception:
                    # The failure is reported through the callback; keep a
                    # trace available for debugging as well.
                    logger.debug("task %r raised", func, exc_info=True)
                    success = False
                    result = None

                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception:
                        # A broken callback must not kill the worker, but it
                        # should not vanish silently either.
                        logger.exception("callback %r raised", callback)

                # While waiting for the next task this worker counts as free.
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
        finally:
            # Deregister even if something unexpected escapes the loop.
            self.generate_list.remove(current_thread)

    def close(self):
        """Stop accepting tasks; workers exit once queued tasks are done.

        One ``StopEvent`` is queued per live worker, behind all pending
        tasks.  This method does not wait for the workers to finish.
        """
        self.cancel = True
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """Stop all workers whether or not tasks are still queued.

        Pending tasks are discarded.  A task that is already running is
        allowed to finish; the call returns once every other worker thread
        has exited.
        """
        self.terminal = True
        self._drain_queue()

        current = threading.current_thread()
        for worker in list(self.generate_list):
            if worker is current:
                # Called from inside a task/callback: this worker stops by
                # itself after the current task, and cannot join itself.
                continue
            while worker.is_alive():
                # Idle workers are blocked in q.get() and need a sentinel to
                # wake up.  Offer at most one at a time, without blocking, so
                # neither an unbounded queue is flooded nor a bounded one
                # deadlocks the caller.
                if self.q.empty():
                    try:
                        self.q.put_nowait(StopEvent)
                    except queue.Full:
                        pass
                worker.join(0.01)

        # Remove any sentinel that no worker consumed.
        self._drain_queue()

    def _drain_queue(self):
        """Discard everything currently on the task queue.

        Uses ``get_nowait`` rather than clearing the underlying deque so
        that producers blocked on a full queue are notified.
        """
        while True:
            try:
                self.q.get_nowait()
            except queue.Empty:
                return

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """Keep ``worker_thread`` in ``state_list`` for the ``with`` body.

        Used to track which workers are currently waiting for a task.
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


# How to use


def callback(status, result):
    # status: whether the action ran without raising
    # result: the action's return value
    pass


def action(i):
    print(i)


def main():
    """Run 30 small tasks on a pool of at most 5 threads."""
    pool = ThreadPool(5)

    for i in range(30):
        pool.run(action, (i,), callback)

    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    # Without this the non-daemon workers block on the queue forever and the
    # process never exits.  Use pool.terminate() to drop pending work instead.
    pool.close()


if __name__ == "__main__":
    main()
线程池(优秀)
 #!/usr/bin/env python
# -*- coding:utf-8 -*-
"""A minimal thread "pool" that limits concurrency with a queue of tokens."""
import threading

try:
    import queue as Queue  # Python 3 name of the module
except ImportError:
    import Queue  # Python 2


class ThreadPool(object):
    """Limit the number of concurrently running threads.

    The pool holds ``max_num`` tokens in a bounded queue; each token is the
    ``threading.Thread`` class itself.  Taking a token with
    :meth:`get_thread` blocks while none are available, and a finished task
    hands its token back with :meth:`add_thread`.

    Example::

        pool = ThreadPool(10)

        def func(arg, p):
            try:
                print(arg)
                time.sleep(2)
            finally:
                # Always give the token back, even if the task fails,
                # otherwise the pool permanently loses one slot.
                p.add_thread()

        for i in range(30):
            thread = pool.get_thread()
            t = thread(target=func, args=(i, pool))
            t.start()
    """

    def __init__(self, max_num=20):
        """Fill the pool with ``max_num`` tokens.

        :param max_num: maximum number of threads allowed to run at once.
        """
        self.queue = Queue.Queue(max_num)
        for _ in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Take one token, blocking until one is available.

        :return: the ``threading.Thread`` class, ready to be instantiated.
        """
        return self.queue.get()

    def add_thread(self):
        """Return one token to the pool.

        Blocks if the pool is already full, so call it exactly once per
        successful :meth:`get_thread`.
        """
        self.queue.put(threading.Thread)

 

转载
作者:武沛齐 
出处:http://www.cnblogs.com/wupeiqi/ 

python-线程池的两种实现方式 【转载】