首页 > 代码库 > flume-ng-taildirectory-source 修改调试可用

flume-ng-taildirectory-source 修改调试可用

由于flume-ng至1.5版本仍旧没有稳定可用的类似flume-og中的taildir的功能,所以从git中https://github.com/jinoos/flume-ng-extends找了一个别人针对flume-ng实现的的taildir这个按照github上他自己说明,是没法正常使用的。查看了源码后,做了一些相应修改

1. 默认的DirectoryTailParserModulable类修改

他实现了2种DirectoryTailParserModulable

  第一种是SingleLineParserModule,适用日志里只有单条记录的。并且代码中默认就是使用的这个,显然很不靠谱。提供了配置项,但是说明里没有写出来,配置项为 ‘parser’.

   第二种是MultiLineParserModule,适用多行的日志文件的。这里我们大部分情况肯定是要用这个的。

DirectoryTailSource类中如下行

  private static final String DEFAULT_PARSER_MODULE_CLASS = "com.jinoos.flume.SingleLineParserModule";

修改为。包名根据实际情况来更改

  private static final String DEFAULT_PARSER_MODULE_CLASS = "org.apache.flume.source.taildirectory.MultiLineParserModule";

2. first-line-pattern配置

这是MultiLineParserModule中的一个属性,用来验证读进来的行是否为第一行。这个说明中也没提到

如果没有配置这个配置,那么就无法正常执行,会报“wrong log format”。

主要代码如下:

 1         private void readMessage(FileSet fileSet) { 2             try { 3                 String buffer; 4  5                 synchronized (fileSet) { 6  7                     while ((buffer = fileSet.readLine()) != null) { 8                         if (buffer.length() == 0) { 9                             continue;10                         }11 12                         boolean isFirstLine = parserModule.isFirstLine(buffer);13                         if (isFirstLine) {14                             sendEvent(fileSet);15                             fileSet.appendLine(buffer);16                             parserModule.parse(buffer, fileSet);17 18                         } else {19                             if (fileSet.getLineSize() == 0) {20                                 logger.debug("Wrong log format, " + buffer);21                                 continue;22                             } else {23                                 fileSet.appendLine(buffer);24                                 parserModule.parse(buffer, fileSet);25                             }26                         }27 28                         if (parserModule.isLastLine(buffer)) {29                             sendEvent(fileSet);30                         }31                     }32                 }33             } catch (IOException e) {34                 logger.warn(e.getMessage(), e);35             }36         }

根据我们的实际需求,我们不需要判断是否第一行,只要有change事件,全部写入到channel中即可

修改为如下方式

 1         // ??? ?? Event? ????. 2         private void readMessage(FileSet fileSet) { 3             try { 4                 String buffer; 5  6                 synchronized (fileSet) { 7  8                     while ((buffer = fileSet.readLine()) != null) { 9                         if (buffer.length() == 0) {10                             continue;11                         }12 13                         fileSet.appendLine(buffer);14                         sendEvent(fileSet);15                     }16                 }17             } catch (IOException e) {18                 logger.warn(e.getMessage(), e);19             }20         }

改为这种方式后,只要来一行就会send到channel中。如果需要批量的,可以按自己要求更改。

现在就不再需要关注first-line-pattern这个配置了。

注意:但是配置在配置文件中还是配的,虽然它没有起到任何作用。如果想不配置,请修改MultiLineParserModule的configure(Context context)方法

 

3.监控文件中有中文,编码的配置添加

 目前这个版本是无法支持中文的文件的。

正式读取数据的方法:位置FileSet类中

  public String readLine() throws IOException {    return rReader.readLine();  }

