首页 > 代码库 > Linux tasklet 和workqueue学习

Linux tasklet 和workqueue学习

中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。因此,内核把中断处理分为两部分:上半部(tophalf)和下半部(bottomhalf),上半部(就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理,

首先,一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。

下半部运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。

但是,内核到底什时候执行下半部,以何种方式组织下半部?这就是我们要讨论的下半部实现机制,这种机制在内核的演变过程中不断得到改进,在以前的内核中,这个机制叫做bottomhalf(简称bh),在2.4以后的版本中有了新的发展和改进,改进的目标使下半部可以在多处理机上并行执行,并有助于驱动程序的开发者进行驱动程序的开发。下面介绍常用的小任务(Tasklet)机制及2.6内核中的工作队列机制。

1.Tasklet

这里的tasklet是指对要推迟执行的函数进行组织的一种机制。其数据结构为tasklet_struct,每个结构代表一个独立的小任务,其定义如下:

struct tasklet_struct {
struct tasklet_struct *next;         /*指向链表中的下一个结构*/
          unsignedlong state;                /* 小任务的状态*/
          atomic_tcount;        /* 引用计数器*/
          void(*func) (unsigned long);                /* 要调用的函数*/
          unsignedlong data;                 /* 传递给函数的参数*/
};
结构中的func域就是下半部中要推迟执行的函数,data是它唯一的参数。
State域的取值为TASKLET_STATE_SCHED或TASKLET_STATE_RUN。TASKLET_STATE_SCHED表示小任务已被调度,正准备投入运行,TASKLET_STATE_RUN表示小任务正在运行。TASKLET_STATE_RUN只有在多处理器系统上才使用,单处理器系统什么时候都清楚一个小任务是不是正在运行(它要么就是当前正在执行的代码,要么不是)。Count域是小任务的引用计数器。如果它不为0,则小任务被禁止,不允许执行;只有当它为零,小任务才被激活,并且在被设置为挂起时,小任务才能够执行。

1)声明和使用小任务大多数情况下,为了控制一个寻常的硬件设备,小任务机制是实现下半部的最佳选择。

小任务可以动态创建,使用方便,执行起来也比较快。

我们既可以静态地创建小任务,也可以动态地创建它。选择那种方式取决于到底是想要对小任务进行直接引用还是一个间接引用。

如果准备静态地创建一个小任务(也就是对它直接引用),使用下面两个宏中的一个:
DECLARE_TASKLET(name,func, data)
DECLARE_TASKLET_DISABLED(name,func, data)
这两个宏都能根据给定的名字静态地创建一个tasklet_struct结构。当该小任务被调度以后,给定的函数func会被执行,它的参数由data给出。这两个宏之间的区别在于引用计数器的初始值设置不同。第一个宏把创建的小任务的引用计数器设置为0,因此,该小任务处于激活状态。另一个把引用计数器设置为1,所以该小任务处于禁止状态例如:

DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev);
这行代码其实等价于
struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0), tasklet_handler,dev};
这样就创建了一个名为my_tasklet的小任务,其处理程序为tasklet_handler,并且已被激活。当处理程序被调用的时候,dev就会被传递给它。

 

2)  编写自己的小任务处理程序小任务处理程序必须符合如下的函数类型:
void tasklet_handler(unsigned long data)
由于小任务不能睡眠,因此不能在小任务中使用信号量或者其它产生阻塞的函数。但是小任务运行时可以响应中断。


3)调度自己的小任务通过调用tasklet_schedule()函数并传递给它相应的tasklt_struct指针,该小任务就会被调度以便适当的时候执行:
tasklet_schedule(&my_tasklet);        /*把my_tasklet标记为挂起 */
在小任务被调度以后,只要有机会它就会尽可能早的运行。在它还没有得到运行机会之前,如果一个相同的小任务又被调度了,那么它仍然只会运行一次。
可以调用tasklet_disable()函数来禁止某个指定的小任务。如果该小任务当前正在执行,这个函数会等到它执行完毕再返回。

调用tasklet_enable()函数可以激活一个小任务,如果希望把以DECLARE_TASKLET_DISABLED()创建的小任务激活,也得调用这个函数,如:
tasklet_disable(&my_tasklet);        /*小任务现在被禁止,这个小任务不能运行*/
tasklet_enable(&my_tasklet);        /*  小任务现在被激活*/
也可以调用tasklet_kill()函数从挂起的队列中去掉一个小任务。该函数的参数是一个指向某个小任务的tasklet_struct的长指针。在小任务重新调度它自身的时候,从挂起的队列中移去已调度的小任务会很有用。这个函数首先等待该小任务执行完毕,然后再将它移去。

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/interrupt.h>

//define a devid
static int mydev=1119;
static int irq;
static char* devname=NULL;
//define arguments for this module
module_param(irq,int,0644);
module_param(devname,charp,0644);

//define a argument of tasklet struct
static struct tasklet_struct mytasklet;

static void mytasklet_handler(unsigned long data)
{
    printk("This is tasklet handler../n");
}

static irqreturn_t myirq_handler(int irq,void* dev)
{
    static int count=0;
    if(count<10)
    {
        printk("-----------%d start--------------------------/n",count+1);
        printk("The interrupt handeler is working../n");
        printk("The most of interrupt work will be done by following tasklet../n");
        tasklet_init(&mytasklet,mytasklet_handler,0);
        tasklet_schedule(&mytasklet);
        printk("The top half has been done and bottom half will be processed../n");
    }
    count++;
          return IRQ_HANDLED;
}

static int __init mytasklet_init()
{
    //request a irq

    printk("My module is working../n");
    if(request_irq(irq,myirq_handler,IRQF_SHARED,devname,&irq)!=0)
    {
        printk("tasklet_init:can not request irq %d for %s..",irq,devname);
        return -1;
    }
    printk("%s request irq:%d success../n",devname,irq);
    return 0;
}

