首页 > 代码库 > kafka 自定义分区器

kafka 自定义分区器

package cn.xiaojf.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义分区方式
 */
public class CustomPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public CustomPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    /**
     * 自定义分区规则
     * @param topic
     * @param key
     * @param keyBytes
     * @param value
     * @param valueBytes
     * @param cluster
     * @return
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if(keyBytes == null) {
            int nextValue = http://www.mamicode.com/this.nextValue(topic);
            List availablePartitions = cluster.availablePartitionsForTopic(topic);
            if(availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if(null == counter) {
            counter = new AtomicInteger((new Random()).nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if(currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    public void close() {
    }
}
package cn.xiaojf.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消息生产者
 * @author xiaojf 2017/3/22 14:27
 */
public class MsgProducer extends Thread {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final boolean isAsync;

    public MsgProducer(String topic, boolean isAsync) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.59.130:9092");//broker 集群地址
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "MsgProducer");//自定义客户端id
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//key 序列号方式
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value 序列号方式
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数

//        properties.load("properties配置文件");

        this.producer = new KafkaProducer<String, String>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int msgNo = 0;

        while (true) {
            String msg = "Msg: " + msgNo;
            String key = msgNo + "";
            if (isAsync) {//异步
                producer.send(new ProducerRecord<String, String>(this.topic,msg));
//                producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
            } else {//同步
                producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
                        new MsgProducerCallback(System.currentTimeMillis(), key, msg));
            }
        }
    }

    /**
     * 消息发送后的回调函数
     */
    class MsgProducerCallback implements Callback {

        private final long startTime;
        private final String key;
        private final String msg;

        public MsgProducerCallback(long startTime, String key, String msg) {
            this.startTime = startTime;
            this.key = key;
            this.msg = msg;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (recordMetadata != null) {
                System.out.println(msg + " be sended to partition no : " + recordMetadata.partition());
            }
        }
    }

    public static void main(String args[]) {
        new MsgProducer("my-replicated-topic",true).start();//开始发送消息
    }
}

 

kafka 自定义分区器