首页 > 代码库 > 无锁编程实战演练

无锁编程实战演练

前段时间研究过一阵子无锁化编程。刚写了几个简单的程序,来验证了下自己学到的一些概念。

测试场景:假设有一个应用:现在有一个全局变量,用来计数,再创建10个线程并发执行,每个线程中循环对这个全局变量进行++操作(i++),循环加2000000次。

所以很容易知道,这必然会涉及到并发互斥操作。下面通过三种方式来实现这种并发操作。并对比出其在效率上的不同之处。

这里先贴上代码,共5个文件:2个用于做时间统计的文件:timer.h  timer.cpp。这两个文件是临时封装的,只用来计时,可以不必细看。

 timer.h

#ifndef TIMER_H#define TIMER_H#include <sys/time.h>class Timer{public:		Timer();	// 开始计时时间	void Start();	// 终止计时时间	void Stop();	// 重新设定	void Reset();	// 耗时时间	void Cost_time();private:	struct timeval t1;	struct timeval t2;	bool b1,b2;};#endif


timer.cpp

#include "timer.h"#include <stdio.h>Timer::Timer(){	b1 = false;	b2 = false;}void Timer::Start(){	gettimeofday(&t1,NULL);  	b1 = true;	b2 = false;}void Timer::Stop(){		if (b1 == true)	{		gettimeofday(&t2,NULL);  		b2 = true;	}}void Timer::Reset(){		b1 = false;	b2 = false;}void Timer::Cost_time(){	if (b1 == false)	{		printf("计时出错,应该先执行Start(),然后执行Stop(),再来执行Cost_time()");		return ;	}	else if (b2 == false)	{		printf("计时出错,应该执行完Stop(),再来执行Cost_time()");		return ;	}	else	{		int usec,sec;		bool borrow = false;		if (t2.tv_usec > t1.tv_usec)		{			usec = t2.tv_usec - t1.tv_usec;		}		else		{			borrow = true;			usec = t2.tv_usec+1000000 - t1.tv_usec;		}		if (borrow)		{			sec = t2.tv_sec-1 - t1.tv_sec;		}		else		{			sec = t2.tv_sec - t1.tv_sec;		}		printf("花费时间:%d秒 %d微秒\n",sec,usec);	}}


传统互斥量加锁方式 lock.cpp

#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <time.h>#include "timer.h"pthread_mutex_t mutex_lock;static volatile int count = 0;void *test_func(void *arg){        int i = 0;        for(i = 0; i < 2000000; i++)		{                pthread_mutex_lock(&mutex_lock);                count++;                pthread_mutex_unlock(&mutex_lock);        }        return NULL;}int main(int argc, const char *argv[]){	Timer timer; // 为了计时,临时封装的一个类Timer。	timer.Start();	// 计时开始	pthread_mutex_init(&mutex_lock, NULL);	pthread_t thread_ids[10];	int i = 0;	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)	{		pthread_create(&thread_ids[i], NULL, test_func, NULL);	}	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)	{		pthread_join(thread_ids[i], NULL);	}	timer.Stop();// 计时结束	timer.Cost_time();// 打印花费时间	printf("结果:count = %d\n",count);	return 0;}


no lock 不加锁的形式 nolock.cpp

#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <unistd.h>#include <time.h>#include "timer.h"int mutex = 0;int lock = 0;int unlock = 1;static volatile int count = 0;void *test_func(void *arg){        int i = 0;        for(i = 0; i < 2000000; i++)	{		while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);		 count++;		 __sync_bool_compare_and_swap (&mutex, unlock, 0);        }        return NULL;}int main(int argc, const char *argv[]){	Timer timer;	timer.Start();	pthread_t thread_ids[10];	int i = 0;	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)	{			pthread_create(&thread_ids[i], NULL, test_func, NULL);	}	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)	{			pthread_join(thread_ids[i], NULL);	}	timer.Stop();	timer.Cost_time();	printf("结果:count = %d\n",count);	return 0;}


原子函数进行统计方式 atomic.cpp

#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <unistd.h>#include <time.h>#include "timer.h"static volatile int count = 0;void *test_func(void *arg){        int i = 0;        for(i = 0; i < 2000000; i++)        {	        __sync_fetch_and_add(&count, 1);        }        return NULL;}int main(int argc, const char *argv[]){	Timer timer;	timer.Start();	pthread_t thread_ids[10];	int i = 0;	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){			pthread_create(&thread_ids[i], NULL, test_func, NULL);	}	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){			pthread_join(thread_ids[i], NULL);	}	timer.Stop();	timer.Cost_time();	printf("结果:count = %d\n",count);    return 0;}


