首页 > 代码库 > Python:线程、进程与协程(2)——threading模块(1)

Python:线程、进程与协程(2)——threading模块(1)

    上一篇博文介绍了Python中线程、进程与协程的基本概念,通过这几天的学习总结,下面来讲讲Python的threading模块。首先来看看threading模块有哪些方法和类吧。

技术分享


主要有:

Thread :线程类,这是用的最多的一个类,可以指定线程函数执行或者继承自它都可以实现子线程功能。

Timer:与Thread类似,但要等待一段时间后才开始运行,是Thread的子类。

Lock :原锁,是一个同步原语,当它锁住时不归某个特定的线程所有,这个可以对全局变量互斥时使用。

RLock :可重入锁,使单线程可以再次获得已经获得的锁,即可以被相同的线程获得多次。

Condition :条件变量,能让一个线程停下来,等待其他线程满足某个“条件”。

Event:事件对象,是线程间最简单的通信机制之一:线程可以激活在一个事件对象上等待的其他线程。

Semaphore:信号量对象,是个变量,管理一个内置的计数器,指定可同时访问资源或者进入临界区的线程数。

BoundedSemaphore :有界信号量对象,与semaphore类似,但不允许超过初始值;

ThreadError:线程错误信息类。


active_count()和activeCount():返回当前活着的Thread对象个数。

current_thread()和currentThread():返回当前的Thread对象,对应于调用者控制的线程。如果调用者控制的线程不是通过threading模块创建的,则返回一个只有有限功能的虚假线程对象。

enumerate():返回当前活着的Thread对象的列表。该列表包括守护线程、由current_thread()创建的虚假线程对象和主线程。它不包括终止的线程和还没有开始的线程。

settrace(func):为所有从threading模块启动的线程设置一个跟踪函数。在每个线程的run()方法调用之前,func将传递给sys.settrace()(该函数是设置系统的跟踪函数)。

setprofile(func):为所有从threading模块启动的线程设置一个profile函数。在每个线程的run()调用之前,func将传递给sys.setprofile()(这个函数用于设置系统的探查函数)。

stack_size([size]):返回创建新的线程时该线程使用的栈的大小,

可选的size参数指定后来创建的线程使用栈的大小,它必须是0(使用平台的或者配置的默认值)或不少于32,768(32kB)的正整数。


其它一些以"_"开头的,有些是引入其它模块的函数,然后起了个别名,比如_format_exc,它的定义如下:

from traceback import format_exc as _format_exc


有些其实是类,比如RLock的定义如下:

