首页 > 代码库 > hbase源码系列(七)Snapshot的过程

hbase源码系列(七)Snapshot的过程

  在看这一章之前,建议大家先去看一下snapshot的使用。可能有人会有疑问为什么要做Snapshot,hdfs不是自带了3个备份吗,这是个很大的误区,要知道hdfs的3个备份是用于防止网络传输中的失败或者别的异常情况导致数据块丢失或者不正确,它不能避免人为的删除数据导致的后果。它就想是给数据库做备份,尤其是做删除动作之前,不管是hbase还是hdfs,请经常做Snapshot,否则哪天手贱了。。。

  直接进入主题吧,上代码。

public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
    // 清空之前完成的备份和恢复的任务
cleanupSentinels(); // 设置snapshot的版本 snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION) .build(); // if the table is enabled, then have the RS run actually the snapshot work TableName snapshotTable = TableName.valueOf(snapshot.getTable()); AssignmentManager assignmentMgr = master.getAssignmentManager(); //根据表的状态选择snapshot的类型 if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) { snapshotEnabledTable(snapshot); } // 被禁用的表走这个方法 else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) { snapshotDisabledTable(snapshot); } else { throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot); } }

  从代码上看得出来,启用的表和被禁用的表走的是两个不同的方法。

Snapshot启用的表

  先看snapshotEnabledTable方法吧,看看在线的表是怎么备份的。

private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
      throws HBaseSnapshotException {
    // snapshot准备工作
    prepareToTakeSnapshot(snapshot);
    
    // new一个handler
    EnabledTableSnapshotHandler handler =
        new EnabledTableSnapshotHandler(snapshot, master, this);
    //通过handler线程来备份
    snapshotTable(snapshot, handler);
 }

  这里就两步,先去看看snapshot前的准备工作吧,F3进入prepareToTakeSnapshot方法。这个方法里面也没干啥,就是检查一下是否可以对这个表做备份或者恢复的操作,然后就会重建这个工作目录,这个工作目录在.hbase-snapshot/.tmps下面,每个snapshot都有自己的目录。

  在snapshotTable里面把就线程提交一下,让handler来处理了。

handler.prepare();
this.executorService.submit(handler);
this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);

  这些都不是重点,咱到handler那边去看看吧,EnabledTableSnapshotHandler是继承TakeSnapshotHandler的,prepare方法和process方法都一样,区别在于snapshotRegions方法被重写了。

看prepare方法还是检查表的定义文件在不在,我们直接进入process方法吧。

  

      // 把snapshot的信息写入到工作目录
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
      // 开一个线程去复制表信息文件
      new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
      monitor.rethrowException();
      //查找该表相关的region和位置
      List<Pair<HRegionInfo, ServerName>> regionsAndLocations =
          MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
              snapshotTable, false);

      // 开始snapshot
      snapshotRegions(regionsAndLocations);

      // 获取serverNames列表,后面的校验snapshot用到
      Set<String> serverNames = new HashSet<String>();
      for (Pair<HRegionInfo, ServerName> p : regionsAndLocations) {
        if (p != null && p.getFirst() != null && p.getSecond() != null) {
          HRegionInfo hri = p.getFirst();
          if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
          serverNames.add(p.getSecond().toString());
        }
      }

      // 检查snapshot是否合格
      status.setStatus("Verifying snapshot: " + snapshot.getName());
      verifier.verifySnapshot(this.workingDir, serverNames);
