首页 > 代码库 > 用Jetty和redis实现接入服务器adapter

用Jetty和redis实现接入服务器adapter

传统的服务器端为若干个客户端提供服务,一般需要开启多个服务器端进程。为了进一步提升服务器端的处理能力,可以如下图所示将服务解耦为两部分(adapter与workers),它们之间通过消息队列传输数据,其中workers处理具体业务,adapter负责接入请求以及反馈结果,具体包含下面两个工作。

1,将所有客户端的请求发送到消息队列(进而传给后台处理)

2,后台处理完毕后将结果返回响应队列,client adapter获取到结果后返回给相应客户端。

流程图如下:

我们选择用Jetty(8),redis以及php简单实现这个场景,主要用到Jetty的continuation机制和redis的list先进先出数据结构

接入服务器

A.1,先配置一个服务器如下,同时开启一个守护线程阻塞监听response queue(用到json lib库以及jedis库)

package test;import java.util.HashMap;import java.util.List;import org.eclipse.jetty.continuation.Continuation;import org.eclipse.jetty.server.*;import org.eclipse.jetty.server.nio.SelectChannelConnector;import org.eclipse.jetty.servlet.ServletContextHandler;import org.eclipse.jetty.servlet.ServletHolder;import org.eclipse.jetty.util.thread.QueuedThreadPool;import org.json.simple.*;import redis.clients.jedis.Jedis;public class PJetty{        public static HashMap<String,Continuation>globalMap = new HashMap<String,Continuation>();        // 用一个守护线程阻塞等待结果队列返回数据    public static class DaemonThread extends Thread{                private JSONObject obj = new JSONObject();        private Jedis jedis = new Jedis("127.0.0.1",6379);        private List<String> res;                public void run(){                        while(true){                // 阻塞等待响应队列                res = jedis.brpop(0, "response_queue");                                // 获取结果信息                String response = res.get(1);                obj=(JSONObject) JSONValue.parse(response);                String request_sid = obj.get("request_sid").toString();                String result = obj.get("results").toString();                                if(request_sid == null){                    continue;                }                                // 通过消息中的连接的sessonid获取到响应的continuation实例,然后设置结果信息再唤醒实例                Continuation con = globalMap.get(request_sid);                if(con == null){continue;}                globalMap.remove(request_sid);                                //客户端异常断开这里会抛错                try{                    con.setAttribute("results", result);                    con.resume();                } catch(Exception e){                    continue;                }            }                    }    }        public static void main(String[] args) throws Exception {                //开启守护线程去阻塞等待响应结果队列,唤醒请求        DaemonThread dt = new DaemonThread();        dt.start();                //设置connectors        SelectChannelConnector connector1 = new SelectChannelConnector();        connector1.setPort(1987);        connector1.setThreadPool(new QueuedThreadPool(5));                Server server = new Server();        server.setConnectors(new Connector[]{connector1});        //使用servlet处理请求        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);        context.setContextPath("/");        context.addServlet(new ServletHolder(new NonBlockingServlet()), "/fetch");        server.setHandler(context);                server.start();        server.join();    }}

A.2,实现自定义的servlets接受前端client连接,将请求信息传入队列request queue

package test;import java.io.IOException;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import javax.servlet.http.HttpSession;import org.eclipse.jetty.continuation.Continuation;import org.eclipse.jetty.continuation.ContinuationSupport;import org.json.simple.JSONObject;import redis.clients.jedis.Jedis;public class NonBlockingServlet extends HttpServlet {    /**     * generated serialize number     */    private static final long serialVersionUID = 3313258432391586994L;            protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException    {        // 用sleeptime来模拟后台工作量         String sleepTime = request.getParameter("st");         if(sleepTime == null){             sleepTime = "0";         }                  // 查看结果队列是否返回此连接的请求结果         Object results = request.getAttribute("results");         if (results==null) // 如果异步处理尚未返回结果          {           Continuation continuation = ContinuationSupport.getContinuation(request);                if(continuation.isInitial()){               // 设置连接超时时间                continuation.setTimeout(10000);                   response.setContentType("text/plain");                response.getWriter().flush();                                HttpSession session=request.getSession();                String sid = session.getId();                                Jedis jedis = new Jedis("127.0.0.1",6379);                //将请求连接sessionid以及请求内容encode后传到处理队列中                JSONObject obj=new JSONObject();                obj.put("request_sid",sid);                obj.put("params",sleepTime);                                jedis.lpush("request_queue", obj.toJSONString());                                //将连接和continuation实例做个映射关系存到全局hashmap中,不确定这里是否应该加锁                PJetty.globalMap.put(sid, continuation);           }                      // 判断是否超时           if (continuation.isExpired())           {             // 返回超时Response             response.getWriter().println("timeout");                response.getWriter().flush();               return;           }                // 挂起HTTP连接           continuation.suspend();                 return; // or continuation.undispatch();         }              // 连接恢复后返回结果         response.getWriter().println("Got Result:\t" + results);         response.getWriter().flush();      }}

业务服务器

B,实现后端worker.php(可以自定义worker进程数,我这里设置为5个php进程,进程数多能获取更好的并发)(用到predis库)

#!/root/bin/php<?phprequire_once("lib/Predis/Autoloader.php");function worker_thread(){        Predis\Autoloader::register();        $redis = new Predis\Client(‘tcp://127.0.0.1:6379‘);                while(true){                        try{                                $request = $redis->brpop("request_queue", 0);                        } catch(Exception $e){                                continue;                        }                        /** demo                         array(2) {                         [0]=>                         string(13) "request_queue"                         [1]=>                         string(55) "{"request_sid":"q0muxazo8k1h1k3uw85wuayh","params":"4"}"                         }                         */                        $request = json_decode($request[1], true);                        // sleep represents work loads                        sleep(intval($request["params"]));                        $results = array("request_sid"=>$request["request_sid"], "results"=>$request["params"]);                        $response = json_encode($results);                        $redis->lpush("response_queue",$response);                }}//开启多个worker进程提供服务for ($worker_nbr = 0; $worker_nbr < 5; $worker_nbr++) {        $pid = pcntl_fork();        if ($pid == 0) {                worker_thread();                return;        }}?>

运行结果如下

 这只是一个简单的demo,为了防止redis,workers进程挂掉或者客户端异常断开,还需要做些异常处理,比如设置请求超时,捕获一些空指针等,超时需要将continuation从globalMap中剔除,防止内存得不到释放。

root # for((i=10;i>=1;i--)) ; do lynx -dump http://127.0.0.1:1987/fetch?st=$i & done[1] 14112[2] 14113[3] 14114[4] 14115[5] 14116[6] 14117[7] 14118[8] 14119[9] 14120[10] 14121root # Got Result:     3Got Result:     4Got Result:     2Got Result:     7Got Result:     1Got Result:     9Got Result:     6timeouttimeouttimeout[1]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[2]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[3]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[4]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[5]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[6]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[7]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[8]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[9]-  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i[10]+  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i

redis数据库中存储的内容如下,可以看出虽然经后台处理后顺序变化了,但是对应关系正确,接入服务器能够根据request_sid把结果返回给相应的用户:

redis 127.0.0.1:6379> lrange request_queue 0 15 1) "{\"request_sid\":\"igiwkwnb715aphw8uvtfa6rj\",\"params\":\"3\"}" 2) "{\"request_sid\":\"wsrglxa3h6ef19ik5i0nbiiys\",\"params\":\"2\"}" 3) "{\"request_sid\":\"tyiqoj6awj5t16ddpqusftwc8\",\"params\":\"6\"}" 4) "{\"request_sid\":\"1052tgkiyy7c31bmxjtbom7ca\",\"params\":\"5\"}" 5) "{\"request_sid\":\"17jo1xwnnkd3h1mhcqcfplrl5k\",\"params\":\"8\"}" 6) "{\"request_sid\":\"1xk521sq6vmmf6enxauwzduj9\",\"params\":\"4\"}" 7) "{\"request_sid\":\"1cxnir1slgjiq1o2n3xwznh0kk\",\"params\":\"9\"}" 8) "{\"request_sid\":\"961vf8hao3stsv4vt1qif3ws\",\"params\":\"7\"}" 9) "{\"request_sid\":\"35pfn5au6p8qdbri17p636si\",\"params\":\"10\"}"10) "{\"request_sid\":\"1ca4wy8qsfr7av0hwk8xtlqhp\",\"params\":\"1\"}"redis 127.0.0.1:6379> lrange response_queue 0 15 1) "{\"request_sid\":\"tyiqoj6awj5t16ddpqusftwc8\",\"results\":\"6\"}" 2) "{\"request_sid\":\"igiwkwnb715aphw8uvtfa6rj\",\"results\":\"3\"}" 3) "{\"request_sid\":\"wsrglxa3h6ef19ik5i0nbiiys\",\"results\":\"2\"}" 4) "{\"request_sid\":\"35pfn5au6p8qdbri17p636si\",\"results\":\"10\"}" 5) "{\"request_sid\":\"1052tgkiyy7c31bmxjtbom7ca\",\"results\":\"5\"}" 6) "{\"request_sid\":\"1cxnir1slgjiq1o2n3xwznh0kk\",\"results\":\"9\"}" 7) "{\"request_sid\":\"17jo1xwnnkd3h1mhcqcfplrl5k\",\"results\":\"8\"}" 8) "{\"request_sid\":\"961vf8hao3stsv4vt1qif3ws\",\"results\":\"7\"}" 9) "{\"request_sid\":\"1xk521sq6vmmf6enxauwzduj9\",\"results\":\"4\"}"10) "{\"request_sid\":\"1ca4wy8qsfr7av0hwk8xtlqhp\",\"results\":\"1\"}"