A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner‘s object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.


  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException


import java.util.concurrent.Exchanger;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.apache.log4j.Logger;/** * @Title: ExchangerTest * @Description: Test class for Exchanger * @Company: CSAIR * @Author: lixuanbin * @Creation: 2014年12月14日 * @Version:1.0 */public class ExchangerTest {    protected static final Logger log = Logger.getLogger(ExchangerTest.class);    private static volatile boolean isDone = false;    static class ExchangerProducer implements Runnable {        private Exchanger<Integer> exchanger;        private static int data = http://www.mamicode.com/1;        ExchangerProducer(Exchanger<Integer> exchanger) {            this.exchanger = exchanger;        }        @Override        public void run() {            while (!Thread.interrupted() && !isDone) {                for (int i = 1; i <= 3; i++) {                    try {                        TimeUnit.SECONDS.sleep(1);                        data = i;                        System.out.println("producer before: " + data);                        data = exchanger.exchange(data);                        System.out.println("producer after: " + data);                    } catch (InterruptedException e) {                        log.error(e, e);                    }                }                isDone = true;            }        }    }    static class ExchangerConsumer implements Runnable {        private Exchanger<Integer> exchanger;        private static int data = http://www.mamicode.com/0;        ExchangerConsumer(Exchanger<Integer> exchanger) {            this.exchanger = exchanger;        }        @Override        public void run() {            while (!Thread.interrupted() && !isDone) {                data = 0;                System.out.println("consumer before : " + data);                try {                    TimeUnit.SECONDS.sleep(1);                    data = exchanger.exchange(data);                } catch (InterruptedException e) {                    log.error(e, e);                }                System.out.println("consumer after : " + data);            }        }    }    /**     * @param args     */    public static void main(String[] args) {        ExecutorService exec = Executors.newCachedThreadPool();        Exchanger<Integer> exchanger = new Exchanger<Integer>();        ExchangerProducer producer = new ExchangerProducer(exchanger);        ExchangerConsumer consumer = new ExchangerConsumer(exchanger);        exec.execute(producer);        exec.execute(consumer);        exec.shutdown();        try {            exec.awaitTermination(30, TimeUnit.SECONDS);        } catch (InterruptedException e) {            log.error(e, e);        }    }}

   这大致可以看作是一个简易的生产者消费者模型,有两个任务类,一个递增地产生整数,一个产生整数0,然后双方进行交易。每次交易前的生产者和每次交易后的消费者都会sleep 1秒来模拟数据处理的消耗,并在交易前后把整数值打印到控制台以便检测结果。在这个例子里交易循环只执行三次,采用一个volatile boolean来控制交易双方线程的退出。


consumer before : 0
producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3


  • exchange方法真的帮一对线程交换了数据;
  • exchange方法真的会阻塞调用方线程直至另一方线程参与交易。



   最近接到外部项目组向我组提出的接口需求,需要查询我们业务办理量的统计情况。我们系统目前的情况是,有一个日增长十多万、总数据量为千万级别的业务办理明细表(xxx_info),每人次的业务办理结果会实时写入其中。以往对外提供的业务统计接口是在每次被调用时候在明细表中执行SQL查询(select、count、where、group by等),响应时间很长,对原生产业务的使用也有很大的影响。于是我决定趁着这次新增接口的上线机会对系统进行优化。



class ExchangerProducer implements Runnable {    private Exchanger<Set<XXXStatistics>> exchanger;    private Set<XXXStatistics> holder;    private Date fltDate;    private int threshold;    ExchangerProducer(Exchanger<Set<XXXStatistics>> exchanger,            Set<XXXStatistics> holder, Date fltDate, int threshold) {        this.exchanger = exchanger;        this.holder = holder;        this.fltDate = fltDate;        this.threshold = threshold;    }    @Override    public void run() {        try {            while (!Thread.interrupted() && !isDone) {                List<XXXStatistics> temp1 = null;                List<XXXStatistics> temp11 = null;                for (int i = 0; i < allCities.size(); i++) {                    try {                        temp1 = xxxDao                                .findStatistics1(                                        fltDate, allCities.get(i));                        temp11 = xxxDao                                .findStatistics2(                                        fltDate, allCities.get(i),                                        internationalList);                        if (temp1 != null && !temp1.isEmpty()) {                            calculationCounter.addAndGet(temp1.size());                            if (temp11 != null && !temp11.isEmpty()) {                                // merge two lists into temp1                                mergeLists(temp1, temp11);                                temp11.clear();                                temp11 = null;                            }                            // merge temp1 into holder set                            mergeListToSet(holder, temp1);                            temp1.clear();                            temp1 = null;                        }                    } catch (Exception e) {                        log.error(e, e);                    }                    // Insert every ${threshold} or the last into database.                    if (holder.size() >= threshold                            || i == (allCities.size() - 1)) {                        log.info("data collected: \n" + holder);                        holder = exchanger.exchange(holder);                        log.info("data submitted");                    }                }                // all cities are calculated                isDone = true;            }            log.info("calculation job done, calculated: "                    + calculationCounter.get());        } catch (InterruptedException e) {            log.error(e, e);        }        exchanger = null;        holder.clear();        holder = null;        fltDate = null;    }}



  • threshold:缓冲区的容量阀值;
  • allCities:城市列表,迭代这个列表作为入参来执行查询统计;
  • XXXStatistics:统计数据封装实体类,实现了Serializable和Comparable接口,覆写equals和compareTo方法,以利用TreeSet提供的去重和排序处理;
  • isDone:volatile boolean,标识统计任务是否完成;
  • holder:TreeSet<XXXStatistics>,存放统计结果的内存缓冲区,容量达到阀值后提交给Exchanger执行exchange操作;
  • dao.findStatistics1,dao.findStatistics2:简化的数据库查询统计操作,此处仅供示意;
  • calculationCounter:AtomicInteger,标记生产端所提交的记录总数;
  • mergeLists,mergeListToSet:内部私有工具方法,把dao查询返回的列表合并到holder中;



class ExchangerConsumer implements Runnable {    private Exchanger<Set<XXXStatistics>> exchanger;    private Set<XXXStatistics> holder;    ExchangerConsumer(Exchanger<Set<XXXStatistics>> exchanger,            Set<XXXStatistics> holder) {        this.exchanger = exchanger;        this.holder = holder;    }    @Override    public void run() {        try {            List<XXXStatistics> tempList;            while (!Thread.interrupted() && !isDone) {                holder = exchanger.exchange(holder);                log.info("got data: \n" + holder);                if (holder != null && !holder.isEmpty()) {                    try {                        // insert data into database                        tempList = convertSetToList(holder);                        insertionCounter.addAndGet(xxxDao                                .batchInsertXXXStatistics(tempList));                        tempList.clear();                        tempList = null;                    } catch (Exception e) {                        log.error(e, e);                    }                    // clear the set                    holder.clear();                } else {                    log.info("wtf, got an empty list");                }                log.info("data processed");            }            log.info("insert job done, inserted: " + insertionCounter.get());        } catch (InterruptedException e) {            log.error(e, e);        }        exchanger = null;        holder.clear();        holder = null;    }}



  • convertSetToList:由于dao接口的限制,需把交换得到的Set转换为List;
  • batchInsertXXXStatistics:使用jdbc4的batch update而实现的批量插入dao接口;
  • insertionCounter:AtomicInteger,标记消费端插入成功的记录总数;



public boolean calculateStatistics(Date fltDate) {    // initialization    calculationCounter.set(0);    insertionCounter.set(0);    isDone = false;    exec = Executors.newCachedThreadPool();    Set<XXXStatistics> producerSet = new TreeSet<XXXStatistics>();    Set<XXXStatistics> consumerSet = new TreeSet<XXXStatistics>();    Exchanger<Set<XXXStatistics>> xc = new Exchanger<Set<XXXStatistics>>();    ExchangerProducer producer = new ExchangerProducer(xc, producerSet,            fltDate, threshold);    ExchangerConsumer consumer = new ExchangerConsumer(xc, consumerSet);    // execution    exec.execute(producer);    exec.execute(consumer);    exec.shutdown();    boolean isJobDone = false;    try {        // wait for termination        isJobDone = exec.awaitTermination(calculationTimeoutMinutes,                TimeUnit.MINUTES);    } catch (InterruptedException e) {        log.error(e, e);    }    if (!isJobDone) {        // force shutdown        exec.shutdownNow();        log.error("time elapsed for "                + calculationTimeoutMinutes                + " minutes, but still not finished yet, shut it down anyway.");    }    // clean up    exec = null;    producerSet.clear();    producerSet = null;    consumerSet.clear();    consumerSet = null;    xc = null;    producer = null;    consumer = null;    System.gc();    // return the result    if (isJobDone && calculationCounter.get() > 0            && calculationCounter.get() == insertionCounter.get()) {        return true;    }    return false;}




   在这个案例中,使用Exchanger进行批量的双向数据交换可谓恰如其分:生产者在执行新的查询统计任务填入数据到缓冲区的同时,消费者正在批量插入生产者换入的上一次产生的数据,系统的吞吐量得到平滑的提升;计算复杂度、内存消耗、系统性能也能通过相关的参数设置而得到有效的控制(在消费端也可以对holder进行再次分割以控制每次批插入的大小,建议参阅数据库厂商以及数据库驱动包的说明文档以确定jdbc的最优batch update size);代码的实现也很简洁易懂。这些优点,是采用有界阻塞队列所难以达到的。

