
【Flume】Custom Kafka sink: compiling and packaging the jar, and fixing the "unapproved license" build error


Create a new Java project and edit its pom file. The pom content is as follows (note that the parent element has been removed here):

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>org.apache.flume.flume-ng-sinks</groupId>
  <artifactId>flume-ng-kafka-sink</artifactId>
  <name>Flume Kafka Sink</name>
  <version>1.0.0</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
    </dependency>
  </dependencies>

</project>
Both the parent element and the apache-rat-plugin have been removed from this pom. The rat plugin is what fails the build with the common "unapproved license" error, tracked at https://issues.apache.org/jira/browse/FLUME-1372
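If you would rather keep the Apache parent pom, an alternative is to skip the license audit at build time instead of deleting the plugin; rat.skip is the standard property exposed by the apache-rat-plugin:

    mvn clean install -Drat.skip=true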

A custom sink extends AbstractSink, implements the Configurable interface, and overrides a few methods, as follows:

package com.cmcc.chiwei.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

public class CmccKafkaSink extends AbstractSink implements Configurable {

	private static final Logger log = LoggerFactory
			.getLogger(CmccKafkaSink.class);

	public static final String KEY_HDR = "key";
	public static final String TOPIC_HDR = "topic";
	private static final String CHARSET = "UTF-8";
	private Properties kafkaProps;
	private Producer<String, byte[]> producer;
	private String topic;
	private int batchSize;// number of events per transaction, committed as one batch
	private List<KeyedMessage<String, byte[]>> messageList;

	@Override
	public Status process() throws EventDeliveryException {
		Status result = Status.READY;
		Channel channel = getChannel();
		Transaction transaction = null;
		Event event = null;
		String eventTopic = null;
		String eventKey = null;
		try {
			long processedEvent = 0;
			transaction = channel.getTransaction();
			transaction.begin();// start the transaction
			messageList.clear();
			for (; processedEvent < batchSize; processedEvent++) {
			event = channel.take();// take one event from the channel
				if (event == null) {
					break;
				}
				// an Event consists of headers and a body
				Map<String, String> headers = event.getHeaders();
				byte[] eventBody = event.getBody();
				if ((eventTopic = headers.get(TOPIC_HDR)) == null) {// fall back to the default topic if the event header carries none
					eventTopic = topic;
				}
				eventKey = headers.get(KEY_HDR);

				if (log.isDebugEnabled()) {
					log.debug("{Event}" + eventTopic + ":" + eventKey + ":"
							+ new String(eventBody, CHARSET));
					log.debug("event #{}", processedEvent);
				}

				KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(
						eventTopic, eventKey, eventBody);
				messageList.add(data);
			}
			// --- the original listing is garbled from this point on; the rest of
			// the class is a reconstruction following the standard Flume
			// transactional-sink pattern ---
			if (processedEvent == 0) {
				result = Status.BACKOFF;// channel was empty, tell Flume to back off
			} else {
				producer.send(messageList);// hand the whole batch to Kafka at once
			}
			transaction.commit();// commit only after Kafka accepted the batch
		} catch (Exception e) {
			String errorMsg = "Failed to publish events!";
			log.error(errorMsg, e);
			result = Status.BACKOFF;
			if (transaction != null) {
				try {
					transaction.rollback();// return the events to the channel
				} catch (Exception ex) {
					log.error(errorMsg, ex);
					throw Throwables.propagate(ex);
				}
			}
		} finally {
			if (transaction != null) {
				transaction.close();
			}
		}
		return result;
	}

	@Override
	public synchronized void start() {
		// build the Kafka 0.8 producer from the properties gathered in configure()
		ProducerConfig config = new ProducerConfig(kafkaProps);
		producer = new Producer<String, byte[]>(config);
		super.start();
	}

	@Override
	public synchronized void stop() {
		producer.close();
		super.stop();
	}

	@Override
	public void configure(Context context) {
		// each key read here is exactly the key set in the agent's conf file,
		// minus the "producer.sinks.<name>." prefix
		topic = context.getString("custom.topic.name", "testTopic");
		batchSize = context.getInteger("batchSize", 100);
		messageList = new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
		kafkaProps = new Properties();
		kafkaProps.put("metadata.broker.list",
				context.getString("metadata.broker.list"));
		// byte[] bodies need the DefaultEncoder; String keys use the StringEncoder
		kafkaProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
		kafkaProps.put("key.serializer.class", "kafka.serializer.StringEncoder");
		kafkaProps.put("request.required.acks",
				context.getString("request.required.acks", "0"));
		kafkaProps.put("producer.type",
				context.getString("producer.type", "sync"));
	}
}

Then run mvn clean install to compile and package the jar, and drop that jar into the lib directory of your Flume installation. All that is left is editing the conf file.
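Concretely, the build-and-deploy step looks like this (the $FLUME_HOME path is illustrative; substitute your own install location):

    mvn clean install
    cp target/flume-ng-kafka-sink-1.0.0.jar $FLUME_HOME/lib/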

Naturally, the property keys in the conf file correspond one-to-one with the keys your custom sink reads in configure(): whatever key the code reads is the key you set in the configuration file.

For example:

producer.sinks.r.type = com.cmcc.chiwei.kafka.CmccKafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=async
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=testToptic
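For completeness, the sink only runs as part of a full agent, so a source and a channel must be wired in as well. A minimal sketch (the exec source, log path, and conf file name are illustrative), followed by the launch command:

producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.type = exec
producer.sources.s.command = tail -F /var/log/app.log
producer.sources.s.channels = c

producer.channels.c.type = memory
producer.channels.c.capacity = 1000

producer.sinks.r.channel = c

    bin/flume-ng agent --conf conf --conf-file conf/producer.conf --name producer -Dflume.root.logger=INFO,console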
