
Hadoop Source Code Analysis: The Program Entry Point of a Job

This article gives an overview of how a Hadoop Job program is started.


A Hadoop MapReduce program written in Java normally uses a main method as the entry point of the whole program, for example:

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new CalculateSumJob(), args);
    System.exit(res);
}

Here, the class implementing this MapReduce Job is CalculateSumJob, declared as follows:

public class CalculateSumJob extends Configured implements Tool {...}

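CalculateSumJob extends the Configured class from the org.apache.hadoop.conf package, and Configured in turn implements the Configurable interface from the same package. Its main purpose is to implement that interface's setConf and getConf methods, that is, to set and read the Job's configuration (a Configuration object). The Job's Configuration will be covered in detail in the next article.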
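To make the Configurable/Configured relationship concrete, here is a minimal, Hadoop-free sketch. MiniConfiguration, MiniConfigurable, MiniConfigured and DemoJob are hypothetical stand-ins invented for illustration, not Hadoop classes; the sketch only assumes that Configured does little more than keep the Configuration in a field behind setConf/getConf.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration: a simple key/value store.
class MiniConfiguration {
    private final Map<String, String> props = new HashMap<>();

    void set(String key, String value) { props.put(key, value); }

    // Returns null when the key is absent.
    String get(String key) { return props.get(key); }
}

// Hypothetical stand-in for the Configurable interface: one setter, one getter.
interface MiniConfigurable {
    void setConf(MiniConfiguration conf);
    MiniConfiguration getConf();
}

// Hypothetical stand-in for Configured: stores the configuration in a field
// so that subclasses inherit setConf/getConf without writing them.
class MiniConfigured implements MiniConfigurable {
    private MiniConfiguration conf;

    MiniConfigured() { this(null); }

    MiniConfigured(MiniConfiguration conf) { setConf(conf); }

    @Override
    public void setConf(MiniConfiguration conf) { this.conf = conf; }

    @Override
    public MiniConfiguration getConf() { return conf; }
}

// A job-like class: extending the base class is enough to gain the accessors.
class DemoJob extends MiniConfigured {
    String describeInput() {
        return "input=" + getConf().get("input");
    }
}
```

Because the field and both accessors live in the base class, a job class only has to extend it and call getConf() wherever it needs a setting, which is how CalculateSumJob.run obtains its configuration.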

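CalculateSumJob also implements the Tool interface from the org.apache.hadoop.util package. Tool extends Configurable and adds one method of its own, int run(String[] args); this is the method that ultimately gets invoked through the ToolRunner.run() call in main. So what is ToolRunner? It is a class in the same org.apache.hadoop.util package, a utility Hadoop provides specifically for running a Tool such as a Hadoop Job. Its core code is as follows: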

  public static int run(Configuration conf, Tool tool, String[] args)
    throws Exception{
    if(conf == null) {
      conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    //set the configuration back, so that Tool can configure itself
    tool.setConf(conf);

    //get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);
  }

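Read this together with ToolRunner.run(new Configuration(), new CalculateSumJob(), args) in main: run first uses GenericOptionsParser to apply Hadoop's generic command-line options (for example -D key=value) to the new Configuration passed in, then hands that Configuration to CalculateSumJob via setConf, and finally calls CalculateSumJob's run method with the remaining, non-generic arguments. CalculateSumJob has to implement the run method declared by the Tool interface itself; its code is as follows: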
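The control flow of ToolRunner.run can be imitated without Hadoop. The sketch below is a simplified illustration under stated assumptions: MiniTool, MiniToolRunner and CheckInputTool are hypothetical names, a plain Map stands in for Configuration, and only the -D key=value generic option is handled (the real GenericOptionsParser supports several more).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Tool: a configurable object with run(String[]).
// The real Tool.run declares "throws Exception"; it is omitted here for brevity.
interface MiniTool {
    void setConf(Map<String, String> conf);
    Map<String, String> getConf();
    int run(String[] args);
}

// Hypothetical, heavily simplified ToolRunner.run. It only understands leading
// "-D key=value" pairs and treats everything after them as the tool's own arguments.
final class MiniToolRunner {
    private MiniToolRunner() {}

    static int run(Map<String, String> conf, MiniTool tool, String[] args) {
        if (conf == null) {
            conf = new HashMap<>();            // mirrors "conf = new Configuration()"
        }
        int i = 0;
        // Consume leading "-D key=value" pairs into the configuration.
        while (i + 1 < args.length && "-D".equals(args[i])) {
            String kv = args[i + 1];
            int eq = kv.indexOf('=');
            if (eq > 0) {                      // pairs without a key are silently skipped
                conf.put(kv.substring(0, eq), kv.substring(eq + 1));
            }
            i += 2;
        }
        // The counterpart of parser.getRemainingArgs().
        String[] toolArgs = Arrays.copyOfRange(args, i, args.length);
        tool.setConf(conf);                    // mirrors tool.setConf(conf)
        return tool.run(toolArgs);             // mirrors return tool.run(toolArgs)
    }
}

// Example tool: succeeds only if "input" arrived via -D and one argument remains.
class CheckInputTool implements MiniTool {
    private Map<String, String> conf;

    @Override
    public void setConf(Map<String, String> conf) { this.conf = conf; }

    @Override
    public Map<String, String> getConf() { return conf; }

    @Override
    public int run(String[] args) {
        return conf.containsKey("input") && args.length == 1 ? 0 : 1;
    }
}
```

With the real classes, the same mechanism means the keys CalculateSumJob reads can be supplied on the command line, e.g. hadoop jar calculate-sum.jar jixi.source.hadoop.job.CalculateSumJob -D input=/data/in -D output=/data/out -D num_reduce_tasks=2, where the jar name and paths are placeholders.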

    public int run(String[] args) throws Exception {
        // new Job(Configuration, String) is deprecated in Hadoop 2.x in favor of Job.getInstance(...)
        Job job = new Job(getConf(), "CalculateSumJob");
        job.setJarByClass(CalculateSumJob.class);
        Configuration conf = job.getConfiguration();

        // Job parameters read from the configuration (e.g. passed as -D key=value)
        String inputPath = conf.get("input");
        String outputPath = conf.get("output");
        int numReduceTasks = Integer.parseInt(conf.get("num_reduce_tasks"));
        job.setNumReduceTasks(numReduceTasks);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapperClass(CalculateSumMapper.class);
        job.setCombinerClass(CalculateSumCombiner.class);
        job.setReducerClass(CalculateSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the exit code to ToolRunner instead of calling System.exit here,
        // so that main() is the only place that terminates the JVM.
        return job.waitForCompletion(true) ? 0 : 1;
    }

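This run method sets up the Job's basic parameters: it reads the configuration, sets the jar, the Job's input and output paths, its Mapper, Combiner and Reducer classes, the output key/value types, and so on. The input path, output path and number of reduce tasks are read from the Configuration under the keys input, output and num_reduce_tasks, so they have to be supplied somehow, for example as -D options on the command line. Finally, job.waitForCompletion(true) submits the Job and blocks until it finishes (the true argument makes it print progress), and its result is turned into the exit code 0 or 1.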
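Since conf.get returns null for a missing key, a forgotten -D option in the run method above only surfaces later, as an exception from new Path(null) or Integer.parseInt(null). A minimal sketch of checking these keys up front follows; JobParams is a hypothetical helper that is not part of the original code, and a plain Map again stands in for Configuration.

```java
import java.util.Map;

// Hypothetical helper: gathers the three settings CalculateSumJob.run reads,
// failing fast with a readable message instead of a later NPE/NumberFormatException.
final class JobParams {
    final String inputPath;
    final String outputPath;
    final int numReduceTasks;

    private JobParams(String inputPath, String outputPath, int numReduceTasks) {
        this.inputPath = inputPath;
        this.outputPath = outputPath;
        this.numReduceTasks = numReduceTasks;
    }

    static JobParams fromConf(Map<String, String> conf) {
        String input = require(conf, "input");
        String output = require(conf, "output");
        String reducers = require(conf, "num_reduce_tasks");
        int n;
        try {
            n = Integer.parseInt(reducers.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("num_reduce_tasks is not an integer: " + reducers, e);
        }
        // 0 is allowed: Hadoop supports map-only jobs with zero reduce tasks.
        if (n < 0) {
            throw new IllegalArgumentException("num_reduce_tasks must be >= 0, got " + n);
        }
        return new JobParams(input, output, n);
    }

    // Returns the value for key, or throws if it is missing or empty.
    private static String require(Map<String, String> conf, String key) {
        String value = conf.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("missing required setting: -D " + key + "=...");
        }
        return value;
    }
}
```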

The individual parameter settings will be covered in detail in upcoming articles.

Finally, here is the complete CalculateSumJob program for this article:

package jixi.source.hadoop.job;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CalculateSumJob extends Configured implements Tool {

    // Mapper: for each tab-separated input line, emits (column 0, column 2).
    public static class CalculateSumMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text();
            Text outputValue = new Text();
            String[] valueStr = value.toString().split("\t");
            outputKey.set(valueStr[0]);
            outputValue.set(valueStr[2]);
            context.write(outputKey, outputValue);
        }
    }

    // Combiner: pre-sums the values of each key on the map side.
    public static class CalculateSumCombiner extends
            Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text();
            Text outputValue = new Text();
            double sum = 0;
            for (Text value : values) {
                sum += Double.parseDouble(value.toString());
            }
            outputKey.set(key);
            outputValue.set(String.valueOf(sum));
            context.write(outputKey, outputValue);
        }
    }

    // Reducer: computes the final sum per key and appends a "null" column.
    public static class CalculateSumReducer extends
            Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text();
            Text outputValue = new Text();
            double sum = 0;
            for (Text value : values) {
                sum += Double.parseDouble(value.toString());
            }
            outputKey.set(key);
            outputValue.set(String.valueOf(sum) + "\t" + "null");
            context.write(outputKey, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CalculateSumJob(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        // new Job(Configuration, String) is deprecated in Hadoop 2.x in favor of Job.getInstance(...)
        Job job = new Job(getConf(), "CalculateSumJob");
        job.setJarByClass(CalculateSumJob.class);
        Configuration conf = job.getConfiguration();

        String inputPath = conf.get("input");
        String outputPath = conf.get("output");
        int numReduceTasks = Integer.parseInt(conf.get("num_reduce_tasks"));
        job.setNumReduceTasks(numReduceTasks);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapperClass(CalculateSumMapper.class);
        job.setCombinerClass(CalculateSumCombiner.class);
        job.setReducerClass(CalculateSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the exit code to ToolRunner; main() calls System.exit.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
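To see what the three classes compute end to end, the data flow can be simulated in plain Java. This is a hypothetical, single-process sketch (CalculateSumSimulation is not part of the original code); it assumes tab-separated lines with at least three columns, just as the Mapper does, and uses a TreeMap to mimic MapReduce sorting keys before the reduce phase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Hadoop) of what CalculateSumJob computes.
final class CalculateSumSimulation {
    private CalculateSumSimulation() {}

    static Map<String, String> run(List<String> lines) {
        // Map + shuffle: emit (column 0, column 2) and group the values by key.
        Map<String, List<Double>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] cols = line.split("\t");
            grouped.computeIfAbsent(cols[0], k -> new ArrayList<>())
                   .add(Double.parseDouble(cols[2]));
        }
        // Reduce: sum each key's values and append "\tnull", like CalculateSumReducer.
        Map<String, String> output = new TreeMap<>();
        for (Map.Entry<String, List<Double>> e : grouped.entrySet()) {
            double sum = 0;
            for (double v : e.getValue()) {
                sum += v;
            }
            output.put(e.getKey(), sum + "\tnull");
        }
        return output;
    }
}
```

The Combiner is left out of the simulation: it applies the same summation to partial groups on the map side, and because addition is associative and commutative it only reduces the data shuffled to the Reducers without changing the per-key sums (apart from possible floating-point rounding differences).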