
JUC: Exchanger

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

JDK 1.5 added the Exchanger class, which lets two threads exchange data with each other.

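The class java.util.concurrent.Exchanger provides a synchronization point at which threads can pair up and exchange data. An Exchanger can be viewed as a bidirectional version of SynchronousQueue (in a SynchronousQueue, data flows in one direction only). Each thread passes the data intended for its partner to the exchange() method, and the method returns the data that the partner supplied. Once two threads have exchanged objects through an Exchanger, the exchange is safe for both of them: whatever a thread did before calling exchange() happens-before whatever its partner does after its own exchange() returns.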
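The pairing described above can be tried with a minimal sketch. It assumes Java 8 or later (for lambdas); the class name ExchangerSwapDemo and the helpers swap and exchangeQuietly are made up for illustration. Only Exchanger.exchange(V) comes from the JDK.

```java
import java.util.concurrent.Exchanger;

/** Minimal sketch: two threads meet at an Exchanger and swap one String each. */
final class ExchangerSwapDemo {

    /**
     * Runs two threads that each offer one value. Returns what each received:
     * index 0 is what the first thread got, index 1 what the second thread got.
     */
    static String[] swap(String first, String second) {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        Thread a = new Thread(() -> received[0] = exchangeQuietly(exchanger, first));
        Thread b = new Thread(() -> received[1] = exchangeQuietly(exchanger, second));
        a.start();
        b.start();
        try {
            a.join(); // join() also makes the writes to received[] visible here
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the swap", e);
        }
        return received;
    }

    /** Hypothetical helper: wraps the checked exception so it can be used in a lambda. */
    private static String exchangeQuietly(Exchanger<String> exchanger, String value) {
        try {
            return exchanger.exchange(value); // blocks until the partner thread arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        String[] result = swap("from-A", "from-B");
        // prints "A received from-B, B received from-A"
        System.out.println("A received " + result[0] + ", B received " + result[1]);
    }
}
```

Whichever thread reaches exchange() first simply waits, so the result does not depend on start order.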

Sample Usage: Here are the highlights of a class that uses an Exchanger to swap buffers between threads so that the thread filling the buffer gets a freshly emptied one when it needs it, handing off the filled one to the thread emptying the buffer.

 class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }
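The sample above leaves DataBuffer and its helper methods undefined, so it cannot be run as shown. The following is a runnable sketch of the same hand-off under some assumptions that are not part of the original: List<Integer> stands in for DataBuffer, the buffer capacity is 4, and an empty buffer sent by the filling thread serves as an end-of-stream marker so that both threads terminate. The names BufferSwapDemo, CAPACITY and produceAndConsume are made up for illustration, and Java 8 or later is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

/** Sketch of the fill/empty hand-off, with List<Integer> standing in for DataBuffer. */
final class BufferSwapDemo {
    static final int CAPACITY = 4; // hypothetical buffer size

    /** Produces 1..itemCount in one thread and returns the sum computed by the other. */
    static long produceAndConsume(int itemCount) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        long[] sum = new long[1]; // written by the emptying thread, read after join()

        Thread filler = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>(CAPACITY);
            try {
                for (int i = 1; i <= itemCount; i++) {
                    buffer.add(i);
                    if (buffer.size() == CAPACITY) {
                        buffer = exchanger.exchange(buffer); // full buffer out, empty one back
                    }
                }
                if (!buffer.isEmpty()) {
                    buffer = exchanger.exchange(buffer);     // hand off the last, partial buffer
                }
                exchanger.exchange(buffer);                  // empty buffer acts as end marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread emptier = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>(CAPACITY);
            try {
                while (true) {
                    buffer = exchanger.exchange(buffer);     // empty buffer out, filled one back
                    if (buffer.isEmpty()) {
                        break;                               // end marker received
                    }
                    for (int value : buffer) {
                        sum[0] += value;
                    }
                    buffer.clear();                          // emptied, ready to hand back
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        filler.start();
        emptier.start();
        try {
            filler.join();
            emptier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the workers", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10)); // prints 55
    }
}
```

Only two buffers ever exist in this sketch. Each one belongs to exactly one thread between exchanges, which is why the lists need no further locking.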

Algorithm description (the implementation comment from the JDK's Exchanger source):

/*
     * Algorithm Description:
     *
     * The basic idea is to maintain a "slot", which is a reference to
     * a Node containing both an Item to offer and a "hole" waiting to
     * get filled in.  If an incoming "occupying" thread sees that the
     * slot is null, it CAS'es (compareAndSets) a Node there and waits
     * for another to invoke exchange.  That second "fulfilling" thread
     * sees that the slot is non-null, and so CASes it back to null,
     * also exchanging items by CASing the hole, plus waking up the
     * occupying thread if it is blocked.  In each case CAS'es may
     * fail because a slot at first appears non-null but is null upon
     * CAS, or vice-versa.  So threads may need to retry these
     * actions.
     *
     * This simple approach works great when there are only a few
     * threads using an Exchanger, but performance rapidly
     * deteriorates due to CAS contention on the single slot when
     * there are lots of threads using an exchanger.  So instead we use
     * an "arena"; basically a kind of hash table with a dynamically
     * varying number of slots, any one of which can be used by
     * threads performing an exchange.  Incoming threads pick slots
     * based on a hash of their Thread ids.  If an incoming thread
     * fails to CAS in its chosen slot, it picks an alternative slot
     * instead.  And similarly from there.  If a thread successfully
     * CASes into a slot but no other thread arrives, it tries
     * another, heading toward the zero slot, which always exists even
     * if the table shrinks.  The particular mechanics controlling this
     * are as follows:
     *
     * Waiting: Slot zero is special in that it is the only slot that
     * exists when there is no contention.  A thread occupying slot
     * zero will block if no thread fulfills it after a short spin.
     * In other cases, occupying threads eventually give up and try
     * another slot.  Waiting threads spin for a while (a period that
     * should be a little less than a typical context-switch time)
     * before either blocking (if slot zero) or giving up (if other
     * slots) and restarting.  There is no reason for threads to block
     * unless there are unlikely to be any other threads present.
     * Occupants are mainly avoiding memory contention so sit there
     * quietly polling for a shorter period than it would take to
     * block and then unblock them.  Non-slot-zero waits that elapse
     * because of lack of other threads waste around one extra
     * context-switch time per try, which is still on average much
     * faster than alternative approaches.
     *
     * Sizing: Usually, using only a few slots suffices to reduce
     * contention.  Especially with small numbers of threads, using
     * too many slots can lead to just as poor performance as using
     * too few of them, and there's not much room for error.  The
     * variable "max" maintains the number of slots actually in
     * use.  It is increased when a thread sees too many CAS
     * failures.  (This is analogous to resizing a regular hash table
     * based on a target load factor, except here, growth steps are
     * just one-by-one rather than proportional.)  Growth requires
     * contention failures in each of three tried slots.  Requiring
     * multiple failures for expansion copes with the fact that some
     * failed CASes are not due to contention but instead to simple
     * races between two threads or thread pre-emptions occurring
     * between reading and CASing.  Also, very transient peak
     * contention can be much higher than the average sustainable
     * levels.  An attempt to decrease the max limit is usually made
     * when a non-slot-zero wait elapses without being fulfilled.
     * Threads experiencing elapsed waits move closer to zero, so
     * eventually find existing (or future) threads even if the table
     * has been shrunk due to inactivity.  The chosen mechanics and
     * thresholds for growing and shrinking are intrinsically
     * entangled with indexing and hashing inside the exchange code,
     * and can't be nicely abstracted out.
     *
     * Hashing: Each thread picks its initial slot to use in accord
     * with a simple hashcode.  The sequence is the same on each
     * encounter by any given thread, but effectively random across
     * threads.  Using arenas encounters the classic cost vs quality
     * tradeoffs of all hash tables.  Here, we use a one-step FNV-1a
     * hash code based on the current thread's Thread.getId(), along
     * with a cheap approximation to a mod operation to select an
     * index.  The downside of optimizing index selection in this way
     * is that the code is hardwired to use a maximum table size of
     * 32.  But this value more than suffices for known platforms and
     * applications.
     *
     * Probing: On sensed contention of a selected slot, we probe
     * sequentially through the table, analogously to linear probing
     * after collision in a hash table.  (We move circularly, in
     * reverse order, to mesh best with table growth and shrinkage
     * rules.)  Except that to minimize the effects of false-alarms
     * and cache thrashing, we try the first selected slot twice
     * before moving.
     *
     * Padding: Even with contention management, slots are heavily
     * contended, so use cache-padding to avoid poor memory
     * performance.  Because of this, slots are lazily constructed
     * only when used, to avoid wasting this space unnecessarily.
     * While isolation of locations is not much of an issue at first
     * in an application, as time goes on and garbage-collectors
     * perform compaction, slots are very likely to be moved adjacent
     * to each other, which can cause much thrashing of cache lines on
     * MPs unless padding is employed.
     *
     * This is an improvement of the algorithm described in the paper
     * "A Scalable Elimination-based Exchange Channel" by William
     * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
     * workshop.  Available at: http://hdl.handle.net/1802/2104
     */
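The "basic idea" in the first paragraph of the comment (a single slot, an occupying thread and a fulfilling thread) can be sketched as follows. This is a simplified illustration, not the JDK implementation: it has no arena, no spinning phase, no timeouts and no cancellation, it rejects null items, and it ignores interrupts while waiting. The names SingleSlotExchanger, Node, hole, waiter and SingleSlotDemo are chosen for this sketch, and Java 8 or later is assumed.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/** Simplified single-slot exchanger: one CAS-managed slot, no arena, no timeouts. */
final class SingleSlotExchanger<V> {

    /** Holds the occupying thread's item plus a "hole" for the partner's item. */
    private static final class Node<V> {
        final V item;
        final AtomicReference<V> hole = new AtomicReference<>();
        final Thread waiter = Thread.currentThread();
        Node(V item) { this.item = item; }
    }

    private final AtomicReference<Node<V>> slot = new AtomicReference<>();

    /** Blocks until a partner arrives, then returns the partner's item. */
    V exchange(V mine) {
        Objects.requireNonNull(mine, "this sketch uses null to mean 'hole not filled yet'");
        Node<V> me = new Node<>(mine);
        for (;;) {
            Node<V> occupant = slot.get();
            if (occupant == null) {
                // Occupying path: try to install our node, then wait for the hole to fill.
                if (slot.compareAndSet(null, me)) {
                    V theirs;
                    while ((theirs = me.hole.get()) == null) {
                        LockSupport.park(this); // may wake spuriously, so re-check in a loop
                    }
                    return theirs;
                }
            } else if (slot.compareAndSet(occupant, null)) {
                // Fulfilling path: we removed the node, so only we can fill its hole.
                // The CAS always succeeds here because this sketch has no cancellation.
                occupant.hole.compareAndSet(null, mine);
                LockSupport.unpark(occupant.waiter);
                return occupant.item;
            }
            // Either CAS lost a race with another thread: re-read the slot and retry.
        }
    }
}

/** Hypothetical driver that pairs the calling thread with one helper thread. */
final class SingleSlotDemo {
    static String[] swapOnce(String a, String b) {
        SingleSlotExchanger<String> exchanger = new SingleSlotExchanger<>();
        String[] got = new String[2];
        Thread helper = new Thread(() -> got[0] = exchanger.exchange(a));
        helper.start();
        got[1] = exchanger.exchange(b);
        try {
            helper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining the helper", e);
        }
        return got;
    }

    public static void main(String[] args) {
        String[] got = swapOnce("left", "right");
        System.out.println(got[0] + " " + got[1]); // prints "right left"
    }
}
```

With many threads, every call competes for the same slot reference. That is the contention the arena, hashing and probing rules in the comment are designed to spread out.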