首页 > 代码库 > Kafka Consumer(Python threading)
Kafka Consumer(Python threading)
import threading
from kafka import KafkaConsumer
threads = []
class MyThread(threading.Thread):
def __init__(self, threadName, keyName):
threading.Thread.__init__(self)
self.threadName=threadName
self.keyName=keyName
def run(self):
receiveinfo(self.threadName, self.keyName)
def receiveinfo(threadName, keyName):
consumer = KafkaConsumer(‘test‘, bootstrap_servers=‘192.168.1.10:9092‘)
for msg in consumer:
if msg.key==keyName:
if msg.valuehttp://www.mamicode.com/=="exit": break
print("("+threadName+")"+ " " + msg.value)
try:
t1=MyThread("T1","Thread-1")
threads.append(t1)
t2=MyThread("T2","Thread-2")
threads.append(t2)
t3=MyThread("T3","Thread-3")
threads.append(t3)
for t in threads:
t.start()
for t in threads:
t.join()
print ("exit program with 0")
except:
print ("Error: failed to run consumer program")
Kafka Consumer(Python threading)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。