首页 > 代码库 > java.util.concurrent包下的几个常用类
java.util.concurrent包下的几个常用类
1.Callable<V>
2.Semaphore
public class SemaphoreTest {
private static final int NUMBER = 5; //限制资源访问数
private static final Semaphore avialable = new Semaphore(NUMBER,true);
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
Runnable r = new Runnable(){
public void run(){
try {
avialable.acquire(); //此方法阻塞
Thread.sleep(10*1000);
System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完毕");
avialable.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
System.out.println(avialable.availablePermits());
for(int i=0;i<10;i++){
pool.execute(r);
}
System.out.println(avialable.availablePermits());
pool.shutdown();
}
public static String getNow(){
SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");
return sdf.format(new Date());
}
}
3.ReentrantLock与Condition
1.1)synchronized示例:
synchronized(object){
//do process to object
- }
private final ReentrantLock lock = new ReentrantLock();
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
public class ConditionTest {
private static final ReentrantLock lock = new ReentrantLock(true);
//从锁中创建一个绑定条件
private static final Condition condition = lock.newCondition();
private static int count = 1;
public static void main(String[] args) {
Runnable r1 = new Runnable(){
public void run(){
lock.lock();
try{
while(count<=5){
System.out.println(Thread.currentThread().getName()+"--"+count++);
Thread.sleep(1000);
}
condition.signal(); //线程r1释放条件信号,以唤醒r2中处于await的代码继续执行。
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
};
Runnable r2 = new Runnable(){
public void run(){
lock.lock();
try{
if(count<=5){
System.out.println("----$$$---");
condition.await(); //但调用await()后,lock锁会被释放,让线程r1能获取到,与Object.wait()方法一样
System.out.println("----------");
}
while(count<=10){
System.out.println(Thread.currentThread().getName()+"--"+count++);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
};
new Thread(r2).start(); //让r2先执行,先获得lock锁,但条件不满足,让r2等待await。
try {
Thread.sleep(100); //这里休眠主要是用于测试r2.await()会释放lock锁,被r1获取
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(r1).start();
}
}
public class ConditionMain {
public static void main(String[] args) {
final BoundleBuffer buf = new ConditionMain().new BoundleBuffer();
new Thread(new Runnable(){
public void run() {
for(int i=0;i<1000;i++){
try {
buf.put(i);
System.out.println("入值:"+i);
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
new Thread(new Runnable(){
public void run() {
for(int i=0;i<1000;i++){
try {
int x = buf.take();
System.out.println("出值:"+x);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
public class BoundleBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Integer[] items = new Integer[10];
int putptr, takeptr, count;
public void put(int x) throws InterruptedException {
System .out.println("put wait lock");
lock.lock();
System .out.println("put get lock");
try {
while (count == items.length){
System.out.println("buffer full, please wait");
notFull.await();
}
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
System .out.println("take wait lock");
lock.lock();
System .out.println("take get lock");
try {
while (count == 0){
System.out.println("no elements, please wait");
notEmpty.await();
}
int x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
}
4.BlockingQueue
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
5.CompletionService
public class CompletionServiceTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(8); //需要2s,如果将8改成10,则只需要1s
CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(pool);
Callable<Boolean> task = new Callable<Boolean>(){
public Boolean call(){
try {
Thread.sleep(1000);
System.out.println("插入1000条数据完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
};
};
System.out.println(getNow()+"--开始插入数据");
for(int i=0;i<10;i++){
cs.submit(task);
}
for(int i=0;i<10;i++){
try {
//ExecutorCompletionService.take()方法是阻塞的,如果当前没有完成的任务则阻塞
System.out.println(cs.take().get());
//实际使用时,take()方法获取的结果可用于处理,如果插入失败,则可以进行重试或记录等操作
} catch (InterruptedException|ExecutionException e) {
e.printStackTrace();
}
}
System.out.println(getNow()+"--插入数据完成");
pool.shutdown();
}
public static String getNow(){
SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");
return sdf.format(new Date());
}
}
6.CountDownLatch
public class CountDownLatchTest {
private static SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");
public static void main(String[] args) {
final CountDownLatch start = new CountDownLatch(1); //用一个信号控制一组线程的开始,初始化为1
final CountDownLatch end = new CountDownLatch(10); //要等待N个线程的结束,初始化为N,这里是10
Runnable r = new Runnable(){
public void run(){
try {
start.await(); //阻塞,这样start.countDown()到0,所有阻塞在start.await()处的线程一起执行
Thread.sleep((long) (Math.random()*10000));
System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完成");
end.countDown();//非阻塞,每个线程执行完,让end--,这样10个线程执行完end倒数到0,主线程的end.await()就可以继续执行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for(int i=0;i<10;i++){
new Thread(r).start(); //虽然开始了10个线程,但所有线程都阻塞在start.await()处
}
System.out.println(getNow()+"--线程全部启动完毕,休眠3s再让10个线程一起执行");
try {
Thread.sleep(3*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getNow()+"--开始");
start.countDown(); //start初始值为1,countDown()变成0,触发10个线程一起执行
try {
end.await(); //阻塞,等10个线程都执行完了才继续往下。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getNow()+"--10个线程都执行完了,主线程继续往下执行!");
}
private static String getNow(){
return sdf.format(new Date());
}
}
7.CyclicBarrier
1.一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。也就是说,这一组线程的执行分几个节点,每个节点往下执行,都需等待其他线程,这就需要这种等待具有循环性。CyclicBarrier在这样的情况下就很有用。
2.CyclicBarrier与CountDownLacth的区别:
1)CountDownLacth用于一个线程与一组线程之间的相互等待。常用的就是一个主线程与一组分治线程之间的等待:主线程发号令,一组线程同时执行;一组线程依次执行完,再唤醒主线程继续执行;
CyclicBarrier用于一组线程执行时,每个线程执行有多个节点,每个节点的处理需要相互等待。如:对5个文件进行处理,按行将各个文件数字挑出来合并成一行,排序,并输出到另一个文件,那每次处理都需要等待5个线程读入下一行。(api示例可供参考)
2)CountDownLacth的处理机制是:初始化一个值N(相当于一组线程有N个),每个线程调用一次countDown(),那么cdLatch减1,等所有线程都调用过countDown(),那么cdLatch值达到0,那么线程从await()处接着玩下执行。
CyclicBarrier的处理机制是:初始化一个值N(相当于一组线程有N个),每个线程调用一次await(),那么barrier加1,等所有线程都调用过await(),那么barrier值达到初始值N,所有线程接着往下执行,并将barrier值重置为0,再次循环下一个屏障;
3)由2)可以知道,CountDownLatch只可以使用一次,而CyclicBarrier是可以循环使用的。
3.个人用于理解的示例:
public class CyclicBarrierTest {
private static final CyclicBarrier barrier = new CyclicBarrier(5,
new Runnable(){
public void run(){ //每次线程到达屏障点,此方法都会执行
System.out.println("\n--------barrier action--------\n");
}
});
public static void main(String[] args) {
for(int i=0;i<5;i++){
new Thread(new CyclicBarrierTest().new Worker()).start();
}
}
class Worker implements Runnable{
public void run(){
try {
System.out.println(Thread.currentThread().getName()+"--第一阶段");
Thread.sleep(getRl());
barrier.await(); //每一次await()都会阻塞,等5个线程都执行到这一步(相当于barrier++操作,加到初始化值5),才继续往下执行
System.out.println(Thread.currentThread().getName()+"--第二阶段");
Thread.sleep(getRl());
barrier.await(); //每一次5个线程都到达共同的屏障节点,会执行barrier初始化参数中定义的Runnable.run()
System.out.println(Thread.currentThread().getName()+"--第三阶段");
Thread.sleep(getRl());
barrier.await();
System.out.println(Thread.currentThread().getName()+"--第四阶段");
Thread.sleep(getRl());
barrier.await();
System.out.println(Thread.currentThread().getName()+"--第五阶段");
Thread.sleep(getRl());
barrier.await();
System.out.println(Thread.currentThread().getName()+"--结束");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static long getRl(){
return Math.round(10000);
}
}
java.util.concurrent包下的几个常用类