首页 > 代码库 > 线程锁、线程池

线程锁、线程池

 



一、线程(IO密集型工作多线程有用)

  • 线程:
    • 概述:
      • 若一个文件从上到下顺序执行,则为串行执行,整个py文件实际上是一个主线程
      • 若多线程,则可以并行执行,同一个时刻可以运行多个代码段
      • 给每个client请求分配一个线程,则这些线程可以同时工作
    • 多线程、多进程:
      • 1、一个应用程序,可以有多进程和多线程;默认是单进程、单线程
      • 2、单进程、多线程 :
            • 多线程:IO操作(输入输出流,文件操作)有用,因为几乎不用cpu来调度,一般用多线程来提高并发
            • 计算型操作,需要用到cpu调度执行,一般用多进程提高并发
      • 3、GIL 全局解释器锁,即进程中同一时刻只能被CPU 调度一个线程
  • 创建方式
    • 创建方式一:常规方式,比较简单常用
import threading
def func1(arg):
print(arg)
# t = threading.Thread( target=线程要执行的函数,  args=函数参数-数组, kwargs=函数参数-字典)
t = threading.Thread(target=f1, args=(123,) )
t.start() 
# 准备就绪,准备让cpu进行调度,
# 只要cpu一旦调度了该线程,就会执行threading 模块中的run方法,run方法只做一件事,执行target指向的函数
# 即,target实际上是run在方法内部执行的
    • 创建方式二: 重写init和run
class MyThread(threading.Thread):
def __init__(self, func, args):
self.func = func 
self.args = args 
super(MyThread, self).__init__()
def run(self):
self.func(self.args)
def f2():
pass
obj = MyThread(f2, 123)
obj.start()
  • 常用方法:
    • t.start()  
        • 准备好该线程,等待CPU进行调度,
        • 虽然线程已经就绪,但是什么时候执行 谁也不知道,只有被cpu进行调度的时候才会执行
    • t.join(3) 
        • 逐个执行线程,执行完毕后再往下执行;可设置执行“超时时间”,该方法使得多线程没有意义
        • 主线程代码执行到这里,会等待子线程去执行,等待超时时间为3秒,子线程执行完毕,主线程才会继续往下执行
    • t.setDaemon(True) 
        • 设置为后台进程(默认为False),主线程执行过程中,后台线程也在执行,主线程执行完毕,后台线程立即终止


二、线程锁
  • 概述:
    • 线程锁:
      • 因为线程是随机调度,即可能并行去更“改同一个数据”,若多个线程同时对资源进行修改,则会发生错误
      • 互斥锁,同一时刻,只允许一个线程来执行的操作
    • 未使用“锁”:
      • 10个人进行购买商品,每个人购买后商品减1,每个人看到的结果应该为9876543210
      • 实际执行,所有人同时购买商品,同时修改NUM,同时输出NUM ,则都输出0 
import threading
import time

NUM = 0
def function1():
global NUM
NUM += 1
time.sleep(0.1) # 需要让所有线程都夯在这里,然后同时输出
print(NUM)

for x in range(10):
t = threading.Thread(target=function1, )
t.start()
  • 各种锁讲解
    • “Rlock 锁”:
      • 将数据修改的位置作为原子操作,加锁
import threading
import time

NUM = 0
lock = threading.RLock()

def function1():
lock.acquire()  
global NUM
NUM += 1
time.sleep(2)
print(NUM)
lock.release()

for x in range(10):
t = threading.Thread(target=function1,)
t.start()
 
    • 注意:lock = threading.Rlock() # 支持多层锁的嵌套,一般用Rlock,lock不支持
def func(l):
global NUM 
l.acquire()
NUM -= 1 
l.acquire()
time.sleep(2)
l.release()
print(NUM)
l.release()
    • semaphore”锁
      • 信号量:表示同一时刻允许有多个线程同时工作,比如火车站过安检,每次允许10个人,上批次安检完成后再来十个人
      • 注意:若同一批次运行的线程,都修改同一个数据,依然会有“数据错误”的情况,这里只是演示功能
