Hadoop Inverted Index

The three sample input files are:

1) file1:

MapReduce is simple

2) file2:

MapReduce is powerful is simple

3) file3:

Hello MapReduce bye MapReduce

The expected output is shown below.

 

MapReduce    file1.txt:1;file2.txt:1;file3.txt:2;
is           file1.txt:1;file2.txt:2;
simple       file1.txt:1;file2.txt:1;
powerful     file2.txt:1;
Hello        file3.txt:1;
bye          file3.txt:1;
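
To get from those files to the index, the job below works in three stages. For the word MapReduce the intermediate records look roughly like this (a sketch of the data flow, not literal framework output):

map     emits  MapReduce:file1.txt -> 1,  MapReduce:file2.txt -> 1,  MapReduce:file3.txt -> 1 (twice)
combine emits  MapReduce -> file1.txt:1,  MapReduce -> file2.txt:1,  MapReduce -> file3.txt:2
reduce  emits  MapReduce -> file1.txt:1;file2.txt:1;file3.txt:2;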


package com.hadoop.thirteen;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String fileName;
    private final Text val = new Text("1");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // Emit "word:fileName" -> "1", e.g. "MapReduce:file1.txt" -> "1"
            String keyTmp = tokenizer.nextToken() + ":" + fileName;
            context.write(new Text(keyTmp), val);
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Derive the file name from the input split's path,
        // e.g. /user/thirteen/thirteen_file1.txt -> file1.txt
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String temp = inputSplit.getPath().toString();
        fileName = temp.substring(temp.indexOf("file"));
    }

    // Quick local check of indexOf and StringTokenizer behavior; not part of the job itself.
    public static void main(String[] args) {
        System.out.println("axsddd".indexOf("s"));
        String str = "sgdfkldhfld";
        StringTokenizer stringTokenizer = new StringTokenizer(str);
        while (stringTokenizer.hasMoreTokens()) {
            System.out.println(stringTokenizer.nextToken());
        }
    }
}
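
The substring(temp.indexOf("file")) trick in setup() works only because every input path happens to contain "file"; it shortens /user/thirteen/thirteen_file1.txt to file1.txt, which is the label used in the sample output. A more general alternative (an assumption added here, not part of the original post) is Path.getName(), which returns the last path component, though postings would then read thirteen_file1.txt:1 instead of file1.txt:1:

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        // getName() returns the final path component, e.g. "thirteen_file1.txt".
        fileName = inputSplit.getPath().getName();
    }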



package com.hadoop.thirteen;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombine extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // Split "word:fileName" back into its two parts.
        String[] arr = key.toString().split(":");

        // sum must be a local variable: as an instance field it would keep
        // accumulating counts across different keys.
        int sum = 0;
        for (Text val : value) {
            sum += Integer.parseInt(val.toString());
        }
        // Emit: word -> fileName:termFrequency
        context.write(new Text(arr[0]), new Text(arr[1] + ":" + sum));
    }
}
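
For example, the map output for file2 ("MapReduce is powerful is simple") includes the key is:file2.txt with the two values ["1", "1"]; the combiner folds them into a single record, roughly:

    reduce("is:file2.txt", ["1", "1"])  ->  ("is", "file2.txt:2")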

package com.hadoop.thirteen;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all "fileName:count" postings for this word into one line,
        // e.g. "file1.txt:1;file2.txt:1;file3.txt:2;", matching the sample output above.
        // (To drop the trailing ';', substring(0, buffer.length() - 1) could be used instead.)
        StringBuffer buffer = new StringBuffer();
        for (Text val : values) {
            buffer.append(val.toString()).append(";");
        }
        context.write(key, new Text(buffer.toString()));
    }
}
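
The reduce call for the word "is" then receives the per-file postings produced by the combiner and joins them into one index line, roughly (the order of the values is not guaranteed by the framework):

    reduce("is", ["file1.txt:1", "file2.txt:2"])  ->  ("is", "file1.txt:1;file2.txt:2;")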


package com.hadoop.thirteen;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {

    /**
     * @param args args[0] = input path, args[1] = output path
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Job.getInstance(configuration, "inverted_index_job") is the
        // non-deprecated form in newer Hadoop versions.
        Job job = new Job(configuration, "inverted_index_job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombine.class);

        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Delete the output directory if it already exists, so the job can be rerun.
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
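
Assuming the three classes are packaged into a jar (the jar name and HDFS paths below are only examples), the job takes the input directory and output directory as its two arguments:

    hadoop jar inverted-index.jar com.hadoop.thirteen.JobMain /user/thirteen /user/thirteen/output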



