首页 > 代码库 > MapReduce-MulitipleOutputs实现自定义输出到多个目录
MapReduce-MulitipleOutputs实现自定义输出到多个目录
输入源数据样例:
Source1-0001 Source2-0002 Source1-0003 Source2-0004 Source1-0005 Source2-0006 Source3-0007 Source3-0008描述:
- Source1开头的数据属于集合A;
- Source2开头的数据属于集合B;
- Source3开头的数据即属于集合A,也属于集合B;
输出要求:
- 完整保留集合A数据(包含Source1、Source3开头数据)
- 完整保留集合B数据(包含Source2、Source3开头数据)
程序实现:
import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.mahout.common.AbstractJob; import com.yhd.common.util.HadoopUtil; /** * AbstractJob 是mahout的Job模板,可以不使用该模板, * 实则的核心部分在于MultipleOutputs部分 * * @author ouyangyewei * */ public class TestMultipleOutputsJob extends AbstractJob { @Override public int run(String[] args) throws Exception { addInputOption(); addOutputOption(); Map<String, List<String>> parseArgs = parseArguments(args); if(parseArgs==null){ return -1; } HadoopUtil.delete(getConf(), getOutputPath()); Configuration conf = new Configuration(); conf.setInt("mapred.reduce.tasks", 4); conf.set("mapred.job.queue.name", "pms"); conf.set("mapred.child.java.opts", "-Xmx3072m"); conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05"); Job job = new Job(new Configuration(conf)); job.setJobName("TestMultipleOutputsJob"); job.setJarByClass(TestMultipleOutputsJob.class); job.setMapperClass(MultipleMapper.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, this.getInputPath()); FileOutputFormat.setOutputPath(job, this.getOutputPath()); /** 输出文件格式将为:Source1-m-**** */ MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class); /** 输出文件格式将为:Source2-m-**** */ MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class); boolean suceeded = job.waitForCompletion(true); if(!suceeded) { return -1; } return 0; } /** * * @author ouyangyewei * */ public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> { private MultipleOutputs<Text, Text> mos = null; @Override protected void setup(Context context ) throws IOException, InterruptedException { mos = new MultipleOutputs<Text, Text>(context); } public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); String[] tokenizer = line.split("-"); if (tokenizer[0].equals("Source1")) { /** 集合A的数据 */ mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]); } else if (tokenizer[0].equals("Source2")) { /** 集合B的数据 */ mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]); } /** 集合A交集合B的数据 */ if (tokenizer[0].equals("Source3")) { mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]); mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]); } } protected void cleanup(Context context ) throws IOException, InterruptedException { mos.close(); } } /** * @param args */ public static void main(String[] args) { System.setProperty("javax.xml.parsers.DocumentBuilderFactory", "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"); System.setProperty("javax.xml.parsers.SAXParserFactory", "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"); TestMultipleOutputsJob instance = new TestMultipleOutputsJob(); try { instance.run(args); } catch (Exception e) { e.printStackTrace(); } } }
使用hadoop jar命令调度运行jar包代码:
hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob --input /user/pms/workspace/ouyangyewei/testMultipleOutputs --output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
程序运行以后,输出的结果:
[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output Found 4 items -rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000 -rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000 -rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS -rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000 Source1 0001 Source1 0003 Source1 0005 Source3 0007 Source3 0008 [pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei] $hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000 Source2 0002 Source2 0004 Source2 0006 Source3 0007 Source3 0008
MapReduce-MulitipleOutputs实现自定义输出到多个目录
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。