 1 package counter; 2  3 import java.net.URI; 4  5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text;10 import org.apache.hadoop.mapreduce.Counter;11 import org.apache.hadoop.mapreduce.Job;12 import org.apache.hadoop.mapreduce.Mapper;13 import org.apache.hadoop.mapreduce.Reducer;14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;19 20 public class WordCountApp {21     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";22     static final String OUT_PATH = "hdfs://hadoop:9000/out";23     24     public static void main(String[] args) throws Exception {25         26         Configuration conf = new Configuration();27         28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);29         final Path outPath = new Path(OUT_PATH);30         31         if(fileSystem.exists(outPath)){32             fileSystem.delete(outPath, true);33         }34         35         final Job job = new Job(conf , WordCountApp.class.getSimpleName());36         37         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里38         39         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对40         41         job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类42         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略43         job.setMapOutputValueClass(LongWritable.class);44         45         job.setPartitionerClass(HashPartitioner.class);//1.3 分区46         job.setNumReduceTasks(1);//有一个reduce任务运行47         48         job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类49         job.setOutputKeyClass(Text.class);//指定reduce的输出类型50         job.setOutputValueClass(LongWritable.class);51         52         FileOutputFormat.setOutputPath(job, outPath);//2.3 指定写出到哪里53         54         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类55         56         job.waitForCompletion(true);//把job提交给JobTracker运行57     }58     59     /**60      * KEYIN    即k1        表示行的偏移量61      * VALUEIN    即v1        表示行文本内容62      * KEYOUT    即k2        表示行中出现的单词63      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值164      */65     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{66         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {67         //    final Counter helloCounter = context.getCounter("Sensitive Words", "hello");68             69             final String line = v1.toString();70         /*    if(line.contains("hello")){71                 //记录敏感词出现在一行中72                 helloCounter.increment(1L);73             }*/74             final String[] splited = line.split(" ");75             for (String word : splited) {76                 context.write(new Text(word), new LongWritable(1));77             }78         };79     }80     81     /**82      * KEYIN    即k2        表示行中出现的单词83      * VALUEIN    即v2        表示行中出现的单词的次数84      * KEYOUT    即k3        表示文本中出现的不同单词85      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数86      *87      */88     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{89         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {90             long times = 0L;91             for (LongWritable count : v2s) {92                 times += count.get();93             }94             ctx.write(k2, new LongWritable(times));95         };96     }97         98 }
2.1 MapReduce的任务与原理

2.1.1 MapReduce的工作原理


图 2.1

  在图中我们已看出,关于File有两种划分,一个是split分片一个是block,注意分片只是逻辑划分,并不是像划分block那样,将文件真是的划分为多个部分,他只是逻辑上的的划分,可以说是只是读取时候按分片来读取。关于分片的大小默认为块大小,为什么要这样呢?那因为MapReduce作业 处理的文件是存放在DataNode上的,而且文件在DataNode上是按block存放的,而不同的block可是存放在不同的DataNode上的,如果分片大小大于block块大小,那么说明一个块满足不 了该分片,那么就需要再读取一个block块,这样当这两个block块位于不同的DataNode上 时,就要通过网络访问另一个节点,这样就可能造成网络延迟影响Mapreduce的执行效率,所以一般分片大小会默认为block块大小。


图 2.2

2.1.2 map()和reduce的任务

  1) 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
  2) 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
  3) 对输出的key、value进行分区
  4) 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
  1) 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
  2) 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
2.2.1 第一项任务


1 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里2 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对       

代码 2.1


图 2.2


 1 * <code>InputFormat</code> describes the input-specification for a          InputFormat用来描述Map-Reduce的输入规格 2  * Map-Reduce job.  3  *  4  * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the  Map-reduce框架依赖于一个job的InputFormat 5  * job to:<p> 6  * <ol> 7  *   <li> 8  *   Validate the input-specification of the job.                  验证job的输入规格 9  *   <li>10  *   Split-up the input file(s) into logical {@link InputSplit}s, each of    把输入文件拆分成逻辑Inputsplit,每一个11  *   which is then assigned to an individual {@link Mapper}.           InputSplit都会被分配到一个独立的Mapper12  *   </li>13  *   <li>14  *   Provide the {@link RecordReader} implementation to be used to glean     提供实现类RcordReader,用于为Mapper任务,从逻辑InputSplit15  *   input records from the logical <code>InputSplit</code> for processing by  收集输入记录。16  *   the {@link Mapper}.17  *   </li>18  * </ol>
代码 2.2

 1 /**  2    * Logically split the set of input files for the job.                      为job逻辑切分输入文件  3    * @param context job configuration. 4    * @return an array of {@link InputSplit}s for the job. 5    */ 6  public abstract  7     List<InputSplit> getSplits(JobContext context 8                                ) throws IOException, InterruptedException; 9 10 /**11    * Create a record reader for a given split. The framework will call    为分片创建一个记录读取器12    * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before13    * the split is used.14    * @param split the split to be read15    * @param context the information about the task16    * @return a new record reader17    * @throws IOException18    * @throws InterruptedException19    */20   public abstract 21     RecordReader<K,V> createRecordReader(InputSplit split,22                                          TaskAttemptContext context23                                         ) throws IOException, 24                                                  InterruptedException;
