首页 > 代码库 > mapReduce编程之google pageRank

mapReduce编程之google pageRank

1 pagerank算法介绍

1.1 pagerank的假设

  数量假设:每个网页都会给它的链接网页投票,假设这个网页有n个链接,则该网页给每个链接平分投1/n票。

  质量假设:一个网页的pagerank值越大,则它的投票越重要。表现为将它的pagerank值作为它投票的加权值。

1.2 矩阵表示形式

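  (原文此处为公式图片,抓取时丢失;以下按 1.1 的两条假设重建。)设 M 为转移矩阵:若网页 j 共有 n_j 个链接且其中一个指向网页 i,则 $M_{ij} = 1/n_j$,否则 $M_{ij} = 0$;$PR_k$ 表示第 k 次迭代后的 PR 向量,则:

  $$PR_1 = M \cdot PR_0$$

  $$\vdots$$

  $$PR_n = M \cdot PR_{n-1}$$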

最终PR值会收敛为稳定值。

1.3 deadends和spider traps

deadends:一个网页没有链接,则最终PR值会收敛为全为0;

spider traps:一个网页只有指向自身的链接,则最终PR值会收敛为该网页为1,其他全为0。

解决方法:

技术分享

2 mapReduce实现

2.1 输入数据格式

技术分享   

  技术分享

2.2 job流程

技术分享

2.3 代码

UnitMultiplication.java
技术分享
 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.InputFormat;
 6 import org.apache.hadoop.mapreduce.Job;
 7 import org.apache.hadoop.mapreduce.Mapper;
 8 import org.apache.hadoop.mapreduce.Reducer;
 9 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
10 import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
11 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13 
14 import java.io.IOException;
15 import java.util.ArrayList;
16 import java.util.List;
17 
18 public class UnitMultiplication {
19 
20     public static class TransitionMapper extends Mapper<Object, Text, Text, Text> {
21 
22         @Override
23         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
24             String line = value.toString().trim();
25             String[] fromTo = line.split("\t");
26 
27             if(fromTo.length == 1 || fromTo[1].trim().equals("")) {
28                 return;
29             }
30             String from = fromTo[0];
31             String[] tos = fromTo[1].split(",");
32             for (String to: tos) {
33                 context.write(new Text(from), new Text(to + "=" + (double)1/tos.length));
34             }
35         }
36     }
37 
38     public static class PRMapper extends Mapper<Object, Text, Text, Text> {
39 
40         @Override
41         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
42             String[] pr = value.toString().trim().split("\t");
43             context.write(new Text(pr[0]), new Text(pr[1]));
44         }
45     }
46 
47     public static class MultiplicationReducer extends Reducer<Text, Text, Text, Text> {
48 
49 
50         @Override
51         public void reduce(Text key, Iterable<Text> values, Context context)
52                 throws IOException, InterruptedException {
53             List<String> transitionUnit = new ArrayList<String>();
54             double prUnit = 0;
55             for (Text value: values) {
56                 if(value.toString().contains("=")) {
57                     transitionUnit.add(value.toString());
58                 }
59                 else {
60                     prUnit = Double.parseDouble(value.toString());
61                 }
62             }
63             for (String unit: transitionUnit) {
64                 String outputKey = unit.split("=")[0];
65                 double relation = Double.parseDouble(unit.split("=")[1]);
66                 String outputValue = http://www.mamicode.com/String.valueOf(relation * prUnit);
67                 context.write(new Text(outputKey), new Text(outputValue));
68             }
69         }
70     }
71 
72     public static void main(String[] args) throws Exception {
73 
74         Configuration conf = new Configuration();
75         Job job = Job.getInstance(conf);
76         job.setJarByClass(UnitMultiplication.class);
77 
78         ChainMapper.addMapper(job, TransitionMapper.class, Object.class, Text.class, Text.class, Text.class, conf);
79         ChainMapper.addMapper(job, PRMapper.class, Object.class, Text.class, Text.class, Text.class, conf);
80 
81         job.setReducerClass(MultiplicationReducer.class);
82 
83         job.setOutputKeyClass(Text.class);
84         job.setOutputValueClass(Text.class);
85 
86         MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TransitionMapper.class);
87         MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PRMapper.class);
88 
89         FileOutputFormat.setOutputPath(job, new Path(args[2]));
90         job.waitForCompletion(true);
91     }
92 
93 }
View Code
UnitSum.java
技术分享
 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.io.DoubleWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Job;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 import org.apache.hadoop.mapreduce.Reducer;
 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
10 
11 import java.io.IOException;
12 import java.text.DecimalFormat;
13 
14 public class UnitSum {
15     public static class PassMapper extends Mapper<Object, Text, Text, DoubleWritable> {
16 
17         @Override
18         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
19            String[] pageSubrank = value.toString().split("\t");
20             double subRank = Double.parseDouble(pageSubrank[1]);
21             context.write(new Text(pageSubrank[0]), new DoubleWritable(subRank));
22         }
23     }
24 
25     public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
26         @Override
27         public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
28                 throws IOException, InterruptedException {
29 
30             double sum = 0;
31             for (DoubleWritable value: values) {
32                 sum += value.get();
33             }
34             DecimalFormat df = new DecimalFormat("#.0000");
35             sum = Double.valueOf(df.format(sum));
36             context.write(key, new DoubleWritable(sum));
37         }
38     }
39 
40     public static void main(String[] args) throws Exception {
41 
42         Configuration conf = new Configuration();
43         Job job = Job.getInstance(conf);
44         job.setJarByClass(UnitSum.class);
45         job.setMapperClass(PassMapper.class);
46         job.setReducerClass(SumReducer.class);
47         job.setOutputKeyClass(Text.class);
48         job.setOutputValueClass(DoubleWritable.class);
49         FileInputFormat.addInputPath(job, new Path(args[0]));
50         FileOutputFormat.setOutputPath(job, new Path(args[1]));
51         job.waitForCompletion(true);
52     }
53 }
View Code
Driver.java
技术分享
 1 public class Driver {
 2 
 3     public static void main(String[] args) throws Exception {
 4         UnitMultiplication multiplication = new UnitMultiplication();
 5         UnitSum sum = new UnitSum();
 6 
 7         //args0: dir of transition.txt
 8         //args1: dir of PageRank.txt
 9         //args2: dir of unitMultiplication result
10         //args3: times of convergence
11         String transitionMatrix = args[0];
12         String prMatrix = args[1];
13         String unitState = args[2];
14         int count = Integer.parseInt(args[3]);
15         for(int i=0;  i<count;  i++) {
16             String[] args1 = {transitionMatrix, prMatrix+i, unitState+i};
17             multiplication.main(args1);
18             String[] args2 = {unitState + i, prMatrix+(i+1)};
19             sum.main(args2);
20         }
21     }
22 }
View Code

 

 

 

mapReduce编程之google pageRank