首页 > 代码库 > HDFS“慢节点”监控分析功能

HDFS“慢节点”监控分析功能

前言


当集群规模在日益变大的时候,往往有的时候出现机器的老化,而这些“老化”的机器又会表现出一些奇怪的特征:“磁盘读写慢”、“网络数据传输慢”等。对于前者,曾经笔者写过一篇Hadoop节点”慢磁盘”监控的解决方案,当然社区目前已有更好的方案: HDFS-10959(Adding per disk IO statistics and metrics in DataNode)。而对于后者,我们同样需要有相应的监控方案,方便让我们这类异常的节点。此功能的实现于最近刚刚完成的 HDFS-11194(Maintain aggregated peer performance metrics on NameNode)。本文笔者来讲解讲解这个功能的设计思路以及它是如何做到对于“数据传输慢”节点的监控的。

HDFS“慢节点”监控的设计


HDFS“慢节点”监控分析功能的内部其实可以划分为2个部分。第一个部分是监控数据的采集,这里监控数据对应的是网络数据传输的耗时。第二个则是监控数据的汇总处理。在这个过程中会进行一定的筛选比较,然后给出分析报告

与笔者之前提过的纯Metric统计方案有所不同,HDFS-11194在实现这个功能的时候,还定义了一个SlowPeerReports这样的对象,这个对象内部包含的数据就是“慢”节点的数据以及对应的耗时时间。先抛开这个slow report,如果是纯Metric的统计方案,有什么不好的地方呢?笔者认为有以下原因:

Metrics方便用户查阅,不方便用户获取其所包含的统计数据。

于是,在这里设计者定义来了SlowPeerReports对象老包装的这样的数据,然后作为心跳数据的一部分,发给NameNode。最终达到的目的是:用户可以通过简单的jmx接口就能获取这些慢节点的数据了

下图是此功能的简单结构图。


技术分享
图 1-1 HDFS”慢节点”监控功能结构图

HDFS”慢节点”监控功能的实现


数据的采集


正如上一部分所提到,这里的“慢”指的是“网络数据传输慢”。所以我们需要在HDFS数据传输的操作上做一个耗时统计,此处添加监控的位置在BlockReceiver的receivePacket方法。receivePacket方法的作用正如其名称所表示的意思:接收和处理数据包。代码如下:

  private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);
    ...
    //First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        // 记住开始时间
        long begin = Time.monotonicNow();
        // For testing. Normally no-op.
        DataNodeFaultInjector.get().stopSendingPacketDownstream(mirrorAddr);
        packetReceiver.mirrorPacketTo(mirrorOut);
        mirrorOut.flush();
        // 获取目前时间
        long now = Time.monotonicNow();
        setLastSentTime(now);
        // 计算数据传输耗时
        long duration = now - begin;
        DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
            mirrorAddr,
            duration);
        // 加入到metric统计中
        trackSendPacketToLastNodeInPipeline(duration);
        if (duration > datanodeSlowLogThresholdMs) {
          LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
        }
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }
    ...
}

注意此方法监控的操作行为是发往Pipeline中最后一个节点数据包的耗时情况。我们进入trackSendPacketToLastNodeInPipeline方法内部,

  private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
    final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
    if (peerMetrics != null && isPenultimateNode) {
      peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
    }
  }

这里的mirrorNameForMetrics指的是与当前DataNode通信的节点。

“慢”节点报告的生成与汇报


有了这些数据之后,我们需要把这些数据信息以心跳的信息报告给NameNode。所以这里需要更改NameNode与DataNode之间的心跳报告的协议,增加一类报告信息的定义。关于心跳协议的改造,笔者在之前的一篇文章DataNode生命线消息一文中也涉及到一些。

我们直接定位到相应的方法,

  HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
      throws IOException {
    ...
    final long now = monotonicNow();
    scheduler.updateLastHeartbeatTime(now);
    VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
        .getVolumeFailureSummary();
    int numFailedVolumes = volumeFailureSummary != null ?
        volumeFailureSummary.getFailedStorageLocations().length : 0;
    // 判断此时是否已经到了发送慢节点报告的周期时间内
    final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
    // 利用metric统计值数据构造慢节点报告数据
    final SlowPeerReports slowPeers =
        slowPeersReportDue && dn.getPeerMetrics() != null ?
            SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
            SlowPeerReports.EMPTY_REPORT;
    HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease,
        // 将慢节点报告数据也加入到心跳信息中
        slowPeers);
    ...
  }

SlowPeerReports报告数据由DataNodePeerMetrics().getOutliers())所产生,在getOutliers方法内部,会对收集到的数据做一层简单的过滤。

  public Map<String, Double> getOutliers() {
    // 从滑动窗口中获取部分采集数据
    final Map<String, Double> stats =
        sendPacketDownstreamRollingAvgerages.getStats(
            MIN_OUTLIER_DETECTION_SAMPLES);
    LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
    // 对采集到的部分数据进行过滤出来
    return slowNodeDetector.getOutliers(stats);
  }