这个rReader是个RandomAccessFile对象

  public FileSet(AbstractSource source, FileObject fileObject)      throws IOException {    this.source = source;    this.fileObject = fileObject;    this.bufferList = new ArrayList<String>();    //File f = new File(fileObject.getName().getPath());    File f = new File("d:/tmp/log_compare/test1.txt");    rReader = new RandomAccessFile(f, "r");    rReader.seek(f.length());    bufferList = new ArrayList<String>();    headers = new HashMap<String, String>();    logger.debug("FileSet has been created " + fileObject.getName().getPath());    this.seq = 0L;  }

在FileSet类实例化时创建。

下面开始修改操作,源代码中是直接使用了RandomAccessFile的readline()方法,修改为按byte读取的方式

  /**   *    * @Title: readLine   * @Description: TODO(读取文件中的一行)   * @param @throws IOException    设定文件   * @return String    返回类型   * @throws   */  public String readLine() throws IOException {    if(rReader.getFilePointer() < rReader.length()) {        byte b = rReader.readByte();//读取一个byte        int i = 0;        byte[] buf = new byte[10240];//创建大小为1M的数据,如果你的单行超过1M,那么会出错        //如果读到换行符,或者读到文件最后就停止。表示已经读完一行        while(b != ‘\n‘ && rReader.getFilePointer() < rReader.length()) {            buf[i++] = b;            b = rReader.readByte();                    }        return new String(buf,0,i);    }else{        return "";    }  }

改完后重新打包再次测试,发现已经可以支持中文了。

4.每次新文件刚被创建时会丢失第一条数据

代码如下

        public void run() {            while (true) {                try {                    // DirectoryTailEvent event = eventQueue.poll(                    // eventQueueWorkerTimeoutMiliSecond,                    // TimeUnit.MILLISECONDS);                    DirectoryTailEvent event = eventQueue.take();                    if (event == null) {                        continue;                    }                    if (event.type == FileEventType.FILE_CHANGED) {                        fileChanged(event.event);                    } else if (event.type == FileEventType.FILE_CREATED) {                        fileCreated(event.event);                    } else if (event.type == FileEventType.FILE_DELETED) {                        fileDeleted(event.event);                    } else if (event.type == FileEventType.FLUSH) {                        if (event.fileSet != null)                            sendEvent(event.fileSet);                    }                } catch (InterruptedException e) {                    logger.debug(e.getMessage(), e);                } catch (FileSystemException e) {                    logger.info(e.getMessage(), e);                }            }        }

上面这段代码为监测的文件夹有新的事件时的处理。这里我们要看的是FILE_CREATE事件,他调用了fileCreated(event.event);

 1     private void fileCreated(FileChangeEvent event) 2                 throws FileSystemException { 3             String path = event.getFile().getName().getPath(); 4             String dirPath = event.getFile().getParent().getName().getPath(); 5  6             logger.debug(path + " has been created."); 7  8             DirPattern dirPattern = null; 9             dirPattern = pathMap.get(dirPath);10 11             if (dirPattern == null) {12                 logger.warn("Occurred create event from un-indexed directory. "13                         + dirPath);14                 return;15             }16 17             // ???? ???? ????.18             if (!isInFilePattern(event.getFile(), dirPattern.getFilePattern())) {19                 logger.debug(path + " is not in file pattern.");20                 return;21             }22 23             FileSet fileSet;24 25             fileSet = fileSetMap.get(event.getFile().getName().getPath());26             //fileSet = fileSetMap.get(path);27             if (fileSet == null) {28                 try {29                     logger.info(path30                             + " is not in monitoring list. It‘s going to be listed.");31                     32                     fileSet = new FileSet(source, event.getFile());33                     // a little synchronized bug here.fixed by tqli,2014-08-0734                     // ,E-mail:tiangang1126@126.com35                     synchronized (fileSetMap) {36                         fileSetMap.put(path, fileSet);37                     }38                 } catch (IOException e) {39                     logger.error(e.getMessage(), e);40                     return;41                 }42             }43         }

看第27行,当新的文件进来,需要创建一个fileSet对象。将这个fileSet对象存入fileSetMap中

看fileSet实例化的方法,上面已经贴过了

 1   public FileSet(AbstractSource source, FileObject fileObject) 2       throws IOException { 3     this.source = source; 4     this.fileObject = fileObject; 5  6     this.bufferList = new ArrayList<String>(); 7  8     File f = new File(fileObject.getName().getPath()); 9     //File f = new File("d:/tmp/log_compare/test1.txt");10     rReader = new RandomAccessFile(f, "r");11     rReader.seek(f.length());12     bufferList = new ArrayList<String>();13     headers = new HashMap<String, String>();14     logger.debug("FileSet has been created " + fileObject.getName().getPath());15     logger.debug("file length now is : " + f.length());16     this.seq = 0L;17   }

注意看第11行,将游标移到到f.length的位置,这样的问题就是跟着文件新建时写入的内容,全部被忽略了。这样就造成了数据丢失

那怎么解决这个问题呢,简单的改为

rReader.seek(0);
肯定是不行的,具体的原因,大家自己思考下吧。

我们目的的就是在有监控新的事件时,创建的fileSet,游标位置能在文件原来的的位置。
需求明确了,下面就知道该做哪些事了。
1 首先在DirectoryTailSource中start方法执行时,将配置监控文件下符合正则条件文件的length都保存在一个Map里
2 在监听到新事件新建fileSet时,判断这个文件是新建的还是之前就存在的,如果是之前就存在的,那么就可以直接取之前记下的这个文件的大小。如果不存在,说明这个文件是个新文件,则从0位置开始读
注意:这个不支持文件更改的情况,只能适应只对文件做增加的场景
下面是代码修改的部分

DirectoryTailSource类
添加 fileInitLengthMap 属性
1     private Map<String, DirPattern> dirMap;2     private Map<String, DirPattern> pathMap;3     private Map<String,Long> fileInitLengthMap;//文件初始大小记录,用来定位新建fileSet时的游标初始位置
在configure方法中实例化fileInitLengthMap
    public void configure(Context context) {        logger.info("Source Configuring..");        dirMap = new HashMap<String, DirPattern>();        pathMap = new HashMap<String, DirPattern>();        fileInitLengthMap = new HashMap<String,Long>();

在start方法中初始化fileInitLengthMap。保存全部符合正则条件的文件大小。红色部分为添加的代码

 1     public void start() { 2         logger.info("Source Starting.."); 3  4         if (sourceCounter == null) { 5             sourceCounter = new SourceCounter(getName()); 6         } 7  8         fileSetMap = new Hashtable<String, FileSet>(); 9 10         try {11             fsManager = VFS.getManager();12         } catch (FileSystemException e) {13             logger.error(e.getMessage(), e);14             return;15         }16 17         monitorRunnable = new MonitorRunnable();18 19         fileMonitor = new DefaultFileMonitor(monitorRunnable);20         fileMonitor.setRecursive(false);21 22         FileObject fileObject;23 24         logger.debug("Dirlist count " + dirMap.size());25         for (Entry<String, DirPattern> entry : dirMap.entrySet()) {26             logger.debug("Scan dir " + entry.getKey());27 28             DirPattern dirPattern = entry.getValue();29 30             try {31                 fileObject = fsManager.resolveFile(dirPattern.getPath());32             } catch (FileSystemException e) {33                 logger.error(e.getMessage(), e);34                 continue;35             }36 37             try {38                 if (!fileObject.isReadable()) {39                     logger.warn("No have readable permission, "40                             + fileObject.getURL());41                     continue;42                 }43 44                 if (FileType.FOLDER != fileObject.getType()) {45                     logger.warn("Not a directory, " + fileObject.getURL());46                     continue;47                 }48 49                 // ??? Monitoring ??? ????.50                 fileMonitor.addFile(fileObject);51                 logger.debug(fileObject.getName().getPath()52                         + " directory has been add in monitoring list");53                 pathMap.put(fileObject.getName().getPath(), entry.getValue());54                 //pathMap.put("d:/tmp/log_compare", entry.getValue());55                 //新增部分,文件初始化大小保存56                 FileObject[] allChiledfile = fileObject.getChildren();57                 for(FileObject chiledFileobject : allChiledfile) {58                     if(dirPattern.getFilePattern().matcher(chiledFileobject.getName().getBaseName()).find()) {59                         String chiledFildPath = chiledFileobject.getName().getPath();60                         //String chiledFildPath = "d:/tmp/log_compare/test1.txt";61                         File chiledfile = new File(chiledFildPath);62                         fileInitLengthMap.put(chiledFildPath, 63                                 chiledfile.length());64                         logger.debug(chiledFildPath + " init length is :" + chiledfile.length());65                     }66                 }67             } catch (FileSystemException e) {68                 logger.warn(e.getMessage(), e);69                 continue;70             } catch (Exception e) {71                 logger.debug(e.getMessage(), e);72             }73 74         }75 76         executorService = Executors77                 .newFixedThreadPool(eventQueueWorkerSize + 1);78         monitorFuture = executorService.submit(monitorRunnable);79 80         for (int i = 0; i < eventQueueWorkerSize; i++) {81             workerFuture[i] = executorService.submit(new WorkerRunnable(this));82         }83 84         sourceCounter.start();85         super.start();86     }

FileSet类

 

 1   public FileSet(AbstractSource source, FileObject fileObject,Map<String,Long> fileInitLengthMap) 2       throws IOException { 3     this.source = source; 4     this.fileObject = fileObject; 5  6     this.bufferList = new ArrayList<String>(); 7  8     File f = new File(fileObject.getName().getPath()); 9     rReader = new RandomAccessFile(f, "r");10     /*11      *判断在初始化taildirSource时,这个文件是否存在,如果存在则游标定位当时记录下的文件长度开始12      *如果不存在,则说明这是一个新建的文件,游标从0开始13      */14     if(fileInitLengthMap.containsKey(fileObject.getName().getPath())) {15         rReader.seek(fileInitLengthMap.get(fileObject.getName().getPath()));16     }else{17         rReader.seek(0);18     }19     20     bufferList = new ArrayList<String>();21     headers = new HashMap<String, String>();22     logger.debug("FileSet has been created " + fileObject.getName().getPath());23     logger.debug("file length now is : " + f.length());24     this.seq = 0L;25   }

 

修改类实例化的方法。并修改DirectoryTailSource类中调用FileSet实例化方法的地方。

至此修改全部全部完成。

 

没找到能上传附件的地方,改完的jar包就不提供了。

此为一个使用这个jar的例子

a.sources = sourcesa.sinks = sinksa.channels = c#configure sourcesa.sources.sources.type = org.apache.flume.source.taildirectory.DirectoryTailSourcea.sources.sources.dirs = s0#a.sources.sources.dirs.s0.path = /usr/local/nginx/logs/a.sources.sources.dirs.s0.path = /home/flume/testTailDira.sources.sources.dirs.s0.file-pattern = ^access_.*log$a.sources.sources.first-line-pattern = ^(.*)$#congfigure sinksa.sinks.sinks.type = file_rolla.sinks.sinks.sink.directory = /home/flume/testTailDir2a.sinks.sinks.sink.rollInterval = 30a.sinks.sinks.channel = c#configure channalsa.channels.c.type = memory#bind channela.sources.sources.channels = c

 

flume-ng-taildirectory-source 修改调试可用