首页 > 代码库 > Java:并行编程及同步使用方法

Java:并行编程及同步使用方法

  • 知道java可以使用java.util.concurrent包下的
CountDownLatch
ExecutorService
Future

Callable
实现并行编程,并在并行线程同步时,用起来十分简单的一种 。
实现原理:
1、CountDownLatch 统计并行线程完成数,并提供了await()方法,实现等待所有并行线程完成,或者指定最大等待时间。
2、ExecutorService提供了execute(Callable)执行线程方法,还提供了submit(Callable)提交线程。
3、Future接受实现Callable<V>接口的(可执行线程)返回值,接受Executors.submit(Callable<V>)返回值。而且Future<V>提供get()取回并行子线程返回的参数,还可以给get指定过期时间。

想到Concurrent,就能想到c#中,命名空间System.Collection,Concurrent,在该命名空间下提供了一些线程安全的集合类。

怎么使用:

定义可执行线程类:

public class UploadFileToTask implements Callable<UploadFileToTaskResult> {    private final Task_UploadFileToTaskItem taskItem;    private final Log log = LogHelper.getInstance(ImportMain.class);    private final CountDownLatch threadsSignal;    private final HDFSUtil hdfsUtil = new HDFSUtil();    private final static String HADOOP_HDFS_PATH = HdfsConfiguration.getHdfsUrl();    public UploadFileToTask(CountDownLatch threadsSignal ,Task_UploadFileToTaskItem taskItem){        this.taskItem=taskItem;        this.threadsSignal=threadsSignal;    }    @Override    public UploadFileToTaskResult call() throws Exception {        String area = taskItem.getArea();        String fileGenerateDate = taskItem.getFileGenerateDate();        String manufacturer = taskItem.getManufacturer();        String eNodeBId = taskItem.geteNodeBId();        String filePath = taskItem.getFilePath();        FileType fileType = taskItem.getFileType();        TaskStatus taskStatus= TaskStatus.Success;        // 不确定该FileSystem是否是线程安全的,故在每一个thread初始化一次。        Configuration conf = new Configuration();        Path dstPath = new Path(HADOOP_HDFS_PATH);        FileSystem hdfs = dstPath.getFileSystem(conf);        // 核心代码。。。        // 上传MR文件        // 上传Signal文件        // 如果文件路径不为空,就开始上传文件到hdfs        if(uploadFilePath.length()>0){            if (!hdfsUtil.uploadFileToHdfs(hdfs, filePath, uploadFilePath)) {                taskStatus= TaskStatus.Fail;            }        }        TaskGroupInfo taskGroupInfo = new TaskGroupInfo();        taskGroupInfo.setArea(area);        taskGroupInfo.setManufacturer(manufacturer);        taskGroupInfo.setFileGenerateDate(fileGenerateDate);        taskGroupInfo.setFileType(fileType);        String key = String.format("%s,%s,%s,%s", taskGroupInfo.getArea(), taskGroupInfo.getManufacturer(), taskGroupInfo.getFileGenerateDate(), String.valueOf(taskGroupInfo.getFileType().getValue()));        UploadFileToTaskResult result=new UploadFileToTaskResult();        // 填充返回值        result.setStatus(taskStatus);        result.setTaskGroupInfo(taskGroupInfo);        result.setTaskGroupkey(key);        result.setTaskOID(taskItem.getOid());        System.out.println("task id:" + taskItem.getOid() + " >>>>线程名称:" + Thread.currentThread().getName() + "结束. 还有" + threadsSignal.getCount() + " 个线程");        // 必须等核心处理逻辑处理完成后才可以减1        this.threadsSignal.countDown();        return result;    }}
  • 实现并行线程同步核心代码:
            // 获取当前节点带执行任务            ArrayList<Task_UploadFileToTaskItem> taskItems = uploadFileToTaskItemDao.getTopNTodoTaskItems(this.computeNode.getId(),Configuration.getTaskCount());            // 批量修改任务状态为正在处理状态(doing)。            log.info("Start:>>>>>>batch modify task status(doing)>>>>>>");            log.info("Over:>>>>>>batch modify task status(doing)>>>>>>");            // 批量处理上传任务(上传文件到)            log.info("Start:>>>>>>each process task(upload to)>>>>>>");
CountDownLatch threadsSignal
= new CountDownLatch(taskItems.size()); ExecutorService executor = Executors.newFixedThreadPool(taskItems.size()); List<Future<UploadFileToTaskResult>> resultLazyItems=new ArrayList<Future<UploadFileToTaskResult>>(); for (Task_UploadFileToTaskItem taskItem : taskItems) { // 使用future存储子线程执行后返回结果,必须在所有子线程都完成后才可以使用get(); // 如果在这里使用get(),会造成等待同步。 Future<UploadFileToTaskResult> future = executor.submit(new UploadFileToTask(threadsSignal,taskItem)); resultLazyItems.add(future); } // 等待所有并行子线程任务完成。 threadsSignal.await();
executor.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  log.info(
"Over:>>>>>>each process task(upload to)>>>>>>"); // 批量修改任务处理状态 Map<String, TaskGroupInfo> taskGroupItems=new HashMap<String, TaskGroupInfo>(); Map<Integer, TaskStatus> successTaskItems = new HashMap<Integer, TaskStatus>(); Map<Integer, TaskStatus> failTaskItems = new HashMap<Integer, TaskStatus>(); for(Future<UploadFileToTaskResult> future :resultLazyItems){ UploadFileToTaskResult result= future.get(); if(!taskGroupItems.containsKey(result.getTaskGroupkey())){ taskGroupItems.put(result.getTaskGroupkey(),result.getTaskGroupInfo()); } if(result.getStatus()== TaskStatus.Success){ successTaskItems.put(result.getTaskOID(),result.getStatus()); }else{ failTaskItems.put(result.getTaskOID(),result.getStatus()); } }
  •  参考资料:

http://blog.csdn.net/wangmuming/article/details/19832865

http://www.importnew.com/21312.html

Java:并行编程及同步使用方法