My own summary of MapReduce

MapReduce splits a job into two stages, map and reduce: the map stage processes the raw input, and the new key/value pairs it produces are then gathered and handed to the reduce stage for further processing.

Writing the map and reduce classes follows one rule. Both are declared as ClassName<*, *, *, *> with four type parameters (the part inside the angle brackets, which Java calls generics). For the mapper, the first two parameters are the key and value types read in from the input file, and the last two are the key and value types it hands on to the reducer; the reducer's first two parameters must therefore match the mapper's last two. For example, if the mapper's angle brackets contain <*, *, Text, IntWritable>, then the reducer must be <Text, IntWritable, *, *>.
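As a minimal sketch of this contract (the class names, and the LongWritable/Text input types that plain text input normally provides, are placeholders of my own rather than part of the program below):

public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // <input key, input value, output key, output value>
}

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // the first two type parameters repeat the mapper's last two
}

Based on this pattern, here is a MapReduce program I wrote that outputs the highest score from a file containing student names and scores.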

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class statics {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            int max = 0;
            int min = 65535;

            // each input line holds one student's name followed by scores
            String line = value.toString();
            String[] s = line.split(" ");
            for (int i = 0; i < s.length; i++) {
                // skip non-numeric fields such as the student's name
                if (!s[i].matches("\\d+")) {
                    continue;
                }
                int score = Integer.parseInt(s[i]);
                if (score > max) {
                    max = score;
                }
                if (score < min) {
                    min = score;
                }
            }

            // emit every per-line maximum under one constant key, so that a
            // single reduce call sees all of them
            Text text = new Text("1");
            // Text text1 = new Text("2");
            context.write(text, new IntWritable(max));
            // context.write(text1, new IntWritable(min));
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int max = 0;

            // walk through every per-line maximum emitted by the mappers
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                int now = iterator.next().get();
                if (now > max) {
                    max = now;
                }
            }

            context.write(new Text("max is "), new IntWritable(max));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // JobTracker address used by the old MapReduce v1 runtime
        conf.set("mapred.job.tracker", "127.0.0.1:9000");
        FileSystem fs = FileSystem.get(conf);

        // the remaining command-line arguments are the input and output directories
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) { // exactly two path arguments are expected
            System.err.println("Usage: statics <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "max score");
        job.setJarByClass(statics.class);

        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        // remove an existing output directory, otherwise the job refuses to start
        Path outpath = new Path(otherArgs[1]);
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
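The commented-out lines in map() already compute a per-line minimum; the program just never emits it. As a hedged sketch of how both statistics could be reported, assuming map() writes its results under two keys, "max" and "min", instead of the constant "1" (the key names and the MinMaxReduce class are my own illustration, not part of the original):

    // in map(), replace the single write with one write per statistic:
    // context.write(new Text("max"), new IntWritable(max));
    // context.write(new Text("min"), new IntWritable(min));

    public static class MinMaxReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // one reduce call receives all the "max" values, another all the "min" values
            boolean wantMax = key.toString().equals("max");
            int best = wantMax ? Integer.MIN_VALUE : Integer.MAX_VALUE;
            for (IntWritable v : values) {
                int now = v.get();
                if (wantMax ? now > best : now < best) {
                    best = now;
                }
            }
            context.write(key, new IntWritable(best));
        }
    }

The single constant key "1" in the original map() is what funnels every per-line maximum into one reduce call; with two distinct keys, the framework groups the values by key before invoking reduce(), so the maximum and the minimum each get their own call.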