// 备份完毕之后,把临时目录转移到正式的目录
      completeSnapshot(this.snapshotDir, this.workingDir, this.fs);

  1、写一个.snapshotinfo文件到工作目录下

  2、把表的定义信息写一份到工作目录下,即.tabledesc文件

  3、查找和表相关的Region Server和机器

  4、开始备份

  5、检验snapshot的结果

  6、确认没问题了,就把临时目录rename到正式目录

  

  我们直接到备份这一步去看吧,方法在EnabledTableSnapshotHandler里面,重写了。

  // 用分布式事务来备份在线的,太强悍了
    Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
      this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
    try {
      // 等待完成
proc.waitForCompleted();
    // 备份split过的region Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); for (Pair<HRegionInfo, ServerName> region : regions) { HRegionInfo regionInfo = region.getFirst(); if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) { if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) { LOG.info("Take disabled snapshot of offline region=" + regionInfo); snapshotDisabledRegion(regionInfo); } } }

  这里用到一个分布式事务,这里被我叫做分布式事务,我也不知道它是不是事务,但是Procedure这个词我真的不好翻译,叫过程也不合适。

分布式事务

  我们进入ProcedureCoordinator的startProcedure看看吧。

Procedure proc = createProcedure(fed, procName, procArgs,expectedMembers);
if (!this.submitProcedure(proc)) {
      LOG.error("Failed to submit procedure ‘" + procName + "‘");
      return null;
}

  先创建Procedure,然后提交它,这块没什么特别的,继续深入进去submitProcedure方法也找不到什么有用的信息,我们得回到Procedure类里面去,它是一个Callable的类,奥秘就在call方法里面。

  final public Void call() {
    try {
//在acquired节点下面建立实例节点 sendGlobalBarrierStart(); // 等待所有的rs回复 waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
//在reached节点下面建立实例节点 sendGlobalBarrierReached(); //等待所有的rs回复 waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released"); } finally { sendGlobalBarrierComplete(); completedLatch.countDown(); }
}

 

   从sendGlobalBarrierStart开始看吧,里面就一句话。

  coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));

 

   再追杀下去。

final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
      throws IOException, IllegalArgumentException {
    String procName = proc.getName();
    // 获取abort节点的名称
    String abortNode = zkProc.getAbortZNode(procName);
    try {
      // 如果存在abort节点,就广播错误,中断该过程
      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
        abort(abortNode);
      }
    } catch (KeeperException e) {throw new IOException("Failed while watching abort node:" + abortNode, e);
    }

    // 获得acquire节点名称
    String acquire = zkProc.getAcquiredBarrierNode(procName);try {
      // 创建acquire节点
      byte[] data =http://www.mamicode.com/ ProtobufUtil.prependPBMagic(info); 
      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);// 监控acquire下面的节点,发现指定的节点,就报告给coordinator
      for (String node : nodeNames) {
        String znode = ZKUtil.joinZNode(acquire, node);if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
          coordinator.memberAcquiredBarrier(procName, node);
        }
      }
    } catch (KeeperException e) {
      throw new IOException("Failed while creating acquire node:" + acquire, e);
    }
  }

 

   1、首先是检查abortNode ,什么是abortNode ?每个procName在zk下面都有一个对应的节点,比如snapshot,然后在procName下面又分了acquired、reached、abort三个节点。检查abort节点下面有没有当前的实例。

  2、在acquired节点为该实例创建节点,创建完成之后,在该实例节点下面监控各个Region Server的节点。如果发现已经有了,就更新Procedure中的acquiringMembers列表和inBarrierMembers,把节点从

acquiringMembers中删除,然后添加到inBarrierMembers列表当中。

  3、到这一步服务端的工作就停下来了,等到所有RS接受到指令之后在acquired节点下创建节点。

  4、收到所有RS的回复之后,它才会开始在reached节点创建实例节点,然后继续等待。

  5、RS完成任务之后,在reached的实例节点下面创建相应的节点,然后回复。

  6、在确定所有的RS都完成工作之后,清理zk当中的相应proName节点。

  注意:在这个过程当中,有任务的错误,都会在abort节点下面建立该实例的节点,RS上面的子过程一旦发现abort存在该节点的实例,就会取消该过程。

 

  Snapshot这块在Region Server是由RegionServerSnapshotManager类里面的ProcedureMemberRpcs负责监测snapshot下面的节点变化,当发现acquired下面有实例之后,启动新任务。

