首页 > 代码库 > Haddop随笔(一):工作流程的源码

Haddop随笔(一):工作流程的源码

一、几个可能会用到的属性值

  1、mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution

  这两个属性可以决定Map任务和Reduce任务是否开启推测式执行策略。推测式执行策略在Hadoop中用来应对执行缓慢的任务所造成的瓶颈,但是对代码缺陷所导致的任务执行过慢,推测执行是一种反向的作用,应当避免,而Hadoop默认是开启推测式执行的。

  2、mapred.job.reuse.jvm.num.tasks

  这个属性值默认为1,表示一个JVM上只运行一个任务。如果Task Tracker具有很多小任务,重复启动JVM是非常耗时的,可以考虑任务JVM重用。将属性值设置为大于1的值表示启用了JVM;设置为-1表示共享此JVM的任务数目不受限制。尽管启用了JVM重用,但是Task Tracker上的多个任务还是顺序执行的,只是不需要额外启动JVM而已。

  3、Task Tracker的本地参数值

名称类型描述
mapred.job.idStringjob id
mapred.jarStringjob目录下job.jar的位置
job.local.dirStringjob指定的共享存储空间
mapred.tip.idStringtask id
mapred.task.idStringtask尝试id
mapred.task.partitioninttask在job中的id
map.input.fileStringmap读取的文件名
map.input.startlongmap输入的数据块的起始位置的偏移
map.input.lengthlongmap输入的数据块的字节数
mapred.work.output.dirStringtask临时输出目录

 二、Hadoop工作流程

  以下Hadoop源码来源于hadoop-1.2.1。

 

  1、作业的提交

  hadoop项目的第一步自然是根据需求进行代码编写,而后是需要配置Map类、Reduce类、Input路径、Output路径以及其他的虚拟机配置等。但这些操作只是为hadoop形成了一个作业,还算不上Hadoop的工作流程,将项目提交执行后,hadoop进入完全自动的运行方式,工作流程才开始启动。

  作业的提交代码实现于JobClient类(JobClient.java)的submitJobInternal方法,分为以下几个步骤:

  a.获取Job的ID

    JobID jobId = jobSubmitClient.getNewJobId();

  b.分配job在HDFS中的资源空间,配置其路径

    Path submitJobDir = new Path(jobStagingArea, jobId.toString());    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());    //jobCopy是拷贝的一个job副本    JobStatus status = null;  

  c.监控地址令牌,直到可以对该空间进行操作。

    TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),new Path [] {submitJobDir},jobCopy);       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);       int reduces = jobCopy.getNumReduceTasks();       InetAddress ip = InetAddress.getLocalHost();       if (ip != null) {            job.setJobSubmitHostAddress(ip.getHostAddress());            job.setJobSubmitHostName(ip.getHostName());       }       JobContext context = new JobContext(jobCopy, jobId);

  d.检查output类型

    if (reduces == 0 ? jobCopy.getUseNewMapper() :             jobCopy.getUseNewReducer()) {            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =              ReflectionUtils.newInstance(context.getOutputFormatClass(),                  jobCopy);            output.checkOutputSpecs(context);          } else {            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);          }

  e.为job划分splits

    FileSystem fs = submitJobDir.getFileSystem(jobCopy);       LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));       int maps = writeSplits(context, submitJobDir);       jobCopy.setNumMapTasks(maps);

  f.在job的配置文件中写入相关的排队、配置信息

    String queue = jobCopy.getQueueName();       AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);       jobCopy.set(QueueManager.toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

  g.将job的信息通知给JobTrack

    FSDataOutputStream out = FileSystem.create(fs, submitJobFile,new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

  f.将job写入到HDFS中。需要注意的是,写入job之前要先将cache中的数据清除。这样做的原因:cache中的数据对当前job是没有用的了,如果在job写入之前清除是没有影响的;但是,如果在写入之后清除,cache中的很可能已经被更改为其他job的信息了,清除cache中的数据会破坏其他的job。

    TokenCache.cleanUpTokenReferral(jobCopy);       try {            jobCopy.writeXml(out);       } finally {            out.close();       }

  g.调用submitJob类来真正的提交job

    printTokens(jobId, jobCopy.getCredentials());    status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());    JobProfile prof = jobSubmitClient.getJobProfile(jobId);    if (status != null && prof != null) {            return new NetworkedJob(status, prof, jobSubmitClient);       } else {            throw new IOException("Could not launch job");       }

  至此,一个作业就被完整地提交给JobTrack和HDFS了。

 

  2、作业的初始化

  作业的初始化主要是考虑到map和reduce都需要进行初始化,因此需要注意其中的细节。

  作业的初始化源码实现于JobInprogress类(JobInProgress.java)的initTask方法。作业初始化的步骤如下:

  a.首先需要获取job的一些配置信息和运行信息

    if (!jobtracker.getConf().getBoolean(JT_JOB_INIT_EXCEPTION_OVERRIDE, false) && getJobConf().getBoolean(JOB_INIT_EXCEPTION, false)) {          waitForInitWaitLockForTests();      }    if (tasksInited || isComplete()) {      return;    }    synchronized(jobInitKillStatus){    //互锁操作      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {        return;    }    jobInitKillStatus.initStarted = true;    }

  b.获取job的执行信息和优先级信息

  c.根据input splits的数目决定map的数目,每一个splits都需要一个map

    TaskSplitMetaInfo[] splits = createSplits(jobId);    if (numMapTasks != splits.length) {    throw new IOException("Number of maps in JobConf doesn‘t match number of " +          "recieved splits for job " + jobId + "! " +          "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);    }    numMapTasks = splits.length;

  d.检查splits的位置,保证创建的map/reduce或者初始化的map/reduce都是有意义的,同时还需要在监控进程中设置map和reduce的信息

    for (TaskSplitMetaInfo split : splits) {      NetUtils.verifyHostnames(split.getLocations());    }    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

  e.为splits分配map

    maps = new TaskInProgress[numMapTasks];    for(int i=0; i < numMapTasks; ++i) {      inputLength += splits[i].getInputDataLength();      maps[i] = new TaskInProgress(jobId, jobFile,                                    splits[i],                                    jobtracker, conf, this, i, numSlotsPerMap);    }    LOG.info("Input size for job " + jobId + " = " + inputLength+ ". Number of splits = " + splits.length);

  f.将map放入到等待执行的缓冲区内

    localityWaitFactor = conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);    if (numMapTasks > 0) {       nonRunningMapCache = createCache(splits, maxLevel);    }    // set the launch time    this.launchTime = jobtracker.getClock().getTime();

  g.同样的对待reduce操作

  h.创建两个清除进程,一个清除map,一个清除reduce

    cleanup = new TaskInProgress[2];    // cleanup map tip. This map doesn‘t use any splits. Just assign an empty    // split.    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1);    cleanup[0].setJobCleanupTask();    // cleanup reduce tip.    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,                       numReduceTasks, jobtracker, conf, this, 1);    cleanup[1].setJobCleanupTask();

  i.创建两个设置进程,分别对map和reduce进行初始化设置

    setup = new TaskInProgress[2];    // setup map tip. This map doesn‘t use any split. Just assign an empty    // split.    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1);    setup[0].setJobSetupTask();     // setup reduce tip.    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,                       numReduceTasks + 1, jobtracker, conf, this, 1);    setup[1].setJobSetupTask();

  j.最后,设置一个互锁的方法,用来检查初始化是否成功,并写入日志

    synchronized(jobInitKillStatus){      jobInitKillStatus.initDone = true;       // set this before the throw to make sure cleanup works properly      tasksInited = true;      if(jobInitKillStatus.killed) {        throw new KillInterruptedException("Job " + jobId + " killed in init");      }    }    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, numMapTasks, numReduceTasks);

  至此,作业的初始化操作就已经完成了。由于初始化操作需要根据splits数目来确定需要初始化的map和reduce操作,因此相比较有点复杂。

 

  3、作业的分配

  作业的分配需要JobTrack和TaskTrack相互协调,JobTrack根据TaskTrack发送的心跳信息来决定任务的分配方法。心跳信息会报告当前任务的状况,会提出新的任务请求,或者提出任务执行失败的报告等。JobTrack接受到信息后会采取相应的措施。

  Hadoop的作业分配是一种“拉”的方式,TaskTrack发送心跳信息(源码实现为TaskTrack类(TaskTrack.java)的transmitHeartBeat方法)给JobTrack来请求一个新的任务,而这个TaskTrack提出任务请求时也会提供自己当前map/reduce任务槽的数量供JobTrack参考。JobTrack根据心跳信息(源码实现为JobTrack类(JobTrack.java)的heartbeat方法)在本地优先的情况下对该TaskTrack分配Task。

 

  4、作业的执行

  作业的执行同样在TaskTrack中完成, 在作业执行时,第一步是将任务进行本地化,第二步是通过虚拟机执行任务。

  任务本地化,基于TaskTrack.java的localizeJob方法实现,主要步骤为:

  a.将job.split复制到本地;b.将job.jar复制到本地;c.将job的配置信息写入job.xml;d.创建本地目录,解压缩job.jar;e.调用launchTaskForJob方法发布任务;f.调用launchTask方法启动任务。

  任务的执行包括map执行(MapTaskRunner类)和reduce执行(ReduceTaskRunner类),每个任务的执行都是通过一个JVM实现的。

 

  5、作业的进度

  作业进度的更新依赖于TaskTrack向JobTrack发送的心跳信息。每个TaskTrack都会将自己的进度信息和状态信息封装在心跳信息中,每隔5秒向JobTrack发送一次。JobTrack在收集所有的信息后统一并得出全局信息。

  

Haddop随笔(一):工作流程的源码