首页 > 代码库 > Kafka学习-Producer和Customer

Kafka学习-Producer和Customer

  在上一篇kafka入门的基础之上,本篇主要介绍Kafka的生产者和消费者。

Kafka 生产者  

  kafka Producer发布消息记录到Kakfa集群。生产者是线程安全的,在线程之间共享生产者实例。一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main方法里就能直接运行,

技术分享
public class ProducerDemo {
    private static final String KAFKA_TOPIC="kafka-topic";
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("acks", "all");
        configs.put("retries", 0);
        configs.put("batch.size", 16384);
        configs.put("linger.ms", 1);
        configs.put("buffer.memory", 33554432);
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer=new KafkaProducer<String, String>(configs);
        ProducerRecord<String, String> record=null;
        
        for (int i = 0; i <10; i++) {
            record=new ProducerRecord<String, String>(KAFKA_TOPIC, "record-"+i);
            Future<RecordMetadata> future=producer.send(record);
            try {
                RecordMetadata recordMetadata=future.get();
                System.out.format("PARTITION: %d OFFSET: %d\n", recordMetadata.partition(),recordMetadata.offset());
            }
            catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}
View Code

 

  • 生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。
  • 生产者的send()方法是异步的,send()方法添加消息到缓冲区等待发送,并立即返回,这样并行发送多条消息而不阻塞去等待每一条消息的响应。为了减少请求的数量,生产者将单个的消息批量在一起发送来提高效率。
  • acks是判断消息是否成功发送的条件,将acks指定了"all"将会阻塞消息,当所有的副本都返回后才表明该消息发送成功,这种设置性能最低,但是是最可靠的。
  • retries表示重试的次数,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
  • batch.size指定了缓冲区的大小,kafka的Producer会缓存每个分区未发送消息。
  • linger.ms指示生产者发送请求前等待一段时间,等待等多的消息来填满批中。默认缓冲可立即发送,即遍缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.time大于0。如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使linger.time=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
  • buffer.memory:控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
  • key.serializer和value.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节。

Producer的Send()

  kafka的producer的send()方法提供多种重载:

技术分享

  send是异步的,一旦消息被保存在等待发送的消息缓存中,此方法就立即返回,这样可以你并行发送多条消息而不阻塞去等待每一条消息的响应。发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间,如果topic使用的是LogAppendTime,时间戳是broker的本地时间。由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。如果要模拟一个简单的阻塞调用,你可以调用get()方法。

技术分享
 byte[] key = "key".getBytes();
 byte[] value = http://www.mamicode.com/"value".getBytes();
 ProducerRecord<byte[],byte[]> record = new 
 ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();
View Code

 

  完全无阻塞的话,可以利用参数提供的回调函数处理请求完成时的回调通知。

技术分享
record=new ProducerRecord<String, String>(KAFKA_TOPIC, "CallbackRecord-"+i);
            producer.send(record,new Callback() {  
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.format("PARTITION: %d OFFSET: %d\n", metadata.partition(),metadata.offset());
                }
            });
View Code

 

  发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:

技术分享
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
View Code

  注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果需要执行阻塞或耗时的回调,建议在callback主体中使用自己的Executor来并行处理。

 

 

Kafka学习-Producer和Customer