首页 > 代码库 > hadoop mapreduce中对splite的处理

hadoop mapreduce中对splite的处理

分片:
1. 在job.submit() 提交job之后  会调用 submitter.submitJobInternal(Job.this, cluster);
 
2. 在submitJobInternal()函数中 会给job创建分片 int maps = writeSplits(job, submitJobDir); 在该函数中会调用writeNewSplits()
 
3. 在writeNewSplits()方法 中,通过反射获得InputFormat对象,会调用该对象中的getSplits()方法,来进行分片,从而得到InputSplit[] 数组. 然后会通过 JobSplitWriter.createSplitFiles方法将数组内容写出. writeNewSplits方法返回的是分片数目,决定了会创建多少个 map task.
 
4.  在JobSplitWriter.createSplitFiles方法中, 会打开一个输出流out,输出文件名是(${jobSubmitDir}/job.split)
    其中调用writeNewSplits()来完成写出操作,.与此同时,该函数会返回一个SplitMetaInfo的数组.
    在该数据结构中主要包括三个属性:
    long   startOffset :  该分片在 job.split 中的偏移量
    long inputDataLength: 分片数据长度
    String[] locations: hosts on which this split is local
 
5.最后调用writeJobSplitMetaInfo()方法  将第5步中的SplitMetaInfo数组写入到另一个文件中,文件名是${jobSubmitDir}/job.splitmetainfo.
 
6. 上述步骤中最终会输出两个文件 job.split 和 job.splitmetainfo.
    在 job.split内容:   split的类名, split的序列化信息,在FileSplit类中会写入文件名,偏移量,和长度
    在 job.splitmetainfo中内容: META_SPLIT_FILE_HEADER , version , 分片数量, splitMetaInfo序列化(包括locations的数目, 以此写入所有的locations, startOffset,inputDataLength)
 
到此为止,将分片信息记录完成,写入到HDFS中相应的文件中.
 

读取分片:
1. 首先在JobImpl类中的InitTransition中会读取相应的split信息, 并启动相应的Task
 
2. TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId)
    在该函数中会通过SplitMetaInfoReader.readSplitMetaInfo函数从job.splitmetainfo文件中读取出相应信息,首先会先验证META_SPLIT_VERSION和numSplits,然后会依次读取出每个splitMetaInfo,根据splitMetaInfo再从 job.split 中读取相应数据,构建出TaskSplitIndex对象,然后得到TaskSplitMetaInfo对象,最后返回 TaskSplitMetaInfo[]数组.
 TaskSplitIndex有两个属性: String splitLocation 和  long startOffset;
 TaskSplitMetaInfo有三个属性: TaskSplitIndex splitIndex , long inputDataLength,  String[] locations
    
 
3. 在创建Map任务的时候会将该数组传入 , createMapTasks(job, inputLength, taskSplitMetaInfo);
    对应每个taskSplitMetaInfo 会创建一个TaskImpl,并传入对应taskSplitMetaInfo
     TaskImpl task =new MapTaskImpl(....);
 
4. 在MapTaskImpl中会创建MapTaskAttemptImpl对象,该对象中存在createRemoteTask方法,在改方法中创建了实际的MapTask对象
MapTask mapTask =new MapTask("", TypeConverter.fromYarn(getID()), partition,splitInfo.getSplitIndex(), 1);
splitInfo.getSplitIndex()会返回一个TaskSplitIndex对象,
 
5. 在MapTask执行runNewMapper方法时,会通过
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());
    读取到实际的文件,最后通过InputFormat接口的createRecordReader方法得到需要的RecordReader.

hadoop mapreduce中对splite的处理