Kafka Consumer Startup Test Class
https://github.com/MarcoGhise/SpringKafka.git
package it.demo.kafka.springkafka.listener;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.kafka.support.ConsumerConfiguration;
import org.springframework.integration.kafka.support.KafkaConsumerContext;

import com.yammer.metrics.Metrics;

public class KafkaConsumerStarter implements ApplicationContextAware
{
    private ApplicationContext appContext;

    private SourcePollingChannelAdapter kafkaInboundChannelAdapter;

    private KafkaConsumerContext kafkaConsumerContext;

    // Looks up the inbound channel adapter by bean name and starts polling Kafka.
    public void initIt() throws Exception
    {
        kafkaInboundChannelAdapter = appContext.getBean("kafka-inbound-channel-adapter", SourcePollingChannelAdapter.class);
        kafkaInboundChannelAdapter.start();

        // Fetched and stored here, but not used elsewhere in this class.
        kafkaConsumerContext = appContext.getBean("consumerContext", KafkaConsumerContext.class);
    }

    // Stops the adapter, waits briefly, then shuts down the Yammer Metrics registry.
    public void cleanUp() throws Exception
    {
        if (kafkaInboundChannelAdapter != null)
        {
            kafkaInboundChannelAdapter.stop();
        }

        // Fixed one-second pause between stopping the adapter and the metrics shutdown.
        Thread.sleep(1000);

        Metrics.defaultRegistry().shutdown();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    {
        this.appContext = applicationContext;
    }

}
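The method names initIt and cleanUp suggest that the class is meant to be registered with Spring's init and destroy callbacks, although the bean definition is not shown here. A minimal sketch of that wiring in Java config follows. The class name KafkaConsumerStarterConfig and the bean method name are hypothetical, and the linked project may well declare the bean in XML instead. The sketch also assumes that beans named "kafka-inbound-channel-adapter" and "consumerContext" are defined elsewhere in the same application context.

```java
package it.demo.kafka.springkafka.config; // hypothetical package, not from the source

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import it.demo.kafka.springkafka.listener.KafkaConsumerStarter;

// Hypothetical configuration class: registers KafkaConsumerStarter so that
// Spring calls initIt() after dependency injection and cleanUp() on context close.
@Configuration
public class KafkaConsumerStarterConfig
{
    @Bean(initMethod = "initIt", destroyMethod = "cleanUp")
    public KafkaConsumerStarter kafkaConsumerStarter()
    {
        // Spring invokes setApplicationContext(...) automatically because the
        // class implements ApplicationContextAware.
        return new KafkaConsumerStarter();
    }
}
```

With this kind of registration, closing the application context (for example through a registered shutdown hook) is what would trigger cleanUp(), so the adapter is stopped before the metrics registry is shut down.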