首页 > 代码库 > J.U.C 系列之 Tools

J.U.C 系列之 Tools

JDK 5.0 开始,java并发大师Doug lea 为我们带来了更高效更应用的java并发API Java.util.concurrent包,简称J.U.C

 

J.U.C 体系由如下五个知识结构构成

技术分享

 

本节我们首先来介绍其中的并发辅助工具类 Tools

Tools又由几个主要的工具类构成,如下所示‘

技术分享

一 等待多线程完成的CountDownLatch

CountDownLatch允许一个或者多个线程等待其他线程完成操作。

假如有这样一个需求:我们需要解析一个Excel表里面的多个sheet数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到多有sheet的数据都解析完成之后,程序需要提示解析完成。在这个需求里面要求实现主线程等待所有线程完成sheet的解析操作,最简单的就是使用Thread 的join方法,这里不在讨论,若不清楚可以自行查看,我们这里来看看若使用CountDownLatch如何实现。

代码如下若是

import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {    /*      * CountDownLatch 构造函数接受一个int 类型的参数作为计数器。如果你想等待N个点完成,       这里就传入N.这里我们测试等待2个点完成       每次调用countDown方法N就会减一     */        static CountDownLatch c = new CountDownLatch(2);    public static void main(String[] args) throws InterruptedException {        new Thread(new Runnable() {            @Override            public void run() {                System.out.println(1);                c.countDown();                                System.out.println(2);                c.countDown();            }        }).start();                        //await 方法会阻塞当前线程,直到计数器N的值变为0,        //由于countDown方法可以在任何线程执行,所以这里的N个点可以是N个线程                c.await();                System.out.println(3);    }}

如果某个sheet的解析处理的异常缓慢,我们不可能让主线程一直等待,所以可以使用另一个带超时等待的await方法await(Long time,TimeUnit unit),这个方法等待指定时间后,就不会继续阻塞当前线程。

注:countDownLatch不能够重新初始化或者修改内部计数器值

二 同步循环屏障 CyclicBarrier

CyclicBarrier的字面量的意思是可循环使用的屏障。它主要做的是,让一组线程达到一个屏障时被阻塞,直到最后一个线程到达屏障点时,屏障才会被打开,所有被屏障拦截的线程才会继续执行。

具体效果图如下所示

技术分享

 

CyclicBarrier 提供2个构造方法:

public CyclicBarrier(int parties, Runnable barrierAction) {} public CyclicBarrier(int parties) {}

参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时首先执行的任务。

下面我们通过实例演示如何使用CyclicBarrier

package com.wirt;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest {     static CyclicBarrier c = new CyclicBarrier(2,new A());    public static void main(String[] args) throws InterruptedException {        new Thread(new Runnable() {            @Override            public void run() {                try {                    c.await();                } catch (InterruptedException | BrokenBarrierException e) {                    e.printStackTrace();                }                System.out.println(1);                             }        }).start();                try {            c.await();        } catch (BrokenBarrierException e) {            e.printStackTrace();        }                System.out.println(2);    }        static class A implements Runnable{        @Override        public void run() {            System.out.println(3);        }            }}

因为设置了拦截数量是2,且两个线程都到达屏障后优先执行A任务,然后在执行到达屏障的任务

 

三 控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,他通过协调各个线程以保证合理的使用公共资源。

Semaphore 可以用于做流量控制,特别是公共资源的应用场景,比如数据库连接,加入有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取。但是到读到内存后,还是要存储到数据库中,而数据库的连接数只有十个,这是我们必须控制只有十个线程同时获取数据库连接保存数据,否则报错无法获取数据库连接,这个时候,就可以使用Semaphore来做流量控制。示例代码如下

 

 

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class SemaphoreTest {     private static final int COUNT =  30;         private static ExecutorService threadPool = Executors.newFixedThreadPool(COUNT);        private static Semaphore s = new Semaphore(10);        public static void main(String[] args) {        for(int i =0;i<COUNT;i++){            threadPool.execute(new Runnable() {                                @Override                public void run() {                     try {                         s.acquire(); //获取许可证                         System.out.println("save data");                         s.release(); //释放许可证                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            });        }                threadPool.shutdown();    }}

 

在代码中,虽然有30个线程任务在执行,但是只有10个线程可以同时并发执行。

 

 

四 线程间交换数据的Exchanger

Exchanger 是一个用于线程间协作的工具类,用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程通过执行exchanger方法,进行交换数据,如果第一个线程先执行exchanger,它会阻塞等待第二个线程执行exchanger方法,当两个线程都达到同步点时,这两个线程交换数据。

下面通过示例演示Exchanger用法

 

package com.wirt;import java.util.concurrent.Exchanger;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class ExchangerTest {     private static final Exchanger<String> exgr = new Exchanger<String>();        private static final ExecutorService  threadPool = Executors.newFixedThreadPool(2);        public static void main(String[] args) {        threadPool.execute(new Runnable() {                        @Override            public void run() {                 String A = "银行流水A"; //A录入银行流水数据                 try {                    exgr.exchange(A);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });                threadPool.execute(new Runnable() {                        @Override            public void run() {                 String B = "银行流水B"; //B录入银行流水数据                 try {                     String A = exgr.exchange(B);                     System.out.println("A和B数据是否一致:"+A.equals(B)+"; A="+A+"; B="+B);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });                threadPool.shutdown();    }}

 

五 总结

工具类使用场景
CountDownLatch多线程同时解析一个Excel中多个sheet,等到所有sheet解析完成,程序提示完成
CyclicBarrier多线程计算数据,最后合并计算结果
Semaphore用于流量控制,特别是公共资源有限情况下的应用场景,例如数据库连接
Exchanger1 可以用于遗传算法  2 可以用于校对工作

 

参考:《Java并发编程艺术》

工具类之Executors放在下一节

J.U.C 系列之 Tools