
ZeroMQ-Publish/Subscribe

Publish/Subscribe is another classic pattern in which senders of messages, called publishers, do not address their messages to specific receivers, called subscribers. Messages are published without knowing which subscribers, if any, exist.

Scenario #2 is the better-known, general pattern, in which multiple subscribers subscribe to messages/topics published by a single publisher. Scenario #1 is the more interesting one. Just as a ZMQ.REQ socket can connect to multiple ZMQ.REP sockets, a ZMQ.SUB socket can connect to multiple ZMQ.PUB sockets (publishers). No single publisher overwhelms the subscriber: the messages from both publishers are interleaved.
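
The interleaving described above can be pictured with a toy model. The sketch below is plain Python and does not use ZeroMQ; the function name `fair_queue` and the sample messages are made up for illustration. It assumes the subscriber takes one pending message from each connected publisher in turn, which is roughly what fair queuing means, not a description of how libzmq works internally.

```python
from collections import deque
from typing import Dict, List


def fair_queue(pending: Dict[str, List[str]]) -> List[str]:
    """Drain per-publisher queues round-robin (toy model, not ZeroMQ code)."""
    queues = {name: deque(msgs) for name, msgs in pending.items()}
    received = []
    # Keep cycling over the publishers until every queue is empty.
    while any(queues.values()):
        for name, queue in queues.items():
            if queue:
                received.append("%s: %s" % (name, queue.popleft()))
    return received


# Hypothetical backlog: publisher A is much busier than publisher B.
backlog = {
    "pub-A": ["10001 5", "10001 7", "10001 9", "10001 11"],
    "pub-B": ["10001 -3", "10001 -8"],
}

for line in fair_queue(backlog):
    print(line)
```

In this model the busy publisher cannot push the quiet one out: the two messages from pub-B are delivered in the first two rounds, before pub-A's backlog is drained.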

#  pub_server.py

import random
import sys
import time

import zmq

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)  # raises ValueError early if the port is not a number

context = zmq.Context()

#  Publishers are created with ZMQ.PUB socket types
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

#  Data is published along with a topic. Subscribers usually set a filter on the topics they are interested in.
while True:
    topic = random.randrange(9999, 10005)
    messagedata = random.randrange(1, 215) - 80
    print("%d %d" % (topic, messagedata))
    # send_string encodes the text as UTF-8 before sending (Python 3)
    socket.send_string("%d %d" % (topic, messagedata))
    time.sleep(1)

#  sub_client.py
#  Subscribers are created with ZMQ.SUB socket types. Note that a zmq subscriber can connect to many publishers.

import sys

import zmq

port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)  # raises ValueError early if the port is not a number

if len(sys.argv) > 2:
    port1 = sys.argv[2]
    int(port1)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:%s" % port)

if len(sys.argv) > 2:
    socket.connect("tcp://localhost:%s" % port1)

# The subscriber chooses which topics it receives by setting a filter. This is done through a socket option.
# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
num_updates = 5
total_value = 0
for update_nbr in range(num_updates):
    string = socket.recv_string()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print(topic, messagedata)

# Divide by the number of updates, not by the last loop index (which is one less)
print("Average messagedata value for topic '%s' was %dF" % (topicfilter, total_value / num_updates))
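
A subscription filter is a prefix match on the raw message, not a comparison against a separate topic field. The sketch below mimics that rule in plain Python, without ZeroMQ; `matches` is a hypothetical helper and the sample messages are invented. It shows that a filter of 10001 would also accept a message whose topic is 100015, and that an empty filter accepts everything.

```python
from typing import Iterable


def matches(message: bytes, filters: Iterable[bytes]) -> bool:
    """Return True if the message starts with any subscribed prefix."""
    return any(message.startswith(prefix) for prefix in filters)


# Same filter as in sub_client.py, as bytes.
subscriptions = [b"10001"]

# Invented sample messages in the "topic value" format used by pub_server.py.
for message in [b"10001 42", b"100015 -7", b"10002 13", b"9999 5"]:
    print(message.decode(), matches(message, subscriptions))
```

With the topic range used by pub_server.py (9999 to 10004) no other topic starts with 10001, so the prefix rule causes no surprises here. If exact matching mattered, one option would be to subscribe to the topic plus its delimiter, that is, 10001 followed by a space.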
# running it:
python pub_server.py 5556
python pub_server.py 5546
python sub_client.py 5556 5546

# server-1:

(D:\anaconda) C:\Users\admin\Desktop\opt>python pub_server.py 5566
9999 49
9999 76
10001 -62
10004 28
10003 -65
10004 69
9999 -17
9999 -49
10000 21
10003 -59
10004 -12
9999 131
9999 41
9999 16
10000 101
10002 -14
10001 -43
10004 -70
10004 33

# server-2
(D:\anaconda) C:\Users\admin\Desktop\opt>python pub_server.py 5576
10001 26
10003 22
9999 -43
10001 -21
10000 33
10003 -48
10001 95
10001 36
9999 -6
10002 91

# client
(D:\anaconda) C:\Users\admin\Desktop\opt>python sub_client.py 5566 5576
Collecting updates from weather server...
10001 -62
10001 -21
10001 95
10001 36
10001 -43
Average messagedata value for topic '10001' was 1F
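
The client output above can be checked by hand. The sketch below is plain Python with the topic 10001 values copied from the two server logs and the client log; the variable names are only for illustration. It attributes each received value to the server that printed it and recomputes the average over the five updates.

```python
# Values published on topic 10001, copied from the logs above.
server_1 = [-62, -43]
server_2 = [26, -21, 95, 36]
client = [-62, -21, 95, 36, -43]

# Attribute each received value to the server that printed it.
sources = ["server-1" if value in server_1 else "server-2" for value in client]
for value, source in zip(client, sources):
    print(value, source)

# Values that a server printed but the client never showed.
missed = [value for value in server_1 + server_2 if value not in client]

# Sum of the five received values divided by the number of updates.
average = sum(client) / len(client)
print("missed:", missed, "average:", average)
```

The five updates sum to 5, so the average over five updates is 1, which agrees with the 1F in the log. The run also shows messages from both servers arriving interleaved on one SUB socket. The value 26 from server-2 never reached the client, presumably because it was published before the client had connected and subscribed; a PUB socket does not hold messages for subscribers that are not connected yet.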

