首页 > 代码库 > Hadoop-2.6.0中关于控制应用是否通过CGroup限制CPU的优化

Hadoop-2.6.0中关于控制应用是否通过CGroup限制CPU的优化

一、背景

      Hadoop-2.6.0中,通过一系列复杂的配置,尤其是LinuxContainerExecutor和CgroupsLCEResourcesHandler这两个组件的使用,使得应用程序可以通过cgroup来限制其CPU的使用,防止CPU消耗过高的作业占住CPU,而其它作业无法使用。

      但是,这样也随之带来了一个问题,那就是一旦CPU CGroup启动,所有的应用都会受其限制,而且普遍的,生产集群配置的yarn.nodemanager.resource.cpu-vcores一般是高于物理内核数的,而作业的Container被分配的虚拟内核vcore为1的情况下,比不启用CPU CGroup时运行时间要长一些。我在5台机器上测试的结果是前者是后者的1.5倍。当然,这可能还需要在大规模集群上进行详细测试。但是有一点目前是肯定的,那就是启用CPU CGroup会使得原本不想限制CPU使用的应用受到限制,从而延长其运行时间。

      那么,有没有一种方案来实现应用级别的CPU CGroup呢?

二、思考

      通过分析CgroupsLCEResourcesHandler的源码,我了解到有一个参数,可以配置决定是否严格限制Container的CPU使用,即yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage,但实际上这个参数是属于NodeManager的,不是属于应用的,一旦设置并重启NodeManager后,任何应用都无法修改。并且,在Yarn中,ContainerExecutor这个组件是在NodeManager的serviceInit()方法中实例化的,如下:

@Override
protected void serviceInit(Configuration conf) throws Exception {
  // ...省略部分代码
  ContainerExecutor exec = ReflectionUtils.newInstance(
      conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
        DefaultContainerExecutor.class, ContainerExecutor.class), conf);
  try {
    exec.init();
  } catch (IOException e) {
    throw new YarnRuntimeException("Failed to initialize container executor", e);
  } 
  // ...省略部分代码
}
      也就是说,NodeManager进程中就一个ContainerExecutor实例,而LCEResourcesHandler又是在ContainerExecutor组件(这里是LinuxContainerExecutor)中实例化的,并且其配置Configuration实例与ContainerExecutor、NodeManager共用的一个,都是属于NodeManager级别的配置信息,如下:

@Override
public void setConf(Configuration conf) {
  super.setConf(conf);
  containerExecutorExe = getContainerExecutorExecutablePath(conf);
   
  resourcesHandler = ReflectionUtils.newInstance(
          conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
            DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
  resourcesHandler.setConf(conf);
  // ...省略部分代码
}

      这也就决定了通过参数配置这条道路是行不通的,因为配置是属于NodeManager的,应用无法修改。那么我们可以通过什么方式来实现呢?

      CgroupsLCEResourcesHandler主要是通过preExecute(ContainerId containerId, Resource containerResource)、postExecute(ContainerId containerId)两个方法实现的CPU CGroup限制时的环境设置,而它们的参数都有ContainerId,也就是它们是与容器息息相关的,通过仔细研究容器Container的相关代码,我找到了答案,那就是容器运行时的环境变量参数。而容器的getLaunchContext()方法提供了容器启动的上下文信息ContainerLaunchContext,这个上下文提供了获取和配置环境变量的getEnvironment()和setEnvironment()方法,通过它就能实现应用级别的CPU CGroup限制了。而MapReduce为我们提供了mapreduce.map.env、mapreduce.reduce.env、yarn.app.mapreduce.am.env三个参数,来分别设置Map容器、Reduce容器和AM容器时的环境变量。

三、优化方案

      1、添加ContainerExecutor组件

                 这个ContainerExecutor组件可以直接继承自LinuxContainerExecutor类,然后重写其launchContainer()方法,通过上述环境变量加ContainerID,在执行LinuxContainerExecutor的launchContainer()前设置一个容器级别的参数,如下:

package com.xxx.cgroup;
 
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
 
import java.io.IOException;
import java.util.List;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 
public class XxxLinuxContainerExecutor extends LinuxContainerExecutor {
 
    @Override
    public int launchContainer(Container container, Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
            String user, String appId, Path containerWorkDir, List<String> localDirs, List<String> logDirs)
            throws IOException {
 
         
        boolean map_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_am"));
 
        // 设置容器参数
        if (map_cgroup_para) {
            super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_map", String.valueOf(map_cgroup_para));
        }
        if (reduce_cgroup_para) {
            super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_reduce", String.valueOf(reduce_cgroup_para));
        }
        if (am_cgroup_para) {
            super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_am", String.valueOf(am_cgroup_para));
        }
 
        int i = super.launchContainer(container, nmPrivateCotainerScriptPath, nmPrivateTokensPath, user, appId,
                containerWorkDir, localDirs, logDirs);
 
        // 清空容器参数,防止配置Map数据量过大
        if (map_cgroup_para) {
            super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_map");
        }
        if (reduce_cgroup_para) {
            super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_reduce");
        }
        if (am_cgroup_para) {
            super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_am");
        }
 
        return i;
    }
}

       然后通过配置yarn.nodemanager.container-executor.class为com.bfd.cgroup.XxxLinuxContainerExecutor。

      2、添加LCEResourcesHandler组件

         这个LCEResourcesHandler组件直接继承自原BfdCgroupsLCEResourcesHandler,然后覆写其中的三个方法,preExecute()、postExecute()、getResourcesOption(),通过容器相关的环境变量来决定

是否启用CPU CGroup,如下:

package com.xxx.cgroup;
 
import java.io.IOException;
 
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
 
public class XxxCgroupsLCEResourcesHandler extends CgroupsLCEResourcesHandler {
 
    public XxxCgroupsLCEResourcesHandler() {
        super();
    }
 
    @Override
    public void preExecute(ContainerId containerId, Resource containerResource) throws IOException {
 
        boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am"));
 
        if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) {
            super.preExecute(containerId, containerResource);
        }
    }
 
    @Override
    public void postExecute(ContainerId containerId) {
 
        boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am"));
 
        if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) {
            super.postExecute(containerId);
        }
    }
 
    @Override
    public String getResourcesOption(ContainerId containerId) {
 
        boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am"));
 
        if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) {
            return super.getResourcesOption(containerId);
        } else {
 
            // 下面这些必须有,否则报错
            StringBuilder sb = new StringBuilder("cgroups=");
 
            if (sb.charAt(sb.length() - 1) == ‘,‘) {
                sb.deleteCharAt(sb.length() - 1);
            }
 
            return sb.toString();
        }
    }
}

        然后配置yarn.nodemanager.linux-container-executor.resources-handler.class为com.bfd.cgroup.XxxCgroupsLCEResourcesHandler。

      3、应用程序中设置环境变量参数

         应用程序中,如果想对CPU进行CGroup限制,需要配置以下三个环境变量,如下:

conf.set("mapreduce.map.env", "xxx_cpu_cgroup_map=true");
conf.set("mapreduce.reduce.env", "xxx_cpu_cgroup_reduce=true");
conf.set("yarn.app.mapreduce.am.env", "xxx_cpu_cgroup_am=true");

四、测试

        测试结果显示,上述改动能够实现应用级别的CPU CGroup限制,启动和不启动的执行时间明显有差距,且两者并行运行的话,也能实现该效果,说明参数能够达到应用级别(实际上是容器级别)而互不干扰。







Hadoop-2.6.0中关于控制应用是否通过CGroup限制CPU的优化