首页 > 代码库 > Yarn下Map数控制

Yarn下Map数控制

public List<InputSplit> getSplits(JobContext job) throws IOException {        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));        long maxSize = getMaxSplitSize(job);        List splits = new ArrayList();        List files = listStatus(job);        for (FileStatus file : files) {            Path path = file.getPath();            long length = file.getLen();            if (length != 0L) {                FileSystem fs = path.getFileSystem(job.getConfiguration());                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,                        0L, length);                if (isSplitable(job, path)) {                    long blockSize = file.getBlockSize();                    long splitSize = computeSplitSize(blockSize, minSize,                            maxSize);                    long bytesRemaining = length;                    while (bytesRemaining / splitSize > 1.1D) {                        int blkIndex = getBlockIndex(blkLocations, length                                - bytesRemaining);                        splits.add(makeSplit(path, length - bytesRemaining,                                splitSize, blkLocations[blkIndex].getHosts()));                        bytesRemaining -= splitSize;                    }                    if (bytesRemaining != 0L) {                        int blkIndex = getBlockIndex(blkLocations, length                                - bytesRemaining);                        splits.add(makeSplit(path, length - bytesRemaining,                                bytesRemaining,                                blkLocations[blkIndex].getHosts()));                    }                } else {                    splits.add(makeSplit(path, 0L, length,                            blkLocations[0].getHosts()));                }            } else {                splits.add(makeSplit(path, 0L, length, new String[0]));            }        }        job.getConfiguration().setLong(                "mapreduce.input.fileinputformat.numinputfiles", files.size());        LOG.debug("Total # of splits: " + splits.size());        return splits;    }

 Yarn 下好像没了1*下的由用户设置预期的Map数

核心代码long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));getFormatMinSplitSize 默认返回1,getMinSplitSize 为用户设置的最小分片数, 如果用户设置的大于1,则为用户设置的最小分片数long maxSize = getMaxSplitSize(job);getMaxSplitSize为用户设置的最大分片数,默认最大为9223372036854775807Llong splitSize = computeSplitSize(blockSize, minSize,                            maxSize);protected long computeSplitSize(long blockSize, long minSize, long maxSize) {        return Math.max(minSize, Math.min(maxSize, blockSize));    }

 

测试 文件大小 297M(311349250)

块大小128M

测试代码

测试1

   FileInputFormat.setMinInputSplitSize(job, 301349250);
   FileInputFormat.setMaxInputSplitSize(job, 10000);

测试后Map个数为1,由上面分片公式算出分片大小为301349250, 比 311349250小, 理论应该为两个map,  再看分片函数

while (bytesRemaining / splitSize > 1.1D) {
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));

                        bytesRemaining -= splitSize;
                    }

只要剩余的文件大小不超过分片大小的1.1倍, 则会分到一个分片中,避免开两个MAP, 其中一个运行数据太小,浪费资源。

 

测试2

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);

FileInputFormat.setMaxInputSplitSize(job, 10000);

MAP 数为2

测试3

在原有的输入目录下,添加一个很小的文件,几K,测试是否会合并

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);

Map数变为了3

看源代码

for (FileStatus file : files) {

..

}

原来输入是按照文件名来分片的,这个按照常理也能知道, 不同的文件内容格式不同

 

总结,分片过程大概为,先遍历目标文件,过滤部分不符合要求的文件, 然后添加到列表,然后按照文件名来切分分片 (大小为前面计算分片大小的公式, 最后有个文件尾可能合并,其实常写网络程序的都知道), 然后添加到分片列表,然后每个分片读取自身对应的部分给MAP处理