首页 > 代码库 > 使用Hadoop的MapReduce与HDFS处理数据

使用Hadoop的MapReduce与HDFS处理数据

hadoop是一个分布式的基础架构,利用分布式实现高效的计算与储存,最核心的设计在于HDFS与MapReduce,HDFS提供了大量数据的存储,mapReduce提供了大量数据计算的实现,通过Java项目实现hadoop job处理海量数据解决复杂的需求。

一、基本环境及相关软件的配置

    详细配置说明:基本环境配置及权限申请

二、hadoop项目开发流程

     hadoop主要的开发为job的初始化与分布式处理流程的开发。

1、任务基本配置

    首相根据业务需求,需要在代码中配置job在每台机器上需要的java虚拟机使用的内存与执行过程需要的最大内存。

Configuration configuration =new Configuration();
configuration.set("mapreduce.map.java.opts","-Xmx2048m");
configuration.set("mapreduce.map.memory.mb","3072");
configuration.set("mapreduce.reduce.java.opts","-Xmx2048m");
configuration.set("mapreduce.reduce.memory.mb","3072");
Job job = newJob(configuration, "miuilite-dailyRetain-"+arg[4]);

2、运行参数配置

job.setJarByClass(MiuiliteRetainJob.class);
MultipleOutputs.addNamedOutput(job, MIUIDanfaGeneralMapReduce.MULTI_OUTPUT_NAME_STATUS, SequenceFileOutputFormat.class,Text.class, Text.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
MultipleInputs.addInputPath(job,new Path(arg[0]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);
MultipleInputs .addInputPath(job,new Path(arg[1]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.StatusLogMapper.class);
FileOutputFormat.setOutputPath(job,new Path(arg[2]));
job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(40);//配置节点数量

     hadoop任务处理过程中,各个分布式机器读取操作数据都是通过分布式储存文件系统hdfs,并且分布式计算将中间结果或者最终结果都是保存到hdfs上的,所以在job开发过程中需要的配置有:

   1)相关的地址:数据hdfs地址,中间状态缓存保存HDFS地址,以及生成的结果hdfs保存地址,(如需要本地进一步处理结果,还需要本地地址,需要将hdfs的结果地址拉取到本 地进行处理),本地服务器地址:

          MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class); 

        注意:对于要写入数据的地址要具有写的权限,详细权限配置请看基本配置介绍。

   2)各种数据格式:

            一种是文件的读取格式,可以使用基于行的日志文件,也可以使用二进制格式,多行输入记录或其他的格式,Hadoop有自带的几种格式:

输入格式
解释
key
value
TextInputFormat默认格式,按照行读取行的字节偏移量行的内容
KeyValueInputFormat解析每一行的数据第一个Tab前的字符剩下的内容
SequenceFileInputFormat具有高性能的二进制格式自定义自定义

所以在读取输入文件格式中,需要选择自己合适的格式来初始化  MultipleInputs.addInputPath(job, new Path(arg[0]),SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);

对于自定义的SequenceFileInputFormat,它会读取特殊的特定于Hadoop的二进制文件,会让Hadoop的mapper快速读取数据。Sequence文件是块压缩的,并提供了对几种数据类型(不仅仅是文本类型)直接的序列化与反序列化操作。

     其次文件读取key 与value的格式,以及输出到文件的格式:BooleanWritable:标准布尔型数值,ByteWritable:单字节数值,DoubleWritable:双字节数,FloatWritable:浮点数,IntWritable:整型数,LongWritable:长整型数,Text:使用UTF8格式存储的文本,NullWritable:当<key,value>中的key或value为空时使用,需要在初始化job的过程中初始化对应输入输出的格式。

  3)配置数据处理类,一般分为两个阶段,

        第一步叫做mapping,会对数据作为mapper函数的输入数据,每条数据对应一个,mapper会吧每次map处理后的结果可以根据相同的key单独传到一个输出数据元素里面。例子: MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class,MiuiliteRetainMapReduce.NewLogMapper.class);。 

        注意:可以同时使用多个数据输入处理的mapper,但输出key与value格式必须保持一致。

        第二步叫做reducer,会接收mapping的输出作为输入列表的迭代器,会将同一key的值聚合在一起,并做一定的处理而返回处理结果。例子:job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer.class);

