ZeroMQ-Publish/Subscribe
Publish/Subscribe is another classic pattern in which the senders of messages, called publishers, do not address their messages to specific receivers, called subscribers. Messages are published without any knowledge of which subscribers, if any, exist.
Scenario #2 is the better-known, general pattern, in which multiple subscribers subscribe to messages/topics published by a single publisher. Scenario #1 is the more interesting one. Just as a ZMQ.REQ socket can connect to multiple ZMQ.REP sockets, a ZMQ.SUB socket can connect to multiple ZMQ.PUB sockets (publishers). No single publisher overwhelms the subscriber; the messages from the publishers are interleaved.
    # pub_server.py
    import random
    import sys
    import time

    import zmq

    port = "5556"
    if len(sys.argv) > 1:
        port = sys.argv[1]
        int(port)  # raises ValueError if the port is not a number

    context = zmq.Context()
    # Publishers are created with the ZMQ.PUB socket type.
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)

    # Data is published along with a topic. Subscribers usually set a
    # filter on these topics to receive only the ones they care about.
    while True:
        topic = random.randrange(9999, 10005)
        messagedata = random.randrange(1, 215) - 80
        print("%d %d" % (topic, messagedata))
        socket.send_string("%d %d" % (topic, messagedata))
        time.sleep(1)
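Each update from the publisher travels as a single string: the topic first, then a space, then the value. The following is a minimal sketch of that framing, assuming Python 3, where the payload on the wire is bytes. The helper names `encode_update` and `decode_update` are hypothetical and are not part of pyzmq.

```python
from typing import Tuple


def encode_update(topic: int, value: int) -> bytes:
    """Build the 'topic value' payload. The topic comes first so that
    subscriber-side filtering can match on the leading bytes."""
    return ("%d %d" % (topic, value)).encode("ascii")


def decode_update(payload: bytes) -> Tuple[int, int]:
    """Split a received payload back into (topic, value)."""
    topic, value = payload.decode("ascii").split()
    return int(topic), int(value)


raw = encode_update(10001, -62)
print(raw)                 # b'10001 -62'
print(decode_update(raw))  # (10001, -62)
```

Keeping the topic at the very start of the payload matters, because the subscriber's filter is applied to the beginning of each message.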
    # sub_client.py
    # Subscribers are created with the ZMQ.SUB socket type. Note that a
    # zmq subscriber can connect to many publishers.
    import sys

    import zmq

    port = "5556"
    if len(sys.argv) > 1:
        port = sys.argv[1]
        int(port)  # raises ValueError if the port is not a number

    if len(sys.argv) > 2:
        port1 = sys.argv[2]
        int(port1)

    # Socket to talk to server
    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    print("Collecting updates from weather server...")
    socket.connect("tcp://localhost:%s" % port)

    if len(sys.argv) > 2:
        socket.connect("tcp://localhost:%s" % port1)

    # The current version of zmq supports filtering messages by topic on
    # the subscriber side. The filter is usually set via a socket option.

    # Subscribe to zipcode, default is NYC, 10001
    topicfilter = "10001"
    socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

    # Process 5 updates
    update_count = 5
    total_value = 0
    for update_nbr in range(update_count):
        string = socket.recv_string()
        topic, messagedata = string.split()
        total_value += int(messagedata)
        print(topic, messagedata)

    # Divide by the number of updates received, not by the last loop index.
    print("Average messagedata value for topic '%s' was %dF" % (topicfilter, total_value / update_count))
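The topic filter set with zmq.SUBSCRIBE is a prefix match on the start of each message, not an equality test on a separate topic field. The sketch below models that rule in plain Python so it can be run without any sockets. `passes_filter` is a hypothetical helper that imitates the behaviour; it is not a pyzmq function.

```python
def passes_filter(subscriptions, payload):
    """Return True if payload starts with any subscribed prefix.

    Models SUB-side filtering: a socket with no subscriptions receives
    nothing, and an empty prefix matches every message.
    """
    return any(payload.startswith(prefix) for prefix in subscriptions)


subscriptions = [b"10001"]
print(passes_filter(subscriptions, b"10001 -62"))  # True
print(passes_filter(subscriptions, b"9999 49"))    # False
print(passes_filter(subscriptions, b"100010 7"))   # True: prefix match, not equality
print(passes_filter([], b"10001 -62"))             # False: nothing subscribed
print(passes_filter([b""], b"9999 49"))            # True: empty prefix matches all
```

With the "topic value" format used here, subscribing to the topic plus the trailing space (for example `"10001 "`) would be one way to avoid also matching a longer topic such as 100010. The publisher in this example only emits topics 9999 through 10004, so the plain `"10001"` filter is unambiguous.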
    # running it:
    python pub_server.py 5556
    python pub_server.py 5546
    python sub_client.py 5556 5546

    # server-1:
    (D:\anaconda) C:\Users\admin\Desktop\opt>python pub_server.py 5566
    9999 49
    9999 76
    10001 -62
    10004 28
    10003 -65
    10004 69
    9999 -17
    9999 -49
    10000 21
    10003 -59
    10004 -12
    9999 131
    9999 41
    9999 16
    10000 101
    10002 -14
    10001 -43
    10004 -70
    10004 33

    # server-2:
    (D:\anaconda) C:\Users\admin\Desktop\opt>python pub_server.py 5576
    10001 26
    10003 22
    9999 -43
    10001 -21
    10000 33
    10003 -48
    10001 95
    10001 36
    9999 -6
    10002 91

    # client:
    (D:\anaconda) C:\Users\admin\Desktop\opt>python sub_client.py 5566 5576
    Collecting updates from weather server...
    10001 -62
    10001 -21
    10001 95
    10001 36
    10001 -43
    Average messagedata value for topic '10001' was 1F
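The three logs above can be cross-checked to see the interleaving. The sketch below copies the topic-10001 lines from the two publisher logs and the client log into lists (the variable names are hypothetical), attributes each received update to the publisher that printed it, and recomputes the average. It assumes the logs shown are complete for the period in question.

```python
# Topic-10001 lines copied from the two publisher logs.
server_1 = [(10001, -62), (10001, -43)]
server_2 = [(10001, 26), (10001, -21), (10001, 95), (10001, 36)]
# Lines printed by the subscriber, in arrival order.
client = [(10001, -62), (10001, -21), (10001, 95), (10001, 36), (10001, -43)]

# Attribute every received update to the publisher whose log contains it.
sources = ["server-1" if update in server_1 else "server-2" for update in client]
print(sources)
# ['server-1', 'server-2', 'server-2', 'server-2', 'server-1']

# Updates that were published on the topic but never reached the client.
missed = [update for update in server_1 + server_2 if update not in client]
print(missed)  # [(10001, 26)]

# Recompute the average over the five received values.
total = sum(value for _, value in client)
average = total / len(client)
print(total, average)  # 5 1.0
```

The received updates alternate between the two publishers, which matches the interleaving described earlier. The one missed update, `10001 26` from server-2, was presumably published before the subscriber had connected: a PUB socket does not hold messages for subscribers that have not yet joined.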