首页 > 代码库 > 信号量&读写信号量&完成变量

信号量&读写信号量&完成变量

Linux提供两种信号量:

1、内核信号量,由内核控制路径使用

2、System V IPC信号量,由用户态进程使用

从本质上说,它们实现了一个加锁原语,即让等待者睡眠,直到等待的资源变为空闲。

信号量

内核信号量类似于自旋锁,因为当锁关闭着的时候,它不允许内核控制路径继续运行。然而,当内核内核控制路径试图获取内核信号量所保护的忙资源时,相应的进程被挂起。只有在资源被释放时,进程才再次变为可运行的

因此,只有可以睡眠的函数才能获取内核信号量;中断处理程序和可延迟处理函数都不能使用内核信号量。


struct semaphore {
	atomic_t count;
	int sleepers;
	wait_queue_head_t wait;
};
count    为一个atomic_t类型的值。如果该值大于0,那么资源就是空闲的,也就是说,该资源现在可以使用。相反,如果count等于0,那么信号量是忙的,但没有进程等待这个被保护的资源。最后如果count为负数,则资源时不可用的,并至少有一个进程等待资源。

sleepers 存放一个标志,表示是否有一些进程在信号量上睡眠

wait     存放等待队列链表的地址,当前等待资源的所有睡眠进程都放在这个链表中。当然,如果count大于或等于0,等待队列就为空。

可以使用init_MUTEX和init_MUTEX_LOCKED函数来初始化互斥访问所需的信号量:这两个宏分别把count字段设置成1(互斥访问的资源空闲)和0(对信号量进行初始化的进程当前互斥访问的资源忙等)

static inline void init_MUTEX (struct semaphore *sem)
{
	sema_init(sem, 1);
}

static inline void init_MUTEX_LOCKED (struct semaphore *sem)
{
	sema_init(sem, 0);
}

注意,也可以吧信号量中的count初始化为任意的正数值n,在这种情况下,最多有n个进程可以并发地访问这个资源。

获取信号量

当进程希望获取内核信号量时,就调用down()函数。

/arch/i386/kernel/semaphore.c

fastcall void __sched __down(struct semaphore * sem)
{
	struct task_struct *tsk = current;
	DECLARE_WAITQUEUE(wait, tsk);
	unsigned long flags;

	tsk->state = TASK_UNINTERRUPTIBLE;
	spin_lock_irqsave(&sem->wait.lock, flags);
	add_wait_queue_exclusive_locked(&sem->wait, &wait);

	sem->sleepers++;
	for (;;) {
		int sleepers = sem->sleepers;

		/*
		 * Add "everybody else" into it. They aren‘t
		 * playing, because we own the spinlock in
		 * the wait_queue_head.
		 */
		if (!atomic_add_negative(sleepers - 1, &sem->count)) {
			sem->sleepers = 0;
			break;
		}
		sem->sleepers = 1;	/* us - see -1 above */
		spin_unlock_irqrestore(&sem->wait.lock, flags);

		schedule();

		spin_lock_irqsave(&sem->wait.lock, flags);
		tsk->state = TASK_UNINTERRUPTIBLE;
	}
	remove_wait_queue_locked(&sem->wait, &wait);
	wake_up_locked(&sem->wait);
	spin_unlock_irqrestore(&sem->wait.lock, flags);
	tsk->state = TASK_RUNNING;
}
asm(
".section .sched.text\n"
".align 4\n"
".globl __down_failed\n"
"__down_failed:\n\t"
#if defined(CONFIG_FRAME_POINTER)
	"pushl %ebp\n\t"
	"movl  %esp,%ebp\n\t"
#endif
	"pushl %edx\n\t"
	"pushl %ecx\n\t"
	"call __down\n\t"
	"popl %ecx\n\t"
	"popl %edx\n\t"
#if defined(CONFIG_FRAME_POINTER)
	"movl %ebp,%esp\n\t"
	"popl %ebp\n\t"
#endif
	"ret"
);
/include/asm-i386/semaphore.h
static inline void down(struct semaphore * sem)
{
	might_sleep();
	__asm__ __volatile__(
		"# atomic down operation\n\t"
		LOCK "decl %0\n\t"     /* --sem->count */
		"js 2f\n"
		"1:\n"
		LOCK_SECTION_START("")
		"2:\tlea %0,%%eax\n\t"
		"call __down_failed\n\t"
		"jmp 1b\n"
		LOCK_SECTION_END
		:"=m" (sem->count)
		:
		:"memory","ax");
}
分析一下down的实现

首先decl 递减sem->count,然后检查该值是否为负。如果count大于或等于0,当前进程获得资源并继续正常执行。否则,count为负,当前进程必须挂起。把一下寄存器内容保存在栈中,然后调用__down()

从本质上说,__down()函数把当前进程的状态从TASK_RUNNING改变为TASK_UNINTERRUPTIBLE,并把进程放在信号量的等待队列。该函数在访问信号量结构之前,要获得用来保护信号量队列的sem->wait.lock自旋锁(因为等待队列是由中断处理程序和主要内核函数修改的,因此必须对其双向链表进行保护以避免对其同时访问),并禁止本地中断。

最后,__down()函数的主要任务是挂起当前进程(通过调用schedule),直到信号量释放。要牢记如果没有进程在信号量等待队列上睡眠,则信号量的sleeper字段通常是0,否则被置为1。

只有异常处理程序,特别是系统调用服务历程,才可以调用down()函数。

JS  Jump if sign (negative)

JMP Jump

LEA Load effective address

释放信号量

