首页 > 代码库 > ZeroMQ-Poller

ZeroMQ-Poller

In this program, we will create a command server that tells when the worker should exit. Workers subscribes to a topic published by a publisher and prints it. It exits when it receives “Exit” message from the command server.

# zmqpolling.py

import zmq
import time
import sys
import random
from  multiprocessing import Process

# PUSH server that sends command to workers to continue working or exit.
def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1)

# Publisher that publishes for topics “8”,”9”,”10” in random order.
def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1)   

# Worker that works on messages received for topic “9”. 
# We setup zmq poller to poll for messages on the socket connection to both command server and publisher.
def client(port_push, port_sub):
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)

    # We poll the sockets to check if we have messages to recv and work on it. Worker continues working until it receives exit condition.
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

# Finally, we fire up all the processes.
if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port,server_pub_port,)).start()
# result:

(D:\anaconda) C:\Users\admin\Desktop\opt>python zmqpolling.py
Running server on port:  5556
Running server on port:  5558
8 server#8364
Connected to server with port 5556
Connected to publisher with port 5558
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Exit
Recieved exit command, client will stop recieving messages
8 server#8364
9 server#8364
9 server#8364


ZeroMQ-Poller