MapReduce Programming Series, Part 8: Deriving the Output Path from the Input Path and Clearing the HDFS Directory
Building on the MultipleOutputs experience from the previous installment, we can now parse the HDFS input directory path and derive the output path from it, which is a very common requirement in practice. Strictly speaking, there is no multi-file-name output here: MultipleOutputs.addNamedOutput is called only once, registering a single named output called result.
At the same time, to keep the job re-runnable, the existing output directory must be deleted before each run.
First, the pom.xml. The job now takes a single argument, the input directory; the output directory is derived automatically by appending /output to that path.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.freebird</groupId>
  <artifactId>mr1_example3</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>mr1_example3</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>hadoop</executable>
          <arguments>
            <argument>jar</argument>
            <argument>target/mr1_example3-1.0-SNAPSHOT.jar</argument>
            <argument>org.freebird.LogJob</argument>
            <argument>/user/chenshu/share/logs</argument>
          </arguments>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
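For reference, with this exec-maven-plugin configuration the job can be submitted with something like mvn package exec:exec, which builds the jar and then invokes the hadoop executable with the arguments listed above, passing the single input directory /user/chenshu/share/logs.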
LogJob.java has been modified: it uses the Path, FileSystem, and Configuration classes together to delete the output directory from HDFS if it already exists, and it registers only one named output, named result.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LogJob {

    public static void main(String[] args) throws Exception {
        // Strip a trailing slash so the derived output path stays clean.
        String inputPath = args[0];
        if (inputPath.endsWith("/")) {
            inputPath = inputPath.substring(0, inputPath.length() - 1);
        }
        System.out.println("args[0] indicates input folder path, the last / will be removed if it exists:" + inputPath);

        // The output directory is derived from the input directory.
        String outputPath = inputPath + "/output";
        System.out.println("output folder path is:" + outputPath);

        Configuration conf = new Configuration();
        Job job = new Job(conf, "sum_did_from_log_file");
        job.setJarByClass(LogJob.class);
        job.setMapperClass(org.freebird.mapper.LogMapper.class);
        job.setReducerClass(org.freebird.reducer.LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path path1 = new Path(inputPath);
        Path path2 = new Path(outputPath);

        // Delete any previous output so the job can be re-run safely.
        recreateFolder(path2, conf);

        // Register a single named output; the reducer writes to "result".
        MultipleOutputs.addNamedOutput(job, "result", TextOutputFormat.class, Text.class, IntWritable.class);

        FileInputFormat.addInputPath(job, path1);
        FileOutputFormat.setOutputPath(job, path2);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    private static void recreateFolder(Path path, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path, true); // recursive delete; the single-argument delete(Path) is deprecated
        }
    }
}
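One detail worth knowing: because the reducer writes only through MultipleOutputs, the results should land in files named after the named output, such as result-r-00000, under the output directory, while the default part-r-* files created by the job's base output format remain empty.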
The reducer code also needs to change:
package org.freebird.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private MultipleOutputs<Text, IntWritable> outputs;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::setup method");
        outputs = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::cleanup method");
        // Closing MultipleOutputs flushes and finalizes the named output files.
        outputs.close();
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("enter LogReducer::reduce method");
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        System.out.println("key: " + key.toString() + " sum: " + sum);
        // Write through the named output "result" instead of context.write;
        // the value must be an IntWritable to match the registered value class.
        outputs.write("result", key, new IntWritable(sum));
    }
}
The code is much simpler than the earlier examples: it simply writes its results to the single named output "result".
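For completeness, the mapper referenced by the job (org.freebird.mapper.LogMapper) is not shown in this installment. Below is a minimal sketch consistent with the Text/IntWritable types used above; it assumes each log line carries the device id (did) as its first whitespace-separated field and emits (did, 1) pairs, which the reducer then sums. The actual mapper from the earlier installments of this series may differ.

// Hypothetical sketch of LogMapper; field layout of the log lines is assumed.
package org.freebird.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text did = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        if (fields.length > 0 && !fields[0].isEmpty()) {
            did.set(fields[0]);      // assumed: did is the first field of each line
            context.write(did, ONE); // the reducer sums these 1s per did
        }
    }
}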