首页 > 代码库 > kafka C客户端librdkafka producer源码分析
kafka C客户端librdkafka producer源码分析
简介
kafka网站上提供了C语言的客户端librdkafka,地址在这。
librdkafka是使用C语言根据apache kafka 协议实现的客户端。另外这个客户端还有简单的c++接口。客户端作者对这个客户端比较上心,经常会修改bug并提交新功能。
librdkafka的基本原理和我之前博客说的java版producer类似,一个线程向队列中加数据,另一个线程通过非阻塞的方式从队列中取出数据,并写入到broker。
源码分析
源码包含两个文件夹src和src-cpp
src是用c实现的源码,而src-cpp是在c接口上包装的一层c++类,实现了基本的功能。
代码运行流程如下
1、rd_kafka_conf_set设置全局配置
2、rd_kafka_topic_conf_set设置topic配置
3、rd_kafka_brokers_add设置broker地址,启动向broker发送消息的线程
4、rd_kafka_new启动kafka主线程
5、rd_kafka_topic_new建topic
6、rd_kafka_produce使用本函数发送消息
7、rd_kafka_poll调用回调函数
还是看发送一条消息的过程
入队列过程
调用rd_kafka_produce可以将消息写到队列
1 int rd_kafka_produce (...) {2 //调用rd_kafka_msg_new3 return rd_kafka_msg_new(...);4 }
首先先将消息包装成rd_kafka_msg_t类型,然后获取分区并相应的队列
1 int rd_kafka_msg_new (...) {2 ...3 //创建消息,将传入的参数转换为rkm4 rkm = rd_kafka_msg_new0(...);5 //分区并入队6 err = rd_kafka_msg_partitioner(rkt, rkm, 1);7 ...8 return -1;9 }
1 int rd_kafka_msg_partitioner (...) { 2 ... 3 //获取分区号 4 switch (rkt->rkt_state) 5 { 6 ... 7 } 8 //获取分区 9 rktp_new = rd_kafka_toppar_get(rkt, partition, 0);10 ...11 //加入队列12 rd_kafka_toppar_enq_msg(rktp_new, rkm);13 return 0;14 }
出队列过程
添加broker的过程中就启动了扫描队列的操作
1 static rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, 2 rd_kafka_confsource_t source, 3 const char *name, uint16_t port, 4 int32_t nodeid) { 5 ... 6 pthread_attr_init(&attr); 7 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 8 //启动向broker发送消息的主线程 9 if ((err = pthread_create(&rkb->rkb_thread, &attr,10 rd_kafka_broker_thread_main, rkb))) {11 ...12 return NULL;13 }14 //将broker加到broker队列中15 TAILQ_INSERT_TAIL(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);16 (void)rd_atomic_add(&rkb->rkb_rk->rk_broker_cnt, 1);17 ...18 return rkb;19 }
启动rd_kafka_broker_thread_main主线程
1 static void *rd_kafka_broker_thread_main (void *arg) { 2 ... 3 while (!rkb->rkb_rk->rk_terminate) { 4 switch (rkb->rkb_state) 5 { 6 //如果broker连接未初始化,或中断,则不断重连broker 7 case RD_KAFKA_BROKER_STATE_INIT: 8 case RD_KAFKA_BROKER_STATE_DOWN: 9 if (rd_kafka_broker_connect(rkb) == -1) {10 ...11 }12 break;13 //如果broker连接已经建立,则调用serve函数14 case RD_KAFKA_BROKER_STATE_UP:15 if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA)16 rd_kafka_broker_ua_idle(rkb);17 else if (rk->rk_type == RD_KAFKA_PRODUCER)18 rd_kafka_broker_producer_serve(rkb);19 else if (rk->rk_type == RD_KAFKA_CONSUMER)20 rd_kafka_broker_consumer_serve(rkb);21 break;22 }23 }24 ...25 return NULL;26 }
只看producer的处理函数,该函数扫描消息并发送
1 static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) { 2 ... 3 while (!rkb->rkb_rk->rk_terminate && 4 rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) { 5 ... 6 do { 7 cnt = 0; 8 ... 9 //扫描所有的topic-partitions,并发送消息10 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {11 ...12 //将入队过程中的队列rktp_msgq加到rktp_xmit_msgq中13 if (rktp->rktp_msgq.rkmq_msg_cnt > 0)14 rd_kafka_msgq_concat(&rktp->15 rktp_xmit_msgq,16 &rktp->rktp_msgq);17 rd_kafka_toppar_unlock(rktp);18 //扫描消息队列中数据是否超时19 if (unlikely(do_timeout_scan))20 rd_kafka_msgq_age_scan(&rktp->21 rktp_xmit_msgq,22 &timedout,23 now);24 //队列为空则从头继续25 if (rktp->rktp_xmit_msgq.rkmq_msg_cnt == 0)26 continue;27 28 //如果没有超时,或者没达到处理消息数量的阈值,则从头继续,这样批处理可以提高性能29 if (rktp->rktp_ts_last_xmit +30 (rkb->rkb_rk->rk_conf.31 buffering_max_ms * 1000) > now &&32 rktp->rktp_xmit_msgq.rkmq_msg_cnt <33 rkb->rkb_rk->rk_conf.34 batch_num_messages) {35 /* Wait for more messages */36 continue;37 }38 39 rktp->rktp_ts_last_xmit = now;40 41 //按协议转换并填充数据到rkb中42 while (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0) {43 int r = rd_kafka_broker_produce_toppar(44 rkb, rktp);45 if (likely(r > 0))46 cnt += r;47 else48 break;49 }50 }51 52 } while (cnt);53 54 //触发数据发送情况的回调函数,将发送失败的写到一个操作结果队列中55 if (unlikely(isrfailed.rkmq_msg_cnt > 0))56 rd_kafka_dr_msgq(rkb->rkb_rk, &isrfailed,57 RD_KAFKA_RESP_ERR__ISR_INSUFF);58 59 if (unlikely(timedout.rkmq_msg_cnt > 0))60 rd_kafka_dr_msgq(rkb->rkb_rk, &timedout,61 RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);62 63 rd_kafka_broker_toppars_unlock(rkb);64 65 /* Check and move retry buffers */66 if (unlikely(rkb->rkb_retrybufs.rkbq_cnt) > 0)67 rd_kafka_broker_retry_bufs_move(rkb);68 69 rd_kafka_broker_unlock(rkb);70 71 //开始在网络上发送数据72 rd_kafka_broker_io_serve(rkb);73 74 /* Scan wait-response queue75 * Note: ‘now‘ may be a bit outdated by now. */76 if (do_timeout_scan)77 rd_kafka_broker_waitresp_timeout_scan(rkb, now);78 79 rd_kafka_broker_lock(rkb);80 }81 82 rd_kafka_broker_unlock(rkb);83 }
通过poll处理网络事件,将消息从网络发送到broker
1 static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) { 2 rd_kafka_op_t *rko; 3 rd_ts_t now = rd_clock(); 4 //处理broker操作 5 if (unlikely(rd_kafka_q_len(&rkb->rkb_ops) > 0)) 6 while ((rko = rd_kafka_q_pop(&rkb->rkb_ops, RD_POLL_NOWAIT))) 7 rd_kafka_broker_op_serve(rkb, rko); 8 //请求metadata 9 if (unlikely(now >= rkb->rkb_ts_metadata_poll))10 rd_kafka_broker_metadata_req(rkb, 1 /* all topics */, NULL,11 NULL, "periodic refresh");12 //如果有消息,手动增加写事件13 if (rkb->rkb_outbufs.rkbq_cnt > 0)14 rkb->rkb_pfd.events |= POLLOUT;15 else16 rkb->rkb_pfd.events &= ~POLLOUT;17 if (poll(&rkb->rkb_pfd, 1,18 rkb->rkb_rk->rk_conf.buffering_max_ms) <= 0)19 return;20 //poll函数,处理各种事件,发送消息时,只处理写事件,当请求metadata时,处理读事件21 if (rkb->rkb_pfd.revents & POLLIN)22 while (rd_kafka_recv(rkb) > 0)23 ;24 if (rkb->rkb_pfd.revents & POLLHUP)25 return rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT,26 "Connection closed");27 if (rkb->rkb_pfd.revents & POLLOUT)28 while (rd_kafka_send(rkb) > 0)29 ;30 }
问题
librdkafka不像java客户端那样,可以通过future.get()实现同步发送。所以,如果broker不能连通的话,send方法还是可以正常将消息放入队列。这会导致两个问题
1、我们的客户端是不会知道broker已经挂掉了,因而不能对这种情况作出及时处理,导致消息全部堆积在内存中,如果此时不幸,我们的客户端也挂掉了,那这部分消息就全部丢失了。
2、如果broker一直没有恢复,而我们一直向队列中写数据的话,producer中有一个选项message.timeout.ms,如果超过了设定的消息超时时间,那么会有线程清理队列中的数据,导致消息丢失,而如果将时间设置为0(永不超时)的话,将导致客户端内存撑满。
上面这个问题可以通过如下方法实现的同步发送来解决
1 void dr_cb (...err, , void *msg_opaque) { 2 int *produce_statusp = (int *)msg_opaque; 3 4 /* set sync_produce()‘s produce_status value to the error code (which can be NO_ERROR) */ 5 *produce_statusp = err; 6 } 7 8 int sync_produce (rkt, msg..) { 9 int produce_status = -100000; /* or some other magic value that is not proper value in rd_kafka_resp_err_t */10 11 rd_kafka_produce(rkt, ..msg, .., &produce_status /* msg_opaque */);12 13 do {14 /* poll dr and error callbacks. */15 rd_kafka_poll(rk, 1000);16 /* wait for dr_cb to be called and setting produce_status to the error value. */17 } while (produce_status == -100000);18 19 if (produce_status == RD_KAFKA_RESP_ERR_NO_ERROR)20 return SUCCESS!;21 else22 return FAILURE;23 }