首页 > 代码库 > kafka-3python生产者和消费者实用demo
kafka-3python生产者和消费者实用demo
程序分为productor.py是发送消息端,consumer为消费消息端,
启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,
productor.py
#!/usr/bin/env python2.7 #_*_coding: utf-8 _*_ from kafka import KafkaProducer kafka_host = ‘192.168.1.200‘ # kafka服务器地址 kafka_port = 9092 # kafka服务器的端口 producer = KafkaProducer(bootstrap_servers=[‘{kafka_host}:{kafka_port}‘.format( kafka_host = kafka_host, kafka_port = kafka_port )]) #简单for循环10次,发送10条消息 for i in range(1,10): message_string = ‘some message‘.format(i) #调用send方法,发送名字为‘topic1‘的topicid ,发送的消息为message_string response = producer.send(‘topic1‘, message_string.encode(‘utf-8‘)) print response
consumer.py
#!/usr/bin/env python #_*_coding: utf-8 _*_ import json from kafka import * kafka_host = ‘192.168.1.200‘ # kafka服务器地址 kafka_port = 9092 # kafka服务器端口 #消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id, # 如果想一条消费多次消费,可以换一个group_id,会从头开始消费 consumer = KafkaConsumer( ‘topic1‘, group_id = ‘my-group‘, bootstrap_servers = [‘{kafka_host}:{kafka_port}‘.format(kafka_host=kafka_host, kafka_port=kafka_port)] ) for message in consumer: #json读取kafka的消息 content = json.loads(message.value) print content
本文出自 “马鹏飞——著” 博客,请务必保留此出处http://mapengfei.blog.51cto.com/1552412/1926068
kafka-3python生产者和消费者实用demo
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。