首页 > 代码库 > Yarn的服务库和事件库使用方法
Yarn的服务库和事件库使用方法
事件类型定义:
package org.apache.hadoop.event;public enum JobEventType { JOB_KILL, JOB_INIT, JOB_START}
package org.apache.hadoop.event;public enum TaskEventType { T_KILL, T_SCHEDULE}
事件定义:
package org.apache.hadoop.event;

import org.apache.hadoop.yarn.event.AbstractEvent;

/**
 * Event addressed to a job, carrying the identifier of the job it concerns.
 */
public class JobEvent extends AbstractEvent<JobEventType> {

  /** Identifier of the job this event refers to; set once at construction. */
  private final String jobID;

  /**
   * Creates a job event.
   *
   * @param type  the kind of job event
   * @param jobID identifier of the job the event refers to
   */
  public JobEvent(JobEventType type, String jobID) {
    super(type);
    this.jobID = jobID;
  }

  /**
   * @return the identifier of the job this event refers to
   */
  public String getJobID() {
    return this.jobID;
  }
}
package org.apache.hadoop.event;import org.apache.hadoop.yarn.event.AbstractEvent;public class TaskEvent extends AbstractEvent<TaskEventType> { private String taskID; //TASkID public TaskEvent(TaskEventType type,String taskID) { super(type); this.taskID=taskID; } public String getTaskID() { return taskID; }}
简单服务定义:
package org.apache.hadoop.event;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.service.CompositeService;import org.apache.hadoop.service.Service;import org.apache.hadoop.yarn.event.AsyncDispatcher;import org.apache.hadoop.yarn.event.Dispatcher;import org.apache.hadoop.yarn.event.EventHandler;public class SimpleMRAppMaster extends CompositeService { private Dispatcher dispatcher; //中央异步调度器 private String jobID; private int taskNumber; //作业中包含的任务数 private String[] taskIDS; //该作业中包含的所有任务 public SimpleMRAppMaster(String name,String jobID,int taskNumber) { super(name); this.jobID=jobID; this.taskNumber=taskNumber; this.taskIDS=new String[taskNumber]; for(int i=0;i<taskNumber;i++){ this.taskIDS[i]=new String(jobID+"_task_"+i); } } @Override protected void serviceInit(Configuration conf) throws Exception { dispatcher=new AsyncDispatcher(); dispatcher.register(JobEventType.class, new JobEventHandller()); dispatcher.register(TaskEventType.class, new TaskEventHandller()); addService((Service)dispatcher); super.serviceInit(conf); } public Dispatcher getDispatcher(){ return dispatcher; } private class JobEventHandller implements EventHandler<JobEvent>{ @Override public void handle(JobEvent event) { //若收到 杀死 作业 事件 if(event.getType() == JobEventType.JOB_KILL){ System.out.println("收到 杀死作业事件 ,要 杀掉作业"+event.getJobID()+"下的所有任务"); for(int i=0;i<=taskNumber;i++){ dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_KILL, taskIDS[i])); } }else if(event.getType()== JobEventType.JOB_INIT){ System.out.println("收到 启动作业事件 ,要启动 作业"+event.getJobID()+"下的所有任务"); for(int i=0;i<=taskNumber;i++){ dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_SCHEDULE, taskIDS[i])); } } } } private class TaskEventHandller implements EventHandler<TaskEvent>{ @Override public void handle(TaskEvent event) { if(event.getType()==TaskEventType.T_KILL){ System.out.println("收到杀死任务命令,开始杀死任务"+event.getTaskID()); }else if(event.getType()==TaskEventType.T_SCHEDULE){ 
System.out.println("收到启动任务命令,开始启动任务"+event.getTaskID()); } } } }
测试程序定义:
package org.apache.hadoop.event;

import static org.junit.Assert.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;

/**
 * Hand-written example showing how the event library and the service library are used.
 *
 * @author joqk
 */
public class SimpleMRAppMasterTest {

  /**
   * Builds an application master for a ten-task job, initialises and starts it, then
   * submits a JOB_INIT event through its dispatcher.
   *
   * @throws Exception if the service fails to initialise or start
   */
  @Test
  public void test() throws Exception {
    String jobID = "job_20140912_01";
    SimpleMRAppMaster appMaster = new SimpleMRAppMaster("作业测试", jobID, 10);
    YarnConfiguration conf = new YarnConfiguration(new Configuration());

    // Go through the public lifecycle entry point: init() records the INITED state and
    // then invokes serviceInit(). Calling serviceInit() directly leaves the service
    // un-initialised as far as the lifecycle is concerned, so start() would be rejected.
    appMaster.init(conf);
    appMaster.start();

    appMaster.getDispatcher().getEventHandler().handle(new JobEvent(JobEventType.JOB_INIT, jobID));
  }
}
Yarn的服务库和事件库使用方法
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。