首页 > 代码库 > mahout推荐15-在hadoop上运行MapReduce

mahout推荐15-在hadoop上运行MapReduce

详情可以参考《Mahout实战》的第六章

代码:

package mahout.wiki;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.Iterator;import java.util.List;import java.util.PriorityQueue;import java.util.Queue;import java.util.Random;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;import org.apache.mahout.cf.taste.recommender.RecommendedItem;import org.apache.mahout.math.RandomAccessSparseVector;import org.apache.mahout.math.VarLongWritable;import org.apache.mahout.math.Vector;import org.apache.mahout.math.Vector.Element;import org.apache.mahout.math.map.OpenIntLongHashMap;import org.apache.mahout.math.VectorWritable;import com.demo.WordCount;import com.demo.Dedup.Reduce;import com.demo.WordCount.IntSumReducer;import com.demo.WordCount.TokenizerMapper;public class WikiTest {	//解析WIkipediatri链接文件的mapper	public static class WikipediaToItemPrefsMapper		extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable>{		private static final Pattern NUMBERS = Pattern.compile("(\\d+)");		@Override		protected void map(LongWritable key, Text value,Context context)				
throws IOException, InterruptedException {			// TODO Auto-generated method stub			String line = value.toString();			Matcher m = NUMBERS.matcher(line);			m.find();						VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));			VarLongWritable itemID = new VarLongWritable();						while(m.find()){				itemID.set(Long.parseLong(m.group()));				context.write(userID, itemID);			}		}	}	// 从用户的物品偏好中生成Vector的reducer	public static class WikipediaToUserVectorReducer extends		Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable>{		@Override		protected void reduce(VarLongWritable userId,				Iterable<VarLongWritable> itemPrefs,Context context)				throws IOException, InterruptedException {			// TODO Auto-generated method stub			Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE,100);			for (VarLongWritable itemPref : itemPrefs) {				userVector.set((int)itemPref.get(), 1.0f);			}			context.write(userId, new VectorWritable(userVector));		}	}	// 计算共现关系的mapper	public static class UserVectorToCooccurrenceMapper extends 		Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable>{		@Override		protected void map(VarLongWritable userId, VectorWritable userVector,Context context)				throws IOException, InterruptedException {			// TODO Auto-generated method stu			Iterator<Element> it = userVector.get().nonZeroes().iterator();			while(it.hasNext()){				int index1 = it.next().index();				Iterator<Element> it2 = userVector.get().nonZeroes().iterator();				while (it2.hasNext()){					int index2 = it2.next().index();					context.write(new IntWritable(index1), new IntWritable(index2));				}			}		}	}	// 计算共生关系的reducer	public static class UserVectorToCooccurrenceReducer extends		Reducer<IntWritable, IntWritable, IntWritable, VectorWritable>{		@Override		protected void reduce(IntWritable itemIndex1, Iterable<IntWritable> itemIndex2s,Context context)				throws IOException, InterruptedException {			// TODO Auto-generated method stub			Vector 
cooccurenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE,100);			for (IntWritable intWritable : itemIndex2s) {				int itemIndex2 = intWritable.get();				cooccurenceRow.set(itemIndex2, cooccurenceRow.get(itemIndex2) + 1.0);			}			context.write(itemIndex1, new VectorWritable(cooccurenceRow));		}	}	//封装共现关系列	public static class CooccurenceColumnWrapperMapper extends		Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable>{		@Override		protected void map(IntWritable key, VectorWritable value,Context context)				throws IOException, InterruptedException {			// TODO Auto-generated method stub			context.write(key, new VectorOrPrefWritable());		}	}	// 分割用户向量	public static class UserVetorSplitterMapper extends		Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable>{		@Override		protected void map(VarLongWritable key, VectorWritable value,Context context)				throws IOException, InterruptedException {			// TODO Auto-generated method stub			long userId = key.get();			Vector userVector = value.get();			Iterator<Element> it = userVector.nonZeroes().iterator();			IntWritable itemIndexWritable = new IntWritable();						while(it.hasNext()){				Vector.Element e = it.next();				int itemIndex = e.index();				float pref = (float) e.get();				itemIndexWritable.set(itemIndex);				context.write(itemIndexWritable, new VectorOrPrefWritable(userId, pref));			}		}	}	// 计算部分推荐向量	public static class PartialMultiplyMapper extends		Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable>{		@Override		protected void map(IntWritable key, VectorAndPrefsWritable value,Context context)				throws IOException, InterruptedException {			// TODO Auto-generated method stub			Vector cooccurenceColumn = value.getVector();			List<Long> userIDs = value.getUserIDs();			List<Float> prefValues = value.getValues();						for (int i = 0; i < userIDs.size(); i++) {				long userId = userIDs.get(i);				float prefValue = 
http://www.mamicode.com/prefValues.get(i);>

 RecommenderJob 的流程图如下。苦于找不到如何配置,相关资料都只给出命令行(cmd)形式的用法。

http://blog.fens.me/wp-content/uploads/2013/10/aglorithm_2.jpg