首页 > 代码库 > HBase源代码分析之HRegionServer上MemStore的flush处理流程(二)
HBase源代码分析之HRegionServer上MemStore的flush处理流程(二)
继上篇文章《HBase源代码分析之HRegionServer上MemStore的flush处理流程(一)》遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程。重点讲述下怎样选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是怎样发起的。
我们先来看下第一个问题:怎样选择一个HRegion进行flush以缓解MemStore压力。上文中我们讲到过flush处理线程假设从flushQueue队列中拉取出的一个FlushQueueEntry为为空,或者为WakeupFlushThread,而且通过isAboveLowWaterMark()方法推断全局MemStore的大小高于限制值得低水平线,调用flushOneForGlobalPressure()方法,依照一定策略,flush一个HRegion的MemStore,减少MemStore的大小。预防OOM等异常情况的发生。
以下。我们重点分析下flushOneForGlobalPressure()方法,代码例如以下:
/** * The memstore across all regions has exceeded the low water mark. Pick * one region to flush and flush it synchronously (this is called from the * flush thread) * * 全部region的memstore已超过最低水平。 * 选择一个region同步刷新。 * 被flush线程调用 * * @return true if successful */ private boolean flushOneForGlobalPressure() { // 获取RegionServer上的在线Region,依据Region的memstoreSize大小倒序排列。得到regionsBySize SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); // 构造被排除的Region集合excludedRegions Set<HRegion> excludedRegions = new HashSet<HRegion>(); boolean flushedOne = false;// 标志位 while (!flushedOne) {// 循环一次,没有选中的话,再循环,直到选中或者没有可选的Region // Find the biggest region that doesn't have too many storefiles // (might be null!) // 选择一个Memstore最大的而且不含太多storefiles的region作为最有可能被选中的region,即bestFlushableRegion HRegion bestFlushableRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, true); // Find the biggest region, total, even if it might have too many flushes. // 选择一个Memstore最大的region,即便是它包括太多storefiles,作为终于可以被选中的备份方案,即bestAnyRegion HRegion bestAnyRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, false); // 在内存上阈值之上可是没有可以flush的region的话,直接返回false if (bestAnyRegion == null) { LOG.error("Above memory mark but there are no flushable regions!"); return false; } HRegion regionToFlush; // 选择须要flush的region // 假设bestAnyRegion的的memstore大小大于bestFlushableRegion的两倍。则选取bestAnyRegion if (bestFlushableRegion != null && bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { // Even if it's not supposed to be flushed, pick a region if it's more than twice // as big as the best flushable one - otherwise when we're under pressure we make // lots of little flushes and cause lots of compactions, etc, which just makes // life worse! if (LOG.isDebugEnabled()) { LOG.debug("Under global heap pressure: " + "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is " + StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + " vs best flushable region's " + StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + ". Choosing the bigger."); } regionToFlush = bestAnyRegion; } else {// 否则。优先选取bestFlushableRegion if (bestFlushableRegion == null) { regionToFlush = bestAnyRegion; } else { regionToFlush = bestFlushableRegion; } } // 检測状态:被选中Region的memstoreSize必须大于0 Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); // 调用flushRegion()方法。针对单个Region,进行MemStore的flush flushedOne = flushRegion(regionToFlush, true); if (!flushedOne) {// flush失败则加入到excludedRegions集合中。避免下次再被选中 LOG.info("Excluding unflushable region " + regionToFlush + " - trying to find a different region to flush."); excludedRegions.add(regionToFlush); } } return true; }我们来总结下这种方法的处理逻辑。例如以下:
1、获取RegionServer上的在线Region。依据Region的memstoreSize大小倒序排列,得到regionsBySize。
2、构造被排除的Region集合excludedRegions;
3、标志位flushedOne设置为false;
4、循环。直到标志位flushedOne为true,即存在Region被选中,或者根本没有可选的Region:
4.1、循环regionsBySize,选择一个Memstore最大的而且不含太多storefiles的region作为最有可能被选中的region,即bestFlushableRegion:
4.1.1、假设当前region在excludedRegions列表中,直接跳过;
4.1.2、假设当前region的写状态为正在flush,或者当前region的写状态不是写启用,直接跳过;
4.1.3、假设须要检查StoreFile数目,且包括太多StoreFiles。也直接跳过;
4.1.4、否则返回该region;
4.2、循环regionsBySize。选择一个Memstore最大的region,即便是它包括太多storefiles,作为终于能够被选中的备份方案,即bestAnyRegion:
4.2.1、假设当前region在excludedRegions列表中。直接跳过;
4.2.2、假设当前region的写状态为正在flush,或者当前region的写状态不是写启用,直接跳过;
4.2.3、否则返回该region。
4.3、在内存上阈值之上可是没有可以flush的region的话。直接返回false。
4.4、选择须要flush的region:
4.4.1、假设bestAnyRegion的的memstore大小大于bestFlushableRegion的两倍。则选取bestAnyRegion;
4.4.2、否则,优先选取bestFlushableRegion;
4.5、检測状态:被选中Region的memstoreSize必须大于0。
4.6、调用flushRegion()方法,针对单个Region。进行MemStore的flush;
4.7、flush失败则加入到excludedRegions集合中,避免下次再被选中。
以上就是依照一定策略选择一个HRegion进行MemStore的flush以缓解MemStore压力的方法。那么,剩下的flush指定HRegion的问题就同接下来我们将要讲的HRegion的flush是怎样发起的一致了。我们先看下带一个參数的flushRegion()方法。代码例如以下:
/* * A flushRegion that checks store file count. If too many, puts the flush * on delay queue to retry later. * * 一个待刷新的Region首先会检測store file的数目,假设太多。会把该region的刷新推迟并稍后再试,否则马上刷新。 * * @param fqe * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the region was * not flushed. */ private boolean flushRegion(final FlushRegionEntry fqe) { HRegion region = fqe.region; if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {// 假设Region不是MetaRegion且Region上有太多的StoreFiles if (fqe.isMaximumWait(this.blockingWaitTime)) { // 假设已堵塞指定时间。记录日志并运行刷新 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + "ms on a compaction to clean up 'too many store files'; waited " + "long enough... proceeding with flush of " + region.getRegionNameAsString()); } else { // If this is first time we've been put off, then emit a log message. // 假设是第一次推迟,并对该HRegion请求分裂或系统合并。记录一条日志信息 if (fqe.getRequeueCount() <= 0) { // Note: We don't impose blockingStoreFiles constraint on meta regions // 注意:我们不强加blockingstorefiles约束元区域 LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); // 对该HRegion先请求分裂Split,分裂不成功的话再请求系统合并SystemCompaction if (!this.server.compactSplitThread.requestSplit(region)) { try { this.server.compactSplitThread.requestSystemCompaction( region, Thread.currentThread().getName()); } catch (IOException e) { LOG.error( "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()), RemoteExceptionHandler.checkIOException(e)); } } } // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. // 再放回队列,等待900ms(參数可配置)后。再从队列中取出来 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); // Tell a lie, it's not flushed but it's ok // 佯言。该Region没有被flush,可是应该返回true return true; } } // 调用两个參数的flushRegion()方法。通知HRegion运行flush return flushRegion(region, false); }这个带一个參数的flushRegion()方法。实际上是在拿到一个待flush的HRegion的封装体FlushRegionEntry类型的fqe后,对其做一些必要的推断,决定是直接进行flush还是推后运行,且在第一次推后前。假设须要。则做分裂或系统合并处理。详细处理逻辑例如以下:
1、假设Region不是MetaRegion且Region上有太多的StoreFiles:
1.1、通过isMaximumWait()推断堵塞时间,已堵塞达到或超过指定时间。记录日志并运行flush,跳到2,结束。
1.2、假设是第一次推迟。记录一条日志信息。然后对该HRegion先请求分裂Split。分裂不成功的话再请求系统合并SystemCompaction。
1.3、再将fqe放回到队列flushQueue。添加延迟时间900ms(參数可配置),等到到期后再从队列中取出来进行处理;
1.4、佯言,该Region被推迟进行flush,结果还不确定,所以应该返回true;
2、调用两个參数的flushRegion()方法,通知HRegion运行flush。
怎样进行堵塞时间的推断呢?非常easy。推断当前时间减去创建时间是否大于指定时间就OK了。代码例如以下:
/** * @param maximumWait * @return True if we have been delayed > <code>maximumWait</code> milliseconds. */ public boolean isMaximumWait(final long maximumWait) { return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; }
好了,是时候该分析这个带有两个參数的flushRegion()方法了。先上代码。再做分析:
/* * Flush a region. * @param region Region to flush. * @param emergencyFlush Set if we are being force flushed. If true the region * needs to be removed from the flush queue. If false, when we were called * from the main flusher run loop and we got the entry to flush by calling * poll on the flush queue (which removed it). * * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the region was * not flushed. * * 刷新region */ private boolean flushRegion(final HRegion region, final boolean emergencyFlush) { long startTime = 0; synchronized (this.regionsInQueue) { // 先从regionsInQueue中移除相应的HRegion信息 FlushRegionEntry fqe = this.regionsInQueue.remove(region); // Use the start time of the FlushRegionEntry if available if (fqe != null) { // 获取flush的開始时间startTime startTime = fqe.createTime; } if (fqe != null && emergencyFlush) { // Need to remove from region from delay queue. When NOT an // emergencyFlush, then item was removed via a flushQueue.poll. // 须要从flushQueue队列中移除,假设不是紧急刷新,fqe将通过flushQueue.poll被移除 // 由于假设是flush线程处理的,run()方法会周期性的从flushQueue队列取feq,而且假设取出的为null或者WakeupFlushThread, // 它会在MemStore位于低水平线上时。依照一定策略选择一个HRegion。包装成fqe进行flush。以减少MemStore,避免OOM等风险。 // 此时。假设fqe位于flushQueue中,须要被移除,移除的推断就是这个emergencyFlush是否为true, // 由于通过线程在到期的正常情况下进行处理的,会传入false,而为减少风险进行紧急flush的,会传入true,此时就须要从队列中移除,也是为了避免做反复工作 flushQueue.remove(fqe); } } // 获取flush的開始时间startTime if (startTime == 0) { // Avoid getting the system time unless we don't have a FlushRegionEntry; // shame we can't capture the time also spent in the above synchronized // block startTime = EnvironmentEdgeManager.currentTime(); } // 上读锁,意味着与其它拥有读锁的线程不冲突,能够同步进行,而与拥有写锁的线程相互排斥 lock.readLock().lock(); try { // 通过监听器Listener通知flush请求者flush的type notifyFlushRequest(region, emergencyFlush); // 调用HRegion的flushcache()方法,运行MemStore的flush HRegion.FlushResult flushResult = region.flushcache(); // 依据flush的结果,推断下一步该做怎样处理 // 推断是否应该进行合并compact boolean shouldCompact = flushResult.isCompactionNeeded(); // We just want to check the size // 检測是否应该进行分裂split boolean shouldSplit = region.checkSplit() != null; // 必要的情况下,先进行split,再进行system compact if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { server.compactSplitThread.requestSystemCompaction( region, Thread.currentThread().getName()); } // 假设flush成功,获取flush结束时间。计算耗时,记录HRegion上的度量信息 if (flushResult.isFlushSucceeded()) { long endTime = EnvironmentEdgeManager.currentTime(); server.metricsRegionServer.updateFlushTime(endTime - startTime); } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical // section, we get a DroppedSnapshotException and a replay of wal // is required. Currently the only way to do this is a restart of // the server. Abort because hdfs is probably bad (HBASE-644 is a case // where hdfs was bad but passed the hdfs check). server.abort("Replay of WAL required. Forcing server shutdown", ex); return false; } catch (IOException ex) { LOG.error("Cache flush failed" + (region != null ?带有两个參数的flushRegion()方法大体逻辑例如以下:(" for region " + Bytes.toStringBinary(region.getRegionName())) : ""), RemoteExceptionHandler.checkIOException(ex)); if (!server.checkFileSystem()) { return false; } } finally { // 释放读锁 lock.readLock().unlock(); // 唤醒堵塞的其它线程 wakeUpIfBlocking(); } return true; }
1、首选处理regionsInQueue集合和flushQueue队列:
1.1、先从regionsInQueue中移除相应的HRegion信息,这个不管是否紧急flush,都是必需要做的;
1.2、获取flush的開始时间startTime;
1.3、假设是紧急刷新,须要从flushQueue队列中移除相应的fqe,假设不是紧急刷新,fqe将通过flushQueue.poll被移除。
2、假设startTime为null,获取flush的開始时间startTime。
3、上读锁,意味着与其它拥有读锁的线程不冲突。能够同步进行,而与拥有写锁的线程相互排斥(后期将会写专门的文章分析HBase内部各流程中锁的应用)。
4、通过监听器Listener通知flush请求者flush的type;
5、调用HRegion的flushcache()方法。运行MemStore的flush。并获得flush结果;
6、依据flush的结果,推断下一步该做怎样处理:
6.1、依据flush结果推断是否应该进行合并compact。即标志位shouldCompact;
6.2、调用HRegion的checkSplit()方法检測是否应该进行分裂split。即标志位shouldSplit;
6.3、通过两个标志位推断,必要的情况下,先进行split,再进行system compact;
7、假设flush成功,获取flush结束时间,计算耗时,记录HRegion上的度量信息。
8、最后释放读锁。唤醒堵塞的其它线程。
这里,先有必要解释下对flushQueue的特殊处理,假设是紧急刷新,须要从flushQueue队列中移除相应的fqe,假设不是紧急刷新,fqe将通过flushQueue.poll被移除。
由于假设是flush线程处理的,run()方法会周期性的从flushQueue队列取feq,而且假设取出的为null或者WakeupFlushThread,它会在MemStore位于低水平线上时。依照一定策略选择一个HRegion,包装成fqe进行flush,以减少MemStore,避免OOM等风险,此时,假设fqe位于flushQueue中。须要被移除,移除的推断就是这个emergencyFlush是否为true。由于通过线程在到期的正常情况下进行处理的。会传入false,而为减少风险进行紧急flush的。会传入true,此时就须要从队列中移除,也是为了避免做反复工作。
通过监听器Listener通知flush请求者flush的type也非常easy。也做凝视了,不再解释。代码例如以下:
private void notifyFlushRequest(HRegion region, boolean emergencyFlush) { // 默认类型为 FlushType.NORMAL FlushType type = FlushType.NORMAL; // 假设是紧急刷新,跟是否在高水位线上来确定type,高水位线上为FlushType.ABOVE_HIGHER_MARK。低水位线上为FlushType.ABOVE_LOWER_MARK if (emergencyFlush) { type = isAboveHighWaterMark() ?最后再说说这个flush结果FlushResult,它是HRegion中的一个静态内部类。包括一个Result枚举,当中包括的flush结果例如以下:FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK; } // 针对监听器逐个加入region、type for (FlushRequestListener listener : flushRequestListeners) { listener.flushRequested(type, region); } }
1、FLUSHED_NO_COMPACTION_NEEDED:flush成功,可是不须要运行compact;
2、FLUSHED_COMPACTION_NEEDED:flush成功,同一时候须要运行compact;
3、CANNOT_FLUSH_MEMSTORE_EMPTY:无法进行flush。由于MemStore为空;
4、CANNOT_FLUSH:无法进行flush。
推断flush是否成功。则就是看result是否为FLUSHED_NO_COMPACTION_NEEDED或FLUSHED_COMPACTION_NEEDED。推断是否须要进行compact,则就是看result是否为FLUSHED_COMPACTION_NEEDED。
相关代码例如以下:
/** * Convenience method, the equivalent of checking if result is * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED. * @return true if the memstores were flushed, else false. */ public boolean isFlushSucceeded() { return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result .FLUSHED_COMPACTION_NEEDED; } /** * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED. * @return True if the flush requested a compaction, else false (doesn't even mean it flushed). */ public boolean isCompactionNeeded() { return result == Result.FLUSHED_COMPACTION_NEEDED; }至此,HRegionServer上MemStore的flush处理流程所有分析完成。末尾关于split、compact,兴许会有专门的文章进行介绍,敬请关注本人博客。谢谢!
HBase源代码分析之HRegionServer上MemStore的flush处理流程(二)