首页 > 代码库 > mapreduce运行流程总结

mapreduce运行流程总结

     先上图,下图描绘了一个mapreduce程序的的一般运行过程和需要经过的几个阶段

技术分享

  大体上我们可以将mapreduce程序划分为inputformat ,map ,shuffle,reduce,outputformat五个阶段,下面我们会详细介绍各个阶段的具体的运行细节

  以最简单的wordcount程序为例,本例使用基于hadoop2.6的环境,一般的api都使用mapreudce下的,注意不要使用mapred下的api可能会引起未知错误

 

   惯例hello word程序

driver类,负责构建mapredue任务,设置job的名称,指定任务的输入文件并设置相应的读取类,map处理类,reduce处理类,输出文件路径,mapreduce程序从driver类开始,程序运行时会根据Configuration读取到配置和yarn通信,申请运行任务的资源,申请资源之后就开始将jar包发送到yarn的各个节点执行map和reudce任务

 1 package mapreduce.wordcount;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
12 
13 import java.io.IOException;
14 
15 /**
16  * Created by teaegg on 2016/11/21.
17  */
18 public class WordcountDriver {
19     public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
20         runjob(args);
21     }
22 
23 
24     public static void runjob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
25         Configuration conf = new Configuration();//创建任务的配置对象,会自动加载hadoop的的配置文件
26         //Job job = new Job(conf);是废弃的写法,虽然依然可以使用
27         Job job  = Job.getInstance(conf);
28         job.setJobName("wordcount");//设置任务的名称
29 
30         //hadoop中对java一般的常用的数据类型做了封装形成了mapreduce专用的数据类型
31         job.setOutputKeyClass(Text.class);//Text.class对应了String类型
32         job.setOutputValueClass(IntWritable.class);//IntWritable.class对应了int类型
33 
34         job.setMapperClass(WordcountMap.class);//设置任务的map类
35         job.setReducerClass(WordcountReduce.class);//设置任务的reduce类
36 
37         job.setMapOutputKeyClass(Text.class);//设置map端输出数据类型
38         job.setMapOutputValueClass(IntWritable.class);//设置mapd端reduce的数据类型
39         //注意以上都要和map的write方法写出类型一致
40 
41 
42         //1. 这里不设置TextInputFormat.class也没有关系,hadoop默认调用TextInputFormat类处理,
43         //2. 注意TextInputFormat类不能用来读取Sequncefile类型文件
44         job.setInputFormatClass(TextInputFormat.class);//设置输入文件读取的类,可以hdfs上处理一般的文本文件
45         job.setOutputFormatClass(TextOutputFormat.class);//设置输出的文件类型
46 
47         FileInputFormat.addInputPath(job, new Path(args[0]));//设置hdfs上输入文件路径,这里可以传入多个文件路径
48         // 可以再添加一个输入文件路径
49         //如 FileInputFormat.addInputPath(job, new Path(" "));
50         FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置hdfs上的输出文件路径,
51 
52         job.waitForCompletion(true);
53     }
54 
55 }

 

 

 

 map类,其实再map类执行之前还有一个过程inputformat,这里没有详细介绍,inputformat过程负责将输入路径传入的文件做分片,每一个分片会生成一个map任务,并且会读取输入的文件分片,并返回一行记录recordreader对象交给map方法执行,就这样不断的生成recordreader对象交给map去执行,recordreader对应的就是map方法中解析的kv键值对

 1 package mapreduce.wordcount;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 
 8 import java.io.IOException;
 9 
10 /**
11  * Created by teaegg on 2016/11/21.
12  */
13 public class WordcountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
14 
15     private Text outkey;
16 
17     private IntWritable outvalue = http://www.mamicode.com/new IntWritable(1);
18 
19     public void map(LongWritable key, Text value, Context context)
20             throws IOException, InterruptedException {
21 
22         //LongWritable key是这一行记录相对文件的偏移量,一般情况用不上,可以忽视
23         //inputformat过程默认采用Textinputformat类来读取hdfs上一般的文本文件,并将每一行记录转化成Text对象
24         //这是inputformat过程的一个主要职责,读取数据并交给map来处理,
25 
26         String line = value.toString();//toString方法可以将Text类型的数据转换成String
27         String[] item = line.split(" ");//采用空格分隔每行数据
28         for (String str: item) {
29             outkey = new Text(str);    //将每一个word再转化成Text类型
30 
31             //1. 这一步是关键一步,无论是inputformat还是map或者reduce,其处理的数据都是键值对类型
32             //2. 这里调用context.write方法,将map处理好的结果写出到磁盘上,然后数据会根据key做排序,shuffle并最终
33             //到达reduce端交给reduce方法继续处理
34             //3. map端可以多次调用write方法,每次调用都是一个写出的键值对结果,
35             context.write(outkey, outvalue);
36         }
37     }
38 
39     }

 

reduce类,reduce方法每次调用的时候会处理一个map输出的key和这个key下所有的输出的value值,

 1 package mapreduce.wordcount;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Reducer;
 6 
 7 import java.io.IOException;
 8 
 9 /**
10  * Created by teaegg on 2016/11/21.
11  * <p>
12  * 1. map端传入的是“一行”数据,而在reduce中,输入的key  即Text key是map端输出的context.write(outkey, outvalue)中设置的key
13  * 2. 而reduce方法中输入参数Iterable<IntWritable> values  是map端输出键值对相同的key下所有的value的集合
14  */
15 public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
16 
17     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
18         int sum = 0;
19         for (IntWritable val : values) {
20             sum += val.get();
21         }
22         context.write(key, new IntWritable(sum));
23     }
24 }

 

mapreduce运行流程总结