首页 > 代码库 > ${mapred.local.dir}选择策略--Map Task存放中间结果
${mapred.local.dir}选择策略--Map Task存放中间结果
上篇说了block在DataNode配置有多个${dfs.data.dir}时的存储策略,本文主要介绍TaskTracker在配置有多个${mapred.local.dir}时的选择策略。
1 mapred-site.xml2 <property>3 <name>mapred.local.dir</name>4 <value>/mnt/localdir1/local,/mnt/localdir2/local,/mnt/localdir3/local</value>5 </property>
当${mapred.local.dir}配置有多个目录分别用来挂载不同的硬盘时,Map Task的结果应该存放在哪个目录中?首先还是看一下方法的调用层次,如下图所示:
下面分析这两个方法:
1 /** Get a path from the local FS. If size is known, we go 2 * round-robin over the set of disks (via the configured dirs) and return 3 * the first complete path which has enough space. 4 * 5 * If size is not known, use roulette selection -- pick directories 6 * with probability proportional to their available space. 7 */ 8 public synchronized 9 Path getLocalPathForWrite(String pathStr, long size, 10 Configuration conf, boolean checkWrite11 ) throws IOException {12 //检查task目录是否有变化13 confChanged(conf);14 int numDirs = localDirsPath.length; //获取${mapred.local.dir}目录的个数15 int numDirsSearched = 0; //表示已经搜索过的次数16 //remove the leading slash from the path (to make sure that the uri17 //resolution results in a valid path on the dir being checked)18 if (pathStr.startsWith("/")) { //是指output/spill0.out文件19 pathStr = pathStr.substring(1);20 }21 Path returnPath = null;22 Path path = new Path(pathStr);23 24 //当要写入的数据量大小未知时25 if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability 26 //proportional to available size27 long[] availableOnDisk = new long[dirDF.length];28 long totalAvailable = 0;29 30 //build the "roulette wheel"31 for(int i =0; i < dirDF.length; ++i) { 32 //分别计算每一个${mapred.local.dir}目录可用大小,并计算总的可用大小33 availableOnDisk[i] = dirDF[i].getAvailable();34 totalAvailable += availableOnDisk[i];35 }36 37 // Keep rolling the wheel till we get a valid path38 Random r = new java.util.Random();39 while (numDirsSearched < numDirs && returnPath == null) {40 long randomPosition = Math.abs(r.nextLong()) % totalAvailable;41 int dir = 0;42 while (randomPosition > availableOnDisk[dir]) {43 randomPosition -= availableOnDisk[dir];44 dir++;45 }46 dirNumLastAccessed = dir; //表示上次访问过的目录47 //从${mapred.local.dir}中选择一个目录,在其下创建output/spill0.out文件48 returnPath = createPath(path, checkWrite); 49 if (returnPath == null) { //如果创建失败(可能存在disk read-only的情况)50 totalAvailable -= availableOnDisk[dir];51 availableOnDisk[dir] = 0; // skip this disk52 numDirsSearched++;53 }54 }55 } else { //写入的数据量已知56 while (numDirsSearched < numDirs && returnPath == null) {57 long capacity = dirDF[dirNumLastAccessed].getAvailable();58 if (capacity > size) {59 returnPath = createPath(path, checkWrite);60 }61 //使用轮流的方式来选择${mapred.local.dir}62 dirNumLastAccessed++;63 dirNumLastAccessed = dirNumLastAccessed % numDirs; 64 numDirsSearched++;65 } 66 }67 if (returnPath != null) {68 return returnPath;69 }70 71 //no path found72 throw new DiskErrorException("Could not find any valid local " +73 "directory for " + pathStr);74 }
confChanged(conf)方法首先检查原来的目录配置是否改变,这个下面说;然后给numDirs赋值,它表示总的${mapred.local.dir}目录个数,localDirsPath数组变量在confChanged(conf)方法中被更新了;接着在准备创建output/spill0.out文件,这个文件就是Map Task的运算结果在缓冲区写满之后spill到disk生成的文件,序号0代表序号,最后会将多个spill文件合成一个file.out文件;接下来就要选择${mapred.local.dir}目录了。其过程如下:
1、如果要写入的数据量大小未知时:
a) 计算dirDF数组中每个元素的剩余大小,并计算所有元素的总大小totalAvailable;
b) (循环)生成一个Long类型随机正数,该随机数对总大小totalAvailable取余后得randomPosition。
(内层循环)若randomPosition > 某个disk剩余量,则randomPosition减去该disk剩余量,并与下一个disk剩余量比较……
c) 选择了某个disk之后,如果这个disk不能创建文件,则排除这个disk,重新选择disk(总共尝试localDirsPath.length次)
2、要写入的数据量大小已知时:将${mapred.local.dir}组织成一个数组,轮流的使用数组中的目录。dirNumLastAccessed表示上次访问过的目录;
下面反过来分析下confChanged()方法。
实际上该方法中的获取到的localDirs数组所代表的目录,是Map Task或Reduce Task的工作目录。因为每次不同的
${mapred.local.dir}选择策略--Map Task存放中间结果