首页 > 代码库 > java并行调度框架封装及示例

Java 并行调度框架封装及示例

参考资料:阿里巴巴开源项目 CobarClient 的源码实现。

分享作者:闫建忠 

分享时间:2014年5月7日

---------------------------------------------------------------------------------------

并行调度封装类设计: BXexample.java

package org.hdht.business.ordermanager.quartzjob;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.dao.ConcurrencyFailureException;

/**
 * Helper for running a callback over every element of a collection in
 * parallel and blocking until all elements have been processed.
 *
 * <p>Typical use: cut a large work list into pages with
 * {@link #getSubListPage(List, int, int)} and hand each page to
 * {@link #BXfunction(Collection, ExectueCallBack)}.
 */
public class BXexample {

	/**
	 * Builds the thread pool used by a single {@link #BXfunction} call.
	 *
	 * <p>The core size is the smaller of {@code poolSize} and the number of
	 * available processors. Worker threads are daemon threads so a forgotten
	 * pool cannot keep the JVM alive.
	 *
	 * @param poolSize maximum pool size; must be positive
	 * @param method   label embedded in the worker thread names
	 * @return a new executor; the caller is responsible for shutting it down
	 */
	private static ExecutorService createCustomExecutorService(int poolSize, final String method) {
		// Never start more core threads than there are CPUs.
		int coreSize = Math.min(Runtime.getRuntime().availableProcessors(), poolSize);

		ThreadFactory tf = new ThreadFactory() {
			public Thread newThread(Runnable r) {
				Thread t = new Thread(r, "thread created at BXexample method [" + method + "]");
				t.setDaemon(true);
				return t;
			}
		};

		// Unbounded queue: submitted tasks wait here whenever all core threads are busy.
		BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>();
		return new ThreadPoolExecutor(coreSize, poolSize, 60, TimeUnit.SECONDS, queueToUse, tf,
				new ThreadPoolExecutor.CallerRunsPolicy());
	}

	/**
	 * Returns one page of {@code list} as a view backed by the original list
	 * (see {@link List#subList(int, int)}).
	 *
	 * @param list     source list; may be {@code null}
	 * @param skip     index of the first element of the page
	 * @param pageSize maximum number of elements in the page
	 * @return the elements {@code [skip, min(skip + pageSize, list.size()))},
	 *         or {@code null} when {@code list} is {@code null} or empty, when
	 *         {@code skip} or {@code pageSize} is negative, or when
	 *         {@code skip} lies beyond the end of the list
	 */
	public static <T> List<T> getSubListPage(List<T> list, int skip, int pageSize) {
		if (list == null || list.isEmpty()) {
			return null;
		}
		int size = list.size();
		// A negative skip used to reach subList() and throw; treat it like the
		// other out-of-range cases instead.
		if (skip < 0 || pageSize < 0 || skip > size) {
			return null;
		}
		// Compare against the remaining element count so that skip + pageSize
		// can never overflow int (a huge pageSize previously produced null).
		int endIndex = (pageSize > size - skip) ? size : skip + pageSize;
		return list.subList(skip, endIndex);
	}

	/**
	 * Invokes {@code ecb.doExectue(element)} for every element of
	 * {@code paramCollection} on a freshly created thread pool and blocks
	 * until every invocation has finished.
	 *
	 * <p>A {@code null} or empty collection is a no-op, so the {@code null}
	 * that {@link #getSubListPage(List, int, int)} returns for an empty page
	 * can be passed in directly.
	 *
	 * @param paramCollection the work items; may be {@code null} or empty
	 * @param ecb             callback executed once per work item
	 * @throws ConcurrencyFailureException if the calling thread is interrupted
	 *         while waiting (its interrupt status is restored first), or if
	 *         any callback invocation threw; in the latter case the message
	 *         carries the concatenated stack traces of all failures
	 */
	public static void BXfunction(Collection<?> paramCollection, final ExectueCallBack ecb) {
		// Nothing to schedule: avoid an NPE on size() and a pointless pool.
		if (paramCollection == null || paramCollection.isEmpty()) {
			return;
		}

		// Build the executor for this batch.
		ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection");
		try {
			// One count per work item; reaches zero when every task has ended.
			final CountDownLatch latch = new CountDownLatch(paramCollection.size());
			// StringBuffer because several worker threads may append concurrently.
			final StringBuffer exceptionStaktrace = new StringBuffer();

			for (final Object entity : paramCollection) {
				executor.execute(new Runnable() {
					public void run() {
						try {
							ecb.doExectue(entity);
						} catch (Throwable t) {
							exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t));
						} finally {
							// Always count down so the waiting caller cannot hang.
							latch.countDown();
						}
					}
				});
			}