public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
      throws KeeperException {
    this.zkController = new ZKProcedureUtil(watcher, procType) {
      @Override
      public void nodeCreated(String path) {
        if (!isInProcedurePath(path)) {
          return;
        }

        String parent = ZKUtil.getParent(path);
        // if its the end barrier, the procedure can be completed
        if (isReachedNode(parent)) {
          receivedReachedGlobalBarrier(path);
          return;
        } else if (isAbortNode(parent)) {
          abort(path);
          return;
        } else if (isAcquiredNode(parent)) {
          startNewSubprocedure(path);
        } else {
          LOG.debug("Ignoring created notification for node:" + path);
        }
      }

     };
  }
View Code

  这块折叠起来,不是咱们的重点,让大家看看而已。我们直接进入Subprocedure这个类里面看看吧。

final public Void call() {
    
    try {
      // 目前是么也没干
acquireBarrier(); // 在acquired的实例节点下面建立rs的节点
rpcs.sendMemberAcquired(this); // 等待reached的实例节点的建立 waitForReachedGlobalBarrier(); // 干活
insideBarrier(); // 活干完了
rpcs.sendMemberCompleted(this); } catch (Exception e) { } finally { releasedLocalBarrier.countDown(); } }

  insideBarrier的实现在FlushSnapshotSubprocedure这个类里面,调用了flushSnapshot(),这个方法给每个region都开一个线程去提交。

for (HRegion region : regions) {
      taskManager.submitTask(new RegionSnapshotTask(region));
 }

  

Snapshot在线的region

  我们接下来看看RegionSnapshotTask的call方法

public Void call() throws Exception {
      // 上锁,暂时不让读了
      region.startRegionOperation();
      try {
        region.flushcache();
        region.addRegionToSnapshot(snapshot, monitor);
      } finally {
        LOG.debug("Closing region operation on " + region);
        region.closeRegionOperation();
      }
      return null;
    }
}

  

  在对region操作之前,先上锁,不让读了。然后就flushCache,这个方法很大,也好难懂哦,不过我们还是要迎接困难上,我折叠起来吧,想看的就看,不想看的就看我下面的写的步骤吧。

   MultiVersionConsistencyControl.WriteEntry w = null;
    this.updatesLock.writeLock().lock();
    long flushsize = this.memstoreSize.get();
    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
    long flushSeqId = -1L;
    //先flush日志,再flush memstore到文件
    try {
      // Record the mvcc for all transactions in progress.
      w = mvcc.beginMemstoreInsert();
      mvcc.advanceMemstore(w);

      if (wal != null) {
        //准备flush日志,进入等待flush的队列,这个startSeqId很重要,在恢复的时候就靠它了,它之前的日志就是已经flush过了,不用恢复
        Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
        if (startSeqId == null) {
          return false;
        }
        flushSeqId = startSeqId.longValue();
      } else {
        flushSeqId = myseqid;
      }

      for (Store s : stores.values()) {
        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
      }

      // 给MemStore做个snapshot,它的内部是两个队列,实际是从一个经常访问的队列放到另外一个不常访问的队列,那个队列名叫snapshot
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.prepare();
      }
    } finally {
      this.updatesLock.writeLock().unlock();
    }
    // 同步未flush的日志到硬盘上
    if (wal != null && !shouldSyncLog()) {
      wal.sync();
    }

    // 等待日志同步完毕
    mvcc.waitForRead(w);
    boolean compactionRequested = false;
    try {//把memstore中的keyvalues全部flush到storefile保存在临时目录当中,把flushSeqId追加到storefile里
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.flushCache(status);
      }
      // 把之前生成在临时目录的文件转移到正式目录
      for (StoreFlushContext flush : storeFlushCtxs) {
        boolean needsCompaction = flush.commit(status);
        if (needsCompaction) {
          compactionRequested = true;
        }
      }
      storeFlushCtxs.clear();
      // flush之后,就要减掉相应的memstore的大小
      this.addAndGetGlobalMemstoreSize(-flushsize);
