首页 > 代码库 > mapreduce-实现多表关联
mapreduce-实现多表关联
//map
package hadoop3;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class duobiaomap extends Mapper<LongWritable,Text,Text,Text>{
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
{
String line=value.toString();
if (line.contains("factoryname") || line.contains("addressID"))
{
return;
}
String [] str=line.split("\t");
String flag=new String();
if (str[0].length()==1)
{ flag="2";
context.write(new Text(str[0]), new Text(flag+"+"+str[1]) );
}
else if (str[0].length()>1)
{
flag="1";
context.write(new Text(str[1]), new Text(flag+"+"+str[0]));
}
else {}
}
}
//reduce
package hadoop3;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class duobiaoreduce extends Reducer<Text,Text,Text,Text>
{ private static int num=0;
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException
{
if(num==0)
{
context.write(new Text("factory"), new Text("address"));
num++;
}
Iterator<Text> itr=values.iterator();
String [] factory=new String[100];
int factorynum=0;
String [] address=new String[100];
int addressnum=0;
while(itr.hasNext())
{
String [] str1=itr.next().toString().split("\\+");
if (str1[0].compareTo("1")==0)
{
factory[factorynum]=str1[1];
factorynum++;
}
else if(str1[0].compareTo("2")==0)
{
address[addressnum]=str1[1];
addressnum++;
}
else {}
}
if(factorynum !=0 && addressnum !=0){
for (int i=0;i<address.length;i++)
{
for (int j=0;j<factory.length;j++)
{
context.write(new Text(factory[j]), new Text(address[i]));
}
}
}
}
}
//
package hadoop3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
public class duobiao extends Configured implements Tool{
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
ToolRunner.run(new duobiao(), args);
}
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf=getConf();
Job job=new Job();
job.setJarByClass(getClass());
FileSystem fs=FileSystem.get(conf);
fs.delete(new Path("/outfile1105"),true);
FileInputFormat.addInputPath(job, new Path("/luo/duobiao.txt"));
FileInputFormat.addInputPath(job, new Path("/luo/duobiao2.txt"));
FileOutputFormat.setOutputPath(job, new Path("/outfile1105"));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(duobiaomap.class);
job.setReducerClass(duobiaoreduce.class);
job.waitForCompletion(true);
return 0;
}
}
mapreduce-实现多表关联