首页 > 代码库 > [hadoop]Hadoop源码分析-Context


  学编程写的第一个程序通常是 Hello World,Hadoop 也不例外,它的 Hello World 就是 WordCount——一个统计单词出现次数的例子。

 1 package org.apache.hadoop.examples; 2  3 import java.io.IOException; 4 import java.util.StringTokenizer; 5  6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.Text;10 import org.apache.hadoop.mapreduce.Job;11 import org.apache.hadoop.mapreduce.Mapper;12 import org.apache.hadoop.mapreduce.Reducer;13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;15 import org.apache.hadoop.util.GenericOptionsParser;16 17 public class WordCount {18 19   public static class TokenizerMapper 20        extends Mapper<Object, Text, Text, IntWritable>{21     22     private final static IntWritable one = new IntWritable(1);23     private Text word = new Text();24       25     public void map(Object key, Text value, Context context26                     ) throws IOException, InterruptedException {27       StringTokenizer itr = new StringTokenizer(value.toString());28       while (itr.hasMoreTokens()) {29         word.set(itr.nextToken());30         context.write(word, one);31       }32     }33   }34   35   public static class IntSumReducer 36        extends Reducer<Text,IntWritable,Text,IntWritable> {37     private IntWritable result = new IntWritable();38 39     public void reduce(Text key, Iterable<IntWritable> values, 40                        Context context41                        ) throws IOException, InterruptedException {42       int sum = 0;43       for (IntWritable val : values) {44         sum += val.get();45       }46       result.set(sum);47       context.write(key, result);48     }49   }50 51   public static void main(String[] args) throws Exception {52     Configuration conf = new Configuration();53     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();54     if (otherArgs.length != 2) {55       System.err.println("Usage: wordcount <in> <out>");56       
System.exit(2);57     }58     Job job = new Job(conf, "word count");59     job.setJarByClass(WordCount.class);60     job.setMapperClass(TokenizerMapper.class);61     job.setCombinerClass(IntSumReducer.class);62     job.setReducerClass(IntSumReducer.class);63     job.setOutputKeyClass(Text.class);64     job.setOutputValueClass(IntWritable.class);65     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));66     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));67     System.exit(job.waitForCompletion(true) ? 0 : 1);68   }69 }


 1 public void map(Object key, Text value, Context context) 2                 throws OException,InterruptedException{ 3     StringTokenizer itr = new StringTokenizer(value.toString()); 4     while (itr.hasMoreTokens()) { 5         word.set(itr.nextToken()); 6         context.write(word, one); 7     } 8 } 9 10 public void reduce(Text key, Iterable<IntWritable> values,Context context)    11                    throws IOException, InterruptedException {12     int sum = 0;13     for (IntWritable val : values) {14         sum += val.get();15     }16     result.set(sum);17     context.write(key, result);18 }



// Mapper side: emit the current token (word) paired with the count `one`.
context.write(word, one);


// Reducer side: emit the key together with its summed count (result).
context.write(key, result);



 1 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { 2  3   public class Context  4     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 5     public Context(Configuration conf, TaskAttemptID taskid, 6                    RecordReader<KEYIN,VALUEIN> reader, 7                    RecordWriter<KEYOUT,VALUEOUT> writer, 8                    OutputCommitter committer, 9                    StatusReporter reporter,10                    InputSplit split) throws IOException, InterruptedException {11       super(conf, taskid, reader, writer, committer, reporter, split);12     }13   }14   15   /**16    * Called once at the beginning of the task.17    */18   protected void setup(Context context19                        ) throws IOException, InterruptedException {20     // NOTHING21   }22 23   /**24    * Called once for each key/value pair in the input split. Most applications25    * should override this, but the default is the identity function.26    */27   @SuppressWarnings("unchecked")28   protected void map(KEYIN key, VALUEIN value, 29                      Context context) throws IOException, InterruptedException {30     context.write((KEYOUT) key, (VALUEOUT) value);31   }32 33   /**34    * Called once at the end of the task.35    */36   protected void cleanup(Context context37                          ) throws IOException, InterruptedException {38     // NOTHING39   }40   41   /**42    * Expert users can override this method for more complete control over the43    * execution of the Mapper.44    * @param context45    * @throws IOException46    */47   public void run(Context context) throws IOException, InterruptedException {48     setup(context);49     try {50       while (context.nextKeyValue()) {51         map(context.getCurrentKey(), context.getCurrentValue(), context);52       }53     } finally {54       cleanup(context);55     }56   }57 }
 1 package org.apache.hadoop.mapreduce; 2  3 import java.io.IOException; 4  5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.io.RawComparator; 7 import org.apache.hadoop.mapred.RawKeyValueIterator; 8  <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p> 9  10 public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {11 12   public class Context 13     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {14     public Context(Configuration conf, TaskAttemptID taskid,15                    RawKeyValueIterator input, 16                    Counter inputKeyCounter,17                    Counter inputValueCounter,18                    RecordWriter<KEYOUT,VALUEOUT> output,19                    OutputCommitter committer,20                    StatusReporter reporter,21                    RawComparator<KEYIN> comparator,22                    Class<KEYIN> keyClass,23                    Class<VALUEIN> valueClass24                    ) throws IOException, InterruptedException {25       super(conf, taskid, input, inputKeyCounter, inputValueCounter,26             output, committer, reporter, 27             comparator, keyClass, valueClass);28     }29   }30 31   /**32    * Called once at the start of the task.33    */34   protected void setup(Context context35                        ) throws IOException, InterruptedException {36     // NOTHING37   }38 39   @SuppressWarnings("unchecked")40   protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context41                         ) throws IOException, InterruptedException {42     for(VALUEIN value: values) {43       context.write((KEYOUT) key, (VALUEOUT) value);44     }45   }46   protected void cleanup(Context context47                          ) throws IOException, InterruptedException {48     // NOTHING49   }50   public void run(Context context) throws IOException, InterruptedException {51     setup(context);52     try {53       while (context.nextKey()) {54         
reduce(context.getCurrentKey(), context.getValues(), context);55       }56     } finally {57       cleanup(context);58     }59   }60 }


 1 public class Context  2     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 3     public Context(Configuration conf, TaskAttemptID taskid, 4                    RecordReader<KEYIN,VALUEIN> reader, 5                    RecordWriter<KEYOUT,VALUEOUT> writer, 6                    OutputCommitter committer, 7                    StatusReporter reporter, 8                    InputSplit split) throws IOException, InterruptedException { 9       super(conf, taskid, reader, writer, committer, reporter, split);10     }11 }
 1 public class Context  2     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 3     public Context(Configuration conf, TaskAttemptID taskid, 4                    RawKeyValueIterator input,  5                    Counter inputKeyCounter, 6                    Counter inputValueCounter, 7                    RecordWriter<KEYOUT,VALUEOUT> output, 8                    OutputCommitter committer, 9                    StatusReporter reporter,10                    RawComparator<KEYIN> comparator,11                    Class<KEYIN> keyClass,12                    Class<VALUEIN> valueClass13                    ) throws IOException, InterruptedException {14       super(conf, taskid, input, inputKeyCounter, inputValueCounter,15             output, committer, reporter, 16             comparator, keyClass, valueClass);17     }18 }


  package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

/**
 * The context given to a Mapper: key/value pairs are read from the supplied
 * {@link RecordReader}, and the {@link InputSplit} being processed is exposed.
 */
public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
  extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private RecordReader<KEYIN,VALUEIN> reader;
  private InputSplit split;

  public MapContext(Configuration conf, TaskAttemptID taskid,
                    RecordReader<KEYIN,VALUEIN> reader,
                    RecordWriter<KEYOUT,VALUEOUT> writer,
                    OutputCommitter committer,
                    StatusReporter reporter,
                    InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
  }

  /**
   * Get the input split for this map.
   */
  public InputSplit getInputSplit() {
    return split;
  }

  @Override
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return reader.getCurrentKey();
  }

  @Override
  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }

}

// ---- ReduceContext.java ----

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.util.Progressable;

/**
 * The context given to a Reducer. It walks a raw sorted key/value stream,
 * deserializes each pair, and groups consecutive pairs whose keys compare
 * equal under {@code comparator} so that each call to {@link #nextKey()}
 * starts a new group whose values are exposed through {@link #getValues()}.
 */
public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private RawKeyValueIterator input;
  private Counter inputKeyCounter;
  private Counter inputValueCounter;
  private RawComparator<KEYIN> comparator;
  private KEYIN key;                                  // current key
  private VALUEIN value;                              // current value
  private boolean firstValue = false;                 // first value in key
  private boolean nextKeyIsSame = false;              // more w/ this key
  private boolean hasMore;                            // more in file
  protected Progressable reporter;
  private Deserializer<KEYIN> keyDeserializer;
  private Deserializer<VALUEIN> valueDeserializer;
  // Shared by both deserializers; reset to point at key bytes, then value bytes.
  private DataInputBuffer buffer = new DataInputBuffer();
  private BytesWritable currentRawKey = new BytesWritable();
  private ValueIterable iterable = new ValueIterable();

  public ReduceContext(Configuration conf, TaskAttemptID taskid,
                       RawKeyValueIterator input,
                       Counter inputKeyCounter,
                       Counter inputValueCounter,
                       RecordWriter<KEYOUT,VALUEOUT> output,
                       OutputCommitter committer,
                       StatusReporter reporter,
                       RawComparator<KEYIN> comparator,
                       Class<KEYIN> keyClass,
                       Class<VALUEIN> valueClass
                       ) throws InterruptedException, IOException {
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    SerializationFactory serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    // Prime the iterator so hasMore reflects whether any input exists.
    hasMore = input.next();
  }

  /**
   * Start processing next unique key. Any values of the current key that the
   * reducer did not consume are skipped first.
   *
   * @return true if a new key was read, false when the input is exhausted
   */
  public boolean nextKey() throws IOException, InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

  /**
   * Advance to the next key/value pair.
   *
   * @return true if a pair was read, false (with key and value cleared) when
   *         the input is exhausted
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    // The pair being read starts a new group unless the previous lookahead
    // found the same key.
    firstValue = !nextKeyIsSame;
    DataInputBuffer next = input.getKey();
    // Copy the raw key bytes so they can still be compared after input.next()
    // advances (presumably the iterator may reuse its buffers).
    currentRawKey.set(next.getData(), next.getPosition(),
                      next.getLength() - next.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);
    next = input.getValue();
    buffer.reset(next.getData(), next.getPosition(),
        next.getLength() - next.getPosition());
    value = valueDeserializer.deserialize(value);
    hasMore = input.next();
    if (hasMore) {
      // Look ahead: does the following record belong to the same key group?
      next = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                         currentRawKey.getLength(),
                                         next.getData(),
                                         next.getPosition(),
                                         next.getLength() - next.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    // Guarded like inputKeyCounter in nextKey().
    if (inputValueCounter != null) {
      inputValueCounter.increment(1);
    }
    return true;
  }

  @Override
  public KEYIN getCurrentKey() {
    return key;
  }

  @Override
  public VALUEIN getCurrentValue() {
    return value;
  }

  /** Iterates over the values of the current key group. */
  protected class ValueIterator implements Iterator<VALUEIN> {

    @Override
    public boolean hasNext() {
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {
      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util;
        // at least restore the interrupt flag for callers further up.
        Thread.currentThread().interrupt();
        throw new RuntimeException("next value iterator interrupted", ie);
      }
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("remove not implemented");
    }

  }

  /**
   * Iterable view of the current key's values. It always returns the same
   * iterator instance, so the values can be traversed only once per key.
   */
  protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator();
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    }
  }

  /**
   * Iterate through the values for the current key, reusing the same value
   * object, which is stored in the context.
   *
   * @return the (single-pass) series of values associated with the current key
   */
  public
  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
    return iterable;
  }
}










MapContext 与 ReduceContext 的父类 TaskInputOutputContext 中声明了几个抽象方法:getCurrentKey()、getCurrentValue()、nextKeyValue()。它们是 MapContext、ReduceContext 共有的方法,必须由这两个子类各自实现(见上面的代码)。write(KEYOUT key, VALUEOUT value) 则负责把键值对写出。MapContext 和 ReduceContext 在构造时都把 RecordWriter 传给了父类,实际的写出由它完成。在 MapReduce 编程中无需关心底层数据流的传输,write 已经封装好了,直接调用即可,之后 Hadoop 会把数据传给下一个处理环节。

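从前面的代码可以看出,Mapper.Context、Reducer.Context 本身没有添加任何成员变量。MapContext 增加了 reader、split,ReduceContext 增加了 input、comparator、key、value、keyDeserializer 等用于读取和分组输入的成员变量。TaskInputOutputContext、TaskAttemptContext 的代码未在此列出;从 MapContext/ReduceContext 调用 super(conf, taskid, writer, committer, reporter) 来看,它们会保存这些构造参数。作业级别的信息则保存在祖先类 JobContext 的成员变量中,汇总如下: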


  1 package org.apache.hadoop.mapreduce;  2   3 import java.io.IOException;  4   5 import org.apache.hadoop.conf.Configuration;  6 import org.apache.hadoop.fs.Path;  7 import org.apache.hadoop.io.RawComparator;  8 import org.apache.hadoop.mapreduce.Mapper;  9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 10 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 11 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 12 import org.apache.hadoop.security.Credentials; 13 import org.apache.hadoop.security.UserGroupInformation; 14  15 /** 16  * A read-only view of the job that is provided to the tasks while they 17  * are running. 18  */ 19 public class JobContext { 20   // Put all of the attribute names in here so that Job and JobContext are 21   // consistent. 22   protected static final String INPUT_FORMAT_CLASS_ATTR =  23     "mapreduce.inputformat.class"; 24   protected static final String MAP_CLASS_ATTR = "mapreduce.map.class"; 25   protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class"; 26   protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class"; 27   protected static final String OUTPUT_FORMAT_CLASS_ATTR =  28     "mapreduce.outputformat.class"; 29   protected static final String PARTITIONER_CLASS_ATTR =  30     "mapreduce.partitioner.class"; 31  32   protected final org.apache.hadoop.mapred.JobConf conf; 33   protected final Credentials credentials; 34   private JobID jobId; 35  36   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers"; 37  38   public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; 39   public static final String JOB_ACL_MODIFY_JOB = 40     "mapreduce.job.acl-modify-job"; 41  42   public static final String CACHE_FILE_VISIBILITIES =  43     "mapreduce.job.cache.files.visibilities"; 44   public static final String CACHE_ARCHIVES_VISIBILITIES =  45     "mapreduce.job.cache.archives.visibilities"; 46    47   public static final 
String JOB_CANCEL_DELEGATION_TOKEN =  48     "mapreduce.job.complete.cancel.delegation.tokens"; 49   public static final String USER_LOG_RETAIN_HOURS =  50     "mapred.userlog.retain.hours"; 51    52   /** 53    * The UserGroupInformation object that has a reference to the current user 54    */ 55   protected UserGroupInformation ugi; 56    57   public JobContext(Configuration conf, JobID jobId) { 58     this.conf = new org.apache.hadoop.mapred.JobConf(conf); 59     this.credentials = this.conf.getCredentials(); 60     this.jobId = jobId; 61     try { 62       this.ugi = UserGroupInformation.getCurrentUser(); 63     } catch (IOException e) { 64       throw new RuntimeException(e); 65     } 66   } 67  68   void setJobID(JobID jobId) { 69     this.jobId = jobId; 70   } 71  72   /** 73    * Return the configuration for the job. 74    * @return the shared configuration object 75    */ 76   public Configuration getConfiguration() { 77     return conf; 78   } 79  80   /** 81    * Get credentials for the job. 82    * @return credentials for the job 83    */ 84   public Credentials getCredentials() { 85     return credentials; 86   } 87  88   /** 89    * Get the unique ID for the job. 90    * @return the object with the job id 91    */ 92   public JobID getJobID() { 93     return jobId; 94   } 95    96   /** 97    * Get configured the number of reduce tasks for this job. Defaults to  98    * <code>1</code>. 
99    * @return the number of reduce tasks for this job.100    */101   public int getNumReduceTasks() {102     return conf.getNumReduceTasks();103   }104   105   /**106    * Get the current working directory for the default file system.107    * 108    * @return the directory name.109    */110   public Path getWorkingDirectory() throws IOException {111     return conf.getWorkingDirectory();112   }113 114   /**115    * Get the key class for the job output data.116    * @return the key class for the job output data.117    */118   public Class<?> getOutputKeyClass() {119     return conf.getOutputKeyClass();120   }121   122   /**123    * Get the value class for job outputs.124    * @return the value class for job outputs.125    */126   public Class<?> getOutputValueClass() {127     return conf.getOutputValueClass();128   }129 130   /**131    * Get the key class for the map output data. If it is not set, use the132    * (final) output key class. This allows the map output key class to be133    * different than the final output key class.134    * @return the map output key class.135    */136   public Class<?> getMapOutputKeyClass() {137     return conf.getMapOutputKeyClass();138   }139 140   /**141    * Get the value class for the map output data. If it is not set, use the142    * (final) output value class This allows the map output value class to be143    * different than the final output value class.144    *  145    * @return the map output value class.146    */147   public Class<?> getMapOutputValueClass() {148     return conf.getMapOutputValueClass();149   }150 151   /**152    * Get the user-specified job name. 
This is only used to identify the 153    * job to the user.154    * 155    * @return the job‘s name, defaulting to "".156    */157   public String getJobName() {158     return conf.getJobName();159   }160 161   /**162    * Get the {@link InputFormat} class for the job.163    * 164    * @return the {@link InputFormat} class for the job.165    */166   @SuppressWarnings("unchecked")167   public Class<? extends InputFormat<?,?>> getInputFormatClass() 168      throws ClassNotFoundException {169     return (Class<? extends InputFormat<?,?>>) 170       conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);171   }172 173   /**174    * Get the {@link Mapper} class for the job.175    * 176    * @return the {@link Mapper} class for the job.177    */178   @SuppressWarnings("unchecked")179   public Class<? extends Mapper<?,?,?,?>> getMapperClass() 180      throws ClassNotFoundException {181     return (Class<? extends Mapper<?,?,?,?>>) 182       conf.getClass(MAP_CLASS_ATTR, Mapper.class);183   }184 185   /**186    * Get the combiner class for the job.187    * 188    * @return the combiner class for the job.189    */190   @SuppressWarnings("unchecked")191   public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 192      throws ClassNotFoundException {193     return (Class<? extends Reducer<?,?,?,?>>) 194       conf.getClass(COMBINE_CLASS_ATTR, null);195   }196 197   /**198    * Get the {@link Reducer} class for the job.199    * 200    * @return the {@link Reducer} class for the job.201    */202   @SuppressWarnings("unchecked")203   public Class<? extends Reducer<?,?,?,?>> getReducerClass() 204      throws ClassNotFoundException {205     return (Class<? extends Reducer<?,?,?,?>>) 206       conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);207   }208 209   /**210    * Get the {@link OutputFormat} class for the job.211    * 212    * @return the {@link OutputFormat} class for the job.213    */214   @SuppressWarnings("unchecked")215   public Class<? 
extends OutputFormat<?,?>> getOutputFormatClass() 216      throws ClassNotFoundException {217     return (Class<? extends OutputFormat<?,?>>) 218       conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);219   }220 221   /**222    * Get the {@link Partitioner} class for the job.223    * 224    * @return the {@link Partitioner} class for the job.225    */226   @SuppressWarnings("unchecked")227   public Class<? extends Partitioner<?,?>> getPartitionerClass() 228      throws ClassNotFoundException {229     return (Class<? extends Partitioner<?,?>>) 230       conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);231   }232 233   /**234    * Get the {@link RawComparator} comparator used to compare keys.235    * 236    * @return the {@link RawComparator} comparator used to compare keys.237    */238   public RawComparator<?> getSortComparator() {239     return conf.getOutputKeyComparator();240   }241 242   /**243    * Get the pathname of the job‘s jar.244    * @return the pathname245    */246   public String getJar() {247     return conf.getJar();248   }249 250   /** 251    * Get the user defined {@link RawComparator} comparator for 252    * grouping keys of inputs to the reduce.253    * 254    * @return comparator set by the user for grouping values.255    * @see Job#setGroupingComparatorClass(Class) for details.  256    */257   public RawComparator<?> getGroupingComparator() {258     return conf.getOutputValueGroupingComparator();259   }260 }

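JobContext 中绝大部分成员变量是 static final 常量,即各配置项的名称,具有预先设定的值。conf、credentials 是 final 字段,在构造函数中赋值后不再改变。jobId 是例外,它可以通过包内可见的 setJobID() 修改。JobContext 还提供了一系列 get 方法来返回这些信息,例如 getJobID()、getConfiguration()、getMapperClass() 等。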



本文基于知识共享署名-非商业性使用 3.0 许可协议进行许可。欢迎转载、演绎,但是必须保留本文的署名林羽飞扬,若需咨询,请给我发信
