首页 > 代码库 > flume自定义sink
flume自定义sink
package me;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
 * Minimal custom Flume sink that prints the body of each event it takes from
 * its channel to standard output.
 */
public class MySink extends AbstractSink implements Configurable {

    /** Runs once when the sink is stopped; delegates to the superclass. */
    @Override
    public synchronized void stop() {
        super.stop();
    }

    /** Runs once when the sink is started; delegates to the superclass. */
    @Override
    public synchronized void start() {
        super.start();
    }

    /**
     * Takes at most one event from the channel inside a transaction and prints
     * its body. This method is called repeatedly.
     *
     * @return {@code Status.READY} when an event was consumed, or
     *         {@code Status.BACKOFF} when the channel had no event available
     * @throws EventDeliveryException if taking or handling the event fails
     *         with anything other than an {@link Error}
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            // take() is inside the try block so that a failure while reading
            // the channel still rolls back and closes the transaction.
            Event event = channel.take();
            if (event == null) {
                // Nothing to deliver: finish the empty transaction and report
                // BACKOFF instead of spinning on take() with the transaction
                // held open.
                txn.commit();
                return Status.BACKOFF;
            }

            // Decode with an explicit charset so the output does not depend on
            // the platform default.
            // NOTE(review): assumes event bodies are UTF-8 text - confirm
            // against the configured source.
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            System.out.println("event.getBody()-----" + body);

            txn.commit();
            return Status.READY;
        } catch (Throwable th) {
            txn.rollback();
            // Errors are rethrown untouched; everything else is wrapped,
            // keeping the original failure as the cause.
            if (th instanceof Error) {
                throw (Error) th;
            }
            throw new EventDeliveryException(th);
        } finally {
            txn.close();
        }
    }

    /**
     * Prints the supplied configuration context. No values are read from it.
     *
     * @param arg0 the context passed to this sink
     */
    @Override
    public void configure(Context arg0) {
        System.out.println("configure-------" + arg0);
    }
}
# Flume agent "agent": one netcat source, one memory channel, one custom sink.
agent.sources = s1
agent.channels = c1
agent.sinks = sk1

# Source: netcat listener on localhost:5678, writing into channel c1.
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 5678
agent.sources.s1.channels = c1

# Sink: the custom class me.MySink, reading from channel c1.
agent.sinks.sk1.type = me.MySink
# NOTE(review): MySink.configure only prints the context it receives; it does
# not read any of the database-style keys below.
# NOTE(review): a plaintext password is stored here - avoid committing real
# credentials to a shared configuration file.
agent.sinks.sk1.hostname=192.168.16.33
agent.sinks.sk1.port=3306
agent.sinks.sk1.databaseName=test
agent.sinks.sk1.tableName=user
agent.sinks.sk1.user=root
agent.sinks.sk1.password=WoChu@123
agent.sinks.sk1.column_name=id, username, password
agent.sinks.sk1.field_separator=\\|
agent.sinks.sk1.channel = c1

# Channel: in-memory queue.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
lihudeMacBook-Pro:~ SunAndLi$ telnet localhost 5678
flume自定义sink
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。