首页 > 代码库 > 并发容器2

并发容器2

主要分析队列中的并发类; 基于1.6 分析;
2017-04-29 21:57:21 星期六


1 BlockingQueue 接口

该接口继承自Queue,是Java中阻塞队列的顶层接口; (此处的队列指的是FIFO)

1.6 中大概有7种 阻塞队列(Queue);LinkedBlockingDeque实现 BlockingDeque(该接口是BlockingQueue子类,指的是阻塞的双端队列,即可能支持FIFO或LIFO)

技术分享

特性
  1. 不支持null;
  2. 该队列可以是有容量限制的。 之前集合中的非阻塞队列一般都是自动扩容,如ArrayDeque.
  3. 线程安全的;
  4. 主要是要在生产者-消费者模型;
  5. remove(X) 能删除队列中的元素;
方法:

技术分享
技术分享

  • contain remove 这些都是Collection中定义的方法;
  • remainingCapacity: 由于支持有限队列,用来获取剩余长度;
  • drainTo: 类似于清空队列,并将队列中元素全部取出到集合中。

2 ArrayBlockingQueue

  1. 该类底层使用有界数组实现的阻塞队列;
  2. FIFO(队首 remove 队尾 add )
  3. 公平机制: 当有几个线程阻塞在队列中时,公平机制会按照请求顺序调度这些阻塞线程,非公平机制则顺序不确定。 默认情况下非公平!
  4. 内部基于ReenTarntLock+Condition实现。所有方法都是线程安全。包括迭代器。

2.1 结构

  1. 队列结构: 内部存储的实质是Object数组. 定长数组,count记录当前队列中元素个数。takeIndex和putIndex分别指向入出索引;

    • 队列判空基于count;
  2. Lock

    • 内部持有独占锁,以及绑定的两个Condition。
    • 该类所有方法都必须持有该锁,包括迭代器中操作
      技术分享

2.2 方法

2.2.1 构造函数
  • 关于公平机制: 此处的公平即ReentrantLock的公平机制,如当put or take 导致线程阻塞时,当某线程执行完毕后公平机制会按照请求顺序唤醒后序线程,不会出现插队现象。
  • 所有方法使用的都是lock这个锁。
    技术分享
2.2.2 Queue方法

offer poll peek; 返回 null | false;
add remove element; 抛出异常; (这些函数基本是通过调用上面函数实现。)

  • 不支持null元素
  • 这些属于非阻塞方法; 但是实现时与阻塞方法一样都是调用extract和insert实现入出; 而这两个方法每次执行后都会在相应的Condition执行唤醒操作。
    技术分享
2.2.3 阻塞方法

put 和 take; 生产者消费者直接调用此两个函数即可实现,很方便;

  • 2.2.2中虽然也会lock()但是并不响应中断,此两个方法支持中断。 获取锁调用lockInterruptibly()
  • 实现也比较简单,while一直判断,只要不满足条件就阻塞该线程。

该类中定义两个Condition对象。此处执行put只要队满那么就会在notFull这个条件等待。 (期间可能会睡眠在notFull这个队列,当然也会释放相应的锁,别的线程调用notFull.signal()唤醒后就会转入ReenTrantLock队列重新获取锁执行。)
技术分享

2.2.4 时间限制+阻塞

即offer(E, long, TimeUnit)和poll(long, TimeUnit) 两个方法;

  • 支持中断。阻塞方法;在给定时间内放弃。

和put take实现基本一致,只是此处调用了Condition的时间限制wait方法;

2.2.5 其余方法

drainTo: 移除元素并转移到给定集合;
由于本身也是Collection子类, 所以许多方法都是支持的如 toArray()等;
remove: 支持移除某个位置的元素。但是由于基于数组实现,所以代价比较高。 其后面位置元素会依次向前;

2.3 迭代器

  • 获取迭代器函数iterator()和 迭代器内部方法都必须持有锁

此迭代器是弱一致性的,不会抛出ConcurrentModificationException, 也就是说迭代过程中不能保证remove这样的操作立即能够反映出来。

2.4 序列化

该类仅仅标记实现Serializable 并没有实现read等方法;即采用了默认序列化,变量没有使用transient关键字。
而LinkedBlockingQueue由于链表实现,则不同。


3 LinkedBlockingQueue

链接队列的吞吐量通常要高于基于数组的。

3.1 结构

  1. 与ArrayBlockingQueue不同 此类使用了两个独占锁, takelock和putlock,并且将Condition分别绑定到对应锁中;
  2. count 采用AtomicInteger类型。 而ArrayBlockingQueue就是普通的int(该类任何函数都是需要持有锁所以没必要)
  3. 使用两个独占锁使得put和take没必要竞争。效率更高。 但是对于remove 迭代器等方法还是需要持有两把锁。
  4. 默认队列长度Integer.MAX_VALUE, 而Array没有默认构造函数必须指定长度。

3.2 基于两个锁实现

3.2.1 ArrayBlockingQueue 实现

  • 阻塞队列,主要就是两个功能: put和take; insert和extract是入队出队核心函数。
    1. 能够发现二者其实并没有数据竞争除了count变量。 入队只要将putIndex++;出队只要takeIndex--;
    2. ArrayBlockingQueue实现时将put和take操作使用同一把锁,这样count变量便可以原子更新,但是同时导致put和take互斥执行,导致性能下降。 但是二者本身是没有必要的;
    3. LinkedBlockingQueue 即持有两把锁,将put和take操作分开,不再使用同一把锁;

技术分享

3.2.2 LinkedBlockingQueue 实现

  • 变量:
    技术分享

    • count变量使用AtomicInteger保证原子更新;
    • put和take 分别持有不同独占锁,并且绑定不同Condition对象;
  • 入出队:
    入队出队只要更新各自的head和last引用即可,不必负责count更新。 这样使得put和take不必互斥执行;
    技术分享

  • put :

    1. put只要持有putLock即可,如果队满则会在该锁绑定的notFull Condition等待;
    2. 通过调用以上insert()执行last引用更新。
    3. count++
    4. 唤醒notEmpty Condition 等待的线程; (如果还有容量能put也会唤醒notFull等待的线程)。
    5. 关于唤醒ArrayBlockingQueue, 则将唤醒操作放在insert操作中,也就是每次put后就只会唤醒notEmpty,即使当前队列还能继续put也不会唤醒notFull

3.3 方法

方法和ArrayBlockingQueue基本一致;

  • 对于队列put take 以及offer等版本的首尾操作,只要获取一个锁即可
  • 对于remove, clear等需要修改链表结构,还需要持有两把锁。
  • Collection中toArray等需要遍历的操作也是需要持有两把锁。

3.3.1 迭代器:

  • 关于锁: 必须持有两个锁才能next remove等
  • 此迭代器是弱一致性,即不能保证remove等操作能立即反映,但是肯定不会抛出异常;

4 其余

http://qifuguang.me/2015/10/23/Java%E5%B9%B6%E5%8F%91%E5%8C%85%E5%AD%A6%E4%B9%A0%E4%B9%9D-Java%E4%B8%AD%E7%9A%84%E9%98%BB%E5%A1%9E%E9%98%9F%E5%88%97/

简介:

  • PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。

  • SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

  • LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

  • LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。另外双向阻塞队列可以运用在“工作窃取”模式中。(即Fork-Join)

  • DelayQueue:


Task & Q

  1. BlockingQueue 接口对于add offer等的语义??
  2. 关于迭代器中弱一致性? ConcurrentHashMap也是??
  3. 阻塞队列还剩余5个类!

并发容器2