首页 > 代码库 > hive Context类和DriverContext类

hive Context类和DriverContext类

在hive的源码中经常可以看到Context类和DriverContext类,咋一看感觉这两个意思差不多,其实其作用区别还是蛮大的:

org.apache.hadoop.hive.ql.Context类
存储job的上下文信息,一个job创建一个Context对象,job运行完后,调用clear方法进行清除
1)初始化/创建/删除中间目录
中间的目录包括local job的和非local job的

protected int pathid = 10000;
private static final String MR_PREFIX = "-mr-";   //对应getMRTmpPath方法
private static final String EXT_PREFIX = "-ext-"; //对应getLocalTmpPath方法
private static final String LOCAL_PREFIX = "-local-"; //对应getExternalTmpPath方法
  private String nextPathId() {
    return Integer.toString(pathid++); //这导致最小的那个目录应该是1000
  }

 比如getMRTmpPath: 

  public Path getMRTmpPath() {
    return new Path(getMRScratchDir(), MR_PREFIX +
      nextPathId());
  }

产生的临时文件路径如:

hdfs://xxx:9000/tmp/hive-ericni/hive_2014-12-18_14-37-00_106_3507079460876567552-1/_tmp.-mr-10003

产生临时目录的调用方法如下:

创建临时目录的调用
getMRScratchDir--->getScratchDir方法或者getLocalScratchDir方法(getLocalScratchDir最终也是调用getScratchDir方法,不过传入的参数是localScratchDir)

private final Map<String, Path> fsScratchDirs = new HashMap<String, Path>();  //用来存放对应关系的hashmap
.....
  private Path getScratchDir(String scheme, String authority,
                               boolean mkdir, String scratchDir) {
    String fileSystem =  scheme + ":" + authority; //hdfs:10.100.90.204:9000
    Path dir = fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID());
     //第一次调用getScratchDir方法时,fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID())的值为null(其中
  
    if (dir == null) { //第一次运行时,为null,使用Utilities.createDirsWithPermission创建目录,权限为777(hive.scratch.dir.permission设置为777时)
      Path dirPath = new Path(scheme, authority,
          scratchDir + "-" + TaskRunner.getTaskRunnerID());
      if (mkdir) {
        try {
          FileSystem fs = dirPath.getFileSystem(conf);
          dirPath = new Path(fs.makeQualified(dirPath).toString());
          FsPermission fsPermission = new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
          if (!Utilities.createDirsWithPermission(conf, dirPath, fsPermission)) {
            throw new RuntimeException("Cannot make directory: "
                                       + dirPath.toString());
          }
          if (isHDFSCleanup) {
            fs.deleteOnExit(dirPath);
          }
        } catch (IOException e) {
          throw new RuntimeException (e);
        }
      }
      dir = dirPath;
      fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskRunnerID(), dir);
    }
    return dir;
  }

2)提供封装方法,操作一些其他的信息

比如isLocalOnlyExecutionMode  job是否为localmode setHiveLocks 设置锁的信息,
setHiveTxnManager设置锁的管理类,
getHiveTxnManager获取锁的管理类setNeedLockMgr 设置是否需要锁,
isNeedLockMgr返回是否需要锁等

org.apache.hadoop.hive.ql.DriverContext类

是和job 启动有关系的类,主要初始化两个queue,一个用来存在以及启动的job,一个用来存放可以启动的job

  private Queue<Task<? extends Serializable>> runnable;
  private Queue<TaskRunner> running;
  public DriverContext(Context ctx) {
    this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
    this.running = new LinkedBlockingQueue<TaskRunner>();
    this.ctx = ctx;
  }

常用方法:

addToRunnable,把Task加到到runnable队列中:

public synchronized boolean addToRunnable(Task<? extends Serializable> tsk) throws HiveException {
    if (runnable.contains(tsk)) {
      return false;
    }
    checkShutdown();
    runnable.add(tsk);
    tsk.setQueued();
    return true;
  }

launching 把TaskRunner对象加入到running队列中,其中TaskRunner是一个线程类,用来启动Task:

public synchronized void launching(TaskRunner runner) throws HiveException { 
    checkShutdown();
    running.add(runner);
  }

getRunnable和job的并发launch有关(默认hive.exec.parallel设置为false),在开启并发launch job时,如果runnable中还有元素,并且running的队列大小小于设置的线程数(默认hive.exec.parallel.thread.number设置为8),则取出runnable中第一个元素,并最终加入到running中

public synchronized Task<? extends Serializable> getRunnable( int maxthreads) throws HiveException {
    checkShutdown();
    if ( runnable.peek() != null && running.size() < maxthreads) {
      return runnable .remove();
    }
    return null ;
  }

pollFinished,从running的队列中获取TaskRunner,直到running队列为空,也就是等待所有job运行完毕:

   public synchronized TaskRunner pollFinished() throws InterruptedException {
    while (! shutdown) {
      Iterator< TaskRunner> it = running.iterator();
      while (it.hasNext()) {
        TaskRunner runner = it.next();
        if (runner != null && !runner.isRunning()) {
          it.remove();
          return runner;
        }
      }
      wait( SLEEP_TIME);
    }
    return null ;
  }

本文出自 “菜光光的博客” 博客,请务必保留此出处http://caiguangguang.blog.51cto.com/1652935/1591569

hive Context类和DriverContext类