
Kafka Spring example

Result after sending messages with a timer:

[screenshot]

Installing Kafka

# Install Kafka
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
tar xf kafka_2.10-0.10.0.0.tgz
ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
# Note: the second chown overrides the first; keep only the owner you actually need
chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka

# Create the /kafka chroot node in ZooKeeper (the create command is typed inside zkCli)
/usr/local/zookeeper/bin/zkCli.sh
create /kafka ''

# Edit the broker configuration
vim /usr/local/kafka/config/server.properties
broker.id=0
zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka

# Copy the archive to the other nodes (unpack and symlink it there as above)
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/

# Copy the configuration; broker.id must be unique per broker, so change it on each node afterwards
scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties

# Start the broker on every node (master and slaves)
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

# Create the topic, then describe it
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic

# Console producer
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic

# Console consumer
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
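The zookeeper.connect value and the --zookeeper arguments above all end in /kafka. That suffix is a chroot path, and it applies to the whole host list, not only to the last host. The following is a minimal sketch of how such a string splits into hosts and chroot. The class ZkConnectString and its parse method are hypothetical names made up for this illustration; they are not part of Kafka or ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper only: splits "host:port,host:port/chroot" into its parts. */
final class ZkConnectString {
    final List<String> hosts;
    final String chroot; // empty string when no chroot suffix is present

    private ZkConnectString(List<String> hosts, String chroot) {
        this.hosts = hosts;
        this.chroot = chroot;
    }

    static ZkConnectString parse(String connect) {
        // The chroot, if any, starts at the first '/' and covers every host in the list.
        int slash = connect.indexOf('/');
        String hostPart = slash < 0 ? connect : connect.substring(0, slash);
        String chroot = slash < 0 ? "" : connect.substring(slash);
        List<String> hosts = new ArrayList<>();
        for (String h : hostPart.split(",")) {
            String trimmed = h.trim();
            if (!trimmed.isEmpty()) {
                hosts.add(trimmed);
            }
        }
        return new ZkConnectString(hosts, chroot);
    }

    public static void main(String[] args) {
        ZkConnectString zk = parse(
            "dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka");
        System.out.println(zk.hosts.size() + " hosts, chroot=" + zk.chroot);
    }
}
```

Because the brokers register under /kafka, every client that goes through ZooKeeper (kafka-topics.sh, the console consumer, the Spring consumer configuration) has to use the same suffix.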

 

 

Testing after installation

producer
[screenshot]
consumer
[screenshot]

Spring receiving messages
[screenshot]

Code

applicationContext-kafka-productor.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
                        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                        http://www.springframework.org/schema/integration
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/task
                        http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- commons config -->
    <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
    <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
        <constructor-arg value="java.lang.String" />
    </bean>
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
                <prop key="request.required.acks">1</prop>
            </props>
        </property>
    </bean>

    <!-- topic test config -->

    <int:channel id="pChannel">
        <int:queue />
    </int:channel>

    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapterProductor"
        kafka-producer-context-ref="producerContext"
        auto-startup="true"
        channel="pChannel"
        order="3">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="1" task-executor="taskProductorExecutor" />
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskProductorExecutor" pool-size="5" keep-alive="120" queue-capacity="500" />

    <int-kafka:producer-context id="producerContext" producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                broker-list="172.23.27.120:9092,172.23.27.115:9092,172.23.27.116:9092"
                key-serializer="stringSerializer"
                value-class-type="java.lang.String"
                value-serializer="stringSerializer"
                topic="baoy-topic" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>
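In the producer configuration above, pChannel is backed by a queue and the outbound adapter reads from it through a poller with a fixed delay. A send to the channel therefore only enqueues the message, and delivery to Kafka happens later on a poller thread. The sketch below imitates that hand-off with plain JDK classes. It is only a rough analogy and not Spring Integration code; QueuePollerSketch, send and pollOnce are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-JDK analogy for a queue-backed channel drained by a poller. */
final class QueuePollerSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Loosely corresponds to messageChannel.send(...): returns once the message is queued. */
    boolean send(String payload) {
        return queue.offer(payload);
    }

    /** One poll cycle: wait up to timeoutMs for a first message, then drain the rest. */
    List<String> pollOnce(long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return batch;
    }

    public static void main(String[] args) {
        QueuePollerSketch channel = new QueuePollerSketch();
        channel.send("m1");
        channel.send("m2");
        System.out.println(channel.pollOnce(1)); // both queued messages
        System.out.println(channel.pollOnce(1)); // nothing left
    }
}
```

One consequence of this design is that messages still sitting in the queue are lost if the application stops before the next poll, so the fixed-delay value is a trade-off between latency and polling overhead.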

 

applicationContext-kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
                        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                        http://www.springframework.org/schema/integration
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/task
                        http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- topic test conf -->
    <int:channel id="cChannel">
        <int:dispatcher task-executor="kafkaMessageExecutor" />
    </int:channel>
    <!-- ZooKeeper configuration; multiple hosts can be listed -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
    <!-- channel configuration: auto-startup must be "true", otherwise no data is received -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="true" channel="cChannel">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
    <task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500" />
    <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" />

    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
    <!-- bean that receives the messages -->
    <bean id="kafkaConsumerService" class="com.curiousby.baoy.cn.kafka.KafkaConsumerService" />
    <!-- method that handles the received messages -->
    <int:outbound-channel-adapter channel="cChannel" ref="kafkaConsumerService" method="process" />

    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
                <int-kafka:topic id="baoy-topic" streams="5" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>

 

KafkaConsumerService

import java.util.Map;

import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // Called by the outbound-channel-adapter on cChannel.
    // Outer key: topic name; inner key: partition number; inner value: message.
    public void process(Map<String, Map<Integer, String>> msgs) {
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            System.out.println("======================================Consumer Message received: ");
            System.out.println("=====================================Suchit Topic:" + entry.getKey());
            for (String msg : entry.getValue().values()) {
                System.out.println("================================Suchit Consumed Message: " + msg);
            }
        }
    }

}
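The process method above receives one nested map per delivery. The following sketch makes that shape explicit without needing a broker. It assumes the payload shape declared by the method signature (topic, then partition, then message); PayloadShapeSketch and flatten are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustration of the topic -> (partition -> message) payload shape. */
final class PayloadShapeSketch {

    /** Flattens the nested map into "topic[partition]: message" lines. */
    static List<String> flatten(Map<String, Map<Integer, String>> msgs) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, String>> topic : msgs.entrySet()) {
            for (Map.Entry<Integer, String> part : topic.getValue().entrySet()) {
                lines.add(topic.getKey() + "[" + part.getKey() + "]: " + part.getValue());
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hand-built sample data; a real payload would come from the inbound adapter.
        Map<Integer, String> partitions = new TreeMap<>();
        partitions.put(0, "hello");
        partitions.put(3, "world");
        Map<String, Map<Integer, String>> msgs = new LinkedHashMap<>();
        msgs.put("baoy-topic", partitions);
        for (String line : flatten(msgs)) {
            System.out.println(line);
        }
    }
}
```

Keeping the partition number in the output can help when checking that all five partitions of baoy-topic are actually being consumed.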

 

KafkaProductorService

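import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class KafkaProductorService {

    // The queue channel declared in applicationContext-kafka-productor.xml
    @Autowired
    @Qualifier("pChannel")
    private MessageChannel messageChannel;

    // Wraps obj in a message, sets the target topic header, and puts it on pChannel
    public void sendInfo(String topic, Object obj) {
        System.out.println("---Service:KafkaService------sendInfo------");
        messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC, topic).build());
    }

}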
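The result shown at the top of this post came from calling sendInfo on a timer, but the timer code itself is not included. The following is a minimal sketch of one way to do it with a ScheduledExecutorService, written with Java 8 lambdas. The Sender interface is a hypothetical stand-in for KafkaProductorService, and sendPeriodically and the message text are assumptions made for this illustration, not the author's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class TimedSenderSketch {

    /** Hypothetical stand-in for KafkaProductorService.sendInfo(topic, obj). */
    interface Sender {
        void sendInfo(String topic, Object obj);
    }

    /**
     * Attempts to send `count` numbered messages, one every periodMs (must be > 0),
     * then stops the timer. Returns the number of send attempts.
     */
    static int sendPeriodically(Sender sender, String topic, int count, long periodMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempted = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(Math.max(count, 0));
        timer.scheduleAtFixedRate(() -> {
            // Guard so that ticks firing after the last message send nothing.
            if (attempted.get() < count) {
                int n = attempted.incrementAndGet();
                try {
                    sender.sendInfo(topic, "message-" + n);
                } catch (RuntimeException e) {
                    // An uncaught exception would cancel the schedule, so log and continue.
                    System.err.println("send failed: " + e);
                } finally {
                    done.countDown();
                }
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block until every message has been attempted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.shutdownNow();
        return attempted.get();
    }

    public static void main(String[] args) {
        sendPeriodically((topic, obj) -> System.out.println(topic + " <- " + obj), "baoy-topic", 3, 100);
    }
}
```

In the Spring application, the lambda passed in main could be replaced by a method reference to the injected service's sendInfo method.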

 

pom

 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.curiousby.baoyou.cn</groupId>
  <artifactId>SpringKafkaDEMO</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>SpringKafkaDEMO Maven Webapp</name>
  <url>http://maven.apache.org</url>

    <!-- properties constant -->
    <properties>
        <spring.version>4.2.5.RELEASE</spring.version>
    </properties>

    <dependencies>
        <!-- junit4 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.7</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.dbunit</groupId>
            <artifactId>dbunit</artifactId>
            <version>2.4.9</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.github.springtestdbunit</groupId>
            <artifactId>spring-test-dbunit</artifactId>
            <version>1.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>1.3.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.7.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>

    </dependencies>
    <build>
        <finalName>SpringKafkaDEMO</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <dependencies>
                    <dependency>
                        <groupId>org.codehaus.plexus</groupId>
                        <artifactId>plexus-compiler-javac</artifactId>
                        <version>2.5</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                    <compilerArguments>
                        <verbose />
                        <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
                    </compilerArguments>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

Problems encountered:

1. The logging libraries used with Spring (logback / slf4j) must be kept at consistent versions. Here I use org.slf4j 1.6.4 for both slf4j-api and slf4j-log4j12:

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>

