Hadoop Inverted Index
The sample input consists of three small text files:
1) file1.txt:
MapReduce is simple
2) file2.txt:
MapReduce is powerful is simple
3) file3.txt:
Hello MapReduce bye MapReduce
The sample output of the inverted index looks like this:
MapReduce file1.txt:1;file2.txt:1;file3.txt:2;
is file1.txt:1;file2.txt:2;
simple file1.txt:1;file2.txt:1;
powerful file2.txt:1;
Hello file3.txt:1;
bye file3.txt:1;
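To make the data flow concrete, here is the intermediate key/value stream for the word MapReduce, assuming the combiner runs once per map task (a sketch derived from the three sample files above):

map output:     MapReduce:file1.txt -> 1
                MapReduce:file2.txt -> 1
                MapReduce:file3.txt -> 1
                MapReduce:file3.txt -> 1
combine output: MapReduce -> file1.txt:1
                MapReduce -> file2.txt:1
                MapReduce -> file3.txt:2
reduce output:  MapReduce -> file1.txt:1;file2.txt:1;file3.txt:2;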
package com.hadoop.thirteen;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    // name of the file this split belongs to, e.g. "file1.txt"
    private String fileName;
    private final Text val = new Text("1");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // (An earlier variant used the whole line as the key, e.g. "MapReduce is simple:file1.txt".)
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // emit word:filename -> 1, e.g. "MapReduce:file1.txt" -> "1"
            String keyTmp = tokenizer.nextToken() + ":" + fileName;
            context.write(new Text(keyTmp), val);
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Derive the file name from this split's path,
        // e.g. /user/thirteen/thirteen_file1.txt -> file1.txt
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String temp = inputSplit.getPath().toString();
        fileName = temp.substring(temp.indexOf("file"));
    }

    // Quick local sanity check of String.indexOf and StringTokenizer; not part of the job.
    public static void main(String[] args) {
        System.out.println("axsddd".indexOf("s"));
        String str = "sgdfkldhfld";
        StringTokenizer stringTokenizer = new StringTokenizer(str);
        while (stringTokenizer.hasMoreTokens()) {
            System.out.println(stringTokenizer.nextToken());
        }
    }
}
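To see what setup() computes for the path mentioned in its comment, here is a quick standalone illustration (not part of the job code; the path is the one from the original comment):

String temp = "/user/thirteen/thirteen_file1.txt";
// indexOf("file") finds the start of "file1.txt", so substring keeps only the short file name
System.out.println(temp.substring(temp.indexOf("file"))); // prints file1.txt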
package com.hadoop.thirteen;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombine extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // The incoming key is "word:filename"; split it back into its two parts.
        String[] arr = key.toString().split(":");
        // sum must be a local variable so the count starts at zero for every key.
        int sum = 0;
        for (Text val : value) {
            sum += Integer.parseInt(val.toString());
        }
        // emit word -> filename:count
        context.write(new Text(arr[0]), new Text(arr[1] + ":" + sum));
    }
}
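For the key "is:file2.txt" (the word "is" appears twice in file2.txt), the combine step receives the values "1" and "1" and emits ("is", "file2.txt:2"). A standalone sketch of that arithmetic, using plain strings rather than Hadoop types:

String[] arr = "is:file2.txt".split(":");   // arr[0] = "is", arr[1] = "file2.txt"
int sum = 0;
for (String v : new String[]{"1", "1"}) {
    sum += Integer.parseInt(v);
}
System.out.println(arr[0] + "\t" + arr[1] + ":" + sum); // prints: is  file2.txt:2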
package com.hadoop.thirteen;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate every "filename:count" entry for this word, separated by ";",
        // e.g. "file1.txt:1;file2.txt:1;file3.txt:2;".
        StringBuilder buffer = new StringBuilder();
        for (Text val : values) {
            buffer.append(val).append(";");
        }
        // The trailing ";" is kept so the result matches the sample output above;
        // use buffer.substring(0, buffer.length() - 1) to drop it instead.
        context.write(key, new Text(buffer.toString()));
    }
}
package com.hadoop.thirteen;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = new Job(configuration, "inverted_index_job");
        job.setJarByClass(JobMain.class);

        // map: word:filename -> 1
        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // combine: word:filename -> 1 becomes word -> filename:count
        job.setCombinerClass(InvertedIndexCombine.class);

        // reduce: word -> filename:count;filename:count;...
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Delete the output directory if it already exists so the job can be rerun.
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
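A sketch of how the job might be packaged and run, assuming the classes are built into a jar named inverted-index.jar and the three sample files have been uploaded to an HDFS input directory (the jar name and paths below are illustrative, not from the original):

hadoop jar inverted-index.jar com.hadoop.thirteen.JobMain /user/thirteen/input /user/thirteen/output
hadoop fs -cat /user/thirteen/output/part-r-00000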