首页 > 代码库 > mapReduce编程之google pageRank
mapReduce编程之google pageRank
1 pagerank算法介绍
1.1 pagerank的假设
数量假设:每个网页都会给它链接到的网页投票。假设该网页有 n 个出链,则它给每个被链接的网页平分投 1/n 票。
质量假设:一个网页的pagerank值越大,则它的投票越重要。表现为将它的pagerank值作为它投票的加权值。
1.2 矩阵表示形式
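设网页总数为 $N$,转移矩阵 $M \in \mathbb{R}^{N \times N}$ 定义为:若网页 $j$ 共有 $L_j$ 个出链且其中之一指向网页 $i$,则 $M_{ij} = 1/L_j$,否则 $M_{ij} = 0$。PR 向量按 $PR^{(t+1)} = M \, PR^{(t)}$ 迭代,即 $PR_i^{(t+1)} = \sum_{j} M_{ij} \, PR_j^{(t)}$。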
最终PR值会收敛为稳定值。
1.3 dead ends 和 spider traps
dead ends:若一个网页没有任何出链,它得到的 PR 值无法再传递出去,不断迭代后所有网页的 PR 值最终都会收敛为 0;
spider traps:若一个网页只有指向自身的链接,则最终该网页的 PR 值会收敛为 1,其他网页的 PR 值全为 0。
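解决方法:引入随机跳转(teleport),即以概率 $\beta$ 沿链接跳转、以概率 $1-\beta$ 随机跳到任意网页:$PR^{(t+1)} = \beta \, M \, PR^{(t)} + (1-\beta) \frac{1}{N} \mathbf{e}$,其中 $\mathbf{e}$ 为全 1 向量,$\beta$ 通常取 0.8~0.9。(注:下面的 MapReduce 代码只实现了基本迭代 $PR^{(t+1)} = M \, PR^{(t)}$,没有包含随机跳转项。)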
2 mapReduce实现
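2.1 输入数据格式
转移关系文件(transition.txt)每行为 `from<TAB>to1,to2,...`,表示网页 from 链向的所有网页;PR 文件每行为 `page<TAB>prValue`,表示该网页当前的 PR 值。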
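2.2 job流程
每轮迭代包含两个 job:UnitMultiplication 读取转移关系和当前 PR 值,对每条链接 from→to 输出 `to<TAB>PR(from)/L_from`;UnitSum 按 to 汇总这些部分值,得到新一轮的 PR 值。Driver 按指定的次数循环执行这两个 job。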
2.3 代码
UnitMultiplication.java
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.Path; 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.InputFormat; 6 import org.apache.hadoop.mapreduce.Job; 7 import org.apache.hadoop.mapreduce.Mapper; 8 import org.apache.hadoop.mapreduce.Reducer; 9 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; 10 import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 11 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 14 import java.io.IOException; 15 import java.util.ArrayList; 16 import java.util.List; 17 18 public class UnitMultiplication { 19 20 public static class TransitionMapper extends Mapper<Object, Text, Text, Text> { 21 22 @Override 23 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 24 String line = value.toString().trim(); 25 String[] fromTo = line.split("\t"); 26 27 if(fromTo.length == 1 || fromTo[1].trim().equals("")) { 28 return; 29 } 30 String from = fromTo[0]; 31 String[] tos = fromTo[1].split(","); 32 for (String to: tos) { 33 context.write(new Text(from), new Text(to + "=" + (double)1/tos.length)); 34 } 35 } 36 } 37 38 public static class PRMapper extends Mapper<Object, Text, Text, Text> { 39 40 @Override 41 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 42 String[] pr = value.toString().trim().split("\t"); 43 context.write(new Text(pr[0]), new Text(pr[1])); 44 } 45 } 46 47 public static class MultiplicationReducer extends Reducer<Text, Text, Text, Text> { 48 49 50 @Override 51 public void reduce(Text key, Iterable<Text> values, Context context) 52 throws IOException, InterruptedException { 53 List<String> transitionUnit = new ArrayList<String>(); 54 double prUnit = 0; 55 for (Text value: values) { 56 if(value.toString().contains("=")) { 57 
transitionUnit.add(value.toString()); 58 } 59 else { 60 prUnit = Double.parseDouble(value.toString()); 61 } 62 } 63 for (String unit: transitionUnit) { 64 String outputKey = unit.split("=")[0]; 65 double relation = Double.parseDouble(unit.split("=")[1]); 66 String outputValue = http://www.mamicode.com/String.valueOf(relation * prUnit); 67 context.write(new Text(outputKey), new Text(outputValue)); 68 } 69 } 70 } 71 72 public static void main(String[] args) throws Exception { 73 74 Configuration conf = new Configuration(); 75 Job job = Job.getInstance(conf); 76 job.setJarByClass(UnitMultiplication.class); 77 78 ChainMapper.addMapper(job, TransitionMapper.class, Object.class, Text.class, Text.class, Text.class, conf); 79 ChainMapper.addMapper(job, PRMapper.class, Object.class, Text.class, Text.class, Text.class, conf); 80 81 job.setReducerClass(MultiplicationReducer.class); 82 83 job.setOutputKeyClass(Text.class); 84 job.setOutputValueClass(Text.class); 85 86 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TransitionMapper.class); 87 MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PRMapper.class); 88 89 FileOutputFormat.setOutputPath(job, new Path(args[2])); 90 job.waitForCompletion(true); 91 } 92 93 }
UnitSum.java
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.Path; 3 import org.apache.hadoop.io.DoubleWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Job; 6 import org.apache.hadoop.mapreduce.Mapper; 7 import org.apache.hadoop.mapreduce.Reducer; 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 10 11 import java.io.IOException; 12 import java.text.DecimalFormat; 13 14 public class UnitSum { 15 public static class PassMapper extends Mapper<Object, Text, Text, DoubleWritable> { 16 17 @Override 18 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 19 String[] pageSubrank = value.toString().split("\t"); 20 double subRank = Double.parseDouble(pageSubrank[1]); 21 context.write(new Text(pageSubrank[0]), new DoubleWritable(subRank)); 22 } 23 } 24 25 public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { 26 @Override 27 public void reduce(Text key, Iterable<DoubleWritable> values, Context context) 28 throws IOException, InterruptedException { 29 30 double sum = 0; 31 for (DoubleWritable value: values) { 32 sum += value.get(); 33 } 34 DecimalFormat df = new DecimalFormat("#.0000"); 35 sum = Double.valueOf(df.format(sum)); 36 context.write(key, new DoubleWritable(sum)); 37 } 38 } 39 40 public static void main(String[] args) throws Exception { 41 42 Configuration conf = new Configuration(); 43 Job job = Job.getInstance(conf); 44 job.setJarByClass(UnitSum.class); 45 job.setMapperClass(PassMapper.class); 46 job.setReducerClass(SumReducer.class); 47 job.setOutputKeyClass(Text.class); 48 job.setOutputValueClass(DoubleWritable.class); 49 FileInputFormat.addInputPath(job, new Path(args[0])); 50 FileOutputFormat.setOutputPath(job, new Path(args[1])); 51 job.waitForCompletion(true); 52 } 53 }
Driver.java
1 public class Driver { 2 3 public static void main(String[] args) throws Exception { 4 UnitMultiplication multiplication = new UnitMultiplication(); 5 UnitSum sum = new UnitSum(); 6 7 //args0: dir of transition.txt 8 //args1: dir of PageRank.txt 9 //args2: dir of unitMultiplication result 10 //args3: times of convergence 11 String transitionMatrix = args[0]; 12 String prMatrix = args[1]; 13 String unitState = args[2]; 14 int count = Integer.parseInt(args[3]); 15 for(int i=0; i<count; i++) { 16 String[] args1 = {transitionMatrix, prMatrix+i, unitState+i}; 17 multiplication.main(args1); 18 String[] args2 = {unitState + i, prMatrix+(i+1)}; 19 sum.main(args2); 20 } 21 } 22 }
mapReduce编程之google pageRank
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。