首页 > 代码库 > MapReduce 学习1

MapReduce 学习1

MapReduce概述
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

MapReduce合并了两种经典函数:

映射(Mapping)对集合里的每个目标应用同一个操作。即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping。

如:[1,2]  ==> [1X2,2X2] ==> [2,4]

化简(Reducing )遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。

 

Mapreduce原理图:

 技术分享

数据源为:k1,v1

客户的需求为k2,v2

一个block块对应多个split,一个split对应一个map任务,一个reduce任务会生成一个part-00000文件则一个reduce输出一个文件

执行的过程:

map任务的阶段:

1.1:读取文件,解释成为key,value键值对,每一行的字节偏移量会作为key, 每一行的文本内容会作为value.每一键值对调用都会执行一次(hadoop系统)

1.2:自定义的Mapper函数,k1,v1为输入值,进行编写自己的逻辑,完成后输出k2,v2(自己手写)

public static class WorldCountMapper extends          Mapper<LongWritable, Text, Text, LongWritable> {        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException  {              String[] worlds = value.toString().split(",");            for(String world :worlds){                context.write(new Text(world), new LongWritable(1));            }                    }    }

 

1.3:对输出的key、value进行分区。(hadoop系统)

1.4对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。(hadoop系统)
1.5分组后的数据进行归约

reduce任务阶段:

2.1对多个map函数的输出,按照不同的分区,通过网络把数据copy到同一个reduce节点上(hadoop系统)

2.2自定义的reduce函数,k2,v2为输入值,进行编写自己的逻辑,完成后输出k3,v3(自己手写)

     public static class WorldCountReduce extends        Reducer<Text, LongWritable, Text, LongWritable>{                protected void reduce(Text k2, Iterable<LongWritable> v2s,                Context context)                throws IOException, InterruptedException {            // TODO Auto-generated method stub            long sum =0 ;            for(LongWritable v2:v2s){                sum += v2.get();            }            context.write(k2, new LongWritable(sum));        }                    }    

2.3 把reduce的输出保存到文件中。

 

驱动代码:

public static void main(String[] args) throws Exception {    Configuration conf = new Configuration();    //加载配置文件    Job job = new Job(conf);    //创建一个job,供JobTracker使用    job.setJarByClass(WordCountApp.class);            job.setMapperClass(WorldCountMapper.class);    job.setReducerClass(WorldCountReduce.class);    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.10:9000/input"));//hadoop2.x能够读取文件夹与文件    //如果想读取子文件夹则需要设置属性    FileInputFormat.setInputDirRecursive(job, true);    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.10:9000/output"));    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(LongWritable.class);            job.waitForCompletion(true);}

 

MapReduce 学习1