代码 2.3


 1  /**  2    * Generate the list of files and make them into FileSplits.                 生成一个文件列表并创建FileSplits 3    */  4   public List<InputSplit> getSplits(JobContext job 5                                     ) throws IOException { 6     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //该值等于1 7     long maxSize = getMaxSplitSize(job); //该值等于263-1 8     // generate splits 9     List<InputSplit> splits = new ArrayList<InputSplit>();10     List<FileStatus> files = listStatus(job);  //读取文件夹下的所有文件11     for (FileStatus file: files) {      //遍历文件夹下的所有文件12       Path path = file.getPath();    //获取文件路径13       FileSystem fs = path.getFileSystem(job.getConfiguration());     //根据该路径获取文件系统14       long length = file.getLen();    //文件长度15       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);    //块位置16       if ((length != 0) && isSplitable(job, path)) {     //判断文件数量是否不为空且文件允许被拆分17         long blockSize = file.getBlockSize();18         long splitSize = computeSplitSize(blockSize, minSize, maxSize);  //计算分片大小,该分片大小和blockSize, minSize, maxSize有关系,默认为block块大小19         long bytesRemaining = length;  //文件初始长度20         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {    //分片21           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);22           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 23                                    blkLocations[blkIndex].getHosts()));24           bytesRemaining -= splitSize;25         }26         27         if (bytesRemaining != 0) {28           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 29                      blkLocations[blkLocations.length-1].getHosts()));30         }31       } else if (length != 0) {    //如果该文件不能够被切分就,就直接生成分片32         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));33       } else { 34         //Create empty hosts array for zero length files35         splits.add(new FileSplit(path, 0, length, new String[0]));36       }37     }38     // Save the number of input files in the job-conf39     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());40     LOG.debug("Total # of splits: " + splits.size());41     return splits;42   }
 代码 2.4

  注意,分片FileinputSplit只是逻辑划分,并不是像划分block那样,将文件真是的划分为多个部分,他只是逻辑上的的划分,可以说是只是读取时候按分片来读取,分片InputSplit大小默认为块大小,为什么要这样呢?那因为MapReduce作业 处理的文件是存放在datanode上的,而且文件在DataNode上是按block存放的,如果分片大小大于block块大小,那么说明一个块满足不了该分片需要再读取一个block块,而不同的block可是存放在不同的DataNode上的,这样当这两个block块位于不同的DataNode上时,就要通过网络访问另一个节点,这样就可能造成网络延迟影响Mapre-duce的执行效率,所以一般分片大小会默认为block块大小。

   我们知道FileInputFormat实现了,inputFormat的 getSplits()的抽象方法,那么另一个抽象方法createRecordReader由谁来实现呢,我们看一下该类的两个实现类FileIn putFormatTextInputFormat这两个实现类的源码,看一发现createRecordReader是在TextInputFormat这个实现类中实现的,我们看一下该类的源码如下代码2.5所示。

 1 /** An {@link InputFormat} for plain text files.  Files are broken into lines.      文件被解析成行 2  * Either linefeed or carriage-return are used to signal end of line.  Keys are    无论是换行符还是回车符都表示一行结束 3  * the position in the file, and values are the line of text.. */           键是该行在文件中的位置,值为该行的内容 4 public class TextInputFormat extends FileInputFormat<LongWritable, Text> { 5  6   @Override 7   public RecordReader<LongWritable, Text>  8     createRecordReader(InputSplit split, 9                        TaskAttemptContext context) {10     return new LineRecordReader();11   }12 13   @Override14   protected boolean isSplitable(JobContext context, Path file) {15     CompressionCodec codec = 16       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);17     if (null == codec) {18       return true;19     }20     return codec instanceof SplittableCompressionCodec;21   }22 23 }
  我们再分析一下createRecordReader()方法的返回值,他的返回值类型为RecordReader,返回值是new LineRecordReader (),而他继承了RecordReader,我们先看一下RecordReader源码如代码2.6所示。

 1 package org.apache.hadoop.mapreduce; 2  3 import java.io.Closeable; 4 import java.io.IOException; 5  6 /** 7  * The record reader breaks the data into key/value pairs for input to the       将数据解析成Mapper能够处理的键值对 8  * {@link Mapper}. 9  * @param <KEYIN>10  * @param <VALUEIN>11  */12 public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {13 14   /**15    * Called once at initialization.16    * @param split the split that defines the range of records to read17    * @param context the information about the task18    * @throws IOException19    * @throws InterruptedException20    */21   public abstract void initialize(InputSplit split,22                                   TaskAttemptContext context23                                   ) throws IOException, InterruptedException;24 25   /**26    * Read the next key, value pair.27    * @return true if a key/value pair was read28    * @throws IOException29    * @throws InterruptedException30    */31   public abstract boolean nextKeyValue() throws IOException, InterruptedException;32 33   public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;34   public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;35   public abstract float getProgress() throws IOException, InterruptedException;36   public abstract void close() throws IOException;37 }
