首页 > 代码库 > 线程和进程

线程和进程

线程和进程

进程:狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。

简而言之,就像qq要以一个整体的形式暴露给操作系统管理,里面包含对各种资源的调用,内存的管理,网络接口的调用等,对各种资源管理的集合就可以称为 进程。

进程要操作CPU必须要先创建一个线程,在当代面向线程设计的计算机结构中,进程是线程的容器。

 

线程:是程序执行流的最小单元,是一串指令的集合

 

区别:

1,线程共享内存空间和资源(对于线程之间而言),而进程内存是独立的

2,一个进程中的线程之间可以直接交流(读取全局变量),两个进程想通信,必须借助中间代理

3,创建新线程很简单,创建新进程需要对父进程进行一次克隆。

4,一个线程可以控制和操作同一个进程中的其他线程,但是进程只能操作子进程。

5,线程启动速度比进程快,线程的上下文切换要比进程快的多

 

对于IO操作,一般使用多线程来提高并发。
对于计算性操作,一般使用多进程来提高并发。
归根结底是因为下面的计算机规定:IO操作不占用CPU,计算性操作占用CPU,另外对于一个原因就是GIL

 

GIL

GIL(全称Global Interpreter Lock)全局解释器锁(java和C#中没有这一机制)
隐含的意思:对于任何Python程序,不管有多少的CPU和线程,任何时候都总是只有一个线程在执行。

总结:多线程和多进程的目的在于提高并发,IO密集型用多线程,计算密集型用多进程。

 

 

一,线程

  1,线程的基本操作

 

import threading
import time


def f1(*args):
	time.sleep(5)
	print(args)

# 单进程,单线程
t = threading.Thread(target=f1, args=(123,))
t.setDaemon(True)
# 设置为True,表示主线程不等子线程执行完就结束整个程序
# 设置为False,表示主进程等子线程结束后才会结束整个程序
t.start()   # 一个线程

t.join()
# 不加参数,默认表示主线程一直等待,等到子线程执行完毕,才会往下走。
# 加参数,参数表示主线程在此最多等待n秒

print("end")

 

  

注意:t.join() 和 t.setDaemon(False)意义不同,t.join()会停住等到子线程执行完才往下走。t.setDaemon(False)不会停止会先执行下面的程序,然后再等待子线程执行完才结束。

 

  2,线程创建的两种方式

技术分享
import threading
import time


def f1(*args):
    time.sleep(5)
    print(args)

t = threading.Thread(target=f1, args=(123,))
t.setDaemon(True)
# 设置为True,表示主线程不等子线程执行完就结束整个程序
# 设置为False,表示主进程等子线程结束后才会结束整个程序
t.start()   # 一个线程
第一种

注意:其实上述线程的创建和调用在内部执行了,run()方法,自己动手去找一边。

 

技术分享
class MyThread(threading.Thread):
    def __init__(self, func, args):
        self.func = func
        self.args = args
        super(MyThread, self).__init__()

    def run(self):
        self.func(self.args)

obj = MyThread(f1, 123)
obj.start()
第二种

注意:第二种方式采用了自定义类的方式创建了线程,两种方法的本质其实都是一样的。都需要执行run()方法,第一种执行内部的run()方法,第二种执行自定义的run()方法。

 

二, 队列

  1,先进先出   

put放数据,是否阻塞(默认阻塞),阻塞时的超时时间
get取数据,是否阻塞(默认阻塞),阻塞时的超时时间
队列长度
qsize()真实的个数
maxsize 最大支持的个数
join,task_done, join方法会阻塞进程,当队列中任务执行完毕之后,不再阻塞(即每次执行完任务,后来要加task_done)
empty() 检查是否为空
full() 检查是否已满

例1.

import queue
q = queue.Queue(2)  # 里面参数表示最多可以放的元素的个数(默认为没有个数限制),如果超过这个个数,默认会阻塞。
q.put(1)   # 放数据
q.put(2)
print(q.qsize())  # 队列中元素的个数
q.put(6, block=False)  # 放数据时默认会阻塞,即block为True;否则,block为False时,如果放不进去,会直接报错!
q.put(3, block=False, timeout=2)  # 后面还可以接超时时间,如果2秒后仍然放不进去,则直接报错!

print(q.qsize())  # 队列中元素的个数
print(q.get())  # 取数据

  

  例2.

import queue
q = queue.Queue(2)
q.put(1)
q.put(2)
q.get()
q.task_done()
q.get()
q.task_done()
q.join()

  

注意:这里是放到队列中两个数据,然后再取出两个数据,程序执行完毕,队列清空。如果直接再后面加上join,会阻塞程序。如果每次取出数据都告诉join(即在每次取完时在q.task_done),直到取完程序立刻终止,不会阻塞。用的比较少。

 

  2,其他队列

queue.Queue() # 先进先出队列
queue.deque() # 双向队列(两头进两头出)
queue.LifoQueue() # 后进先出队列(last in first out)
queue.PriorityQueue() # 优先级队列

例1:先进后出

import queque	
q = queue.LifoQueue()  # 后进先出队列(last in first out)

q.put(123)
q.put(456)
r = q.get()
print(r)

#执行结果:
#456

  

  例2:优先级队列

import queque
q = queue.PriorityQueue()  # 优先级队列

q.put((0, ‘tom‘))  # 两个参数,前面一个是优先级,后面一个是放进去的数据,优先级越高,先出
q.put((5, ‘lily‘))
q.put((2, ‘yn‘))
r = q.get()
print(r)

#执行结果:
#tom

  

  例3:双向队列

import queue

q = queue.deque()  # 双向队列(两头进两头出)

q.append("hello")
q.append("world")
q.appendleft("tom")
r = q.pop()
r1 = q.popleft()
print(r)
print(r1)

#执行结果:

#world
#tom

  注意:这些队列都是在python内存中创建的,程序退出队列清空!

 

 

三,生产者和消费者模型

比如:用户访问12306网站春运时并发非常的高,大量的用户来连接服务器很可能引起服务器的宕机。此时引入生产者消费者模型,就好比在中间加了个管道,如有大并发的连接,全部放入管道,等待服务器到管道获取连接进行排队处理,服务器处理完,用户在进行查询来获取处理结果。

 

 

四, 线程锁(互斥锁)

import threading
import time
NUM = 10 


def f1(l):
	global NUM
	# 上锁
	l.acquire()
	NUM -= 1
	time.sleep(2)
	print(NUM)
	# 开锁
	l.release()

# lock = threading.Lock()
lock = threading.RLock()

for i in range(10):
	t = threading.Thread(target=f1, args=(lock,))
	t.start()

  

总结:
1,线程在操作系统执行的最小单元,同一个进程中的线程之间共享内存等各种资源,如何同一个进程中的
很多线程都在修改同一个数据,就是造成数据的混乱,不统一的情况,所以需要线程锁,也叫互斥锁。这样
线程每个线程在执行操作之前都在加锁,所以保证了数据的一致性。
如果上述程序不加锁的输出为:10个0,正常输出应该是:9-0

2,注意Rlock 和 lock的区别,Rlock为递归锁,锁多层,lock只能锁一次。Rlock里面支持多层锁的嵌套,
一般最好使用Rlock,好处显而易见。

 

五, 信号量 (semaphore)

互斥锁,同时只允许一个线程更改数据,而semaphore是同时允许一定数量的线程更改数据,比如,有5个收费口高速公路收费站只允许同时5辆车进行缴费,前面5辆车过去,后面的车才能进来。

import threading
import time
NUM = 10


def f1(l):
	global NUM
	# 上锁
	l.acquire()   # 这时每次只放行2个
	NUM -= 1
	time.sleep(2)
	print(NUM)
	# 开锁
	l.release()

# lock = threading.Lock()
# lock = threading.RLock()
lock = threading.BoundedSemaphore(2)  # 允许同时最多有2个同时运行
for i in range(10):
	t = threading.Thread(target=f1, args=(lock,))
	t.start()

  

 

六, 事件(event)

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True

import threading


def f1(i, e):
	print(i)
	e.wait()   # 检测是什么灯,如果是红灯,停;如果是绿灯,行。
	print(i + 100)

event = threading.Event()
for i in range(10):
	t = threading.Thread(target=f1, args=(i, event))
	t.start()
# ------------------
event.clear()   # 设置为红灯,全部暂停
inp = input(‘>>>‘)
if inp == ‘1‘:
	event.set()  # 设置为绿灯,设置为绿灯,全部放行。

  执行流程:首先10个线程首先print(i),然后阻塞,等待放行,然后一次性全部通过,执行print(i + 100)

 

 

七, 条件(condition)

让线程只有在满足某个条件的情况下才允许n个线程通过。

技术分享
import threading


def f1(i, conn):
    print(i)
    conn.acquire()
    conn.wait()
    print(i + 100)
    conn.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=f1, args=(i, c))
    t.start()

