首页 > 代码库 > JAVA 生产者租塞模式的线程池 ThreadPoolExecutor

JAVA 生产者租塞模式的线程池 ThreadPoolExecutor

package com.dubbo.analyzer.executor;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * 任务执行者<br/> * 当线程不够且线程队列爆满时,会租塞生产者<br/> * 此类为单例,必须先调用 init 方法进行初始化.<br/> * <a href="http://www.importnew.com/10790.html" target="_blank">http://www.importnew.com/10790.html</a> * @author BennyTian * @date 2015/01/06 */public class Executor {		private Executor() { }	private static ThreadPoolExecutor executor = null;		private static TimeUnit unit = TimeUnit.MINUTES ;	private static long keepAliveTime = 1;		/**	 * 初始化	 * @param threadSize	线程数	 * @param poolSize		线程租塞队列的容量	 */	public static void init(Integer threadSize,Integer poolSize){		executor = new ThreadPoolExecutor(threadSize, threadSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>(threadSize));		//异常捕获处理:当线程池达到处理最大极限的时候,调用 queue.put 对生产者进行租塞. execute方法默认使用的是异步的 queue.offer		executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {			@Override			public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {				try {					executor.getQueue().put(r);				} catch (InterruptedException e) {					e.printStackTrace();				}			}		});	}		/**	 * 执行任务	 * @param task	 */	public static void execute(Runnable task){		verify();		executor.execute(task);	}		/**	 * 线程池的任务是否全部执行完成	 * @return	 */	public static Boolean isCompleted(){		return executor.getActiveCount() == 0;	}		/**	 * 停止任务	 */	public static void shutdown(){		verify();		executor.shutdown();	}		private static void verify(){		if(executor==null){			throw new RuntimeException("please invoke [ init ] method");		}	}	}

 

JAVA 生产者租塞模式的线程池 ThreadPoolExecutor