首页 > 代码库 > Hadoop实战读书笔记(6)

Hadoop实战读书笔记(6)

putmerge程序的大体流程是?

1、根据用户定义的参数设置本地目录和HDFS的目录文件

2、提取本地输入目录中每个文件的信息

3、创建一个输出流写入到HDF文件

4、遍历本地目录中的每个文件,打开一个输入流来读取该文件,剩下就是一个标准的Java文件复制过程了

具体程序如下:

public static void main(String[] args) throws IOException {

       Configuration conf = new Configuration();

       FileSystem hdfs = FileSystem.get(conf);

       FileSystem local = FieSystem.getLocal(conf);

       // 设定输入目录与输出文件

       Path inputDir = new Path(args[0]);

       Path hdfsFile = new Path(args[1]);

 

       try {

              // 得到本地文件列表

              FileStatus[] inputFiles = local.listStatus(inputDir);

              // 生成HDFS输出流

              FSDataOutputStream out = hdfs.create(hdfsFile);

 

              for (int i = 0; i < inputFiles.length; i++) {

                     System.out.println(inputFiles[i].getPath().getName());

                     // 打开本地输入流

                     FSDataInputStream in = local.open(inputFiles[i].getPath());

                     byte buffer[] = new byte[256];

                     int bytesRead = 0;

                     while ( (bytesRead = in.read(buffer)) > 0) {

                            out.write(buffer, 0, bytesRead);

                     }

                     in.close();

              }

              out.close();

       } catch (IOException e) {

              e.printStackTrace();

       }

}

 

那么现在有数据了,还要对它进行处理、分析以及做其他的操作。

MapReduce程序通过操作键/值对来处理数据,一般形式为

map: (K1, V1) -> list(K2, V2)

reduce:(K2, list(V2)) -> list(K3, V3)

 

Hadoop数据类型有哪些?

MapReduce框架并不允许它们是任意的类。

虽然我们经常把某些键与值称为整数、字符串等,但它们实际上并不是IntegerString等哪些标准的Java类。为了让键/值对可以在集群上移动,MapReduce框架提供了一种序列化键/值对的方法。因此,只有那些支持这种序列化的类能够在这个框架中充当键或者值。

 

更具体的Hadoop类型说明

实现Writable接口的类可以是值

而实现WritableComparable<T>接口的类既可以是键也可以是值

注意WritableComparable<T>接口是Writablejava.lang.Comparable<T>接口的组合,对于键而言,我们需要这个比较,因为它们将在Reduce阶段进行排序,而值仅会被简单地传递。

 

/值对经常使用的数据类型列表,这些类均实现WritableComparable接口

描述

BooleanWritable

标准布尔变量的封装

ByteWritable

单字节数的封装

DoubleWritable

双字节数的封装

FloatWritable

浮点数的封装

IntWritable

整数的封装

LongWritable

Long的封装

Text

使用UTF-8格式的文本封装

NullWritable

无键值时的站位符

 

如何自定义数据类型?

只要它实现了Writable(WritableComparable<T>)接口。

 

定义一个Edge类型用于表示一个网络的边界

public class Edge implements WritableComparable<Efge> {

       private String departureNode;

       private String arrivalNode;

 

       public String getDepartureNode() {

              return departureNode;

       }

       // 说明如何读入数据

       @Override

       public void readFields(DataInput in) throws IOException {

              departureNode = in.readUTF();

              arrivalNode = in.readUTF();

       }

       // 说明如何写出数据

       @Override

       public void write(DataOutput out) throws IOException {

              out.writeUTF(departureNode);

              out.writeUTF(arrivalNode);

       }

       // 定义数据排序

       @Override

       public int compareTo(Edge o) {

              return (departureNode.compareTo(o.departureNode) != 0)

                     ? departureNode.compareTo(o.departureNode)

                     : arrivalNode.compareTo(o.arrivalNode);

       }

}

 

Mapper类是什么?

一个类要作为mapper,需继承MapReduceBase基类并实现Mapper接口。

mapperreducer的基类均为MapReduceBase

其中包含一些函数或方法:

1void configure(JobConf job),该函数提取XML配置文件或者应用程序主类中的参数,在数据处理之前调用该函数。

2void close(),作为map任务结束前的最后一个操作,该函数完成所有的结尾工作,如关闭数据库连接、打开文件等。

Mapper接口负责数据处理阶段,它采用Mapper<K1, V1, K2, V2>Java泛型,这里键类和值类分别实现WritableComparableWritable接口。

Mapper类只有是一个方法-map,用于处理一个单独的键/值对。

void map (K1 key,

              V1 value,

              OutputCollector<K2, V2> output,

              Reporter reporter

              ) throws IOException

 

上面这个map函数的参数都是什么意思?

该函数处理一个给定的键/值对 (K1, V1),生成一个键/值对 (K2, V2) 的列表 (该列表页可能为空)