fastcall void __up(struct semaphore *sem)
{
	wake_up(&sem->wait);
}
asm(
".section .sched.text\n"
".align 4\n"
".globl __up_wakeup\n"
"__up_wakeup:\n\t"
	"pushl %edx\n\t"
	"pushl %ecx\n\t"
	"call __up\n\t"
	"popl %ecx\n\t"
	"popl %edx\n\t"
	"ret"
);
static inline void up(struct semaphore * sem)
{
	__asm__ __volatile__(
		"# atomic up operation\n\t"
		LOCK "incl %0\n\t"     /* ++sem->count */
		"jle 2f\n"
		"1:\n"
		LOCK_SECTION_START("")
		"2:\tlea %0,%%eax\n\t"
		"call __up_wakeup\n\t"
		"jmp 1b\n"
		LOCK_SECTION_END
		".subsection 0\n"
		:"=m" (sem->count)
		:
		:"memory","ax");
}
up()函数增加count字段的值,然后检查它的值是否大于0。如果count大于0,说明没有进程在等待队列上睡眠,因此什么事情也不做。否则调用__up()函数以唤醒一个睡眠进程。

Linux中提供信号量的宏

sema_init          初始化信号量

down               获取信号量

down_interruptible 获取信号量,该函数广泛应用在驱动设备程序中,因为如果进程收到了一个信号但在信号量上被阻塞,就允许进程放弃down操作。
                   如果睡眠进程在获得需要的资源之前被一个信号唤醒,那么该函数就会增加count字段的值并返回-EINTR。因此返回-EINTR时,                    设备驱动程序可以放弃I/O操作

down_trylock       获取信号量,非阻塞接口,与down()不同之处在于系统资源繁忙时,该函数会立即返回,而不是让进程去睡眠

up                 释放信号量

读/写信号量

读写信号量类似前面的读写锁,有一点不同:在信号量再次变为打开之前,等待进程挂起而不是自旋。

struct rw_semaphore {
	signed long		count;
#define RWSEM_UNLOCKED_VALUE		0x00000000
#define RWSEM_ACTIVE_BIAS		0x00000001
#define RWSEM_ACTIVE_MASK		0x0000ffff
#define RWSEM_WAITING_BIAS		(-0x00010000)
#define RWSEM_ACTIVE_READ_BIAS		RWSEM_ACTIVE_BIAS
#define RWSEM_ACTIVE_WRITE_BIAS		(RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS)
	spinlock_t		wait_lock;
	struct list_head	wait_list;
#if RWSEM_DEBUG
	int			debug;
#endif
};
count  存放两个16位的计数器。其中最高16位计数器以二进制补码形式存放非等待写者进程的总数(0或1)和等待的写内核控制路径数。最低16位计数器存        放非等待的读者和写者进程的总数。

wait_list 指向等待进程的链表。

wait_lock 一个自旋锁,用于保护等待队列链表和rw_semaphore结构本身。

Linux中提供读写信号量的宏

init_rwsem          初始化读写信号量

down_read           获取读信号量

down_read_trylock   尝试获取读信号量,非阻塞接口

down_write          获取写信号量

down_write_trylock  尝试获取写信号量,非阻塞接口

up_read             释放读信号量

up_write            释放写信号量

downgrade_write     自动把写锁转换为读锁

完成变量(补充原语)

引入补充原语是为了解决多处理器系统上发生的一种微妙的竞争条件,当A进程分配了一个临时变量,把它初始化为关闭的MUTEX,并把地址传递给进程B,然后再A之上调用down(),进程A打算一旦被唤醒就撤销该信号量。随后,运行在不同CPU上的进程B在同一信号量上调用up(),然而,目前up()和down()的实现允许这两个函数在同一个信号量上并发执行。因此,进程A可以被唤醒并撤销临时信号量,而进程B还在运行up()函数,结果,up()可能试图访问一个不存在的数据结构。
如果在内核中一个任务需要发出信号通知另一个任务发生了某个特定事件,利用完成变量(complection variable)是使两个任务得以同步的简单方法。如果一个任务要执行一些工作时,另一个任务就会在完成变量上等待。当这个任务完成工作后,会使用完成变量去唤醒在等待的任务
struct completion {
	unsigned int done;
	wait_queue_head_t wait;
};
void fastcall complete(struct completion *x)
{
	unsigned long flags;

	spin_lock_irqsave(&x->wait.lock, flags);
	x->done++;
	__wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE,
			 1, 0, NULL);
	spin_unlock_irqrestore(&x->wait.lock, flags);
}
与up()对应的函数叫做complete(),该函数接收completion数据结构的地址作为参数,递增done字段,唤醒在wait等待队列上的睡眠的互斥进程。
void fastcall __sched wait_for_completion(struct completion *x)
{
	might_sleep();
	spin_lock_irq(&x->wait.lock);
	if (!x->done) {
		DECLARE_WAITQUEUE(wait, current);

		wait.flags |= WQ_FLAG_EXCLUSIVE;
		__add_wait_queue_tail(&x->wait, &wait);
		do {
			__set_current_state(TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&x->wait.lock);
			schedule();
			spin_lock_irq(&x->wait.lock);
		} while (!x->done);
		__remove_wait_queue(&x->wait, &wait);
	}
	x->done--;
	spin_unlock_irq(&x->wait.lock);
}
与down()对应的函数叫做wait_for_completion(),该函数接收completion数据结构的地址作为参数,并检查done标志的值。如果该标志的值大于0,wait_for_completion()就终止,因为这说明complete()已经在另一个CPU上运行。否则该函数把current作为一个互斥进程加到等待队列的末尾,并把current置为TASK_UNINTERRUPTIBLE状态让其睡眠。一旦current被唤醒,该函数就把current从等待队列中删除,然后,检查done标志的值:如果等于0函数就结束,否则再次挂起进程。