首页 > 代码库 > Hadoop MapReduce 模板

Hadoop MapReduce 模板

技术分享

Mapper

 1 package com.scb.jason.mapper;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 
 8 import java.io.IOException;
 9 import java.util.StringTokenizer;
10 
11 /**
12  * Created by Administrator on 2017/7/23.
13  */
14 // Step 1: Map Class
15 public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
16 
17     private Text mapOutputkey =  new Text();
18     private final static IntWritable mapOutputValue = http://www.mamicode.com/new IntWritable(1);
19 
20     @Override
21     protected void setup(Context context) throws IOException, InterruptedException {
22         super.setup(context);
23     }
24 
25     @Override
26     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
27         String lineValue =http://www.mamicode.com/ value.toString();
28         StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
29         while(stringTokenizer.hasMoreTokens()){
30             String wordValue =http://www.mamicode.com/ stringTokenizer.nextToken();
31             mapOutputkey.set(wordValue);
32             context.write(mapOutputkey,mapOutputValue);
33         }
34     }
35 
36     @Override
37     protected void cleanup(Context context) throws IOException, InterruptedException {
38         super.cleanup(context);
39     }
40 
41     @Override
42     public void run(Context context) throws IOException, InterruptedException {
43         super.run(context);
44     }
45 }

 

Reducer

 1 package com.scb.jason.reducer;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Reducer;
 6 
 7 import java.io.IOException;
 8 
 9 /**
10  * Created by Administrator on 2017/7/23.
11  */
12 // Step 2: Reduce Class
13 public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
14 
15     private IntWritable outputValue = http://www.mamicode.com/new IntWritable();
16 
17     @Override
18     protected void setup(Context context) throws IOException, InterruptedException {
19         super.setup(context);
20     }
21 
22     @Override
23     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
24         int sum = 0;
25         for(IntWritable value:values){
26             sum += value.get();
27         }
28         outputValue.set(sum);
29         context.write(key,outputValue);
30     }
31 
32     @Override
33     protected void cleanup(Context context) throws IOException, InterruptedException {
34         super.cleanup(context);
35     }
36 
37     @Override
38     public void run(Context context) throws IOException, InterruptedException {
39         super.run(context);
40     }
41 }

 

Driver

 1 package com.scb.jason.driver;
 2 
 3 import com.scb.jason.mapper.WordCountMapper;
 4 import com.scb.jason.reducer.WordCountReducer;
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.conf.Configured;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
 9 import org.apache.hadoop.io.IntWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.util.Tool;
15 import org.apache.hadoop.util.ToolRunner;
16 
17 import java.io.IOException;
18 
19 /**
20  * Created by Administrator on 2017/7/17.
21  */
22 public class WordCount extends Configured implements Tool {
23 
24     // Step 3: Driver
25     public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
26         Configuration configuration = new Configuration();
27         FileSystem fs = FileSystem.get(configuration);
28 
29         Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
30         job.setJarByClass(this.getClass());
31 
32         Path input = new Path(args[0]);
33         FileInputFormat.addInputPath(job,input);
34 
35         job.setMapperClass(WordCountMapper.class);
36         job.setMapOutputKeyClass(Text.class);
37         job.setMapOutputValueClass(IntWritable.class);
38 
39         job.setReducerClass(WordCountReducer.class);
40         job.setMapOutputKeyClass(Text.class);
41         job.setMapOutputValueClass(IntWritable.class);
42 
43         Path outPath = new Path(args[1]);
44         if(fs.exists(outPath)){
45             fs.delete(outPath,true);
46         }
47         FileOutputFormat.setOutputPath(job,outPath);
48 
49         boolean isSuccess = job.waitForCompletion(true);
50         return isSuccess?1:0;
51     }
52 
53     public static void main(String[] args) throws Exception {
54         int exitCode = ToolRunner.run(new WordCount(),args);
55         System.exit(exitCode);
56     }
57 
58 }

 

Hadoop MapReduce 模板