首页 > 代码库 > Flume-NG源码阅读之FileChannel

Flume-NG源码阅读之FileChannel

  FileChannel是flume一个非常重要的channel组件,非常常用。这个channel非常复杂,涉及的文件更多涉及三个包:org.apache.flume.channel.file、org.apache.flume.channel.file.encryption(加密)、org.apache.flume.channel.file.proto共计40个源码文件。

  一、configure(Context context)方法:

  1、首先获取配置文件中的checkpointDir和dataDirs属性,这是存放检查点和数据的目录,默认使用$user.home/.flume/file-channel/checkpoint和$user.home/.flume/file-channel/data来;checkpointDir是一个目录,而dataDirs可以是多个以“,”分割;且这两个目录最好不要来回修改,因为里面存储着数据呢;

  2、获取容量capacity,并做一些检查比如是否<0,是否是动态加载(有无变化?),默认1000000;这指的是checkpoint文件存放event信息的最大容量。

  3、keepAlive超时时间,就是如果channel中没有数据最长等待时间,默认3s;

  4、transactionCapacity事务的最大容量,默认1000;

  5、checkpointInterval检查点写入间隔,默认30000ms;

  6、maxFileSize,data文件大小的上限,用户设置的和1623195647 之间较小的那个;

  7、最少需要多少空间minimumRequiredSpace,max((用户配置的,500M),1M);

  8、useLogReplayV1,默认false;

  9、useFastReplay,默认false;

  10、encryptionActiveKey,加密密钥别名默认为null;

  11、encryptionContext加密配置信息;

  12、encryptionCipherProvider加密密码提供者,缺省值为null

  13、encryptionKeyProviderName,加密密钥提供者,缺省值为null;

  14、queueRemaining,队列是否有剩余空间信号量,初始化容量为capacity;

  15、设置Log log的检查间隔checkpointInterval和maxFileSize最大文件大小。

  16、是否新建一个计数器channelCounter。

  二、start()方法。

  1、通过Log.Builder()构建了一个builder对象,并设置了相应的参数,然后log = builder.build(),Log的构造方法会对checkpointDir及logDirs尝试获取锁操作,所以如果存在多个file channel则checkpointDir及logDirs最好配置在多个磁盘下或者多个目录下,否则只能一个获得初始化;Log用来将封装好的FlumeEvent写入磁盘并将指向这些event的指针存入一个内存队列queue。会创建一个线程工作内容就是每隔checkpointInterval毫秒,默认30s写一次检查点log.writeCheckpoint(),会将checpoint、inflightTakes、inflightPuts都刷新至磁盘,会先后将inflightPuts、inflightTakes、checkpoint.meta重建,更新checkpoint文件并刷新至磁盘,这些文件都在checkpointDir目录下;更新log-ID.meta文件;同时肩负起删除log文件及其对应的meta文件的责任。

  2、log.replay(),一旦一个Log对象被创建,则需要调用replay()方法使用queue最新的检查点来调整磁盘上的write ahead log。会获取最大的fileID;然后读取log文件根据record的类型进行相应的操作,进行恢复;遍历所有的data目录,然后roll(index)创建LogFile.Writer(空的);然后将queue刷新到相关文件。

  3、 open = true,表示channel打开;

  4、depth = getDepth(),FlumeEventQueue的大小,然后需要判断一下queueRemaining是否有足够的剩余量queueRemaining.tryAcquire(depth);

  5、如果open==true,计数器开始工作。

  三、createTransaction()方法主要是构造一个FileBackedTransaction对象用来直接操作channel,并返回。

  四、stop()停止channel,清理数据。

  五、静态类FileBackedTransaction extends BasicTransactionSemantics。类似可参考memory channel,必须要实现doTake()、doCommit()、doRollback()、doPut()四个方法。put和take不能同时操作。

  1、doPut(Event event)方法,该方法source会通过transaction.put()方法调用。检查LinkedBlockingDeque<FlumeEventPointer> putList是否有剩余空间(putList.remainingCapacity() == 0);检查queue是否有剩余空间,如果没有则等待keepAlive秒(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)),获取一个许可;获取log的共享锁;FlumeEventPointer ptr = log.put(transactionID, event)将event写入数据文件(是RandomAccessFile),transactionID指的是每次事务的编号,且都是在上一次基础之上加一而来,每个event都要调用put(Event)方法,该方法会获取要写入的数据文件的目录(配置文件中可以配置多个data文件目录,这里会依据transactionID轮训式的向所有的目录写数据),由于start()方法中有log.replay()方法,该方法会遍历所有的data目录并roll(index)创建LogFile.Writer,logFiles.get(logFileIndex).getUsableSpace()不会为0,检查是否剩余空间够,然后获取transactionID对应文件的写LogFile.Writer(其实是其子类LogFileV3.Writer),如果没有则调用方法 roll(logFileIndex, buffer)创建一个LogFileV3.Writer,放入logFiles(这个维持着每个data目录对应着一个正在活动的可以用于写的文件)可能会根据buffer的大小滚动文件因为单个文件有最大限制,LogFileV3.Writer的构造方法会在这个data目录创建一个meta文件,写入一些基本数据,FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer)会把event封装成的ByteBuffer,通过LogFile.Writer.write(buffer)方法写入磁盘文件(buffer会再次被封装,最终封装成的格式顺序是:OP_RECORD、buffer.limit()、buffer。buffer的内容是(RecordType(这是1)、TransactionID、LogWriteOrderID、event.getHeaders、event.getBody())),返回Pair.of(getLogFileID(), offset);ptr由两部分组成:fileID和在这个文件中的偏移量offset。fileID就是Log.nextFileID,不管有几个data目录,始终根据这个变量设置文件编号(文件名的后面的编号);putList.offer(ptr)加入到putList之中;queue.addWithoutCommit(ptr, transactionID)将这个方法在每个put操作中必须要被调用,确保检查点之后事务的提交,它会调用inflightPuts.addEvent(transactionID, e.toLong())此时e.toLong()=fileID+offset,addEvent方法会执行inflightEvents.put(transactionID, pointer)、inflightFileIDs.put(transactionID,FlumeEventPointer.fromLong(pointer).getFileID())、syncRequired = true,inflightEvents中存放的就是<transactionID,fileID+offset>而inflightFileIDs存放的是<transactionID,fileID>;success = true表示put成功;log.unlockShared()解除共享锁。put事件并未刷新至磁盘文件,因为并没有commit,commit操作会导致刷新至磁盘的操作。queue中累计的数量不能超过capacity,超过就会等待一定时间后异常。

  2、doTake()方法,该方法sink会通过transaction.take()方法调用。检查LinkedBlockingDeque<FlumeEventPointer> takeList时刻有剩余空间;然后获取共享锁;ptr = queue.removeHead(transactionID)取出头部的数据,ptr的内容<fileID, offset>,头部是逻辑上的0位置,但是物理上的头位置会一边take一边变化,是从checkpoint中取出的数据,期间会inflightTakes.addEvent(transactionID, value)将数据缓存在inflightTakes之中;然后放入takeList.offer(ptr);log.take(transactionID, ptr)会封装数据(buffer会再次被封装,最终封装成的格式顺序 是:OP_RECORD、buffer.limit()、buffer。buffer的内容是(RecordType(这是1)、 TransactionID、LogWriteOrderID、fileID、offset))然后写入缓存等待commit刷新至磁盘;event = log.get(ptr)是从data文件中取出数据event;最后释放共享锁。take操作queue.removeHead(transactionID)会从overwriteMap或者内存映射elementsBuffer中取出对应的head位置数据。

  3、doCommit()方法,该方法source和sink会通过transaction.commit()方法调用。首先获取takeList和putList的大小;putList和takeList不能同时都>0,其中有一个得是==0;如果putList>0,获取共享锁,log.commitPut(transactionID)会调用Log.commit(long transactionID, short type)方法把commit操作封装成一个ByteBuffer buffer(最终封装成的格式顺序是:OP_RECORD、 buffer.limit()、buffer。buffer的内容是(RecordType(这是4)、TransactionID、 LogWriteOrderID、type))写入数据文件,并刷新至磁盘文件,此刷新也会将这次的put或者take中的所有事件写入磁盘文件;然后是将所有putList中的数据放入queue中queue.addTail(putList.removeFirst())当添加第一个时会从checkpointfile的最后一个位置开始先写入overwriteMap(逻辑位置转为物理位置)中,后续会再从0开始循环写入overwriteMap,take操作也会从最后一个位置取会先检查overwriteMap中有无对应的数据,没有就再检查checkpoint的内存映射elementsBuffer中有无(控制take位置的是head位置,控制put位置的是size),每次更新检查点时都会把overwriteMap写入内存映射elementsBuffer中并刷新至磁盘文件checkpoint;queue.completeTransaction(transactionID)会执行清除操作即如果inflightPuts和inflightTakes执行其一,如果inflightPuts包含transactionID则清空inflightPuts,否则清空inflightTakes;然后解锁。如果takes>0,则获取共享锁,log.commitTake(transactionID)会进行封装写入data文件,commit中的类型标记除了自己的表示还有要提交类型的标记这里是TAKE,上面是PUT;queue.completeTransaction(transactionID)和上面的一样;解除共享锁;queueRemaining.release(takes)释放。最后将putList和takeList清空。

  4、doRollback()方法该方法source和sink会通过transaction.rollback()方法调用。首先会获取takeList和putList的大小;然后获取共享锁;如果takes>0,

