JeroMQ example: [req-broker-rep]
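The benefit of the [req-broker-rep] pattern is that the broker hands requests from multiple clients to the workers one at a time, so several clients can send requests to the broker concurrently while the workers process them in parallel. The broker in the middle passes each worker's result back to the corresponding client as soon as it arrives.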
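As a rough illustration of that distribution, here is a minimal sketch in plain Java. It does not use JeroMQ: the class `BrokerModel`, the `Routed` type and the `route` method are hypothetical names introduced only for this illustration. It assumes the broker deals requests to workers in strict rotation, that every worker stays connected, and that each worker answers "World".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the broker's routing. Hypothetical; it does not use ZeroMQ. */
final class BrokerModel {

    /** One routed exchange: who asked, which worker answered, and the reply. */
    static final class Routed {
        final String clientId;
        final int worker;
        final String reply;

        Routed(String clientId, int worker, String reply) {
            this.clientId = clientId;
            this.worker = worker;
            this.reply = reply;
        }
    }

    /**
     * Deals each request to the next worker in rotation and keeps the client
     * identity attached, so the reply can be handed back to its sender.
     */
    static List<Routed> route(List<String> clientIds, int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        List<Routed> routed = new ArrayList<>();
        int next = 0; // index of the worker that receives the next request
        for (String clientId : clientIds) {
            int worker = next;
            next = (next + 1) % workerCount;
            // Assumption for this model: every worker replies "World".
            routed.add(new Routed(clientId, worker, "World"));
        }
        return routed;
    }

    public static void main(String[] args) {
        // Three clients (A, B, C) send four requests in total to two workers.
        for (Routed r : route(Arrays.asList("A", "B", "C", "A"), 2)) {
            System.out.println(r.clientId + " -> worker " + r.worker + " -> " + r.reply);
        }
    }
}
```

In this model the rotation ignores how busy a worker is, which is why it is only a simplification of what the broker does with real sockets.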
Broker program:
package guide;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

/**
 * Simple request-reply broker
 */
public class rrbroker {

    public static void main(String[] args) {
        // Prepare our context and sockets
        Context context = ZMQ.context(1);
        Socket frontend = context.socket(ZMQ.ROUTER);
        Socket backend = context.socket(ZMQ.DEALER);
        frontend.bind("tcp://*:5559");
        backend.bind("tcp://*:5560");
        System.out.println("launch and connect broker.");

        // Initialize poll set
        Poller items = new Poller(2);
        items.register(frontend, Poller.POLLIN);
        items.register(backend, Poller.POLLIN);

        boolean more = false;
        byte[] message;

        // Switch messages between sockets
        while (!Thread.currentThread().isInterrupted()) {
            // Poll and memorize multipart detection
            items.poll();

            if (items.pollin(0)) {
                while (true) {
                    // Receive one frame of the request
                    message = frontend.recv(0);
                    more = frontend.hasReceiveMore();

                    // Broker it: forward the frame to the workers
                    backend.send(message, more ? ZMQ.SNDMORE : 0);

                    String str_msg = "";
                    if (message != null) {
                        str_msg = new String(message, Charset.forName("UTF-8"));
                    }
                    System.out.println(GetCurrtime() + ": routed a Req. msg[" + str_msg + "] more [" + more + "]");

                    if (!more) {
                        break;
                    }
                }
            }

            if (items.pollin(1)) {
                while (true) {
                    // Receive one frame of the reply
                    message = backend.recv(0);
                    more = backend.hasReceiveMore();

                    // Broker it: forward the frame to the client
                    frontend.send(message, more ? ZMQ.SNDMORE : 0);
                    System.out.println(GetCurrtime() + ": got a Rep.");

                    if (!more) {
                        break;
                    }
                }
            }
        }

        // We never get here but clean up anyhow
        frontend.close();
        backend.close();
        context.term();
    }

    // Current time as minutes:seconds.milliseconds, used to timestamp the log lines
    public static String GetCurrtime() {
        Calendar cal = Calendar.getInstance();
        SimpleDateFormat sdf = new SimpleDateFormat("mm:ss.SSS");
        return sdf.format(cal.getTime());
    }
}

Client program:

package guide;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * Hello World client
 * Connects REQ socket to tcp://localhost:5559
 * Sends "Hello" to server, expects "World" back
 */
public class rrclient {

    public static void main(String[] args) {
        Context context = ZMQ.context(1);

        // Socket to talk to server
        Socket requester = context.socket(ZMQ.REQ);
        requester.connect("tcp://localhost:5559");
        System.out.println("launch and connect client.");

        for (int request_nbr = 0; request_nbr < 20; request_nbr++) {
            requester.send("Hello", 0);
            String reply = requester.recvStr(0);
            System.out.println("Received reply " + request_nbr + " [" + reply + "]");
        }

        // Clean up once all 20 requests have been answered
        requester.close();
        context.term();
    }
}
Worker program:

package guide;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

// Hello World worker
// Connects REP socket to tcp://localhost:5560
// Expects "Hello" from client, replies with "World"
public class rrworker {

    public static void main(String[] args) throws Exception {
        Context context = ZMQ.context(1);

        // Socket to talk to server
        Socket responder = context.socket(ZMQ.REP);
        responder.connect("tcp://localhost:5560");

        while (!Thread.currentThread().isInterrupted()) {
            // Wait for next request from client
            String string = responder.recvStr(0);
            System.out.printf("Received request: [%s]\n", string);

            // Do some 'work'
            Thread.sleep(1000);

            // Send reply back to client
            responder.send("World");
        }

        // We never get here but clean up anyhow
        responder.close();
        context.term();
    }
}