首页 > 代码库 > java并发基础
java并发基础
《java并发编程实战》终于读完4-7章了,感触很深,但是有些东西还没有吃透,先把已经理解的整理一下。《java并发编程实战》笔记(一)是对前3章的总结。这里总结一下第5章的东西,为什么跳过第4章?不告诉你。
一,阻塞队列和生产者-消费者模式
java中的阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列(先进先出),PriorityBlockingQueue是一个按优先级排序的队列。
阻塞队列支持生产者-消费者模式,该模式将“找出需要完成的工作”和“执行工作”这两个过程分离开来;并把工作项放入一个“待完成”列表中以便在随后处理,而不是找出后立即处理。当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。生产者不需要知道消费者的标识和数量,或者它们是否是唯一的生产者,而只需将数据放入队列即可。同样,消费者也不需要知道生产者是谁,来自何处。
举个例子说明阻塞队列和生产者-消费者模式如何配合使用:在某个文件层次结构中搜索所有的.java文件。
import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; /** * @描述:生产者 * @author 肖冬 */ public class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue ; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue<File> fileQueue,FileFilter fileFilter, File root) { super(); this.fileQueue = fileQueue; this.fileFilter = fileFilter; this.root = root; } @Override public void run() { try { crawl(root); } catch (InterruptedException e) { //使线程恢复中断状态,为什么这么做:因为线程一旦抛出中断异常,就会重置中断状态 Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException{ File[] entries = root.listFiles(fileFilter); if (entries !=null) { for (File entry : entries) { if (entry.isDirectory()) { crawl(entry); }else{ //生产者将任务放到阻塞队列 fileQueue.put(entry); } } } } }
import java.io.File; import java.util.concurrent.BlockingQueue; /** * @描述:消费者 * @author 肖冬 */ public class Indexer implements Runnable { private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue) { this.queue = queue; } @Override public void run() { try{ while (true) { indexFile(queue.take()); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } private void indexFile(File f){ System.out.println(f.getName()); } }
import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.junit.Test; /** * @描述:junit测试 * @author 肖冬 */ public class TestMain { @Test public void testFileFilter(){ File root = new File("D://test"); FileFilter fileFilter = new FileFilter() { @Override public boolean accept(File f) { if (f.isDirectory()) return true; if (f.isFile()) { String name = f.getName(); if (name.endsWith(".java")) return true; else return false; } return false; } }; BlockingQueue<File> queue = new LinkedBlockingQueue<File>(); //生产者 new Thread(new FileCrawler(queue,fileFilter, root)).start(); //消费者 new Thread(new Indexer(queue)).start(); } }
生产者将符合条件的文件放入阻塞队列,消费者处理任务时只从阻塞队列中去任务就可以了,不必关心生产者。
生产者-消费者模式同样能带来许多性能优势。生产者和消费者可以并发的执行。如果一个是I/O密集型,另一个是CPU密集型,那么并发执行的吞吐率要高于串行执行的吞吐率。
二,同步工具类
同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程间的控制流,比如阻塞队列,应为take和put等方法将阻塞,直到队列达到期望的状态(队列即非空,也非满)。
除了阻塞队列,其他类型的同步容器还包括:闭锁,栅栏,信号量。
1.闭锁
我自己理解闭锁的意思,它可以让某些动作一起开始,比如,让多个线程一起执行,并等待最慢的线程执行完毕。代码如下:
import java.util.concurrent.CountDownLatch; public class Harness { public long timeTasks(int nThreads,final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread(){ @Override public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch(InterruptedException e){} } }; t.start(); } long startTime = System.nanoTime(); startGate.countDown(); endGate.await(); long endTime = System.nanoTime(); return endTime - startTime; } }
例子中的CountDownLatch是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件发生。闭锁包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或等待中的线程中断,或等待超时。countDown和await方法是配合使用的。
Harness创建一定数量的线程,利用它们并发的执行指定任务,它使用两个闭锁,分别表示起始门和结束门,起始门的初始值为1,而结束门的计数器初始值为工作线程的数量,每个线程首先要做的事就是在启动门上等待,确保所有线程同时执行。而每个线程要做的最后一件事是调用结束门的countDown方法减1,使主线程高效的等待直到所有线程都执行完成,然后统计耗时。
2.信号量
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。
例子:有固定大小的HashSet,如果容器满了,容器将处于阻塞状态。
import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Semaphore; public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException{ sem.acquire();//获得一个许可 boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally{ if (!wasAdded) { sem.release();//将许可返回给信号量 } } } public boolean remove(T o){ boolean wasRemoved = set.remove(o); if (wasRemoved) { sem.release();//将许可返回给信号量 } return wasRemoved; } }
Semaphore中管理着一组虚拟的许可,许可的虚拟数量可通过构造函数指定,在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用后返还许可。如果没有许可,那么acquire方法将阻塞,直到有许可为止(或者直到被中断或者操作超时),release方法将返还一个许可给信号量。所以例子中的如果容器满了,再add元素的时候出现的阻塞状态,实际上是Semaphore的acquire处于阻塞状态,将线程挂起,直到信号量中有新的许可为止。
三,构建高效且可伸缩的结果缓存
几乎所有的服务器应用程序都会使用某种形式的缓存,重用之前的计算结果能降低延迟,提高吞吐量,但却需要消耗更多的内存。我们从简单的HashMap开始,然后分析它的并发性缺陷,并讨论如何修复它们。
public interface Computable<A, V> { V compute(A arg) throws InterruptedException; }
import java.math.BigInteger; public class ExpensiveFunction implements Computable<String, BigInteger> { @Override public BigInteger compute(String arg) throws InterruptedException { //这里模拟耗时的计算 return new BigInteger(arg); } }
import java.util.HashMap; import java.util.Map; public class Memoizer1<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<A, V>(); private Computable<A, V> c; public Memoizer1(Computable<A, V> c) { super(); this.c = c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg,result); } return result; } }
Memoizer1给出了第一种尝试:使用HashMap保存之前的结果,compute方法将首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算的值,否则,计算结果并保存到缓存中,再返回。
HashMap不是线程安全的,因此要确保两个线程不会同时访问HashMap,Memoizer1用了一种保守的方法,即对整个compute方法进行同步,这种方法能确保线程安全性,但会带来一个明显的可伸缩性的问题:每次只有一个线程能执行compute方法。如果另一个线程正在计算结果,那么其他调用compute的线程可能会被阻塞很长时间。如果有多个线程在排队等待还未计算出的结果,那么,compute方法的计算时间可能比没有使用缓存的计算时间更长。
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class Memoizer2<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); private Computable<A, V> c; public Memoizer2(Computable<A, V> c) { super(); this.c = c; } @Override public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg,result); } return result; } }
Memoizer2用ConcurrentHashMap替换HashMap,用于解决多线程访问缓存的安全性。为什么用ConcurrentHashMap而不是HashTable,因为ConcurrentHashMap的并发能力比HashTable强。
但是还有一个问题:如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行。那么很可能会重复这个计算。我们希望通过某种方式来表达“线程X正在进行计算f(27)”这种情况,这样当另一个线程查找f(27)时,它能够直到最高效的方法是等待线程X计算结束,然后去缓存中直接拿结果,怎么解决这个问题呢?有一个类能基本实现这个功能:FutureTask。
FutureTask表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果有结果可用,那么FutureTask.get将立即返回结果,否则它会一直阻塞,直到结果计算出来再将其返回。
import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; public class Memoizer3<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private Computable<A, V> c; public Memoizer3(Computable<A, V> c) { super(); this.c = c; } @Override public V compute(final A arg) throws InterruptedException { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { @Override public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = ft; cache.put(arg, f); ft.run();//在这里调用c.compute } try { return f.get(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } }
Memoizer3解决了上面提到的问题。若结果已经计算出来,那么将立即返回,如果其他线程正在计算该结果,那么新的线程将一直等待这个结果被计算出来。它只有一个缺陷,即仍然可能存在两个线程同时计算一个相同结果的情况。但是概率已经小了很多了。之所以还会发生,是因为put以后没有进行再判断,每个线程只要判断f==null就立刻开始创建任务。
mport java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; public class Memoizer4<A, V> implements Computable<A, V> { private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private Computable<A, V> c; public Memoizer4(Computable<A, V> c) { super(); this.c = c; } @Override public V compute(final A arg) throws InterruptedException { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { @Override public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(arg, f); if (f== null) { f = ft; ft.run();//在这里调用c.compute } } try { return f.get(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } }
Memoizer4解决了这个问题。至此,支持并发的耗时计算结果的缓存写完了,当然Memoizer4还有其他的问题,比如缓存清理的问题,但是已经可以用了。
java并发基础