首页 > 代码库 > [源码]源代码解析 SynchronousQueue
[源码]源代码解析 SynchronousQueue
简析SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue
三者都是blockingQueue.
LinkedBlockingQueue,ArrayBlockingQueue
有界,默认是Integer.Max;
SynchronousQueue没什么界不界的概念.之所以这么说.是因为它的操作必须成对.
注记方案:
oppo(oppo手机)是一对,offer和pool不阻塞
ppt是一对.put和take都阻塞.
1. 里面没有任何数据.调用offer()无法成功,返回flase,表示填充失败.调用put被阻塞,直到有人take或者poll, 反之亦然,如下.
2. 先take,被阻塞,直到有一个线程来offer,或者put.
两个不同的互补碰撞发生匹配完成(fullfill). 之前的take的线程被唤醒获得offer的提供的数据.
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<Long> workQueue = new SynchronousQueue<Long>();
boolean offer = workQueue.offer(2L);
System.out.println("main thread: offer="+offer);
ExecutorService newCachedThreadPool =Executors.newCachedThreadPool();
// 内部实现是 <span style="font-family: 'Microsoft YaHei';">new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); </span>
newCachedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("take thread: begin take and thread will be blocked by call park(await) method");
Long take = workQueue.take();
System.out.println("take thread: take suceffull , take object="+take);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
});
Thread.sleep(2000);
System.out.println("main thread: after sleep ");
newCachedThreadPool.execute(new Runnable() {
@Override
public void run() {
long object = 123L;
System.out.println("offer thead: begin offer "+object);
boolean offer = workQueue.offer(object);
System.out.println("offer thead: finish offer , is sucefully ? "+offer+" , if true, SynchronousQueue will unpark(notify) the take thread ");
}
});
newCachedThreadPool.shutdown();
}
输出:
main thread: offer=false
take thread: begin take and thread will be blocked by call park(await) method
main thread: after sleep
offer thead: begin offer 123
take thread: take suceffull , take object=123
offer thead: finish offer , is sucefully ? true , if true, SynchronousQueue will unpark(notify) the take thread
SynchronousQueue没什么界不界的概念.之所以这么说.是因为它的操作必须成对.
注记方案:
oppo(oppo手机)是一对,offer和pool不阻塞
ppt是一对.put和take都阻塞.
1. 里面没有任何数据.调用offer()无法成功,返回flase,表示填充失败.调用put被阻塞,直到有人take或者poll, 反之亦然,如下.
2. 先take,被阻塞,直到有一个线程来offer,或者put.
两个不同的互补碰撞发生匹配完成(fullfill). 之前的take的线程被唤醒获得offer的提供的数据.
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<Long> workQueue = new SynchronousQueue<Long>();
boolean offer = workQueue.offer(2L);
System.out.println("main thread: offer="+offer);
ExecutorService newCachedThreadPool =Executors.newCachedThreadPool();
// 内部实现是 <span style="font-family: 'Microsoft YaHei';">new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); </span>
newCachedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("take thread: begin take and thread will be blocked by call park(await) method");
Long take = workQueue.take();
System.out.println("take thread: take suceffull , take object="+take);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
});
Thread.sleep(2000);
System.out.println("main thread: after sleep ");
newCachedThreadPool.execute(new Runnable() {
@Override
public void run() {
long object = 123L;
System.out.println("offer thead: begin offer "+object);
boolean offer = workQueue.offer(object);
System.out.println("offer thead: finish offer , is sucefully ? "+offer+" , if true, SynchronousQueue will unpark(notify) the take thread ");
}
});
newCachedThreadPool.shutdown();
}
输出:
main thread: offer=false
take thread: begin take and thread will be blocked by call park(await) method
main thread: after sleep
offer thead: begin offer 123
take thread: take suceffull , take object=123
offer thead: finish offer , is sucefully ? true , if true, SynchronousQueue will unpark(notify) the take thread
关键术语解析:
和其他queue不同的是SynchronousQueue的take()函数调用也有可能被添加到queue里,变成一个节点(未匹配时.)
Node类型一共分层两种. 一种是 isDate=true. (对应offer , put 函数) 一种是isDate=false (对应 take函数)
dual queue:dual的含义就好理解了,因为只有两类,可以当isDate=true和isDate=false遇到时会匹配.可翻译为
成双的,对偶的. 对偶队列.same mode: 相同的模式(isDate都=true,或者isDate都=false).比如take产生的Node和前面已经放入到队列中的take动作Node就属于同一个模式
fulfill(从SynchronousQueue下面的一个注释我们可以理解,具体见本文下载的摘抄中的红体字):
same mode: 相同的模式(isDate都=true,或者isDate都=false).比如take产生的Node和前面已经放入到队列中的take动作Node就属于同一个模式
字面英文翻译,完成.具体到算法里的含义是一个动作和之前的complementary(译为互补)的动作得到匹配.
complementary :互补的.比如先take,放到队列中.后面来一个offer动作就是complementary (互补)
=============
SynchronousQueue下面的一个部分注释部分翻译.
/*
* This class implements extensions of the dual stack and dual
* queue algorithms described in "Nonblocking Concurrent Objects
* with Condition Synchronization", by W. N. Scherer III and
* M. L. Scott. 18th Annual Conf. on Distributed Computing,
* Oct. 2004 (see also
*
http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html
).参照的算法
* The (Lifo) stack is used for non-fair mode, and the (Fifo)
* queue for fair mode. The performance of the two is generally
* similar. Fifo usually supports higher throughput under
* contention but Lifo maintains higher thread locality in common
* applications.
*
* A dual queue (and similarly stack) is one that at any given
* time either holds "data" -- items provided by put operations,
* or "requests" -- slots representing take operations, or is
* empty. A call to "
fulfill
" 翻译: 一个目的是为了"完成"的调用 (i.e., a call requesting an item
* from a queue holding data or vice versa 翻译: 当queue里有数据时,一个调用请求数据,就会得到匹配. 这样的调用称为实现完成的调用 ) dequeues a
*
complementary
node. (翻译: 会让一个互补的节点退出队列)The most interesting feature of these
* queues is that any operation can figure out which mode the
* queue is in, and act accordingly without needing locks.
下面还有一些注释未剪贴上来,比较了java实现的算法和借鉴的算法(见注释中网址)有何区别
* The algorithms here differ from the versions in the above paper
* in extending them for use in synchronous queues, as well as
* dealing with cancellation. The main differences include:
*
* 1. The original algorithms used bit-marked pointers, but
* the ones here use mode bits in nodes, leading to a number
* of further adaptations.
* 2. SynchronousQueues must block threads waiting to become
* fulfilled.
* 3. Support for cancellation via timeout and interrupts,
* including cleaning out cancelled nodes/threads
* from lists to avoid garbage retention and memory depletion.
* in extending them for use in synchronous queues, as well as
* dealing with cancellation. The main differences include:
*
* 1. The original algorithms used bit-marked pointers, but
* the ones here use mode bits in nodes, leading to a number
* of further adaptations.
* 2. SynchronousQueues must block threads waiting to become
* fulfilled.
* 3. Support for cancellation via timeout and interrupts,
* including cleaning out cancelled nodes/threads
* from lists to avoid garbage retention and memory depletion.
=============再来看看SynchronousQueue.TransferQueue.transfer下面的注释.=========
/**
* Puts or takes an item.
*/
Object transfer(Object e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
* 了解了上面的几个术语概念.就很容易明白这句话的含义.
当队列是空,或者是同一种Mode时,直接放入到列队尾.不会完成(fullfill)
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to
fulfill
by CAS‘ing
* item field of waiting node and
dequeuing
it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = http://www.mamicode.com/(e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData =http://www.mamicode.com/= isData) { // empty or same-mode
//楼主注:①
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can‘t wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos); // 楼主注: 会被阻塞,等待其他线程互补操作时唤醒
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null)? x : e; // 楼主注: 因为是互补匹配的.要么x=null 要么 e=null 返回一个非null的值即可.不管该线程调用的是take还是put,都返回数据.
} else { // complementary-mode //楼主注:②
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData =http://www.mamicode.com/= (x != null) ||
// m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null)? x : e; // 楼主注: 因为是互补匹配的.要么x=null 要么 e=null 返回一个非null的值即可.不管该线程调用的是take还是put,都返回数据.
}
}
}
我一开始没理解的一点是:
当一个head是 isDate=false , tail是isDate=true时, 一个线程进来的操作是isDate=false时.
不会进入①,进入②后看代码又无法和head完成匹配(fullfill).
后来想明白了,这种情况不会发生.因为tail是isDate=true,这个会与head完成匹配(fullfill).换句话说.
队列里tail和head肯定是same mode.所以当①判断失败,进入②后,肯定能和head完成匹配(fulfill)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。