Flume Custom Source

 Hello everyone.

 

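    My company needs Flume to pull messages from an MQ and store them in DFS, so I wrote a custom Flume source. I am new to Flume myself, so please bear with any mistakes.

    I read through the Flume-ng source code. Depending on the scenario, a custom source is generally declared as extends AbstractSource implements EventDrivenSource, Configurable.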

 

   The MQSource code is as follows:

  

import java.util.List;

import org.apache.activemq.ActiveMQConnection;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// MQContext, MQConfig, MQFactory, MQReceiver and HandleLineCallBack are
// project classes that are not shown in this post.
public class MQSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MQSource.class);

    private long heartbeat;
    private MQReceiver receiver;
    private HandleLineCallBack handle;
    private Thread t;

    @Override
    public void configure(Context context) {
        // Each value falls back to a default when the key is missing from flume.conf.
        String mq_url = context.getString(MQContext.MQ_BROKER_URI, ActiveMQConnection.DEFAULT_BROKER_URL);
        String mq_userName = context.getString(MQContext.MQ_USERNAME, ActiveMQConnection.DEFAULT_USER);
        String mq_password = context.getString(MQContext.MQ_PASSWORD, ActiveMQConnection.DEFAULT_PASSWORD);
        String mq_queueKey = context.getString(MQContext.MQ_QUEUEKEY, "NULL");
        String handleClass = context.getString(MQContext.HANDLECLASS, "NULL");
        long mq_reciveTimeout = context.getLong(MQContext.MQ_RECIVETIMEOUT, 3000L);
        this.heartbeat = context.getLong(MQContext.HEARTBEAT, 3000L);

        if ("NULL".equals(mq_queueKey)) {
            // Without a queue key there is nothing to read from; start() will refuse to run.
            logger.error("{} : unable to load MQ_queueKey", getName());
            return;
        }
        if ("NULL".equals(handleClass)) {
            logger.warn("{} : no handleClass configured, using DefaultHandle", getName());
            handleClass = "com.bidlink.handle.DefaultHandle";
        }

        MQConfig mqconfig = new MQConfig(mq_url, mq_userName, mq_password, mq_queueKey, mq_reciveTimeout);
        logger.info("{} MQ configuration : {}", getName(), mqconfig);
        receiver = MQFactory.MQ.getReceiver(mqconfig);
        logger.info("{} receiver for key {} is {}", getName(), mqconfig.getQueueKey(), receiver);

        try {
            // Instantiate the line handler by its class name.
            handle = (HandleLineCallBack) Class.forName(handleClass).newInstance();
        } catch (ClassNotFoundException e) {
            logger.error("{} unable to load class {}", getName(), handleClass, e);
        } catch (InstantiationException e1) {
            logger.error("{} unable to instantiate class {}", getName(), handleClass, e1);
        } catch (IllegalAccessException e2) {
            logger.error("{} unable to access class {}", getName(), handleClass, e2);
        }
    }

    @Override
    public synchronized void start() {
        logger.info("MQSource starting...");
        if (receiver == null || handle == null) {
            logger.error("{} is not configured correctly, not starting", getName());
            return;
        }
        t = new Thread() {
            @Override
            public void run() {
                // Poll the queue until stop() interrupts this thread.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List<String> lines = receiver.getText();
                        for (String line : lines) {
                            Event e = new SimpleEvent();
                            String refStr = handle.refactor(line);
                            e.setBody(refStr.getBytes("GBK"));
                            getChannelProcessor().processEvent(e);
                        }
                        Thread.sleep(heartbeat);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                    } catch (Exception e1) {
                        logger.error("{} failed to process MQ messages", getName(), e1);
                    }
                }
            }
        };
        t.start();
        // Must be called here, on the source: inside the anonymous Thread,
        // super.start() would refer to Thread.start() instead.
        super.start();
    }

    @Override
    public synchronized void stop() {
        logger.info("MQSource stopping...");
        if (t != null && t.isAlive()) {
            // Interrupt first, then join; joining first would wait forever on the polling loop.
            t.interrupt();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }
}
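The configure method picks its line handler by class name and falls back to a default when none is configured. Below is a minimal, Flume-free sketch of that idea. LineHandler, PassThroughHandler and HandlerLoader are hypothetical stand-ins for HandleLineCallBack and DefaultHandle, whose real code is not shown in this post, and the sketch also falls back when the class cannot be loaded, which is an assumption rather than what MQSource does.

```java
class HandlerLoader {
    // Loads a handler by class name. Falls back to PassThroughHandler when the
    // name is null, the class is missing, or it is not a LineHandler.
    static LineHandler load(String className) {
        if (className != null) {
            try {
                Class<? extends LineHandler> clazz =
                        Class.forName(className).asSubclass(LineHandler.class);
                return clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException | ClassCastException e) {
                System.err.println("Cannot load handler " + className + ": " + e);
            }
        }
        return new PassThroughHandler();
    }

    public static void main(String[] args) {
        // An unknown class name yields the default handler.
        LineHandler handler = load("no.such.Handler");
        System.out.println(handler.refactor("hello everyone"));
    }
}

// Hypothetical stand-in for the post's HandleLineCallBack interface.
interface LineHandler {
    String refactor(String line);
}

// Hypothetical default handler: returns each line unchanged.
class PassThroughHandler implements LineHandler {
    @Override
    public String refactor(String line) {
        return line;
    }
}
```

Using asSubclass checks the type before instantiation, so a misconfigured class name is reported without running a constructor of an unrelated class.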

  The key code in the start method:

 

  Event e = new SimpleEvent();
  e.setBody("hello everyone ".getBytes("GBK"));
  getChannelProcessor().processEvent(e);
  super.start();

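The context passed to the configure method gives access to all kinds of custom configuration values. For example, with the following settings in flume.conf: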

tier1.sources.testSources.type = org.yunjume.source.MQSource
tier1.sources.testSources.MQ_userName = admin
tier1.sources.testSources.MQ_password = admin123
tier1.sources.testSources.MQ_brokerURL = tcp://localhost:61616
tier1.sources.testSources.MQ_queueKey = FirstQueue
tier1.sources.testSources.MQ_reciveTimeout = 30000
tier1.sources.testSources.heartbeat = 30000
# to process mq message queue line and return new line.
tier1.sources.testSources.handleClass = org.yunjume.handle.DefaultHandle
tier1.sources.testSources.channels = testChannels

 



The code that reads the MQ_userName value is:
String mq_userName = context.getString("MQ_userName", ActiveMQConnection.DEFAULT_USER);
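To illustrate how a key such as tier1.sources.testSources.MQ_userName ends up being read as plain MQ_userName, here is a rough sketch using only java.util.Properties. It imitates the lookup with a plain Map rather than Flume's Context. SourceConfigDemo, parse and the "guest" default are made-up names for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class SourceConfigDemo {
    // Parses properties text and keeps only the keys under the given prefix,
    // with the prefix stripped (an imitation of what a source sees in configure).
    static Map<String, String> parse(String conf, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String conf =
                "tier1.sources.testSources.MQ_userName= admin\n"
              + "tier1.sources.testSources.heartbeat=30000\n";
        Map<String, String> ctx = parse(conf, "tier1.sources.testSources.");

        // Present key: the configured value is used.
        String userName = ctx.getOrDefault("MQ_userName", "guest");
        // Missing key: the default is used, like the "NULL" marker in MQSource.
        String queueKey = ctx.getOrDefault("MQ_queueKey", "NULL");
        long heartbeat = Long.parseLong(ctx.getOrDefault("heartbeat", "3000"));

        System.out.println(userName + " " + queueKey + " " + heartbeat);
    }
}
```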

 The stop method simply shuts the source down.
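For stop to return, the polling thread has to be told to exit before anything waits on it. The sketch below shows one common pattern, interrupt first and then join, in a Flume-free form. PollingWorker and its methods are hypothetical names, and the counter stands in for fetching and forwarding messages.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PollingWorker {
    private final long heartbeatMillis;
    private final AtomicInteger polls = new AtomicInteger();
    private Thread thread;

    PollingWorker(long heartbeatMillis) {
        this.heartbeatMillis = heartbeatMillis;
    }

    synchronized void start() {
        thread = new Thread(() -> {
            // Loop until the thread is interrupted by stop().
            while (!Thread.currentThread().isInterrupted()) {
                polls.incrementAndGet(); // placeholder for one polling round
                try {
                    Thread.sleep(heartbeatMillis);
                } catch (InterruptedException e) {
                    // sleep() clears the flag when it throws; set it again so the loop ends.
                    Thread.currentThread().interrupt();
                }
            }
        }, "polling-worker");
        thread.start();
    }

    synchronized void stop() {
        if (thread == null) {
            return;
        }
        thread.interrupt(); // wake the worker, even in the middle of sleep()
        try {
            thread.join();  // then wait for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return thread != null && thread.isAlive();
    }

    public static void main(String[] args) {
        PollingWorker worker = new PollingWorker(30_000L);
        worker.start();
        worker.stop(); // returns promptly despite the long heartbeat
        System.out.println("running after stop: " + worker.isRunning());
    }
}
```

Calling join before interrupt would block indefinitely here, because nothing else ever ends the loop.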

 

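 Package the code as a jar and put it under the plugin directory of the Flume home directory. Mine is /usr/lib/flume-ng/plugins.d

 If the jar is named MQSource.jar, create a folder named MQSource under plugins.d and put MQSource.jar into MQSource/lib.

The jars it depends on go into MQSource/libext. The directory structure is as follows:

 /usr/lib/flume-ng/plugins.d/MQSource/lib/MQSource.jar

 /usr/lib/flume-ng/plugins.d/MQSource/libext/   dependency jars

/usr/lib/flume-ng/plugins.d/MQSource/native   native .so or .dll files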
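
The layout above can be created by hand. As a small sketch, the same three folders can also be created programmatically. PluginLayout and create are hypothetical names, and the example works in a temporary directory rather than the real /usr/lib/flume-ng/plugins.d.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class PluginLayout {
    // Creates <pluginsDir>/<pluginName>/{lib,libext,native} and returns the plugin root.
    static Path create(Path pluginsDir, String pluginName) {
        Path root = pluginsDir.resolve(pluginName);
        try {
            for (String sub : new String[] {"lib", "libext", "native"}) {
                Files.createDirectories(root.resolve(sub));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return root;
    }

    public static void main(String[] args) throws IOException {
        // A temporary directory stands in for plugins.d in this sketch.
        Path pluginsDir = Files.createTempDirectory("plugins.d");
        Path root = create(pluginsDir, "MQSource");
        // The plugin jar itself would be copied to this location.
        System.out.println(root.resolve("lib").resolve("MQSource.jar"));
    }
}
```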
