Hadoop WordCount
Mapper
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The number of map tasks is determined by the number of input splits of the data.
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line within the file; value is the line text.
        String line = value.toString();
        String[] words = StringUtils.split(line, " ");
        for (String word : words) {
            // Emit <word, 1> for each occurrence.
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
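For example, given the input line hello world hello, this map emits the pairs (hello, 1), (world, 1), (hello, 1); the framework then sorts and groups these pairs by key before handing them to the reducer.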
Reducer
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the values rather than counting them, so the logic stays correct
        // even if a combiner has already pre-aggregated partial counts.
        long count = 0;
        for (LongWritable l : values) {
            count += l.get();
        }
        context.write(key, new LongWritable(count));
    }
}
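Because the reducer's input and output types match and summation is associative, the same class can also serve as a combiner that pre-aggregates counts on the map side and shrinks shuffle traffic. This is a common WordCount optimization, not part of the original code; it would be one extra line in the Runner:

    // Optional: pre-aggregate <word, count> pairs on the map side.
    // Safe only because the reduce sums values instead of counting them.
    job.setCombinerClass(WCReducer.class);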
Runner
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WCRunner.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Set the number of reduce tasks; the job produces one output file per
        // reducer, and which keys end up in which file is decided by the
        // Partitioner (HashPartitioner by default; override with
        // job.setPartitionerClass(...)).
        job.setNumReduceTasks(10);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
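With job.setNumReduceTasks(10), a successful run leaves ten files in the output directory, part-r-00000 through part-r-00009, plus a _SUCCESS marker; every key is routed to exactly one of those files by HashPartitioner.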
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WCRunner2 extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Use the Configuration injected by ToolRunner (rather than creating a
        // fresh one) so that generic options such as -D key=value take effect.
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WCRunner2.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Propagate the job's exit status to the shell.
        System.exit(ToolRunner.run(new Configuration(), new WCRunner2(), args));
    }
}
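The payoff of the Tool/ToolRunner variant is that ToolRunner parses Hadoop's generic options before invoking run(), so job settings can be overridden from the command line without recompiling, e.g. (paths are placeholders):

    hadoop jar wc.jar com.easytrack.hadoop.mr.WCRunner2 -D mapreduce.job.reduces=5 <input> <output>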
Run: hadoop jar wc.jar com.easytrack.hadoop.mr.WCRunner2 /wordcount.txt /wc/output4
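One way to build wc.jar, sketched under the assumption that the sources sit under src/com/easytrack/hadoop/mr (the package implied by the command above):

    mkdir -p classes
    javac -classpath "$(hadoop classpath)" -d classes src/com/easytrack/hadoop/mr/*.java
    jar cf wc.jar -C classes .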