
Hadoop 2.2.0 Job Source Code Reading Notes

  Everything discussed in this article is based on Hadoop version 2.2.0.

  Overview:

  From the point of view of the user who creates and submits it, a Job can be configured at creation time to control how it executes, and its running state can be queried afterwards. Once a Job has been submitted it can no longer be configured; any attempt to do so raises an IllegalStateException.

  Normally a user creates, describes, and submits a Job through the Job class, and uses it to monitor the job's progress. Here is a simple example:

// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
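  Note that this snippet is quoted from the Job class javadoc and does not compile as-is against 2.2.0: Job has no setInputPath/setOutputPath methods (paths are configured through the FileInputFormat/FileOutputFormat lib classes), and the Job(Configuration) constructor is deprecated. A version that should compile against 2.2.0 (MyJob, MyMapper, and MyReducer are placeholder classes) looks like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Create a new Job through the recommended factory method
Job job = Job.getInstance(new Configuration(), "myjob");
job.setJarByClass(MyJob.class);

// Input/output paths are set through the lib classes, not on Job itself
FileInputFormat.addInputPath(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);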

  Basic structure:

  The Job class lives in the org.apache.hadoop.mapreduce package; it extends the JobContextImpl class and implements the JobContext interface.
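  For reference, the class declaration in the 2.2.0 source reads roughly as follows (audience/stability annotations elided):

public class Job extends JobContextImpl implements JobContext {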

  Static constants defined by Job:

private static final Log LOG = LogFactory.getLog(Job.class);

@InterfaceStability.Evolving
public static enum JobState {DEFINE, RUNNING};
private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
/** Key in mapred-*.xml that sets completionPollInvervalMillis */
public static final String COMPLETION_POLL_INTERVAL_KEY =
  "mapreduce.client.completion.pollinterval";

/** Default completionPollIntervalMillis is 5000 ms. */
static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
/** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
  "mapreduce.client.progressmonitor.pollinterval";
/** Default progMonitorPollIntervalMillis is 1000 ms. */
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

public static final String USED_GENERIC_PARSER =
  "mapreduce.client.genericoptionsparser.used";
public static final String SUBMIT_REPLICATION =
  "mapreduce.client.submit.file.replication";
private static final String TASKLOG_PULL_TIMEOUT_KEY =
  "mapreduce.client.tasklog.timeout";
private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  Private fields defined by Job:

private JobState state = JobState.DEFINE;
private JobStatus status;
private long statustime;
private Cluster cluster;

  A static initializer that loads the configuration files as soon as the Job class is loaded:

static {
  ConfigUtil.loadResources();
}

  The configuration files loaded include mapred-default.xml, mapred-site.xml, yarn-default.xml, and yarn-site.xml.
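  In 2.2.0, ConfigUtil.loadResources() amounts to registering the deprecated-key mappings and then adding those four files as default resources. A simplified sketch of its effect (paraphrased, not the verbatim source):

// Simplified sketch of what ConfigUtil.loadResources() does in 2.2.0
public static void loadResources() {
  addDeprecatedKeys();  // map old mapred.* keys to their mapreduce.* names
  Configuration.addDefaultResource("mapred-default.xml");
  Configuration.addDefaultResource("mapred-site.xml");
  Configuration.addDefaultResource("yarn-default.xml");
  Configuration.addDefaultResource("yarn-site.xml");
}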

  

  Job constructors:

@Deprecated
public Job() throws IOException {
  this(new Configuration());
}

@Deprecated
public Job(Configuration conf) throws IOException {
  this(new JobConf(conf));
}

@Deprecated
public Job(Configuration conf, String jobName) throws IOException {
  this(conf);
  setJobName(jobName);
}

Job(JobConf conf) throws IOException {
  super(conf, null);
  // propagate existing user credentials to job
  this.credentials.mergeAll(this.ugi.getCredentials());
  this.cluster = null;
}

Job(JobStatus status, JobConf conf) throws IOException {
  this(conf);
  setJobID(status.getJobID());
  this.status = status;
  state = JobState.RUNNING;
}

  Note that Hadoop discourages constructing Job objects through the no-arg constructor or directly from a Configuration: all three public constructors are marked @Deprecated. The JobConf-based constructors are package-private and used internally; user code is expected to go through the static getInstance() factory methods described next.

  

  Factory methods for obtaining Job instances:

    Besides the constructors, the Job class also provides several static factory methods for obtaining Job instances. Their definitions:

/**
 * Creates a new {@link Job} with no particular {@link Cluster}.
 * A Cluster will be created with a generic {@link Configuration}.
 *
 * @return the {@link Job}, with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance() throws IOException {
  // create with a null Cluster
  return getInstance(new Configuration());
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and a
 * given {@link Configuration}.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * @param conf the configuration
 * @return the {@link Job}, with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance(Configuration conf) throws IOException {
  // create with a null Cluster
  JobConf jobConf = new JobConf(conf);
  return new Job(jobConf);
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param conf the configuration
 * @return the {@link Job}, with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance(Configuration conf, String jobName)
         throws IOException {
  // create with a null Cluster
  Job result = getInstance(conf);
  result.setJobName(jobName);
  return result;
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and given
 * {@link Configuration} and {@link JobStatus}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param status job status
 * @param conf job configuration
 * @return the {@link Job}, with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance(JobStatus status, Configuration conf)
    throws IOException {
  return new Job(status, new JobConf(conf));
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param ignored
 * @return the {@link Job}, with no connection to a cluster yet.
 * @throws IOException
 * @deprecated Use {@link #getInstance()}
 */
@Deprecated
public static Job getInstance(Cluster ignored) throws IOException {
  return getInstance();
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and given
 * {@link Configuration}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param ignored
 * @param conf job configuration
 * @return the {@link Job}, with no connection to a cluster yet.
 * @throws IOException
 * @deprecated Use {@link #getInstance(Configuration)}
 */
@Deprecated
public static Job getInstance(Cluster ignored, Configuration conf)
    throws IOException {
  return getInstance(conf);
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and given
 * {@link Configuration} and {@link JobStatus}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param cluster cluster
 * @param status job status
 * @param conf job configuration
 * @return the {@link Job}, with no connection to a cluster yet.
 * @throws IOException
 */
@Private
public static Job getInstance(Cluster cluster, JobStatus status,
    Configuration conf) throws IOException {
  Job job = getInstance(status, conf);
  job.setCluster(cluster);
  return job;
}

    Note that although these factory methods may be associated with a Cluster, none of them actually connect to one: per the javadoc, a Cluster is created from the conf parameter only when it is actually needed.
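    A short sketch of the copy semantics the javadoc describes: because getInstance(Configuration) copies the passed-in Configuration into a new JobConf, later changes to the original object are not visible to the job (the property name below is an arbitrary demo key):

Configuration conf = new Configuration();
conf.set("my.example.key", "before");          // arbitrary demo property

Job job = Job.getInstance(conf, "copy-demo");  // Job takes a *copy* of conf

conf.set("my.example.key", "after");           // mutate the original afterwards

// The job still sees the value captured at getInstance() time:
System.out.println(job.getConfiguration().get("my.example.key")); // "before"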

    

    Polling-interval methods:

/** The interval at which monitorAndPrintJob() prints status */
public static int getProgressPollInterval(Configuration conf) {
  // Read progress monitor poll interval from config. Default is 1 second.
  int progMonitorPollIntervalMillis = conf.getInt(
    PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
  if (progMonitorPollIntervalMillis < 1) {
    LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
      " has been set to an invalid value; "
      + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
    progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
  }
  return progMonitorPollIntervalMillis;
}

/** The interval at which waitForCompletion() should check. */
public static int getCompletionPollInterval(Configuration conf) {
  int completionPollIntervalMillis = conf.getInt(
    COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
  if (completionPollIntervalMillis < 1) {
    LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
      " has been set to an invalid value; "
      + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
    completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
  }
  return completionPollIntervalMillis;
}

    The two methods above return, respectively, the interval at which monitorAndPrintJob() fetches and prints the job status, and the interval at which waitForCompletion() checks whether the job has finished.
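    Both intervals can be tuned per client through the keys defined among the static constants; as the code shows, values below 1 ms fall back to the defaults with a warning. For example:

Configuration conf = new Configuration();

// Check for job completion every second instead of the default 5000 ms
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 1000);

// Print progress every 2 seconds instead of the default 1000 ms
conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 2000);

int completion = Job.getCompletionPollInterval(conf);  // 1000
int progress   = Job.getProgressPollInterval(conf);    // 2000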

    

    Synchronized methods for refreshing the job status and connecting to the cluster:

synchronized void ensureFreshStatus()
    throws IOException {
  if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
    updateStatus();
  }
}

/** Some methods need to update status immediately. So, refresh
 * immediately
 * @throws IOException
 */
synchronized void updateStatus() throws IOException {
  try {
    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      @Override
      public JobStatus run() throws IOException, InterruptedException {
        return cluster.getClient().getJobStatus(status.getJobID());
      }
    });
  }
  catch (InterruptedException ie) {
    throw new IOException(ie);
  }
  if (this.status == null) {
    throw new IOException("Job status not available ");
  }
  this.statustime = System.currentTimeMillis();
}

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException,
                               ClassNotFoundException {
                   return new Cluster(getConfiguration());
                 }
               });
  }
}
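    ensureFreshStatus() implements a time-based cache: it issues the getJobStatus() RPC only when the cached JobStatus is older than MAX_JOBSTATUS_AGE (2 seconds), while updateStatus() always refreshes, and connect() lazily creates the Cluster on first use. The caching pattern in isolation (a standalone sketch with hypothetical names, not Hadoop API):

import java.io.IOException;

// Minimal standalone sketch of the staleness check used by ensureFreshStatus()
class StatusCache {
  private static final long MAX_AGE_MS = 1000 * 2; // mirrors MAX_JOBSTATUS_AGE
  private long fetchedAt;                          // mirrors statustime
  private String cached;

  synchronized String get() throws IOException {
    if (System.currentTimeMillis() - fetchedAt > MAX_AGE_MS) {
      cached = fetchRemote();                      // the expensive RPC
      fetchedAt = System.currentTimeMillis();
    }
    return cached;
  }

  private String fetchRemote() throws IOException {
    return "RUNNING"; // stand-in for cluster.getClient().getJobStatus(...)
  }
}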

    

    Setter methods for configuration parameters:

    

/**
 * Set the number of reduce tasks for the job.
 * @param tasks the number of reduce tasks
 * @throws IllegalStateException if the job is submitted
 */
public void setNumReduceTasks(int tasks) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setNumReduceTasks(tasks);
}

/**
 * Set the current working directory for the default file system.
 *
 * @param dir the new current working directory.
 * @throws IllegalStateException if the job is submitted
 */
public void setWorkingDirectory(Path dir) throws IOException {
  ensureState(JobState.DEFINE);
  conf.setWorkingDirectory(dir);
}

/**
 * Set the {@link InputFormat} for the job.
 * @param cls the <code>InputFormat</code> to use
 * @throws IllegalStateException if the job is submitted
 */
public void setInputFormatClass(Class<? extends InputFormat> cls
                                ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
}

/**
 * Set the {@link OutputFormat} for the job.
 * @param cls the <code>OutputFormat</code> to use
 * @throws IllegalStateException if the job is submitted
 */
public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                 ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
}

/**
 * Set the {@link Mapper} for the job.
 * @param cls the <code>Mapper</code> to use
 * @throws IllegalStateException if the job is submitted
 */
public void setMapperClass(Class<? extends Mapper> cls
                           ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
}

/**
 * Set the Jar by finding where a given class came from.
 * @param cls the example class
 */
public void setJarByClass(Class<?> cls) {
  ensureState(JobState.DEFINE);
  conf.setJarByClass(cls);
}

/**
 * Set the job jar
 */
public void setJar(String jar) {
  ensureState(JobState.DEFINE);
  conf.setJar(jar);
}

/**
 * Set the reported username for this job.
 *
 * @param user the username for this job.
 */
public void setUser(String user) {
  ensureState(JobState.DEFINE);
  conf.setUser(user);
}

/**
 * Set the combiner class for the job.
 * @param cls the combiner to use
 * @throws IllegalStateException if the job is submitted
 */
public void setCombinerClass(Class<? extends Reducer> cls
                             ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}

/**
 * Set the {@link Reducer} for the job.
 * @param cls the <code>Reducer</code> to use
 * @throws IllegalStateException if the job is submitted
 */
public void setReducerClass(Class<? extends Reducer> cls
                            ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
}

/**
 * Set the {@link Partitioner} for the job.
 * @param cls the <code>Partitioner</code> to use
 * @throws IllegalStateException if the job is submitted
 */
public void setPartitionerClass(Class<? extends Partitioner> cls
                                ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
}

/**
 * Set the key class for the map output data. This allows the user to
 * specify the map output key class to be different than the final output
 * value class.
 *
 * @param theClass the map output key class.
 * @throws IllegalStateException if the job is submitted
 */
public void setMapOutputKeyClass(Class<?> theClass
                                 ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setMapOutputKeyClass(theClass);
}

/**
 * Set the value class for the map output data. This allows the user to
 * specify the map output value class to be different than the final output
 * value class.
 *
 * @param theClass the map output value class.
 * @throws IllegalStateException if the job is submitted
 */
public void setMapOutputValueClass(Class<?> theClass
                                   ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setMapOutputValueClass(theClass);
}

/**
 * Set the key class for the job output data.
 *
 * @param theClass the key class for the job output data.
 * @throws IllegalStateException if the job is submitted
 */
public void setOutputKeyClass(Class<?> theClass
                              ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputKeyClass(theClass);
}

/**
 * Set the value class for job outputs.
 *
 * @param theClass the value class for job outputs.
 * @throws IllegalStateException if the job is submitted
 */
public void setOutputValueClass(Class<?> theClass
                                ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputValueClass(theClass);
}

/**
 * Define the comparator that controls how the keys are sorted before they
 * are passed to the {@link Reducer}.
 * @param cls the raw comparator
 * @throws IllegalStateException if the job is submitted
 */
public void setSortComparatorClass(Class<? extends RawComparator> cls
                                   ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputKeyComparatorClass(cls);
}

/**
 * Define the comparator that controls which keys are grouped together
 * for a single call to
 * {@link Reducer#reduce(Object, Iterable,
 *                       org.apache.hadoop.mapreduce.Reducer.Context)}
 * @param cls the raw comparator to use
 * @throws IllegalStateException if the job is submitted
 */
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                       ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputValueGroupingComparator(cls);
}

/**
 * Set the user-specified job name.
 *
 * @param name the job's new name.
 * @throws IllegalStateException if the job is submitted
 */
public void setJobName(String name) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setJobName(name);
}

/**
 * Turn speculative execution on or off for this job.
 *
 * @param speculativeExecution <code>true</code> if speculative execution
 *                             should be turned on, else <code>false</code>.
 */
public void setSpeculativeExecution(boolean speculativeExecution) {
  ensureState(JobState.DEFINE);
  conf.setSpeculativeExecution(speculativeExecution);
}

/**
 * Turn speculative execution on or off for this job for map tasks.
 *
 * @param speculativeExecution <code>true</code> if speculative execution
 *                             should be turned on for map tasks,
 *                             else <code>false</code>.
 */
public void setMapSpeculativeExecution(boolean speculativeExecution) {
  ensureState(JobState.DEFINE);
  conf.setMapSpeculativeExecution(speculativeExecution);
}

/**
 * Turn speculative execution on or off for this job for reduce tasks.
 *
 * @param speculativeExecution <code>true</code> if speculative execution
 *                             should be turned on for reduce tasks,
 *                             else <code>false</code>.
 */
public void setReduceSpeculativeExecution(boolean speculativeExecution) {
  ensureState(JobState.DEFINE);
  conf.setReduceSpeculativeExecution(speculativeExecution);
}

/**
 * Specify whether job-setup and job-cleanup is needed for the job
 *
 * @param needed If <code>true</code>, job-setup and job-cleanup will be
 *               considered from {@link OutputCommitter}
 *               else ignored.
 */
public void setJobSetupCleanupNeeded(boolean needed) {
  ensureState(JobState.DEFINE);
  conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
}

/**
 * Set the given set of archives
 * @param archives The list of archives that need to be localized
 */
public void setCacheArchives(URI[] archives) {
  ensureState(JobState.DEFINE);
  DistributedCache.setCacheArchives(archives, conf);
}

/**
 * Set the given set of files
 * @param files The list of files that need to be localized
 */
public void setCacheFiles(URI[] files) {
  ensureState(JobState.DEFINE);
  DistributedCache.setCacheFiles(files, conf);
}

/**
 * Add a archives to be localized
 * @param uri The uri of the cache to be localized
 */
public void addCacheArchive(URI uri) {
  ensureState(JobState.DEFINE);
  DistributedCache.addCacheArchive(uri, conf);
}

/**
 * Add a file to be localized
 * @param uri The uri of the cache to be localized
 */
public void addCacheFile(URI uri) {
  ensureState(JobState.DEFINE);
  DistributedCache.addCacheFile(uri, conf);
}

/**
 * Add an file path to the current set of classpath entries It adds the file
 * to cache as well.
 *
 * Files added with this method will not be unpacked while being added to the
 * classpath.
 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
 * method instead.
 *
 * @param file Path of the file to be added
 */
public void addFileToClassPath(Path file)
  throws IOException {
  ensureState(JobState.DEFINE);
  DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
}

/**
 * Add an archive path to the current set of classpath entries. It adds the
 * archive to cache as well.
 *
 * Archive files will be unpacked and added to the classpath
 * when being distributed.
 *
 * @param archive Path of the archive to be added
 */
public void addArchiveToClassPath(Path archive)
  throws IOException {
  ensureState(JobState.DEFINE);
  DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
}

/**
 * Originally intended to enable symlinks, but currently symlinks cannot be
 * disabled.
 */
@Deprecated
public void createSymlink() {
  ensureState(JobState.DEFINE);
  DistributedCache.createSymlink(conf);
}

/**
 * Expert: Set the number of maximum attempts that will be made to run a
 * map task.
 *
 * @param n the number of attempts per map task.
 */
public void setMaxMapAttempts(int n) {
  ensureState(JobState.DEFINE);
  conf.setMaxMapAttempts(n);
}

/**
 * Expert: Set the number of maximum attempts that will be made to run a
 * reduce task.
 *
 * @param n the number of attempts per reduce task.
 */
public void setMaxReduceAttempts(int n) {
  ensureState(JobState.DEFINE);
  conf.setMaxReduceAttempts(n);
}

/**
 * Set whether the system should collect profiler information for some of
 * the tasks in this job? The information is stored in the user log
 * directory.
 * @param newValue true means it should be gathered
 */
public void setProfileEnabled(boolean newValue) {
  ensureState(JobState.DEFINE);
  conf.setProfileEnabled(newValue);
}

/**
 * Set the profiler configuration arguments. If the string contains a '%s' it
 * will be replaced with the name of the profiling output file when the task
 * runs.
 *
 * This value is passed to the task child JVM on the command line.
 *
 * @param value the configuration string
 */
public void setProfileParams(String value) {
  ensureState(JobState.DEFINE);
  conf.setProfileParams(value);
}

/**
 * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
 * must also be called.
 * @param newValue a set of integer ranges of the map ids
 */
public void setProfileTaskRange(boolean isMap, String newValue) {
  ensureState(JobState.DEFINE);
  conf.setProfileTaskRange(isMap, newValue);
}

/**
 * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
 * tokens upon job completion. Defaults to true.
 */
public void setCancelDelegationTokenUponJobCompletion(boolean value) {
  ensureState(JobState.DEFINE);
  conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
}

    The crucial point is that every setter checks the job state first: a Job can be configured only while it is in the DEFINE state.
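    ensureState itself is not shown above; it is a small private guard (paraphrased sketch, not the verbatim 2.2.0 source) that turns any post-submission setter call, e.g. job.setNumReduceTasks(2) after job.submit(), into an IllegalStateException:

// Paraphrased sketch of the state guard behind every setter
private void ensureState(JobState state) throws IllegalStateException {
  if (state != this.state) {
    throw new IllegalStateException("Job in state " + this.state
        + " instead of " + state);
  }
  if (state == JobState.RUNNING && cluster == null) {
    throw new IllegalStateException("Job in state " + JobState.RUNNING
        + " however it is not connected to a cluster yet!");
  }
}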

    

    The method for dumping status to the screen:

/**
 * Dump stats to screen.
 */
@Override
public String toString() {
  ensureState(JobState.RUNNING);
  String reasonforFailure = " ";
  int numMaps = 0;
  int numReduces = 0;
  try {
    updateStatus();
    if (status.getState().equals(JobStatus.State.FAILED))
      reasonforFailure = getTaskFailureEventString();
    numMaps = getTaskReports(TaskType.MAP).length;
    numReduces = getTaskReports(TaskType.REDUCE).length;
  } catch (IOException e) {
  } catch (InterruptedException ie) {
  }
  StringBuffer sb = new StringBuffer();
  sb.append("Job: ").append(status.getJobID()).append("\n");
  sb.append("Job File: ").append(status.getJobFile()).append("\n");
  sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
  sb.append("\n");
  sb.append("Uber job : ").append(status.isUber()).append("\n");
  sb.append("Number of maps: ").append(numMaps).append("\n");
  sb.append("Number of reduces: ").append(numReduces).append("\n");
  sb.append("map() completion: ");
  sb.append(status.getMapProgress()).append("\n");
  sb.append("reduce() completion: ");
  sb.append(status.getReduceProgress()).append("\n");
  sb.append("Job state: ");
  sb.append(status.getState()).append("\n");
  sb.append("retired: ").append(status.isRetired()).append("\n");
  sb.append("reason for failure: ").append(reasonforFailure);
  return sb.toString();
}
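  Because toString() begins with ensureState(JobState.RUNNING), printing a Job that has not been submitted yet throws IllegalStateException; it is meant for a submitted job, e.g.:

job.submit();
System.out.println(job);  // prints job ID, tracking URL, progress, state, ...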

  

  Methods for obtaining task progress:

/**
 * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
 * and 1.0.  When all map tasks have completed, the function returns 1.0.
 *
 * @return the progress of the job's map-tasks.
 * @throws IOException
 */
public float mapProgress() throws IOException {
  ensureState(JobState.RUNNING);
  ensureFreshStatus();
  return status.getMapProgress();
}

/**
 * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
 * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
 *
 * @return the progress of the job's reduce-tasks.
 * @throws IOException
 */
public float reduceProgress() throws IOException {
  ensureState(JobState.RUNNING);
  ensureFreshStatus();
  return status.getReduceProgress();
}

/**
 * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
 * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
 *
 * @return the progress of the job's cleanup-tasks.
 * @throws IOException
 */
public float cleanupProgress() throws IOException, InterruptedException {
  ensureState(JobState.RUNNING);
  ensureFreshStatus();
  return status.getCleanupProgress();
}

/**
 * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
 * and 1.0.  When all setup tasks have completed, the function returns 1.0.
 *
 * @return the progress of the job's setup-tasks.
 * @throws IOException
 */
public float setupProgress() throws IOException {
  ensureState(JobState.RUNNING);
  ensureFreshStatus();
  return status.getSetupProgress();
}
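  Together with the polling-interval methods above, these support a manual monitoring loop as an alternative to waitForCompletion(true). A sketch (job setup elided; assumes the enclosing method declares IOException and InterruptedException):

Job job = Job.getInstance(conf, "progress-demo");
// ... set mapper/reducer/input/output here ...

job.submit();  // non-blocking, unlike waitForCompletion()

while (!job.isComplete()) {
  System.out.printf("map: %.1f%%  reduce: %.1f%%%n",
      job.mapProgress() * 100, job.reduceProgress() * 100);
  Thread.sleep(Job.getProgressPollInterval(conf));
}
System.out.println("Job " + (job.isSuccessful() ? "succeeded" : "failed"));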