#################################################################3

好,代码粘贴完毕。下面进入测试环节:

编译:

[adapter@ZHEJIANG test3]$ g++ lock.cpp ./timer.cpp  -lpthread -o lock ;
[adapter@ZHEJIANG test3]$ g++ nolock.cpp ./timer.cpp  -lpthread -o nolock ;
[adapter@ZHEJIANG test3]$  g++ atomic.cpp ./timer.cpp  -lpthread -o atomic ;

 

每一个线程循环加2000000次。
第一组测验
[adapter@ZHEJIANG test3]$ ./lock
花费时间:3秒 109807微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:7秒 595784微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./atomic
花费时间:0秒 381022微秒
结果:count = 20000000

结论:
可以看出,原子操作函数的速度是最快的,其他两种方式根本就没法比。而无锁操作是在原子操作函数的基础上形成的。
为什么无锁操作的效率会这么低?如果效率低的话,那还有什么意义,为什么现在大家都提倡无锁编程呢?为什么?咱先不
解释,先用数据说话。

第二组测验:
原子操作代码不变,加锁操作代码不变。改动一下无锁操作的代码。
将如下代码更改
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ));
更改后:while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1);
让他睡一微秒。
为什么要这样改代码?这样启不是会更慢?你的猜测是不无道理的,但是一个不休息的人干的活未必比有休息的人干的活多。

[adapter@ZHEJIANG test3]$ ./lock
花费时间:2秒 970773微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:0秒 685404微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./atomic
花费时间:0秒 380675微秒
结果:count = 20000000

结论:
不用明说,大家看到的结果是不是很诧异?是不是!有木有!怎么会是这样。无锁加上usleep(1),睡一会,反而会变得这么快。
虽和原子操作相比次了一点,但已经甩开有锁同步好几条街了,无锁比有锁快是应该的,但为什么睡一会会更快,不睡就比有锁
还慢那么多呢?怎么回事。是不是这个测试的时候cpu出现了不稳定的事情。
那好,那再测试几次。
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:0秒 684938微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:0秒 686039微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:0秒 685928微秒
结果:count = 20000000

现在总没话可说了,这是事实!但为什么,我也不会解释。

很好奇,为什么越休息,效率越高。电脑是机器,它可不是人。怎么会这样?
那我就让它多休息一会:
 while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10); //之前是1,现在改成10了。
 下面就再单独对比一个nolock无锁方式。
[adapter@ZHEJIANG test3]$ ./nolock //usleep(1);
花费时间:0秒 686039微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock //usleep(10);
花费时间:0秒 680307微秒
结果:count = 20000000
nolock,结果usleep(10)居然比uleep(1)还要快一点。

那么这样呢:
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100); //之前是10,现在改成100了。
[adapter@ZHEJIANG test3]$ ./nolock //usleep(100)
花费时间:0秒 661935微秒
结果:count = 20000000
还是睡的越久,效率越高。

那我再试一下usleep(1000)
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(1000); //之前是100,现在改成1000了。
[adapter@ZHEJIANG test3]$ ./nolock // usleep(1000);
花费时间:0秒 652411微秒
结果:count = 20000000
还是睡的越久,效率越高。

那我再试一下usleep(10000)
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10000); //之前是1000,现在改成10000了。
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:0秒 626267微秒
结果:count = 20000000
还是睡的越久,效率越高。

那我再试一下usleep(100000)
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000); //之前是10000,现在改成100000了,也就是0.1秒。
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:0秒 942445微秒
结果:count = 20000000
哦,现在开始速度慢了。

执行环境:
gcc版本信息:
[adapter@ZHEJIANG test3]$ g++ -v
Using built-in specs.
Target: x86_64-redhat-linux
gcc version 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)

cpu信息:
[adapter@ZHEJIANG test3]$ cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c
      4  Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz

通过编程测试及测试得出结论:
1、如果是想用全局变量来做统计操作。而又不得不考虑多线程间的互斥访问的话,最好使用编译器支持的原子操作函数。
再满足互斥访问的前提下,编程最简单,效率最高。
2、lock-free,无锁编程方式确实能够比传统加锁方式效率高,经上面测试可以发现,可以快到5倍左右。所以在高并发程序中
采用无锁编程的方式可以进一步提高程序效率。
3、但是,得对无锁方式有足够熟悉的了解,不然效率反而会更低。而且容易出错。
4、没想明白的疑问:为什么上面的循环检测时,加uleep比不加,效率更高。为什么在一定程度上,usleep越久效率越高?
请高手路过的时候,为小弟解答一下。谢谢。