while True:
    inp = input(">>>")
    if inp == q:
        break
    c.acquire()         # ---|
    c.notify(int(inp))  # ------> 注意这三句必须放在一起,为固定用法
    c.release()         # ---|
第一种写法

执行流程:首先10个线程首先print(i),接着线程阻塞在conn.acquire()这里,主线程接着向下执行,执行到input时,等待用户输入,当输入n,就会放行n个线程去执行print(i + 100)

 

技术分享
import threading

def condition():
    ret = False
    r = input(">>>")
    if r == "true":
        ret = True
    else:
        ret = False
    return ret

def f1(i, conn):
    print(i)
    conn.acquire()
    conn.wait_for(condition)
    print(i + 100)
    conn.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=f1, args=(i, c))
    t.start()
第二种写法

和上述例子notify(1)的结果相同,只有满足某些条件才会执行。

 

 

八, 定时器

from threading import Timer

def f1():
	print("hello, world !")
t = Timer(10, f1)
t.start()

等待10秒中才会执行f1函数,写监控,客户端等才会用到。

 

 

九, 自定义线程池

线程池非常重要,但是python2没有提供,python3中提供了这个功能,但是很弱。可以利用第三方库,最好自己去写!

思路:维护一个容器(要有最大连接数),取一个少一个线程,无线程时等待,线程执行完毕,交还线程。需要用到队列!

