[Flume] Writing a custom Kafka sink, packaging it as a jar, and fixing the "unapproved license" build error
As shown in the figure, create a new Java project and edit its pom file. The pom file is as follows (the parent has been removed here):
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.flume.flume-ng-sinks</groupId>
  <artifactId>flume-ng-kafka-sink</artifactId>
  <name>Flume Kafka Sink</name>
  <version>1.0.0</version>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
    </dependency>
  </dependencies>
</project>
```

Both the parent and the RAT (Apache Release Audit Tool) plugin have been removed here, which avoids the common "unapproved license" error at build time; see https://issues.apache.org/jira/browse/FLUME-1372
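A custom sink extends AbstractSink and implements the Configurable interface, overriding process() and configure(Context) (plus start() and stop() where needed), as follows: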
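```java
package com.cmcc.chiwei.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

public class CmccKafkaSink extends AbstractSink implements Configurable {

    private static final Logger log = LoggerFactory.getLogger(CmccKafkaSink.class);

    public static final String KEY_HDR = "key";
    public static final String TOPIC_HDR = "topic";
    private static final String CHARSET = "UTF-8";

    private Properties kafkaProps;
    private Producer<String, byte[]> producer;
    private String topic;
    private int batchSize; // number of events per transaction, committed as a whole
    private List<KeyedMessage<String, byte[]>> messageList;

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        Event event = null;
        String eventTopic = null;
        String eventKey = null;
        try {
            long processedEvent = 0;
            transaction = channel.getTransaction();
            transaction.begin(); // start the transaction
            messageList.clear();
            for (; processedEvent < batchSize; processedEvent++) {
                event = channel.take(); // take one event from the channel
                if (event == null) {
                    break; // channel is empty for now
                }
                // an Event consists of headers and a body
                Map<String, String> headers = event.getHeaders();
                byte[] eventBody = event.getBody();
                // no topic in the event header: fall back to the configured topic
                if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
                    eventTopic = topic;
                }
                eventKey = headers.get(KEY_HDR);
                if (log.isDebugEnabled()) {
                    log.debug("{Event}" + eventTopic + ":" + eventKey + ":"
                            + new String(eventBody, CHARSET));
                    log.debug("event #{}", processedEvent);
                }
                KeyedMessage<String, byte[]> data =
                        new KeyedMessage<String, byte[]>(eventTopic, eventKey, eventBody);
                messageList.add(data);
            }
            // send the whole batch in one call, then commit the transaction
            if (processedEvent > 0) {
                producer.send(messageList);
            }
            transaction.commit();
        } catch (Exception ex) {
            log.error("Failed to publish events", ex);
            if (transaction != null) {
                try {
                    transaction.rollback(); // taken events go back to the channel
                } catch (Exception e) {
                    log.error("Transaction rollback failed", e);
                    throw Throwables.propagate(e);
                }
            }
            throw new EventDeliveryException("Failed to publish events", ex);
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
        return result;
    }

    // Assumed configuration keys: "topic" and "batchSize" (and their defaults) are
    // illustrative; every sink property is also passed on to the Kafka producer.
    @Override
    public void configure(Context context) {
        topic = context.getString("topic", "default-flume-topic");
        batchSize = context.getInteger("batchSize", 100);
        messageList = new ArrayList<KeyedMessage<String, byte[]>>(batchSize);

        kafkaProps = new Properties();
        kafkaProps.putAll(context.getParameters());
        // keys are Strings and message bodies are byte[]
        if (!kafkaProps.containsKey("key.serializer.class")) {
            kafkaProps.put("key.serializer.class", "kafka.serializer.StringEncoder");
        }
        if (!kafkaProps.containsKey("serializer.class")) {
            kafkaProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
        }
    }

    @Override
    public synchronized void start() {
        // create the Kafka 0.8 producer from the collected properties
        producer = new Producer<String, byte[]>(new ProducerConfig(kafkaProps));
        super.start();
    }

    @Override
    public synchronized void stop() {
        if (producer != null) {
            producer.close();
        }
        super.stop();
    }
}
```

Then run mvn clean install to compile and package the jar, and drop the jar into the lib directory of the Flume installation. The jar built this way contains only the sink's own classes, so kafka_2.10 and its runtime dependencies (such as scala-library) must be on Flume's classpath as well. What remains is editing the conf file. The property keys in the conf file are exactly the keys your custom sink reads: whatever key the sink looks up in configure() is the key you set in the configuration file.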
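The rules that process() applies (take at most batchSize events per transaction, let a topic header override the configured topic, then send and commit or roll back the batch as a whole) can be tried out without Flume or Kafka on the classpath. The sketch below is a plain-Java model, not Flume's API: a Deque stands in for the channel, the hypothetical classes SimEvent and SimMessage stand in for Event and KeyedMessage, and a Consumer stands in for producer.send(). It assumes a queue-like channel in which a rollback puts the taken events back at the head of the queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a Flume Event: headers plus a byte[] body.
final class SimEvent {
    final Map<String, String> headers;
    final byte[] body;

    SimEvent(Map<String, String> headers, byte[] body) {
        this.headers = headers;
        this.body = body;
    }

    // Convenience factory; a null topic or key simply leaves that header out.
    static SimEvent of(String topic, String key, String body) {
        Map<String, String> headers = new HashMap<>();
        if (topic != null) headers.put(BatchModel.TOPIC_HDR, topic);
        if (key != null) headers.put(BatchModel.KEY_HDR, key);
        return new SimEvent(headers, body.getBytes(StandardCharsets.UTF_8));
    }
}

// Hypothetical stand-in for KeyedMessage<String, byte[]>.
final class SimMessage {
    final String topic;
    final String key;
    final byte[] body;

    SimMessage(String topic, String key, byte[] body) {
        this.topic = topic;
        this.key = key;
        this.body = body;
    }
}

final class BatchModel {
    static final String KEY_HDR = "key";
    static final String TOPIC_HDR = "topic";

    // One simulated call to process(): the Deque plays the channel and the
    // Consumer plays producer.send(messageList).
    static List<SimMessage> process(Deque<SimEvent> channel, int batchSize,
                                    String defaultTopic, Consumer<List<SimMessage>> sender) {
        List<SimEvent> taken = new ArrayList<>();      // events taken in this "transaction"
        List<SimMessage> batch = new ArrayList<>();
        try {
            for (int i = 0; i < batchSize; i++) {
                SimEvent event = channel.pollFirst();  // like channel.take(): null when empty
                if (event == null) {
                    break;
                }
                taken.add(event);
                String topic = event.headers.get(TOPIC_HDR);
                if (topic == null) {
                    topic = defaultTopic;              // no topic header: use the configured topic
                }
                batch.add(new SimMessage(topic, event.headers.get(KEY_HDR), event.body));
            }
            if (!batch.isEmpty()) {
                sender.accept(batch);                  // send the whole batch at once
            }
            return batch;                              // "commit": taken events stay removed
        } catch (RuntimeException e) {
            // "rollback": return the taken events to the head of the channel, in order
            for (int i = taken.size() - 1; i >= 0; i--) {
                channel.addFirst(taken.get(i));
            }
            throw e;
        }
    }
}
```

Calling BatchModel.process twice on the same Deque shows both outcomes: a successful send leaves the batch removed from the channel, while a sender that throws leaves the channel exactly as it was before the call, which is the guarantee the real transaction gives the sink.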
An example of the sink section in a conf file:
```properties
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=async
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=testToptic
```
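Flume hands each sink only its own settings, with the `<agent>.sinks.<sink>.` prefix removed: for the line producer.sinks.r.custom.topic.name=testToptic, the sink's Context sees the key custom.topic.name, so that is the string configure() has to look up. The plain-Java sketch below imitates this prefix stripping with java.util.Properties (it is not Flume's own configuration code) and adds a hypothetical split rule: custom.* keys stay with the sink, Flume's own type and channel keys are skipped, and everything else would be handed to Kafka's ProducerConfig. The class SinkContextModel and its methods are made-up names for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class SinkContextModel {

    // Loads a Flume-style properties text and keeps only the entries of one sink,
    // with "<agent>.sinks.<sink>." stripped off: these are the keys the sink looks up.
    static Map<String, String> sinkParameters(String confText, String agent, String sink) {
        Properties all = new Properties();
        try {
            all.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = agent + ".sinks." + sink + ".";
        Map<String, String> params = new TreeMap<>();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                params.put(name.substring(prefix.length()), all.getProperty(name));
            }
        }
        return params;
    }

    // Hypothetical split rule: "custom.*" keys are for the sink itself, Flume's own
    // "type" and "channel" are skipped, and everything else goes to the Kafka producer.
    static Properties kafkaProducerProps(Map<String, String> params) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : params.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith("custom.") && !key.equals("type") && !key.equals("channel")) {
                props.setProperty(key, e.getValue());
            }
        }
        return props;
    }
}
```

Under this split, a sink's configure() would call context.getString("custom.topic.name") for its default topic and build the ProducerConfig from the remaining keys; whichever rule a real sink uses, the conf file has to follow it key for key.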