import threading
import time

NUM = 0
semaphore = threading.BoundedSemaphore(5) # 表示每批放进来5个线程

def function1(i, se):
se.acquire()
global NUM
NUM += 1
time.sleep(2)
print(‘我是线程:{}, NUM此时为:{}‘.format(i, NUM))
se.release()

for x in range(10):
t = threading.Thread(target=function1, args=(x, semaphore))
t.start()
    • “event”锁:
      • 事件:要锁全部锁,要放全部放,实则内部维护了一个变量,值为布尔值
      • 该锁提供了3个方法:
        1. set() # 将Flag 值设为True
        2. clear() # 将Flag 值设为 False
        3. wait() # 若Flag为False,则阻塞住;若为True则放行
import threading
event = threading.Event()

def function(id, en):
print(‘兵{}:大王,怎么办?‘.format(id))
en.wait()
print(‘兵{}:杀呀~~~‘.format(id))

for x in range(10):
t = threading.Thread(target=function, args=(x, event))
t.start()

event.clear()
print(‘1、杀出一条血路 2、投降‘)
you_choice = input(‘>>‘).strip()
if you_choice == ‘1‘:
event.set()
    • "condition" 锁
      • 使得线程等待,只有当满足条件时,才放出N个线程去执行任务
      • 注意,wait和notify方法前后,必须被acquire和release方法包起来
      • 姿势1:自行notify 通知运行N个
import threading
condition = threading.Condition()

def function(id, con):
con.acquire()  
con.wait()  # 子线程全部卡在这里,等待通知
print(‘兵{}:杀呀~~~‘.format(id))
con.release()

for x in range(100):
t = threading.Thread(target=function, args=(x, condition))
t.start()


print(‘你要挑战几个?‘)
you_choice = input(‘>>‘).strip()
condition.acquire()  
condition.notify(int(you_choice))   # 传进去几个,上面运行几个线程
condition.release()
      • 姿势2:条件成立,则放出一个去运行
import threading
def condition_func(): # 该函数返回值必须为布尔值
ret = False
inp = input(‘>>‘).strip()
if inp == ‘1‘:
ret = True
return ret

def function(id, con):
con.acquire()  
con.wait_for(condition_func) # 一旦该函数体返回值为True,则放出一个线程
print(‘thread{}:im gone~~~‘.format(id))
con.release()

condition = threading.Condition()
for x in range(5):
t = threading.Thread(target=function, args=(x, condition))
t.start()
 
 
三、Timeer 定时器:
  • 定时器,指定n秒后执行某操作
from threading import Timer
import time
 
def function():
print(‘开火~‘)
 
t = Timer(3, function)  # 子线程,三秒后执行该函数
t.start()
for i in range(1, 4):
print(‘time:{}‘.format(i)) # 主线程,数数,每次间隔1秒
time.sleep(1)



 
 
四、线程池:
  • 线程池概述:
    • “线程池”与“上下文切换”:
      • CPU调度线程,当时间片用完,会进行切换,每次切换时线程现场的“保存与载入”都是时间开销
      • 当线程数目达到一个峰值后,再多创建线程,执行效率会下降
      • 因此,应该控制线程创建的最大数目,池中线程取一个少一个,无线程时,后续请求等待;执行完毕归还线程
    • 要创建一个线程池,那么该线程池应满足如下条件:
      • 线程池为一个容器
      • 池中线程用一个少一个
      • 池中无线程,则进行等待
      • 线程执行完任务,则进行归还
  • 创建姿势1: 利用队列来存放“线程”
import time
import queue
import threading

class MyPool:
def __init__(self, maxsize=5):
self.maxsize = maxsize
self._q = queue.Queue(self.maxsize)  # 创建一个指定大小的队列
for _ in range(self.maxsize):
self._q.put(threading.Thread)  # 在队列中全为线程"类"

def get_thread(self):
return self._q.get()  # 取出一个线程"类"

