
Flink - metrics

 

Metrics are organized into MetricGroups.

MetricGroup

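A MetricGroup is simply a container for metrics: it can hold subgroups as well as metrics of various types.

So its main interface consists of registration methods: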

```java
/**
 * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
 *
 * <p>Instances of this class can be used to register new metrics with Flink and to create a nested
 * hierarchy based on the group names.
 *
 * <p>A MetricGroup is uniquely identified by its place in the hierarchy and name.
 */
public interface MetricGroup {

    <C extends Counter> C counter(String name, C counter);

    <T, G extends Gauge<T>> G gauge(String name, G gauge);

    <H extends Histogram> H histogram(String name, H histogram);

    MetricGroup addGroup(String name);
}
```
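The idea that a group is identified by its place in the hierarchy can be illustrated with a toy container. This is a minimal sketch, not Flink code: `ScopedGroup` is hypothetical, it assumes that adding an existing subgroup name returns the existing subgroup, and its `getMetricIdentifier` (named after the call used by `AbstractReporter` further down) simply joins the scope components and the metric name with a delimiter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a metric group hierarchy (not Flink's MetricGroup).
class ScopedGroup {
    private final String[] scope;   // e.g. ["host-7", "taskmanager-2"]
    private final Map<String, ScopedGroup> subgroups = new LinkedHashMap<>();

    ScopedGroup(String... scope) {
        this.scope = scope.clone();
    }

    // Returns the existing subgroup with this name, or creates it; either way the
    // child's scope is the parent's scope plus the child's name.
    synchronized ScopedGroup addGroup(String name) {
        return subgroups.computeIfAbsent(name, n -> {
            String[] childScope = Arrays.copyOf(scope, scope.length + 1);
            childScope[scope.length] = n;
            return new ScopedGroup(childScope);
        });
    }

    // Full metric name: all scope components plus the metric name, joined by the delimiter.
    String getMetricIdentifier(String metricName, char delimiter) {
        List<String> parts = new ArrayList<>(Arrays.asList(scope));
        parts.add(metricName);
        return String.join(String.valueOf(delimiter), parts);
    }
}
```

With the example scope from the `AbstractMetricGroup` Javadoc, `new ScopedGroup("host-7", "taskmanager-2").addGroup("window_word_count").addGroup("my-mapper").getMetricIdentifier("myCounter", '.')` yields `host-7.taskmanager-2.window_word_count.my-mapper.myCounter`.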

 

AbstractMetricGroup

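The key part is implementing MetricGroup. The logic is simple: registering a metric and closing the group both happen while holding the group's lock, so they are mutually exclusive.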

```java
/**
 * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups.
 */
public abstract class AbstractMetricGroup implements MetricGroup {

    /** The registry that this metrics group belongs to */
    protected final MetricRegistry registry;

    /** All metrics that are directly contained in this group */
    private final Map<String, Metric> metrics = new HashMap<>();

    /** All metric subgroups of this group */
    private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

    /** The metrics scope represented by this group.
     *  For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
    private final String[] scopeComponents;  // the group's namespace

    /** The metrics scope represented by this group, as a concatenated string, lazily computed.
     * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
    private String scopeString;

    private volatile boolean closed;  // set when the group is closed; checked in addMetric

    @Override
    public <C extends Counter> C counter(String name, C counter) {
        addMetric(name, counter);
        return counter;
    }

    /**
     * Adds the given metric to the group and registers it at the registry, if the group
     * is not yet closed, and if no metric with the same name has been registered before.
     *
     * @param name the name to register the metric under
     * @param metric the metric to register
     */
    protected void addMetric(String name, Metric metric) {
        // add the metric only if the group is still open
        synchronized (this) {  // lock: registration and close are mutually exclusive
            if (!closed) {
                // immediately put without a 'contains' check to optimize the common case (no collision)
                // collisions are resolved later
                Metric prior = metrics.put(name, metric);

                // check for collisions with other metric names
                if (prior == null) {
                    // no other metric with this name yet
                    registry.register(metric, name, this);
                }
                else {
                    // we had a collision. put back the original value;
                    // the new metric is neither kept nor registered
                    metrics.put(name, prior);
                }
            }
        }
    }
}
```
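The put-then-restore collision handling in `addMetric` can be exercised in isolation. In this minimal sketch `GuardedGroup` and `RecordingRegistry` are hypothetical stand-ins for the group and the registry, metrics are plain `Object`s, and `close()` only sets the flag; it does not model anything else the real group may do on close.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the registry: it only records registered names.
class RecordingRegistry {
    final List<String> registered = new ArrayList<>();

    synchronized void register(Object metric, String name) {
        registered.add(name);
    }
}

// Sketch of the addMetric pattern: optimistic put, restore on collision,
// and no registration once the group is closed.
class GuardedGroup {
    private final RecordingRegistry registry;
    private final Map<String, Object> metrics = new HashMap<>();
    private boolean closed;   // guarded by 'this'

    GuardedGroup(RecordingRegistry registry) {
        this.registry = registry;
    }

    void addMetric(String name, Object metric) {
        synchronized (this) {
            if (closed) {
                return;                                // closed groups ignore new metrics
            }
            Object prior = metrics.put(name, metric);  // no contains() check on the common path
            if (prior == null) {
                registry.register(metric, name);       // first metric under this name wins
            } else {
                metrics.put(name, prior);              // collision: restore the original metric
            }
        }
    }

    Object get(String name) {
        synchronized (this) {
            return metrics.get(name);
        }
    }

    void close() {
        synchronized (this) {
            closed = true;
        }
    }
}
```

The common case costs a single `put`; only a collision pays for the second `put` that restores the original, so the first metric registered under a name is the one that stays and gets reported.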

 

MetricReporter

Collected metrics have to be sent out by a reporter:

```java
/**
 * Reporters are used to export {@link Metric Metrics} to an external backend.
 *
 * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
 * public no-argument constructor.
 */
public interface MetricReporter {

    // ------------------------------------------------------------------------
    //  life cycle
    // ------------------------------------------------------------------------

    /**
     * Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
     * this method is the place where the reporters set their basic fields based on configuration values.
     *
     * <p>This method is always called first on a newly instantiated reporter.
     *
     * @param config The configuration with all parameters.
     */
    void open(MetricConfig config);

    /**
     * Closes this reporter. Should be used to close channels, streams and release resources.
     */
    void close();

    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
```
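The reflective creation contract in this Javadoc (public, non-abstract, public no-argument constructor, `open` called first) can be sketched with plain Java reflection. `SimpleReporter`, `ReporterLoader` and `PrefixReporter` are hypothetical, and `java.util.Properties` stands in for `MetricConfig`. The sketch uses `getDeclaredConstructor().newInstance()` instead of `Class.newInstance()`, which is deprecated in newer JDKs.

```java
import java.util.Properties;

// Hypothetical, simplified reporter contract (a stand-in for MetricReporter).
interface SimpleReporter {
    void open(Properties config);   // always the first call on a new instance
    void close();
}

class ReporterLoader {
    // Creates a reporter from a class name (as read from configuration), then opens it.
    static SimpleReporter load(String className, Properties config) {
        try {
            Class<?> clazz = Class.forName(className);
            // needs a public no-arg constructor on a public, non-abstract class
            SimpleReporter reporter = (SimpleReporter) clazz.getDeclaredConstructor().newInstance();
            reporter.open(config);
            return reporter;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate reporter " + className, e);
        }
    }

    // Example reporter: public and static (no outer instance needed), public no-arg constructor.
    public static class PrefixReporter implements SimpleReporter {
        String prefix;

        public PrefixReporter() { }

        @Override
        public void open(Properties config) {
            prefix = config.getProperty("prefix", "flink");
        }

        @Override
        public void close() { }
    }
}
```

A non-static inner class would fail here because its constructor implicitly takes the enclosing instance, which is one practical reason behind the "public no-argument constructor" rule.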

 

AbstractReporter implements the MetricReporter interface:

```java
/**
 * Base interface for custom metric reporters.
 */
public abstract class AbstractReporter implements MetricReporter, CharacterFilter {
    protected final Logger log = LoggerFactory.getLogger(getClass());

    protected final Map<Gauge<?>, String> gauges = new HashMap<>();
    protected final Map<Counter, String> counters = new HashMap<>();
    protected final Map<Histogram, String> histograms = new HashMap<>();

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        final String name = group.getMetricIdentifier(metricName, this);  // the group is only used to build the metric's fully qualified name

        synchronized (this) {
            if (metric instanceof Counter) {
                counters.put((Counter) metric, name);
            } else if (metric instanceof Gauge) {
                gauges.put((Gauge<?>) metric, name);
            } else if (metric instanceof Histogram) {
                histograms.put((Histogram) metric, name);
            } else {
                log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
                    "does not support this metric type.", metric.getClass().getName());
            }
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        synchronized (this) {
            if (metric instanceof Counter) {
                counters.remove(metric);
            } else if (metric instanceof Gauge) {
                gauges.remove(metric);
            } else if (metric instanceof Histogram) {
                histograms.remove(metric);
            } else {
                log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
                    "does not support this metric type.", metric.getClass().getName());
            }
        }
    }
}
```
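To see how a concrete reporter builds on these maps, here is a minimal sketch of a reporter that keeps metrics keyed by type and reads their current values when asked to report. `CountMetric`, `GaugeMetric` and `PrintingReporter` are hypothetical stand-ins, not Flink types; unknown metric types are silently ignored here rather than logged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified metric types (stand-ins for Flink's Counter and Gauge).
interface CountMetric { long getCount(); }
interface GaugeMetric<T> { T getValue(); }

// Remembers each metric under its fully qualified name, split by type,
// and reads the current values only when report() is called.
class PrintingReporter {
    private final Map<CountMetric, String> counters = new HashMap<>();
    private final Map<GaugeMetric<?>, String> gauges = new HashMap<>();

    synchronized void notifyOfAddedMetric(Object metric, String fullName) {
        if (metric instanceof CountMetric) {
            counters.put((CountMetric) metric, fullName);
        } else if (metric instanceof GaugeMetric) {
            gauges.put((GaugeMetric<?>) metric, fullName);
        }
        // other types are ignored in this sketch
    }

    synchronized void notifyOfRemovedMetric(Object metric) {
        counters.remove(metric);
        gauges.remove(metric);
    }

    // Returns "name=value" lines, sorted by name.
    synchronized List<String> report() {
        Map<String, String> lines = new TreeMap<>();
        counters.forEach((c, name) -> lines.put(name, String.valueOf(c.getCount())));
        gauges.forEach((g, name) -> lines.put(name, String.valueOf(g.getValue())));
        List<String> out = new ArrayList<>();
        lines.forEach((name, value) -> out.add(name + "=" + value));
        return out;
    }
}
```

Keying the maps by the metric object (rather than by name) is what lets `notifyOfRemovedMetric` remove an entry with nothing but the metric instance.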

 

MetricRegistry

MetricRegistry connects MetricGroups with MetricReporters.

It hands every metric that should be reported to the MetricReporters and starts a thread that reports periodically:

```java
/**
 * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
 * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
 */
public class MetricRegistry {

    private List<MetricReporter> reporters;
    private ScheduledExecutorService executor;

    private final ScopeFormats scopeFormats;

    private final char delimiter;

    /**
     * Creates a new MetricRegistry and starts the configured reporter.
     */
    public MetricRegistry(Configuration config) {
        // first parse the scope formats, these are needed for all reporters
        ScopeFormats scopeFormats;
        try {
            scopeFormats = createScopeConfig(config);  // read the scope format from the config, i.e. the format of the metrics' namespace
        }
        catch (Exception e) {
            LOG.warn("Failed to parse scope format, using default scope formats", e);
            scopeFormats = new ScopeFormats();
        }
        this.scopeFormats = scopeFormats;

        char delim;
        try {
            delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);  // read the delimiter from the config
        } catch (Exception e) {
            LOG.warn("Failed to parse delimiter, using default delimiter.", e);
            delim = '.';
        }
        this.delimiter = delim;

        // second, instantiate any custom configured reporters
        this.reporters = new ArrayList<>();

        final String definedReporters = config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);  // names of the configured reporters

        if (definedReporters == null) {
            // no reporters defined
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
            this.executor = null;
        } else {
            // we have some reporters so
            String[] namedReporters = definedReporters.split("\\s*,\\s*");
            for (String namedReporter : namedReporters) {  // for each configured reporter
                DelegatingConfiguration reporterConfig = new DelegatingConfiguration(config, ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".");
                final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);  // reporter class name

                try {
                    String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);  // report interval
                    TimeUnit timeunit = TimeUnit.SECONDS;
                    long period = 10;

                    if (configuredPeriod != null) {
                        try {
                            String[] interval = configuredPeriod.split(" ");
                            period = Long.parseLong(interval[0]);
                            timeunit = TimeUnit.valueOf(interval[1]);
                        }
                        catch (Exception e) {
                            LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                                    " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                                    "Using default reporting interval.");
                        }
                    }

                    Class<?> reporterClass = Class.forName(className);
                    MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();  // instantiate the reporter

                    MetricConfig metricConfig = new MetricConfig();
                    reporterConfig.addAllToProperties(metricConfig);
                    reporterInstance.open(metricConfig);  // open the reporter

                    if (reporterInstance instanceof Scheduled) {
                        if (this.executor == null) {
                            executor = Executors.newSingleThreadScheduledExecutor();  // create the executor on first use
                        }
                        LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);

                        executor.scheduleWithFixedDelay(
                                new ReporterTask((Scheduled) reporterInstance), period, period, timeunit);  // schedule periodic reports
                    }
                    reporters.add(reporterInstance);  // add to the list of reporters
                }
                catch (Throwable t) {
                    shutdownExecutor();
                    LOG.error("Could not instantiate metrics reporter " + namedReporter + ". Metrics might not be exposed/reported.", t);
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Metrics (de)registration
    // ------------------------------------------------------------------------

    /**
     * Registers a new {@link Metric} with this registry.
     *
     * @param metric      the metric that was added
     * @param metricName  the name of the metric
     * @param group       the group that contains the metric
     */
    public void register(Metric metric, String metricName, MetricGroup group) {  // called from AbstractMetricGroup.addMetric: a metric added to a group is also handed to the reporters
        try {
            if (reporters != null) {
                for (MetricReporter reporter : reporters) {
                    if (reporter != null) {
                        reporter.notifyOfAddedMetric(metric, metricName, group);  // add the metric to every reporter
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Error while registering metric.", e);
        }
    }

    /**
     * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry.
     *
     * @param metric      the metric that should be removed
     * @param metricName  the name of the metric
     * @param group       the group that contains the metric
     */
    public void unregister(Metric metric, String metricName, MetricGroup group) {
        try {
            if (reporters != null) {
                for (MetricReporter reporter : reporters) {
                    if (reporter != null) {
                        reporter.notifyOfRemovedMetric(metric, metricName, group);
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Error while unregistering metric.", e);
        }
    }

    // ------------------------------------------------------------------------

    /**
     * This task is explicitly a static class, so that it does not hold any references to the enclosing
     * MetricsRegistry instance.
     *
     * This is a subtle difference, but very important: With this static class, the enclosing class instance
     * may become garbage-collectible, whereas with an anonymous inner class, the timer thread
     * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer.
     * Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible,
     * which acts as a fail-safe to stop the timer thread and prevents resource leaks.
     */
    private static final class ReporterTask extends TimerTask {

        private final Scheduled reporter;

        private ReporterTask(Scheduled reporter) {
            this.reporter = reporter;
        }

        @Override
        public void run() {
            try {
                reporter.report();  // the task's only job is to call reporter.report()
            } catch (Throwable t) {
                LOG.warn("Error while reporting metrics", t);
            }
        }
    }
}
```
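The interval handling and the `ReporterTask` wrapper can be sketched with only `java.util.concurrent`. `ReportSchedule` is hypothetical. It accepts `"<amount> <TimeUnit>"` such as `"10 SECONDS"` (splitting on any whitespace, slightly more lenient than `split(" ")`) and falls back to 10 SECONDS when either part fails to parse. Unlike the excerpt above, which assigns `period` before parsing the unit and can therefore keep a parsed period when only the unit is invalid, this sketch applies both values together or not at all.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the registry's interval parsing and ReporterTask.
class ReportSchedule {
    static final ReportSchedule DEFAULT = new ReportSchedule(10, TimeUnit.SECONDS);

    final long period;
    final TimeUnit unit;

    ReportSchedule(long period, TimeUnit unit) {
        this.period = period;
        this.unit = unit;
    }

    // Parses e.g. "10 SECONDS" or "500 MILLISECONDS"; any failure yields the default.
    static ReportSchedule parse(String configured) {
        if (configured == null) {
            return DEFAULT;
        }
        try {
            String[] parts = configured.trim().split("\\s+");
            long amount = Long.parseLong(parts[0]);
            TimeUnit unit = TimeUnit.valueOf(parts[1]);
            // scheduleWithFixedDelay rejects non-positive delays, so treat them as invalid too
            return amount > 0 ? new ReportSchedule(amount, unit) : DEFAULT;
        } catch (RuntimeException e) {
            // NumberFormatException, IllegalArgumentException (bad unit) or a missing part
            return DEFAULT;
        }
    }

    // Runs the report task with a fixed delay on a single-threaded scheduler.
    ScheduledExecutorService start(Runnable report) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Like ReporterTask: never let an exception escape, because an exception
        // thrown by a fixed-delay task suppresses all of its later executions.
        Runnable safeReport = () -> {
            try {
                report.run();
            } catch (Throwable t) {
                // a real reporter would log the failure here and keep reporting
            }
        };
        executor.scheduleWithFixedDelay(safeReport, period, period, unit);
        return executor;
    }
}
```

The catch-all wrapper plays the same role as the `catch (Throwable t)` in `ReporterTask.run()`. The scheduler thread is not a daemon thread, so callers should `shutdown()` the returned executor when reporting stops.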

 

TaskManager

In the TaskManager, associateWithJobManager does the following:
```scala
metricsRegistry = new FlinkMetricRegistry(config.configuration)

taskManagerMetricGroup =
  new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)

TaskManager.instantiateStatusMetrics(taskManagerMetricGroup)
```

This creates the metricsRegistry and the TaskManagerMetricGroup.

instantiateStatusMetrics simply registers the various TaskManager status metrics:

```scala
private def instantiateStatusMetrics(taskManagerMetricGroup: MetricGroup) : Unit = {
  val jvm = taskManagerMetricGroup
    .addGroup("Status")
    .addGroup("JVM")

  instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
  instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
  instantiateMemoryMetrics(jvm.addGroup("Memory"))
  instantiateThreadMetrics(jvm.addGroup("Threads"))
  instantiateCPUMetrics(jvm.addGroup("CPU"))
}

private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
  val mxBean = ManagementFactory.getClassLoadingMXBean  // ManagementFactory provides MXBeans that expose JVM statistics

  metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
    override def getValue: Long = mxBean.getTotalLoadedClassCount
  })

  metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
    override def getValue: Long = mxBean.getUnloadedClassCount
  })
}
```
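The same pattern in Java: each status "gauge" is just a callback that reads a JVM MXBean whenever a reporter asks for its value. This is a sketch, not Flink code: a `Map<String, Supplier<Long>>` stands in for the metric group, and the dotted names only mirror the `Status.JVM.*` group structure above; apart from ClassesLoaded and ClassesUnloaded they are illustrative and not necessarily Flink's exact metric names. Only standard `java.lang.management` calls are used.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper: a map of name -> value callback stands in for a metric group.
class JvmStatusGauges {
    static Map<String, Supplier<Long>> create() {
        Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();

        ClassLoadingMXBean classLoading = ManagementFactory.getClassLoadingMXBean();
        gauges.put("Status.JVM.ClassLoader.ClassesLoaded", classLoading::getTotalLoadedClassCount);
        gauges.put("Status.JVM.ClassLoader.ClassesUnloaded", classLoading::getUnloadedClassCount);

        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        gauges.put("Status.JVM.Memory.Heap.Used", () -> memory.getHeapMemoryUsage().getUsed());

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        gauges.put("Status.JVM.Threads.Count", () -> (long) threads.getThreadCount());

        // one entry per collector; names such as "G1 Young Generation" depend on the JVM
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            gauges.put("Status.JVM.GarbageCollector." + gc.getName() + ".Count", gc::getCollectionCount);
        }
        return gauges;
    }
}
```

Because the values are read lazily, registering a gauge costs nothing until a reporter actually polls it.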

 

When a task is submitted, submitTask does the following:
```scala
val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd)

val task = new Task(
  tdd,
  memoryManager,
  ioManager,
  network,
  bcVarManager,
  selfGateway,
  jobManagerGateway,
  config.timeout,
  libCache,
  fileCache,
  runtimeInfo,
  taskMetricGroup)
```

So a taskMetricGroup is created for every task and passed in when the Task object is constructed:

```java
Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
        executionConfig, taskInfo, jobConfiguration, taskConfiguration,
        userCodeClassLoader, memoryManager, ioManager,
        broadcastVariableManager, accumulatorRegistry,
        splitProvider, distributedCacheEntries,
        writers, inputGates, jobManager, taskManagerConfig, metrics, this);

// let the task code create its readers and writers
invokable.setEnvironment(env);
```

Inside Task, the key step is putting this taskMetricGroup (the `metrics` argument) into the RuntimeEnvironment; the actual task logic can then obtain its metrics through the RuntimeEnvironment.
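The flow from TaskManager group to task code can be modeled in a few lines. Every class here (`TaskManagerGroup`, `TaskGroup`, `TaskEnv`, `CountingInvokable`) is hypothetical and only mirrors the wiring described above: a per-task group is created with `addTaskForJob`, stored in the environment, and the invokable reaches its metrics only through that environment. The counter name `recordsProcessed` is illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-task metric group: a scope string plus named counters.
class TaskGroup {
    final String scope;   // e.g. "host-7.taskmanager-2.window_word_count.my-mapper"
    final Map<String, AtomicLong> counters = new LinkedHashMap<>();

    TaskGroup(String scope) {
        this.scope = scope;
    }

    // Returns the counter registered under this name, creating it on first use.
    synchronized AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, n -> new AtomicLong());
    }
}

class TaskManagerGroup {
    private final String scope;   // e.g. "host-7.taskmanager-2"

    TaskManagerGroup(String host, String taskManagerId) {
        this.scope = host + "." + taskManagerId;
    }

    // Plays the role of addTaskForJob: one group per task, scoped under job and task name.
    TaskGroup addTaskForJob(String jobName, String taskName) {
        return new TaskGroup(scope + "." + jobName + "." + taskName);
    }
}

// The environment carries the task's group; task code only ever sees the environment.
class TaskEnv {
    private final TaskGroup metrics;

    TaskEnv(TaskGroup metrics) {
        this.metrics = metrics;
    }

    TaskGroup getMetricGroup() {
        return metrics;
    }
}

class CountingInvokable {
    private TaskEnv env;

    void setEnvironment(TaskEnv env) {
        this.env = env;
    }

    void invoke(String[] records) {
        AtomicLong processed = env.getMetricGroup().counter("recordsProcessed");
        for (String record : records) {
            processed.incrementAndGet();   // real code would process 'record' first
        }
    }
}
```

Keeping the group inside the environment means user code never has to know about the TaskManager or the registry; it only sees a group that is already scoped to its own task.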
