首页 > 代码库 > 深入Hadoop的调度器
深入Hadoop的调度器
Hadoop有两个大版本
0.20.x,1.x通常为hadoop 1版本,运行环境依赖JobTracker和TaskTracker,运行资源通过作业表示模型MapTask和ReduceTask来组成;运行资源通过槽位Slot来表示。
0.23.x,2.x称之为hadoop 2版本,在开发模型上类似1,都有新旧两套MapReduce API来完成;针对JobTracker的职责有YARN来管理;包括
a:资源管理 ResourceManager ,以及NodeManager
b:作业调度 ApplicationManager,跟RM协商获取应用所需资源,与NM合作完成执行和监控TASK的任务。
针对资源的管理在这里不做过多分析,重点分析作业调度,这个是Hadoop的核心中枢。
Hadoop1代中的调度器
TaskScheduler,JobTracker,TaskTracker
这里需要关注:JobTracker和TaskTracker直接的交换机制通过heartbeat
heartbeat
TaskTracker周期性(默认为3s)调用RPC向JobTracker汇报信息,形成heartbeat
汇报信息包括TaskTracker状态信息、Task运状况等.
同事关注资源的划分单位:Slot
分为map slot和reduce slot两种
由参数mapred.tasktracker.[map|reduce].tasks.maximum设置
调度器的作业流程:
1:首先client通过submitJob提交作业未JobTracker
2:JobTracker通知TaskScheduler,有新的Job提交作业了
3:TaskScheduler开始初始化作业
4:JobTracker跟TaskTracker通过heartbeat获取具体的TaskTracker的资源情况,获取TaskTrackerStatus.
当然如果一个TaskTracker空闲时,也主动申请JobTracker分配任务;
5:JobTracker根据自己对资源的管理情况,请求TaskScheduler分配作业,TaskScheduler根据对应的资源情况和任务数,分配作业列表
6:JobTracker收到分配的作业列表,再通过HeartBeat将任务下发给具体的TaskTracker.
7:TaskTracker执行作业。
Hadoop的作业包括三个层次:
a:JobInProgress
b:TaskInProgress
c:TaskAttempt
Hadoop的调度包括三级调度
空闲的slot依次选择一个队列、作业和任务。
Queue:用户被划分到某个队列,每个队列分配一定的资源
Job:具体的一个Map/Reduce Job,关注提交时间和优先级
Task:通常具体的一个MapTask,ReduceTask,只在本地执行。
Hadoop已有的调度器
FIFO 批处理调度器 按到达时间排序,先来先服务
Capacity Scheduler 多用户调度器 根据用户分配不同的队列,在每个队列里面根据FIFO来处理
Fair Scheduler 多用户调度器 按照缺额排序,缺额大者优先,保证各个队列直接的负载均衡。
基于最小资源量(min share)与公平共享量(fair share)进行调度,作业优先级越高,分配到的资源越多
自己定制调度器
步骤1 编写JobInProgressListener
步骤2 编写调度器类,继承抽象类TaskScheduler
步骤3 配置并启用Hadoop调度器
abstract class JobInProgressListener {
public abstract void jobAdded(JobInProgress job) throws IOException; //添加Job
public abstract void jobRemoved(JobInProgress job); //删除Job
public abstract void jobUpdated(JobChangeEvent event); //响应JobUpdate
}
实现JobInProgressListener
class CustomerJobListener extends JobInProgressListener {
private List<JobInProgress> jobQueue = new ArrayList<JobInProgress>();
public void jobAdded(JobInProgress job) {
synchronized (jobQueue) {
jobQueue.add(job);
tt.initJob(job);
sortJobs();
}
}
public void jobRemoved(JobInProgress job) {
synchronized (jobQueue) {
jobQueue.remove(job);
}
}
}
实现调度器
TaskScheduler的抽象方法
public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException;
实现TaskScheduler
public class CustomerScheduler extends TaskScheduler {
private CustomerJobListener customerJobListener;
public void start() {
this.customerJobListener = new CustomerJobListener();
this.customerJobListener.setTaskTrackerManager(taskTrackerManager );
taskTrackerManager.addJobInProgressListener(customerJobListener);
}
public synchronized List<Task> assignTasks(TaskTrackerStatus tracker) throws IOException {
if(customerJobListener.getJobs().size() > 0)
JobInProgress job = customerJobListener.getJobs().get(0);
return job.obtainNewMapTask(…);
}
}
配置TaskScheduler
(1)在mapred-site.xml中配置
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.CustomerScheduler</value>
</property>
(2)重新启动mapreduce或者jobtracker
bin/start-mapred.sh
bin/hadoop-daemon.sh start jobtracker