def add_thread(self):
self._q.put(threading.Thread)  # 添加一个线程“类”

# 定义任务,参数为线程池(执行该任务的线程来源),执行完毕后向该池中归还(添加)一个线程
def task(n, p):
print(‘{}: 执行任务,完毕‘.format(n))
time.sleep(2)
p.add_thread()

# 创建一个大小为5的线程池
pool = MyPool(5)

for i in range(100):
t_class = pool.get_thread()  # 从队列中获取一个线程"类名称"
t_obj = t_class(target=task, args=(i, pool))  # 用取出来的类,实例化个线程,并交给线程任务去执行,每次从队列中取一个线程,若没有线程则等待
t_obj.start()
  • 此时,每次有5个线程去处理工作,不会超过5
  • 问题:
    • 线程重用:任务执行完成后归还线程,用的是“重新创建一个线程并put进队列”的方法,即原来的线程还放于内存中等待被GC回收
    • 空闲线程:若任务数少于池数目,则会多余创建,即应该线程池最初是空的,来一个创建一个,最大为5
 






























姿势2: 


  • 队列中存放任务:
    • 往往每个任务都是一个函数,
    • 可以将任务和其相关参数搞成元组(函数名,参数),将这些元组put到队列中,则队列中保存的全是需要执行的任务
  • 创建N个线程(N为线程池大小):
    • 每个线程都“循环”从队列中get任务并执行(线程重用)
    • 若队列中所有任务都取完了,则终止已经创建的线程,终止方法如下:
      1. get方法有超时时间,可以设置超时时间,超过这个时间则线程自动销毁
      2. 在队列末尾插入几个空值,get后判断,若取到的是任务则执行,否则则终止
 
 
 
 
 
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object() # 创建个静态字段,字段任何值都行,相当于None,即要在队列后插入的空值


class ThreadPool(object):

def __init__(self, max_num, max_task_num = None):
# 创建任务队列,可显示指定任务队列大小,否则为不限制
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
 
 
self.max_num = max_num # 最大线程数
self.cancel = False
self.terminal = False
self.generate_list = [] # 当前已经创建的线程
self.free_list = [] # 当前的空闲线程,队列中没有任务,则线程会空闲

def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if self.cancel:
return
# 没有空闲着的线程,并且已经创建的线程有没有超过线程池最大大小时,创建线程
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,) # 将线程的id,要执行的任务,包装成一个元组
self.q.put(w) # 将包装好的元组放入队列中

def generate_thread(self):
"""
创建一个线程,并执行call方法
"""
t = threading.Thread(target=self.call)
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数
"""
current_thread = threading.currentThread() # 获取当前线程
self.generate_list.append(current_thread) # 在线程列表中添加一个线程,

event = self.q.get() #从队列中获取任务,任务都是包装的元组(id,任务)
# 若是一个元组,即不等于stopevent
while event != StopEvent:
func, arguments, callback = event # 元组解包
try:
result = func(*arguments)  # 执行任务
success = True
except Exception as e:
success = False
result = None

if callback is not None:
try:
callback(success, result)
except Exception as e:
pass
# 线程执行完任务,则线程处于空闲状态,则标记该任务为空闲进程,下次直接会从空闲进程去取任务
with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get() # 执行完任务后,该线程又去获取任务,然后再次while
# 若evnent不是一个元组,则从线程列表中移除当前线程
else:
self.generate_list.remove(current_thread)

def close(self):
"""
执行完所有的任务后,所有线程停止,创建了几个线程,则往任务队列中插入几个空值
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True

while self.generate_list:
self.q.put(StopEvent)

self.q.queue.clear()

@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)



# How to use


pool = ThreadPool(5)

def callback(status, result):
# 需要执行的任务
pass


def action(i):
print(i)

# 创建300个任务
for i in range(300):
ret = pool.run(action, (i,), callback) 

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
 
 
 
 
 
 
 
 
 
 
 
 

 
 

 

线程锁、线程池