首页 > 代码库 > Kafka学习-Producer和Customer
Kafka学习-Producer和Customer
在上一篇kafka入门的基础之上,本篇主要介绍Kafka的生产者和消费者。
Kafka 生产者
kafka Producer发布消息记录到Kakfa集群。生产者是线程安全的,在线程之间共享生产者实例。一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main
方法里就能直接运行,
public class ProducerDemo { private static final String KAFKA_TOPIC="kafka-topic"; public static void main(String[] args) { Map<String, Object> configs = new HashMap<String, Object>(); configs.put("bootstrap.servers", "localhost:9092"); configs.put("acks", "all"); configs.put("retries", 0); configs.put("batch.size", 16384); configs.put("linger.ms", 1); configs.put("buffer.memory", 33554432); configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer=new KafkaProducer<String, String>(configs); ProducerRecord<String, String> record=null; for (int i = 0; i <10; i++) { record=new ProducerRecord<String, String>(KAFKA_TOPIC, "record-"+i); Future<RecordMetadata> future=producer.send(record); try { RecordMetadata recordMetadata=future.get(); System.out.format("PARTITION: %d OFFSET: %d\n", recordMetadata.partition(),recordMetadata.offset()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } producer.close(); } }
- 生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。
- 生产者的send()方法是异步的,send()方法添加消息到缓冲区等待发送,并立即返回,这样并行发送多条消息而不阻塞去等待每一条消息的响应。为了减少请求的数量,生产者将单个的消息批量在一起发送来提高效率。
- acks是判断消息是否成功发送的条件,将acks指定了"all"将会阻塞消息,当所有的副本都返回后才表明该消息发送成功,这种设置性能最低,但是是最可靠的。
- retries表示重试的次数,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
- batch.size指定了缓冲区的大小,kafka的Producer会缓存每个分区未发送消息。
- linger.ms指示生产者发送请求前等待一段时间,等待等多的消息来填满批中。默认缓冲可立即发送,即遍缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.time大于0。如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使linger.time=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
- buffer.memory:控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
- key.serializer和value.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节。
Producer的Send()
kafka的producer的send()方法提供多种重载:
send是异步的,一旦消息被保存在等待发送的消息缓存中,此方法就立即返回,这样可以你并行发送多条消息而不阻塞去等待每一条消息的响应。发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间,如果topic使用的是LogAppendTime,时间戳是broker的本地时间。由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。如果要模拟一个简单的阻塞调用,你可以调用get()方法。
byte[] key = "key".getBytes(); byte[] value = http://www.mamicode.com/"value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) producer.send(record).get();
完全无阻塞的话,可以利用参数提供的回调函数处理请求完成时的回调通知。
record=new ProducerRecord<String, String>(KAFKA_TOPIC, "CallbackRecord-"+i); producer.send(record,new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.format("PARTITION: %d OFFSET: %d\n", metadata.partition(),metadata.offset()); } });
发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果需要执行阻塞或耗时的回调,建议在callback主体中使用自己的Executor来并行处理。
Kafka学习-Producer和Customer