首页 > 代码库 > 使用jetty的continuations实现"服务器推"

使用jetty的continuations实现"服务器推"

 在实际的开发中,我们可能会有这样的场景:许多客户端都连接到服务器端,当有某个客户端的消息的时候,服务器端会主动"推"消息给客户端,手机app的推送是一个典型的场景(IOS的推送都是要经过苹果的服务器的,一般是通过苹果的APNS服务来实现,不需要做过多的开发,安卓的推送就需要我们自己来实现了)

我们可选的技术方案实际上是很多的,使用netty这样的异步的网络通信框架或者servlet容器提供的异步的方案都是可以实现的,它们的理念都是一样的,异步和事件驱动,客户端请求服务器,当服务器没有需要推送的数据(或者是需要执行很长时间的IO操作)的时候,请求会被挂起,当服务器端的数据准备好的时候(例如需要向客户端推送一个消息的时候,或者是服务器端IO操作执行完毕了)请求会被重新激活,数据返回客户端.

使用jetty的continuations或者是netty来实现这两种是我觉得比较好的实现方案,今天介绍一下如何使用jetty的continuations来实现一个服务器推的原型,和正式环境中向安卓手机的推送的实现方法是完全一样的

continuations介绍:jetty的continuations是jetty实现的实现异步请求和事件驱动的组件,从jetty7起,continuations不止在jetty中可以使用,任何支持servlet3.0规范的servlet容器都可以使用continuations来实现异步和事件驱动,相比servlet3.0规范中的异步servlet,continuations提供了更加简化的编程模型.

 

目标:用浏览器请求服务器的一个URL(用浏览器来模拟我们的客户端),实现任何时候当服务器需要推送数据的时候,浏览器能够立即显示出来

我们需要提供两个接口:提供给客户端做长连接的接口,向客户端发送数据的接口

提供给客户端连接的servlet:

 

package com.jiaoyiping.websample.asyncServlet.jetty; /*  * Created with Intellij IDEA  * USER: 焦一平  * Mail: jiaoyiping@gmail.com  * Date: 2016/10/23  * Time: 23:52  * To change this template use File | Settings | Editor | File and Code Templates */import org.eclipse.jetty.continuation.Continuation;import org.eclipse.jetty.continuation.ContinuationListener;import org.eclipse.jetty.continuation.ContinuationSupport;import javax.servlet.ServletException;import javax.servlet.annotation.WebServlet;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.io.IOException;import java.io.OutputStream;import java.io.OutputStreamWriter;import java.io.Writer;import java.util.Map;@WebServlet(urlPatterns = "/pull", asyncSupported = true)public class ContinuationServlet extends HttpServlet {    @Override    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {        String user = req.getParameter("user");        Map<String, PushAgent> pushAgentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");        if (pushAgentMap.containsKey(user)) {            PushAgent pushAgent = pushAgentMap.get(user);            Continuation continuation = ContinuationSupport.getContinuation(req);            continuation.setTimeout(90000000);            //第一次请求进来            if (continuation.isInitial()) {                resp.setContentType("text/evf;charset=utf-8");                resp.setHeader("Connection", "keep-alive");                resp.setHeader("Keep-Alive", "timeout=2000");                PushAdapter pushAdapter = new PushAdapter(continuation, pushAgent);                continuation.setAttribute("adapter", pushAdapter);                continuation.addContinuationListener(new ContinuationListener() {                    @Override                    public void onComplete(Continuation continuation) {                        PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");                        if (null != adapter) {                            continuation.setAttribute("adapter", null);                        }                    }                    @Override                    public void onTimeout(Continuation continuation) {                        onComplete(continuation);                    }                });                resp.flushBuffer();            }            if (continuation.isExpired()) {                return;            }            Writer writer = getWriter(resp);            PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");            Message message;            while (true) {                message = adapter.getPushAgent().pull();                if (null == message)                    break;                try {                    writer.write(message.getContent());                    writer.flush();                    writer.write("\r\n");                    writer.flush();                    resp.flushBuffer();                } catch (Exception e) {                    throw e;                }            }            //若没有该客户端的消息,则请求被挂起            continuation.suspend();        }    }    private Writer getWriter(HttpServletResponse response) throws IOException {        OutputStream os = response.getOutputStream();        return new OutputStreamWriter(os, "UTF-8");    }}

 

向客户端推送消息的servlet:

