ZeroMQ RPC prototype
/*
Asynchronous request-reply single-threaded server in C# (NetMQ)
that spawns a request handler each time a request is received.

This is different from other examples because the number of request
handler threads is not defined ahead of time.

Request:
Client DEALER --> Server ROUTER --> Request handler (spawned)
1. Clients send requests via a DEALER socket on port 5570
2. Server receives requests via a ROUTER socket on port 5570
3. Server passes both the request and the client identity directly to
   request handlers when they are spawned

Reply:
Client DEALER <-- Server ROUTER <-- Server DEALER <-- Request handler DEALER
1. Request handler returns the reply to the Server via a DEALER socket on inproc
2. Server receives the reply from the request handler via a DEALER socket on inproc
3. Server sends the reply to the client via a ROUTER socket on port 5570
4. Client receives the reply via a DEALER socket on port 5570
*/
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace NetmqSample
{
    public class ZmqClient
    {
        public void Request(string input)
        {
            using (var socket = new DealerSocket())
            {
                // A unique identity lets the server's ROUTER socket route the reply back to this client.
                socket.Options.Identity = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
                socket.Connect("tcp://127.0.0.1:5570");

                socket.SendFrame(input);
                Console.WriteLine($"client send: {input} : {DateTime.Now:T}");

                // Blocks until the reply arrives.
                var answer = socket.ReceiveFrameString();
                Console.WriteLine($"client received: {answer} : {DateTime.Now:T}");
            }
        }
    }

    public class ZmqServer
    {
        private DealerSocket _backend;
        private RouterSocket _frontend;

        public void Run()
        {
            // Frontend: talks to clients over TCP.
            _frontend = new RouterSocket();
            _frontend.Bind("tcp://*:5570");
            _frontend.ReceiveReady += Frontend_ReceiveReady;

            // Backend: collects replies from request handlers over inproc.
            _backend = new DealerSocket();
            _backend.Bind("inproc://backend");
            _backend.ReceiveReady += Backend_ReceiveReady;

            // Both sockets are serviced on the poller's single thread.
            var poller = new NetMQPoller { _frontend, _backend };
            poller.RunAsync();
            Console.WriteLine("server started");
        }

        private void Backend_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            // A handler reply has two frames: client identity, then payload.
            var id = e.Socket.ReceiveFrameString();
            var msg = e.Socket.ReceiveFrameString();
            Console.WriteLine($"server backend response: {id} : {msg}");

            // Forward to the client: the identity frame tells the ROUTER where to send the payload.
            _frontend.SendFrame(id, true);
            _frontend.SendFrame(msg);
        }

        private void Frontend_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            // The ROUTER socket prepends the sender's identity to each incoming message.
            var id = e.Socket.ReceiveFrameString();
            var msg = e.Socket.ReceiveFrameString();
            //Console.WriteLine($"server frontend received: {id} : {msg} : {DateTime.Now:T}");

            // Spawn one handler per request; the identity travels with the request.
            var task = new Task(() => new RequestHandler().Run(id, msg), TaskCreationOptions.LongRunning);
            task.Start();
        }
    }

    public class RequestHandler
    {
        public void Run(string id, string msg)
        {
            // Each handler connects its own DEALER socket to the server's backend.
            using (var worker = new DealerSocket("inproc://backend"))
            {
                // Simulate a long-running operation
                Thread.Sleep(2000);

                worker.SendFrame(id, true);
                worker.SendFrame(msg + " : " + DateTime.Now.ToLongTimeString());
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var server = new ZmqServer();
            server.Run();

            // Start 2000 concurrent clients, each sending one request.
            Enumerable.Range(0, 2000).ToList().ForEach(x =>
            {
                Task.Factory.StartNew(() => new ZmqClient().Request(x.ToString("0000")), TaskCreationOptions.LongRunning);
            });

            Console.ReadLine();
        }
    }
}