			try {
				// Wait for every task of this batch to finish.
				latch.await();
			} catch (InterruptedException e) {
				// Scheduling failure: keep the interrupt visible to the caller's
				// callers before translating it into an unchecked exception.
				Thread.currentThread().interrupt();
				throw new ConcurrencyFailureException(
						"unexpected interruption when re-arranging parameter collection into sub-collections ",e);
			}

			if (exceptionStaktrace.length() > 0) {
				// At least one callback failed.
				throw new ConcurrencyFailureException(
						"unpected exception when re-arranging parameter collection, check previous log for details.\n"+ exceptionStaktrace);
			}
		} finally {
			// Orderly shutdown: no new tasks are accepted, submitted ones still run.
			executor.shutdown();
		}
	}

}

回调接口类设计:ExectueCallBack.java

package org.hdht.business.ordermanager.quartzjob;

/**
 * Callback that processes a single work item.
 *
 * <p>{@code BXexample.BXfunction} calls {@link #doExectue(Object)} once for
 * each element of the collection it is given, from a thread-pool worker
 * thread, so implementations may run concurrently with one another.
 */
public interface ExectueCallBack {
	/**
	 * Processes one work item.
	 *
	 * @param executor the work item to process; despite its name this is the
	 *                 collection element passed in by
	 *                 {@code BXexample.BXfunction}, not an executor
	 * @throws Exception if processing fails; {@code BXexample.BXfunction}
	 *                   catches it and reports it after the whole batch ends
	 */
	void doExectue(Object executor) throws Exception;
}



示例(hello 示例)

	/**
	 * Demo entry point: pages through ten numeric strings three at a time and
	 * runs each page in parallel. Every task spins for a number of iterations
	 * proportional to its value and then prints a greeting, so the print order
	 * inside a page depends on how the tasks are scheduled.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {

		// Work items, in the order they are paged.
		final String[] seeds = {"9", "2", "18", "7", "6", "1", "3", "4", "14", "13"};
		final List<String> workItems = new ArrayList<String>();
		for (String seed : seeds) {
			workItems.add(seed);
		}

		// Current processing capacity, used as the page size.
		final int batchSize = 3;

		// The callback keeps no state, so a single instance serves every page.
		final ExectueCallBack spinThenGreet = new ExectueCallBack() {
			public void doExectue(Object executor) throws Exception {
				final int k = Integer.parseInt((String) executor);

				// Busy loop standing in for real work.
				final int spins = k * 10000000;
				int done = 0;
				while (done < spins) {
					done++;
				}
				System.out.println(k + ":hello world");
			}
		};

		int offset = 0;
		while (offset < workItems.size()) {
			List<String> page = BXexample.getSubListPage(workItems, offset, batchSize);
			BXexample.BXfunction(page, spinThenGreet);
			offset += batchSize;
		}
	}



示例(实际业务应用示例)

/*
		 * Parallel-dispatch handling.
		 *
		 * Breaks the pending work down by satellite x date: every
		 * (satellite, date) sub-task is added to paramMapList.
		 */
		List<Map<String, Object>> paramMapList = new ArrayList<Map<String, Object>>();
		for (Iterator<OrderParamSatellite> iterator = paramSatellites.iterator(); iterator.hasNext();) {
			OrderParamSatellite paramSatellite = iterator.next();

			paramMapList.addAll(this.getParamMapList(paramSatellite));
		}



		// Page through the task list, using the number of currently free
		// cluster nodes as the page size (the step of the sub-list window).
		int fsize = HostServerQueue.getInstance().freeSize();
		for (int i = 0; i < paramMapList.size();) {
			if (fsize > 0) {
				List<Map<String, Object>> tl = BXexample.getSubListPage(paramMapList, i, fsize);
				// Run this page in parallel and wait for it to finish.
				BXexample.BXfunction(tl, new ExectueCallBack() {
					// Safe: every element of tl comes from paramMapList,
					// whose elements are Map<String, Object>.
					@SuppressWarnings("unchecked")
					public void doExectue(Object executor) throws Exception {
						ExecuteOrderBTask((Map<String, Object>) executor);
					}
				});
				i += fsize;
			} else {
				// No free node reported. A step of 0 used to spin this loop at
				// full speed, and a negative value led to a null page. Back off
				// briefly before asking again.
				// NOTE(review): the 100 ms pause is an arbitrary choice - tune it.
				java.util.concurrent.locks.LockSupport.parkNanos(100L * 1000L * 1000L);
			}

			// Re-query the number of idle nodes, i.e. the cluster's current capacity.
			fsize = HostServerQueue.getInstance().freeSize();
		}