首页 > 代码库 > 深入Hadoop的调度器

深入Hadoop的调度器

Hadoop有两个大版本

0.20.x,1.x通常为hadoop 1版本,运行环境依赖JobTracker和TaskTracker,运行资源通过作业表示模型MapTask和ReduceTask来组成;运行资源通过槽位Slot来表示。

0.23.x,2.x称之为hadoop 2版本,在开发模型上类似1,都有新旧两套MapReduce API来完成;针对JobTracker的职责有YARN来管理;包括

a:资源管理 ResourceManager ,以及NodeManager

b:作业调度 ApplicationManager,跟RM协商获取应用所需资源,与NM合作完成执行和监控TASK的任务。

针对资源的管理在这里不做过多分析,重点分析作业调度,这个是Hadoop的核心中枢。

Hadoop1代中的调度器

TaskScheduler,JobTracker,TaskTracker

这里需要关注:JobTracker和TaskTracker直接的交换机制通过heartbeat

 

heartbeat

 

TaskTracker周期性(默认为3s)调用RPC向JobTracker汇报信息,形成heartbeat

汇报信息包括TaskTracker状态信息、Task运状况等.

同事关注资源的划分单位:Slot
分为map slot和reduce slot两种
由参数mapred.tasktracker.[map|reduce].tasks.maximum设置

 

 调度器的作业流程:

1:首先client通过submitJob提交作业未JobTracker

2:JobTracker通知TaskScheduler,有新的Job提交作业了

3:TaskScheduler开始初始化作业

4:JobTracker跟TaskTracker通过heartbeat获取具体的TaskTracker的资源情况,获取TaskTrackerStatus.

当然如果一个TaskTracker空闲时,也主动申请JobTracker分配任务;

5:JobTracker根据自己对资源的管理情况,请求TaskScheduler分配作业,TaskScheduler根据对应的资源情况和任务数,分配作业列表

6:JobTracker收到分配的作业列表,再通过HeartBeat将任务下发给具体的TaskTracker.

7:TaskTracker执行作业。

 

Hadoop的作业包括三个层次:

a:JobInProgress

b:TaskInProgress

c:TaskAttempt

 

Hadoop的调度包括三级调度

空闲的slot依次选择一个队列、作业和任务。

Queue:用户被划分到某个队列,每个队列分配一定的资源

Job:具体的一个Map/Reduce Job,关注提交时间和优先级

Task:通常具体的一个MapTask,ReduceTask,只在本地执行。

 

Hadoop已有的调度器

FIFO 批处理调度器 按到达时间排序,先来先服务
Capacity Scheduler 多用户调度器 根据用户分配不同的队列,在每个队列里面根据FIFO来处理
Fair Scheduler 多用户调度器 按照缺额排序,缺额大者优先,保证各个队列直接的负载均衡。

基于最小资源量(min share)与公平共享量(fair share)进行调度,作业优先级越高,分配到的资源越多

 

自己定制调度器

 步骤1 编写JobInProgressListener
步骤2 编写调度器类,继承抽象类TaskScheduler
步骤3 配置并启用Hadoop调度器

 

abstract class JobInProgressListener {
   public abstract void jobAdded(JobInProgress job) throws IOException;  //添加Job
   public abstract void jobRemoved(JobInProgress job); //删除Job
   public abstract void jobUpdated(JobChangeEvent event); //响应JobUpdate
}

 

实现JobInProgressListener

class CustomerJobListener extends JobInProgressListener {
private List<JobInProgress> jobQueue = new ArrayList<JobInProgress>();

public void jobAdded(JobInProgress job) {
   synchronized (jobQueue) {
     jobQueue.add(job);
     tt.initJob(job);
     sortJobs();
   }
}

public void jobRemoved(JobInProgress job) {
  synchronized (jobQueue) {
    jobQueue.remove(job);
  }
}

}

实现调度器

 

TaskScheduler的抽象方法

public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException;

实现TaskScheduler

public class CustomerScheduler extends TaskScheduler {
private CustomerJobListener customerJobListener;

public void start() {
    this.customerJobListener = new CustomerJobListener();
    this.customerJobListener.setTaskTrackerManager(taskTrackerManager );
    taskTrackerManager.addJobInProgressListener(customerJobListener);
}

 

public synchronized List<Task> assignTasks(TaskTrackerStatus tracker) throws IOException {
    if(customerJobListener.getJobs().size() > 0)
        JobInProgress job = customerJobListener.getJobs().get(0);
    return job.obtainNewMapTask(…);
}

}

 

配置TaskScheduler

(1)在mapred-site.xml中配置
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.CustomerScheduler</value>
</property>
(2)重新启动mapreduce或者jobtracker
bin/start-mapred.sh
bin/hadoop-daemon.sh start jobtracker