1,低版本的线程池

import queue
import threading
import time


class ThreadPoll:
	def __init__(self, maxsize=5):
		self.maxsize = maxsize
		self._q = queue.Queue(maxsize)
		for i in range(maxsize):
			self._q.put(threading.Thread)
			# [threading.Thread, threading.Thread, threading.Thread, threading.Thread]

	def get_thread(self):
		return self._q.get()

	def add_thread(self):
		self._q.put(threading.Thread)

# 最多有5的允许5个线程
poll = ThreadPoll(5)


def task(args, p):
	print(args)
	time.sleep(1)
	p.add_thread()
	# 如果没有最后这个步的话,到第六个get_thread()会因为获取不到线程而阻塞住
	# 所以在取走一个线程之后再添加一个线程。


for i in range(100):
	t = poll.get_thread()  # 此时的t为threading.Tread类
	obj = t(target=task, args=(i, poll,))  # 创建一个threading.Tread对象
	obj.start()

  

执行流程:会5个5个的执行,注意线程执行没有先后顺序!

分析:通过上述线程池会发现一些问题。
a, 线程不能重复利用,只能开垃圾回收机制进行回收!
b, 线程池直接开到了最大。当线程个数小于5个时,也会创建5个,导致线程的浪费!

 

  2,高级线程池

#!/usr/bin/python
# _*_ coding:utf-8 _*_
import queue
import threading
import contextlib
import time
# 队列里面直接放任务,[(函数名,参数1),(函数名,参数2),(函数名,参数3)]


StopEvent = object()


