首页 > 代码库 > python第十一天-----补:线程池

python第十一天-----补:线程池

低版本:

 1 #!/usr/bin/env python 2 import threading 3 import time 4 import queue 5  6  7 class TreadPool: 8     """ 9     将线程加入到队列中作为资源去完成任务10     优点:简单好写容易理解11     缺点:太尼玛多了.....12     """13     def __init__(self, maxsize):14         self.maxsize = maxsize15         self._q = queue.Queue(maxsize)16         for i in range(maxsize):17             self._q.put(threading.Thread)18 19     def get_thread(self):20         return self._q.get()21 22     def add_thread(self):23         self._q.put(threading.Thread)24 25 26 def task(arg, p):27     print(arg)28     time.sleep(1)29     p.add_thread()30 31 pool = TreadPool(5)32 33 for i in range(100):34     t = pool.get_thread()35     obj = t(target=task, args=(i, pool))36     obj.start()

高级版本:

 

  1 #!/usr/bin/env python  2 # -*- coding:utf-8 -*-  3   4 import queue        # 队列模块  5 import threading    # 线程模块  6 import contextlib   # 上下文模块  7 import time         # 时间模块  8   9 StopEvent = object()            # 创建一个停止时所需要用到的对象 10  11  12 class ThreadPool(object): 13     """ 14     线程池(用于放置任务,将任务作为队列中元素让线程去取得,可以复用线程减少开销) 15     """ 16     def __init__(self, max_num, max_task_num=None): 17         """ 18         构造方法 19         :param max_num: 20         :param max_task_num:所创建的队列内最大支持的任务个数 21         """ 22         if max_task_num: 23             self.q = queue.Queue(max_task_num)  # 指定队列任务数量则创建有限队列 24         else: 25             self.q = queue.Queue()              # 未指定队列任务数量则创建无限队列 26         self.max_num = max_num                  # 每次使用的最大线程个数 27         self.cancel = False                     # 任务取消,默认False,用于线程停止的判断 28         self.terminal = False                   # 任务终止,默认False,用于线程池终止的判断 29         self.generate_list = []                 # 定义一个已生成任务列表 30         self.free_list = []                     # 定义一个空闲任务列表 31  32     def run(self, func, args, callback=None): 33         """ 34         线程池执行一个任务方法 35         :param func: 传递进来的任务函数 36         :param args: 任务函数使用的参数 37         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数 38                             1、任务函数执行状态; 39                             2、任务函数返回值(默认为None,即:不执行回调函数) 40         :return: 如果线程池已经终止,则返回True否则None 41         """ 42         if self.cancel:                         # 如果条件为真则不会继续执行 43             return 44         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 45             self.generate_thread()              # 如果现有空闲列表无元素并且已生成任务列表内 46                                                 # 元素个数小于队列支持的最大数量则创建一个线程 47         w = (func, args, callback,)             # 具体任务 48         self.q.put(w)                           # 将任务放入队列当中 49  50     def generate_thread(self): 51         """ 52         创建一个线程方法 53         """ 54         t = threading.Thread(target=self.call)  # 调用线程类创建一个线程,参数传递self.call方法 55         t.start()                               # 线程开始 56  57     def call(self): 58         """ 59         循环去获取任务函数并执行任务函数 60         """ 61         current_thread = threading.currentThread()      # 创建当前任务 62         self.generate_list.append(current_thread)       # 生成任务列表添加当前任务 63  64         event = self.q.get()                            # 事件获取 65         while event != StopEvent:                       # 当前事件不是停止时执行 66  67             func, arguments, callback = event           # 任务具体函数,参数获取 68             try: 69                 result = func(*arguments)               # 结果为任务处理的出的结果 70                 success = True                          # 任务处理成功 71             except Exception as e: 72                 success = False                         # 任务处理失败 73                 result = None                           # 结果为None 74  75             if callback is not None:                    # 回调不为空 76                 try: 77                     callback(success, result)            # 将刚才执行结果返回 78                 except Exception as e: 79                     pass 80  81             with self.worker_state(self.free_list, current_thread): 82                 if self.terminal:                       # 如果线程池已经被终止 83                     event = StopEvent                   # 事件变为空任务 84                 else:    85                     event = self.q.get()                # 事件为正常任务 86         else: 87  88             self.generate_list.remove(current_thread)   # 生成任务列表移除当前任务 89  90     def close(self): 91         """ 92         执行完所有的任务后,所有线程停止 93         """ 94         self.cancel = True                      # 线程停止,判定条件变为真 95         full_size = len(self.generate_list)     # 获取还有几个在执行任务的线程 96         while full_size:                        # 向队列中添加相应个数的空任务 97             self.q.put(StopEvent) 98             full_size -= 1 99 100     def terminate(self):101         """102         无论是否还有任务,终止线程103         """104         self.terminal = True                    # 线程池关闭,判定条件变为真105 106         while self.generate_list:               # 当还有线程存在时放置空任务107             self.q.put(StopEvent)108 109         self.q.queue.clear()                    # 将队列中所有任务清空110 111     @contextlib.contextmanager112     def worker_state(self, state_list, worker_thread):113         """114         用于记录线程中正在等待的线程数115         """116         state_list.append(worker_thread)        # 等待状态列表中添加正在等待的线程数117         try:118             yield119         finally:120             state_list.remove(worker_thread)    # 移除正在等待的线程数121 122 123 124 # How to use125 126 127 pool = ThreadPool(5)                            # 创建一个每次支持5线程的线程池128 129 def callback(status, result):130     # status, execute action status131     # result, execute action return value132     pass133 134 135 def action(i):                                          # 任务函数136     print(i)137 138 for i in range(30):                                     # 使用线程池执行30次任务139     ret = pool.run(action, (i,), callback)140 141 time.sleep(1)                                           # 1秒等待142 print(len(pool.generate_list), len(pool.free_list))     # 打印线程池内当前任务个数及空任务个数143 pool.close()                                            # 线程停止144 pool.terminate()                                        # 线程池终止

 

python第十一天-----补:线程池