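In Hadoop, a MapReduce job is called a Job, and the JobTracker, as its name suggests, is the node that manages Jobs. A Job consists of multiple Map and Reduce tasks; in Hadoop these are called Tasks, while "Job" refers to the whole Map-Reduce flow. Before looking at the job submission code, a few basics are worth knowing: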

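1. The input data and resources a Job needs (data distribution information, configuration parameters, and so on) are all stored in HDFS. The resources must first be uploaded to HDFS by the Job client. They are not sent to the JobTracker, because the JobTracker can access HDFS itself: it reads the information it needs from HDFS and then assigns the Map and Reduce Tasks.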

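2. JobClient and JobTracker can be seen as a client/server (C/S) pair. After JobClient has stored the resources in HDFS, it submits the job to the JobTracker. What it actually sends is only the Job ID and basic information such as the job's directory in HDFS. Note that no computation data or resource data is passed directly between them, because both are HDFS clients and can access HDFS on their own.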


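4. Map Tasks and Reduce Tasks ultimately run on TaskTrackers. A TaskTracker usually runs on a Linux server with a Java virtual machine installed. Starting a Map or Reduce Task launches a JVM that executes the task (so every Map or Reduce task is a separate Java process); JVM startup itself is reasonably fast. When the task finishes, its completion is reported to the JobTracker and the JVM exits. One TaskTracker can launch many Java processes and thus run many Map and Reduce tasks. In YARN (the distributed resource management system of Hadoop 2.0), you can specify how many CPU cores and how much memory a Map or Reduce task needs. Since today's commodity servers typically have dozens of cores and 64 GB or more of memory, running dozens of Map/Reduce tasks on one server is normal. Before YARN, you could configure how many Map/Reduce tasks a server may run, but differences in resource consumption between tasks were not taken into account.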

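5. JobClient uses RPC to request services from the JobTracker, such as allocating a Job ID, submitting and killing jobs, and checking job progress. RPC is a core part of Hadoop, and understanding it is a prerequisite for understanding Hadoop. JobTracker is the most important class in MapReduce and implements many interfaces: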

public class JobTracker implements MRConstants, InterTrackerProtocol,
    JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
    JobTrackerMXBean {
  ......


interface JobSubmissionProtocol extends VersionedProtocol {
  public JobID getNewJobId() throws IOException;
  public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts)
      throws IOException;
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
  public void killJob(JobID jobid) throws IOException;
  public void setJobPriority(JobID jobid, String priority)
      throws IOException;
  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
  public JobProfile getJobProfile(JobID jobid) throws IOException;
  public JobStatus getJobStatus(JobID jobid) throws IOException;
  public Counters getJobCounters(JobID jobid) throws IOException;
  public TaskReport[] getMapTaskReports(JobID jobid) throws IOException;
  public TaskReport[] getReduceTaskReports(JobID jobid) throws IOException;
  public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
  public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException;
  ........
}
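Point 4 contrasts the pre-YARN model, where a server is configured with a fixed number of Map/Reduce slots, with YARN, where each task declares the CPU cores and memory it needs. The following is a minimal sketch of that capacity arithmetic only, not YARN's actual scheduler code; the class and method names and the node sizes are hypothetical.

```java
// Hypothetical illustration: how many tasks fit on one node under a
// fixed-slot model (pre-YARN) versus a resource-based model (YARN-style).
final class NodeCapacity {
    // Pre-YARN: the node is configured with a fixed number of slots,
    // regardless of how much CPU or memory each task actually uses.
    static int slotModel(int configuredSlots) {
        return configuredSlots;
    }

    // YARN-style: each task requests cores and memory; the node can run as
    // many tasks as fit within BOTH its cores and its memory.
    static int resourceModel(int nodeCores, long nodeMemMb,
                             int coresPerTask, long memMbPerTask) {
        if (coresPerTask <= 0 || memMbPerTask <= 0) {
            throw new IllegalArgumentException("task request must be positive");
        }
        long byCpu = nodeCores / coresPerTask;
        long byMem = nodeMemMb / memMbPerTask;
        return (int) Math.min(byCpu, byMem);
    }

    public static void main(String[] args) {
        // A hypothetical node with 32 cores and 64 GB of memory.
        System.out.println(resourceModel(32, 64 * 1024, 1, 2048)); // 32
        System.out.println(resourceModel(32, 64 * 1024, 1, 8192)); // 8
        System.out.println(slotModel(16));                         // 16
    }
}
```

With 1-core/2 GB tasks the node is limited equally by CPU and memory (32 tasks), while 8 GB tasks are limited by memory (8 tasks); the slot model reports the configured number whatever the tasks actually need.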

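On the JobClient side, a dynamic proxy is used (for what a dynamic proxy is, see the Java classes Proxy and InvocationHandler). Consider a call to the following JobSubmissionProtocol method, which has no concrete implementation on the Job client: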

  public JobID getNewJobId() throws IOException;
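The dynamic-proxy mechanism can be illustrated with plain java.lang.reflect.Proxy. In this sketch the handler only records the intercepted call and returns a canned value, where Hadoop's Invoker would instead send the call to the JobTracker; ToySubmissionProtocol, ProxyDemo and the returned strings are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for JobSubmissionProtocol:
// the client has only the interface, no implementation.
interface ToySubmissionProtocol {
    String getNewJobId();
    String getJobStatus(String jobId);
}

final class ProxyDemo {
    // Records every intercepted call; stands in for "send it over the network".
    static final List<String> sent = new ArrayList<>();

    static ToySubmissionProtocol newClientProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            // The handler sees only the method name and arguments.
            sent.add(method.getName() + Arrays.toString(args == null ? new Object[0] : args));
            if (method.getName().equals("getNewJobId")) {
                return "job_demo_0001"; // canned reply instead of a server response
            }
            return "RUNNING";
        };
        return (ToySubmissionProtocol) Proxy.newProxyInstance(
                ToySubmissionProtocol.class.getClassLoader(),
                new Class<?>[] {ToySubmissionProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        ToySubmissionProtocol client = newClientProxy();
        String id = client.getNewJobId();
        System.out.println(id + " " + client.getJobStatus(id));
        System.out.println(sent); // [getNewJobId[], getJobStatus[job_demo_0001]]
    }
}
```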



  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();

    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
  }




  private void connect() throws IOException, InterruptedException {
    ugi.doAs(new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        jobClient = new JobClient((JobConf) getConfiguration());
        return null;
      }
    });
  }






  public JobClient(JobConf conf) throws IOException {
    setConf(conf);
    init(conf);
  }

  public void init(JobConf conf) throws IOException {
    String tracker = conf.get("mapred.job.tracker", "local");
    tasklogtimeout = conf.getInt(
      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
    this.ugi = UserGroupInformation.getCurrentUser();
    if ("local".equals(tracker)) {
      conf.setNumMapTasks(1);
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.rpcJobSubmitClient =
          createRPCProxy(JobTracker.getAddress(conf), conf);
      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
    }
  }

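rpcJobSubmitClient is a JobSubmissionProtocol object, and JobSubmissionProtocol is the RPC interface for the JobTracker's services; it provides a set of methods for accessing them. (When mapred.job.tracker is "local", which is the default in the code above, init() uses a LocalJobRunner instead and no RPC proxy is created.)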

  private JobSubmissionProtocol rpcJobSubmitClient;
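The branch in init() means the same JobSubmissionProtocol-typed field can be backed either by an in-process LocalJobRunner or by an RPC proxy, depending on mapred.job.tracker. A minimal sketch of that selection, with a plain Map standing in for JobConf; all class names and the jt-host:9001 address are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins: one interface, two backing implementations.
interface JobSubmitter {
    String describe();
}

final class LocalJobSubmitter implements JobSubmitter {
    public String describe() { return "local in-process runner"; }
}

final class RemoteJobSubmitter implements JobSubmitter {
    private final String address;
    RemoteJobSubmitter(String address) { this.address = address; }
    public String describe() { return "RPC proxy to " + address; }
}

final class SubmitterFactory {
    // Mirrors conf.get("mapred.job.tracker", "local"): a missing key means local mode.
    static JobSubmitter create(Map<String, String> conf) {
        String tracker = conf.getOrDefault("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
            return new LocalJobSubmitter();
        }
        return new RemoteJobSubmitter(tracker);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(create(conf).describe()); // local in-process runner
        conf.put("mapred.job.tracker", "jt-host:9001");
        System.out.println(create(conf).describe()); // RPC proxy to jt-host:9001
    }
}
```

Callers such as submitJobInternal() only see the interface, so the rest of the submission code does not need to know which mode is active.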

createRPCProxy calls RPC.getProxy, Hadoop RPC's method for creating a proxy:

  private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
      Configuration conf) throws IOException {

    JobSubmissionProtocol rpcJobSubmitClient =
        (JobSubmissionProtocol)RPC.getProxy(
            JobSubmissionProtocol.class,
            JobSubmissionProtocol.versionID, addr,
            UserGroupInformation.getCurrentUser(), conf,
            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class),
            0,
            RetryUtils.getMultipleLinearRandomRetry(
                conf,
                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT
                ),
            false);

    return rpcJobSubmitClient;
  }


  /** Construct a client-side proxy object that implements the named protocol,
   * talking to a server at the named address. */
  public static VersionedProtocol getProxy(
      Class<? extends VersionedProtocol> protocol,
      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
      Configuration conf, SocketFactory factory, int rpcTimeout,
      RetryPolicy connectionRetryPolicy,
      boolean checkVersion) throws IOException {

    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy);
    VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker);

    if (checkVersion) {
      checkVersion(protocol, clientVersion, proxy);
    }
    return proxy;
  }

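Invoker is a class that implements java.lang.reflect.InvocationHandler and acts as the proxy for JobSubmissionProtocol's services. When JobClient calls a JobSubmissionProtocol method (for example JobID getNewJobId()), control enters Invoker's invoke method, which packages the information about the called method and sends it to the JobTracker for execution: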

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
    private boolean isClosed = false;
    .......
    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      ......
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      .....
      return value.get();
    }


  private static class Invocation implements Writable, Configurable {
    private String methodName;
    private Class[] parameterClasses;
    private Object[] parameters;
    private Configuration conf;
    .......
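Putting Invoker and Invocation together: the proxy turns a method call into (method name, parameters), serializes it, and the server side deserializes it and calls the real method reflectively. This loopback sketch runs both halves in one process. It assumes String-only parameters encoded with DataOutputStream.writeUTF, whereas Hadoop uses Writable/ObjectWritable, which also carries parameter types; EchoProtocol, EchoServer and LoopbackRpc are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface EchoProtocol {
    String getStatus(String jobId);
}

// "Server-side" implementation: the only place the method body exists.
final class EchoServer implements EchoProtocol {
    public String getStatus(String jobId) { return jobId + ":RUNNING"; }
}

final class LoopbackRpc {
    // Client side: encode a call as method name, argument count, String arguments.
    static byte[] encode(String methodName, Object[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(methodName);
            int n = args == null ? 0 : args.length;
            out.writeInt(n);
            for (int i = 0; i < n; i++) {
                out.writeUTF((String) args[i]); // assumption: String-only parameters
            }
        }
        return bytes.toByteArray();
    }

    // Server side: decode the call and dispatch it reflectively to the implementation.
    static Object dispatch(Object impl, byte[] request) throws Exception {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(request))) {
            String name = in.readUTF();
            int n = in.readInt();
            Class<?>[] types = new Class<?>[n];
            Object[] params = new Object[n];
            for (int i = 0; i < n; i++) {
                types[i] = String.class;
                params[i] = in.readUTF();
            }
            Method m = impl.getClass().getMethod(name, types);
            return m.invoke(impl, params);
        }
    }

    // A proxy whose handler encodes each call and "sends" the bytes to the server.
    static EchoProtocol client(EchoServer server) {
        return (EchoProtocol) Proxy.newProxyInstance(
                EchoProtocol.class.getClassLoader(),
                new Class<?>[] {EchoProtocol.class},
                (proxy, method, args) -> dispatch(server, encode(method.getName(), args)));
    }

    public static void main(String[] args) {
        System.out.println(client(new EchoServer()).getStatus("job_demo_0001")); // job_demo_0001:RUNNING
    }
}
```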

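Then, in the call method, this information is serialized (Invocation is a serializable Writable object) and sent out, and the calling thread waits on the Call object until the result arrives: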

  /** Make a call, passing <code>param</code>, to the IPC server defined by
   * <code>remoteId</code>, returning the value.
   * Throws exceptions if there are network problems or if the remote code
   * threw an exception. */
  public Writable call(Writable param, ConnectionId remoteId)
                       throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
      ......
    }
  }
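The loop at the end of call() is a classic wait/notify hand-off: the calling thread blocks on the Call object until another thread (in Hadoop, the connection's receiving thread) stores the result and marks it done. A minimal sketch of that pattern; PendingCall is a hypothetical class, and restoring the interrupt flag at the end is an assumption about what the elided part of call() does with the saved flag.

```java
// Hypothetical sketch of the wait/notify hand-off used by Client.call().
final class PendingCall {
    private Object value;
    private boolean done;

    // Called by the thread that receives the response.
    synchronized void complete(Object v) {
        value = v;
        done = true;
        notifyAll(); // wake the thread blocked in awaitResult()
    }

    // Called by the requesting thread; returns once complete() has run.
    synchronized Object awaitResult() {
        boolean interrupted = false;
        while (!done) {             // loop guards against spurious wake-ups
            try {
                wait();
            } catch (InterruptedException ie) {
                interrupted = true; // remember it and keep waiting
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
        return value;
    }

    public static void main(String[] args) {
        PendingCall call = new PendingCall();
        new Thread(() -> call.complete("job_demo_0001")).start();
        System.out.println(call.awaitResult()); // job_demo_0001
    }
}
```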


    /** Initiates a call by sending the parameter to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.
     */
    public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }

      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + " sending #" + call.id);

          //for serializing the
          //data to be written
          d = new DataOutputBuffer();
          d.writeInt(call.id);
          call.param.write(d);
          byte[] data = d.getData();
          int dataLength = d.getLength();
          out.writeInt(dataLength);      //first put the data length
          out.write(data, 0, dataLength);//write the data
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        //the buffer is just an in-memory buffer, but it is still polite to
        // close early
        IOUtils.closeStream(d);
      }
    }
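sendParam() frames each request as a 4-byte length followed by the call id and the serialized parameter, so the receiver knows exactly how many bytes to read. This sketch reproduces that framing with plain java.io streams and adds a matching decoder; Framing, Frame and unframe are hypothetical names, and Hadoop's actual server-side reader is not shown in this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class Framing {
    static final class Frame {
        final int callId;
        final byte[] payload;
        Frame(int callId, byte[] payload) {
            this.callId = callId;
            this.payload = payload;
        }
    }

    // Sender: serialize [call id][payload] into a buffer first so its length is
    // known, then write [length][buffer], as sendParam() does with DataOutputBuffer.
    static byte[] frame(int callId, byte[] payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream d = new DataOutputStream(buffer);
        d.writeInt(callId);
        d.write(payload);
        d.flush();
        byte[] data = buffer.toByteArray();

        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        out.writeInt(data.length); // first put the data length
        out.write(data);           // then the data itself
        out.flush();
        return wire.toByteArray();
    }

    // Receiver: read the length, then the call id, then the remaining bytes.
    static Frame unframe(byte[] wire) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
        int length = in.readInt();
        int callId = in.readInt();
        byte[] payload = new byte[length - 4]; // the first 4 counted bytes were the id
        in.readFully(payload);
        return new Frame(callId, payload);
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = frame(7, "getNewJobId".getBytes(StandardCharsets.UTF_8));
        Frame f = unframe(wire);
        System.out.println(wire.length + " bytes, call #" + f.callId + ": "
                + new String(f.payload, StandardCharsets.UTF_8)); // 19 bytes, call #7: getNewJobId
    }
}
```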


  /**
   * Internal method for submitting jobs to the system.
   * @param job the configuration to submit
   * @return a proxy object for the running job
   * @throws FileNotFoundException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  public
  RunningJob submitJobInternal(final JobConf job
                               ) throws FileNotFoundException,
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */
    return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
      public RunningJob run() throws FileNotFoundException,
      ClassNotFoundException,
      InterruptedException,
      IOException{
        JobConf jobCopy = job;
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
            jobCopy);
        JobID jobId = jobSubmitClient.getNewJobId();
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
        JobStatus status = null;
        try {
          populateTokenCache(jobCopy, jobCopy.getCredentials());

          copyAndConfigureFiles(jobCopy, submitJobDir);

          // get delegation token for the dir
          TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                              new Path [] {submitJobDir},
                                              jobCopy);

          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          int reduces = jobCopy.getNumReduceTasks();
          InetAddress ip = InetAddress.getLocalHost();
          if (ip != null) {
            job.setJobSubmitHostAddress(ip.getHostAddress());
            job.setJobSubmitHostName(ip.getHostName());
          }
          JobContext context = new JobContext(jobCopy, jobId);

          // Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() :
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }

          jobCopy = (JobConf)context.getConfiguration();

          // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);

          // write "queue admins of the queue to which job is being submitted"
          // to job file.
          String queue = jobCopy.getQueueName();
          AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
          jobCopy.set(QueueManager.toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

          // Write job file to JobTracker's fs
          FSDataOutputStream out =
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

          // removing jobtoken referrals before copying the jobconf to HDFS
          // as the tasks don't need this setting, actually they may break
          // because of it if present as the referral will point to a
          // different job.
          TokenCache.cleanUpTokenReferral(jobCopy);

          try {
            jobCopy.writeXml(out);
          } finally {
            out.close();
          }
          //
          // Now, actually submit the job (using the submit name)
          //
          printTokens(jobId, jobCopy.getCredentials());
          status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials());
          JobProfile prof = jobSubmitClient.getJobProfile(jobId);
          if (status != null && prof != null) {
            return new NetworkedJob(status, prof, jobSubmitClient);
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (fs != null && submitJobDir != null)
              fs.delete(submitJobDir, true);
          }
        }
      }
    });
  }





   Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,jobCopy); 




  public static Path getStagingDir(JobClient client, Configuration conf)
  throws IOException, InterruptedException {
    Path stagingArea = client.getStagingAreaDir();
    FileSystem fs = stagingArea.getFileSystem(conf);
    ......
    return stagingArea;
  }


  public Path getStagingAreaDir() throws IOException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(jobSubmitClient.getStagingAreaDir());
    }
    return stagingAreaDir;
  }


  public String getStagingAreaDir() throws IOException {
    // Check for safe-mode
    checkSafeMode();
    try{
      final String user =
        UserGroupInformation.getCurrentUser().getShortUserName();
      return getMROwner().doAs(new PrivilegedExceptionAction<String>() {
        @Override
        public String run() throws Exception {
          return getStagingAreaDirInternal(user);
        }
      });
    } catch(InterruptedException ie) {
      throw new IOException(ie);
    }
  }


  private String getStagingAreaDirInternal(String user) throws IOException {
    final Path stagingRootDir =
      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
            "/tmp/hadoop/mapred/staging"));
    final FileSystem fs = stagingRootDir.getFileSystem(conf);
    return fs.makeQualified(new Path(stagingRootDir,
                              user+"/.staging")).toString();
  }


2. Get the Job ID and the Job working directory (the directory is not created yet):


        JobID jobId = jobSubmitClient.getNewJobId();
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        jobCopy.set("mapreduce.job.dir", submitJobDir.toString());



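getNewJobId likewise obtains the ID through RPC (the job sequence number starts at 1 and increments). The working directory path is then formed under the staging directory (the directory itself is only created later, in copyAndConfigureFiles) and stored as the value of mapreduce.job.dir. With the default staging root, the final directory looks like /tmp/hadoop/mapred/staging/<user>/.staging/<job id>, qualified with the file system's URI.
A Job ID is built from the JobTracker's identifier, which is based on time, plus an incrementing sequence number; see JobTracker's getNewJobId method: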

  public synchronized JobID getNewJobId() throws IOException {
    // Check for JobTracker operational state
    checkJobTrackerState();

    return new JobID(getTrackerIdentifier(), nextJobId++);
  }
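Combining getNewJobId() with getStagingAreaDirInternal() gives the job's submit directory. This sketch assumes that the tracker identifier is the JobTracker start time formatted as yyyyMMddHHmm and that a JobID prints as job_<identifier>_<sequence padded to at least four digits>; both are assumptions about Hadoop 1.x rather than something shown above, and JobDirNaming is a hypothetical class.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical sketch of how the job directory name is assembled.
final class JobDirNaming {
    // Assumption: tracker identifier = JobTracker start time as yyyyMMddHHmm.
    static String trackerIdentifier(Date startTime) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmm");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone only to keep the demo deterministic
        return fmt.format(startTime);
    }

    // Assumption: "job_" + identifier + "_" + sequence, zero-padded to 4 digits.
    static String jobId(String trackerIdentifier, int sequence) {
        return String.format("job_%s_%04d", trackerIdentifier, sequence);
    }

    // Mirrors getStagingAreaDirInternal() (<root>/<user>/.staging) plus the job id.
    static String submitJobDir(String stagingRoot, String user, String jobId) {
        return stagingRoot + "/" + user + "/.staging/" + jobId;
    }

    public static void main(String[] args) {
        String jt = trackerIdentifier(new Date(0L)); // 197001010000
        String id = jobId(jt, 1);                    // the counter starts at 1
        System.out.println(submitJobDir("/tmp/hadoop/mapred/staging", "alice", id));
        // /tmp/hadoop/mapred/staging/alice/.staging/job_197001010000_0001
    }
}
```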



          copyAndConfigureFiles(jobCopy, submitJobDir);


  private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir)
  throws IOException, InterruptedException {
    short replication = (short)job.getInt("mapred.submit.replication", 10);
    copyAndConfigureFiles(job, jobSubmitDir, replication);

    // Set the working directory
    if (job.getWorkingDirectory() == null) {
      job.setWorkingDirectory(fs.getWorkingDirectory());
    }
  }

  private void copyAndConfigureFiles(JobConf job, Path submitJobDir,
      short replication) throws IOException, InterruptedException {

    if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
      LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
               "Applications should implement Tool for the same.");
    }

    // Retrieve command line arguments placed into the JobConf
    // by GenericOptionsParser.
    String files = job.get("tmpfiles");
    String libjars = job.get("tmpjars");
    String archives = job.get("tmparchives");

    //
    // Figure out what fs the JobTracker is using.  Copy the
    // job to it, under a temporary name.  This allows DFS to work,
    // and under the local fs also provides UNIX-like object loading
    // semantics.  (that is, if the job file is deleted right after
    // submission, we can still run the submission to completion)
    //

    // Create a number of filenames in the JobTracker's fs namespace
    FileSystem fs = submitJobDir.getFileSystem(job);
    LOG.debug("default FileSystem: " + fs.getUri());
    if (fs.exists(submitJobDir)) {
      throw new IOException("Not submitting job. Job directory " + submitJobDir
          +" already exists!! This is unexpected.Please check what's there in" +
          " that directory");
    }
    submitJobDir = fs.makeQualified(submitJobDir);
    FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
    // add all the command line files/ jars and archive
    // first copy them to jobtrackers filesystem

    if (files != null) {
      FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
      String[] fileArr = files.split(",");
      for (String tmpFile: fileArr) {
        URI tmpURI;
        try {
          tmpURI = new URI(tmpFile);
        } catch (URISyntaxException e) {
          throw new IllegalArgumentException(e);
        }
        Path tmp = new Path(tmpURI);
        Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
        try {
          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
          DistributedCache.addCacheFile(pathURI, job);
        } catch(URISyntaxException ue) {
          //should not throw a uri exception
          throw new IOException("Failed to create uri for " + tmpFile, ue);
        }
        DistributedCache.createSymlink(job);
      }
    }

    if (libjars != null) {
      FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
      String[] libjarsArr = libjars.split(",");
      for (String tmpjars: libjarsArr) {
        Path tmp = new Path(tmpjars);
        Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
        DistributedCache.addArchiveToClassPath
          (new Path(newPath.toUri().getPath()), job, fs);
      }
    }

    if (archives != null) {
      FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
      String[] archivesArr = archives.split(",");
      for (String tmpArchives: archivesArr) {
        URI tmpURI;
        try {
          tmpURI = new URI(tmpArchives);
        } catch (URISyntaxException e) {
          throw new IllegalArgumentException(e);
        }
        Path tmp = new Path(tmpURI);
        Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
        try {
          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
          DistributedCache.addCacheArchive(pathURI, job);
        } catch(URISyntaxException ue) {
          //should not throw an uri exception
          throw new IOException("Failed to create uri for " + tmpArchives, ue);
        }
        DistributedCache.createSymlink(job);
      }
    }

    // First we check whether the cached archives and files are legal.
    TrackerDistributedCacheManager.validate(job);
    //  set the timestamps of the archives and files and set the
    //  public/private visibility of the archives and files
    TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job);
    // get DelegationTokens for cache files
    TrackerDistributedCacheManager.getDelegationTokens(job,
                                                       job.getCredentials());

    String originalJarPath = job.getJar();

    if (originalJarPath != null) {           // copy jar to JobTracker's fs
      // use jar name if job is not named.
      if ("".equals(job.getJobName())){
        job.setJobName(new Path(originalJarPath).getName());
      }
      Path originalJarFile = new Path(originalJarPath);
      URI jobJarURI = originalJarFile.toUri();
      // If the job jar is already in fs, we don't need to copy it from local fs
      if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
              || !(jobJarURI.getScheme().equals(fs.getUri().getScheme())
                  && jobJarURI.getAuthority().equals(
                                            fs.getUri().getAuthority()))) {
        Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
        job.setJar(submitJarFile.toString());
        fs.copyFromLocalFile(originalJarFile, submitJarFile);
        fs.setReplication(submitJarFile, replication);
        fs.setPermission(submitJarFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
      }
    } else {
      LOG.warn("No job jar file set.  User classes may not be found. "+
               "See JobConf(Class) or JobConf#setJar(String).");
    }
  }


    short replication = (short)job.getInt("mapred.submit.replication", 10);



    String files = job.get("tmpfiles");
    String libjars = job.get("tmpjars");
    String archives = job.get("tmparchives");
    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
    FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
    Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
    FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
    Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
    FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
    Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);


  public static Path getJobDistCacheFiles(Path jobSubmitDir) {
    return new Path(jobSubmitDir, "files");
  }
  /**
   * Get the job distributed cache archives path.
   * @param jobSubmitDir
   */
  public static Path getJobDistCacheArchives(Path jobSubmitDir) {
    return new Path(jobSubmitDir, "archives");
  }
  /**
   * Get the job distributed cache libjars path.
   * @param jobSubmitDir
   */
  public static Path getJobDistCacheLibjars(Path jobSubmitDir) {
    return new Path(jobSubmitDir, "libjars");
  }




    String originalJarPath = job.getJar();
    if (originalJarPath != null) {           // copy jar to JobTracker's fs
      // use jar name if job is not named.
      if ("".equals(job.getJobName())){
        job.setJobName(new Path(originalJarPath).getName());
      }
      Path originalJarFile = new Path(originalJarPath);
      URI jobJarURI = originalJarFile.toUri();
      // If the job jar is already in fs, we don't need to copy it from local fs
      if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
              || !(jobJarURI.getScheme().equals(fs.getUri().getScheme())
                  && jobJarURI.getAuthority().equals(
                                            fs.getUri().getAuthority()))) {
        Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
        job.setJar(submitJarFile.toString());
        fs.copyFromLocalFile(originalJarFile, submitJarFile);
        fs.setReplication(submitJarFile, replication);
        fs.setPermission(submitJarFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
      }
    }



  public static Path getJobJar(Path jobSubmitDir) {
    return new Path(jobSubmitDir, "job.jar");
  }
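The condition in copyAndConfigureFiles() decides whether job.jar has to be uploaded: the jar is copied into the submit directory unless its URI already has the same scheme and authority as the JobTracker's file system. A standalone sketch of that check using java.net.URI; JarCopyCheck and the example host names are hypothetical.

```java
import java.net.URI;

// Hypothetical standalone version of the "is the jar already on this fs?" test.
final class JarCopyCheck {
    static boolean needsCopy(URI jobJar, URI fsUri) {
        if (jobJar.getScheme() == null || jobJar.getAuthority() == null) {
            return true; // e.g. a bare local path such as /home/alice/wc.jar
        }
        boolean sameFs = jobJar.getScheme().equals(fsUri.getScheme())
                && jobJar.getAuthority().equals(fsUri.getAuthority());
        return !sameFs;
    }

    public static void main(String[] args) {
        URI fs = URI.create("hdfs://namenode:8020");
        System.out.println(needsCopy(URI.create("/home/alice/wc.jar"), fs));               // true
        System.out.println(needsCopy(URI.create("hdfs://namenode:8020/apps/wc.jar"), fs)); // false
        System.out.println(needsCopy(URI.create("hdfs://other:8020/apps/wc.jar"), fs));    // true
    }
}
```

Note that file:///home/alice/wc.jar also counts as "needs copy", because a file URI with an empty authority has a null authority.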







  Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

  public static Path getJobConfPath(Path jobSubmitDir) {
    return new Path(jobSubmitDir, "job.xml");
  }
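The JobSubmissionFiles helpers each resolve a fixed name under the job's submit directory. Collected in one place, the part of the layout visible in this article looks like the following sketch. It is not exhaustive, since writeSplits() also writes files there whose names are not shown here; SubmitDirLayout is a hypothetical class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical summary of the submit-directory names resolved by the
// JobSubmissionFiles helpers quoted in this article.
final class SubmitDirLayout {
    static Map<String, String> layout(String submitJobDir) {
        Map<String, String> paths = new LinkedHashMap<>();
        paths.put("jobJar", submitJobDir + "/job.jar");   // getJobJar
        paths.put("jobConf", submitJobDir + "/job.xml");  // getJobConfPath
        paths.put("files", submitJobDir + "/files");      // getJobDistCacheFiles
        paths.put("archives", submitJobDir + "/archives"); // getJobDistCacheArchives
        paths.put("libjars", submitJobDir + "/libjars");  // getJobDistCacheLibjars
        return paths;
    }

    public static void main(String[] args) {
        layout("/tmp/hadoop/mapred/staging/alice/.staging/job_197001010000_0001")
            .forEach((name, path) -> System.out.println(name + " -> " + path));
    }
}
```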



          int reduces = jobCopy.getNumReduceTasks();


          InetAddress ip = InetAddress.getLocalHost();
          if (ip != null) {
            job.setJobSubmitHostAddress(ip.getHostAddress());
            job.setJobSubmitHostName(ip.getHostName());
          }



          JobContext context = new JobContext(jobCopy, jobId);


public class JobContext {
  // Put all of the attribute names in here so that Job and JobContext are
  // consistent.
  protected static final String INPUT_FORMAT_CLASS_ATTR =
    "mapreduce.inputformat.class";
  protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
  protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
  protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
  protected static final String OUTPUT_FORMAT_CLASS_ATTR =
    "mapreduce.outputformat.class";
  protected static final String PARTITIONER_CLASS_ATTR =
    "mapreduce.partitioner.class";

  protected final org.apache.hadoop.mapred.JobConf conf;
  protected final Credentials credentials;
  private JobID jobId;

  public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";

  public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";

  public static final String JOB_ACL_MODIFY_JOB =
    "mapreduce.job.acl-modify-job";

  public static final String CACHE_FILE_VISIBILITIES =
    "mapreduce.job.cache.files.visibilities";
  public static final String CACHE_ARCHIVES_VISIBILITIES =
    "mapreduce.job.cache.archives.visibilities";

  public static final String JOB_CANCEL_DELEGATION_TOKEN =
    "mapreduce.job.complete.cancel.delegation.tokens";
  public static final String USER_LOG_RETAIN_HOURS =
    "mapred.userlog.retain.hours";

  /**
   * The UserGroupInformation object that has a reference to the current user
   */
  protected UserGroupInformation ugi;
  ......


  public Class<? extends Mapper<?,?,?,?>> getMapperClass()
     throws ClassNotFoundException {
    return (Class<? extends Mapper<?,?,?,?>>)
      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
  }


          // Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() :
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }


          // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);






          // Write job file to JobTracker's fs
          FSDataOutputStream out =
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

          // removing jobtoken referrals before copying the jobconf to HDFS
          // as the tasks don't need this setting, actually they may break
          // because of it if present as the referral will point to a
          // different job.
          TokenCache.cleanUpTokenReferral(jobCopy);

          try {
            jobCopy.writeXml(out);
          } finally {
            out.close();
          }




          status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials());
          JobProfile prof = jobSubmitClient.getJobProfile(jobId);

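This submitJob call on jobSubmitClient is likewise passed via RPC to the JobTracker, which executes its method of the same name. As you can see, JobClient and JobTracker essentially exchange two things: the Job ID and the Job's root directory in HDFS (plus the job's credentials). All the actual resource data has been placed in HDFS by JobClient beforehand. The JobTracker-side code (the overload that also takes the submitting user and a recovery flag) is: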

  JobStatus submitJob(JobID jobId, String jobSubmitDir,
      UserGroupInformation ugi, Credentials ts, boolean recovered)
      throws IOException {
    // Check for safe-mode
    checkSafeMode();

    JobInfo jobInfo = null;
    if (ugi == null) {
      ugi = UserGroupInformation.getCurrentUser();
    }
    synchronized (this) {
      if (jobs.containsKey(jobId)) {
        // job already running, don't start twice
        return jobs.get(jobId).getStatus();
      }
      jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
          new Path(jobSubmitDir));
    }

    // Store the job-info in a file so that the job can be recovered
    // later (if at all)
    // Note: jobDir & jobInfo are owned by JT user since we are using
    // his fs object
    if (!recovered) {
      Path jobDir = getSystemDirectoryForJob(jobId);
      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
      jobInfo.write(out);
      out.close();
    }

    // Create the JobInProgress, do not lock the JobTracker since
    // we are about to copy job.xml from HDFS and write jobToken file to HDFS
    JobInProgress job = null;
    try {
      if (ts == null) {
        ts = new Credentials();
      }
      generateAndStoreJobTokens(jobId, ts);
      job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (recovered &&
        !job.getJobConf().getBoolean(
            JobConf.MAPREDUCE_RECOVER_JOB,
            JobConf.DEFAULT_MAPREDUCE_RECOVER_JOB)) {
      LOG.info("Job "+ jobId.toString() + " is not enable for recovery, cleaning up job files");
      job.cleanupJob();
      return null;
    }

    synchronized (this) {
      // check if queue is RUNNING
      String queue = job.getProfile().getQueueName();
      if (!queueManager.isRunning(queue)) {
        throw new IOException("Queue \"" + queue + "\" is not running");
      }
      try {
        aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
      } catch (IOException ioe) {
        LOG.warn("Access denied for user " + job.getJobConf().getUser()
            + ". Ignoring job " + jobId, ioe);
        job.fail();
        throw ioe;
      }

      // Check the job if it cannot run in the cluster because of invalid memory
      // requirements.
      try {
        checkMemoryRequirements(job);
      } catch (IOException ioe) {
        throw ioe;
      }

      try {
        this.taskScheduler.checkJobSubmission(job);
      } catch (IOException ioe){
        LOG.error("Problem in submitting job " + jobId, ioe);
        throw ioe;
      }

      // Submit the job
      JobStatus status;
      try {
        status = addJob(jobId, job);
      } catch (IOException ioe) {
        LOG.info("Job " + jobId + " submission failed!", ioe);
        status = job.getStatus();
        status.setFailureInfo(StringUtils.stringifyException(ioe));
        failJob(job);
        throw ioe;
      }

      return status;
    }
  }
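The division of labor described here, where JobClient stages everything in HDFS and the RPC carries only the job ID and the submit directory, can be modeled with an in-memory map standing in for HDFS. This is a toy sketch, not Hadoop code: SharedFsSubmission, SharedFs and ToyJobTracker are hypothetical, credentials are omitted, and the duplicate check only mirrors the jobs.containsKey(jobId) test in submitJob.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class SharedFsSubmission {
    // Hypothetical stand-in for HDFS: an in-memory map from path to file contents.
    static final class SharedFs {
        private final Map<String, byte[]> files = new HashMap<>();

        void write(String path, String content) {
            files.put(path, content.getBytes(StandardCharsets.UTF_8));
        }

        String read(String path) {
            byte[] bytes = files.get(path);
            if (bytes == null) {
                throw new IllegalStateException("no such file: " + path);
            }
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for the JobTracker: it receives only (jobId, jobSubmitDir).
    static final class ToyJobTracker {
        private final SharedFs fs;
        private final Map<String, String> statuses = new HashMap<>();
        private final Map<String, String> confs = new HashMap<>();

        ToyJobTracker(SharedFs fs) {
            this.fs = fs;
        }

        String submitJob(String jobId, String jobSubmitDir) {
            if (statuses.containsKey(jobId)) {
                return statuses.get(jobId); // job already known, don't start twice
            }
            // Everything else is read back from the shared file system.
            confs.put(jobId, fs.read(jobSubmitDir + "/job.xml"));
            statuses.put(jobId, "PREP");
            return "PREP";
        }

        String confOf(String jobId) { return confs.get(jobId); }
        int jobCount() { return statuses.size(); }
    }

    public static void main(String[] args) {
        SharedFs fs = new SharedFs();
        ToyJobTracker tracker = new ToyJobTracker(fs);
        String dir = "/staging/alice/.staging/job_demo_0001";
        fs.write(dir + "/job.xml", "<configuration/>");              // client stages files first
        System.out.println(tracker.submitJob("job_demo_0001", dir)); // then sends only id + dir
        System.out.println(tracker.confOf("job_demo_0001"));         // <configuration/>
    }
}
```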


5.1 Create the JobInProgress object, the class the JobTracker uses to record a Job's information. It is declared as:

public class JobInProgress {
  JobProfile profile;
  JobStatus status;
  String jobFile = null;
  Path localJobFile = null;
  final QueueMetrics queueMetrics;

  TaskInProgress maps[] = new TaskInProgress[0];
  TaskInProgress reduces[] = new TaskInProgress[0];
  TaskInProgress cleanup[] = new TaskInProgress[0];
  TaskInProgress setup[] = new TaskInProgress[0];
  int numMapTasks = 0;
  int numReduceTasks = 0;
  final long memoryPerMap;
  final long memoryPerReduce;
  volatile int numSlotsPerMap = 1;
  volatile int numSlotsPerReduce = 1;
  final int maxTaskFailuresPerTracker;

  // Counters to track currently running/finished/failed Map/Reduce task-attempts
  int runningMapTasks = 0;
  int runningReduceTasks = 0;
  int finishedMapTasks = 0;
  int finishedReduceTasks = 0;
  int failedMapTasks = 0;
  int failedReduceTasks = 0;
  private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L;
  long reduce_input_limit = -1L;
  private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
  int completedMapsForReduceSlowstart = 0;

  // runningMapTasks include speculative tasks, so we need to capture
  // speculative tasks separately
  int speculativeMapTasks = 0;
  int speculativeReduceTasks = 0;

  final int mapFailuresPercent;
  final int reduceFailuresPercent;
  int failedMapTIPs = 0;
  int failedReduceTIPs = 0;
  private volatile boolean launchedCleanup = false;
  private volatile boolean launchedSetup = false;
  private volatile boolean jobKilled = false;
  private volatile boolean jobFailed = false;

  JobPriority priority = JobPriority.NORMAL;
  final JobTracker jobtracker;

  protected Credentials tokenStorage;

  // NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;

  // Map of NetworkTopology Node to set of running TIPs
  Map<Node, Set<TaskInProgress>> runningMapCache;

  // A list of non-local, non-running maps
  final List<TaskInProgress> nonLocalMaps;

  // Set of failed, non-running maps sorted by #failures
  final SortedSet<TaskInProgress> failedMaps;

  // A set of non-local running maps
  Set<TaskInProgress> nonLocalRunningMaps;

  // A list of non-running reduce TIPs
  Set<TaskInProgress> nonRunningReduces;

  // A set of running reduce TIPs
  Set<TaskInProgress> runningReduces;

  // A list of cleanup tasks for the map task attempts, to be launched
  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();

  // A list of cleanup tasks for the reduce task attempts, to be launched
  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
  .......


  JobInProgress(JobTracker jobtracker, final JobConf default_conf,
      JobInfo jobInfo, int rCount, Credentials ts)
  throws IOException, InterruptedException {
    try {
      this.restartCount = rCount;
      this.jobId = JobID.downgrade(jobInfo.getJobID());
      String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
          + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId;
      this.jobtracker = jobtracker;
      this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
      this.status.setUsername(jobInfo.getUser().toString());
      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
      // Add the queue-level metric below (after the profile has been initialized)
      this.startTime = jobtracker.getClock().getTime();
      status.setStartTime(startTime);
      this.localFs = jobtracker.getLocalFileSystem();

      this.tokenStorage = ts;
      // use the user supplied token to add user credentials to the conf
      jobSubmitDir = jobInfo.getJobSubmitDir();
      user = jobInfo.getUser().toString();
      userUGI = UserGroupInformation.createRemoteUser(user);
      if (ts != null) {
        for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
          userUGI.addToken(token);
        }
      }

      fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
        public FileSystem run() throws IOException {
          return jobSubmitDir.getFileSystem(default_conf);
        }});

      /** check for the size of jobconf **/
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
      FileStatus fstatus = fs.getFileStatus(submitJobFile);
      if (fstatus.getLen() > jobtracker.MAX_JOBCONF_SIZE) {
        throw new IOException("Exceeded max jobconf size: "
            + fstatus.getLen() + " limit: " + jobtracker.MAX_JOBCONF_SIZE);
      }
      this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
          +"/"+jobId + ".xml");
      Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
      jobFile = jobFilePath.toString();
      fs.copyToLocalFile(jobFilePath, localJobFile);
      conf = new JobConf(localJobFile);
      if (conf.getUser() == null) {
        this.conf.setUser(user);
      }
      if (!conf.getUser().equals(user)) {
        String desc = "The username " + conf.getUser() + " obtained from the " +
        "conf doesn't match the username " + user + " the user " +
        "authenticated as";
        AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(), conf.getUser(),
            jobId.toString(), desc);
        throw new IOException(desc);
      }

      this.priority = conf.getJobPriority();
      this.status.setJobPriority(this.priority);
      String queueName = conf.getQueueName();
      this.profile = new JobProfile(user, jobId,
          jobFile, url, conf.getJobName(), queueName);

      Queue queue = this.jobtracker.getQueueManager().getQueue(queueName);
      if (queue == null) {
        throw new IOException("Queue \"" + queueName + "\" does not exist");
      }
      this.queueMetrics = queue.getMetrics();
      this.queueMetrics.addPrepJob(conf, jobId);

      this.submitHostName = conf.getJobSubmitHostName();
      this.submitHostAddress = conf.getJobSubmitHostAddress();
      this.numMapTasks = conf.getNumMapTasks();
      this.numReduceTasks = conf.getNumReduceTasks();

      this.memoryPerMap = conf.getMemoryForMapTask();
      this.memoryPerReduce = conf.getMemoryForReduceTask();

      this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
          (numMapTasks + numReduceTasks + 10);

      // Construct the jobACLs
      status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));

      this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
      this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();

      this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();

      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
      // a limit on the input size of the reduce.
      // we check to see if the estimated input size of
      // of each reduce is less than this value. If not
      // we fail the job. A value of -1 just means there is no
      // limit set.
      reduce_input_limit = -1L;
      this.maxLevel = jobtracker.getNumTaskCacheLevels();
      this.anyCacheLevel = this.maxLevel+1;
      this.nonLocalMaps = new LinkedList<TaskInProgress>();
      this.failedMaps = new TreeSet<TaskInProgress>(failComparator);
      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
      this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
      this.runningReduces = new LinkedHashSet<TaskInProgress>();
      this.resourceEstimator = new ResourceEstimator(this);
      this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit",
          DEFAULT_REDUCE_INPUT_LIMIT);
      // register job's tokens for renewal
      DelegationTokenRenewal.registerDelegationTokensForRenewal(
          jobInfo.getJobID(), ts, jobtracker.getConf());

      // Check task limits
      checkTaskLimits();
    } finally {
      //close all FileSystems that was created above for the current user
      //At this point, this constructor is called in the context of an RPC, and
      //hence the "current user" is actually referring to the kerberos
      //authenticated user (if security is ON).
      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
    }
  }

5.2 Add the newly created object to the job queue:

      // Submit the job
      JobStatus status;
      try {
        status = addJob(jobId, job);
      } catch (IOException ioe) {
        LOG.info("Job " + jobId + " submission failed!", ioe);
        status = job.getStatus();
        status.setFailureInfo(StringUtils.stringifyException(ioe));
        failJob(job);
        throw ioe;
      }

      return status;
    }
  }


  private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
  throws IOException {
    totalSubmissions++;

    synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        for (JobInProgressListener listener : jobInProgressListeners) {
          listener.jobAdded(job);
        }
      }
    }
    myInstrumentation.submitJob(job.getJobConf(), jobId);
    job.getQueueMetrics().submitJob(job.getJobConf(), jobId);
    LOG.info("Job " + jobId + " added successfully for user '"
             + job.getJobConf().getUser() + "' to queue '"
             + job.getJobConf().getQueueName() + "'");
    AuditLogger.logSuccess(job.getUser(),
        Operation.SUBMIT_JOB.name(), jobId.toString());
    return job.getStatus();
  }


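The core of addJob() is the following block, which stores the job in the jobs map and then passes it to every registered JobInProgressListener:

    synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        for (JobInProgressListener listener : jobInProgressListeners) {
          listener.jobAdded(job);
        }
      }
    }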


  // All the known jobs.  (jobid->JobInProgress)
  Map<JobID, JobInProgress> jobs =
      Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());


  private final List<JobInProgressListener> jobInProgressListeners =
    new CopyOnWriteArrayList<JobInProgressListener>();
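To make the registration pattern concrete, here is a minimal plain-Java sketch of the same idea: a synchronized, sorted map of jobs plus a CopyOnWriteArrayList of listeners, each of which receives jobAdded() for every new job. The JobListener interface and the String job IDs are hypothetical simplifications, not Hadoop types, and the sketch leaves out the taskScheduler lock as well as the metrics and audit logging done by addJob().

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerFanOutSketch {
    // Hypothetical, simplified stand-in for JobInProgressListener.
    interface JobListener {
        void jobAdded(String jobId);
    }

    // jobId -> job description, kept sorted by ID like the JobTracker's jobs map.
    final Map<String, String> jobs =
            Collections.synchronizedMap(new TreeMap<String, String>());

    // Registered rarely, iterated on every submission.
    final List<JobListener> listeners = new CopyOnWriteArrayList<>();

    // Mirrors the core of addJob(): store the job, then notify every listener.
    synchronized void addJob(String jobId, String job) {
        synchronized (jobs) {
            jobs.put(jobId, job);
            for (JobListener listener : listeners) {
                listener.jobAdded(jobId);
            }
        }
    }
}
```

CopyOnWriteArrayList suits this access pattern: iteration works on a snapshot, so a listener registered concurrently never causes a ConcurrentModificationException during the fan-out loop.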

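JobInProgressListener is an abstract class with many implementations: for example, JobQueueJobInProgressListener tracks the running state of jobs, and EagerTaskInitializationListener initializes Jobs. The JobTracker's jobInProgressListeners list holds several such listeners, and every newly submitted Job is passed to each of them. Taking EagerTaskInitializationListener as an example: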

  /**
   * We add the JIP to the jobInitQueue, which is processed
   * asynchronously to handle split-computation and build up
   * the right TaskTracker/Block mapping.
   */
  @Override
  public void jobAdded(JobInProgress job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);
      resortInitQueue();
      jobInitQueue.notifyAll();
    }
  }


  private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();


  /**
   * Sort jobs by priority and then by start time.
   */
  private synchronized void resortInitQueue() {
    Comparator<JobInProgress> comp = new Comparator<JobInProgress>() {
      public int compare(JobInProgress o1, JobInProgress o2) {
        int res = o1.getPriority().compareTo(o2.getPriority());
        if(res == 0) {
          if(o1.getStartTime() < o2.getStartTime())
            res = -1;
          else
            res = (o1.getStartTime()==o2.getStartTime() ? 0 : 1);
        }

        return res;
      }
    };

    synchronized (jobInitQueue) {
      Collections.sort(jobInitQueue, comp);
    }
  }
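The ordering used by resortInitQueue() can be written more compactly with Java 8 comparator combinators. The sketch below assumes, as a hypothetical stand-in for JobPriority, an enum whose constants are declared from VERY_HIGH to VERY_LOW, so that the enum's natural order puts higher priorities first; Job is likewise a hypothetical stand-in for JobInProgress that keeps only the two sort keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class InitQueueOrderSketch {
    // Hypothetical stand-in for JobPriority: declaration order defines compareTo,
    // so VERY_HIGH sorts before NORMAL, which sorts before VERY_LOW.
    enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

    // Hypothetical stand-in for JobInProgress, reduced to its two sort keys.
    static final class Job {
        final String id;
        final Priority priority;
        final long startTime;

        Job(String id, Priority priority, long startTime) {
            this.id = id;
            this.priority = priority;
            this.startTime = startTime;
        }
    }

    // Same order as resortInitQueue(): by priority, then earlier start time first.
    static final Comparator<Job> INIT_ORDER =
            Comparator.comparing((Job j) -> j.priority)
                      .thenComparingLong(j -> j.startTime);

    // Returns the job IDs in the order the init queue would hand them out.
    static List<String> initOrder(List<Job> jobs) {
        List<Job> sorted = new ArrayList<>(jobs);
        sorted.sort(INIT_ORDER);
        List<String> ids = new ArrayList<>();
        for (Job j : sorted) {
            ids.add(j.id);
        }
        return ids;
    }
}
```

thenComparingLong() replaces the nested if/else of the anonymous Comparator while producing the same order: priority first, then ascending start time.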



  class JobInitManager implements Runnable {

    public void run() {
      JobInProgress job = null;
      while (true) {
        try {
          synchronized (jobInitQueue) {
            while (jobInitQueue.isEmpty()) {
              jobInitQueue.wait();
            }
            job = jobInitQueue.remove(0);
          }
          threadPool.execute(new InitJob(job));
        } catch (InterruptedException t) {
          LOG.info("JobInitManagerThread interrupted.");
          break;
        }
      }
      LOG.info("Shutting down thread pool");
      threadPool.shutdownNow();
    }
  }

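When woken up, the JobInitManager thread executes job = jobInitQueue.remove(0) to take the first Job in the queue and passes it to the thread pool. threadPool is a standard Java thread pool, a java.util.concurrent.ExecutorService.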
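The wait()/notifyAll() hand-off between jobAdded() and JobInitManager is a classic producer/consumer pattern. Below is a minimal, self-contained sketch of it, assuming String job IDs in place of JobInProgress and a task that merely records the ID in place of initJob(); the class name, the stop() method and the stopping flag are hypothetical. One deliberate difference: JobInitManager exits on interrupt and calls shutdownNow(), whereas this sketch stops through the flag and calls shutdown(), so jobs that are already queued are still dispatched and completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class JobInitQueueSketch {
    private final List<String> initQueue = new ArrayList<>();
    private boolean stopping = false;  // guarded by initQueue
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    private final Thread manager = new Thread(this::runManager, "JobInitManager");
    // Job IDs the pool has "initialized" (stand-in for initJob()).
    final List<String> initialized = new CopyOnWriteArrayList<>();

    void start() {
        manager.start();
    }

    // Producer side, like jobAdded(): enqueue under the lock, wake the manager.
    void jobAdded(String jobId) {
        synchronized (initQueue) {
            initQueue.add(jobId);
            initQueue.notifyAll();
        }
    }

    // Blocks until a job is available; returns null once stopping and drained.
    private String takeNext() throws InterruptedException {
        synchronized (initQueue) {
            while (initQueue.isEmpty() && !stopping) {
                initQueue.wait();
            }
            return initQueue.isEmpty() ? null : initQueue.remove(0);
        }
    }

    // Consumer side, like JobInitManager.run(): hand each job to the pool.
    private void runManager() {
        try {
            String job;
            while ((job = takeNext()) != null) {
                final String id = job;
                threadPool.execute(() -> initialized.add(id));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // interrupted: stop dispatching
        } finally {
            threadPool.shutdown();  // let already-dispatched tasks finish
        }
    }

    // Asks the manager to exit once the queue is drained, then waits for the pool.
    void stop() {
        synchronized (initQueue) {
            stopping = true;
            initQueue.notifyAll();
        }
        try {
            manager.join();
            threadPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In new code, a java.util.concurrent.BlockingQueue (for example a PriorityBlockingQueue ordered like resortInitQueue()) would replace the hand-written wait()/notifyAll() loop.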

5.3 Executing the Job


  class InitJob implements Runnable {

    private JobInProgress job;

    public InitJob(JobInProgress job) {
      this.job = job;
    }

    public void run() {
      ttm.initJob(job);
    }
  }


  public void initJob(JobInProgress job) {
    ......
    try {
      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
      LOG.info("Initializing " + job.getJobID());
      job.initTasks();
      // Inform the listeners if the job state has changed
      JobStatus newStatus = (JobStatus)job.getStatus().clone();
      if (prevStatus.getRunState() != newStatus.getRunState()) {
        JobStatusChangeEvent event =
          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
              newStatus);
        synchronized (JobTracker.this) {
          updateJobInProgressListeners(event);
        }
      }
    }
    ......
  }





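The number of Map tasks is fixed on the client side at submission time: JobClient computes the input splits and sets the Map task count to the number of splits:

          // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);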


  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }


  public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
      Configuration conf, FileSystem fs, T[] splits)
  throws IOException, InterruptedException {
    FSDataOutputStream out = createFile(fs,
        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
    out.close();
    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
        info);
  }

  public static Path getJobSplitFile(Path jobSubmissionDir) {
    return new Path(jobSubmissionDir, "job.split");
  }

  public static Path getJobSplitMetaFile(Path jobSubmissionDir) {
    return new Path(jobSubmissionDir, "job.splitmetainfo");
  }
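Before the splits are written to job.split, writeNewSplits() sorts them "so that the biggest go first". The sketch below shows that ordering in plain Java. The Split class is a hypothetical stand-in for InputSplit that keeps only a name and a length, and the comparator is written here to sort by length in descending order, as the comment in writeNewSplits() describes; it is not copied from Hadoop's SplitComparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SplitOrderSketch {
    // Hypothetical stand-in for InputSplit: a name plus its length in bytes.
    static final class Split {
        final String name;
        final long length;

        Split(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    // Largest split first, matching "so that the biggest go first".
    static final Comparator<Split> BIGGEST_FIRST =
            Comparator.comparingLong((Split s) -> s.length).reversed();

    // Returns the split names in the order they would be written out.
    static List<String> sortedNames(Split[] splits) {
        Split[] array = splits.clone();
        Arrays.sort(array, BIGGEST_FIRST);  // stable: equal lengths keep input order
        List<String> names = new ArrayList<>();
        for (Split s : array) {
            names.add(s.name);
        }
        return names;
    }
}
```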


public class FileSplit extends InputSplit implements Writable {
  private Path file;
  private long start;
  private long length;
  private String[] hosts;
  .......


  /**
   * Generate the list of files and make them into FileSplits.
   */
  public List<InputSplit> getSplits(JobContext job
                                    ) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus>files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                   blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
      } else {
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }

    // Save the number of input files in the job-conf
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }


    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);


  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Get the minimum split size
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong("mapred.min.split.size", 1L);
  }


  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong("mapred.max.split.size",
                                              Long.MAX_VALUE);
  }


      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);


public class BlockLocation implements Writable {
  private String[] hosts; //hostnames of datanodes
  private String[] names; //hostname:portNumber of datanodes
  private String[] topologyPaths; // full path name in network topology
  private long offset;  //offset of the of the block in the file
  private long length;
  .......


        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);


  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

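So computeSplitSize() simply picks one of these values: blockSize, capped at maxSize and then raised to at least minSize. By default the split size is blockSize, i.e. the size of one Block (for example 64MB), although other values can be configured. Whatever the setting, the code below shows that all splits of a file have the same size except the last FileSplit. Because SPLIT_SLOP = 1.1, the last FileSplit can be larger than one Block: with the default settings it can be up to 1.1 times the Block size.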

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                   blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }

        // the last split
        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkLocations.length-1].getHosts()));
        }

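Every FileSplit except possibly the last is created with size splitSize, the value computed by the code above. In the loop, blkIndex identifies the Block that contains the split's starting offset, and blkLocations[blkIndex].getHosts() returns the machines that store that Block; these hosts are passed to the FileSplit constructor. This implies that the split size should not be too large. Suppose a split is 2 Blocks long and the two Blocks reside on two different servers: the FileSplit records the hosts of only one Block, so the Map task will run on only one server, and the other Block has to be transferred over the network to the machine running the Map. Splits should therefore not be large; it is best to keep the default size of one Block.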
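To check the arithmetic above, the split-size rule and the SPLIT_SLOP loop can be replayed in plain Java without Hadoop. This is a sketch that mirrors computeSplitSize() and the getSplits() loop for a single splittable, non-empty file; host lookup is left out, and the 64 MB block size and the file lengths are assumed example values.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSizeSketch {
    // Same slop factor as FileInputFormat in the code above.
    static final double SPLIT_SLOP = 1.1;

    // Mirrors computeSplitSize(): blockSize capped at maxSize, then raised to minSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Lengths of the splits the getSplits() loop produces for one file.
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            lengths.add(bytesRemaining);  // the tail: at most 1.1 x splitSize
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 64 * mb;  // assumed block size
        long splitSize = computeSplitSize(blockSize, 1L, Long.MAX_VALUE);
        // Assumed example: a 134 MB file.
        for (long len : splitLengths(134 * mb, splitSize)) {
            System.out.println(len / mb + " MB");  // prints "64 MB", then "70 MB"
        }
    }
}
```

With these assumed values the loop stops when 70 MB remain, because 70/64 = 1.09375 does not exceed 1.1, so the file yields two splits of 64 MB and 70 MB rather than three.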


  public FileSplit(Path file, long start, long length, String[] hosts) {
    this.file = file;
    this.start = start;
    this.length = length;
    this.hosts = hosts;
  }


  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }
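The locality argument can be checked with a small plain-Java model of the host lookup in getSplits(). In this sketch Block is a hypothetical, stripped-down stand-in for BlockLocation, getBlockIndex() follows the method above, and the four-block file spread over hostA to hostD is an assumed example. With splitSize set to two Blocks, the first split covers Blocks 0 and 1 but is tagged only with hostA; the tail split covers Blocks 2 and 3 but, following the blkLocations[blkLocations.length-1] branch, is tagged only with the last Block's host, hostD.

```java
import java.util.ArrayList;
import java.util.List;

class SplitLocalitySketch {
    static final double SPLIT_SLOP = 1.1;

    // Hypothetical stand-in for BlockLocation: offset, length and hosts of one block.
    static final class Block {
        final long offset;
        final long length;
        final String[] hosts;

        Block(long offset, long length, String... hosts) {
            this.offset = offset;
            this.length = length;
            this.hosts = hosts;
        }
    }

    // Same lookup as getBlockIndex(): index of the block containing offset.
    static int getBlockIndex(Block[] blocks, long offset) {
        for (int i = 0; i < blocks.length; i++) {
            if (blocks[i].offset <= offset && offset < blocks[i].offset + blocks[i].length) {
                return i;
            }
        }
        throw new IllegalArgumentException("Offset " + offset + " is outside of file");
    }

    // Hosts attached to each split by the getSplits() loop: the block holding the
    // split's first byte for full-size splits, the file's last block for the tail.
    static List<String> splitHosts(Block[] blocks, long fileLength, long splitSize) {
        List<String> result = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blocks, fileLength - bytesRemaining);
            result.add(String.join(",", blocks[blkIndex].hosts));
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            result.add(String.join(",", blocks[blocks.length - 1].hosts));
        }
        return result;
    }

    public static void main(String[] args) {
        long block = 64L * 1024 * 1024;
        // Assumed example: four blocks stored on four different DataNodes.
        Block[] blocks = {
            new Block(0, block, "hostA"),
            new Block(block, block, "hostB"),
            new Block(2 * block, block, "hostC"),
            new Block(3 * block, block, "hostD"),
        };
        // splitSize = 2 blocks: each split spans two blocks but records one host.
        System.out.println(splitHosts(blocks, 4 * block, 2 * block));  // [hostA, hostD]
    }
}
```

With splitSize equal to one Block, the same file yields four splits tagged hostA, hostB, hostC and hostD, i.e. every split is local to the data it covers.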

FileInputFormat computes the splits of the input data. It extends the base class InputFormat<K, V>, which is defined as:

public abstract class InputFormat<K, V> {

  /**
   * Logically split the set of input files for the job.
   *
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   *
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;

  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException,
                                                 InterruptedException;

}


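As shown above, with the default settings the number of Maps roughly equals the number of Blocks, because the split size normally equals the Block size. This raises a question: some records (a line of text, for example) may cross a Block boundary and therefore a split boundary. Since Maps and splits correspond one to one, which Map processes a record that crosses the boundary? The RecordReader guarantees this. For text files Hadoop provides some basic implementations, the typical one being TextInputFormat, which extends FileInputFormat<LongWritable, Text> and is declared as: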

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    return new LineRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

}


createRecordReader() returns a LineRecordReader, whose nextKeyValue() reads one line per call. Note the loop condition getFilePosition() <= end: a line that starts inside the split, or exactly at its end offset, is read in full even when it extends past the end of the split.

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " +
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }



initialize() records the byte range covered by the split, from start up to end = start + length:

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    .....


      fileIn.seek(start);
      in = new LineReader(fileIn, job);
      filePosition = fileIn;


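Later in initialize(), every split except the first discards its first (possibly partial) line, because the previous split has already read it:

    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;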


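In summary, the number of Map tasks is determined by the number of splits, and a split is usually a whole number of HDFS Blocks (one Block by default). A record may span several splits; records are read by a RecordReader implementation, which takes care of the boundary cases.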
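The boundary rule can be reproduced without Hadoop. The sketch below is a simplified model of LineRecordReader, assuming '\n'-terminated lines, no compression and no maxLineLength: a split that does not start at offset 0 discards everything up to and including the first '\n', and every split keeps reading lines as long as a line starts at or before its end offset. Running all splits of a file and concatenating their records yields each line exactly once, whatever the split size. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SplitLineReaderSketch {
    // Lines "owned" by the split [start, start + length) of data, following the
    // LineRecordReader rules (simplified: '\n' only, no maxLineLength).
    static List<String> readSplit(byte[] data, long start, long length) {
        long end = start + length;
        int pos = (int) start;
        List<String> records = new ArrayList<>();
        if (start != 0) {
            // Not the first split: throw away the first (possibly partial) line,
            // which the previous split has already read.
            pos = skipLine(data, pos);
        }
        // A line that starts at or before 'end' belongs to this split, even if it
        // runs past the split boundary.
        while (pos <= end && pos < data.length) {
            int next = skipLine(data, pos);
            int lineEnd = (data[next - 1] == '\n') ? next - 1 : next;
            records.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return records;
    }

    // Offset just past the next '\n' at or after 'from', or data.length at EOF.
    static int skipLine(byte[] data, int from) {
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return i < data.length ? i + 1 : data.length;
    }

    public static void main(String[] args) {
        byte[] data = "aaa\nbbbbbb\ncc\ndddd\n".getBytes(StandardCharsets.UTF_8);
        int splitSize = 5;  // assumed tiny split size so that lines cross boundaries
        for (int start = 0; start < data.length; start += splitSize) {
            int length = Math.min(splitSize, data.length - start);
            System.out.println("split@" + start + " -> " + readSplit(data, start, length));
        }
    }
}
```

With the assumed 5-byte splits, the line bbbbbb (bytes 4 to 10) starts inside the first split and is read there in full, while the second split, which begins in the middle of that line, skips it and returns no records.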



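2. Obtain the Job ID;
4. Submit the Job to the JobTracker, mainly the Job ID and the path where the Job's files are stored;
10. The Map and Reduce Tasks run in JVMs.
This post has analyzed steps 1 to 5. How the JobTracker takes Jobs out of the queue and assigns them, and how a TaskTracker obtains Map/Reduce Tasks, will be analyzed in follow-up posts.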