class ThreadPool(object):
	def __init__(self, max_num, max_task_num=None):
		if max_task_num:
			self.q = queue.Queue(max_task_num)
		else:
			self.q = queue.Queue()
		self.max_num = max_num
		self.cancel = False
		self.terminal = False
		self.generate_list = []  # 当前已经创建了多少线程
		self.free_list = []  # 当前还空闲多少线程

	def run(self, func, args, callback=None):
		"""
		线程池执行一个任务
		:param func: 任务函数
		:param args: 任务函数所需参数
		:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
		:return: 如果线程池已经终止,则返回True否则None
		"""
		if self.cancel:
			return
		if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
			# 当没有空闲的线程数,而且已经能够创建的线程数小于最大的线程数时,则创建线程
			self.generate_thread()
		w = (func, args, callback,)
		self.q.put(w)  # 如果不满足上述判断的话,只把任务放在队列中而不会去创建线程

	def generate_thread(self):
		"""
		创建一个线程
		"""
		t = threading.Thread(target=self.call)
		t.start()

	def call(self):
		"""
		循环去获取任务函数并执行任务函数
		"""
		current_thread = threading.currentThread
		self.generate_list.append(current_thread)

		event = self.q.get()
		while event != StopEvent:

			func, arguments, callback = event
			try:
				result = func(*arguments)
				success = True
			except Exception as e:
				success = False
				result = None

			if callback is not None:
				try:
					callback(success, result)
				except Exception as e:
					pass

			with self.worker_state(self.free_list, current_thread):
				if self.terminal:
					event = StopEvent
				else:
					event = self.q.get()
		else:

			self.generate_list.remove(current_thread)

	def close(self):
		"""
		执行完所有的任务后,所有线程停止
		"""
		self.cancel = True
		full_size = len(self.generate_list)
		while full_size:
			self.q.put(StopEvent)
			full_size -= 1

	def terminate(self):
		"""
		无论是否还有任务,终止线程
		"""
		self.terminal = True

		while self.generate_list:
			self.q.put(StopEvent)

		self.q.empty()

	@contextlib.contextmanager
	def worker_state(self, state_list, worker_thread):
		"""
		用于记录线程中正在等待的线程数
		"""
		state_list.append(worker_thread)
		try:
			yield
		finally:
			state_list.remove(worker_thread)

# ----------------执行--------------------------
pool = ThreadPool(5)


def callback(status, result):  # 定义的任务
	# status, execute action status
	# result, execute action return value
	pass


def action(i):
	print(i)


for i in range(300):
	ret = pool.run(action, (i,), callback)

# time.sleep(1)
# print(len(pool.generate_list), len(pool.free_list))
# print(len(pool.generate_list), len(pool.free_list))

  

 

十, 进程基本的进程操作

from multiprocessing import Process


def f1(i):
	print("hello,{}".format(i))

if __name__ == ‘__main__‘:
	# 注意在windows必须加这一段,否则会报错!但是,在linux,mac没有上述一行代码依然可以执行!
	# 所以,在windows下慎用多进程!
	for m in range(10):
		p = Process(target=f1, args=("tom",))
		# p.daemon = True
		p.start()
		# p.join()

  # 进程里面的这些方法和线程一样,这里不再一一赘述

 

 

十一 进程的数据共享

进程中默认数据是不共享的

from multiprocessing import Process
import threading
import time


def f1(i, args):
	args.append(i)
	print("hello,{}".format(i), args)

if __name__ == ‘__main__‘:
	# 注意在windows必须加这一段,否则会报错!但是,在linux,mac没有上述一行代码依然可以执行!
	li = []
	for m in range(5):
		p = Process(target=f1, args=(m, li))
		# p = threading.Thread(target=f1, args=(m, li))
		# p.daemon = True
		p.start()
		# p.join()

  

执行结果:

hello,0 [0]
hello,1 [1]
hello,2 [2]
hello,4 [4]
hello,3 [3]
由此可以看出进程之间,默认数据是不共享的。如果共享列表中的元素应该不断增加

 

数据共享的种三种方式:

  a. 第一种,运用特殊的queques方式

  

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing


def f1(i, args):
	args.put(i)
	print(args.qsize())

if __name__ == ‘__main__‘:
	li = queues.Queue(20, ctx=multiprocessing)
	for m in range(5):
		p = Process(target=f1, args=(m, li))
		p.start()

  

b. 第二种,运用数组的方式

from multiprocessing import Process
from multiprocessing import Array


def f1(i, args):
	args[i] = i + 100
	for n in args:
		print(n)
	print("---------")


if __name__ == ‘__main__‘:
	li = Array(‘i‘, 6)   # 创建数组,规定好是字符型,数量为6个,这里的数组是C语言的格式
	for m in range(5):
		p = Process(target=f1, args=(m, li))
		p.start()

