首页 > 代码库 > Qemu线程池介绍

Qemu线程池介绍

有时我们希望把一部分工作通过创建线程的方式异步执行,这样我们可以在执行任务的同时,继续执行其他任务。但是如果这种需求比较多的话,频繁的创建和销毁线程带来很大的性能损耗。如果我们能创建一个或一些线程,然后重复使用它们,就可以避免这个问题。

Qemu的实现

qemu模仿glib实现了线程池的功能,目前qemu中线程池主要应用在raw文件的支持上,当linux-aio不可用时,就像glibc,通过线程实现aio机制。我们也可看到,代表线程池中的线程成员的数据结构 ThreadPoolElement 就包含了用来描述aio的BlockAIOCB结构。相关数据结构如下:

首先看一下线程池相关的数据结构:

typedef struct AIOCBInfo {

    void (*cancel_async)(BlockAIOCB *acb);

    AioContext *(*get_aio_context)(BlockAIOCB *acb);

    size_t aiocb_size;

} AIOCBInfo;

struct BlockAIOCB {

    const AIOCBInfo *aiocb_info;

    BlockDriverState *bs;

    BlockCompletionFunc *cb;

    void *opaque;

    int refcnt;

};

struct ThreadPoolElement {

    BlockAIOCB common;

    ThreadPool *pool;

    ThreadPoolFunc *func;

    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After

     * that, only the worker thread can write to it.  Reads and writes

     * of state and ret are ordered with memory barriers.

     */

    enum ThreadState state;

    int ret;

    /* Access to this list is protected by lock.  */

    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */

    QLIST_ENTRY(ThreadPoolElement) all;

};

struct ThreadPool {

    AioContext *ctx;

    QEMUBH *completion_bh;

    QemuMutex lock;

    QemuCond worker_stopped;

    QemuSemaphore sem;

    int max_threads;

    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */

    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */

    QTAILQ_HEAD(, ThreadPoolElement) request_list;

    int cur_threads;

    int idle_threads;

    int new_threads;     /* backlog of threads we need to create */

    int pending_threads; /* threads created but not running yet */

    bool stopping;

};

ThreadPool 数据结构负责维护线程池里面的线程成员,线程的创建是通过下半部来实现的;ThreadPool 中有5个负责维护不同状态下的线程成员的计数器,max_threads负责统计线程池中允许创建的线程的最大值; new_threads负责统计需要创建的线程数;pending_threads负责统计已创建但还没有运行的线程数;idle_threads负责统计空闲的线程数;cur_threads负责统计当前线程池中线程的个数;注意cur_threads包含new_threads中尚未创建的线程。

线程池创建

首先通过thread_pool_new函数为特定的AioContext实例创建一个新的线程池。在这个函数中初始化ThreadPool数据结构的各个成员,包括负责创建新线程的new_thread_bh和线程执行完毕后用来调度执行任务完成回调函数的completion_bh。

任务提交

我们通过调用thread_pool_submit_aio函数来提交任务,这个函数的原型是:

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,

ThreadPoolFunc *func, void *arg,

BlockCompletionFunc *cb, void *opaque)

这个函数为提交的任务创建一个ThreadPoolElement实例添加到ThreadPool中,同时调用spawn_thread函数来创建一个qemu线程。spawn_thread函数是通过调度pool->new_thread_bh来创建qemu线程的。

线程执行

创建的线程执行worker_thread函数,这个函数从pool->request_list链表中取下第一个ThreadPoolElement节点,执行其任务函数,然后调度执行pool->completion_bh;这个bh遍历pool->head链表,根据其ThreadPoolElement成员的状态来决定是否执行实例上注册的完成回调函数。

线程执行完一个任务后,也就是一个ThreadPoolElement实例被执行后,线程就出在idle状态,等待下一个任务提交动作。任务提交与线程执行之间的同步是通过pool->sem来实现的。thread_pool_submit_aio中任务提交后会调用qemu_sem_post(&pool->sem)来增加pool->sem的计数,worker_thread在pool->sem上醒来后从pool->request_list链表上获取下一个要执行的ThreadPoolElement节点。

总结

线程池计数提供了线程重复使用的功能,这在qemu有大量io操作的时候提高了性能;同时,也提供了除了linux-aio之外的aio实现。

 

参考:

https://developer.gnome.org/glib/2.46/glib-Thread-Pools.html

Qemu线程池介绍