首页 > 代码库 > mapreduce-实现单表关联

mapreduce-实现单表关联

//map类

package hadoop3;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class danbiaomap extends Mapper <LongWritable,Text,Text,Text>{


String childname=new String();
String parientname=new String();
String flag=new String();


protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException

{
String [] str=value.toString().split("\t");
if (str[0].compareTo("child")!=0)


{ //left table
flag="1";
childname=str[0];
parientname=str[1];
context.write(new Text(parientname), new Text(flag+"+"+childname+"+"+parientname));

//right table
flag="2";
context.write(new Text(childname), new Text(flag+"+"+childname+"+"+parientname));


}


}

}

 

//reduce 类

package hadoop3;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class danbiaoreduce extends Reducer<Text,Text,Text,Text>{

private int num=0;
protected void reduce(Text key,Iterable<Text> value, Context context) throws IOException, InterruptedException

{
if (num==0)

{

context.write(new Text("grandchild"),new Text( "grandparient"));
num++;
}

Iterator <Text> itr=value.iterator();
int grandchildnum=0;
String [] grandchild=new String[100];
int grandparientnum=0;
String [] grandparient=new String[100];

while (itr.hasNext())

{
String [] record=itr.next().toString().split("\\+");
if (record[0].compareTo("1")==0)
{
grandchild[grandchildnum]=record[1];
grandchildnum++;

}
else if (record[0].compareTo("2")==0)
{
grandparient[grandparientnum]=record[2];
grandparientnum++;

}
else
{}
}
if(grandchildnum !=0 && grandparientnum !=0)
{
for (int i=0;i<grandparientnum;i++)
{
for (int j=0;j<grandchildnum;j++)
{
context.write(new Text(grandchild[i]), new Text(grandparient[j]));

}

}


}



}



}

 

//主类

package hadoop3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

public class danbiao extends Configured implements Tool{

public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
ToolRunner.run(new danbiao(), args);
}

@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub

Configuration conf=getConf();
Job job=new Job();
job.setJarByClass(getClass());
FileSystem fs=FileSystem.get(conf);
fs.delete(new Path("/outfile1104"),true);
FileInputFormat.addInputPath(job,new Path("/luo/danbiao.txt"));
FileOutputFormat.setOutputPath(job, new Path("/outfile1104"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(danbiaomap.class);
job.setReducerClass(danbiaoreduce.class);

job.waitForCompletion(true);








return 0;
}

}

mapreduce-实现单表关联