首页 > 代码库 > Yarn的服务库和事件库使用方法

Yarn的服务库和事件库使用方法

事件类型定义:

package org.apache.hadoop.event;public enum JobEventType {        JOB_KILL,        JOB_INIT,        JOB_START}
package org.apache.hadoop.event;public enum TaskEventType {    T_KILL,    T_SCHEDULE}

事件定义:

package org.apache.hadoop.event;import org.apache.hadoop.yarn.event.AbstractEvent;public class JobEvent extends AbstractEvent<JobEventType> {        private String jobID;        public JobEvent(JobEventType type,String jobID) {        super(type);        this.jobID=jobID;    }    public String getJobID() {        return jobID;    }        }
package org.apache.hadoop.event;import org.apache.hadoop.yarn.event.AbstractEvent;public class TaskEvent extends AbstractEvent<TaskEventType> {        private String taskID; //TASkID    public TaskEvent(TaskEventType type,String taskID) {        super(type);        this.taskID=taskID;    }    public String getTaskID() {        return taskID;    }}

简单服务定义:

package org.apache.hadoop.event;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.service.CompositeService;import org.apache.hadoop.service.Service;import org.apache.hadoop.yarn.event.AsyncDispatcher;import org.apache.hadoop.yarn.event.Dispatcher;import org.apache.hadoop.yarn.event.EventHandler;public class SimpleMRAppMaster extends CompositeService {    private Dispatcher dispatcher;   //中央异步调度器    private String jobID;    private int taskNumber;  //作业中包含的任务数    private String[] taskIDS; //该作业中包含的所有任务        public SimpleMRAppMaster(String name,String jobID,int taskNumber) {        super(name);        this.jobID=jobID;        this.taskNumber=taskNumber;        this.taskIDS=new String[taskNumber];        for(int i=0;i<taskNumber;i++){            this.taskIDS[i]=new String(jobID+"_task_"+i);        }    }    @Override    protected void serviceInit(Configuration conf) throws Exception {                dispatcher=new AsyncDispatcher();                dispatcher.register(JobEventType.class, new JobEventHandller());        dispatcher.register(TaskEventType.class, new TaskEventHandller());        addService((Service)dispatcher);                super.serviceInit(conf);    }            public Dispatcher getDispatcher(){        return dispatcher;    }        private class JobEventHandller implements EventHandler<JobEvent>{        @Override        public void handle(JobEvent event) {                        //若收到 杀死  作业 事件            if(event.getType() == JobEventType.JOB_KILL){                System.out.println("收到 杀死作业事件   ,要 杀掉作业"+event.getJobID()+"下的所有任务");                                for(int i=0;i<=taskNumber;i++){                    dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_KILL, taskIDS[i]));                }                            }else if(event.getType()== JobEventType.JOB_INIT){                System.out.println("收到 启动作业事件   ,要启动 作业"+event.getJobID()+"下的所有任务");                for(int i=0;i<=taskNumber;i++){                    dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_SCHEDULE, taskIDS[i]));                }            }        }    }        private class TaskEventHandller implements EventHandler<TaskEvent>{        @Override        public void handle(TaskEvent event) {            if(event.getType()==TaskEventType.T_KILL){                System.out.println("收到杀死任务命令,开始杀死任务"+event.getTaskID());            }else if(event.getType()==TaskEventType.T_SCHEDULE){                System.out.println("收到启动任务命令,开始启动任务"+event.getTaskID());            }        }    }        }

测试程序定义:

package org.apache.hadoop.event;import static org.junit.Assert.*;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.yarn.conf.YarnConfiguration;import org.junit.Test;/** * 自己写的 关于事件库 和 服务 库的使用 * @author joqk * */public class SimpleMRAppMasterTest {    @Test    public void test() throws Exception {                String jobID="job_20140912_01";            SimpleMRAppMaster appMaster=new SimpleMRAppMaster("作业测试", jobID, 10);            YarnConfiguration conf = new YarnConfiguration(new Configuration());                        appMaster.serviceInit(conf);                        appMaster.start();                        appMaster.getDispatcher().getEventHandler().handle(new JobEvent(JobEventType.JOB_INIT, jobID));                }}

 

 

Yarn的服务库和事件库使用方法