首页 > 代码库 > Hadoop 高级程序设计(一)---复合键 自定义输入类型

Hadoop 高级程序设计(一)---复合键 自定义输入类型

简介:

在大数据处理的基本方法上,对于相互间计算的依赖性不大的数据,mapreduce采用分治的策略进行处理,将大的问题划分成小的问题进行求解,使得问题变得简单可行,同时在处理问题上面,MapReduce框架隐藏了很多的处理细节,将数据切分,任务调度,数据通信,容错,负载均衡.....交给了系统负责,对于很多问题,只需要采取框架的缺省值完成即可,用户只需完成设计map函数很reduce函数即可。

复合键

在一般的情况下只需要使用简单的<key,value>对即可,但是在一些复杂的情况下可以完成很多有效的处理,可以减少网络数据通信开销,提高程序计算效率。
例子:倒排索引 文档检索系统中最常用的数据结构,广泛的应用与全文检索,存储某个单词或者词组在一个文挡或是多个文档中的存储位置的映射,根据内容来查找文档的方式。由于不是根据文档查找内容 而是根据内容来查找文档,进行相反的操作,继而成为倒排索引。
代码:
package reverseIndex;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 倒排索引:根据内容查找文件 
 * xd is a good man ->file1.txt
 * good boy is xd   ->file2.txt
 * xd like beautiful women ->file3.txt
 * 对应下:
 * xd ->file1.txt file2.txt file3.txt
 * is ->file1.txt file2.txt
 * a ->file1.txt
 * good ->file1.txt file2.txt
 * man ->file1.txt
 * boy ->file2.txt
 * like ->file3.txt
 * beautiful ->file3.txt
 * women ->file3.txt
 * 在每个map函数中 所需数据对 是<"单词+文件名","词频"> 便于combiner的词频统计
 * 而在combiner中  将数据对变为<"单词","文件名+词频"> 便于将相同的key的数据 分发到 同一个reducer中执行 (HashPartition).
 * @author XD
 */
public class inverseIndex {
	public static class Map extends Mapper<LongWritable,Text,Text,Text>{
		private Text keyInfo = new Text();	//key值
		private Text valueInfo = new Text();	//value值
		private FileSplit split;	//回去文件的splie对象
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			split = (FileSplit)context.getInputSplit();	//关键 获取<key,value>所属的split对象
			StringTokenizer itr = new StringTokenizer(value.toString());
			while(itr.hasMoreTokens()){
				int splitIndex = split.getPath().toString().indexOf("file");//获取文件名 包含file的索引位置
				keyInfo.set(itr.nextToken()+":"+split.getPath().toString().substring(splitIndex));	//设定key值
				valueInfo.set("1");
				context.write(keyInfo, valueInfo);
			}
		}
	}
	public static class combiner extends Reducer<Text,Text,Text,Text>{
		private Text info = new Text();	//为了拆分 key值 准备存储新的value值
		public void reduce(Text key,Iterable<Text>values,Context context) throws IOException, InterruptedException{
			int sum = 0;
			for(Text val:values){
				sum += Integer.parseInt(val.toString());
			}
			int splitIndex = key.toString().indexOf(":");
			info.set(key.toString().substring(splitIndex+1)+":"+sum);	//新的value值
			key.set(key.toString().substring(0, splitIndex));
			context.write(key, info);
		}
	}
	public static class Reduce extends Reducer<Text,Text,Text,Text>{
		private Text result = new Text();	//设定最终的输出结果
		public void reduce(Text key,Iterable<Text>values,Context context) throws IOException, InterruptedException{
			String list = new String();
			for(Text val:values){
				list += val.toString()+";";	//不同的索引文件分隔开来
			}
			result.set(list);
			context.write(key,result);
		}		
	}
}

用户自定义数据类型

Hadoop中提供了很多的内置数据类型,但是在解决一些复杂的问题,这些内置的简单数据类型很难满足用户的需求,需要自定义数据类型。用户在自定义数据类型的时候,需要实现Writable接口,以便数据可以被序列化后完成网络传输或是文件的输入输出。此外,要是数据需要作为主键使用或者需要比较数值大小时,需要实现WritableComparable接口。
例子:
package com.rpc.nefu;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

//对自己输入的数据需要 可序列化  即自定义一个可序列化的类
public class keyvalue implements WritableComparable<keyvalue>{
	public int x,y;
	public keyvalue(){
		this.x = 0;
		this.y = 0;
	}
	public keyvalue(int x1,int y1){
		this.x = x1;
		this.y = y1;
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		x = in.readInt();
		y = in.readInt();
		
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeInt(x);
		out.writeInt(y);
	}
	public int distanceFromOrigin(){
		return (x*x+y*y);
	}
	public boolean equals(keyvalue o){
		if(!(o instanceof keyvalue)){
			return false;
		}
		return (this.x == o.x) && (this.y == o.y);
	}
	public int hashCode() {
		return Float.floatToIntBits(x)
			^ Float.floatToIntBits(y);
			
	}
	public String toString(){
		return Integer.toString(x)+","+Integer.toString(y);
	}
	@Override
	public int compareTo(keyvalue o) {
		//return x;
		// TODO Auto-generated method stub
		if(x > o.x){
			return 1;
		}else if(x == o.x){
			return 0;
		}else{
			return -1;
		}
	}	
}



Hadoop 高级程序设计(一)---复合键 自定义输入类型