首页 > 代码库 > Mapreduce设置多路径输入输出

Mapreduce设置多路径输入输出

最近写Mapreduce程序时,想用到多路径输入,一次输入多个文件夹下的数据。并且希望输出路径也可以区分,修改输出文件的名称。查了相关资料,已实现。

  • 多路径输入

设置Mapreduce的输入是HDFS上多个文件夹下的数据,在main函数下稍作配置即可,示例代码如下:

public static void main(String[] args) throws Exception {                String ioPath[] = {                "hdfs://10.1.2.3:8020/user/me/input/folder1",                "hdfs://10.1.2.3:8020/user/me/input/folder2",                "hdfs://10.1.2.3:8020/user/me/output"                        };                Configuration conf = new Configuration();        conf.set("fs.defaultFS", "hdfs://10.1.2.3:8020");        conf.set("mapreduce.jobtracker.address", "10.1.2.3:8021");        Job job = Job.getInstance(conf, "Job-Name");            job.setJarByClass(TestMain.class);        job.setReducerClass(TestReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class);                FileSystem fs = FileSystem.get(conf);        fs.delete(new Path(ioPath[2]),true);                MultipleInputs.addInputPath(job, new Path(ioPath[0]), TextInputFormat.class, TagUrlMapper.class);        MultipleInputs.addInputPath(job, new Path(ioPath[1]), TextInputFormat.class, TagUrlMapper.class);        FileOutputFormat.setOutputPath(job, new Path(ioPath[2]));                System.exit(job.waitForCompletion(true) ? 0 : 1);      }

使用MultipleInputs.addInputPath()方法添加输入路径,输入类型和Mapper类。

  • 多路径输出

在reducer中配置多路径输出及输出文件名的开头,示例代码:

public class TestReducer extends Reducer<Text, Text, Text, Text> {    private  MultipleOutputs<Text, Text> mos;   @Override    protected void setup(Context context)                    throws IOException, InterruptedException {      super.setup(context);      mos = new MultipleOutputs<Text, Text>(context);    }       @Override   protected void cleanup(Context context)                   throws IOException, InterruptedException {       super.cleanup(context);       mos.close();   }      @Override  public void reduce(Text key, Iterable<Text> values,      Context context)throws IOException, InterruptedException {          while(values.iterator().hasNext()){          tag = values.iterator().next();          if (......){              mos.write(key, new Text("taged"), "taged/taged");          }          else{              mos.write(key, new Text("untaged"), "untaged/untaged");          }      }}

使用MultipleOutputs类来控制输出路径。重写Reducer的setup()和cleanup()方法,如示例代码所示。

输出路径示例如下:image

Mapreduce设置多路径输入输出