首页 > 代码库 > 继承FileInputFormat类来理解 FileInputFormat类

继承FileInputFormat类来理解 FileInputFormat类

import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.PathFilter;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputFormat;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat.MultiPathFilter;import org.apache.hadoop.mapreduce.security.TokenCache;import com.google.common.base.Charsets;public class MyFileinput extends FileInputFormat<LongWritable, Text> {	private static final PathFilter hiddenFileFilter = new PathFilter() {		public boolean accept(Path p) {			String name = p.getName();			return ((!(name.startsWith("_"))) && (!(name.startsWith("."))));		}	};		// 遍历文件列表, 过滤掉_ .开头的文件(可以自定义过滤)	protected List<FileStatus> listStatus(JobContext job) throws IOException {		System.out.println("*********************");		List result = new ArrayList();		Path[] dirs = getInputPaths(job);		System.out.println("dirs" + dirs);		System.out.println("dirs length = " + dirs.length);		for(Path p: dirs){			System.out.println("Path loop " + p);		}		if (dirs.length == 0) {			throw new IOException("No input paths specified in job");		}		TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,				job.getConfiguration());		List errors = new ArrayList();		List filters = new ArrayList();		filters.add(hiddenFileFilter);		PathFilter jobFilter = getInputPathFilter(job);		if 
(jobFilter != null) {			filters.add(jobFilter);		}		// 过滤函数,可以拓展		PathFilter inputFilter = new MultiPathFilter(filters);		for (int i = 0; i < dirs.length; ++i) {			Path p = dirs[i];			FileSystem fs = p.getFileSystem(job.getConfiguration());			FileStatus[] matches = fs.globStatus(p, inputFilter);			System.out.println("matches=" + matches);			for(FileStatus match: matches){				System.out.println("loop matches" + match.getPath());			}						if (matches == null)				errors.add(new IOException("Input path does not exist: " + p));			else if (matches.length == 0)				errors.add(new IOException("Input Pattern " + p						+ " matches 0 files"));			else {				for (FileStatus globStat : matches) {					System.out.println("globStat " + globStat);					if (globStat.isDirectory())						for (FileStatus stat : fs.listStatus(								globStat.getPath(), inputFilter)) {							result.add(stat);						}					else {						result.add(globStat);					}				}			}		}		if (!(errors.isEmpty())) {			throw new InvalidInputException(errors);		}		// LOG.info("Total input paths to process : " + result.size());		return result;	}		// 计算分片大小,返回一个分片列表	public List<InputSplit> getSplits(JobContext job) throws IOException {		long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));		long maxSize = getMaxSplitSize(job);				System.out.print("minSize " + minSize);		System.out.print("maxSize " + maxSize);				List splits = new ArrayList();		// 获取输入目录下的文件列表(过滤文件)		List<FileStatus> files = listStatus(job);		for (FileStatus file : files) {			Path path = file.getPath();			long length = file.getLen();			System.out.println("path: " + path+ " file len = " + length);			if (length != 0L) {				// 通过路径找到块列表 				FileSystem fs = path.getFileSystem(job.getConfiguration());				BlockLocation[] blkLocations = fs.getFileBlockLocations(file,						0L, length);								if (isSplitable(job, path)) {					long blockSize = file.getBlockSize();					System.out.println("blockSize:" + blockSize);					long splitSize = 
computeSplitSize(blockSize, minSize,							maxSize);					System.out.println("splitSize :" + splitSize);					long bytesRemaining = length;					System.out.println("bytesRemaining :" + bytesRemaining);										System.out.println(bytesRemaining / splitSize);					// 定义为1.1D, 为避免一个分片过小, 也需要启动一个MAP来运行					// 最后剩余的文件大小只要不超过分片大小的1.1倍都会放入一个分片					while (bytesRemaining / splitSize > 1.1D) {						int blkIndex = getBlockIndex(blkLocations, length								- bytesRemaining);						System.out.println("blkIndex :" + blkIndex);												// 添加到分片分片列表						splits.add(makeSplit(path, length - bytesRemaining,								splitSize, blkLocations[blkIndex].getHosts()));						bytesRemaining -= splitSize;					}										// 文件尾 					if (bytesRemaining != 0L) {						Long remain = length - bytesRemaining;						System.out.println("文件尾大小" + bytesRemaining);						int blkIndex = getBlockIndex(blkLocations, length								- bytesRemaining);						splits.add(makeSplit(path, length - bytesRemaining,								bytesRemaining,								blkLocations[blkIndex].getHosts()));					}				} else {					splits.add(makeSplit(path, 0L, length,							blkLocations[0].getHosts()));				}			} else {				// 测试文件大小为0, 也会启动一个map				splits.add(makeSplit(path, 0L, length, new String[0]));			}		}					job.getConfiguration().setLong(				"mapreduce.input.fileinputformat.numinputfiles", files.size());		// LOG.debug("Total # of splits: " + splits.size());		return splits;	}	private static class MultiPathFilter implements PathFilter {		private List<PathFilter> filters;		public MultiPathFilter(List<PathFilter> filters) {			this.filters = filters;		}		public boolean accept(Path path) {			for (PathFilter filter : this.filters) {				if (!(filter.accept(path))) {					return false;				}			}			return true;		}	}		// 文件内容读取, 默认按行读取 	@Override	public RecordReader<LongWritable, Text> createRecordReader(			InputSplit split, TaskAttemptContext context) {		String delimiter = context.getConfiguration().get(				"textinputformat.record.delimiter");				
System.out.println("delimiter ==" + delimiter);		// 默认为空		byte[] recordDelimiterBytes = null;		if (null != delimiter)			recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);				return new LineRecordReader(recordDelimiterBytes);	}}

FileInputFormat 的两项主要功能是:计算输入分片(split),以及按照分片为 Map 任务读取文件内容。

/**
 * Top-level abstract contract for all input formats (excerpt quoted for
 * reference): compute the list of input splits for a job, and create a
 * record reader that feeds one split's records to a map task.
 */
public abstract class InputFormat<K, V> {
    // Divides the job's input into InputSplits; the framework starts
    // one map task per split returned here.
    public abstract List<InputSplit> getSplits(JobContext paramJobContext)
            throws IOException, InterruptedException;

    // Creates the reader that parses one split into <K, V> records
    // consumed by the mapper.
    public abstract RecordReader<K, V> createRecordReader(
            InputSplit paramInputSplit,
            TaskAttemptContext paramTaskAttemptContext) throws IOException,
            InterruptedException;
}

从顶层的抽象基类 InputFormat 声明的这两个接口(getSplits 与 createRecordReader),也大致能看出这两项功能。