static void __exit mytasklet_exit()
{
    printk("My module is leaving../n");
    free_irq(irq,&irq);
    printk("Free the irq %d../n",irq);
}

module_init(mytasklet_init);
module_exit(mytasklet_exit);
MODULE_LICENSE("GPL");

2.Workquue

工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。

那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。

1).工作、工作队列和工作者线程

如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。

struct workqueue_struct *create_workqueue(const char *name);
struct workqueue_struct *create_singlethread_workqueue(const char *name);
void destroy_workqueue(struct workqueue_struct *queue);

 

每个工作队列有一个或多个专用的进程("内核线程"), 这些进程运行提交给这个队列的函数。 若使用 create_workqueue, 就得到一个工作队列它在系统的每个处理器上有一个专用的线程。在很多情况下,过多线程对系统性能有影响,如果单个线程就足够则使用 create_singlethread_workqueue 来创建工作队列。当用完一个工作队列,可以使用destroy_workqueque去掉它。

int queue_work(struct workqueue_struct *queue, struct work_struct *work);
int queue_delayed_work(struct workqueue_struct *queue, struct work_struct *work, unsigned long delay);
从一个工作队列对工作进行排队执行的函数.


int cancel_delayed_work(struct work_struct *work);
void flush_workqueue(struct workqueue_struct *queue);
使用 cancel_delayed_work 来从一个工作队列中去除入口; flush_workqueue 确保没有工作队列入口在系统中任何地方运行.

2).表示工作的数据结构

工作用<linux/workqueue.h>中定义的work_struct结构表示:

struct work_struct{

unsigned long pending; /* 这个工作正在等待处理吗?*/

struct list_head entry; /* 连接所有工作的链表 */

void (*func) (void *); /* 要执行的函数 */

void *data; /* 传递给函数的参数 */

void *wq_data; /* 内部使用 */

struct timer_list timer; /* 延迟的工作队列所用到的定时器 */

};

这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。

3). 创建推后的工作

要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地建该结构:

DECLARE_WORK(name, void (*func) (void *), void *data);

这样就会静态地创建一个名为name,待执行函数为func,参数为data的work_struct结构。

同样,也可以在运行时通过指针创建一个工作:

INIT_WORK(struct work_struct *work, woid(*func) (void *), void *data);

这会动态地初始化一个由work指向的工作。

4). 工作队列中待执行的函数

工作队列待执行的函数原型是:

void work_handler(void *data)

这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

5). 对工作进行调度

现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给系统默认的events工作线程,只需调用

schedule_work(&work);

work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。

有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

schedule_delayed_work(&work, delay);

这时,&work指向的work_struct直到delay指定的时钟节拍用完以后才会执行。

 

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/interrupt.h>

//define a devid
static int mydev=1119;
static int irq;
static char* devname=NULL;
//define arguments for this module
module_param(irq,int,0644);
module_param(devname,charp,0644);

//define a argument of work struct
static struct work_struct mywork;

static void mywork_handler(unsigned long data)
{
    printk("This is workhandler../n");
}

static irqreturn_t myirq_handler(int irq,void* dev)
{
    static int count=0;
    if(count<10)
    {
        printk("-----------%d start--------------------------/n",count+1);
        printk("The interrupt handeler is working../n");
        printk("The most of interrupt work will be done by following work../n");
        INIT_WORK(&mywork,mywork_handler,0);
        schedule_work(&mywork);
        printk("The top half has been done and bottom half will be processed../n");
    }
    count++;
    return IRQ_HANDLED;
}

static int __init mywork_init()
{
    //request a irq

    printk("My module is working../n");
    if(request_irq(irq,myirq_handler,IRQF_SHARED,devname,&irq)!=0)
    {
        printk("work_init:can not request irq %d for %s..",irq,devname);
        return -1;
    }
    printk("%s request irq:%d success../n",devname,irq);
    return 0;
}

static void __exit mywork_exit()
{
    printk("My module is leaving../n");
    free_irq(irq,&irq);
    printk("Free the irq %d../n",irq);
}

module_init(mywork_init);
module_exit(mywork_exit);
MODULE_LICENSE("GPL");

3.区别

tasklet

Workqueue

处于atomic context,不能sleep

不处于atomic context,可以sleep

处于中断上下文,OS不可以进行进程调度

处于进程上下文,OS可以进行进程调度

运行调度它们的同一个CPU上

默认同一个CPU上

不能指定确定时间进行调度

不能指定确定时间进行调度或者指定至少延时一个确定时间后调度

只能交给ksoftirqd/0

可以提交给events/0,也可以提交给自定义的workqueue

tasklet与workqueue的不同应用环境总结如下:

(1)必须立即进行紧急处理的极少量任务放入在中断的顶半部中,此时屏蔽了与自己同类型的中断,由于任务量少,所以可以迅速不受打扰地处理完紧急任务。

(2) 需要较少时间的中等数量的急迫任务放在tasklet中。此时不会屏蔽任何中断(包括与自己的顶半部同类型的中断),所以不影响顶半部对紧急事务的处理;同时又不会进行用户进程调度,从而保证了自己急迫任务得以迅速完成。

(3)需要较多时间且并不急迫(允许被操作系统剥夺运行权)的大量任务放在workqueue中。此时操作系统会尽量快速处理完这个任务,但如果任务量太大,期间操作系统也会有机会调度别的用户进程运行,从而保证不会因为这个任务需要运行时间将其它用户进程无法进行。

(4)可能引起睡眠的任务放在workqueue中。因为在workqueue中睡眠是安全的。

 

Linux tasklet 和workqueue学习