首页 > 代码库 > 并发设计模式之生产者消费者设计模式

并发设计模式之生产者消费者设计模式

主函数:

 1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
 2 
 3 import java.util.concurrent.BlockingQueue;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.LinkedBlockingQueue;
 7 
 8 /**
 9  * Created by Administrator on 2017/5/17.
10  */
11 public class Main {
12     public static void main(String[] args) throws Exception {
13         //内存缓冲区
14         BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
15         //生产者
16         Producer p1 = new Producer(queue);
17         Producer p2 = new Producer(queue);
18         Producer p3 = new Producer(queue);
19         //消费者
20         Consumer c1 = new Consumer(queue);
21         Consumer c2 = new Consumer(queue);
22         Consumer c3 = new Consumer(queue);
23 
24         //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
25         ExecutorService cachePool = Executors.newCachedThreadPool();
26         cachePool.execute(p1);
27         cachePool.execute(p2);
28         cachePool.execute(p3);
29         cachePool.execute(c1);
30         cachePool.execute(c2);
31         cachePool.execute(c3);
32 
33         try {
34             Thread.sleep(3000);
35         } catch (InterruptedException e) {
36             e.printStackTrace();
37         }
38         p1.stop();
39         p2.stop();
40         p3.stop();
41         try {
42             Thread.sleep(2000);
43         } catch (InterruptedException e) {
44             e.printStackTrace();
45         }
46 //        cachePool.shutdown();
47 //        cachePool.shutdownNow();
48 
49     }
50 }

Producer类:

 1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.BlockingQueue;
 5 import java.util.concurrent.TimeUnit;
 6 import java.util.concurrent.atomic.AtomicInteger;
 7 
 8 /**
 9  * Created by Administrator on 2017/5/17.
10  */
11 public class Producer implements Runnable {
12 
13     //共享缓存区
14     private BlockingQueue<Data> queue;
15     //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
16     private volatile boolean isRunning = true;
17     //id生成器
18     private static AtomicInteger count = new AtomicInteger();
19     //随机对象
20     private static Random r = new Random();
21 
22     public Producer(BlockingQueue queue){
23         this.queue = queue;
24     }
25 
26     @Override
27     public void run() {
28         while(isRunning){
29             try {
30                 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
31                 Thread.sleep(r.nextInt(1000));
32                 //获取的数据进行累计...
33                 int id = count.incrementAndGet();
34                 //比如通过一个getData方法获取了
35                 Data data = http://www.mamicode.com/new Data(Integer.toString(id), "数据" + id);
36                 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
37                 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
38                     System.out.println("提交缓冲区数据失败....");
39                     //do something... 比如重新提交
40                 }
41             } catch (InterruptedException e) {
42                 e.printStackTrace();
43             }
44         }
45     }
46 
47     public void stop(){
48         this.isRunning = false;
49     }
50 }

Consumer类:

 1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.BlockingQueue;
 5 
 6 /**
 7  * Created by Administrator on 2017/5/17.
 8  */
 9 public class Consumer implements Runnable {
10     private BlockingQueue<Data> queue;
11 
12     public Consumer(BlockingQueue queue){
13         this.queue = queue;
14     }
15 
16     //随机对象
17     private static Random r = new Random();
18 
19     @Override
20     public void run() {
21         while(true){
22             try {
23                 //获取数据
24                 Data data = http://www.mamicode.com/this.queue.take();
25                 //进行数据处理。休眠0 - 1000毫秒模拟耗时
26                 Thread.sleep(r.nextInt(1000));
27                 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
28             } catch (InterruptedException e) {
29                 e.printStackTrace();
30             }
31         }
32     }
33 }

Data类

 1 package com.ietree.basicskill.mutilthread.designpattern.ProducerConsumer;
 2 
 3 /**
 4  * Created by Administrator on 2017/5/17.
 5  */
 6 public class Data {
 7     private String id;
 8     private String name;
 9 
10     public Data(String id, String name){
11         this.id = id;
12         this.name = name;
13     }
14 
15     public String getId() {
16         return id;
17     }
18 
19     public void setId(String id) {
20         this.id = id;
21     }
22 
23     public String getName() {
24         return name;
25     }
26 
27     public void setName(String name) {
28         this.name = name;
29     }
30 
31     @Override
32     public String toString(){
33         return "{id: " + id + ", name: " + name + "}";
34     }
35 }

 

并发设计模式之生产者消费者设计模式