代码 2.6

  从上面的代码中我们可以发现,RecordReader类是一个抽象类,其中的抽象方法initialize(),主要是用来将内容解析成键值对的,nextKeyValue() getCurrentKey()getCurrentValue() 主要是用来获取键值对的内容的,他们的使用方法如下面代码2.7所示。

1 while(xxx.nextKeyValue()){2         key=xxx.getCurrenKey();3         value=http://www.mamicode.com/xxx.getCurrentValue();4     }

 代码 2.7

    从RecordReader的类中回到 LineRecordReader类我们可以看到,该类对RecordReader类的三个抽象方法nextKeyValue(), getCurrentKey(),getCurrentValue()进行了实现,LineRecordReader类源码如代码2.8所示。

 1 public boolean nextKeyValue() throws IOException { 2     if (key == null) { 3       key = new LongWritable(); 4     } 5     key.set(pos);  //第一次调用时pos为零,key也就为零,key表示该行的偏移量 6     if (value =http://www.mamicode.com/= null) { 7       value = http://www.mamicode.com/new Text(); 8     } 9     int newSize = 0;  //表示当前读取的字节数10     // We always read one extra line, which lies outside the upper11     // split limit i.e. (end - 1)12     while (getFilePosition() <= end) {  //读取一行内容给value13       newSize = in.readLine(value, maxLineLength,14           Math.max(maxBytesToConsume(pos), maxLineLength));15       if (newSize == 0) {16         break;17       }18       pos += newSize;  //读取一行重置pos19       if (newSize < maxLineLength) {20         break;21       }22 23       // line too long. try again24       LOG.info("Skipped line of size " + newSize + " at pos " + 25                (pos - newSize));26     }27     if (newSize == 0) {28       key = null;29       value = http://www.mamicode.com/null;30       return false;31     } else {32       return true;33     }34   }
代码 2.8

  通过以上对TextInputFormat的一系列分析,我们可以知道文件是如何分片的,分片是如何被解析成键值对的。那么这些键值对是如何被提交到Mapper上的呢?我们一步步分析,首先我们知道,分片是被createRecordReader()解析成键值对的,他的返回值是new LineRecordReader (),代表被解析成的键值对,那么我们就分析一下 LineRecordRe ader和Mapper的关系。好那么我们就看一下,Mapper的源码,如代码2.9所示


 1 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { 2  3   public class Context  4     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 5     public Context(Configuration conf, TaskAttemptID taskid, 6                    RecordReader<KEYIN,VALUEIN> reader, 7                    RecordWriter<KEYOUT,VALUEOUT> writer, 8                    OutputCommitter committer, 9                    StatusReporter reporter,10                    InputSplit split) throws IOException, InterruptedException {11       super(conf, taskid, reader, writer, committer, reporter, split);12     }13   }14   15   /**16    * Called once at the beginning of the task.17    */18   protected void setup(Context context19                        ) throws IOException, InterruptedException {20     // NOTHING21   }22 23   /**24    * Called once for each key/value pair in the input split. Most applications25    * should override this, but the default is the identity function.26    */27   @SuppressWarnings("unchecked")28   protected void map(KEYIN key, VALUEIN value, 29                      Context context) throws IOException, InterruptedException {30     context.write((KEYOUT) key, (VALUEOUT) value);31   }32 33   /**34    * Called once at the end of the task.35    */36   protected void cleanup(Context context37                          ) throws IOException, InterruptedException {38     // NOTHING39   }40   41   /**42    * Expert users can override this method for more complete control over the43    * execution of the Mapper.44    * @param context45    * @throws IOException46    */47   public void run(Context context) throws IOException, InterruptedException {48     setup(context);49     while (context.nextKeyValue()) {50       map(context.getCurrentKey(), context.getCurrentValue(), context);51     }52     cleanup(context);53   }54 }
代码 2.9



 1 public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>  2   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 3   private RecordReader<KEYIN,VALUEIN> reader; 4   private InputSplit split; 5  6   public MapContext(Configuration conf, TaskAttemptID taskid, 7                     RecordReader<KEYIN,VALUEIN> reader, 8                     RecordWriter<KEYOUT,VALUEOUT> writer, 9                     OutputCommitter committer,10                     StatusReporter reporter,11                     InputSplit split) {12     super(conf, taskid, writer, committer, reporter);13     this.reader = reader;14     this.split = split;15   }16 17   /**18    * Get the input split for this map.19    */20   public InputSplit getInputSplit() {21     return split;22   }23 24   @Override25   public KEYIN getCurrentKey() throws IOException, InterruptedException {26     return reader.getCurrentKey();27   }28 29   @Override30   public VALUEIN getCurrentValue() throws IOException, InterruptedException {31     return reader.getCurrentValue();32   }33 34   @Override35   public boolean nextKeyValue() throws IOException, InterruptedException {36     return reader.nextKeyValue();37   }38 39 }40      
 代码 2.10


图 2.4



图 2.3


2.2.2 第二项任务