数组和链表的关系,列表就是基于链表来实现的。链表在内存中的数据不一定是连续的,但是每一个数据块都会记录上一个和下一个块的位置。而数组则在内存中的数据块是连续,类型、数量都是规定好的。注意所有语言里面的数据都是一样的。这种方式不常用。

 

  c. 第三种方式,利用特殊字典的方式

from multiprocessing import Process
from multiprocessing import Manager


def f1(i, args):
	args[i] = i + 100
	print(args.values())

if __name__ == ‘__main__‘:
	obj = Manager()
	li = obj.dict()
	for m in range(5):
		p = Process(target=f1, args=(m, li))
		p.start()
		p.join()    # 重要

注意,最后的p.join()必须加上,否则会报错!主进程执行完之后会停止,而此时创的子进程需要修改主进程的数据,而此时主进程已经关闭。这种方式比较常用!

 

 

十二, 进程锁

from multiprocessing import Process, Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time


def f1(i, lis, lc):
	lc.acquire()
	lis[0] = lis[0] - 1
	time.sleep(1)
	print(‘hello,‘,lis[0])
	lc.release()
if __name__ == ‘__main__‘:
	li = Array(‘i‘, 1)
	li[0] = 10
	lock = RLock()
	for m in range(5):
		p = Process(target=f1, args=(m, li, lock))
		p.start()
		p.join()

  

注意:如果没有进程锁,多个子进程都要修改数据,会造成混乱,需要加锁

总结:进程中同样有RLock, Lock, Event, Condition, Semaphore这个方法,和线程相同这里不再一一赘述!

 

 

十三, 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池python给定义好了,不需要自己去定义,只需要会用即可。

from multiprocessing import Pool
import time

def f1(args):
	print(args)

if __name__ == ‘__main__‘:
	pool = Pool(5)
	for i in range(30):
		# pool.apply(func=f1, args=(i,))  # 这是串行来执行的
		pool.apply_async(func=f1, args=(i, ))  # 这是异步执行的
	pool.close()  # 所有的任务执行完毕,才关闭
	# time.sleep(1)
	# pool.terminate()  # 立即终止当前任务!
	pool.join() # 等待所有的任务执行完毕才会终止

  注意:close和terminate的区别,以及串行和异步的区别

 

 

十四, 协程

IO密集型用多线程,计算或者CPU密集型用多进程。如果要写爬虫,会产生http请求,http请求也叫IO请求,用多线程比较合适!而对于IO密集型的也比较适合用协程(协程不适合有大量cpu操作的)

注意:线程和进程都是计算机给提供的,而协程则是在程序级别做的。

原理:利用一个线程,分解一个线程为多个微线程。

greenlet 底层的
gevent 高级的,是对greenlet的封装

 

例1,简单的协程操作-greenlet

from greenlet import greenlet


def test1():
	print(12)
	gr2.switch()
	print(34)
	gr2.switch()


def test2():
	print(56)
	gr1.switch()
	print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

  

执行结果:(来回切换进行操作)

12
56
34
78

 

例2:简单的协程操作-gevent

import gevent


def f1():
	print("welcome to f1")
	gevent.sleep(0)
	print("welcome to f1 agin !")


def f2():
	print("welcome to f2")
	gevent.sleep(0)
	print("welcome to f2 agin !")

gevent.joinall([gevent.spawn(f1), gevent.spawn(f2)])

  

执行结果:

welcome to f1
welcome to f2
welcome to f1 agin !
welcome to f2 agin !

 

例3:涉及到IO操作,gevent就是一个高性能的代名词

from gevent import monkey
import gevent
import requests
monkey.patch_all()  # 把原来的socket功能修改了,发完请求的会告诉你发送完了,用上它,才能用协程


def f1(url):
	print("GET %s" % url)
	resp = requests.get(url)
	data = http://www.mamicode.com/resp.text"%d bytes received from %s." % (len(data), url))

gevent.joinall([
	gevent.spawn(f1, "https://www.baidu.com/"),
	gevent.spawn(f1, "https://www.python.org/"),
	gevent.spawn(f1, "https://www.sina.com/"),
])

  

 

十五, 缓存

python对memcache 和 redis 操作,详见:http://www.cnblogs.com/wupeiqi/articles/5132791.html

1,安装软件
2,程序:安装其对应的客户端(API)

 

 

线程和进程