首页 > 代码库 > Python自学笔记-进程,线程(Mr serven)

Python自学笔记-进程,线程(Mr serven)

对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。
有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。
 
同步是指:发送方发出数据后,等接收方发回响应以后才发下一个数据包的通讯方式。
异步是指:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。
初始化进程、线程与协成的概念
什么是进程?
  进程,是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。前面的话我也没懂,用非官方的白话来解释就是——执行中的程序是进程,比如qq不是进程,但是当我们双击qq开始使用它的时候,它就变成了一个进程。我们写的python程序,只有当我们执行它的时候,它才是进程。我们正在执行的IE浏览器,QQ,pycharm都是进程,从操作系统的角度来讲,每一个进程都有它自己的内存空间,进程之间的内存是独立的。
什么是线程?
  线程,有时被称为轻量级进程,是程序执行流的最小单元。我们可以理解为,线程是属于进程的,我们平时写的简单程序,是单线程的,多线程和单线程的区别在于多线程可以同时处理多个任务,这时候我们可以理解为多线程和多进程是一样的,我可以在我的进程中开启一个线程放音乐,也可以开启另外的线程聊qq,但是进程之间的内存独立,而属于同一个进程多个线程之间的内存是共享的,多个线程可以直接对它们所在进程的内存数据进行读写并在线程间进行交换。
 
什么是协程?
  协程是一种用户态的轻量级线程。如果说多进程对于多CPU,多线程对应多核CPU,那么事件驱动和协程则是在充分挖掘不断提高性能的单核CPU的潜力。我们既可以利用异步优势,又可以避免反复系统调用,还有进程切换造成的开销,这就是协程。协程也是单线程,但是它能让原来要使用异步+回调方式写的非人类代码,可以用看似同步的方式写出来。它是实现推拉互动的所谓非抢占式协作的关键。对于python来说,由于python多线程中全局解释器导致的同时只能有一个线程访问cpu,所以对协程需求就相比于其他语言更为紧迫。
进程、线程与协程
  从硬件发展来看,从最初的单核单CPU,到单核多CPU,多核多CPU,似乎已经到了极限了,但是单核CPU性能却还在不断提升。server端也在不断的发展变化。如果将程序分为IO密集型应用和CPU密集型应用,二者的server的发展如下:
    IO密集型应用: 多进程->多线程->事件驱动->协程
    CPU密集型应用:多进程-->多线程                                                                                                                                                                    
  调度和切换的时间:进程   >   线程   >  协程
 
 

Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

import threading
import time

def show(arg):
    time.sleep(1)
    print("Thread"+str(arg))

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print("main thread stop")

上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

更多方法:

    • start            线程准备就绪,等待CPU调度
    • setName      为线程设置名称
    • getName      获取线程名称
    • setDaemon   设置为后台线程或前台线程(默认)
                         如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                          如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
    • join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

                   run              线程被cpu调度后自动执行线程对象的run方法.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        print("Running on number:%s"%self.num)
        time.sleep(3)
        print(time.localtime())
if __name__ == __main__:
    print(time.localtime())
    t1 = MyThread(1)
    t2 = MyThread(2)
    t3 = MyThread(3)
    t4 = MyThread(4)
    t1.start()
    t2.start()
    t3.start()
    t4.start()

线程锁(Lock、RLock)

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

# no lock
import threading
import time

gl_num = 0

def show(arg):
    global gl_num
    time.sleep(1)
    gl_num += 1
    print(gl_num)

for i in range(100):
    t = threading.Thread(target=show, args=(i,))
    t.start()
print("main thread stop")
# lock

import threading
import time

gl_num = 0
lock = threading.RLock()

def Func():
    lock.acquire()
    global gl_num
    gl_num += 1
    time.sleep(1)
    print(gl_num)
    lock.release()

for i in range(10):
    t = threading.Thread(target=Func)
    t.start()

信号量(Semaphore)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(4)
    print(" Begin to Run the thread:%s"%n)
    semaphore.release()

if __name__ == __main__:
    num = 0
    semaphore = threading.BoundedSemaphore(6)
    for i in range(200):
        t = threading.Thread(target=run, args=(i,))
        t.start()

事件(event)

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
import threading

def do(event):
    print("Start")
    event.wait()
    print("Execute")

event_obj = threading.Event()
for i in range(5):
    t = threading.Thread(target=do,args=(event_obj,))
    t.start()

event_obj.clear()
inp = input("input:")
if inp == true:
    event_obj.set()

条件(Condition)

使得线程等待,只有满足某条件时,才释放n个线程.

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread:%s"%n)
    con.release()

if __name__ == __main__:
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
    while True:
        inp = input(>>>)
        if inp == q:
            break
        con.acquire()
        con.notify((int(inp)))
        con.release

 

import threading

def condition_func():
    ret = False
    inp = input(">>>")
    if inp == 1:
        ret = True

    return ret

def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread:%s"%n)
    con.release()

if __name__ == __main__:
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
from threading import Timer

def hello():
    print("helloworld")

t = Timer(5, hello)
t.start()
from multiprocessing import Process
import threading
import time

def foo(i):
    print("say hi",i)

for i in range(10):
    p = Process(target=foo, args=(i,))
    p.start()

注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。

进程数据共享

进程各自持有一份数据,默认无法共享数据.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from multiprocessing import Process
from multiprocessing import Manager

import time

li = []
def foo(i):
    li.append(i)
    print("say hi,",li)

for i in range(6):
    p = Process(target=foo, args=(i,))
    p.start()

print("ending",li)

# 方法一,Array

from multiprocessing import Process, Array

temp = Array(i,[11,22,33,44,55])
def Foo(i):
    temp[i] = 100 + i
    for item in temp:
        print(i,---->,item)
for i in range(5):
    p = Process(target=Foo,args=(i,))
    p.start()

#方法2: manage.dict()共享数据

from multiprocessing import Process, Manager

manage = Manager()
dic = manage.dict()

def foo(i):
    dic[i] = 100 + i
    print(dic.values())

for i in range(2):
    p = Process(target=foo, args=(i,))
    p.start()
    p.join()

 

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from multiprocessing import Process, Queue

def f(i,q):
    print(i,q.get())

if __name__ == __main__:
    q = Queue()

    q.put(h1)
    q.put(h2)
    q.put(h3)

    for i in range(10):
        p = Process(target=f, args=(i,q,))
        p.start()

当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。

from multiprocessing import Process, Array, RLock

def Foo(lock, temp, i):
    lock.acquire()
    temp[0] = 100 + i
    for item in temp:
        print(i,---->,item)
    lock.release()
    
lock = RLock()
temp = Array(i,[11,22,33,44,55])
for i in range(20):
    p = Process(target=Foo, args=(lock,temp,i,))

 

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
from multiprocessing import Process, Pool
import time

def Foo(i):
    time.sleep(2)
    return i + 100

def Bar(arg):
    print(arg)

pool = Pool(5)
print(pool.apply(Foo,(1,)))
print(pool.apply_async(func=Foo, args=(1,)).get())

for i in range(10):
    pool.apply_async(func=Foo, args=(i,), callback=Bar)

print(End)
pool.close()
pool.join()

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
gr1.switch()
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import gevent

def foo():
    print("Running in foo")
    gevent.sleep(1)
    print("Explicit context switch to foo again")

def bar():
    print("Explicit context to bar")
    gevent.sleep(0)
    print("Implicit context switch back to bar")

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

 

Python自学笔记-进程,线程(Mr serven)