并且puts==0,将putList中的所有有数据queue.addHead(takeList.removeLast()),addHead操作和声明的addTail操作相似,只不过是要在调用add(int index, long value)

方法时index是0,会插入到第一个位置;清空putList、takeList;queue.completeTransaction(transactionID)上面已经讲过;log.rollback(transactionID)会调用Log.rollback(long transactionID, short type)方法把commit操作封装成一个ByteBuffer buffer(最 终封装成的格式顺序是:OP_RECORD、 buffer.limit()、buffer。buffer的内容是(RecordType(这是3)、TransactionID、 LogWriteOrderID))写入缓存中;释放共享锁;queueRemaining.release(puts)释放许可。(这一段的格式乱了,这编辑器,我屮艸芔茻!!无语了。。。不自动换行了。。手动换的行。)

  1中的put操作将写入log文件的指针添加进了缓存putList中;2中的take操作从缓存中的取出指针,然如takeList中,然后写入log文件,从log文件中获取数据还原为event;3中的commit操作无论是对put的还是对take的都会讲commit信息写入log文件,都会清理queue中的缓存(inflightPuts和inflightTakes),如果对put还要将putList中的所有数据添加进queue的队尾,实际上是overwriteMap中,如果是对take则要释放queueRemaining的takes个许可量,还要清空putList、takeList;4中的rollback操作会将takeList中所有数据放入queue的头部,再清空putList、takeList,再清空queue中的缓存(inflightPuts和inflightTakes),将rollback信息写入log,要释放queueRemaining的puts个许可量。

 

  ps:

  1、data/log-ID,这种类型的文件存放的是put、take、commit、rollback的操作记录及数据。

  2、checkpoint/checkpoint存放的是event在那个data文件logFileID,的什么位置offset等信息。

  2、checkpoint/inflightTakes存放的是事务take的缓存数据,每隔段时间就重建文件。内容:1、16字节是校验码;2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...

  3、checkpoint/inflightPuts存放的是事务对应的put缓存数据,每隔段时间就重建文件。内容:1、16字节是校验码;2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...

  4、checkpoint/checkpoint.meta主要存储的是logfileID及对应event的数量等信息。

  5、data/log-ID.meta,主要记录log-ID下一个写入位置以及logWriteOrderID等信息。

    6、每个data目录里data文件保持不超过2个。

  7、putList和takeList是缓存存储的是相应的FlumeEventPointer,但是inflightTakes和inflightPuts其实也是缓存存储的也是相应的信息,只不过比两者多存一些信息罢了,功能重合度很高,为什么会这样呢?我想是一个只能在内存,一个可以永久存储(当然是不断重建的),后者可以用来进行flume再启动的恢复。

  file channel太过复杂了,比配置的文件的加载复杂更多,涉及的知识非常多,还不能一下子就消耗了。。。。大体的已经了解了,剩下的都是细节!!后续会再慢慢咀嚼!!争取吃透file。