OutputCollector接收这个映射过程的输出

Reporter可提供对mapper相关附加信息的记录

 

Hadoop提供了一些有用的mapper实现,这些实现是?

描述

IdentityMapper<K, V>

实现Mapper<K,   V, K, V>, 将输入直接映射到输出

InverseMapper<K, V>

实现Mapper<K,   V, V, K> 反转键/值对

RegexMapper<K>

实现Mapper<K,   Text, Text, LongWritable>, 为每个常规表达式的匹配项生成一个   match 1  

TokenCountMapper<K>

实现Mapper<K,   Text, Text, LongWritable>, 当输入的值为分词时,   生成一个(token 1  

 

Reducer是什么?

一个类要作为reducer,需继承MapReduceBase基类并实现Reducer接口。

以便于允许配置和清理。

此外,它还必须实现Reducer接口使其具有如下的单一方法:

void reduce (K2 key,

                     Iterator<V2> values,

                     OutputCollector<K3, V3> output,

                     Reporter reporter

                     ) throws IOException

reducer任务接收来自各个mapper的输出时,它按照键/值对中的键对输入数据进行排序,并将相同键的值归并。然后调用reduce()函数,并通过迭代处理哪些与指定键相关联的值,生成一个 (可能为空的) 列表 K3, V3

OutputCollector接收reduce阶段的输出,并写入输出文件

Reporter可提供对reducer相关附加信息的记录,形成任务进度

 

一些非常有用的由Hadoop预定义的Reducer实现

描述

IdentityReducer<K, V>

实现Reducer<K,   V, K, V>, 将输入直接映射到输出

LongSumReducer<K>

实现<K,   LongWritable, K, LongWritable>   计算与给定键相对应的所有值的和

 

注:虽然我们将Hadoop程序称为MapReduce应用,但是在mapreduce两个阶段之间还有一个极其重要的步骤:将mapper的结果输出给不同的reducer。这就是partitioner的工作。

 

初次使用MapReduce的程序员通常有一个误解?

仅需要一个reducer 采用单一的reducer可以在处理之前对所有的数据进行排序。

No,采用单一的reducer忽略了并行计算的好处。

那么就应该使用多个reducer是么?但需要解决一个问题,如何确定mapper应该把键/值对输出给谁。

默认的作法是对键进行散列来确定reducerHadoop通过HashPartitioner类强制执行这个策略。但有时HashPartitioner会出错。

 

HashPartitioner会出什么错?

假如你使用Edge类来分析航班信息来决定从各个机场离港的乘客数目,这些数据可能是:

(San Francisco, Los Angeles) Chuck Lam

(San Francisco, Dallas) James Warren

如果你使用HashPartitioner,这两行可以被送到不同的reducer 离港的乘客数目被处理两次并且两次都是错误的

 

如何为你的应用量身定制partitioner呢?

上面的情况,我希望具有相同离港地的所有edge被送往相同的reducer,怎么做呢?只要对Edge类的departureNode成员进行散列就可以了:

public class EdgePartitioner implements Partitioner<Edge, Writable> {

       @Override

       public int getPartition (Edge key, Writable value, int numPartitions) {

              return key.getDepartureNode().hashCode() % numPartitions;

       }

       @Override

       public void configure(JobConf conf) { }

}

一个定制的partitioner只需要实现configure()getPartition()两个函数,前者将Hadoop对作业的配置应用在patitioner上,而后者返回一个介于0reduce任务数之间的整数,指向键/值对将要发送的reducer

 

Combiner:本地reduce

在许多MapReduce应用场景中,我们不妨在分发mapper结果之前做一下 "本地Reduce"。再考虑一下WordCount的例子,如果作业处理的文件中单词 "the" 出现了574次,存储并洗牌一次 ("the", 574) /值对比许多次 ("the", 1) 更为高效。这种处理步骤被称为合并。

 

预定义mapperReducer类的单词计数

public class WordCount {

       public static void main (String[] args) {

              JobClient client = new JobClient();

              JobConf conf = new JobConf(WordCount.class);

 

              FileInputFormat.addInputPath(conf, new Path(args[0]));

              FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      

              conf.setOutputKeyClass(Text.class);

              conf.setOutputValueClass(LongWritable.class);

              conf.setMapperClass(TokenCountMapper.class); // Hadoop自己的TokenCountMapper

              conf.setCombinerClass(LongSumReducer.class);

              conf.setReducerClass(LongSumReduver.class); // Hadoop自己的LongSumReducer

              client.setConf(conf);

              try {

                     JobClient.runJob(conf);

              } catch (Exception e) {

                     e.printStackTrace();

              }

       }

}

使用Hadoop预定义的类TokenCountMapperLongSumReducer,编写MapReduce分厂的容易,Hadoop也支持生成更复杂的程序,这里只是强调Hadoop允许你通过最小的代码量快速生成实用的程序。


Hadoop实战读书笔记(6)