View Code

  1、获取WAL日志的flushId(要写入到hfile当中,以后恢复的时候,要拿日志的flushId和hfile的flushId对比,小于hfile的flushId的就不用恢复了)

  2、给MemStore的做snapshot,从kvset集合转移到snapshot集合

  3、同步日志,写入到硬盘

  4、把MemStore的的snapshot集合当中的内容写入到hfile当中,MemStore当中保存的是KeyValue的集合,写入其实就是一个循环,调用StoreFile.Writer的append方法追加,具体的可以看我的那篇博客《非mapreduce生成Hfile,然后导入hbase当中》

  5、上一步的生成的文件是保存在临时目录中的,转移到正式的目录当中

  6、更新MemStore当中的大小

  

  好,我们继续看addRegionToSnapshot方法,好累啊,尼玛,这么多步骤。

public void addRegionToSnapshot(SnapshotDescription desc,
      ForeignExceptionSnare exnSnare) throws IOException {// 获取工作目录
    Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
    Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);

    // 1. 在工作目录创建region目录和写入region的信息
    HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
        this.fs.getFileSystem(), snapshotDir, getRegionInfo());

    // 2. 为hfile创建引用
    for (Store store : stores.values()) {
      // 2.1. 分列族为store创建引用目录,每个store属于不同的列族
      Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());// 2.2. 遍历hfile,然后创建引用
int sz = storeFiles.size(); for (int i = 0; i < sz; i++) { StoreFile storeFile = storeFiles.get(i); Path file = storeFile.getPath(); Path referenceFile = new Path(dstStoreDir, file.getName()); boolean success = true; if (storeFile.isReference()) { // 把旧的引用文件的内容写入到新的引用文件当中 storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile); } else { // 创建一个空的引用文件
success = fs.getFileSystem().createNewFile(referenceFile); } if (!success) { throw new IOException("Failed to create reference file:" + referenceFile); } } } }

  在工作目录在.hbase-snapshot/.tmps/snapshotName/region/familyName/下面给hfile创建引用文件。在创建引用文件的时候,还要先判断一下这个所谓的hfile是不是真的hfile,还是它本身就是一个引用文件了。

  如果已经是引用文件的话,把旧的引用文件里面的内容写入到新的引用文件当中。

  如果是一个正常的hfile的话,就创建一个空的引用文件即可,以后我们可以通过它的名字找到它在snapshot下面相应的文件。

  okay,到这里,每个RS的工作都完成了。

备份split过的region

  完成执行分布式事务,就是备份split过的region了,把之前的代码再贴一次吧,折叠起来,需要的自己看。

if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
    if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
          LOG.info("Take disabled snapshot of offline region=" + regionInfo);
          snapshotDisabledRegion(regionInfo);
     }
}
View Code
protected void snapshotDisabledRegion(final HRegionInfo regionInfo)
      throws IOException {
    // 创建新的region目录和region信息
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
      workingDir, regionInfo);

     // 把region下的recovered.edits目录的文件复制snapshot的对应region下面
    Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
    Path snapshotRegionDir = regionFs.getRegionDir();
    new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call();
    
    // 给每个列族的下面的文件创建引用,所谓引用就是一个同名的空文件
new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call(); }

  备份启用的表,现在已经结束了,但是备份禁用的表吧,前面说了区别是snapshotRegions方法,但是方法除了做一些准备工作之外,就是snapshotDisabledRegion。。。。所以snapshot到这里就完了,下面我们再回顾一遍吧。

  1、进行snapshot之前的准备,创建目录,复制一些必要的信息文件等。

  2、对于启用的表,启动分布式事务,RS接到任务,flush掉WAL日志和MemStore的数据,写入文件。

  3、为hfile创建引用文件,这里的引用文件居然是空的文件,而且名字一样,它不是真的备份hfile,这是什么回事呢?这个要到下一章,从snapshot中恢复,才能弄明白了,这个和hbase的归档文件机制有关系,hbase删除文件的时候,不是直接删除,而是把它先放入archive文件夹内。