首页 > 代码库 > 一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)
一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)
Map/Reduce用户界面
本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现、配置和调优进行具体的设置。然而,开发时候还是要相应着API进行相关操作。
首先我们须要了解Mapper和Reducer接口,应用通常须要提供map和reduce方法以实现他们。
接着我们须要对JobConf, JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat,OutputCommitter等进行讨论。
最后,我们将通过讨论框架中一些实用的功能点(比如:DistributedCache,IsolationRunner等等)。
核心功能描写叙述
应用程序一般会通过提供map和reduce来实现Mapper和Reducer接口,它们组成作业的核心。
Mapper
mapper对输入的键值对映射成一组中间格式的键值对。
映射是一个独立的任务,它将输入记录集转换为中间格式记录集。这样的转换的中间格式记录不须要与输入的记录集类型一致,一个输入的键值能够相应多个输出的键值对。
Hadoop Map/Reduce为每一个由作业中InputFormat产生的InputSplit产生一个map任务。
总的来说。对Mapper的实现者须要重写JobConfigurable.configure(JobConf)方法。此方法须要传递JobConf參数。以此来完毕mapper的初始化。
接着。框架调用map方法。对任务中的InputSplit中每一个key/value pair调用以此。
应用程序通过重写Closable.close()来运行必要的清理工作。
输出的键值对不须要跟输入的键值对的类型一致。输入的键值可能映射成0到多个输出的键值对,然后调用OutputCollector.collect(WritableComparable,Writable)来收集输出的键值对。
应用程序能够使用Reporter报告进度。设定应用级别的状态消息,更新Counters(计数器),或者仅是表明自己执行正常。
框架随后会把与一个特定key关联的全部中间过程的值(value)分成组。然后把它们传给Reducer以产出终于的结果。用户能够通过 JobConf.setOutputKeyComparatorClass(Class)来指定详细负责分组的Comparator。
Mapper的输出被排序后。就被划分给每一个Reducer。分块的总数目和一个作业的reduce任务的数目是一样的。
用户能够通过实现自己定义的Partitioner来控制哪个key被分配给哪个Reducer。
用户可选择通过 JobConf.setCombinerClass(Class)指定一个combiner,它负责对中间过程的输出进行本地的聚集,这会有助于减少从Mapper到Reducer传输数据量。
这些被排好序的中间过程的输出结果保存的格式是(key-len, key, value-len, value),应用程序能够通过JobConf控制对这些中间结果是否进行压缩以及怎么压缩,使用哪种 CompressionCodec。
须要多少个Map?
Map的数目一般是由输入数据的大小决定的,一般就是全部输入文件的总块(block)数。
Map正常的并行规模大致是每一个节点(node)大约10到100个map,对于CPU 消耗较小的map任务能够设到300个左右。因为每一个任务初始化须要一定的时间,因此。比較合理的情况是map运行的时间至少超过1分钟。
这样,假设你输入10TB的数据。每一个块(block)的大小是128MB,你将须要大约82,000个map来完毕任务,除非使用 setNumMapTasks(int)(注意:这里不过对框架进行了一个提示(hint),实际决定因素见这里)将这个数值设置得更高。
Reducer
对于reducer,官方给出的说明为:
Reducer reduces a set of intermediate values which share a key to a smaller set of values.
大意是Reducer对中间的值集合转换成一个key相应一个更小的数据集。
Reducer的个数取决于用户设置,用户通过JobConf.setNumReduceTasks(int)来设置。
总的来说。Reducer的实现须要通过重写JobConfigurable.configure(JobConf)方法。这种方法须要传递一个JobConf參数,目的是完毕Reducer的初始化工作。然后。框架为成组的输入数据中的每一个<key, (list of values)>对调用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。
之后。应用程序能够通过重写Closeable.close()来运行对应的清理工作。
Reducer有3个主要阶段:shuffle、sort和reduce。
Shuffle
reducer的输入相应的是mapper的已排序的输出。
Sort
框架在此阶段依据输入key的值对reducer的输入进行分组(由于不同mapper的输出中可能会有同样的key);
Shuffle和sort两个阶段是同一时候进行的;map的输出也是边取回边合并的。
Secondary Sort
假设须要中间过程对key的分组规则和reduce前对key的分组规则不同,那么能够通过 JobConf.setOutputValueGroupingComparator(Class)来指定一个Comparator。
再加上 JobConf.setOutputKeyComparatorClass(Class)可用于控制中间过程的key怎样被分组,所以结合两者能够实现按值的二次排序。
Reduce
本阶段框架为已分组的输入数据中的每一个 <key, (list of values)>对调用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。
reduce任务的输出一般是通过调用OutputCollector.collect(WritableComparable, Writable)来写入文件系统的。
应用能够利用Reporter来报告进度。设置程序级别状态消息和更新计数器,或是只告知程序执行正常。
Reducer的输出没有排序处理。
须要多少Reduce
Reduce的数目建议是0.95或1.75乘以 (<no. of nodes> *mapred.tasktracker.reduce.tasks.maximum)。
用0.95。全部reduce能够在maps一完毕时就立马启动,開始传输map的输出结果。用1.75,速度快的节点能够在完毕第一轮reduce任务后,能够開始第二轮,这样能够得到比較好的负载均衡的效果。
添加reduce的数目会添加整个框架的开销,但能够改善负载均衡。减少因为运行失败带来的负面影响。
上述比例因子比总体数目稍小一些是为了给框架中的猜測性任务(speculative-tasks) 或失败的任务预留一些reduce的资源。
无Reducer
假设没有归约要进行。那么设置reduce任务的数目为零是合法的。
这样的情况下,map任务的输出会直接被写入由setOutputPath(Path)指定的输出路径。框架在把它们写入FileSystem之前没有对它们进行排序。
Partitioner
Partitioner对值空间进行划分。
Partitioner负责控制map输出结果key的切割。Key(或者一个key子集)被用于产生分区,通常使用的是Hash函数。分区的数目与一个作业的reduce任务的数目是一样的。因此,它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。
HashPartitioner是默认的 Partitioner。
Reporter
Reporter用于Map/Reduce应用程序报告进度。设定应用级别的状态消息, 更新Counters(计数器)的机制。
Mapper和Reducer的实现能够利用Reporter来报告进度。或者仅是表明自己执行正常。在那种应用程序须要花非常长时间处理个别键值对的场景中。这样的机制是非常关键的,由于框架可能会以为这个任务超时了,从而将它强行杀死。
还有一个避免这样的情况发生的方式是。将配置參数mapred.task.timeout设置为一个足够高的值(或者干脆设置为零,则没有超时限制了)。
应用程序能够用Reporter来更新Counter(计数器)。
OutputCollector
OutputCollector是一个Map/Reduce框架提供的用于收集Mapper或Reducer输出数据的通用机制(包含中间输出结果和作业的输出结果)。
Hadoop Map/Reduce框架附带了一个包括很多有用型的mapper、reducer和partitioner 的类库。
任务的运行和环境
TaskTracker是在一个单独的jvm上以子进程的形式运行 Mapper/Reducer任务(Task)的。
子任务会继承父TaskTracker的环境。用户能够通过JobConf中的 mapred.child.java.opts配置參数来设定子jvm上的附加选项。比如: 通过-Djava.library.path=<> 将一个非标准路径设为执行时的链接用以搜索共享库,等等。
假设mapred.child.java.opts包括一个符号@taskid@。 它会被替换成map/reduce的taskid的值。
以下是一个包括多个參数和替换的样例,当中包括:记录jvm GC日志; JVM JMX代理程序以无password的方式启动,这样它就能连接到jconsole上,从而能够查看子进程的内存和线程,得到线程的dump;还把子jvm的最大堆尺寸设置为512MB, 并为子jvm的java.library.path加入了一个附加路径。
<property>
<name>mapred.child.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
用户或管理员也能够使用mapred.child.ulimit设定执行的子任务的最大虚拟内存。mapred.child.ulimit的值以(KB)为单位。而且必须大于或等于-Xmx參数传给JavaVM的值。否则VM会无法启动。
注意:mapred.child.java.opts仅仅用于设置task tracker启动的子任务。为守护进程设置内存选项请查看 cluster_setup.html
${mapred.local.dir}/taskTracker/是task tracker的本地文件夹, 用于创建本地缓存和job。它能够指定多个文件夹(跨越多个磁盘),文件会半随机的保存到本地路径下的某个文件夹。当job启动时,task tracker依据配置文档创建本地job文件夹,文件夹结构例如以下面所看到的:
- ${mapred.local.dir}/taskTracker/archive/ :分布式缓存。这个文件夹保存本地的分布式缓存。因此本地分布式缓存是在全部task和job间共享的。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job文件夹。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享文件夹。各个任务能够使用这个空间做为暂存空间。用于它们之间共享文件。
这个文件夹通过job.local.dir 參数暴露给用户。这个路径能够通过API JobConf.getJobLocalDir()来訪问。它也能够被做为系统属性获得。因此。用户(比方执行streaming)能够调用System.getProperty("job.local.dir")获得该文件夹。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路径,用于存放作业的jar文件和展开的jar。job.jar是应用程序的jar文件。它会被自己主动分发到各台机器,在task启动前会被自己主动展开。使用api JobConf.getJar() 函数能够得到job.jar的位置。使用JobConf.getJar().getParent()能够訪问存放展开的jar包的文件夹。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一个job.xml文件。本地的通用的作业配置文件。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每一个任务有一个文件夹task-id,它里面有例如以下的文件夹结构:
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一个job.xml文件,本地化的任务作业配置文件。
任务本地化是指为该task设定特定的属性值。
这些值会在以下详细说明。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output一个存放中间过程的输出文件的文件夹。它保存了由framwork产生的暂时map reduce数据,比方map的输出文件等。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的当前工作文件夹。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的暂时文件夹。(用户能够设定属性mapred.child.tmp来为map和reduce task设定暂时文件夹。
缺省值是./tmp。
假设这个值不是绝对路径。 它会把task的工作路径加到该路径前面作为task的暂时文件路径。假设这个值是绝对路径则直接使用这个值。 假设指定的文件夹不存在,会自己主动创建该文件夹。
之后,依照选项 -Djava.io.tmpdir=‘暂时文件的绝对路径‘运行java子任务。
pipes和streaming的暂时文件路径是通过环境变量TMPDIR=‘the absolute path of the tmp dir‘设定的)。
假设mapred.child.tmp有./tmp值,这个文件夹会被创建。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一个job.xml文件,本地化的任务作业配置文件。
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享文件夹。各个任务能够使用这个空间做为暂存空间。用于它们之间共享文件。
以下的属性是为每一个task运行时使用的本地參数,它们保存在本地化的任务作业配置文件中:
名称 | 类型 | 描写叙述 |
---|---|---|
mapred.job.id | String | job id |
mapred.jar | String | job文件夹下job.jar的位置 |
job.local.dir | String | job指定的共享存储空间 |
mapred.tip.id | String | task id |
mapred.task.id | String | task尝试id |
mapred.task.is.map | boolean | 是否是map task |
mapred.task.partition | int | task在job中的id |
map.input.file | String | map读取的文件名称 |
map.input.start | long | map输入的数据块的起始位置偏移 |
map.input.length | long | map输入的数据块的字节数 |
mapred.work.output.dir | String | task暂时输出文件夹 |
task的标准输出和错误输出流会被读到TaskTracker中,而且记录到 ${HADOOP_LOG_DIR}/userlogs
DistributedCache可用于map或reduce task中分发jar包和本地库。子jvm总是把 当前工作文件夹 加到 java.library.path 和 LD_LIBRARY_PATH。
因此,能够通过 System.loadLibrary或 System.load装载缓存的库。有关使用分布式缓存载入共享库的细节请參考 native_libraries.html
作业的提交与监控
JobClient是用户提交的作业与JobTracker交互的主要接口。
JobClient 提供提交作业,追踪进程,訪问子任务的日志记录。获得Map/Reduce集群状态信息等功能。
作业提交过程包含:
- 检查作业输入输出样式细节
- 为作业计算InputSplit值。
- 假设须要的话,为作业的DistributedCache建立必须的统计信息。
- 拷贝作业的jar包和配置文件到FileSystem上的Map/Reduce系统文件夹下。
- 提交作业到JobTracker而且监控它的状态。
作业的历史文件记录到指定文件夹的"_logs/history/"子文件夹下。这个指定文件夹由hadoop.job.history.user.location设定。默认是作业输出的文件夹。因此默认情况下,文件会存放在mapred.output.dir/_logs/history文件夹下。
用户能够设置hadoop.job.history.user.location为none来停止日志记录。
用户使用以下的命令能够看到在指定文件夹下的历史日志记录的摘要。
$ bin/hadoop job -history output-dir
这个命令会打印出作业的细节,以及失败的和被杀死的任务细节。
要查看有关作业的很多其它细节比如成功的任务、每一个任务尝试的次数(task attempt)等,能够使用以下的命令
$ bin/hadoop job -history all output-dir
用户能够使用 OutputLogFilter从输出文件夹列表中筛选日志文件。
普通情况,用户利用JobConf创建应用程序并配置作业属性, 然后用 JobClient 提交作业并监视它的进程。
作业的控制
有时候。用一个单独的Map/Reduce作业并不能完毕一个复杂的任务,用户或许要链接多个Map/Reduce作业才行。这是easy实现的。由于作业通常输出到分布式文件系统上的。所以能够把这个作业的输出作为下一个作业的输入实现串联。
然而,这也意味着。确保每一作业完毕(成功或失败)的责任就直接落在了客户身上。在这样的情况下,能够用的控制作业的选项有:
- runJob(JobConf):提交作业,仅当作业完毕时返回。
- submitJob(JobConf):仅仅提交作业,之后须要你轮询它返回的
RunningJob句柄的状态。并依据情况调度。
- JobConf.setJobEndNotificationURI(String):设置一个作业完毕通知,可避免轮询。
作业的输入
InputFormat 为Map/Reduce作业描写叙述输入的细节规范。
Map/Reduce框架依据作业的InputFormat来:
- 检查作业输入的有效性。
- 把输入文件切分成多个逻辑InputSplit实例。 并把每一实例分别分发给一个 Mapper。
- 提供RecordReader的实现,这个RecordReader从逻辑InputSplit中获得输入记录, 这些记录将由Mapper处理。
基于文件的InputFormat实现(一般是 FileInputFormat的子类) 默认行为是依照输入文件的字节大小,把输入数据切分成逻辑分块(logicalInputSplit )。 当中输入文件所在的FileSystem的数据块尺寸是分块大小的上限。下限能够设置mapred.min.split.size的值。
考虑到边界情况。对于非常多应用程序来说,非常明显依照文件大小进行逻辑切割是不能满足需求的。
在这样的情况下,应用程序须要实现一个RecordReader来处理记录的边界并为每一个任务提供一个逻辑分块的面向记录的视图。
TextInputFormat 是默认的InputFormat。
假设一个作业的Inputformat是TextInputFormat, 而且框架检測到输入文件的后缀是.gz和.lzo,就会使用相应的CompressionCodec自己主动解压缩这些文件。 可是须要注意。上述带后缀的压缩文件不会被切分,而且整个压缩文件会分给一个mapper来处理。
InputSplit
InputSplit 是一个单独的Mapper要处理的数据块。
一般的InputSplit 是字节样式输入,然后由RecordReader处理并转化成记录样式。
FileSplit 是默认的InputSplit。 它把 map.input.file 设定为输入文件的路径,输入文件是逻辑分块文件。
RecordReader
RecordReader 从InputSlit读入<key, value>对。
一般的。RecordReader 把由InputSplit提供的字节样式的输入文件,转化成由Mapper处理的记录样式的文件。
因此RecordReader负责处理记录的边界情况和把数据表示成keys/values对形式。
作业的输出
OutputFormat 描写叙述Map/Reduce作业的输出样式。
Map/Reduce框架依据作业的OutputFormat来:
- 检验作业的输出,比如检查输出路径是否已经存在。
- 提供一个RecordWriter的实现,用来输出作业结果。
输出文件保存在FileSystem上。
TextOutputFormat是默认的 OutputFormat。
任务的Side-Effect File
在一些应用程序中,子任务须要产生一些side-file。这些文件与作业实际输出结果的文件不同。
在这样的情况下,同一个Mapper或者Reducer的两个实例(比方预防性任务)同一时候打开或者写 FileSystem上的同一文件就会产生冲突。因此应用程序在写文件的时候须要为每次任务尝试(不不过每次任务,每一个任务能够尝试运行非常多次)选取一个独一无二的文件名称(使用attemptid,比如task_200709221812_0001_m_000000_0)。
为了避免冲突,Map/Reduce框架为每次尝试运行任务都建立和维护一个特殊的 ${mapred.output.dir}/_temporary/_${taskid}子文件夹。这个文件夹位于本次尝试运行任务输出结果所在的FileSystem上。能够通过 ${mapred.work.output.dir}来訪问这个子文件夹。
对于成功完毕的任务尝试,仅仅有${mapred.output.dir}/_temporary/_${taskid}下的文件会移动到${mapred.output.dir}。当然。框架会丢弃那些失败的任务尝试的子文件夹。
这样的处理过程对于应用程序来说是全然透明的。
在任务运行期间,应用程序在写文件时能够利用这个特性,比方 通过FileOutputFormat.getWorkOutputPath()获得${mapred.work.output.dir}文件夹, 并在其下创建随意任务运行时所需的side-file,框架在任务尝试成功时会立即移动这些文件。因此不须要在程序内为每次任务尝试选取一个独一无二的名字。
注意:在每次任务尝试运行期间,${mapred.work.output.dir} 的值实际上是 ${mapred.output.dir}/_temporary/_{$taskid},这个值是Map/Reduce框架创建的。 所以使用这个特性的方法是,在FileOutputFormat.getWorkOutputPath() 路径下创建side-file就可以。
对于仅仅使用map不使用reduce的作业,这个结论也成立。这样的情况下。map的输出结果直接生成到HDFS上。
RecordWriter
RecordWriter 生成<key, value>对到输出文件。
RecordWriter的实现把作业的输出结果写到 FileSystem。
其它实用的特性
Counters
Counters 是多个由Map/Reduce框架或者应用程序定义的全局计数器。
每个Counter能够是不论什么一种 Enum类型。
同一特定Enum类型的Counter能够汇集到一个组,其类型为Counters.Group。
应用程序能够定义随意(Enum类型)的Counters而且能够通过 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long)更新。之后框架会汇总这些全局counters。
DistributedCache
DistributedCache 可将详细应用相关的、大尺寸的、仅仅读的文件有效地分布放置。
DistributedCache 是Map/Reduce框架提供的功能,可以缓存应用程序所需的文件 (包含文本,档案文件,jar文件等)。
应用程序在JobConf中通过url(hdfs://)指定须要被缓存的文件。
DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。
Map-Redcue框架在作业全部任务执行之前会把必要的文件复制到slave节点上。 它执行高效是由于每一个作业的文件仅仅拷贝一次而且为那些没有文档的slave节点缓存文档。
DistributedCache 依据缓存文档改动的时间戳进行追踪。
在作业运行期间。当前应用程序或者外部程序不能改动缓存文件。
distributedCache能够分发简单的仅仅读数据或文本文件,也能够分发复杂类型的文件比如归档文件和jar文件。
归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件能够设置运行权限。
用户能够通过设置mapred.cache.{files|archives}来分发文件。
假设要分发多个文件,能够使用逗号分隔文件所在路径。
也能够利用API来设置该属性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf)当中URI的形式是 hdfs://host:port/absolute-path#link-name在Streaming程序中,能够通过命令行选项 -cacheFile/-cacheArchive分发文件。
用户能够通过DistributedCache.createSymlink(Configuration)方法让DistributedCache在当前工作文件夹下创建到缓存文件的符号链接。 或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。
比如。URI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前工作文件夹会有名为lib.so的链接, 它会链接分布式缓存中的lib.so.1。
DistributedCache可在map/reduce任务中作为 一种基础软件分发机制使用。
它能够被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API可以被用于 缓存文件和jar包,并把它们增加子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到同样的效果。缓存文件可用于分发和装载本地库。
Tool
Tool接口支持处理经常使用的Hadoop命令行选项。
Tool 是Map/Reduce工具或应用的标准。应用程序应仅仅处理其定制參数。 要把标准命令行选项通过 ToolRunner.run(Tool, String[])托付给 GenericOptionsParser处理。
Hadoop命令行的经常使用选项有:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>
IsolationRunner
IsolationRunner 是帮助调试Map/Reduce程序的工具。
使用IsolationRunner的方法是,首先设置 keep.failed.task.files属性为true(同一时候參考keep.task.files.pattern)。
然后,登录到任务执行失败的节点上,进入 TaskTracker的本地路径执行
IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
IsolationRunner会把失败的任务放在单独的一个可以调试的jvm上执行,而且採用和之前全然一样的输入数据。
Profiling
Profiling是一个工具,它使用内置的java profiler工具进行分析获得(2-3个)map或reduce例子执行分析报告。
用户能够通过设置属性mapred.task.profile指定系统是否採集profiler信息。
利用api JobConf.setProfileEnabled(boolean)能够改动属性值。假设设为true, 则开启profiling功能。profiler信息保存在用户日志文件夹下。
缺省情况。profiling功能是关闭的。
假设用户设定使用profiling功能,能够使用配置文档里的属性 mapred.task.profile.{maps|reduces}设置要profile map/reduce task的范围。设置该属性值的api是 JobConf.setProfileTaskRange(boolean,String)。
范围的缺省值是0-2。
用户能够通过设定配置文档里的属性mapred.task.profile.params来指定profiler配置參数。改动属性要使用api JobConf.setProfileParams(String)。当执行task时,假设字符串包括%s。 它会被替换成profileing的输出文件名称。这些參数会在命令行里传递到子JVM中。缺省的profiling 參数是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。
调试
Map/Reduce框架能够执行用户提供的用于调试的脚本程序。 当map/reduce任务失败时。用户能够通过执行脚本在任务日志(比如任务的标准输出、标准错误、系统日志以及作业配置文件)上做兴许处理工作。用户提供的调试脚本程序的标准输出和标准错误会输出为诊断文件。假设须要的话这些输出结果也能够打印在用户界面上。
在接下来的章节。我们讨论怎样与作业一起提交调试脚本。为了提交调试脚本。 首先要把这个脚本分发出去,并且还要在配置文件中设置。
怎样分发脚本文件:
用户要用 DistributedCache机制来分发和链接脚本文件
怎样提交脚本:
一个高速提交调试脚本的方法是分别为须要调试的map任务和reduce任务设置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 属性的值。这些属性也能够通过 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API来设置。
对于streaming, 能够分别为须要调试的map任务和reduce任务使用命令行选项-mapdebug 和 -reducedegug来提交调试脚本。
脚本的參数是任务的标准输出、标准错误、系统日志以及作业配置文件。
在执行map/reduce失败的节点上执行调试命令是:
$script $stdout $stderr $syslog $jobconf
Pipes 程序依据第五个參数获得c++程序名。 因此调试pipes程序的命令是
$script $stdout $stderr $syslog $jobconf $program
默认行为
对于pipes,默认的脚本会用gdb处理core dump, 打印 stack trace而且给出正在执行线程的信息。
JobControl
JobControl是一个工具,它封装了一组Map/Reduce作业以及他们之间的依赖关系。
数据压缩
Hadoop Map/Reduce框架为应用程序的写入文件操作提供压缩工具,这些工具能够为map输出的中间数据和作业终于输出数据(比如reduce的输出)提供支持。它还附带了一些 CompressionCodec的实现。比方实现了 zlib和lzo压缩算法。 Hadoop相同支持gzip文件格式。
考虑到性能问题(zlib)以及Java类库的缺失(lzo)等因素,Hadoop也为上述压缩解压算法提供本地库的实现。很多其它的细节请參考 这里。
中间输出
应用程序能够通过 JobConf.setCompressMapOutput(boolean)api控制map输出的中间结果。而且能够通过 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec。
作业输出
应用程序能够通过 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制输出是否须要压缩而且能够使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec。
假设作业输出要保存成 SequenceFileOutputFormat格式。须要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,来设定 SequenceFile.CompressionType (i.e. RECORD / BLOCK - 默认是RECORD)。
样例:WordCount v2.0
这里是一个更全面的WordCount样例。它使用了我们已经讨论过的非常多Map/Reduce框架提供的功能。
执行这个样例须要HDFS的某些功能。特别是 DistributedCache相关功能。因此这个样例仅仅能执行在 伪分布式 或者 全然分布式模式的 Hadoop上。
package org.myorg; import java.io.*; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCount extends Configured implements Tool { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { static enum Counters { INPUT_WORDS } private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean caseSensitive = true; private Set<String> patternsToSkip = new HashSet<String>(); private long numRecords = 0; private String inputFile; public void configure(JobConf job) { caseSensitive = job.getBoolean("wordcount.case.sensitive", true); inputFile = job.get("map.input.file"); if (job.getBoolean("wordcount.skip.patterns", false)) { Path[] patternsFiles = new Path[0]; try { patternsFiles = DistributedCache.getLocalCacheFiles(job); } catch (IOException ioe) { System.err .println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe)); } for (Path patternsFile : patternsFiles) { parseSkipFile(patternsFile); } } } private void parseSkipFile(Path patternsFile) { try { BufferedReader fis = new BufferedReader(new FileReader( patternsFile.toString())); String pattern = null; while ((pattern = fis.readLine()) != null) { patternsToSkip.add(pattern); } } catch (IOException ioe) { System.err .println("Caught exception while parsing the cached file ‘" + patternsFile + "‘ : " + StringUtils.stringifyException(ioe)); } } public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = (caseSensitive) ? value.toString() : value.toString() .toLowerCase(); for (String pattern : patternsToSkip) { line = line.replaceAll(pattern, ""); } StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); reporter.incrCounter(Counters.INPUT_WORDS, 1); } if ((++numRecords % 100) == 0) { reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); List<String> other_args = new ArrayList<String>(); for (int i = 0; i < args.length; ++i) { if ("-skip".equals(args[i])) { DistributedCache .addCacheFile(new Path(args[++i]).toUri(), conf); conf.setBoolean("wordcount.skip.patterns", true); } else { other_args.add(args[i]); } } FileInputFormat.setInputPaths(conf, new Path(other_args.get(0))); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }
执行例子
输入例子:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
执行程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
注意此时的输入与第一个版本号的不同,输出的结果也有不同。
如今通过DistributedCache插入一个模式文件,文件里保存了要被忽略的单词模式。
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to
再执行一次。这次使用很多其它的选项:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
应该得到这种输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
再执行一次,这一次关闭大写和小写敏感性(case-sensitivity):
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
程序要点
通过使用一些Map/Reduce框架提供的功能,WordCount的第二个版本号在原始版本号基础上有了例如以下的改进:
- 展示了应用程序怎样在Mapper (和Reducer)中通过configure方法 改动配置參数(28-43行)。
- 展示了作业怎样使用DistributedCache 来分发仅仅读数据。 这里同意用户指定单词的模式。在计数时忽略那些符合模式的单词(104行)。
- 展示Tool接口和GenericOptionsParser处理Hadoop命令行选项的功能 (87-116, 119行)。
- 展示了应用程序怎样使用Counters(68行),怎样通过传递给map(和reduce) 方法的Reporter实例来设置应用程序的状态信息(72行)。
Java和JNI是Sun Microsystems, Inc.在美国和其他国家的注冊商标。
一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)