首页 > 代码库 > Flume-NG源码阅读之HDFSEventSink
Flume-NG源码阅读之HDFSEventSink
HDFSEventSink是flume中一个很重要的sink,配置文件中type=hdfs。与此sink相关的类都在org.apache.flume.sink.hdfs包中。
HDFSEventSink算是一个比较复杂的sink,包下涉及的源代码文件数多达13个。。。可配置的参数众多。。。希望我能讲清楚。
一、首先依然是看configure(Context context)方法,用来获取配置文件中的配置信息,及初始化一些重要参数
1 public void configure(Context context) { 2 this.context = context; 3 //HDFS目录路径,必需(eg hdfs://namenode/flume/webdata/) 4 filePath = Preconditions.checkNotNull( 5 context.getString("hdfs.path"), "hdfs.path is required"); 6 //在Hdfs目录中生成的文件名字的前缀 7 fileName = context.getString("hdfs.filePrefix", defaultFileName); 8 //文件后缀,例如.avro,一般不用 9 this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix); 10 //内部写文件的时候表示正在写的文件的前缀和后缀 11 inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix); 12 inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);//默认是.tmp 13 String tzName = context.getString("hdfs.timeZone"); 14 timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName); 15 //当前写入的文件滚动间隔,默认30秒生成一个新的文件,0表示不基于时间间隔来滚动 16 rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval); 17 //以文件大小触发文件滚动,单位字节,0表示不基于文件大小间隔来滚动 18 rollSize = context.getLong("hdfs.rollSize", defaultRollSize); 19 //以写入的事件数触发文件滚动, 0表示不基于事件数大小间隔来滚动 20 rollCount = context.getLong("hdfs.rollCount", defaultRollCount); 21 //事件刷新到HDFS之前的数量 22 batchSize = context.getLong("hdfs.batchSize", defaultBatchSize); 23 //超时后关闭无效文件(0 =禁止自动关闭闲置的文件) 24 idleTimeout = context.getInteger("hdfs.idleTimeout", 0); 25 //压缩编码类型. one of following : gzip, bzip2, lzo, snappy 26 String codecName = context.getString("hdfs.codeC"); 27 //文件格式:当前为SequenceFile, DataStream or CompressedStream。 28 //(1)DataStream不压缩输出文件,不能设置codeC选项,(2)CompressedStream需要设置hdfs.codeC的一个可用的编解码器 29 fileType = context.getString("hdfs.fileType", defaultFileType); 30 //允许打开的文件数。如果超过这个数字,最早的文件被关闭。 31 maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles); 32 //HDFS的操作允许的毫秒数,如打开,写,刷新,关闭。这个数字应该增加,如果正在发生许多HDFS超时操作。 33 callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout); 34 //每个HDFS sink用于HDFS io操作的线程数,如打开、写入等操作。 35 threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", 36 defaultThreadPoolSize); 37 //每个HDFS sink用于调度定时文件滚动的线程数 38 rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize", 39 defaultRollTimerPoolSize); 40 //安全认证时使用Kerberos user principal for accessing secure HDFS 41 kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", ""); 42 //安全认证时使用Kerberos keytab for accessing secure HDFS 43 kerbKeytab = context.getString("hdfs.kerberosKeytab", ""); 44 proxyUserName = context.getString("hdfs.proxyUser", ""); //代理用户 45 46 Preconditions.checkArgument(batchSize > 0, 47 "batchSize must be greater than 0"); 48 if (codecName == null) { //不压缩数据 49 codeC = null; 50 compType = CompressionType.NONE; 51 } else { //压缩数据 52 codeC = getCodec(codecName); 53 // TODO : set proper compression type 54 compType = CompressionType.BLOCK; 55 } 56 57 // Do not allow user to set fileType DataStream with codeC together 58 // To prevent output file with compress extension (like .snappy) 59 if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)//如果fileType是DataStream,则不允许压缩 60 && codecName != null) { 61 throw new IllegalArgumentException("fileType: " + fileType + 62 " which does NOT support compressed output. Please don‘t set codeC" + 63 " or change the fileType if compressed output is desired."); 64 } 65 66 if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {//如果fileType是压缩类型,则codeC不允许为空 67 Preconditions.checkNotNull(codeC, "It‘s essential to set compress codec" 68 + " when fileType is: " + fileType); 69 } 70 71 if (!authenticate()) { //认证 72 LOG.error("Failed to authenticate!"); 73 } 74 //时间戳是否四舍五入(如果为true,会影响所有基于时间的转义序列%t除外) 75 needRounding = context.getBoolean("hdfs.round", false); 76 77 if(needRounding) { 78 //The unit of the round down value - second, minute or hour. 79 String unit = context.getString("hdfs.roundUnit", "second"); //滚动时间单位 80 if (unit.equalsIgnoreCase("hour")) { 81 this.roundUnit = Calendar.HOUR_OF_DAY; 82 } else if (unit.equalsIgnoreCase("minute")) { 83 this.roundUnit = Calendar.MINUTE; 84 } else if (unit.equalsIgnoreCase("second")){ 85 this.roundUnit = Calendar.SECOND; 86 } else { 87 LOG.warn("Rounding unit is not valid, please set one of" + 88 "minute, hour, or second. Rounding will be disabled"); 89 needRounding = false; 90 } 91 //Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. 92 this.roundValue = http://www.mamicode.com/context.getInteger("hdfs.roundValue", 1); //滚动时间大小 93 if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){//检查是否符合分、秒数值,0<v<=60 94 Preconditions.checkArgument(roundValue > 0 && roundValue <= 60, 95 "Round value" + 96 "must be > 0 and <= 60"); 97 } else if (roundUnit == Calendar.HOUR_OF_DAY){ 98 Preconditions.checkArgument(roundValue > 0 && roundValue <= 24, //检查是否符合时数值0<v<=24 99 "Round value" + 100 "must be > 0 and <= 24"); 101 } 102 } 103 104 if (sinkCounter == null) {//构造计数器 105 sinkCounter = new SinkCounter(getName()); 106 } 107 }
上面比较常用的参数有:rollInterval以固定时间间隔滚动文件,rollSize以文件大小为单位滚动文件,rollCount以行数来滚动文件,fileType(有3种SequenceFile(二进制)、DataStream(不能压缩)、CompressedStream(压缩文件))
二、接下来是start()方法。
1 public void start() { 2 String timeoutName = "hdfs-" + getName() + "-call-runner-%d"; 3 callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize, 4 new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); //这个线程池用来将event写入HDFS文件 5 6 String rollerName = "hdfs-" + getName() + "-roll-timer-%d"; 7 timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize, 8 new ThreadFactoryBuilder().setNameFormat(rollerName).build()); //这个线程池用来滚动文件 9 10 this.sfWriters = new WriterLinkedHashMap(maxOpenFiles); //用来存储文件的绝对路径以及对应的BucketWriter 11 sinkCounter.start(); 12 super.start(); 13 }
start方法主要是初始化两个线程池。
三、process()方法,是用来处理channel中的event的,非线程安全的,要确保HDFS中的文件是打开的。
1 public Status process() throws EventDeliveryException { 2 Channel channel = getChannel(); //获取对应的channel 3 Transaction transaction = channel.getTransaction();//获得事务 4 List<BucketWriter> writers = Lists.newArrayList(); //BucketWriter列表 5 transaction.begin(); 6 try { 7 int txnEventCount = 0; 8 for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {//批量处理 9 Event event = channel.take(); //获取event 10 if (event == null) { 11 break; 12 } 13 14 // reconstruct the path name by substituting place holders 15 String realPath = BucketPath.escapeString(filePath, event.getHeaders(), 16 timeZone, needRounding, roundUnit, roundValue); //格式化后的HDFS目录 17 String realName = BucketPath.escapeString(fileName, event.getHeaders(), 18 timeZone, needRounding, roundUnit, roundValue); //格式化后的文件名 19 20 String lookupPath = realPath + DIRECTORY_DELIMITER + realName; //要写入的文件的HDFS绝对路径 21 BucketWriter bucketWriter = sfWriters.get(lookupPath); //获取文件的BucketWriter 22 23 // we haven‘t seen this file yet, so open it and cache the handle 24 if (bucketWriter == null) { //如果没有这个文件 25 //根据fileType类型构造HDFSWriter(三种:SequenceFile、DataStream、CompressedStream) 26 HDFSWriter hdfsWriter = writerFactory.getWriter(fileType); 27 28 WriterCallback idleCallback = null; 29 if(idleTimeout != 0) { 30 idleCallback = new WriterCallback() { 31 @Override 32 public void run(String bucketPath) { 33 sfWriters.remove(bucketPath); 34 } 35 }; 36 } 37 bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount, 38 batchSize, context, realPath, realName, inUsePrefix, inUseSuffix, 39 suffix, codeC, compType, hdfsWriter, timedRollerPool, 40 proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath); 41 42 sfWriters.put(lookupPath, bucketWriter); //将文件路径和BucketWriter组成K-V,放入sfWriters 43 } 44 45 // track the buckets getting written in this transaction 46 if (!writers.contains(bucketWriter)) {//如果BucketWriter列表没有正在写的文件——bucketWriter,则加入 47 writers.add(bucketWriter); 48 } 49 50 // Write the data to HDFS 51 append(bucketWriter, event); //将event写入bucketWriter对应的文件中 52 } 53 54 if (txnEventCount == 0) { //这次事务没有处理任何event 55 sinkCounter.incrementBatchEmptyCount(); 56 } else if (txnEventCount == batchSize) {//一次处理batchSize个event 57 sinkCounter.incrementBatchCompleteCount(); 58 } else {//channel中剩余的events不足batchSize 59 sinkCounter.incrementBatchUnderflowCount(); 60 } 61 62 // flush all pending buckets before committing the transaction 63 for (BucketWriter bucketWriter : writers) { //将所有BucketWriter数据刷新到HDFS中 64 flush(bucketWriter); 65 } 66 67 transaction.commit(); //提交事务 68 69 if (txnEventCount < 1) { 70 return Status.BACKOFF; 71 } else { 72 sinkCounter.addToEventDrainSuccessCount(txnEventCount); 73 return Status.READY; 74 } 75 } catch (IOException eIO) { 76 transaction.rollback();//异常后回滚 77 LOG.warn("HDFS IO error", eIO); 78 return Status.BACKOFF; 79 } catch (Throwable th) { 80 transaction.rollback();//异常后回滚 81 LOG.error("process failed", th); 82 if (th instanceof Error) { 83 throw (Error) th; 84 } else { 85 throw new EventDeliveryException(th); 86 } 87 } finally { 88 transaction.close();//关闭事务 89 } 90 }
1、获取sink的channel和transaction,transaction.begin()是必要的步骤;
2、循环处理批量的event,如果event==null,说明channel已无数据,则退出循环;
3、realPath和realName都是格式化后的文件HDFS存储路径及文件名;lookupPath则是要写入的文件完整HDFS路径(目录+文件名);获取该文件对应的BucketWriter对象,要写入的文件及对应的BucketWriter对象需要存入sfWriters这个LinkedHashMap结构中,表示正在写的文件,BucketWriter类用来滚动文件、处理文件格式以及数据的序列化等操作,其实就是负责数据的写的;
4、如果文件对应的bucketWriter不存在,则文件需要滚动,创建一个BucketWriter对象,只有public方法才是线程安全的。
创建BucketWriter对象之前需要先构建一个HDFSWriter对象负责写文件,有三种类型:HDFSSequenceFile、HDFSDataStream、HDFSCompressedDataStream。
WriterCallback idleCallback是用来超时后滚动文件的时候调用的,前提得是配置文件中有配置hdfs.idleTimeout且不为0;
然后是new 一个BucketWriter对象,这有点复杂稍后说;
sfWriters.put(lookupPath, bucketWriter)然后就是将文件及对应的bucketWriter对象存入sfWriters中,表示正在写的文件。
5、这里要说下new BucketWriter对象的事。BucketWriter的构造函数首先是对众多参数赋值,然后isOpen = false,最后是this.writer.configure(context),即对writer对象进行配置。复杂就在这,这个writer对象是什么?它是上面4中所说的HDFSWriter。
HDFSWriterFactory工厂类会根据配置文件中设置的类型返回相应的HDFSWriter对象,没有配置文件类型的话默认是HDFSSequenceFile。
HDFSSequenceFile:configure(context)方法会首先获取写入格式writeFormat即参数"hdfs.writeFormat",默认格式是二进制的Writable(HDFSWritableSerializer.Builder.class),还有一个是Text(HDFSTextSerializer.Builder.class),第三个是null;再获取是否使用HDFS本地文件系统"hdfs.useRawLocalFileSystem",默认是flase不使用;然后获取writeFormat的所有配置信息serializerContext;然后根据writeFormat和serializerContext构造SequenceFileSerializer的对象serializer。在serializer中并无serializerContext配置的方法,在1.3.0中此处的serializerContext没有任何作用,可能是为以后做的预留。
HDFSDataStream:configure(context)方法先获取serializerType类型,默认是TEXT(BodyTextEventSerializer.Builder.class),此外还有HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class)、OTHER(null)、AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class)共四种类型;再获取是否使用HDFS本地文件系统"hdfs.useRawLocalFileSystem",默认是flase不使用;然后获取serializer的所有配置信息serializerContext。serializer的实例化是在HDFSDataStream.open(String filePath)方法中实现的。此处的serializerContext在BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未用到,可能是做预留,但是FlumeEventAvroEventSerializer在其Builder中用到了,并进行了配置。
HDFSCompressedDataStream:configure(context)方法和HDFSDataStream.configure(context)是一样的,serializerType的类型是一样的;其他也是一样。serializer的实例化是在HDFSCompressedDataStream.open(String filePath)方法中实现的,调用open(String filePath, CompressionCodec codec,CompressionType cType)来实例化。
6、如果存储着正在写的bucketWriter的writers列表中没有此bucketWriter,则添加进去,writers的存在是为了统一flush方便,后面会有介绍。
7、append(bucketWriter, event)这个是让bucketWriter处理event的方法,会使用bucketWriter.append(event)处理。这个方法的代码如下:
1 public synchronized void append(Event event) 2 throws IOException, InterruptedException { 3 checkAndThrowInterruptedException(); 4 if (!isOpen) { 5 if(idleClosed) { 6 throw new IOException("This bucket writer was closed due to idling and this handle " + 7 "is thus no longer valid"); 8 } 9 open();//已经写完一个文件,新建新文件 10 } 11 12 // check if it‘s time to rotate the file 13 if (shouldRotate()) {//检查行数、大小是否改完成一个文件 14 close(); 15 open();//新建新文件 16 } 17 18 // write the event 19 try { 20 sinkCounter.incrementEventDrainAttemptCount(); 21 writer.append(event); // could block写数据 22 } catch (IOException e) { 23 LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" + 24 bucketPath + ") and rethrowing exception.", 25 e.getMessage()); 26 try { 27 close(); 28 } catch (IOException e2) { 29 LOG.warn("Caught IOException while closing file (" + 30 bucketPath + "). Exception follows.", e2); 31 } 32 throw e; 33 } 34 35 // update statistics 36 processSize += event.getBody().length; 37 eventCounter++; 38 batchCounter++; 39 40 if (batchCounter == batchSize) { 41 flush(); 42 } 43 }
A、首先会检查当前线程是否中断checkAndThrowInterruptedException();
B、BucketWriter初次运行时,isOpen=false(表示文件未打开不能写),idleClosed=false,会运行open()——doOpen()。fullFileName是"前缀.时间戳"组成的文件名,从这也可以看出时间戳部分不能更改,也就是HDFS中文件名无法自定义,除非自己定制HDFSSink;另外后缀名和压缩不能同时兼得,即如果没有配置压缩则可以在fullFileName后面添加自定义的后缀(比如后缀为.avro),否则只能添加压缩类型的后缀;bucketPath表示在HDFS中正在写的文件完整名字,这个名字中有标示正在写的文件的前后缀(inUsePrefix、inUseSuffix);targetPath这个是文件写完后的要更改成的完整名字,和bucketPath的区别是没有inUsePrefix、inUseSuffix;然后是根据有无压缩配置信息open此witer,没有压缩:writer.open(bucketPath),有压缩:writer.open(bucketPath, codeC, compType)。需要注意的是当使用Kerberos时,hadoop的RPC操作是非线程安全的包括getFileSystem()操作,open()操作在同一个JVM的同一时刻只能由一个线程使用,因为有可能导致死锁,见FLUME-1231。所以对open进行了同步。另外当在运行flume过程中出现类似异常“java.io.IOException: Callable timed out after 10000 ms on file”和"java.util.concurrent.TimeoutException"时,需要在这个方法上面config.set("dfs.socket.timeout", "3600000")增加超时时间,参考http://blog.csdn.net/yangbutao/article/details/8845025
writer包含的三类均有两个open方法,一个是对应不压缩的open(String filePath) ,一个是对应压缩的open(String filePath, CompressionCodec codec,CompressionType cType)。
首先writer若为HDFSSequenceFile,是支持压缩的,open(String filePath)会调用open(filePath, null, CompressionType.NONE)压缩方法,只不过没有压缩类型。压缩open方法先判断是否使用了本地文件系统,然后根据hadoop的配置信息是否支持追加"hdfs.append.support",构造相应的SequenceFile即writer。其中的serializer若为HDFSWritableSerializer则writer的Key为LongWritable类型,Value为BytesWritable二进制类型;若为HDFSTextSerializer,writer的Key为LongWritable类型,Value为Text文本类型。
其次writer若为HDFSDataStream,是不支持压缩的。它的压缩方法open(String filePath, CompressionCodec codec,CompressionType cType)直接调用非压缩方法open(filePath)。open(filePath)判断是否使用了本地文件系统;然后根据是否支持append操作(获取配置的"hdfs.append.support"参数),构造对应的输出流outStream;然后构造serializer,有三种类型BodyTextEventSerializer、HeaderAndBodyTextEventSerializer、FlumeEventAvroEventSerializer,前两种支持追加,最后一种不支持追加,所以FlumeEventAvroEventSerializer不能将"hdfs.append.support"设置为true。如果支持追加就执行serializer.afterReopen()前两种serializer未实现这个方法(1.3.0),不支持就serializer.afterCreate()前两种也未实现这个方法,第三种则是dataFileWriter.create(getSchema(), getOutputStream())。
最后writer若为HDFSCompressedDataStream,就是针对压缩的,其open(String filePath)会使用默认的DefaultCodec以及CompressionType.BLOCK来调用压缩open(String filePath, CompressionCodec codec,CompressionType cType)。压缩方法和HDFSDataStream的压缩方法类似,区别有两点一个是serializer的输出流变成压缩输出流了;另一个就是最后加了isFinished = false表示压缩流是否完毕。
回到BucketWriter,如果rollInterval(按时间滚动文件)不为0,则创建一个Callable,放入timedRollFuture中rollInterval秒之后关闭文件,默认是30s写一个文件,这只是控制文件滚动的3个条件之一;
isOpen = true表示文件已打开,可以write了。
C、回到上面7中,shouldRotate()方法会判断文件中的行数和文件的大小是否达到配置文件中的配置,如果任何一个满足条件则可以关闭文件,这是控制文件滚动的3个条件中的两个。close()方法会关闭文件,再清理俩线程池及一些其他的清理工作,及改名(将.tmp文件改名),再open()就又到了上面B中所说的。
D、writer.append(event)这是向HDFS中写数据的地方。这里又要分很多讨论了,因为writer有三类。
writer为HDFSSequenceFile:append(event)方法,会先通过serializer.serialize(e)把event处理成一个Key和一个Value。
(1)serializer为HDFSWritableSerializer时,则Key会是event.getHeaders().get("timestamp"),如果没有"timestamp"的Headers则使用当前系统时间System.currentTimeMillis(),然后将时间封装成LongWritable;Value是将event.getBody()封装成BytesWritable,代码是bytesObject.set(e.getBody(), 0, e.getBody().length);
(2)serializer为HDFSTextSerializer时,Key和上述HDFSWritableSerializer一样;Value会将event.getBody()封装成Text,代码是textObject.set(e.getBody(), 0, e.getBody().length)。
writer.append(event)中会将Key和Value,writer.append(record.getKey(), record.getValue())。
writer为HDFSDataStream:append(event)方法直接调用serializer.write(e)。
(1)serializer为BodyTextEventSerializer,则其write(e)方法会将e.getBody()写入输出流,并根据配置再写入一个"\n";
(2)serializer为HeaderAndBodyTextEventSerializer,则其write(e)方法会将e.getHeaders() + " "(注意此空格)和e.getBody()写入输出流,并根据配置再写入一个"\n";
(3)serializer为FlumeEventAvroEventSerializer,则其write(e)方法会将event整体写入dataFileWriter。
writer为HDFSCompressedDataStream:append(event)方法会首先判断是否完成一个阶段的压缩isFinished,如果是则更新压缩输出流的状态,并isFinished=false,否则剩下的执行和HDFSDataStream.append(event)相同。
E、是做一些统计工作processSize是统计文件大小的;eventCounter是统计文件行数的;batchCounter是统计最近一次flush之后的处理的event数;
F、如果处理的event数达到batchSize则刷新到HDFS中,flush()。flush()方法会首先执行writer.sync()即写入HDFS,然后清空batchCounter表明这次batch已经完成,可以准备下次的。涉及到writer就会涉及很多写入类型:
writer为HDFSSequenceFile:sync()方法执行SequenceFile.Writer.syncFs()将数据写入HDFS中;
writer为HDFSDataStream:sync()方法执行
writer为HDFSCompressedDataStream:sync()方法先执行serializer.flush():只有FlumeEventAvroEventSerializer的flush()方法也有实现dataFileWriter.flush(),其他俩BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未实现flush()方法。然后执行outStream.flush()和outStream.sync()将数据刷新至HDFS中。
如果idleTimeout>0,表示文件超时时间,超时后就成为无效文件需要关闭(默认是0不允许关闭的),构造一个Callable对象idleAction执行内容是:close()方法,idleClosed = true表示超时关闭了这个bucketwriter,而且onIdleCallback.run(onIdleCallbackPath)会将onIdleCallbackPath从HDFSEventSink.sfWriters中删除对应对应的bucketwriter,表示这个文件已经写完了。然后将这个idleAction放入timedRollerPool中idleTimeout秒后执行。
8、回到HDFSEventSink.process()方法中,会根据这次事务处理的event数量更新相应的计数器;
9、遍历writers,挨个刷新BucketWriter至HDFS;
10、transaction.commit();//提交事务
11、transaction.rollback();//异常后回滚
12、transaction.close();//关闭事务
四、stop()方法。首先会遍历sfWriters,挨个close(BucketWriter):BucketWriter.close()方法,如果isOpen=true表示文件还处于打开状态,则writer.close()(这里的writer就不分情况了,HDFSSequenceFile就直接writer.close();其他俩都是先flush(好些都没实现)再beforClose(好些都没实现)输出流再flush、sync、close),BucketWriter.close()方法方法接下来关闭俩线程池以及改名等清理操作。HDFSEventSink的stop()方法接下来是关闭俩线程池,清理一些数据比如sfWriters.clear()。
ps:1、BucketWriter中的public方法都是线程安全的,包括append、close、flush三个均是同步方法,会调用相应的do方法,做具体的操作。
2、callWithTimeout方法需要注意,在HDFSEventSink中多次用到这个方法:append、flush、close,这个方法会将对应的Callable放入callTimeoutPool线程池中执行,并等待callTimeout(默认是10000) ms返回结果。
问题1:WriterLinkedHashMap的sfWriters除了设置hdfs.idleTimeout且>0时才会从sfWriters中remove掉超时的bucketwriter,其它地方并没有发现remove操作,那么以后随着写入文件的增多sfWriters会不会始终增大?
解:肯定不会啊。别忘了还有一个"hdfs.maxOpenFiles"参数默认是5000,追踪发现HDFSEventSink内部静态类WriterLinkedHashMap继承了LinkedHashMap,并重写了removeEldestEntry方法,这个方法在sfWriters.put时总会调用,当sfWriters.size()>maxOpenFiles时就是自动清理之时了。maxOpenFiles就是sfWriters得最大容量。
这次的sink比较复杂,希望我写的大伙能够看懂,期间还有一些细节不太清楚,不过不影响整体的理解。
不解1:bucketwriter类中的doOpen方法中hadoop的RPC线程非安全,说是可以从FLUME-1231这得到解释
不解2:同样doOpen方法中有说“Need to get reference to FS using above config before underlying writer does in order to avoid shutdown hook & IllegalStateExceptions”这里也表示疑问,为什么这么说?
不解3:为什么HDFSWriter的3个实现类的open()方法中,均考虑了conf.getBoolean("hdfs.append.support", false) == true?一个是可追加的一个是不可追加的。但是都是一个SequenceFile.Writer或者FSDataOutputStream,尤其是在HDFSSequenceFile中的writer能不能追加似乎根本没什么区别,充其量是一个writer的参数是FSDataOutputStream,另外一个则不是,其他俩好歹还有需要设置appending=true用来判断是否可重复打开但也是有点牵强,都可以合二为一,但是为什么不那么做呢?
不解4:BucketPath.escapeString这个方法还没搞懂,导致格式化的结果不甚明了。。。哎
欢迎大伙交流!!