首页 > 代码库 > hadoop mapreduce中对splite的处理
hadoop mapreduce中对splite的处理
分片:
1. 在job.submit() 提交job之后 会调用 submitter.submitJobInternal(Job.this, cluster);
2. 在submitJobInternal()函数中 会给job创建分片 int maps = writeSplits(job, submitJobDir); 在该函数中会调用writeNewSplits()
3. 在writeNewSplits()方法 中,通过反射获得InputFormat对象,会调用该对象中的getSplits()方法,来进行分片,从而得到InputSplit[] 数组. 然后会通过 JobSplitWriter.createSplitFiles方法将数组内容写出. writeNewSplits方法返回的是分片数目,决定了会创建多少个 map task.
4. 在JobSplitWriter.createSplitFiles方法中, 会打开一个输出流out,输出文件名是(${jobSubmitDir}/job.split)
其中调用writeNewSplits()来完成写出操作,.与此同时,该函数会返回一个SplitMetaInfo的数组.
在该数据结构中主要包括三个属性:
long startOffset : 该分片在 job.split 中的偏移量
long inputDataLength: 分片数据长度
String[] locations: hosts on which this split is local
5.最后调用writeJobSplitMetaInfo()方法 将第5步中的SplitMetaInfo数组写入到另一个文件中,文件名是${jobSubmitDir}/job.splitmetainfo.
6. 上述步骤中最终会输出两个文件 job.split 和 job.splitmetainfo.
在 job.split内容: split的类名, split的序列化信息,在FileSplit类中会写入文件名,偏移量,和长度
在 job.splitmetainfo中内容: META_SPLIT_FILE_HEADER , version , 分片数量, splitMetaInfo序列化(包括locations的数目, 以此写入所有的locations, startOffset,inputDataLength)
到此为止,将分片信息记录完成,写入到HDFS中相应的文件中.
读取分片:
1. 首先在JobImpl类中的InitTransition中会读取相应的split信息, 并启动相应的Task
2. TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId)
在该函数中会通过SplitMetaInfoReader.readSplitMetaInfo函数从job.splitmetainfo文件中读取出相应信息,首先会先验证META_SPLIT_VERSION和numSplits,然后会依次读取出每个splitMetaInfo,根据splitMetaInfo再从 job.split 中读取相应数据,构建出TaskSplitIndex对象,然后得到TaskSplitMetaInfo对象,最后返回 TaskSplitMetaInfo[]数组.
TaskSplitIndex有两个属性: String splitLocation 和 long startOffset;
TaskSplitMetaInfo有三个属性: TaskSplitIndex splitIndex , long inputDataLength, String[] locations
3. 在创建Map任务的时候会将该数组传入 , createMapTasks(job, inputLength, taskSplitMetaInfo);
对应每个taskSplitMetaInfo 会创建一个TaskImpl,并传入对应taskSplitMetaInfo
TaskImpl task =new MapTaskImpl(....);
4. 在MapTaskImpl中会创建MapTaskAttemptImpl对象,该对象中存在createRemoteTask方法,在改方法中创建了实际的MapTask对象
MapTask mapTask =new MapTask("", TypeConverter.fromYarn(getID()), partition,splitInfo.getSplitIndex(), 1);
splitInfo.getSplitIndex()会返回一个TaskSplitIndex对象,
5. 在MapTask执行runNewMapper方法时,会通过
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());
读取到实际的文件,最后通过InputFormat接口的createRecordReader方法得到需要的RecordReader.
hadoop mapreduce中对splite的处理
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。