首页 > 代码库 > Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)
Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)
生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:
1. wait()/notify()
2. lock & condition
3. BlockingQueue
下面来逐一分析。
1. wait()/notify()
第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。
1 public class WaitNotifyBroker<T> implements Broker<T> { 2 3 private final Object[] items; 4 5 private int takeIndex; 6 private int putIndex; 7 private int count; 8 9 public WaitNotifyBroker(int capacity) { 10 this.items = new Object[capacity]; 11 } 12 13 @SuppressWarnings("unchecked") 14 @Override 15 public T take() { 16 T tmpObj = null; 17 try { 18 synchronized (items) { 19 while (0 == count) { 20 items.wait(); 21 } 22 tmpObj = (T) items[takeIndex]; 23 if (++takeIndex == items.length) { 24 takeIndex = 0; 25 } 26 count--; 27 items.notify(); 28 } 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 33 return tmpObj; 34 } 35 36 @Override 37 public void put(T obj) { 38 try { 39 synchronized (items) { 40 while (items.length == count) { 41 items.wait(); 42 } 43 44 items[putIndex] = obj; 45 if (++putIndex == items.length) { 46 putIndex = 0; 47 } 48 count++; 49 items.notify(); 50 } 51 } catch (InterruptedException e) { 52 e.printStackTrace(); 53 } 54 55 } 56 57 }
这里利用Array构造一个Buffer去存取数据,并利用count, putIndex和takeIndex来保证First-In-First-Out。
如果利用LinkedList来代替Array,相对来说会稍微简单些。
LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。
2. lock & condition
lock & condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。
在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:
1 public class LockConditionBroker<T> implements Broker<T> { 2 3 private final ReentrantLock lock; 4 private final Condition notFull; 5 private final Condition notEmpty; 6 private final int capacity; 7 private LinkedList<T> items; 8 9 public LockConditionBroker(int capacity) { 10 this.lock = new ReentrantLock(); 11 this.notFull = lock.newCondition(); 12 this.notEmpty = lock.newCondition(); 13 this.capacity = capacity; 14 15 items = new LinkedList<T>(); 16 } 17 18 @Override 19 public T take() { 20 T tmpObj = null; 21 lock.lock(); 22 try { 23 while (items.size() == 0) { 24 notEmpty.await(); 25 } 26 27 tmpObj = items.poll(); 28 notFull.signalAll(); 29 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } finally { 33 lock.unlock(); 34 } 35 return tmpObj; 36 } 37 38 @Override 39 public void put(T obj) { 40 lock.lock(); 41 try { 42 while (items.size() == capacity) { 43 notFull.await(); 44 } 45 46 items.offer(obj); 47 notEmpty.signalAll(); 48 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } finally { 52 lock.unlock(); 53 } 54 55 } 56 }
3. BlockingQueue
最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。
实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。
1 public class BlockingQueueBroker<T> implements Broker<T> { 2 3 private final BlockingQueue<T> queue; 4 5 public BlockingQueueBroker() { 6 this.queue = new LinkedBlockingQueue<T>(); 7 } 8 9 @Override 10 public T take() { 11 try { 12 return queue.take(); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 17 return null; 18 } 19 20 @Override 21 public void put(T obj) { 22 try { 23 queue.put(obj); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 }
我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。
接下来,就是一个1P2C的例子:
1 public interface Broker<T> { 2 3 T take(); 4 5 void put(T obj); 6 7 } 8 9 10 public class Producer implements Runnable { 11 12 private final Broker<Integer> broker; 13 private final String name; 14 15 public Producer(Broker<Integer> broker, String name) { 16 this.broker = broker; 17 this.name = name; 18 } 19 20 @Override 21 public void run() { 22 try { 23 for (int i = 0; i < 5; i++) { 24 broker.put(i); 25 System.out.format("%s produced: %s%n", name, i); 26 Thread.sleep(1000); 27 } 28 broker.put(-1); 29 System.out.println("produced termination signal"); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 return; 33 } 34 35 } 36 37 } 38 39 40 public class Consumer implements Runnable { 41 42 private final Broker<Integer> broker; 43 private final String name; 44 45 public Consumer(Broker<Integer> broker, String name) { 46 this.broker = broker; 47 this.name = name; 48 } 49 50 @Override 51 public void run() { 52 try { 53 for (Integer message = broker.take(); message != -1; message = broker.take()) { 54 System.out.format("%s consumed: %s%n", name, message); 55 Thread.sleep(1000); 56 } 57 System.out.println("received termination signal"); 58 } catch (InterruptedException e) { 59 e.printStackTrace(); 60 return; 61 } 62 63 } 64 65 } 66 67 68 public class Main { 69 70 public static void main(String[] args) { 71 Broker<Integer> broker = new WaitNotifyBroker<Integer>(5); 72 // Broker<Integer> broker = new LockConditionBroker<Integer>(5); 73 // Broker<Integer> broker = new BlockingQueueBroker<Integer>(); 74 75 new Thread(new Producer(broker, "prod 1")).start(); 76 new Thread(new Consumer(broker, "cons 1")).start(); 77 new Thread(new Consumer(broker, "cons 2")).start(); 78 79 } 80 81 }
除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等
本文完。
参考:
《Java 7 Concurrency Cookbook》