首页 > 代码库 > java线程模型Master-Worker
java线程模型Master-Worker
这样的模型是最经常使用的并行模式之中的一个,在Nginx源代码中有涉及到有想看的能够去这个大神的博客了解一下http://blog.csdn.net/marcky/article/details/6014733,这位大神写的有些简洁。
从思想的角度来说。它主要由两类进程进行协作:各自是Master进程和Worker进程。Master进程负责接受和分配任务,Worker进程负责处理子任务,当Worker将子任务处理完毕后。将结果返回给Master进程。由Master进程做归纳和汇总。得到终于结果。详细流程能够看此图
这样的模式可以将一个大任务分解成若干个小任务去运行,适合一些耗时比較久的任务,可以提高系统的吞吐量。
一个相对完整的模型应该具备下面功能
在借鉴了java性能优化书上的列子。上面实现了一个简单的Master-Worker模式
package com.thread; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; public class Master_Worker { public static void main(String args[]) { long start = System.currentTimeMillis(); Master_Worker master_Worker = new Master_Worker(new PlusWorker(), 11); for (int i = 0; i < 100; i++) { master_Worker.submit(i); } master_Worker.execute(); int re = 0; Map<String, Object> result_Map = master_Worker.getResultMap(); while (result_Map.size()>0||!master_Worker.isComplete()) { Set<String> keysSet = result_Map.keySet(); String keyString = null; for (String string : keysSet) { keyString = string; break; } Integer i = null; if (keyString !=null) { i = (Integer) result_Map.get(keyString); } if (i!=null) { re+=i; } if (keyString!=null) { result_Map.remove(keyString); } } long end = System.currentTimeMillis(); System.out.println("结果:"+re+"-运行之间"+(end-start)); int sum = 0; start = System.currentTimeMillis(); for (int i = 1; i <= 100; i++) { sum+=i*i*i; try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } end = System.currentTimeMillis(); System.out.println("结果:"+sum+"-运行之间"+(end-start)); } // 任务队列 protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<>(); // Worker进程队列 protected Map<String, Thread> threadMap = new HashMap<>(); // 子任务处理结果集 protected Map<String, Object> resultMap = new ConcurrentHashMap<>(); // 是否全部的子任务都结束了 public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } // Master的构造,须要一个Worker进程逻辑,和须要的Worker进程数量 public Master_Worker(Worker woker, int countWorker) { woker.setWorkQueue(workerQueue); woker.setResultMap(resultMap); for (int i = 0; i < countWorker; i++) { threadMap.put(Integer.toString(i), new Thread(woker, Integer.toString(i))); } } //返回子任务结果集 public Map<String, Object> getResultMap() { return resultMap; } //提交任务 public void submit(Object job) { workerQueue.add(job); } public void execute() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { entry.getValue().start(); } } } } class Worker implements Runnable { // 任务队列,用于取得子任务 protected Queue<Object> workQueue; // 子任务处理结果集 protected Map<String, Object> resultMap; public void setWorkQueue(Queue<Object> workQueue) { this.workQueue = workQueue; } public void setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; } // 子任务处理逻辑,在子类中实现详细逻辑 public Object handle(Object input) { /* 这里能够写自己想要做的事情 */ return input; } @Override public void run() { // TODO Auto-generated method stub while (true) { // 获取子任务 Object inputObject = workQueue.poll(); if (inputObject == null) { break; } // 处理子任务 Object reObject = handle(inputObject); resultMap.put(Integer.toString(inputObject.hashCode()), reObject); } } } /* * 扩展自己的类 * */ class PlusWorker extends Worker{ @Override public Object handle(Object input) { // TODO Auto-generated method stub //在这里能够自己实现自己的业务逻辑等,在这里我让线程睡眠了100毫秒,模拟任务运行 try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Integer i = (Integer) input; return i*i*i; } }
这里的大多数都是借鉴java性能优化一书,加上自己的改编和简单介绍。
java线程模型Master-Worker
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。