首页 > 代码库 > 并发容器2
并发容器2
主要分析队列中的并发类; 基于1.6 分析;
2017-04-29 21:57:21 星期六
1 BlockingQueue 接口
该接口继承自Queue,是Java中阻塞队列的顶层接口; (此处的队列指的是FIFO)
1.6 中大概有7种 阻塞队列(Queue);LinkedBlockingDeque实现 BlockingDeque(该接口是BlockingQueue子类,指的是阻塞的双端队列,即可能支持FIFO或LIFO)
特性
- 不支持null;
- 该队列可以是有容量限制的。 之前集合中的非阻塞队列一般都是自动扩容,如ArrayDeque.
- 线程安全的;
- 主要是要在生产者-消费者模型;
- remove(X) 能删除队列中的元素;
方法:
- contain remove 这些都是Collection中定义的方法;
- remainingCapacity: 由于支持有限队列,用来获取剩余长度;
- drainTo: 类似于清空队列,并将队列中元素全部取出到集合中。
2 ArrayBlockingQueue
- 该类底层使用有界数组实现的阻塞队列;
- FIFO(队首 remove 队尾 add )
- 公平机制: 当有几个线程阻塞在队列中时,公平机制会按照请求顺序调度这些阻塞线程,非公平机制则顺序不确定。 默认情况下非公平!
- 内部基于ReenTarntLock+Condition实现。所有方法都是线程安全。包括迭代器。
2.1 结构
队列结构: 内部存储的实质是Object数组. 定长数组,count记录当前队列中元素个数。takeIndex和putIndex分别指向入出索引;
- 队列判空基于count;
Lock:
- 内部持有独占锁,以及绑定的两个Condition。
- 该类所有方法都必须持有该锁,包括迭代器中操作
2.2 方法
2.2.1 构造函数
- 关于公平机制: 此处的公平即ReentrantLock的公平机制,如当put or take 导致线程阻塞时,当某线程执行完毕后公平机制会按照请求顺序唤醒后序线程,不会出现插队现象。
- 所有方法使用的都是lock这个锁。
2.2.2 Queue方法
offer poll peek; 返回 null | false;
add remove element; 抛出异常; (这些函数基本是通过调用上面函数实现。)
- 不支持null元素;
- 这些属于非阻塞方法; 但是实现时与阻塞方法一样都是调用extract和insert实现入出; 而这两个方法每次执行后都会在相应的Condition执行唤醒操作。
2.2.3 阻塞方法
put 和 take; 生产者消费者直接调用此两个函数即可实现,很方便;
- 2.2.2中虽然也会lock()但是并不响应中断,此两个方法支持中断。 获取锁调用
lockInterruptibly()
- 实现也比较简单,while一直判断,只要不满足条件就阻塞该线程。
该类中定义两个Condition对象。此处执行put只要队满那么就会在notFull这个条件等待。 (期间可能会睡眠在notFull这个队列,当然也会释放相应的锁,别的线程调用notFull.signal()
唤醒后就会转入ReenTrantLock队列重新获取锁执行。)
2.2.4 时间限制+阻塞
即offer(E, long, TimeUnit)和poll(long, TimeUnit) 两个方法;
- 支持中断。阻塞方法;在给定时间内放弃。
和put take实现基本一致,只是此处调用了Condition的时间限制wait方法;
2.2.5 其余方法
drainTo: 移除元素并转移到给定集合;
由于本身也是Collection子类, 所以许多方法都是支持的如 toArray()等;
remove: 支持移除某个位置的元素。但是由于基于数组实现,所以代价比较高。 其后面位置元素会依次向前;
2.3 迭代器
- 获取迭代器函数iterator()和 迭代器内部方法都必须持有锁
此迭代器是弱一致性的,不会抛出ConcurrentModificationException, 也就是说迭代过程中不能保证remove这样的操作立即能够反映出来。
2.4 序列化
该类仅仅标记实现Serializable
并没有实现read等方法;即采用了默认序列化,变量没有使用transient关键字。
而LinkedBlockingQueue由于链表实现,则不同。
3 LinkedBlockingQueue
链接队列的吞吐量通常要高于基于数组的。
3.1 结构
- 与ArrayBlockingQueue不同 此类使用了两个独占锁, takelock和putlock,并且将Condition分别绑定到对应锁中;
- count 采用AtomicInteger类型。 而ArrayBlockingQueue就是普通的int(该类任何函数都是需要持有锁所以没必要)
- 使用两个独占锁使得put和take没必要竞争。效率更高。 但是对于remove 迭代器等方法还是需要持有两把锁。
- 默认队列长度Integer.MAX_VALUE, 而Array没有默认构造函数必须指定长度。
3.2 基于两个锁实现
3.2.1 ArrayBlockingQueue 实现
- 阻塞队列,主要就是两个功能: put和take; insert和extract是入队出队核心函数。
- 能够发现二者其实并没有数据竞争除了count变量。 入队只要将putIndex++;出队只要takeIndex--;
- ArrayBlockingQueue实现时将put和take操作使用同一把锁,这样count变量便可以原子更新,但是同时导致put和take互斥执行,导致性能下降。 但是二者本身是没有必要的;
- LinkedBlockingQueue 即持有两把锁,将put和take操作分开,不再使用同一把锁;
3.2.2 LinkedBlockingQueue 实现
变量:
- count变量使用AtomicInteger保证原子更新;
- put和take 分别持有不同独占锁,并且绑定不同Condition对象;
入出队:
入队出队只要更新各自的head和last引用即可,不必负责count更新。 这样使得put和take不必互斥执行;put :
- put只要持有putLock即可,如果队满则会在该锁绑定的notFull Condition等待;
- 通过调用以上insert()执行last引用更新。
- count++
- 唤醒notEmpty Condition 等待的线程; (如果还有容量能put也会唤醒notFull等待的线程)。
- 关于唤醒ArrayBlockingQueue, 则将唤醒操作放在insert操作中,也就是每次put后就只会唤醒notEmpty,即使当前队列还能继续put也不会唤醒notFull
3.3 方法
方法和ArrayBlockingQueue基本一致;
- 对于队列put take 以及offer等版本的首尾操作,只要获取一个锁即可
- 对于remove, clear等需要修改链表结构,还需要持有两把锁。
- Collection中toArray等需要遍历的操作也是需要持有两把锁。
3.3.1 迭代器:
- 关于锁: 必须持有两个锁才能next remove等
- 此迭代器是弱一致性,即不能保证remove等操作能立即反映,但是肯定不会抛出异常;
4 其余
http://qifuguang.me/2015/10/23/Java%E5%B9%B6%E5%8F%91%E5%8C%85%E5%AD%A6%E4%B9%A0%E4%B9%9D-Java%E4%B8%AD%E7%9A%84%E9%98%BB%E5%A1%9E%E9%98%9F%E5%88%97/
简介:
PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。另外双向阻塞队列可以运用在“工作窃取”模式中。(即Fork-Join)
DelayQueue:
Task & Q
- BlockingQueue 接口对于add offer等的语义??
- 关于迭代器中弱一致性? ConcurrentHashMap也是??
- 阻塞队列还剩余5个类!
并发容器2