首页 > 代码库 > Mapreduce运行过程分析(基于Hadoop2.4)——(三)
Mapreduce运行过程分析(基于Hadoop2.4)——(三)
4.4 Reduce类
4.4.1 Reduce介绍
整完了Map,接下来就是Reduce了。YarnChild.main()—>ReduceTask.run()。ReduceTask.run方法開始和MapTask类似,包含initialize()初始化,依据情况看是否调用runJobCleanupTask(),runTaskCleanupTask()等。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。
4.4.2 Copy
Copy就是从运行各个Map任务的节点获取map的输出文件。这是由ReduceTask.ReduceCopier 类来负责。ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器。假设大小超过一定阈值就写到磁盘,否则放入内存,在远程拷贝数据的同一时候,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多。
Step1:
首先在ReduceTask的run方法中,通过例如以下配置来mapreduce.job.reduce.shuffle.consumer.plugin.class装配shuffle的plugin。默认的实现是Shuffle类:
1 Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); 7 shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job); 9 LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
Step2:
初始化上述的plugin后,运行其run方法,得到RawKeyValueIterator的实例。
run方法的运行过程例如以下:
Step2.1:
量化Reduce的事件数目:
1 int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks()); 3 int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
Step2.2:
生成map的完毕状态获取线程,并启动此线程:
final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch); eventFetcher.start();
获取已经完毕的Map信息,如Map的host、mapId等放入ShuffleSchedulerImpl中的Set<MapHost>中便于以下进行数据的拷贝传输。
1 URI u = getBaseURI(reduceId, event.getTaskTrackerHttp()); 3 addKnownMapOutput(u.getHost() + ":" + u.getPort(), 5 u.toString(), 7 event.getTaskAttemptId()); 9 maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
Step2.3:
在Shuffle类中启动初始化Fetcher线程组,并启动:
1 boolean isLocal = localMapFiles != null; 2 3 final int numFetchers = isLocal ? 1 : 4 5 jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); 6 7 Fetcher<K,V>[] fetchers = new Fetcher[numFetchers]; 8 9 if (isLocal) { 10 11 fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler, 12 13 merger, reporter, metrics, this, reduceTask.getShuffleSecret(), 14 15 localMapFiles); 16 17 fetchers[0].start(); 18 19 } else { 20 21 for (int i=0; i < numFetchers; ++i) { 22 23 fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 24 25 reporter, metrics, this, 26 27 reduceTask.getShuffleSecret()); 28 29 fetchers[i].start(); 30 31 } 32 33 }
线程的run方法就是进行数据的远程拷贝:
1 try { 3 // If merge is on, block 5 merger.waitForResource(); 8 9 // Get a host to shuffle from 11 host = scheduler.getHost(); 13 metrics.threadBusy(); 17 // Shuffle 19 copyFromHost(host); 21 } finally { 23 if (host != null) { 25 scheduler.freeHost(host); 27 metrics.threadFree(); 29 } 31 }
Step2.4:
来看下这个copyFromHost方法。主要是就是使用HttpURLConnection,实现远程数据的传输。
建立连接之后,从接收到的Stream流中读取数据。每次读取一个map文件。
1 TaskAttemptID[] failedTasks = null; 2 3 while (!remaining.isEmpty() && failedTasks == null) { 4 5 failedTasks = copyMapOutput(host, input, remaining); 6 7 }
上面的copyMapOutput方法中,每次读取一个mapid,依据MergeManagerImpl中的reserve函数,检查map的输出是否超过了mapreduce.reduce.memory.totalbytes配置的大小,此配置的默认值
是当前Runtime的maxMemory*mapreduce.reduce.shuffle.input.buffer.percent配置的值,Buffer.percent的默认值为0.90。
假设mapoutput超过了此配置的大小时,生成一个OnDiskMapOutput实例。在接下来的操作中,map的输出写入到local暂时文件里。
假设没有超过此大小,生成一个InMemoryMapOutput实例。在接下来操作中,直接把map输出写入到内存。
最后,运行ShuffleScheduler.copySucceeded完毕文件的copy,调用mapout.commit函数,更新状态或者触发merge操作。
Step2.5:
等待上面全部的拷贝完毕之后,关闭相关的线程。
1 eventFetcher.shutDown(); 2 3 // Stop the map-output fetcher threads 4 for (Fetcher<K,V> fetcher : fetchers) { 5 fetcher.shutDown(); 6 } 7 8 // stop the scheduler 9 scheduler.close(); 10 11 copyPhase.complete(); // copy is already complete 12 taskStatus.setPhase(TaskStatus.Phase.SORT); 13 reduceTask.statusUpdate(umbilical);
Step2.6:
运行终于的merge操作,由Shuffle中的MergeManager完毕:
1 public RawKeyValueIterator close() throws Throwable { 2 3 // Wait for on-going merges to complete 4 5 if (memToMemMerger != null) { 6 7 memToMemMerger.close(); 8 9 } 10 11 inMemoryMerger.close(); 12 13 onDiskMerger.close(); 14 15 16 17 List<InMemoryMapOutput<K, V>> memory = 18 19 new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs); 20 21 inMemoryMergedMapOutputs.clear(); 22 23 memory.addAll(inMemoryMapOutputs); 24 25 inMemoryMapOutputs.clear(); 26 27 List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs); 28 29 onDiskMapOutputs.clear(); 30 31 return finalMerge(jobConf, rfs, memory, disk); 32 33 }
Step3:
释放资源。
mapOutputFilesOnDisk.clear();
Copy完成。
4.4.3 Sort
Sort(事实上相当于合并)就相当于排序工作的一个延续,它会在全部的文件都拷贝完成后进行。使用工具类Merger归并全部的文件。经过此过程后,会产生一个合并了全部(全部并不准确)Map任务输出文件的新文件,而那些从其它各个server搞过来的 Map任务输出文件会删除。依据hadoop是否分布式来决定调用哪种排序方式。
在上面的4.3.2节中的Step2.4结束之后就会触发此操作。
4.4.4 Reduce
经过上面的步骤之后,回到ReduceTask中的run方法继续往下运行,调用runNewReducer。创建reducer:
1 org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 2 3 (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>) 4 5 ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
并运行其run方法,此run方法就是我们的org.apache.hadoop.mapreduce.Reducer中的run方法。
1 public void run(Context context) throws IOException, InterruptedException { 2 3 setup(context); 4 5 try { 6 7 while (context.nextKey()) { 8 9 reduce(context.getCurrentKey(), context.getValues(), context); 10 11 // If a back up store is used, reset it 12 13 Iterator<VALUEIN> iter = context.getValues().iterator(); 14 15 if(iter instanceof ReduceContext.ValueIterator) { 16 17 ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); 18 19 } 20 21 } 22 23 } finally { 24 25 cleanup(context); 26 27 } 28 29 } 30 31 }
while的循环条件是ReduceContext.nextKey()为真,这种方法就在ReduceContext中实现的,这种方法的目的就是处理下一个唯一的key,由于reduce方法的输入数据是分组的,所以每次都会处理一个key及这个key相应的全部value,又由于已经将全部的Map Task的输出拷贝过来并且做了排序,所以key同样的KV对都是挨着的。
nextKey方法中,又会调用nextKeyValue方法来尝试去获取下一个key值,而且假设没数据了就会返回false,假设还有数据就返回true。防止获取反复的数据就在这里做的处理。
接下来就是调用用户自己定义的reduce方法了。
1 public void reduce(Text key, Iterable<IntWritable> values, 2 3 Context context 4 5 ) throws IOException, InterruptedException { 6 7 int sum = 0; 8 9 for (IntWritable val : values) { 10 11 sum += val.get(); 12 13 } 14 15 result.set(sum); 16 17 context.write(key, result); 18 19 }
-------------------------------------------------------------------------------
假设您看了本篇博客,认为对您有所收获,请点击右下角的 [推荐]
假设您想转载本博客,请注明出处
假设您对本文有意见或者建议,欢迎留言
感谢您的阅读,请关注我的兴许博客
Mapreduce运行过程分析(基于Hadoop2.4)——(三)