首页 > 代码库 > Hadoop实战读书笔记(6)
Hadoop实战读书笔记(6)
putmerge程序的大体流程是?
1、根据用户定义的参数设置本地目录和HDFS的目录文件
2、提取本地输入目录中每个文件的信息
3、创建一个输出流写入到HDF文件
4、遍历本地目录中的每个文件,打开一个输入流来读取该文件,剩下就是一个标准的Java文件复制过程了
具体程序如下:
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FieSystem.getLocal(conf);
// 设定输入目录与输出文件
Path inputDir = new Path(args[0]);
Path hdfsFile = new Path(args[1]);
try {
// 得到本地文件列表
FileStatus[] inputFiles = local.listStatus(inputDir);
// 生成HDFS输出流
FSDataOutputStream out = hdfs.create(hdfsFile);
for (int i = 0; i < inputFiles.length; i++) {
System.out.println(inputFiles[i].getPath().getName());
// 打开本地输入流
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while ( (bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
in.close();
}
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
那么现在有数据了,还要对它进行处理、分析以及做其他的操作。
MapReduce程序通过操作键/值对来处理数据,一般形式为
map: (K1, V1) -> list(K2, V2)
reduce:(K2, list(V2)) -> list(K3, V3)
Hadoop数据类型有哪些?
MapReduce框架并不允许它们是任意的类。
虽然我们经常把某些键与值称为整数、字符串等,但它们实际上并不是Integer、String等哪些标准的Java类。为了让键/值对可以在集群上移动,MapReduce框架提供了一种序列化键/值对的方法。因此,只有那些支持这种序列化的类能够在这个框架中充当键或者值。
更具体的Hadoop类型说明
实现Writable接口的类可以是值
而实现WritableComparable<T>接口的类既可以是键也可以是值
注意WritableComparable<T>接口是Writable和java.lang.Comparable<T>接口的组合,对于键而言,我们需要这个比较,因为它们将在Reduce阶段进行排序,而值仅会被简单地传递。
键/值对经常使用的数据类型列表,这些类均实现WritableComparable接口
类 |
描述 |
BooleanWritable |
标准布尔变量的封装 |
ByteWritable |
单字节数的封装 |
DoubleWritable |
双字节数的封装 |
FloatWritable |
浮点数的封装 |
IntWritable |
整数的封装 |
LongWritable |
Long的封装 |
Text |
使用UTF-8格式的文本封装 |
NullWritable |
无键值时的站位符 |
如何自定义数据类型?
只要它实现了Writable(或WritableComparable<T>)接口。
定义一个Edge类型用于表示一个网络的边界
public class Edge implements WritableComparable<Efge> {
private String departureNode;
private String arrivalNode;
public String getDepartureNode() {
return departureNode;
}
// 说明如何读入数据
@Override
public void readFields(DataInput in) throws IOException {
departureNode = in.readUTF();
arrivalNode = in.readUTF();
}
// 说明如何写出数据
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(departureNode);
out.writeUTF(arrivalNode);
}
// 定义数据排序
@Override
public int compareTo(Edge o) {
return (departureNode.compareTo(o.departureNode) != 0)
? departureNode.compareTo(o.departureNode)
: arrivalNode.compareTo(o.arrivalNode);
}
}
Mapper类是什么?
一个类要作为mapper,需继承MapReduceBase基类并实现Mapper接口。
mapper和reducer的基类均为MapReduceBase类
其中包含一些函数或方法:
1、void configure(JobConf job),该函数提取XML配置文件或者应用程序主类中的参数,在数据处理之前调用该函数。
2、void close(),作为map任务结束前的最后一个操作,该函数完成所有的结尾工作,如关闭数据库连接、打开文件等。
Mapper接口负责数据处理阶段,它采用Mapper<K1, V1, K2, V2>Java泛型,这里键类和值类分别实现WritableComparable和Writable接口。
Mapper类只有是一个方法-map,用于处理一个单独的键/值对。
void map (K1 key,
V1 value,
OutputCollector<K2, V2> output,
Reporter reporter
) throws IOException
上面这个map函数的参数都是什么意思?
该函数处理一个给定的键/值对 (K1, V1),生成一个键/值对 (K2, V2) 的列表 (该列表页可能为空)
OutputCollector接收这个映射过程的输出
Reporter可提供对mapper相关附加信息的记录
Hadoop提供了一些有用的mapper实现,这些实现是?
类 |
描述 |
IdentityMapper<K, V> |
实现Mapper<K, V, K, V>, 将输入直接映射到输出 |
InverseMapper<K, V> |
实现Mapper<K, V, V, K>, 反转键/值对 |
RegexMapper<K> |
实现Mapper<K, Text, Text, LongWritable>, 为每个常规表达式的匹配项生成一个 (match, 1) 对 |
TokenCountMapper<K> |
实现Mapper<K, Text, Text, LongWritable>, 当输入的值为分词时, 生成一个(token, 1) 对 |
Reducer是什么?
一个类要作为reducer,需继承MapReduceBase基类并实现Reducer接口。
以便于允许配置和清理。
此外,它还必须实现Reducer接口使其具有如下的单一方法:
void reduce (K2 key,
Iterator<V2> values,
OutputCollector<K3, V3> output,
Reporter reporter
) throws IOException
当reducer任务接收来自各个mapper的输出时,它按照键/值对中的键对输入数据进行排序,并将相同键的值归并。然后调用reduce()函数,并通过迭代处理哪些与指定键相关联的值,生成一个 (可能为空的) 列表 (K3, V3)
OutputCollector接收reduce阶段的输出,并写入输出文件
Reporter可提供对reducer相关附加信息的记录,形成任务进度
一些非常有用的由Hadoop预定义的Reducer实现
类 |
描述 |
IdentityReducer<K, V> |
实现Reducer<K, V, K, V>, 将输入直接映射到输出 |
LongSumReducer<K> |
实现<K, LongWritable, K, LongWritable>, 计算与给定键相对应的所有值的和 |
注:虽然我们将Hadoop程序称为MapReduce应用,但是在map和reduce两个阶段之间还有一个极其重要的步骤:将mapper的结果输出给不同的reducer。这就是partitioner的工作。
初次使用MapReduce的程序员通常有一个误解?
仅需要一个reducer? 采用单一的reducer可以在处理之前对所有的数据进行排序。
No,采用单一的reducer忽略了并行计算的好处。
那么就应该使用多个reducer是么?但需要解决一个问题,如何确定mapper应该把键/值对输出给谁。
默认的作法是对键进行散列来确定reducer。Hadoop通过HashPartitioner类强制执行这个策略。但有时HashPartitioner会出错。
HashPartitioner会出什么错?
假如你使用Edge类来分析航班信息来决定从各个机场离港的乘客数目,这些数据可能是:
(San Francisco, Los Angeles) Chuck Lam
(San Francisco, Dallas) James Warren
如果你使用HashPartitioner,这两行可以被送到不同的reducer, 离港的乘客数目被处理两次并且两次都是错误的
如何为你的应用量身定制partitioner呢?
上面的情况,我希望具有相同离港地的所有edge被送往相同的reducer,怎么做呢?只要对Edge类的departureNode成员进行散列就可以了:
public class EdgePartitioner implements Partitioner<Edge, Writable> {
@Override
public int getPartition (Edge key, Writable value, int numPartitions) {
return key.getDepartureNode().hashCode() % numPartitions;
}
@Override
public void configure(JobConf conf) { }
}
一个定制的partitioner只需要实现configure()和getPartition()两个函数,前者将Hadoop对作业的配置应用在patitioner上,而后者返回一个介于0和reduce任务数之间的整数,指向键/值对将要发送的reducer
Combiner:本地reduce
在许多MapReduce应用场景中,我们不妨在分发mapper结果之前做一下 "本地Reduce"。再考虑一下WordCount的例子,如果作业处理的文件中单词 "the" 出现了574次,存储并洗牌一次 ("the", 574) 键/值对比许多次 ("the", 1) 更为高效。这种处理步骤被称为合并。
预定义mapper和Reducer类的单词计数
public class WordCount {
public static void main (String[] args) {
JobClient client = new JobClient();
JobConf conf = new JobConf(WordCount.class);
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setMapperClass(TokenCountMapper.class); // Hadoop自己的TokenCountMapper
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReduver.class); // Hadoop自己的LongSumReducer
client.setConf(conf);
try {
JobClient.runJob(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用Hadoop预定义的类TokenCountMapper和LongSumReducer,编写MapReduce分厂的容易,Hadoop也支持生成更复杂的程序,这里只是强调Hadoop允许你通过最小的代码量快速生成实用的程序。
Hadoop实战读书笔记(6)