def RLock(*args, **kwargs):
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    """
    return _RLock(*args, **kwargs)

_RLock其实是个类,内部调用的类,RLock其实是个函数,返回一个类,感兴趣的可以去看看threading.py相关代码。

    由于篇幅比较长,只介绍了Thread,Lock,RLock,Condition,不正之处欢迎批评指正,当然更希望大家有耐心、仔细看完,同时能实践一下示例并仔细体会。

(一)Thread对象

    这个类表示在单独的一个控制线程中运行的一个活动。有两种创建方法:创建线程要执行的函数,把这个函数传递进Thread对象里,让它来执行;而是从Thread类中继承,然后在子类中覆盖run()方法,在子类中不应该覆盖其它方法(__init__()除外),也就是只覆盖该类的__init__()和run()方法。


Thread类主要方法:

start():开始线程的活动。每个线程对象必须只能调用它一次。

run():表示线程活动的方法,可以在子类中覆盖这个方法。

join([timeout]):是用来阻塞当前上下文,直至该线程运行结束,一个线程可以被join()多次,timeout单位是秒。

name:一个字符串,只用于标识的目的。它没有语义。多个线程可以被赋予相同的名字。初始的名字通过构造函数设置。

getName()/setName():作用于name的两个函数,从字面就知道是干嘛的,一个是获取线程名,一个是设置线程名

ident:线程的ID,如果线程还未启动则为None,它是一个非零的整数当一个线程退出另外一个线程创建时,线程的ID可以重用,即使在线程退出后,其ID仍然可以访问。

is_alive()/isAlive():判断线程是否还活着。

daemon:一个布尔值,指示线程是(True)否(False)是一个守护线程。它必须在调用start()之前设置,否则会引发RuntimeError。它的初始值继承自创建它的线程;主线程不是一个守护线程,所以在主线程中创建的所有线程默认daemon = False。

    何为守护线程?举个例子,在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,当主线程完成想退出时,会检验子线程是否完成。 对于普通线程,如果子线程的任务没有结束,主线程不会退出,整个程序也不会退出;对于守护线程,即使子线程任务还没有结束,如果主线程退出该线程也会退出。

isDaemon()/setDaemon():作用与daemon的函数,一个是判断是不是守护线程,一个是设置守护线程。


下面通过简单的例子来展示Thread的使用方法,希望对你理解能有帮助,当然更希望你能动手操作下,了解上面方法的用处。

将函数传递到Thread对象

#coding=utf-8
import threading
import datetime
import time
def thread_fun(num):
    time.sleep(num)
    now = datetime.datetime.now()
    print "线程名:%s ,now is %s"          %( threading.currentThread().getName(), now)

def main(thread_num):
    thread_list = list()
    # 先创建线程对象
    for i in range(0, thread_num):
        thread_name = "thread_%s" %i
        thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (2,)))

    # 启动所有线程
    for thread in thread_list:
        thread.setName("good")#修改线程名
        print thread.is_alive()#判断线程是否是活的
        print thread.ident
        thread.start()
        print thread.ident
        print thread.is_alive()
        thread.join()
        print thread.is_alive()

if __name__ == "__main__":
    main(3)


运行结果如下:

技术分享

上面的例子只是展示了几个简单的Thread类的方法,其它方法大家可以自己动手试试,体会下。这里再讲下threading.Thread()

它的原型是:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})

group应该为None;被保留用于未来实现了ThreadGroup类时的扩展。

target是将被run()方法调用的可调用对象。默认为None,表示不调用任何东西。

name是线程的名字。默认情况下,以“Thread-N”的形式构造一个唯一的名字,N是一个小的十进制整数。

args是给调用目标的参数元组。默认为()。

kwargs是给调用目标的关键字参数的一个字典。默认为{}。


继承自threading.Thread类:

继承的话,主要是重定义__init__()和run()。

#coding=utf-8
import threading

class MyThread(threading.Thread):
    def __init__(self,group=None,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,group,target,name,args,kwargs)

        self.name = "Thread_%s"%threading.active_count()

    def run(self):
        print  "I am %s" %self.name


if __name__ == "__main__":

    for thread in range(0, 5):
        t = MyThread()
        t.start()

运行结果就不贴出来了。

(二)Lock互斥锁

    如果多个线程访问同时同一个资源,那么就有可能对这个资源的安全性造成破坏,还好Python的threading模块中引入了互斥锁对象,可以保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象这时候可以使用threading模块提供的Lock类。它主要有两个方法:acquire()和 release()

acquire(blocking=True, timeout=-1)

    获取锁,设置为locked状态,blocking参数表示是否阻塞当前线程等待,timeout表示阻塞时的等待时间 。如果成功地获得lock,则acquire()函数返回True,否则返回False,timeout超时时如果还没有获得lock仍然返回False。

release()

    释放锁,设置为unlocked状态,当在未锁定的锁上调用时,会引发ThreadError。没有返回值。

下面的代码是没有设锁时,100个线程访问一个资源,从运行结果来看对资源的完整性造成了一定的影响。

#coding=utf-8
import threading
import time
num = 0
class MyThread(threading.Thread):
    def __init__(self,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs)
    def run(self):
        global num
        time.sleep(1)#这句不能少,要不然看不到对公共资源完整性的破坏,当然也可以将该句放在
        #num += 1的后面,有兴趣可以试试
        num += 1
        self.setName("Thread-%s" % num)
        print("I am %s, set counter:%s" % (self.name, num))
if __name__ == "__main__":
    for i in range(0, 100):
        my_thread = MyThread()
        my_thread.start()

运行结果如下:

技术分享

看看部分数据的完整性遭到了“破坏”

再看看使用互斥锁的例子

#coding=utf-8
import threading
import time
num = 0
lock = threading.Lock()
class MyThread(threading.Thread):
    def __init__(self,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs)
    def run(self):
        global num
        time.sleep(1)
        if lock.acquire():
            num += 1
            self.setName("Thread-%s" % num)
        print("I am %s, set counter:%s" % (self.name, num))
        lock.release()
if __name__ == "__main__":
    for i in range(0, 100):
        my_thread = MyThread()
        my_thread.start()

运行结果大家可以动手试试。

同步阻塞

    当一个线程调用Lock对象的acquire()方法获得锁时,这把锁就进入“locked”状态。因为每次只有一个线程可以获得锁,所以如果此时另一个线程试图获得这个锁,该线程就会变为“block“同步阻塞状态。直到拥有锁的线程调用锁的release()方法释放锁之后,该锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行状态。


死锁

是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程


(三)可重入锁RLock

用法和Lock用法一样,只RLock支持嵌套,而Lock不支持嵌套;RLock允许在同一线程中被多次acquire(如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐),而Lock却不允许这种情况,否则会出现死锁。看下面例子:

#coding=utf-8
import threading
import time
num = 0
lock = threading.Lock()
class MyThread(threading.Thread):
    def __init__(self,target=None,name=None,args=(),kwargs=None):
        threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs)
    def run(self):
        global num
        time.sleep(1)
        if lock.acquire():
            num += 1
            self.setName("Thread-%s" % num)
            print("I am %s, set counter:%s" % (self.name, num))
            if lock.acquire():
                num += 1
                self.setName("Thread-%s" % num)
                print("I am %s, set counter:%s" % (self.name, num))
            lock.release()

        lock.release()
if __name__ == "__main__":
    for i in range(0, 100):
        my_thread = MyThread()
        my_thread.start()

上面的例子使用Lock锁嵌套使用,运行后就出现了死锁,运行不下去,“卡住了”。

技术分享

这个时候就出现了RLock,它支持在同一线程中可以多次请求同一资源。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。所以将上面的代码中

lock = threading.Lock()

改为

lock = threading.RLock()

就不会出现死锁情况。

(四)条件变量Condition

        除了互斥锁外,对复杂线程同步问题,Python提供了Condition对象来支持解决。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

除了acquire和release方法外还有如下方法:

wait([timeout]):  

  w释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

notify():

  唤醒一个挂起的线程(如果存在挂起的线程),该方法不会释放所占用的琐。

notify_all()/notifyAll()

  唤醒所有挂起的线程(如果存在挂起的线程),这些方法不会释放所占用的琐。

同时要主要这四个方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。

    Condition其实是一把高级的锁,它内部维护着一个锁对象(默认是RLock),当然你也可以通过它的构造函数传递一个Lock/RLock对象给它。我们可以看看它的__init__()函数的定义:

def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock‘s acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []

是不是一目了然。条件变量的经典问题就是生产者与消费者的例子了。假设某代工厂的两个车间,一个车间生产某种配件,另一个车间需要这些配件进行组装产品,那么前者就是生产者(Producer),后者就是消费者(Consumer),当配件数低于1000就需要生产,多余5000个可以暂停生产,生产者一秒中可生产200个,消费者的策略是剩余数量少于800就不组装了,一秒钟可消费100个。代码如下

#coding=utf-8
import threading
import time
left_num = 500#假设初始剩余数为500
con = threading.Condition()

class Producer(threading.Thread):
    """生产者"""
    def __init__(self, target=None, name=None, args=(), kwargs=None):
        threading.Thread.__init__(self, target=target, name=name, args=args, kwargs=kwargs)
    def run(self):
        global left_num
        is_product = True
        while True:
            if con.acquire():
                if left_num > 5000:#大于5000可以暂停生产
                    is_product = False#不用继续生产
                    con.wait()#该“生产线”挂起
                elif left_num < 1000:
                    is_product = True#继续生产
                    left_num += 200
                    msg = self.name+‘ produce 200, left_num=%s‘%left_num
                    print msg
                    con.notify()
                else:
                    #对于剩余数量处在中间的,就要分情况讨论了,一旦开启“生产”就要等
                    #到生产到指定数目才能停止,仔细想想很好理解的
                    if is_product:
                        left_num += 200
                        msg = self.name + ‘ produce 200, left_num=%s‘ % left_num
                        print msg
                        con.notify()
                    else:
                        con.wait()
                con.release()
                time.sleep(1)

class Consumer(threading.Thread):
    """消费者"""
    def __init__(self, target=None, name=None, args=(), kwargs=None):
        threading.Thread.__init__(self, target=target, name=name, args=args, kwargs=kwargs)
    def run(self):
        global left_num
        while True:
            if con.acquire():
                if left_num < 800:#少于800不组装
                    con.wait()
                else:
                    left_num -= 100
                    msg = self.name+‘ consume 100, left_num=%s‘%left_num
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)


def test():
    for i in range(1):
        p = Producer()
        p.start()
    for i in range(1):
        c = Consumer()
        c.start()
if __name__ == ‘__main__‘:
    test()

        大家可以把代码复制下来运行看看,仔细体会下,当累计剩余数大于5000后,就停止了“生产",之后只有消费者在运作,当剩余数低于1000,生产者”生成线"重启生产,直到累计剩余数大于5000,后又停止生产,以此类推。示例中展示的都是生产者与消费者都只有一条”流水线”,实际上,生产者有多条”流水线",消费者也有多条"流水线“,其实都一样,将上面代码的for循环的范围改下就可以了。还有一点,Condition可以接受一个Lock/RLock对象,大家可以试试创建一个Lock/RLock传递给Condition,如下:

lock = threading.Lock()
con = threading.Condition(lock)


效果是一样的,大家可以动手试试。


Python:线程、进程与协程(2)——threading模块(1)