MapReduce TopK with TreeMap
This is a MapReduce implementation of the TopK problem introduced in the earlier post "MapReduce TopK统计加排序" (TopK counting plus sorting).
This version drops the Sort step used in that post and keeps the top K words with a TreeMap instead.
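The core idea is small enough to show on its own. Here is a minimal, self-contained sketch; the words and counts are made up for illustration and none of this appears in the actual jobs:

import java.util.Collections;
import java.util.Map.Entry;
import java.util.TreeMap;

public class TreeMapTopKDemo {
    public static void main(String[] args) {
        int k = 3; // keep only the 3 highest counts
        // Descending key order: the highest count sits at the head of the map
        TreeMap<Integer, String> tm = new TreeMap<Integer, String>(Collections.<Integer>reverseOrder());
        int[] counts = {5, 12, 7, 3, 9};
        String[] words = {"apple", "the", "hadoop", "topk", "word"};
        for (int i = 0; i < counts.length; i++) {
            tm.put(counts[i], words[i]);
            if (tm.size() > k) {
                tm.remove(tm.lastKey()); // evict the current smallest count
            }
        }
        // Prints the 3 most frequent words in descending order of count
        for (Entry<Integer, String> e : tm.entrySet()) {
            System.out.println(e.getValue() + "\t" + e.getKey());
        }
    }
}

Because the TreeMap is keyed by the count, two words with the same frequency collide and one silently replaces the other; the TopK mapper below inherits the same limitation.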
WordCount.java:

package TopK1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word frequency count
 * @author zx
 * zhangxian1991@qq.com
 */
public class WordCount {

    /**
     * Tokenize the input into words
     * @author zx
     */
    public static class Map extends Mapper<Object, Text, Text, IntWritable> {

        IntWritable count = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                // Strip quotes and periods before counting
                String word = st.nextToken().replaceAll("\"", "").replace("'", "").replace(".", "");
                context.write(new Text(word), count);
            }
        }
    }

    /**
     * Sum up the occurrences of each word
     * @author zx
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable intWritable : values) {
                // Sum the values rather than merely counting them, so the job
                // stays correct even if a combiner is added later
                count += intWritable.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    @SuppressWarnings("deprecation")
    public static boolean run(String in, String out)
            throws IOException, ClassNotFoundException, InterruptedException {

        FileUtil.deleteFile(out);

        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCount1");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output directories
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        return job.waitForCompletion(true);
    }
}
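Since Reduce now sums the incoming values rather than counting them, it can also serve as a combiner, pre-aggregating map output and shrinking the shuffle. This one-line addition to run() is a suggestion of mine, not part of the original job setup:

// Optional: reuse the reducer as a combiner to pre-aggregate map output
job.setCombinerClass(Reduce.class);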
TopK.java:

package TopK1;

import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

/**
 * @author zx
 * zhangxian1991@qq.com
 */
public class TopK {

    public static class TopKMap extends Mapper<Object, Text, IntWritable, IntWritable> {

        /**
         * Entries are kept in descending key (count) order, so the head of
         * the map always holds the highest count seen so far
         */
        TreeMap<Integer, String> tm = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o2.compareTo(o1);
            }
        });

        int k = 0;

        // NOTE: the source text was garbled from the middle of cleanup() on;
        // setup(), map() and the tail of cleanup() below are a plausible
        // reconstruction from the surrounding code, not the original listing.

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // K is assumed to arrive through the job configuration
            k = context.getConfiguration().getInt("topK", 10);
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is a "word<TAB>count" record produced by WordCount
            StringTokenizer st = new StringTokenizer(value.toString());
            if (st.countTokens() == 2) {
                String word = st.nextToken();
                int count = Integer.parseInt(st.nextToken());
                tm.put(count, word);
                // Keep only the K largest counts
                if (tm.size() > k) {
                    tm.remove(tm.lastKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            Path topKPath = new Path(conf.get("topKOut"));
            FileSystem fs = topKPath.getFileSystem(conf);
            FSDataOutputStream fsDOS = fs.create(topKPath);
            Iterator<Integer> it = tm.keySet().iterator();
            while (it.hasNext()) {
                Integer key = it.next();
                String value = tm.get(key).toString();
                fsDOS.write((value + "\t" + key + "\n").getBytes("UTF-8"));
            }
            fsDOS.flush();
            fsDOS.close();
        }
    }

    @SuppressWarnings("deprecation")
    public static boolean run(String in, String out, int k)
            throws IOException, ClassNotFoundException, InterruptedException {

        FileUtil.deleteFile(out);

        Configuration conf = new Configuration();
        conf.setInt("topK", k);
        conf.set("topKOut", out);

        Job job = new Job(conf, "TopK1");
        job.setJarByClass(TopK.class);
        job.setMapperClass(TopKMap.class);

        // Map-only job: the result is written directly from cleanup(),
        // so nothing goes through the normal output path
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(in));
        return job.waitForCompletion(true);
    }
}

FileUtil.java:

package TopK1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * @author zx
 */
public class FileUtil {

    /**
     * Upload a local data file to HDFS
     * @param inputPath HDFS path whose parent directory receives the file
     * @param folder optional subfolder of the classpath root holding the file
     * @param fileName name of the data file
     * @return the full HDFS path of the uploaded file
     * @throws IOException
     */
    public static String loadFile(String inputPath, String folder, String fileName) throws IOException {
        // Build the full local path of the data file, relative to the code source
        if (null != folder && !"".equals(folder)) {
            folder = folder + "/";
        }
        String srcPathDir = FileUtil.class.getProtectionDomain().getCodeSource()
                .getLocation().getFile() + folder + fileName;

        Path srcPath = new Path("file:///" + srcPathDir);
        Path dstPath = new Path(getJobRootPath(inputPath) + fileName);

        Configuration conf = new Configuration();
        FileSystem fs = dstPath.getFileSystem(conf);
        fs.delete(dstPath, true);
        fs.copyFromLocalFile(srcPath, dstPath);
        fs.close();

        return getJobRootPath(inputPath) + fileName;
    }

    /**
     * Return the parent directory of the given path, with a trailing "/"
     * (a trailing "/" on the input is stripped first)
     * @param path
     * @return
     */
    public static String getJobRootPath(String path) {
        if (path.lastIndexOf("/") == path.length() - 1) {
            path = path.substring(0, path.lastIndexOf("/"));
        }
        return path.substring(0, path.lastIndexOf("/") + 1);
    }

    /**
     * Delete one or more HDFS paths recursively
     */
    public static void deleteFile(String... filePath) throws IOException {
        Configuration conf = new Configuration();
        for (int i = 0; i < filePath.length; i++) {
            Path path = new Path(filePath[i]);
            FileSystem fs = path.getFileSystem(conf);
            fs.delete(path, true);
        }
    }
}
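For completeness, the three classes could be wired together with a small driver. Everything below, the class name TopKJob, the HDFS paths, the data file name, and K = 10, is a placeholder of mine rather than part of the original post:

package TopK1;

// Hypothetical driver chaining the two jobs; paths and K are placeholders
public class TopKJob {
    public static void main(String[] args) throws Exception {
        String root = "hdfs://localhost:9000/topk/";                       // placeholder HDFS root
        String in = FileUtil.loadFile(root + "in/", "data", "words.txt");  // placeholder data file
        String wcOut = root + "wordcount";
        String topKOut = root + "top10/result.txt";
        if (WordCount.run(in, wcOut)) {
            TopK.run(wcOut, topKOut, 10); // keep the 10 most frequent words
        }
    }
}

Note that every map task of the TopK job overwrites the same topKOut file from cleanup(), so the result is only globally correct when the job runs with a single map task.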