首页 > 代码库 > zeromq rpc原型

zeromq rpc原型

/*
Asynchronous request-reply single-threaded server in C# (NetMQ)
that spawns a request handler each time a request is received.
This is different from other examples because the number of request
handler threads is not defined ahead of time.

Request:
Client DEALER --> Server ROUTER --> Request handler (spawned)
1. Clients send requests via a DEALER socket on port 5570
2. Server receives requests via a ROUTER socket on port 5570
3. Server passes both the request and the client identity directly to
   request handlers when they are spawned

Reply:
Client DEALER <-- Server ROUTER <-- Server DEALER <-- Request handler DEALER
1. Request handler returns the reply to the Server via a DEALER socket on inproc
2. Server receives the reply from the request handler via a DEALER socket on inproc
3. Server sends the reply to the client via a ROUTER socket on port 5570
4. Client receives the reply via a DEALER socket on port 5570
*/
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace NetmqSample
{
    /// <summary>
    /// Minimal blocking client: sends one request over a DEALER socket and
    /// waits for a single reply frame.
    /// </summary>
    public class ZmqClient
    {
        /// <summary>
        /// Connects to tcp://127.0.0.1:5570, sends <paramref name="input"/> as one
        /// frame, blocks until a reply frame arrives, and logs both to the console.
        /// </summary>
        /// <param name="input">Request payload sent as a single string frame.</param>
        public void Request(string input)
        {
            // The using statement releases the socket even if connect/send/receive throws.
            using (var socket = new DealerSocket())
            {
                // A fresh GUID per call gives every request its own identity,
                // which the server echoes back to route the reply.
                socket.Options.Identity = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
                socket.Connect("tcp://127.0.0.1:5570");

                socket.SendFrame(input);
                Console.WriteLine($"client send: {input} : {DateTime.Now:T}");

                var answer = socket.ReceiveFrameString();
                Console.WriteLine($"client received: {answer} : {DateTime.Now:T}");
            }
        }
    }

    /// <summary>
    /// Server side of the prototype: a ROUTER frontend bound to tcp port 5570 and a
    /// DEALER backend bound to inproc://backend, both serviced by one poller.
    /// </summary>
    public class ZmqServer : IDisposable
    {
        private DealerSocket _backend;
        private RouterSocket _frontend;
        // Kept in a field so the poller can be shut down again in Dispose.
        private NetMQPoller _poller;

        /// <summary>
        /// Binds both sockets, registers the receive handlers and starts the poller
        /// in the background. Returns immediately after printing "server started".
        /// </summary>
        public void Run()
        {
            _frontend = new RouterSocket();
            _frontend.Bind("tcp://*:5570");
            _frontend.ReceiveReady += Frontend_ReceiveReady;

            _backend = new DealerSocket();
            _backend.Bind("inproc://backend");
            _backend.ReceiveReady += Backend_ReceiveReady;

            _poller = new NetMQPoller { _frontend, _backend };
            _poller.RunAsync();
            Console.WriteLine("server started");
        }

        /// <summary>
        /// Releases the poller first and then both sockets. Safe to call when
        /// <see cref="Run"/> was never invoked.
        /// </summary>
        /// <remarks>
        /// Request handlers that are still running when this is called will no
        /// longer be able to deliver their reply.
        /// </remarks>
        public void Dispose()
        {
            // Poller goes first so no handler is raised on a socket that is being torn down.
            _poller?.Dispose();
            _frontend?.Dispose();
            _backend?.Dispose();
            _poller = null;
            _frontend = null;
            _backend = null;
        }

        /// <summary>
        /// Reads a two-frame reply (client identity, payload) coming from a request
        /// handler and forwards it to the client through the frontend socket.
        /// </summary>
        private void Backend_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var id = e.Socket.ReceiveFrameString();
            var msg = e.Socket.ReceiveFrameString();
            Console.WriteLine($"server backend response: {id} : {msg}");

            // First frame is the identity the ROUTER socket uses to pick the client.
            _frontend.SendMoreFrame(id).SendFrame(msg);
        }

        /// <summary>
        /// Reads a two-frame request (client identity, payload) from the frontend
        /// and hands it to a newly started <see cref="RequestHandler"/> task.
        /// </summary>
        private void Frontend_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var id = e.Socket.ReceiveFrameString();
            var msg = e.Socket.ReceiveFrameString();

            // LongRunning: each handler blocks for seconds, so it is kept off the
            // regular thread-pool workers.
            Task.Factory.StartNew(
                () => new RequestHandler().Run(id, msg),
                TaskCreationOptions.LongRunning);
        }
    }

    /// <summary>
    /// Handles one request: waits to simulate work, then sends the reply back to
    /// the server over its own short-lived DEALER socket on inproc://backend.
    /// </summary>
    public class RequestHandler
    {
        /// <summary>
        /// Processes a single request and sends a two-frame reply (identity, payload).
        /// </summary>
        /// <param name="id">Client identity received by the server; echoed as the first frame.</param>
        /// <param name="msg">Request payload; the reply is this text with the current time appended.</param>
        public void Run(string id, string msg)
        {
            // The using statement releases the socket even if the send throws.
            using (var worker = new DealerSocket("inproc://backend"))
            {
                // Simulate a long-running operation
                Thread.Sleep(2000);

                worker.SendMoreFrame(id)
                      .SendFrame(msg + " : " + DateTime.Now.ToLongTimeString());
            }
        }
    }
}

 

    /// <summary>
    /// Demo entry point: starts the server and then fires a burst of concurrent
    /// clients at it.
    /// </summary>
    class Program
    {
        // Number of concurrent client requests launched by the demo.
        private const int ClientCount = 2000;

        /// <summary>
        /// Starts a <c>ZmqServer</c>, launches <c>ClientCount</c> client requests
        /// (payloads "0000" .. "1999"), then waits for Enter before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            var server = new ZmqServer();
            server.Run();

            // A plain loop avoids materialising a throwaway list and needs no System.Linq.
            for (var i = 0; i < ClientCount; i++)
            {
                // Format before starting the task so the lambda captures a
                // per-iteration value rather than the shared loop counter.
                var payload = i.ToString("0000");

                // LongRunning: every client blocks until its reply arrives.
                Task.Factory.StartNew(
                    () => new ZmqClient().Request(payload),
                    TaskCreationOptions.LongRunning);
            }

            Console.ReadLine();
        }
    }

 

zeromq rpc原型