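Custom Flume Source
Hello everyone.
My company needs Flume to pull messages from an MQ and store them in DFS, so I wrote a custom Flume source. I am new to Flume myself, so please bear with any mistakes.
I went through the Flume-ng source code. Depending on the scenario, a custom source generally extends AbstractSource and implements EventDrivenSource and Configurable.
The MQSource code is as follows: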
import java.util.List;

import org.apache.activemq.ActiveMQConnection;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// MQContext, MQConfig, MQFactory, MQReceiver and HandleLineCallBack are my own
// helper classes and are not shown in this post.
public class MQSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MQSource.class);

    private long heartbeat;
    private MQReceiver receiver;
    private HandleLineCallBack handle;
    private Thread t;

    @Override
    public void configure(Context context) {
        // Read the custom settings, falling back to the ActiveMQ defaults.
        String mq_url = context.getString(MQContext.MQ_BROKER_URI, ActiveMQConnection.DEFAULT_BROKER_URL);
        String mq_userName = context.getString(MQContext.MQ_USERNAME, ActiveMQConnection.DEFAULT_USER);
        String mq_password = context.getString(MQContext.MQ_PASSWORD, ActiveMQConnection.DEFAULT_PASSWORD);
        String mq_queueKey = context.getString(MQContext.MQ_QUEUEKEY, "NULL");
        String handleClass = context.getString(MQContext.HANDLECLASS, "NULL");

        long mq_reciveTimeout = context.getLong(MQContext.MQ_RECIVETIMEOUT, 3000L);
        this.heartbeat = context.getLong(MQContext.HEARTBEAT, 3000L);

        if ("NULL".equals(mq_queueKey)) {
            logger.error("{} : Unable to load MQ_queueKey ", getName());
            return;
        }
        if ("NULL".equals(handleClass)) {
            logger.warn("{} : Unable to handleClass using DefaultHandle ", getName());
            handleClass = "com.bidlink.handle.DefaultHandle";
        }

        MQConfig mqconfig = new MQConfig(mq_url, mq_userName, mq_password, mq_queueKey, mq_reciveTimeout);
        logger.info("{} MQ Configuration : {} ", getName(), mqconfig.toString());
        receiver = MQFactory.MQ.getReceiver(mqconfig);
        logger.info("{} .get receiver key is {} . obj is : {} ", getName(), mqconfig.getQueueKey(), receiver);

        // Load the line handler by class name; it needs a public no-arg constructor.
        try {
            Class<?> handleClazz = Class.forName(handleClass);
            handle = (HandleLineCallBack) handleClazz.newInstance();
        } catch (ClassNotFoundException e) {
            logger.error("{} unable to load class {} . {} ", getName(), handleClass, e);
        } catch (InstantiationException e1) {
            logger.error("{} instance class error {} . {} ", getName(), handleClass, e1);
        } catch (IllegalAccessException e2) {
            logger.error("{} occur exception {} . {} ", getName(), handleClass, e2);
        }
    }

    @Override
    public synchronized void start() {
        logger.info("MQSource start.....");
        t = new Thread() {
            @Override
            public void run() {
                // Poll the queue until stop() interrupts this thread.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List<String> lines = receiver.getText();
                        for (String line : lines) {
                            // logger.info("Message line : {} ", line);
                            Event e = new SimpleEvent();
                            String refStr = handle.refactor(line);
                            e.setBody(refStr.getBytes("GBK"));
                            getChannelProcessor().processEvent(e);
                        }
                        Thread.sleep(heartbeat);
                    } catch (InterruptedException ie) {
                        // Restore the flag so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                    } catch (Exception e1) {
                        logger.error("MQSource failed to process MQ messages", e1);
                    }
                }
            }
        };
        t.start();
        // Call AbstractSource.start() once, from the source itself. Inside the
        // anonymous Thread, super.start() would resolve to Thread.start() and fail.
        super.start();
    }

    @Override
    public synchronized void stop() {
        logger.info("MQSource stopping...");
        if (t != null && t.isAlive()) {
            // Interrupt first: joining a thread that is still looping would block.
            t.interrupt();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }
}
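The source passes every raw MQ line through a HandleLineCallBack before wrapping it in an event, but that interface is not shown in this post. Here is a minimal sketch of what it might look like, assuming it has only the refactor(String) method used above and that the default handler returns the line unchanged. The interface body, the pass-through behavior and the TaggingHandle variant are all assumptions for illustration.

```java
// Hypothetical reconstruction: the post does not show these types.
interface HandleLineCallBack {
    // Turn one raw MQ message line into the text stored in the event body.
    String refactor(String line);
}

// Assumed default behavior: pass the line through unchanged.
class DefaultHandle implements HandleLineCallBack {
    @Override
    public String refactor(String line) {
        return line;
    }
}

// Illustrative custom handler: trims whitespace and prefixes a tag.
class TaggingHandle implements HandleLineCallBack {
    @Override
    public String refactor(String line) {
        return "[mq] " + line.trim();
    }
}
```

In a real plugin each handler would probably need to be a public class with a public no-arg constructor, because the source instantiates it reflectively from the handleClass setting.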
The key code in the start method:
Event e = new SimpleEvent();
e.setBody("hello everyone ".getBytes("GBK"));
getChannelProcessor().processEvent(e);
super.start();
The context passed to the configure method gives access to all of your custom settings. For example, put the following in flume.conf:
tier1.sources.testSources.type = org.yunjume.source.MQSource
tier1.sources.testSources.MQ_userName = admin
tier1.sources.testSources.MQ_password = admin123
tier1.sources.testSources.MQ_brokerURL = tcp://localhost:61616
tier1.sources.testSources.MQ_queueKey = FirstQueue
tier1.sources.testSources.MQ_reciveTimeout = 30000
tier1.sources.testSources.heartbeat = 30000
# to process mq message queue line and return new line .
tier1.sources.testSources.handleClass = org.yunjume.handle.DefaultHandle
tier1.sources.testSources.channels = testChannels
The code to read the MQ_userName value is:
String mq_userName = context.getString("MQ_userName", ActiveMQConnection.DEFAULT_USER);
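The lookup uses the bare key MQ_userName rather than the full tier1.sources.testSources.MQ_userName because Flume hands each component only the settings under its own prefix, with the prefix removed. The sketch below imitates that with java.util.Properties and a plain Map so it runs without Flume on the classpath. SourceConfigSketch, subProperties and the "guest" fallback are hypothetical names and values, and getOrDefault only stands in for context.getString(key, default).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that imitates prefix stripping; not Flume's own code.
class SourceConfigSketch {
    // A few lines taken from the flume.conf excerpt above.
    static final String CONF =
        "tier1.sources.testSources.MQ_userName = admin\n" +
        "tier1.sources.testSources.MQ_queueKey = FirstQueue\n" +
        "tier1.sources.testSources.heartbeat = 30000\n";

    // Collect the keys under one component prefix, with the prefix stripped.
    static Map<String, String> subProperties(Properties all, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), all.getProperty(name));
            }
        }
        return result;
    }

    // Parse CONF and return the settings that the source itself would see.
    static Map<String, String> load() {
        Properties all = new Properties();
        try {
            all.load(new StringReader(CONF));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return subProperties(all, "tier1.sources.testSources.");
    }

    public static void main(String[] args) {
        Map<String, String> ctx = load();
        System.out.println(ctx.getOrDefault("MQ_userName", "guest")); // prints "admin"
        System.out.println(ctx.getOrDefault("MQ_password", "guest")); // prints "guest" (key not set)
    }
}
```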
The stop method simply shuts the source down.
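For a source that polls in a background thread, the order of operations in stop matters. Calling join() on a thread that loops forever never returns, so the worker has to be interrupted first and joined afterwards. Below is a minimal sketch of that pattern with no Flume dependencies. PollingWorkerSketch and its members are hypothetical names, and the counter is only a placeholder for the real receive-and-forward work.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a polling source; not part of Flume.
class PollingWorkerSketch {
    private final long heartbeatMillis;
    private final AtomicInteger polls = new AtomicInteger();
    private Thread worker;

    PollingWorkerSketch(long heartbeatMillis) {
        this.heartbeatMillis = heartbeatMillis;
    }

    synchronized void start() {
        worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                polls.incrementAndGet(); // placeholder for "receive lines, emit events"
                try {
                    Thread.sleep(heartbeatMillis);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees it and exits.
                    Thread.currentThread().interrupt();
                }
            }
        }, "mq-poller");
        worker.start();
    }

    synchronized void stop() {
        if (worker == null) {
            return;
        }
        worker.interrupt(); // wake the worker out of its sleep first
        try {
            worker.join();  // then wait for the loop to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return worker != null && worker.isAlive();
    }

    int pollCount() {
        return polls.get();
    }

    public static void main(String[] args) {
        PollingWorkerSketch w = new PollingWorkerSketch(60_000L);
        w.start();
        w.stop(); // returns promptly even though the heartbeat is one minute
        System.out.println("running after stop: " + w.isRunning());
    }
}
```

If the receive call itself blocks and ignores interrupts, stop would still wait for that call to time out, so a short receive timeout probably helps shutdown latency.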
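Package the source as a jar and put it under the plugin directory in the Flume home directory. Mine is /usr/lib/flume-ng/plugins.d
If the jar is named MQSource.jar, create a folder named MQSource under plugins.d and put MQSource.jar in MQSource/lib.
The jars it depends on go in MQSource/libext. The directory structure looks like this: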
/usr/lib/flume-ng/plugins.d/MQSource/lib/MQSource.jar
/usr/lib/flume-ng/plugins.d/MQSource/libext/ dependency jars
/usr/lib/flume-ng/plugins.d/MQSource/native native .so or .dll files
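If you would rather script the layout than create the folders by hand, here is a small sketch that builds the three subdirectories under a given plugins.d path. PluginLayoutSketch, createLayout and demo are hypothetical names. The demo uses a temporary directory instead of /usr/lib/flume-ng so it can run anywhere, and copying the jars is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for preparing a plugins.d entry; not part of Flume.
class PluginLayoutSketch {
    // Create <pluginsDir>/<pluginName>/{lib,libext,native} and return the plugin root.
    static Path createLayout(Path pluginsDir, String pluginName) {
        Path root = pluginsDir.resolve(pluginName);
        try {
            for (String sub : new String[] {"lib", "libext", "native"}) {
                Files.createDirectories(root.resolve(sub));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return root;
    }

    // Build the layout inside a fresh temporary directory, for demonstration only.
    static Path demo() {
        try {
            Path pluginsDir = Files.createTempDirectory("plugins.d");
            return createLayout(pluginsDir, "MQSource");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The printed path varies by machine; it ends with "MQSource".
        System.out.println(demo());
    }
}
```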