首页 > 代码库 > flume拦截器
flume拦截器
拦截器作用:拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。
flume修改时间戳的插件见 https://github.com/haebin/flume-timestamp-interceptor
有一个缺陷是,DateUtils.parseDate(timestamp, dateFormat)里面的dateFormat不支持unix时间戳,只能自己手动添加了
原来是:
- String timestamp = get(index, data);
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- headers.put(TIMESTAMP, Long.toString(now));
修改后
- String timestamp = get(index, data);
- if (dateFormat[0].equals("tsecond")){
- now = Long.parseLong(timestamp)*1000;
- }
- else if(dateFormat[0].equals("tmillisecond")){
- now = Long.parseLong(timestamp);
- }
- else if(dateFormat[0].equals("tnanosecond")){
- now = Long.parseLong(timestamp)/1000000;
- }
- else {
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- }
- headers.put(TIMESTAMP, Long.toString(now));
flume配置:
- kafka_sn_hive.sources.s1.interceptors = timestamp
- kafka_sn_hive.sources.s1.interceptors.timestamp.type = org.apache.flume.interceptor.EventTimestampInterceptor$Builder
- kafka_sn_hive.sources.s1.interceptors.timestamp.preserveExisting = false
- kafka_sn_hive.sources.s1.interceptors.timestamp.delimiter = ,
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateIndex = 4
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateFormat = tsecond
表示按逗号做分隔符的第四个(从0开始)字段是一个秒单位的时间戳。
在flume里面,时间戳是毫秒级别的,所以要判断这个字段是秒还是毫秒纳秒
见http://lisux.me/lishuai/?p=867
flume拦截器
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。