3、数据处理流程

    在所有配置好了之后调用job.waitForCompletion(true);提交任务执行任务并等待结束。

Mapper阶段:

public static class NewLogMapperextends Mapper<Object, BytesWritable, Text, Text> {
 
        @Override
        publicvoid map(Object key, BytesWritable value, Context context) {
        //..........省略中间处理原始数据过程,比如解密,生成OutPutKey等
        context.write(newText(OutPutKey), newText(OutPutValue);
    }
 
}

Redecer阶段:

public static  class RetainReducerextends Reducer<Text, Text, Text, Text> {
        @Override
        publicvoid setup(Context context)throws IOException, InterruptedException {
            super.setup(context);
            //数据初始化过程,初始化相关的计数工具
        }
 
        @Override
        publicvoid reduce(Text key, Iterable<Text> values, Context context) {
          //对应同一个key进行相关的统计处理阶段,并将数据计入到相关的计数工具中。
        }
        @Override
        protectedvoid cleanup(Reducer.Context context)throws IOException, InterruptedException {
                stringCounter.output(context);
                super.cleanup(context);
         //执行完毕的后续阶段,将没台分布式计算的机器的结果输入到hdfs上,清理context,
 
        }

reducer完成后需要统一将处理结果写入到HDFS中,所以在统计工具中应带有最后的输出函数:

public void output(Reducer.Context context, intlongTailBar) throwsIOException, InterruptedException {
        for(Iterator<String> iterator = stringCountMap.keySet().iterator(); iterator.hasNext();) {
            String key = iterator.next();
            longvalue = http://www.mamicode.com/stringCountMap.get(key);
            if(value < longTailBar)
                continue;
            key = key.replace(‘\r‘,‘ ‘);
            key = key.replace(‘\n‘,‘ ‘);
            context.write(newText(key), new LongWritable(value));
        }
    }

4、处理结果本地

    hadoop处理后的结果都是保存在hdfs上的,可以将对应的结果作为行的任务的输入进一步精确处理,如果需要进一步本地处理,通过调用本地shell命令将结果复制到本地:

private void copyToLocal(String hdfsPath, String localPath)throws IOException, InterruptedException {
        String[] cmd = {"/bin/sh","-c", "hadoop fs -cat "+ hdfsPath + "/part* > "+ localPath};
        String tmpDic = loalPath.substring(0,localPath.lastIndexOf("/"));
        if(!newFile(tmpDic).exists()){
            newFile(tmpDic).mkdirs();
        }
        if(!newFile(localPath).exists()){
            newFile(localPath).createNewFile();
        }
        Process pid = Runtime.getRuntime().exec(cmd);
        if(pid != null) {
            pid.waitFor();
        }
    }

三、运行流程

运行shell命令配置

      hadoop项目运行方式通过shell文件执行指定的jar包,并指定对应的入口函数,依据项目的需求传入不同的参数。

      hadoop jar  miuiapp-logs.jar com.xiaomi.miui.logs.danfa.MiuiMihomeGeneralJob XXX-param-1 XXX-param-2 XXX-param-3

注意:如果通过crontab -e定时指定相关的命令运行,需要在运行的shell文件中添加 jdk,hadoop的地址到环境变量中。

注意:在配置pom过程中需要将jar包打成大包,将所有依赖的jar包都应该打进去,所以在pom中应该加入下列配置:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.3</version>
        <configuration>
            <appendAssemblyId>false</appendAssemblyId>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
            <executions>
               <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>assembly</goal>
                    </goals>
                </execution>
            </executions>
</plugin>

使用Hadoop的MapReduce与HDFS处理数据