package com.jiaoyiping.websample.asyncServlet.jetty;import javax.servlet.ServletException;import javax.servlet.annotation.WebServlet;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.io.IOException;import java.io.PrintWriter;import java.util.Map;/*  * Created with Intellij IDEA  * USER: 焦一平  * Mail: jiaoyiping@gmail.com  * Date: 2016/10/25  * Time: 23:46  * To change this template use File | Settings | Editor | File and Code Templates */@WebServlet(urlPatterns = "/send")public class MesssageSendServlet extends HttpServlet {    @Override    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {        //不要在自己实现的servlet中调用 super.doGet(0)或者是super.doPost()        //因为在tomcat它们的默认实现是报405(HTTP1.1)或者400(其他版本的HTTP)        //        super.doPost(req, resp);        String target = req.getParameter("target");        String messageStr = req.getParameter("message");        Map<String, PushAgent> agentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");        if (agentMap.keySet().contains(target)) {            Message message = new Message();            message.setTarget(target);            message.setContent(messageStr);            if (agentMap.get(target).isInited()) {                agentMap.get(target).onEvent(message);            }            agentMap.get(target).send(message);            PrintWriter out = resp.getWriter();            out.print("发送成功");            out.flush();            out.close();        }    }    @Override    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {        this.doPost(req, resp);    }}

 

推送代理:就是可以拿到客户端相关信息,并且维护客户端消息队列的类,在这个推送代理中,我们可以加入一个监听器,当有数据需要推送的时候,激活请求

package com.jiaoyiping.websample.asyncServlet.jetty; /*  * Created with Intellij IDEA  * USER: 焦一平  * Mail: jiaoyiping@gmail.com  * Date: 2016/10/25  * Time: 22:56  * To change this template use File | Settings | Editor | File and Code Templates */public interface PushAgent {    Terminal getTerminal();    String getAddress();    String getToken();    Message send(Message message);    Message pull();    void addListener(MessageListener messageListener);    void onEvent(Message message);    boolean isInited();}

默认实现(每个需要接受推送的用户对应一个PushAgent,和用户端保持长连接的线程从queue里读取mesage对象,向某个用户推送的时候将message对象放到该用户对应的PushAgent的queue里,这里是一个生产者-消费者模式):

package com.jiaoyiping.websample.asyncServlet.jetty; /*  * Created with Intellij IDEA  * USER: 焦一平  * Mail: jiaoyiping@gmail.com  * Date: 2016/10/25  * Time: 23:17  * To change this template use File | Settings | Editor | File and Code Templates */import java.util.PriorityQueue;import java.util.Queue;public class DefaultPushAgent implements PushAgent {    private Terminal terminal;    //客户端通过长连接连接到服务器时,服务器不断地从该队列poll(),若果拿到新的消息,则返回给客户端    private Queue<Message> messages = new PriorityQueue<>();    private MessageListener messageListener;    @Override    public Terminal getTerminal() {        return this.terminal;    }    @Override    public String getAddress() {        return null;    }    @Override    public String getToken() {        return null;    }    @Override    public Message send(Message message) {        synchronized (message) {            messages.add(message);        }        return message;    }    @Override    public Message pull() {        synchronized (messages) {            return messages.poll();        }    }    @Override    public void addListener(MessageListener messageListener) {        this.messageListener = messageListener;    }    @Override    public void onEvent(Message message) {        this.messageListener.onMessage(message);    }    @Override    public boolean isInited() {        return this.messageListener != null;    }    public DefaultPushAgent(Terminal terminal) {        this.terminal = terminal;    }}

 

PushAdapter的实现(用户将Continuation和PushAgent关联起来):
package com.jiaoyiping.websample.asyncServlet.jetty; /*  * Created with Intellij IDEA  * USER: 焦一平  * Mail: jiaoyiping@gmail.com  * Date: 2016/10/25  * Time: 23:37  * To change this template use File | Settings | Editor | File and Code Templates */import org.eclipse.jetty.continuation.Continuation;public class PushAdapter {    private Continuation continuation;    private PushAgent pushAgent;    public PushAdapter(Continuation continuation, PushAgent pushAgent) {        this.continuation = continuation;        this.pushAgent = pushAgent;        this.pushAgent.addListener(message -> {            if (PushAdapter.this.continuation.isSuspended()) {                PushAdapter.this.continuation.resume();            }        });    }    public Continuation getContinuation() {        return continuation;    }    public void setContinuation(Continuation continuation) {        this.continuation = continuation;    }    public PushAgent getPushAgent() {        return pushAgent;    }    public void setPushAgent(PushAgent pushAgent) {        this.pushAgent = pushAgent;    }}

 

MessageListener的实现(监听需要推送消息的事件,这里为了做演示,并没有实现一个完整的观察者模式,只是在需要推送消息的时候,手工调用 onMessage()):

package com.jiaoyiping.websample.asyncServlet.jetty; /*  * Created with Intellij IDEA  * USER: 焦一平  * Mail: jiaoyiping@gmail.com  * Date: 2016/10/26  * Time: 2:03  * To change this template use File | Settings | Editor | File and Code Templates */public interface MessageListener {    void onMessage(Message message);}    

 

测试数据:使用一个listener在应用初始化的时候,初始化一些数据做为测试数据

 

package com.jiaoyiping.websample.asyncServlet.jetty;import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;import javax.servlet.annotation.WebListener;import java.util.HashMap;import java.util.Map;/*  * Created with Intellij IDEA  * USER: 焦一平  * Mail: jiaoyiping@gmail.com  * Date: 2016/10/25  * Time: 23:55  * To change this template use File | Settings | Editor | File and Code Templates */@WebListenerpublic class PushListener implements ServletContextListener {    @Override    public void contextInitialized(ServletContextEvent sce) {        Map<String, PushAgent> agentMap = new HashMap<>();        agentMap.put("zhangsan", new DefaultPushAgent(new Terminal() {{            setAddress("zhangsan");            setToken("zhangsan_token");        }}));        agentMap.put("lisi", new DefaultPushAgent(new Terminal() {{            setAddress("lisi");            setToken("lisi_token");        }}));        sce.getServletContext().setAttribute("agentmap",agentMap);    }    @Override    public void contextDestroyed(ServletContextEvent sce) {    }}

 

最终的效果是这样的,我截了一个git图:

技术分享

 

使用jetty的continuations实现"服务器推"