首页 > 代码库 > java并发深入

java并发深入

对象由数据+行为组成。数据就是字段,行为就是方法。
并发需要保证这些可被多个线程访问的共享对象数据的完整性,以及某些特定完整性语义。
比如一个类有一个字段count=0,两个线程同时对它做加1操作。
这时就有可能发生:
 线程1查询到count为1对其加1。
 线程2查询到count为1,对其加1。

  接着线程1提交,线程2提交。
最终值count还是为1。

也就是说线程1对count的修改丢失了。

解决这个问题,需要加锁。
java提交了内置锁syncronized,以及Lock。
内置锁syncronized,利用monitorEnter及monitorExit两条指令保证数据的可见性与原子性。

比如A类有一个字段count,默认值为0,代码如下:
public class A{
  private int count=0;
 
  public syncronized add(){
     count++;
  }
}

线程一首先调用add方法,这时会发生以下步骤:
1.线程二尝试获取在当前A实例上的锁,没有获取到则阻塞
2.获取到锁后,将count值从主存拷到当前线程工作内存,这时count为0

线程二这时执行add方法,但发现获取不到锁,这时阻塞在那边。

线程一执行完加1后,退出解锁。
这时线程二就可以获取到锁了。

并发中对于某些集合,要使它成为同步类,我们常常使用封装,如下:
class SyncMap{
  Map<String,String> maps=new HashMap<String,String>();
 
  public syncronized V put(K key, V value){
    maps.put(key,value);
  }
}

这样做的优点是不管底层maps有无同步,同步策略是什么,都可以安全的实现同步。

还有一种实现同步的方法,即将需要同步的操作交由已经存在的同步类来做。
考虑上面的count加1操作,如果将count类型改成AtomicInteger,由AtomicInteger实现同步,原子加1操作。

atomic
===============atomic all finish,cost:247,the res:3000000
===============atomic all finish,cost:248,the res:3000000
===============atomic all finish,cost:262,the res:3000000
===============atomic all finish,cost:239,the res:3000000
===============atomic all finish,cost:249,the res:3000000

sync
===============sync all finish,cost:54,the res:3000000
===============sync all finish,cost:45,the res:3000000
===============sync all finish,cost:47,the res:3000000
===============sync all finish,cost:45,the res:3000000
===============sync all finish,cost:49,the res:3000000

测试表明上述对于300个线程,每个线程做10000次加1操作,内置锁syncronized比atomicInteger效率要高


测试代码如下:

public class SyncWithAtomicTest {
	
	private int count=0;
	
	private static final int threadCount=300;
	
	private static final int countNum=10000;
	
	private final AtomicInteger countAtomicInteger=new AtomicInteger(0);

	private static final ExecutorService threadPool=Executors.newFixedThreadPool(threadCount);
	
	private final CountDownLatch latchStart=new CountDownLatch(threadCount);
	
	private final CountDownLatch latchEnd=new CountDownLatch(threadCount);
	
	public synchronized void addWithCountSync(){
		for(int i=0;i<countNum;i++){
			count++;
		}
	}
	
	public void addWithAtomicCount(){
		for(int i=0;i<countNum;i++){
			countAtomicInteger.incrementAndGet();
		}
	}
	
	public static void main(String[] args) throws InterruptedException {
		
		SyncWithAtomicTest obj=new SyncWithAtomicTest();
		
		Long oldTime=System.currentTimeMillis();
		
		for(int i=0;i<threadCount;i++){
			CountTask t=new CountTask();
			t.setTarget(obj);
			
			threadPool.execute(t);
		}
		
		obj.latchEnd.await();
		
		Long endTime=System.currentTimeMillis()-oldTime;
		
//		System.out.println("===============atomic all finish,cost:"+endTime+",the res:"+obj.countAtomicInteger.get());
		
		System.out.println("===============sync all finish,cost:"+endTime+",the res:"+obj.count);
	}
	
	static class CountTask implements Runnable{
		private SyncWithAtomicTest target;

		public void run() {
			try {
				target.latchStart.countDown();
				target.latchStart.await();
				
				//we do add oper when all threads is ready 
				target.addWithCountSync();
				
//				target.addWithAtomicCount();
				
				System.out.println("thread:"+Thread.currentThread().getId()+",finish the work");
				
				target.latchEnd.countDown();
				
				
			} catch (InterruptedException e) {
				e.printStackTrace();
				Thread.currentThread().interrupt();
			}
		}

		public void setTarget(SyncWithAtomicTest target) {
			this.target = target;
		}
	}

}