首页 > 代码库 > 一个生产/消费者问题
一个生产/消费者问题
这几天在写一个小工具,其核心就是一个生产消费者问题
1. 单个生产者并发生产数据D
2. 多个一级消费者并发消费数据D,得到D‘
3. 单个二级消费者消费D‘,此处有一额外限定:D‘不能立即被消费,必须在一定延时之后才能被消费
4. 数据是有限的,数据被消费完毕之后,程序必须停止
我的思考过程如下
1. 不考虑二级消费者,只考虑单个生产者多个消费者的情况
最标准的生产消费者问题,只需要使用BlockingQueue即可,生产者将数据put到队列,消费者从队列中take数据。
由于生产者速度极快,为了防止队列无限增长引起OOM,实际采用的是LinkedBlockingQueue,队列写满后,put操作会被阻塞
为了让程序能在数据处理完毕(是否处理完毕只有生产者知道)之后结束,生产者会在数据处理完毕之后,向队列中放入和消费者数量等同的毒丸对象,消费者读到毒丸对象之后就会自杀。这样生产者+消费者线程都能正常结束。
2. 考虑二级消费者,和延时消费的设定
为了满足延时消费,我使用了DelayQueue,因为放入DelayQueue中的元素只有在经过了设定的时间后才能被取出。
此时一级消费者同时也是生产者,一级消费者会将自己处理完毕的数据打上时间标记,然后写入到DelayQueue中,二级消费者从DelayQueue中读取数据。
为了保证程序能在数据处理完毕后结束,我设计了这样的机制:
一级消费者会在读到毒丸对象后,向二级消费者同样写入一个毒丸对象。
二级消费者会统计接收到的毒丸对象的数量N,如果N==一级消费者的总数,说明所有一级消费者均已死亡,也就是说DelayQueue不会再被写入新的数据。
此时如果DelayQueue长度为0,表示所有数据均已消费完毕,二级消费者线程亦可自杀。
在将思路转换为实际代码的过程中,还是遇到了很多麻烦
1. DelayQueue的元素必须要实现Delay接口,那么也必须要实现getDelay和compareTo方法。
其中特别需要注意的是getDelay方法,这个方法有一个TimeUnit类型的参数,返回值代表这个对象还有多久到期。
我最开始认为这个方法与compareTo类似,返回结果只要是个正负数就行,具体数字无所谓。于是我就直接无视传入的TimeUnit参数,直接以毫秒为单位返回结果。但是实际上这样做是有问题的:
分析DelayQueue的源码后发现,DelayQueue内部维护了一个优先队列(根据重载的compareTo方法进行排序),从队列中取元素的时候,DelayQueue会调用队头的元素的getDelay方法(传入的TimeUnit参数是nanoseconds!),然后根据返回结果调用awaitNanos方法等待一定的时间。从这里我们可以看出,按我的做法,DelayQueue的检查频率会提升若干个数量级,白白耗费cpu资源。
一个生产/消费者问题