首页 > 代码库 > Hadoop实战读书笔记(7)
Hadoop实战读书笔记(7)
输入数据概要
输入数据通常驻留在较大的文件中,通常几十或者数百GB,甚至更大。MapReduce处理的基本原则之一是将输入数据分割成块。这些块可以在多台计算机上并行处理,在Hadoop的术语中这些块被称为输入分片(Input Split)。每个分片应该足够小以实现更细粒度的并行。(如果所有的输入数据都在一个分片中,那就没有并行了。) 另一方面,每个分片也不能太小,否则启动与停止各个分片处理所需的开销将占去很大一部分执行时间。
所以说:
1、单个文件要足够的大,这样才能被分片,才会有并行。
2、分片大小不能太小,否则分片太多影响效率。
并行处理切分输入数据的原则
输入数据通常为单一的大文件,揭示了其背后Hadoop通用文件系统 (尤其是HDFS) 的一些设计策略。
例如,Hadoop的文件系统提供了FSDataInputStream类用于读取文件,而未采用Java中的java.io.DataInputStream FSDataInputStream扩展了DataInputStream以支持随机读,MapReduce需要这样特性,因为一台机器可能被指派从输入文件的中间开始处理一个分片。
如果没有随机访问,而需要从头开始一直读取到分片的位置,效率就会非常低。你还可以看到HDFS为了存储MapReduce并行切分和处理的数据所做的设计。
HDFS按块存储文件并分布在多台机器上。笼统而言,每个文件块为一个分片。由于不同的机器会存储不同的块,如果每个分片/块都由它所驻留的机器进行处理,就自动实现了并行。此外,由于HDFS在多个节点上复制数据块以实现可靠性,MapReducer可以选择任意一个包含分片/数据块副本的节点。
1、必须支持随机访问。
2、每个分片由它所驻留的机器进行处理,这样就实现了并行。
3、由于存在副本的原因,MapReducer可以选择任意包含该分片的节点。
Hadoop默认的读入模式
MapReduce操作是基于键/值对的,Hadoop默认地将输入文件中的每一行视为一个记录,而键/值对分别为该行的字节偏移(key)和内容(value)。
你也许不会把所有的数据都这样记录,Hadoop也支持一些其他的数据格式,并允许自定义格式。
输入分片与记录边界
请注意,输入分片是一种记录的逻辑划分,而HDFS数据块是对输入数据的物理分割。当它们一致时,效率会非常高,但在实际应用中从未达到完全一致。记录可能会跨过数据块的边界。Hadoop确保全部记录都被处理。处理特定分片的计算节点会从一个数据块中获取记录的一个片段,该数据块可能不是该记录的"主"数据块,而会存放在远端。为获取一个记录片段所需的通信成本是微不足道的,因为它相对而言很少发生。
InputFormat接口
Hadoop分割与读取输入文件的方式被定义在InputFormat接口中
TextInputFormat是InputFormat的默认实现
从TextInputFormat返回的键为每行的字节偏移量,好像是很少用
InputFormat的其他常用实现
InputFormat |
描述 |
TextInputFormat |
在文本文件中的每一行均为一个记录。键 (key) 为一行的字节偏移,而值 (value)为一行的内容 Key: LongWritable Value: Text |
KeyValueTextInputFormat |
在文本文件中的每一行均为一个记录,以每行的第一个分隔符为界,分隔符之前的键(key),之后的是值(value)。分离器在属性key.value.separator.in.input.line中设定,默认为制表符(\t) Key: Text Value: Text |
SequenceFileInputFormat<K, V> |
用于读取序列文件的InputFormat。键和值由用户定义。序列文件为Hadoop专用的压缩二进制文件格式。它专用于一个MapReduce作业和其他MapReduce作业之间传送数据 Key: K (用户定义) Value: V (用户定义) |
NLineInputFormat |
与TextInputFormat相同,但每个分片一定有N行。N在属性mapred.line.input.format.linespermap中设定,默认为1 Key:LongWritable Value: Text |
KeyValueTextInputFormat在更结构化的输入文件中使用,由一个预定义的字符,通常为制表符(\t),将每行 (记录) 的键与值分开。例如,一个以制表符分割的,由时间戳和URL组成的数据文件也需要是这样的:
17:16:18 http://hadoop.apache.org/core/docs/r0.19.0/api/index.html
17:16:19 http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html
...
你可以设置JobConf对象使用KeyValueTextInputFormat类读取这个文件:
conf.setInputFormat(KeyValueTextInputFormat.class);
由前面的示例文件可知,mapper读取的第一个记录将包含一个键 "17:16:18" 和一个值
"http://hadoop.apache.org/core/docs/r0.19.0/api/index.html"。而mapper读到的第二个记录将包含键"17:16:19"和值http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html... 如此递归。
之前,我们在mapper中曾使用LongWritable和Text分别作为键(key)和值(value)的类型。在TextInputFormat中,因为值为用数字表示的偏移量,所以LongWritable是一个合理的键类型。而当使用KeyValueTextInputFormat时,无论是键和值都为Text类型,你必须改变Mapper的实现以及map()方法来适应这个新的键 (key) 类型
输入到MapReduce作业的数据未必都是些外部数据,实际上,一个MapReduce作业的输入常常是其他一些MapReduce的输出。
你可以自定义输出格式
默认的输出格式与KeyValueTextInputFormat能够读取的数据格式保存一致 (即记录中的每行均为一个由制表符分割的键和值)。不过, Hadoop提供了更加有效的二进制压缩文件格式,称为序列文件。这个序列文件为Hadoop处理做了优化,当链接多个MapReduce作业时,它是首选格式。读取序列文件的InputFormat类为SequenceFileInputFormat
生成一个定制的InputFormat——InputSplit和RecordReader
InputFormat是一个仅包含两个方法的接口。
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
这两个方法总结了InputStream需执行的两个功能:
1、确定所有用于输入数据的文件,并将之分割为输入分片。每个map任务分配一个分片
2、提供一个对象 (RecordReader),循环提取给定分片中的记录,并解析每个记录为预定义类型的键与值
那么,谁去考虑如何将文件划分为分片呢?
1、FileInputFormat类主要负责分片
2、InputFormat类都是FileInputFormat类的子类
3、FileInputFormat实现了getSplits()方法,不过保留了getRecordReader()抽象让子类实现
4、FileInputFormat中实现的getSplits()把输入数据粗略地划分为一组分片,分片数目在numSplits中限定,且每个分片得大小必须大于mapred.min.split.size个字节,但小于文件系统的块
5、在实际情况中,一个分片最终总是以一个块未大小,在HDFS中默认为64MB
FileInputFormat说明
1、FileInputFormat有一定数量的protected方法,子类可以通过覆盖改变其行为
2、其中一个就是isSplitable(FileSystem fs, Path fileName)方法,它检查你是否可以将给定文件分片,默认实现总是返回true,因此所有大于一个分块的文件都要分片。
3、有时你可能不想该文件分片,这时你就可以覆盖isSplittable()来返回false,例如一些文件压缩方案并不支持分割,一些数据处理操作,如文件转换,需要把每个文件视为一个原子记录,也不能将之分片
RecordReader接口说明
这个接口负责把一个输入分片解析为记录,再把每个记录解析为一个键/值对
public interface RecordReader<K, V> {
boolean next(K key, V value) throws IOException;
K createKey();
V createValue();
long getPos() throws IOException;
public void close() throws IOException;
float getProgress() throws IOException;
}
我们不用自己写RecordReader,还是利用Hadoop所提供的类。
1、LineRecordReader实现RecordReader<LongWritable, Text>。它在TextInputFormat中被用于每次读取一行,以字节偏移作为键,以行的内容作为值。
2、而KeyValueLineRecordReader则被用在KeyValueTextInputFormat中
3、在大多数情况下,自定义RecordReader是基于现有实现的封装,并把大多数操作放在next()方法中。
自定义的TimeUrlTextInputFormat类似于KeyValueTextInputFormat,以前的键/值是Text/Text,现在是Text/URLWritable,URLWritable是自定义的
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {
@Override
public RecordReader<Text, URLWritable> getRecordReader(
InputSplit input, JobConf job, Reporter reporter) throws IOException {
return new TimeUrlLineRecordReader(job, (FileSplit) input);
}
}
// 我们的URLWritable类非常简单:
public class URLWritable implements Writable {
protected URL url;
public URLWritable() {}
public URLWritable(URL url) {
this.url = url;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(url.toString());
}
public void readFields(DataInput in) throw IOException {
url = new URL(in.readUTF());
}
public void set(String s) throws MalformedURLException {
url = new URL(s);
}
}
TimeUrlLineRecordReader实现
class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> {
private KeyValueLineRecordReader lineReader;
private Text lineKey, lineValue;
public TimeUrlLineRecordReader(JonConf job, FileSplit split) throws IOException {
lineReader = new KeyValueLineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = http://www.mamicode.com/lineReader.createValue();
}
public boolean next(Text key, URLWritable value) throws IOException {
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
key.set(lineKey);
value.set(lineValue.toString());
return true;
}
public Text createKey() {
return new Text("");
}
public URLWritable createValue() {
return new URLWritable();
}
public long getPos() throws IOException {
return lineReader.getPos();
}
public float getProgress() throws IOException {
return lineReader.getProgress();
}
public void close() throws IOException {
lineReader.close();
}
}
TImeUrlLineRecordReader类生成一个KeyValueRecordReader对象,并直接把getPos()、getProgress()以及close()方法调用传递给它。而next()方法将lineValueText对象转换为URLWritable类型
OurputFormat类是什么?
当MapReduce输出数据到文件时,使用的时OutputFormat类,它与InputFormat类相似,因为每个reducer仅需将它的输出写入自己的文件中,输出无需分片。输出文件放在一个公用目录中,通常命名为part-nnnnn,这里nnnnn是reducer的分区ID。RecordWriter对象将输出结果进行格式化,而RecordReader对输入格式进行解析。
几乎我们处理的所有OutputFormat类都是从FileOutputFormat抽象类继承来的
你可以通过调用JobConf对象中的setOutputFormat()定制OutputFormat
注意:你可能会奇怪,为什么要将OutputFormat (InputFormat) 和 FileOutputFormat (FileInputFormat) 区分开,似乎所有OutputFormat (InputFormat) 类都扩展了FileOutputFormat (FielInputFormat) 是否有不处理文件的OutFormat (InputFormat)类?没错,NullOutputFormat简单地实现了OutputFomat,并不需要继承FileOutputFormat。更重要的是,OutputFormat (InputFormat) 类处理的是数据库,并非文件,而且在类的层次关系中OutputFormat (InputFormat) 类是区别于FileoutputFormat (FileInputFormat) 的一个独立分支。这些类有专门的应用,有兴趣的读者可以在网上搜寻在线Java文档进一步了解DBInputFormat和DBOutputFormat
主要的OutputFormat类,默认为TextOutputFormat
OutputFormat |
描述 |
TextOutputFormat<K, V> |
将每个记录写为一行文本。键和值以字符串的形式写入,并以制表符(\t)分隔。这个分隔符可以在属性mapred.textoutputformat.separator中修改 |
SequenceFileOutputFormat<K, V> |
以Hadoop专有序列文件格式写入键/值对。与SequenceFileInputFormat配合使用 |
NullOutputFormat<K, V> |
无输出 |
默认的OutputFormat是TextOutputFormat,将每个记录写为一行文本。每个记录的键和值通过toString()被转换为字符串(string),并以制表符 (\t) 分隔。分隔符可以在mapred.textoutputformat.separator属性中修改。
1、TextOutputFormat采用可被KeyValueInputFormat识别的格式输出数据。
2、如果把键的类型设为NullWritable,它也可以采用可被TextInputFormat识别的输出格式,在这种情况下,在键/值对中没有键,也没有分隔符。
3、如果想完全禁止输出,应该使用NullOutputFormat
4、如果让reducer采用自己的方式输出,并且不需要Hadoop写任何附加的文件,可以限制Hadoop的输出。
5、SequenceFileOutputFormat以序列文件格式输出数据,使其可以通过SequenceFileInputFormat来读取。它有助于通过中间数据结果将MapReduce作业串接起来
MapReduce程序的核心是Map和Reduce操作,还有其他的操作?
1、data spliting(数据分割)
2、shuffling(洗牌)
3、Partitining(分组)
4、Combining(合并)
5、Hadoop还支持采用不同的格式读入和输出数据
Hadoop实战读书笔记(7)