
Hadoop join problem 1

Join example 1: look up the department each employee belongs to. Required output format: empno, ename, dname, deptno
1. Raw data
Employee data (tab-separated; aligned here for readability)
empno  ename   job       mgr   hiredate    sal   comm  deptno  loc
7499   allen   salesman  7698  1981-02-20  1600  300   30
7782   clark   managers  7639  1981-06-09  2450        10
7654   martin  salesman  7698  1981-03-20  1250  1400  30      boston
7900   james   clerk     7698  1981-01-09  950         30
7788   scott   analyst   7566  1981-09-01  3000  100   20


Department data (tab-separated)
deptno dname loc
30 sales chicago
20 research dallas
10 accounting newyork


2. The job implements the equivalent of this SQL:
select e.empno,e.ename,d.dname,d.deptno from emp e join dept d on e.deptno=d.deptno;


Join key: deptno

Approach 1: carry the flag inside a plain Text value, e.g. empno_ename_0 for employee records and deptno_dname_1 for department records (see the sketch below).

Approach 2: use a custom bean holding empno/ename/deptno/dname/flag. This is the approach implemented in the rest of this post.
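For reference, a minimal sketch of the first approach, where the flag travels inside a plain Text value, might look like the following; TagJoinMapper and TagJoinReducer are illustrative names and are not part of the original code.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// illustrative sketch of the Text-tag approach (assumed, not from the original post)
class TagJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split("\t");
        if (arr.length <= 3) {  // department record: deptno dname loc
            // emit deptno -> deptno_dname_1
            context.write(new IntWritable(Integer.parseInt(arr[0])),
                    new Text(arr[0] + "_" + arr[1] + "_1"));
        } else {                // employee record: deptno is the 8th field
            // emit deptno -> empno_ename_0
            context.write(new IntWritable(Integer.parseInt(arr[7])),
                    new Text(arr[0] + "_" + arr[1] + "_0"));
        }
    }
}

class TagJoinReducer extends Reducer<IntWritable, Text, NullWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String dname = null;
        List<String> emps = new ArrayList<String>();
        for (Text v : values) {
            String[] arr = v.toString().split("_");
            if ("1".equals(arr[2])) {       // department record: deptno_dname_1
                dname = arr[1];
            } else {                        // employee record: empno_ename_0
                emps.add(arr[0] + "," + arr[1]);
            }
        }
        for (String emp : emps) {           // output: empno,ename,dname,deptno
            context.write(NullWritable.get(),
                    new Text(emp + "," + dname + "," + key.get()));
        }
    }
}

This produces the same empno,ename,dname,deptno lines as the bean version below, but the reducer has to re-parse the strings; the custom bean avoids that parsing.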






3. How the join is handled:
       Use the join key (deptno) as the map output key, i.e. the reduce input key. After the shuffle, all records that share the same join key end up in the value list of the same reduce call.
       Design one bean shared by the two tables, and add a flag field to it so the reducer can tell which table a record came from.
       In the reduce phase the flag makes it easy to distinguish employee records from department records; the actual join work happens in the reducer.
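For example, with the sample data above, the reduce call for key 30 receives one department record (flag=0, dname=sales) and three employee records (flag=1: 7499 allen, 7654 martin, 7900 james); the reducer copies the department name onto each employee record and emits the joined rows.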


4. The intermediate bean
The bean that carries the data (it must be serializable because it travels across the network, and Hadoop needs to sort and group it, so it implements WritableComparable):
package com.hadoop.eight;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.WritableComparable;


public class Emplyee implements WritableComparable<Emplyee> {

private String empNo ="";
private String empName ="";
private String deptNo="";
private String deptName="";
private int flag =0;

public Emplyee(){}
public Emplyee(String empNo,String empName,String deptNo,String deptName,int flag){
this.empNo = empNo;
this.empName = empName;
this.deptNo = deptNo;
this.deptName = deptName;
this.flag = flag;
}
public Emplyee(Emplyee e){
this.empNo = e.empNo;
this.empName = e.empName;
this.deptNo = e.deptNo;
this.deptName = e.deptName;
this.flag = e.flag;
}
public String getEmpNo() {
return empNo;
}


public void setEmpNo(String empNo) {
this.empNo = empNo;
}


public String getEmpName() {
return empName;
}


public void setEmpName(String empName) {
this.empName = empName;
}


public String getDeptNo() {
return deptNo;
}


public void setDeptNo(String deptNo) {
this.deptNo = deptNo;
}


public String getDeptName() {
return deptName;
}


public void setDeptName(String deptName) {
this.deptName = deptName;
}


public int getFlag() {
return flag;
}


public void setFlag(int flag) {
this.flag = flag;
}


@Override
public void readFields(DataInput input) throws IOException {
// deserialize the fields in the same order they are written
this.empNo = input.readUTF();
this.empName = input.readUTF();
this.deptNo = input.readUTF();
this.deptName = input.readUTF();
this.flag = input.readInt();
}


@Override
public void write(DataOutput output) throws IOException {
// serialize the fields; the order must match readFields()
output.writeUTF(this.empNo);
output.writeUTF(this.empName);
output.writeUTF(this.deptNo);
output.writeUTF(this.deptName);
output.writeInt(this.flag);

}


@Override
public int compareTo(Emplyee o) {
// sorting and grouping are driven by the IntWritable map output key (deptno),
// so the bean itself does not need a meaningful ordering
return 0;
}
@Override
public String toString() {
// required output format: empno, ename, dname, deptno
return this.empNo+","+this.empName+","+this.deptName+","+this.deptNo;
}

}

package com.hadoop.eight;


import java.io.IOException;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class JoinOneMapper extends Mapper<LongWritable, Text, IntWritable, Emplyee> {

@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] arr = line.split("\t");
System.out.println("-------"+arr.length);
if(arr.length<=3){// department record: deptno dname loc
Emplyee emplyee = new Emplyee();
emplyee.setDeptNo(arr[0]);
emplyee.setDeptName(arr[1]);
emplyee.setFlag(0);

context.write(new IntWritable(Integer.parseInt(arr[0])), emplyee);
}else{// employee record: deptno is the 8th tab-separated field
Emplyee emplyee = new Emplyee();
emplyee.setEmpNo(arr[0]);
emplyee.setEmpName(arr[1]);
emplyee.setDeptNo(arr[7]);
emplyee.setFlag(1);

context.write(new IntWritable(Integer.parseInt(arr[7])), emplyee);
}


}

public static void main(String[] args){
String line = "30\tsales\tchicago";
String[] arr = line.split("\t");
System.out.println("-------"+arr.length);
if(arr.length==3){// department record
System.out.println("----1");
}else{// employee record
System.out.println("----2");
}
}


}


package com.hadoop.eight;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;




public class JoinOneReducer extends Reducer<IntWritable, Emplyee, NullWritable, Text> {


@Override
protected void reduce(IntWritable key, Iterable<Emplyee> values,Context context)
throws IOException, InterruptedException {
Emplyee dept = null;
List<Emplyee> list = new ArrayList<Emplyee>();

// Hadoop reuses the value object across iterations, so copy each record before keeping it
for(Emplyee e:values){
if(e.getFlag()==0){// department record: keep it separately, not in the employee list
dept = new Emplyee(e);
}else{// employee record
list.add(new Emplyee(e));
}
}

// skip keys that have no matching department record
if(dept==null){
return;
}
for(Emplyee e:list){
e.setDeptName(dept.getDeptName());
context.write(NullWritable.get(), new Text(e.toString()));
}

}


}


package com.hadoop.eight;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class JobMain {


/**
 * Driver: configures and submits the join job.
 */
public static void main(String[] args)throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration,"join_one_job");
job.setJarByClass(JobMain.class);

job.setMapperClass(JoinOneMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Emplyee.class);

job.setReducerClass(JoinOneReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
Path path = new Path(args[1]);
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(path)){
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);

System.exit(job.waitForCompletion(true)?0:1);


}


}
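To run the job, package the classes into a jar and pass the input directory (containing both files) and the output directory as arguments; the jar name and paths below are only illustrative:

hadoop jar join-one.jar com.hadoop.eight.JobMain /input/join_one /output/join_one

For the sample data above, and with a single reducer, the output should contain the joined rows below, grouped by ascending deptno (the order of employees within a department is not guaranteed):

7782,clark,accounting,10
7788,scott,research,20
7499,allen,sales,30
7654,martin,sales,30
7900,james,sales,30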


