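Notes on Flume sinks
SinkRunner.java starts a thread that repeatedly calls the configured SinkProcessor (the "policy"). The value returned by policy.process() determines how long the thread sleeps: each consecutive BACKOFF result lengthens the sleep by 1 s (backoffSleepIncrement), up to a maximum of 5 s (maxBackoffSleep). Any other result resets the consecutive-backoff counter.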
public class SinkRunner implements LifecycleAware {

  private static final Logger logger = LoggerFactory
      .getLogger(SinkRunner.class);
  private static final long backoffSleepIncrement = 1000;
  private static final long maxBackoffSleep = 5000;

  // ... (part of the class is omitted in this excerpt)

  @Override
  public void start() {
    SinkProcessor policy = getPolicy();

    policy.start();

    runner = new PollingRunner();

    runner.policy = policy;
    runner.counterGroup = counterGroup;
    runner.shouldStop = new AtomicBoolean();

    runnerThread = new Thread(runner);
    runnerThread.setName("SinkRunner-PollingRunner-" +
        policy.getClass().getSimpleName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }

  public static class PollingRunner implements Runnable {

    private SinkProcessor policy;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {
      logger.debug("Polling sink runner starting");

      while (!shouldStop.get()) {
        try {
          if (policy.process().equals(Sink.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.debug("Interrupted while processing an event. Exiting.");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (Exception e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          if (e instanceof EventDeliveryException) {
            counterGroup.incrementAndGet("runner.deliveryErrors");
          } else {
            counterGroup.incrementAndGet("runner.errors");
          }
          try {
            Thread.sleep(maxBackoffSleep);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }
      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }

  }
}
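The sleep calculation in PollingRunner.run() can be isolated as a small function. The following is a minimal sketch, not Flume code: the class BackoffSketch and the method sleepMillis are hypothetical names, and only the two constant values are taken from the excerpt above.

```java
// Illustrative only: BackoffSketch is a hypothetical class, not part of Flume.
final class BackoffSketch {
    // Same values as backoffSleepIncrement and maxBackoffSleep in the excerpt.
    static final long BACKOFF_SLEEP_INCREMENT_MS = 1000;
    static final long MAX_BACKOFF_SLEEP_MS = 5000;

    /** Sleep time after the given number of consecutive BACKOFF results. */
    static long sleepMillis(long consecutiveBackoffs) {
        return Math.min(consecutiveBackoffs * BACKOFF_SLEEP_INCREMENT_MS,
                MAX_BACKOFF_SLEEP_MS);
    }

    public static void main(String[] args) {
        // Print the sleep time for the first seven consecutive backoffs.
        for (long n = 1; n <= 7; n++) {
            System.out.println(n + " consecutive backoff(s): sleep "
                    + sleepMillis(n) + " ms");
        }
    }
}
```

With these constants the sleep grows linearly and reaches the cap at the fifth consecutive backoff; a single non-BACKOFF result resets the counter, so the next backoff starts again at 1 s.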
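HDFSEventSink.java is the sink that the SinkRunner drives: the runner's SinkProcessor calls the sink's process() method to do the work. The sink maintains a collection of BucketWriter objects (the sfWriters map), looks up the BucketWriter that corresponds to each event, and calls its append() method.
After the batch has been read, process() iterates over the writers list, which holds every BucketWriter that received an event during this call, and calls flush() on each of them. If fewer than one event was taken from the channel, process() returns the BACKOFF status, and the SinkRunner backs off accordingly, sleeping 1 second longer for each consecutive BACKOFF.
The method also has to keep the channel transaction intact: it commits after the flush, rolls back on any exception, and closes the transaction in every case. (I have not read this part in depth.)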
public Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  List<BucketWriter> writers = Lists.newArrayList();
  transaction.begin();
  try {
    int txnEventCount = 0;
    for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
      Event event = channel.take();
      if (event == null) {
        break;
      }

      // reconstruct the path name by substituting place holders
      String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
      String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);

      String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
      BucketWriter bucketWriter;
      HDFSWriter hdfsWriter = null;
      // Callback to remove the reference to the bucket writer from the
      // sfWriters map so that all buffers used by the HDFS file
      // handles are garbage collected.
      WriterCallback closeCallback = new WriterCallback() {
        @Override
        public void run(String bucketPath) {
          LOG.info("Writer callback called.");
          synchronized (sfWritersLock) {
            sfWriters.remove(bucketPath);
          }
        }
      };
      synchronized (sfWritersLock) {
        bucketWriter = sfWriters.get(lookupPath);
        // we haven't seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
          sfWriters.put(lookupPath, bucketWriter);
        }
      }

      // track the buckets getting written in this transaction
      if (!writers.contains(bucketWriter)) {
        writers.add(bucketWriter);
      }

      // Write the data to HDFS
      try {
        bucketWriter.append(event);
      } catch (BucketClosedException ex) {
        LOG.info("Bucket was closed while trying to append, " +
            "reinitializing bucket and writing event.");
        hdfsWriter = writerFactory.getWriter(fileType);
        bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
        synchronized (sfWritersLock) {
          sfWriters.put(lookupPath, bucketWriter);
        }
        bucketWriter.append(event);
      }
    }

    if (txnEventCount == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (txnEventCount == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    } else {
      sinkCounter.incrementBatchUnderflowCount();
    }

    // flush all pending buckets before committing the transaction
    for (BucketWriter bucketWriter : writers) {
      bucketWriter.flush();
    }

    transaction.commit();

    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      return Status.READY;
    }
  } catch (IOException eIO) {
    transaction.rollback();
    LOG.warn("HDFS IO error", eIO);
    return Status.BACKOFF;
  } catch (Throwable th) {
    transaction.rollback();
    LOG.error("process failed", th);
    if (th instanceof Error) {
      throw (Error) th;
    } else {
      throw new EventDeliveryException(th);
    }
  } finally {
    transaction.close();
  }
}
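The batching logic of process() is easier to see with the HDFS and transaction details removed. The sketch below is a simplified model under stated assumptions, not Flume code: the channel is a plain java.util.Queue of strings, the Bucket class stands in for BucketWriter, the bucketing rule (first character of the event) is invented for illustration, and transactions are left out entirely. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Illustrative only: in-memory stand-ins for Flume's Channel and BucketWriter.
final class BatchDrainSketch {
    enum Status { READY, BACKOFF }

    /** Stand-in for a bucket writer: buffers events until flush() is called. */
    static final class Bucket {
        final List<String> pending = new ArrayList<>();
        final List<String> flushed = new ArrayList<>();

        void append(String event) { pending.add(event); }

        void flush() {
            flushed.addAll(pending);
            pending.clear();
        }
    }

    // Plays the role of the sfWriters map: one cached bucket per lookup key.
    final Map<String, Bucket> buckets = new HashMap<>();
    final int batchSize;

    BatchDrainSketch(int batchSize) { this.batchSize = batchSize; }

    /** Takes up to batchSize events, appends each to its bucket, then flushes the touched buckets. */
    Status process(Queue<String> channel) {
        List<Bucket> touched = new ArrayList<>();
        int taken = 0;
        for (; taken < batchSize; taken++) {
            String event = channel.poll();
            if (event == null) {
                break; // channel is empty
            }
            // Hypothetical bucketing rule: the first character of the event.
            String key = event.isEmpty() ? "" : event.substring(0, 1);
            Bucket bucket = buckets.computeIfAbsent(key, k -> new Bucket());
            if (!touched.contains(bucket)) {
                touched.add(bucket); // track buckets written in this batch
            }
            bucket.append(event);
        }
        for (Bucket bucket : touched) {
            bucket.flush(); // flush only the buckets used in this batch
        }
        return taken < 1 ? Status.BACKOFF : Status.READY;
    }

    public static void main(String[] args) {
        BatchDrainSketch sink = new BatchDrainSketch(2);
        Queue<String> channel = new ArrayDeque<>(Arrays.asList("a1", "b1", "a2"));
        // Keep calling process() until the channel is drained and BACKOFF is returned.
        Status status;
        do {
            status = sink.process(channel);
            System.out.println(status);
        } while (status == Status.READY);
    }
}
```

In the real sink the same loop runs inside a channel transaction, so a failed flush rolls the taken events back into the channel instead of losing them.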
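BucketWriter.java writes events to the corresponding file on HDFS and keeps track of whether the file should be rotated, based on factors such as event count, size, and time. Only the append() method is listed here.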
/**
 * Open file handles, write data, update stats, handle file rolling and
 * batching / flushing. <br />
 * If the write fails, the file is implicitly closed and then the IOException
 * is rethrown. <br />
 * We rotate before append, and not after, so that the active file rolling
 * mechanism will never roll an empty file. This also ensures that the file
 * creation time reflects when the first event was written.
 *
 * @throws IOException
 * @throws InterruptedException
 */
public synchronized void append(final Event event)
    throws IOException, InterruptedException {
  checkAndThrowInterruptedException();
  // If idleFuture is not null, cancel it before we move forward to avoid a
  // close call in the middle of the append.
  if (idleFuture != null) {
    idleFuture.cancel(false);
    // There is still a small race condition - if the idleFuture is already
    // running, interrupting it can cause HDFS close operation to throw -
    // so we cannot interrupt it while running. If the future could not be
    // cancelled, it is already running - wait for it to finish before
    // attempting to write.
    if (!idleFuture.isDone()) {
      try {
        idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
      } catch (TimeoutException ex) {
        LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
            " file close may have failed", ex);
      } catch (Exception ex) {
        LOG.warn("Error while trying to cancel closing of idle file. ", ex);
      }
    }
    idleFuture = null;
  }

  // If the bucket writer was closed due to roll timeout or idle timeout,
  // force a new bucket writer to be created. Roll count and roll size will
  // just reuse this one
  if (!isOpen) {
    if (closed) {
      throw new BucketClosedException("This bucket writer was closed and " +
          "this handle is thus no longer valid");
    }
    open();
  }

  // check if it's time to rotate the file
  if (shouldRotate()) {
    boolean doRotate = true;

    if (isUnderReplicated) {
      if (maxConsecUnderReplRotations > 0 &&
          consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
        doRotate = false;
        if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
          LOG.error("Hit max consecutive under-replication rotations ({}); " +
              "will not continue rolling files under this path due to " +
              "under-replication", maxConsecUnderReplRotations);
        }
      } else {
        LOG.warn("Block Under-replication detected. Rotating file.");
      }
      consecutiveUnderReplRotateCount++;
    } else {
      consecutiveUnderReplRotateCount = 0;
    }

    if (doRotate) {
      close();
      open();
    }
  }

  // write the event
  try {
    sinkCounter.incrementEventDrainAttemptCount();
    callWithTimeout(new CallRunner<Void>() {
      @Override
      public Void call() throws Exception {
        writer.append(event); // could block
        return null;
      }
    });
  } catch (IOException e) {
    LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
        bucketPath + ") and rethrowing exception.",
        e.getMessage());
    try {
      close(true);
    } catch (IOException e2) {
      LOG.warn("Caught IOException while closing file (" +
          bucketPath + "). Exception follows.", e2);
    }
    throw e;
  }

  // update statistics
  processSize += event.getBody().length;
  eventCounter++;
  batchCounter++;

  if (batchCounter == batchSize) {
    flush();
  }
}
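The "rotate before append" rule from the Javadoc above can be illustrated in isolation. This is a minimal sketch under stated assumptions, not the BucketWriter implementation: the excerpt does not show the body of shouldRotate(), so the count and size checks here are a guess at its general shape, with hypothetical thresholds rollCount and rollSize (0 disables a check). Files are modeled as in-memory lists, and time-based rolling is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a hypothetical writer whose "files" are in-memory lists.
final class RollingWriterSketch {
    private final long rollCount; // rotate once this many events are in the file; 0 disables
    private final long rollSize;  // rotate once this many bytes are in the file; 0 disables
    private long eventCounter;
    private long processSize;
    private List<String> current; // the open file, or null if none is open
    final List<List<String>> closedFiles = new ArrayList<>();

    RollingWriterSketch(long rollCount, long rollSize) {
        this.rollCount = rollCount;
        this.rollSize = rollSize;
    }

    // Assumed shape of the check: rotate when either enabled threshold is reached.
    private boolean shouldRotate() {
        boolean byCount = rollCount > 0 && eventCounter >= rollCount;
        boolean bySize = rollSize > 0 && processSize >= rollSize;
        return byCount || bySize;
    }

    private void open() {
        current = new ArrayList<>();
        eventCounter = 0;
        processSize = 0;
    }

    void close() {
        if (current != null) {
            closedFiles.add(current);
            current = null;
        }
    }

    void append(String event) {
        if (current == null) {
            open();
        }
        // Rotate BEFORE writing, so a freshly opened file always receives
        // the event that triggered the rotation and is never left empty.
        if (shouldRotate()) {
            close();
            open();
        }
        current.add(event);
        processSize += event.getBytes(StandardCharsets.UTF_8).length;
        eventCounter++;
    }

    public static void main(String[] args) {
        RollingWriterSketch writer = new RollingWriterSketch(2, 0);
        for (String e : new String[] {"a", "b", "c"}) {
            writer.append(e);
        }
        writer.close();
        System.out.println(writer.closedFiles);
    }
}
```

If the check ran after the write instead, reaching the threshold would close the file and open a new one immediately, and that new file would stay empty whenever no further event arrived.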