首页 > 代码库 > mahout推荐15-在hadoop上运行MapReduce
mahout推荐15-在hadoop上运行MapReduce
详情可以参考《Mahout实战》的第六章
代码:
package mahout.wiki;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntLongHashMap;

import com.demo.WordCount;
import com.demo.Dedup.Reduce;
import com.demo.WordCount.IntSumReducer;
import com.demo.WordCount.TokenizerMapper;

/**
 * Distributed item-based recommender built from the Wikipedia links dump,
 * following the MapReduce pipeline described in "Mahout in Action", chapter 6:
 * parse prefs -> user vectors -> co-occurrence matrix -> partial products.
 *
 * NOTE(review): the original post was truncated after PartialMultiplyMapper;
 * the final aggregate-and-recommend reducer of the book's pipeline is absent.
 */
public class WikiTest {

    /**
     * Parses one line of the Wikipedia links file.
     * The first number on the line is the user (source article) ID; every
     * following number is an item (target article) ID, emitted as an
     * implicit boolean preference (userID, itemID).
     */
    public static class WikipediaToItemPrefsMapper
            extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

        // Compiled once and shared; matches each run of digits on a line.
        private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            Matcher m = NUMBERS.matcher(line);
            // Bug fix: the original called m.find() without checking the result,
            // which throws IllegalStateException on a line containing no digits.
            if (!m.find()) {
                return; // malformed / empty line: no user ID, skip it
            }
            VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
            VarLongWritable itemID = new VarLongWritable();
            while (m.find()) {
                itemID.set(Long.parseLong(m.group()));
                context.write(userID, itemID);
            }
        }
    }

    /**
     * Collects all item preferences of one user into a single sparse vector
     * (dimension = item index, value = 1.0 boolean preference).
     */
    public static class WikipediaToUserVectorReducer
            extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

        @Override
        protected void reduce(VarLongWritable userId, Iterable<VarLongWritable> itemPrefs,
                Context context) throws IOException, InterruptedException {
            // Huge nominal cardinality, small expected non-zero count (100).
            Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
            for (VarLongWritable itemPref : itemPrefs) {
                // Item IDs are assumed to fit in an int vector index -- TODO confirm.
                userVector.set((int) itemPref.get(), 1.0f);
            }
            context.write(userId, new VectorWritable(userVector));
        }
    }

    /**
     * Emits every ordered pair of items that co-occur in one user's vector;
     * the reducer counts the pairs to build the co-occurrence matrix rows.
     */
    public static class UserVectorToCooccurrenceMapper
            extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {

        @Override
        protected void map(VarLongWritable userId, VectorWritable userVector, Context context)
                throws IOException, InterruptedException {
            Iterator<Element> it = userVector.get().nonZeroes().iterator();
            while (it.hasNext()) {
                int index1 = it.next().index();
                Iterator<Element> it2 = userVector.get().nonZeroes().iterator();
                while (it2.hasNext()) {
                    int index2 = it2.next().index();
                    context.write(new IntWritable(index1), new IntWritable(index2));
                }
            }
        }
    }

    /**
     * Counts how often each item co-occurs with item {@code itemIndex1},
     * producing one row of the co-occurrence matrix as a sparse vector.
     */
    public static class UserVectorToCooccurrenceReducer
            extends Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {

        @Override
        protected void reduce(IntWritable itemIndex1, Iterable<IntWritable> itemIndex2s,
                Context context) throws IOException, InterruptedException {
            Vector cooccurenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
            for (IntWritable intWritable : itemIndex2s) {
                int itemIndex2 = intWritable.get();
                cooccurenceRow.set(itemIndex2, cooccurenceRow.get(itemIndex2) + 1.0);
            }
            context.write(itemIndex1, new VectorWritable(cooccurenceRow));
        }
    }

    /**
     * Wraps one co-occurrence matrix column so it can be joined with user
     * preferences under a common {@link VectorOrPrefWritable} value type.
     */
    public static class CooccurenceColumnWrapperMapper
            extends Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

        @Override
        protected void map(IntWritable key, VectorWritable value, Context context)
                throws IOException, InterruptedException {
            // Bug fix: the original emitted an empty VectorOrPrefWritable(),
            // silently dropping the co-occurrence column. Wrap the vector
            // as in the book's reference implementation.
            context.write(key, new VectorOrPrefWritable(value.get()));
        }
    }

    /**
     * Splits one user vector into per-item records (itemIndex -> userID, pref)
     * so they can be joined with the matching co-occurrence column.
     */
    public static class UserVetorSplitterMapper
            extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

        @Override
        protected void map(VarLongWritable key, VectorWritable value, Context context)
                throws IOException, InterruptedException {
            long userId = key.get();
            Vector userVector = value.get();
            Iterator<Element> it = userVector.nonZeroes().iterator();
            IntWritable itemIndexWritable = new IntWritable();
            while (it.hasNext()) {
                Vector.Element e = it.next();
                int itemIndex = e.index();
                float pref = (float) e.get();
                itemIndexWritable.set(itemIndex);
                context.write(itemIndexWritable, new VectorOrPrefWritable(userId, pref));
            }
        }
    }

    /**
     * For one item: multiplies its co-occurrence column by each user's
     * preference value and emits the partial recommendation vector keyed by
     * user, to be summed downstream into final recommendation scores.
     */
    public static class PartialMultiplyMapper
            extends Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {

        @Override
        protected void map(IntWritable key, VectorAndPrefsWritable value, Context context)
                throws IOException, InterruptedException {
            Vector cooccurenceColumn = value.getVector();
            List<Long> userIDs = value.getUserIDs();
            List<Float> prefValues = value.getValues();
            // userIDs and prefValues are parallel lists (same index = same user).
            for (int i = 0; i < userIDs.size(); i++) {
                long userId = userIDs.get(i);
                // Scrape artifact removed: a site URL had been spliced into this
                // line. Restored from the book's reference implementation.
                float prefValue = prefValues.get(i);
                Vector partialProduct = cooccurenceColumn.times(prefValue);
                context.write(new VarLongWritable(userId), new VectorWritable(partialProduct));
            }
        }
    }
}
RecommenderJob 的流程图如上;目前苦于找不到如何在代码中配置该作业,能查到的资料都只给出命令行(cmd)方式的用法。
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。