首页 > 代码库 > Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:

1. wait()/notify()

2. lock & condition

3. BlockingQueue

 

下面来逐一分析。

 

1. wait()/notify()

第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。

 1 public class WaitNotifyBroker<T> implements Broker<T> {
 2 
 3     private final Object[] items;
 4 
 5     private int takeIndex;
 6     private int putIndex;
 7     private int count;
 8 
 9     public WaitNotifyBroker(int capacity) {
10         this.items = new Object[capacity];
11     }
12 
13     @SuppressWarnings("unchecked")
14     @Override
15     public T take() {
16         T tmpObj = null;
17         try {
18             synchronized (items) {
19                 while (0 == count) {
20                     items.wait();
21                 }
22                 tmpObj = (T) items[takeIndex];
23                 if (++takeIndex == items.length) {
24                     takeIndex = 0;
25                 }
26                 count--;
27                 items.notify();
28             }
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32 
33         return tmpObj;
34     }
35 
36     @Override
37     public void put(T obj) {
38         try {
39             synchronized (items) {
40                 while (items.length == count) {
41                     items.wait();
42                 }
43 
44                 items[putIndex] = obj;
45                 if (++putIndex == items.length) {
46                     putIndex = 0;
47                 }
48                 count++;
49                 items.notify();
50             }
51         } catch (InterruptedException e) {
52             e.printStackTrace();
53         }
54 
55     }
56 
57 }

这里利用Array构造一个Buffer去存取数据,并利用count, putIndex和takeIndex来保证First-In-First-Out。

如果利用LinkedList来代替Array,相对来说会稍微简单些。

LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。

 

 

2. lock & condition

lock & condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。

在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:

 1 public class LockConditionBroker<T> implements Broker<T> {
 2 
 3     private final ReentrantLock lock;
 4     private final Condition notFull;
 5     private final Condition notEmpty;
 6     private final int capacity;
 7     private LinkedList<T> items;
 8 
 9     public LockConditionBroker(int capacity) {
10         this.lock = new ReentrantLock();
11         this.notFull = lock.newCondition();
12         this.notEmpty = lock.newCondition();
13         this.capacity = capacity;
14 
15         items = new LinkedList<T>();
16     }
17 
18     @Override
19     public T take() {
20         T tmpObj = null;
21         lock.lock();
22         try {
23             while (items.size() == 0) {
24                 notEmpty.await();
25             }
26 
27             tmpObj = items.poll();
28             notFull.signalAll();
29 
30         } catch (InterruptedException e) {
31             e.printStackTrace();
32         } finally {
33             lock.unlock();
34         }
35         return tmpObj;
36     }
37 
38     @Override
39     public void put(T obj) {
40         lock.lock();
41         try {
42             while (items.size() == capacity) {
43                 notFull.await();
44             }
45 
46             items.offer(obj);
47             notEmpty.signalAll();
48 
49         } catch (InterruptedException e) {
50             e.printStackTrace();
51         } finally {
52             lock.unlock();
53         }
54 
55     }
56 }

 

 

3. BlockingQueue

最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。

实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。

 1 public class BlockingQueueBroker<T> implements Broker<T> {
 2 
 3     private final BlockingQueue<T> queue;
 4 
 5     public BlockingQueueBroker() {
 6         this.queue = new LinkedBlockingQueue<T>();
 7     }
 8 
 9     @Override
10     public T take() {
11         try {
12             return queue.take();
13         } catch (InterruptedException e) {
14             e.printStackTrace();
15         }
16 
17         return null;
18     }
19 
20     @Override
21     public void put(T obj) {
22         try {
23             queue.put(obj);
24         } catch (InterruptedException e) {
25             e.printStackTrace();
26         }
27     }
28 
29 }

我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。

 

接下来,就是一个1P2C的例子:

 1 public interface Broker<T> {
 2 
 3     T take();
 4 
 5     void put(T obj);
 6 
 7 }
 8 
 9 
10 public class Producer implements Runnable {
11 
12     private final Broker<Integer> broker;
13     private final String name;
14 
15     public Producer(Broker<Integer> broker, String name) {
16         this.broker = broker;
17         this.name = name;
18     }
19 
20     @Override
21     public void run() {
22         try {
23             for (int i = 0; i < 5; i++) {
24                 broker.put(i);
25                 System.out.format("%s produced: %s%n", name, i);
26                 Thread.sleep(1000);
27             }
28             broker.put(-1);
29             System.out.println("produced termination signal");
30         } catch (InterruptedException e) {
31             e.printStackTrace();
32             return;
33         }
34 
35     }
36 
37 }
38 
39 
40 public class Consumer implements Runnable {
41 
42     private final Broker<Integer> broker;
43     private final String name;
44 
45     public Consumer(Broker<Integer> broker, String name) {
46         this.broker = broker;
47         this.name = name;
48     }
49 
50     @Override
51     public void run() {
52         try {
53             for (Integer message = broker.take(); message != -1; message = broker.take()) {
54                 System.out.format("%s consumed: %s%n", name, message);
55                 Thread.sleep(1000);
56             }
57             System.out.println("received termination signal");
58         } catch (InterruptedException e) {
59             e.printStackTrace();
60             return;
61         }
62 
63     }
64 
65 }
66 
67 
68 public class Main {
69 
70     public static void main(String[] args) {
71         Broker<Integer> broker = new WaitNotifyBroker<Integer>(5);
72 //         Broker<Integer> broker = new LockConditionBroker<Integer>(5);
73 //         Broker<Integer> broker = new BlockingQueueBroker<Integer>();
74 
75         new Thread(new Producer(broker, "prod 1")).start();
76         new Thread(new Consumer(broker, "cons 1")).start();
77         new Thread(new Consumer(broker, "cons 2")).start();
78 
79     }
80 
81 }

 

除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等

 

本文完。

 

参考:

《Java 7 Concurrency Cookbook》