首页 > 代码库 > 一个生产/消费者问题

一个生产/消费者问题

这几天在写一个小工具,其核心就是一个生产消费者问题

 

1. 单个生产者并发生产数据D

2. 多个一级消费者并发消费数据D,得到D‘

3. 单个二级消费者消费D‘,此处有一额外限定:D‘不能立即被消费,必须在一定延时之后才能被消费

4. 数据是有限的,数据被消费完毕之后,程序必须停止

 

我的思考过程如下

1. 不考虑二级消费者,只考虑单个生产者多个消费者的情况

最标准的生产消费者问题,只需要使用BlockingQueue即可,生产者将数据put到队列,消费者从队列中take数据。

由于生产者速度极快,为了防止队列无限增长引起OOM,实际采用的是LinkedBlockingQueue,队列写满后,put操作会被阻塞

为了让程序能在数据处理完毕(是否处理完毕只有生产者知道)之后结束,生产者会在数据处理完毕之后,向队列中放入和消费者数量等同的毒丸对象,消费者读到毒丸对象之后就会自杀。这样生产者+消费者线程都能正常结束。

2. 考虑二级消费者,和延时消费的设定

为了满足延时消费,我使用了DelayQueue,因为放入DelayQueue中的元素只有在经过了设定的时间后才能被取出。

此时一级消费者同时也是生产者,一级消费者会将自己处理完毕的数据打上时间标记,然后写入到DelayQueue中,二级消费者从DelayQueue中读取数据。

为了保证程序能在数据处理完毕后结束,我设计了这样的机制:

一级消费者会在读到毒丸对象后,向二级消费者同样写入一个毒丸对象。

二级消费者会统计接收到的毒丸对象的数量N,如果N==一级消费者的总数,说明所有一级消费者均已死亡,也就是说DelayQueue不会再被写入新的数据。

此时如果DelayQueue长度为0,表示所有数据均已消费完毕,二级消费者线程亦可自杀。

 

在将思路转换为实际代码的过程中,还是遇到了很多麻烦

1. DelayQueue的元素必须要实现Delay接口,那么也必须要实现getDelay和compareTo方法。

其中特别需要注意的是getDelay方法,这个方法有一个TimeUnit类型的参数,返回值代表这个对象还有多久到期。

我最开始认为这个方法与compareTo类似,返回结果只要是个正负数就行,具体数字无所谓。于是我就直接无视传入的TimeUnit参数,直接以毫秒为单位返回结果。但是实际上这样做是有问题的:

分析DelayQueue的源码后发现,DelayQueue内部维护了一个优先队列(根据重载的compareTo方法进行排序),从队列中取元素的时候,DelayQueue会调用队头的元素的getDelay方法(传入的TimeUnit参数是nanoseconds!),然后根据返回结果调用awaitNanos方法等待一定的时间。从这里我们可以看出,按我的做法,DelayQueue的检查频率会提升若干个数量级,白白耗费cpu资源。

 

一个生产/消费者问题