
[Flume] Source Code Analysis of the FailoverSinkProcessor Fault-Tolerance Mechanism in Flume

As the name suggests, FailoverSinkProcessor is the sink processor in Flume that provides fault tolerance (failover) for sink output.

It extends AbstractSinkProcessor.

First, let's look at the complete source code:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flume.sink;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Sink.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FailoverSinkProcessor maintains a prioritized list of sinks,
 * guaranteeing that so long as one is available events will be processed.
 *
 * The failover mechanism works by relegating failed sinks to a pool
 * where they are assigned a cooldown period, increasing with sequential
 * failures before they are retried. Once a sink successfully sends an
 * event it is restored to the live pool.
 *
 * FailoverSinkProcessor is in no way thread safe and expects to be run via
 * SinkRunner. Additionally, setSinks must be called before configure, and
 * additional sinks cannot be added while running.
 *
 * To configure, set a sink group's processor to "failover" and set priorities
 * for individual sinks; all priorities must be unique. Furthermore, an
 * upper limit to failover time can be set (in milliseconds) using maxpenalty.
 *
 * Ex)
 *
 * host1.sinkgroups = group1
 *
 * host1.sinkgroups.group1.sinks = sink1 sink2
 * host1.sinkgroups.group1.processor.type = failover
 * host1.sinkgroups.group1.processor.priority.sink1 = 5
 * host1.sinkgroups.group1.processor.priority.sink2 = 10
 * host1.sinkgroups.group1.processor.maxpenalty = 10000
 *
 */
public class FailoverSinkProcessor extends AbstractSinkProcessor {
  private static final int FAILURE_PENALTY = 1000;
  private static final int DEFAULT_MAX_PENALTY = 30000;

  private class FailedSink implements Comparable<FailedSink> {
    private Long refresh;
    private Integer priority;
    private Sink sink;
    private Integer sequentialFailures;
    public FailedSink(Integer priority, Sink sink, int seqFailures) {
      this.sink = sink;
      this.priority = priority;
      this.sequentialFailures = seqFailures;
      adjustRefresh();
    }
    @Override
    public int compareTo(FailedSink arg0) {
      return refresh.compareTo(arg0.refresh);
    }

    public Long getRefresh() {
      return refresh;
    }

    public Sink getSink() {
      return sink;
    }
    public Integer getPriority() {
      return priority;
    }
    public void incFails() {
      sequentialFailures++;
      adjustRefresh();
      logger.debug("Sink {} failed again, new refresh is at {}, " +
            "current time {}", new Object[] {
              sink.getName(), refresh, System.currentTimeMillis()});
    }
    private void adjustRefresh() {
      refresh = System.currentTimeMillis()
              + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
    }
  }

  private static final Logger logger = LoggerFactory
      .getLogger(FailoverSinkProcessor.class);

  private static final String PRIORITY_PREFIX = "priority.";
  private static final String MAX_PENALTY_PREFIX = "maxpenalty";
  private Map<String, Sink> sinks;
  private Sink activeSink;
  private SortedMap<Integer, Sink> liveSinks;
  private Queue<FailedSink> failedSinks;
  private int maxPenalty;

  @Override
  public void configure(Context context) {
    liveSinks = new TreeMap<Integer, Sink>();
    failedSinks = new PriorityQueue<FailedSink>();
    Integer nextPrio = 0;
    String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
    if(maxPenaltyStr == null) {
      maxPenalty = DEFAULT_MAX_PENALTY;
    } else {
      try {
        maxPenalty = Integer.parseInt(maxPenaltyStr);
      } catch (NumberFormatException e) {
        logger.warn("{} is not a valid value for {}",
                new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
        maxPenalty  = DEFAULT_MAX_PENALTY;
      }
    }
    for (Entry<String, Sink> entry : sinks.entrySet()) {
      String priStr = PRIORITY_PREFIX + entry.getKey();
      Integer priority;
      try {
        priority =  Integer.parseInt(context.getString(priStr));
      } catch (Exception e) {
        priority = --nextPrio;
      }
      if(!liveSinks.containsKey(priority)) {
        liveSinks.put(priority, sinks.get(entry.getKey()));
      } else {
        logger.warn("Sink {} not added to FailoverSinkProcessor as priority " +
            "duplicates that of sink {}", entry.getKey(),
            liveSinks.get(priority));
      }
    }
    activeSink = liveSinks.get(liveSinks.lastKey());
  }

  @Override
  public Status process() throws EventDeliveryException {
    // Retry any failed sinks that have gone through their "cooldown" period
    Long now = System.currentTimeMillis();
    while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
      FailedSink cur = failedSinks.poll();
      Status s;
      try {
        s = cur.getSink().process();
        if (s  == Status.READY) {
          liveSinks.put(cur.getPriority(), cur.getSink());
          activeSink = liveSinks.get(liveSinks.lastKey());
          logger.debug("Sink {} was recovered from the fail list",
                  cur.getSink().getName());
        } else {
          // if it's a backoff it needn't be penalized.
          failedSinks.add(cur);
        }
        return s;
      } catch (Exception e) {
        cur.incFails();
        failedSinks.add(cur);
      }
    }

    Status ret = null;
    while(activeSink != null) {
      try {
        ret = activeSink.process();
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
                activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();
      }
    }

    throw new EventDeliveryException("All sinks failed to process, " +
        "nothing left to failover to");
  }

  private Sink moveActiveToDeadAndGetNext() {
    Integer key = liveSinks.lastKey();
    failedSinks.add(new FailedSink(key, activeSink, 1));
    liveSinks.remove(key);
    if(liveSinks.isEmpty()) return null;
    if(liveSinks.lastKey() != null) {
      return liveSinks.get(liveSinks.lastKey());
    } else {
      return null;
    }
  }

  @Override
  public void setSinks(List<Sink> sinks) {
    // needed to implement the start/stop functionality
    super.setSinks(sinks);

    this.sinks = new HashMap<String, Sink>();
    for (Sink sink : sinks) {
      this.sinks.put(sink.getName(), sink);
    }
  }

}

The class has an inner class, FailedSink, which represents a sink that has failed.

    private Long refresh;
    private Integer priority;
    private Sink sink;
    private Integer sequentialFailures;
These are its field definitions:

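1. refresh: the time (in milliseconds since the epoch) after which the failed sink may be retried

2. priority: the sink's priority

3. sink: the actual Sink instance

4. sequentialFailures: the number of consecutive failures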
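FailedSink.compareTo() compares refresh only, so the PriorityQueue behind failedSinks always keeps the sink whose cooldown ends first at its head, which is why process() only needs to peek() at the head. A minimal sketch of that ordering, using a hypothetical, simplified FailedEntry class instead of Flume's private inner class:

```java
import java.util.PriorityQueue;

// Hypothetical, simplified stand-in for FailoverSinkProcessor.FailedSink:
// it keeps only a name and the refresh timestamp.
class FailedEntry implements Comparable<FailedEntry> {
  final String name;
  final long refresh; // time (ms) after which the sink may be retried

  FailedEntry(String name, long refresh) {
    this.name = name;
    this.refresh = refresh;
  }

  // Same rule as FailedSink.compareTo(): an earlier refresh sorts first.
  @Override
  public int compareTo(FailedEntry other) {
    return Long.compare(refresh, other.refresh);
  }
}

class FailedQueueDemo {
  // Adds the entries to a PriorityQueue and returns the names in poll order.
  static String pollOrder(FailedEntry... entries) {
    PriorityQueue<FailedEntry> queue = new PriorityQueue<>();
    for (FailedEntry e : entries) {
      queue.add(e);
    }
    StringBuilder order = new StringBuilder();
    while (!queue.isEmpty()) {
      if (order.length() > 0) {
        order.append(',');
      }
      order.append(queue.poll().name);
    }
    return order.toString();
  }

  public static void main(String[] args) {
    // Prints "sinkB,sinkA,sinkC": the earliest refresh comes out first.
    System.out.println(pollOrder(
        new FailedEntry("sinkA", 5000L),
        new FailedEntry("sinkB", 2000L),
        new FailedEntry("sinkC", 9000L)));
  }
}
```

Priority plays no part in this ordering; it is only used when a recovered sink is put back into liveSinks.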

    public void incFails() {
      sequentialFailures++;
      adjustRefresh();
      logger.debug("Sink {} failed again, new refresh is at {}, " +
            "current time {}", new Object[] {
              sink.getName(), refresh, System.currentTimeMillis()});
    }
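This method is called when a sink that is already in the failed list fails again while being retried: it increments sequentialFailures and calls adjustRefresh() to push refresh further into the future.
Now let's look at the main logic: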

  private static final String PRIORITY_PREFIX = "priority.";
  private static final String MAX_PENALTY_PREFIX = "maxpenalty";
  private Map<String, Sink> sinks;
  private Sink activeSink;
  private SortedMap<Integer, Sink> liveSinks;
  private Queue<FailedSink> failedSinks;
  private int maxPenalty;
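Field declarations: sinks maps each sink's name to the Sink itself; activeSink is the sink currently used to process events; liveSinks is a TreeMap keyed by priority, so its keys are kept in ascending order; failedSinks is a PriorityQueue ordered by FailedSink.compareTo(), i.e. by refresh; maxPenalty is the upper bound on the cooldown, in milliseconds.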
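Since liveSinks is a TreeMap keyed by priority, liveSinks.lastKey() is always the largest number, so a larger value means a higher priority. The sketch below assumes the priorities from the Javadoc example (sink1 = 5, sink2 = 10) and uses sink names in place of Sink objects; LiveSinksDemo and active() are made-up names:

```java
import java.util.SortedMap;
import java.util.TreeMap;

class LiveSinksDemo {
  // Same selection rule as liveSinks.get(liveSinks.lastKey()), except that an
  // empty map yields null here instead of a NoSuchElementException.
  static String active(SortedMap<Integer, String> liveSinks) {
    return liveSinks.isEmpty() ? null : liveSinks.get(liveSinks.lastKey());
  }

  public static void main(String[] args) {
    SortedMap<Integer, String> liveSinks = new TreeMap<>();
    liveSinks.put(5, "sink1");  // processor.priority.sink1 = 5
    liveSinks.put(10, "sink2"); // processor.priority.sink2 = 10
    System.out.println(active(liveSinks)); // sink2: the larger value wins

    liveSinks.remove(10); // roughly what happens to liveSinks when sink2 fails
    System.out.println(active(liveSinks)); // sink1
  }
}
```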

  public void configure(Context context) {
    liveSinks = new TreeMap<Integer, Sink>();
    failedSinks = new PriorityQueue<FailedSink>();
    Integer nextPrio = 0;
    String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
    if(maxPenaltyStr == null) {
      maxPenalty = DEFAULT_MAX_PENALTY;
    } else {
      try {
        maxPenalty = Integer.parseInt(maxPenaltyStr);
      } catch (NumberFormatException e) {
        logger.warn("{} is not a valid value for {}",
                new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
        maxPenalty  = DEFAULT_MAX_PENALTY;
      }
    }
    for (Entry<String, Sink> entry : sinks.entrySet()) {
      String priStr = PRIORITY_PREFIX + entry.getKey();
      Integer priority;
      try {
        priority =  Integer.parseInt(context.getString(priStr));
      } catch (Exception e) {
        priority = --nextPrio;
      }
      if(!liveSinks.containsKey(priority)) {
        liveSinks.put(priority, sinks.get(entry.getKey()));
      } else {
        logger.warn("Sink {} not added to FailoverSinkProcessor as priority " +
            "duplicates that of sink {}", entry.getKey(),
            liveSinks.get(priority));
      }
    }
    activeSink = liveSinks.get(liveSinks.lastKey());
  }
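This method reads the configuration and initializes several fields:

1. liveSinks and failedSinks are initialized to an empty TreeMap and an empty PriorityQueue.

2. maxpenalty is read; if it is missing or not a valid integer, DEFAULT_MAX_PENALTY (30000 ms) is used.

3. The sinks map itself is not built here: it is filled in by setSinks(), which must be called before configure(). The sink list ultimately comes from the conf file (for the details, start at AbstractConfigurationProvider.getConfiguration() and FlumeConfiguration.getConfigurationFor() and follow the calls step by step).

4. liveSinks is populated: each configured sink is added under its priority.<sinkName> value. A sink without a valid priority is assigned -1, -2, and so on, and a sink whose priority duplicates another sink's is not added (a warning is logged).

5. The entry with the largest key in liveSinks, i.e. the highest-priority sink, becomes the active sink that handles output.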
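A rough sketch of the priority loop in configure(). Assumptions: a plain Map stands in for Flume's Context, sink names stand in for Sink objects, and only NumberFormatException is caught (the original catches Exception). A List is used so the iteration order is fixed; in Flume, sinks is a HashMap, so which of two duplicate-priority sinks is kept, and which automatic priority a sink receives, depends on HashMap iteration order. PriorityAssignDemo and assign() are hypothetical names:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class PriorityAssignDemo {
  // Mirrors the loop in configure(): parse "priority.<name>", fall back to
  // -1, -2, ... on failure, and skip sinks whose priority is already taken.
  static SortedMap<Integer, String> assign(List<String> sinkNames,
                                           Map<String, String> props) {
    SortedMap<Integer, String> liveSinks = new TreeMap<>();
    int nextPrio = 0;
    for (String name : sinkNames) {
      int priority;
      try {
        // Integer.parseInt(null) also throws NumberFormatException
        priority = Integer.parseInt(props.get("priority." + name));
      } catch (NumberFormatException e) {
        priority = --nextPrio; // -1, -2, ... for sinks without a valid priority
      }
      if (!liveSinks.containsKey(priority)) {
        liveSinks.put(priority, name);
      } else {
        System.out.println("Sink " + name + " skipped: priority " + priority
            + " is already used by " + liveSinks.get(priority));
      }
    }
    return liveSinks;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("priority.sink1", "5");
    props.put("priority.sink2", "5"); // duplicate of sink1, will be skipped
    // sink3 has no priority at all
    SortedMap<Integer, String> live =
        assign(Arrays.asList("sink1", "sink2", "sink3"), props);
    System.out.println(live); // {-1=sink3, 5=sink1}
  }
}
```

Because the automatic values are negative, a sink without a configured priority ranks below every sink configured with a priority of 0 or higher.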

Now let's look at the actual processing logic:

  public Status process() throws EventDeliveryException {
    // Retry any failed sinks that have gone through their "cooldown" period
    Long now = System.currentTimeMillis();
    while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
      FailedSink cur = failedSinks.poll();
      Status s;
      try {
        s = cur.getSink().process();
        if (s  == Status.READY) {
          liveSinks.put(cur.getPriority(), cur.getSink());
          activeSink = liveSinks.get(liveSinks.lastKey());
          logger.debug("Sink {} was recovered from the fail list",
                  cur.getSink().getName());
        } else {
          // if it's a backoff it needn't be penalized.
          failedSinks.add(cur);
        }
        return s;
      } catch (Exception e) {
        cur.incFails();
        failedSinks.add(cur);
      }
    }

    Status ret = null;
    while(activeSink != null) {
      try {
        ret = activeSink.process();
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
                activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();
      }
    }

    throw new EventDeliveryException("All sinks failed to process, " +
        "nothing left to failover to");
  }
At this point failedSinks is still empty, so only the second half of the method does any work:

    Status ret = null;
    while(activeSink != null) {
      try {
        ret = activeSink.process();
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
                activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();
      }
    }
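1. Loop while the active sink is not null.

2. Call process() on the active sink and return its status.

3. If processing throws an exception, the active sink is added to failedSinks (with sequentialFailures = 1) and removed from liveSinks: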

  private Sink moveActiveToDeadAndGetNext() {
    Integer key = liveSinks.lastKey();
    failedSinks.add(new FailedSink(key, activeSink, 1));
    liveSinks.remove(key);
    if(liveSinks.isEmpty()) return null;
    if(liveSinks.lastKey() != null) {
      return liveSinks.get(liveSinks.lastKey());
    } else {
      return null;
    }
  }

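4. Return the next available sink, i.e. the highest-priority sink left in liveSinks, or null if none remains; once activeSink is null the loop ends and an EventDeliveryException is thrown.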
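The failover loop above can be tried outside Flume. In this simplified sketch SimpleSink is a hypothetical interface whose process() returns a status string or throws, the failed list is a plain ArrayList without FailedSink bookkeeping, and IllegalStateException replaces EventDeliveryException so the block has no Flume dependency:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class FailoverLoopDemo {
  // Hypothetical minimal sink: returns a status or throws on failure.
  interface SimpleSink {
    String process() throws Exception;
  }

  final SortedMap<Integer, SimpleSink> liveSinks = new TreeMap<>();
  final List<SimpleSink> failed = new ArrayList<>(); // stands in for failedSinks
  SimpleSink active;

  // Second half of process(): keep failing over to the next-highest
  // priority sink until one succeeds or none are left.
  String process() {
    while (active != null) {
      try {
        return active.process();
      } catch (Exception e) {
        active = moveActiveToDeadAndGetNext();
      }
    }
    throw new IllegalStateException("All sinks failed to process");
  }

  // The active sink is always the one under lastKey(), so that key is removed.
  private SimpleSink moveActiveToDeadAndGetNext() {
    Integer key = liveSinks.lastKey();
    failed.add(active);
    liveSinks.remove(key);
    return liveSinks.isEmpty() ? null : liveSinks.get(liveSinks.lastKey());
  }

  public static void main(String[] args) {
    FailoverLoopDemo p = new FailoverLoopDemo();
    p.liveSinks.put(10, () -> { throw new Exception("sink2 is down"); });
    p.liveSinks.put(5, () -> "READY from sink1");
    p.active = p.liveSinks.get(p.liveSinks.lastKey());
    System.out.println(p.process());          // READY from sink1
    System.out.println(p.liveSinks.keySet()); // [5]
  }
}
```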

Once a failure has occurred, here is how the first half of process() behaves:

    Long now = System.currentTimeMillis();
    while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
      FailedSink cur = failedSinks.poll();
      Status s;
      try {
        s = cur.getSink().process();
        if (s  == Status.READY) {
          liveSinks.put(cur.getPriority(), cur.getSink());
          activeSink = liveSinks.get(liveSinks.lastKey());
          logger.debug("Sink {} was recovered from the fail list",
                  cur.getSink().getName());
        } else {
          // if it's a backoff it needn't be penalized.
          failedSinks.add(cur);
        }
        return s;
      } catch (Exception e) {
        cur.incFails();
        failedSinks.add(cur);
      }
    }

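Precondition: failedSinks is not empty and the refresh time of the sink at the head of the queue is earlier than the current time.

1. Poll the first FailedSink off the queue.

2. Call process() on that sink. If it returns READY, the sink is put back into liveSinks and activeSink is reset to the highest-priority live sink (which is the recovered sink only if its priority is the highest).

3. If it returns any other status (i.e. BACKOFF), it is added back to failedSinks without being penalized. In either case process() returns that status right away, so the retried sink handled this call.

4. If it throws an exception, incFails() is called and the sink is likewise added back to failedSinks; the loop then continues with the next failed sink whose cooldown has expired.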

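This logic is the core of the mechanism, a backoff scheme: as soon as a sink in failedSinks can process again, it is recovered and put back into use, and a sink that merely reports a backoff is not penalized.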
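The retry half can be sketched the same way. Assumptions (all hypothetical, not Flume's API): a two-value Status enum, a SimpleSink interface, a simplified Failed class, and a now parameter passed in instead of calling System.currentTimeMillis(), so the behaviour is deterministic. The penalty uses the default constants (1000 ms base, 30000 ms cap):

```java
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.TreeMap;

class RetryDemo {
  enum Status { READY, BACKOFF }

  // Hypothetical minimal sink interface.
  interface SimpleSink {
    Status process() throws Exception;
  }

  // Simplified FailedSink: ordered by refresh, like the original.
  static final class Failed implements Comparable<Failed> {
    final int priority;
    final SimpleSink sink;
    long refresh;
    int fails = 1; // moveActiveToDeadAndGetNext() starts at 1

    Failed(int priority, SimpleSink sink, long refresh) {
      this.priority = priority;
      this.sink = sink;
      this.refresh = refresh;
    }

    @Override
    public int compareTo(Failed other) {
      return Long.compare(refresh, other.refresh);
    }
  }

  final SortedMap<Integer, SimpleSink> liveSinks = new TreeMap<>();
  final PriorityQueue<Failed> failedSinks = new PriorityQueue<>();
  SimpleSink active;

  // First half of process(): returns the status of the first retried sink
  // that did not throw, or null if nothing was eligible or every retry threw
  // (in which case the real code falls through to the active sink).
  Status retryFailed(long now) {
    while (!failedSinks.isEmpty() && failedSinks.peek().refresh < now) {
      Failed cur = failedSinks.poll();
      try {
        Status s = cur.sink.process();
        if (s == Status.READY) {
          liveSinks.put(cur.priority, cur.sink);       // back to the live pool
          active = liveSinks.get(liveSinks.lastKey()); // highest priority wins
        } else {
          failedSinks.add(cur); // backoff: re-queued without a penalty
        }
        return s;
      } catch (Exception e) {
        cur.fails++; // same effect as incFails()
        cur.refresh = now + Math.min(30000, (1 << cur.fails) * 1000);
        failedSinks.add(cur);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    RetryDemo d = new RetryDemo();
    SimpleSink sink2 = () -> Status.READY;
    d.liveSinks.put(5, () -> Status.READY);
    d.active = d.liveSinks.get(5);
    d.failedSinks.add(new Failed(10, sink2, 2000L));
    System.out.println(d.retryFailed(1000L)); // null: still cooling down
    System.out.println(d.retryFailed(3000L)); // READY
    System.out.println(d.active == sink2);    // true: sink2 is active again
  }
}
```

A sink re-queued after BACKOFF keeps its old refresh, which is already in the past, so the next call to process() retries it again right away.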


    private void adjustRefresh() {
      refresh = System.currentTimeMillis()
              + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
    }
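Whether a failed sink is picked again depends on the condition refresh < now shown earlier.
In other words, after a failure the sink must wait Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY) milliseconds before it can be selected again; the wait doubles with each consecutive failure until it reaches maxPenalty.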
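The cooldown formula can be evaluated on its own. A minimal sketch that reuses the constants from the source (FAILURE_PENALTY = 1000, DEFAULT_MAX_PENALTY = 30000); the class PenaltyDemo and its method penalty() are made up for illustration:

```java
class PenaltyDemo {
  static final int FAILURE_PENALTY = 1000;      // as in FailoverSinkProcessor
  static final int DEFAULT_MAX_PENALTY = 30000; // as in FailoverSinkProcessor

  // The delay that adjustRefresh() adds to the current time.
  static int penalty(int sequentialFailures, int maxPenalty) {
    return Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
  }

  public static void main(String[] args) {
    // Prints 2000, 4000, 8000, 16000, 30000, 30000 ms for n = 1..6:
    // (1 << 5) * 1000 = 32000 is already capped at 30000.
    for (int n = 1; n <= 6; n++) {
      System.out.println(n + " consecutive failure(s): "
          + penalty(n, DEFAULT_MAX_PENALTY) + " ms");
    }
  }
}
```

Because a sink enters failedSinks with sequentialFailures = 1, the first cooldown is 2 s rather than 1 s. Note also that the expression as written uses int arithmetic: at 22 consecutive failures, (1 << 22) * 1000 = 4,194,304,000 exceeds Integer.MAX_VALUE and wraps to a negative number, so Math.min returns a negative delay and the sink becomes eligible for retry immediately.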





