首页 > 代码库 > Java实现生产者和消费者

Java实现生产者和消费者

  生产者和消费者问题是操作系统的经典问题,在实际工作中也常会用到,主要的难点在于协调生产者和消费者,因为生产者的个数和消费者的个数不确定,而生产者的生成速度与消费者的消费速度也不一样,同时还要实现生产者与消费者的解耦,即生产者并不知道有哪些消费者,而消费者也不需要知道产品是哪个生产的,他们之间只与一个交易平台发生关系。

  这是现实世界普遍存在的问题,比如我们去苹果专卖店买IPhone 6,我们属于消费者,而生产商把产品生产出来放在苹果专卖店,如果全世界只有一个苹果专卖店,当专卖店没有IPhone 6时,我们只有等,而当专卖店屯了很多货,以至于专卖店放不下了时,苹果公司比如要生产商暂停生产。生产者与消费者是通过一个缓存仓库来交易的。

  Java里面有LinkedBlockingQueue、ArrayBlockingQueue可以在并发环境实现阻塞插入和删除,非常适合作为生产者和消费者之间的纽带。

  生产者:

/** * 生产者 * @author jiqunpeng * */class Producer implements Runnable {    LinkedBlockingQueue<Integer> buffer;    //构造生产者,注册仓库    Producer(LinkedBlockingQueue<Integer> buffer) {        this.buffer = buffer;    }    /**     * 生产一个产品,当仓库已经满时,等待仓库有空地再放入仓库     * @param e     * @throws InterruptedException     */    public void produce(Integer e) throws InterruptedException {        buffer.put(e);    }    @Override    public void run() {        Random random = new Random(7);        try {            while (true) {//一生不息                Integer product = random.nextInt();                System.out.println(this + " \tProduct:\t " + product);                produce(product);                TimeUnit.MILLISECONDS.sleep(random.nextInt(500));//短暂的休息            }        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

  消费者  

/** * 消费者 * @author jiqunpeng * */class Consumer implements Runnable {    LinkedBlockingQueue<Integer> buffer;    //注册仓库    Consumer(LinkedBlockingQueue<Integer> buffer) {        this.buffer = buffer;    }    /**     * 从仓库中的取出产品消费,当仓库里面没有产品时,会一直等下去     * @return     * @throws InterruptedException     */    public Integer consume() throws InterruptedException {        Integer e = buffer.take();        return e;    }    @Override    public void run() {        Random random = new Random(7);        try {            while (true) {//一生都要吃                Integer product = consume();                System.out.println(this + " \tConsume:\t " + product);                TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));//吃了也要睡会            }        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

  调度运行

public class ProducerConsumer {    public static void main(String[] args) {        // 任务调度器        ExecutorService exec = Executors.newFixedThreadPool(10);        // 仓库        final LinkedBlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(5);        for (int i = 0; i < 2; i++) {            // 创建生产者            Producer p = new Producer(buffer);            // 领到把生产者拉到车间,被迫没日没夜的干活            exec.execute(p);            // 消费者出生了            Consumer c = new Consumer(buffer);            // 消费者一生都在消费            exec.execute(c);        }        exec.execute(new Runnable() {            @Override            public void run() {                while (true) {                    // 定时看一下仓库的空间                    System.out.println("buffer :" + buffer.size());                    try {                        TimeUnit.SECONDS.sleep(5);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        });    }}

  模拟结果:

os.Producer@16c163f     Product:     -1156638823os.Consumer@cf66b     Consume:     -1156638823os.Producer@db4fa2     Product:     -1156638823os.Consumer@491c4c     Consume:     -1156638823buffer :0os.Producer@16c163f     Product:     -1077308326os.Producer@db4fa2     Product:     -1077308326os.Producer@16c163f     Product:     1495978761os.Producer@db4fa2     Product:     1495978761os.Consumer@491c4c     Consume:     -1077308326os.Consumer@cf66b     Consume:     -1077308326os.Producer@16c163f     Product:     -441191359os.Producer@db4fa2     Product:     -441191359os.Producer@16c163f     Product:     -1253369595os.Producer@db4fa2     Product:     -1253369595os.Producer@16c163f     Product:     1511462400os.Consumer@cf66b     Consume:     1495978761os.Consumer@491c4c     Consume:     1495978761os.Producer@db4fa2     Product:     1511462400os.Producer@16c163f     Product:     518557417

   当然我们也可以自己定义一个线程安全的有界阻塞缓存队列:

public class BoundedBuffer<E> {    private Object[] buffer;    final private ReentrantLock lock;    final private Condition notEmpty;    final private Condition notFull;    private int count;    private int putIndex;    private int takeIndex;    public BoundedBuffer(int size) {        buffer = new Object[size];        lock = new ReentrantLock();        notEmpty = lock.newCondition();        notFull = lock.newCondition();    }    public void put(E e) throws InterruptedException {        lock.lock();        try {            while (count == buffer.length)                notFull.await();            buffer[putIndex] = e;            if (++putIndex == buffer.length)// 循环数组                putIndex = 0;            count++;            notEmpty.signal();        } finally {            lock.unlock();        }    }    public E take() throws InterruptedException {        lock.lock();        System.out.println("take()");        try {            while (count == 0)                notEmpty.await();            @SuppressWarnings("unchecked")            E item = (E) buffer[takeIndex];            count--;            if (++takeIndex == buffer.length)// 循环数组                takeIndex = 0;            notFull.signal();            return item;        } finally {            lock.unlock();        }    }}

 

Java实现生产者和消费者