getOutliers方法如下:

  public Map<String, Double> getOutliers(Map<String, Double> stats) {
    ...
    // Compute the median absolute deviation of the aggregates.
    final List<Double> sorted = new ArrayList<>(stats.values());
    Collections.sort(sorted);
    final Double median = computeMedian(sorted);
    final Double mad = computeMad(sorted);
    // 计算延时的上限值,如果收集的耗时时间比此值还要大的话,则为慢节点
    Double upperLimitLatency = Math.max(
        lowThresholdMs, median * MEDIAN_MULTIPLIER);
    upperLimitLatency = Math.max(
        upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));

    final Map<String, Double> slowNodes = new HashMap<>();

    LOG.trace("getOutliers: List={}, MedianLatency={}, " +
        "MedianAbsoluteDeviation={}, upperLimitLatency={}",
        sorted, median, mad, upperLimitLatency);

    // 根据延时的上限值,选出慢节点
    for (Map.Entry<String, Double> entry : stats.entrySet()) {
      if (entry.getValue() > upperLimitLatency) {
        slowNodes.put(entry.getKey(), entry.getValue());
      }
    }
    // 返回慢节点列表
    return slowNodes;
  }

“慢”节点数据的展示


当这些“慢”节点数据成功被发送给NameNode之后,我们就可以将其暴露给用户,使得用户能方便地拿到这个数据,比如说通过jmx接口就能直接看到这个数据了。

那么我们是否需要暴露出所有收集到的数据呢?因为每个DataNode内部都会有一份自己的“慢”节点数据,如果NameNode都将其进行暴露,那么这个信息量绝对是不小的。还有一个问题,延时的报告信息需不需要?因为心跳有的时候会出现延时到达的情况。所以作者在这里做了以下2点限制。

  • 在对外生成慢节点报告时,对节点做数量的限制。
  • 对报告信息做规定延时时间的验证。

然后我们继续上面的分析,报告信息到了NameNode这边,会在DatanodeManager的handleHeartbeat被处理。

  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary,
      @Nonnull SlowPeerReports slowPeers) throws IOException {
    ...
    // 慢报告数据信息的处理
    if (slowPeerTracker != null) {
      // 获取慢节点报告信息
      final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
      if (!slowPeersMap.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
              slowPeersMap);
        }
        // 将慢节点信息加入到SlowPeerTracker
        for (String slowNodeId : slowPeersMap.keySet()) {
          // 同上带上报告节点的ipc地址,表明当前节点与目标慢节点出现慢通信的情况
          slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
        }
      }
    }
    ...
    return new DatanodeCommand[0];
  }

经过上述方法的处理之后,这些报告信息会被汇总到SlowPeerTracker中,被维护在了下面的映射图中:

  // 组织关系为慢节点--><报告节点, 报告时间>
  private final ConcurrentMap<String, ConcurrentMap<String, Long>>
      allReports;

addReport方法的处理逻辑如下:

  public void addReport(String slowNode,
                        String reportingNode) {
    // 获取总报告信息中对应慢节点名称的map对象
    ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);

    if (nodeEntries == null) {
      // putIfAbsent guards against multiple writers.
      allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>());
      nodeEntries = allReports.get(slowNode);
    }
    // 加入报告的节点,并更新报告时间
    nodeEntries.put(reportingNode, timer.monotonicNow());
  }

这里将时间更新的操作是为了过滤掉过期的报告数据。报告数据最终json字符串的方式来呈现。

  private Collection<ReportForJson> getJsonReports(int numNodes) {
    ...

    final PriorityQueue<ReportForJson> topNReports =
        ...
        });
    // 获取当前时间
    final long now = timer.monotonicNow();

    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
        allReports.entrySet()) {
      // 以当前时间算起,过滤掉落后时间比较多的报告数据,得到慢节点列表
      SortedSet<String> validReports = filterNodeReports(
          entry.getValue(), now);
      // 加入慢节点报告数据到排好序的列表中
      // 如果当前队列中的报告数量还没到需要返回的节点报告数时,则直接添加
      if (!validReports.isEmpty()) {
        if (topNReports.size() < numNodes) {
          topNReports.add(new ReportForJson(entry.getKey(), validReports));
        } else if (topNReports.peek().getReportingNodes().size() <
            validReports.size()){
          // 否则比较当前队列头部报告数据中的报告节点数,如果包含的节点数量小于当前报告中的节点数,则进行取代并移除
          // Remove the lowest element
          topNReports.poll();
          topNReports.add(new ReportForJson(entry.getKey(), validReports));
        }
      }
    }
    return topNReports;
  }

getJsonReports方法最终会被NameNode的getSlowPeersReport方法所调用。

参考资料


[1].https://issues.apache.org/jira/browse/HDFS-11194
[2].https://issues.apache.org/jira/browse/HDFS-10917
[3].https://issues.apache.org/jira/secure/attachment/12849166/HDFS-11194.06.patch

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    HDFS“慢节点”监控分析功能