首页 > 代码库 > jeroMq示例之 [req-broker-rep]

jeroMq示例之 [req-broker-rep]

[req-broker-rep]模式的好处是:对于多个客户端的request,broker会把任务逐个分发给各个worker,因此多个客户端的request可以并发地发送到broker,worker也可以并发地执行运算;中间的broker会随时把worker的处理结果回传给对应的client。
 
 

broker程序:

package guide;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

/**
 * Simple request-reply broker.
 *
 * <p>Binds a ROUTER socket on tcp://*:5559 (frontend) and a DEALER socket on
 * tcp://*:5560 (backend), then copies every message frame arriving on one
 * socket to the other until the current thread is interrupted.
 */
public class rrbroker {

    /** Charset used only to render request frames in the log output. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Starts the broker and shuttles messages between the two sockets.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        //  Prepare our context and sockets
        Context context = ZMQ.context(1);
        Socket frontend = context.socket(ZMQ.ROUTER);
        Socket backend = context.socket(ZMQ.DEALER);
        try {
            frontend.bind("tcp://*:5559");
            backend.bind("tcp://*:5560");
            System.out.println("launch and connect broker.");

            //  Initialize poll set: index 0 = frontend, index 1 = backend
            Poller items = new Poller(2);
            items.register(frontend, Poller.POLLIN);
            items.register(backend, Poller.POLLIN);

            //  Switch messages between sockets
            while (!Thread.currentThread().isInterrupted()) {
                items.poll();
                if (items.pollin(0)) {
                    relay(frontend, backend, true);
                }
                if (items.pollin(1)) {
                    relay(backend, frontend, false);
                }
            }
        } finally {
            // Release the sockets and the context even when the loop is left
            // through an exception rather than through the interrupt check.
            frontend.close();
            backend.close();
            context.term();
        }
    }

    /**
     * Copies one complete (possibly multipart) message from {@code source}
     * to {@code sink}, logging each forwarded frame.
     *
     * @param source    socket that has a message ready to read
     * @param sink      socket the frames are forwarded to
     * @param isRequest {@code true} to log the frame as a routed request
     *                  (with its text and "more" flag), {@code false} to log
     *                  it as a reply
     */
    private static void relay(Socket source, Socket sink, boolean isRequest) {
        boolean more;
        do {
            byte[] frame = source.recv(0);
            if (frame == null) {
                // The logging code always allowed for a null frame, but only
                // after the frame had already been handed to send(). Check
                // first so a failed receive is never forwarded.
                // NOTE(review): if this happens mid-message, the frames
                // already sent with SNDMORE stay unterminated - confirm
                // whether that case needs extra handling.
                return;
            }
            more = source.hasReceiveMore();
            sink.send(frame, more ? ZMQ.SNDMORE : 0);

            if (isRequest) {
                System.out.println(GetCurrtime() + ": routed a Req. msg["
                        + new String(frame, UTF_8) + "] more [" + more + "]");
            } else {
                System.out.println(GetCurrtime() + ": got a Rep.");
            }
        } while (more);
    }

    /**
     * Returns the current wall-clock time formatted as minutes, seconds and
     * milliseconds, for prefixing log lines.
     *
     * @return the current time as {@code mm:ss.SSS}
     */
    public static String GetCurrtime() {
        // "SSS" zero-pads milliseconds to three digits. The former "SS"
        // printed 7 ms as ".07", which reads like 70 ms.
        SimpleDateFormat sdf = new SimpleDateFormat("mm:ss.SSS");
        return sdf.format(Calendar.getInstance().getTime());
    }
}

// Client program follows (a separate source file from the broker above):
package guide;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * Hello World client.
 *
 * <p>Connects a REQ socket to tcp://localhost:5559, sends "Hello" twenty
 * times and prints the reply received for each request.
 */
public class rrclient {

    /**
     * Runs the twenty request/reply round trips and then shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Context context = ZMQ.context(1);

        //  Socket to talk to server
        Socket requester = context.socket(ZMQ.REQ);
        try {
            requester.connect("tcp://localhost:5559");
            System.out.println("launch and connect client.");

            for (int request_nbr = 0; request_nbr < 20; request_nbr++) {
                requester.send("Hello", 0);
                String reply = requester.recvStr(0);
                System.out.println("Received reply " + request_nbr + " [" + reply + "]");
            }
        } finally {
            //  Reached after the last reply, or on failure: always clean up
            requester.close();
            context.term();
        }
    }
}
//worker程序:
package guide;import org.zeromq.ZMQ;import org.zeromq.ZMQ.Context;import org.zeromq.ZMQ.Socket;// Hello World worker// Connects REP socket to tcp://*:5560// Expects "Hello" from client, replies with "World"public class rrworker{ public static void main (String[] args) throws Exception { Context context = ZMQ.context (1); // Socket to talk to server Socket responder = context.socket (ZMQ.REP); responder.connect ("tcp://localhost:5560"); while (!Thread.currentThread ().isInterrupted ()) { // Wait for next request from client String string = responder.recvStr (0); System.out.printf ("Received request: [%s]\n", string); // Do some ‘work‘ Thread.sleep (1000); // Send reply back to client responder.send ("World"); } // We never get here but clean up anyhow responder.close(); context.term(); }}

 

jeroMq示例之 [req-broker-rep]