首页 > 代码库 > 《用Java写一个通用的服务器程序》02 监听器

《用Java写一个通用的服务器程序》02 监听器

在一个服务器程序中,监听器的作用类似于公司前台,起引导作用,因此监听器花在每个新连接上的时间应该尽可能短,这样才能保证最快响应。

回到编程本身来说:

1. 监听器最好由单独的线程运行

2. 监听器在接到新的连接之后,处理连接的方法需要尽快返回

 

在Java Push Framework中,因为需要同时监听普通客户端和服务器监视服务的客户端,所以定义两种监听器:Acceptor和MonitorAcceptor。

由于两者的关于监听部分的逻辑是相同的,因此首先定义了抽象类Listener来实现了监视器的功能,把处理socket的部分定义为抽象方法。

// 处理socket的抽象方法protected abstract boolean handleAcceptedSocket(            PushClientSocket clientSocket);

 

对于监听功能的实现比较简单,还是那三步:create,bind,accept。

    private boolean doListening(InetSocketAddress serverAddr) {        boolean ret = false;        int socketBufferSize = getServerImpl().getServerOptions()                    .getSocketBufferSize();        int socketType = getServerImpl().getServerOptions()                    .getSocketType();        try {            // Create            serverSocket = SocketFactory.getDefault().createServerSocket(                    socketType, socketBufferSize);                        // Bind            serverSocket.bind(serverAddr);            Debug.debug("Start to listen " + serverAddr.getHostName() + ":"                    + serverAddr.getPort());                        // Accept            doAccept();                        ret = true;        } catch (IOException e) {            e.printStackTrace();            if (serverSocket != null) {                serverSocket.close();                serverSocket = null;            }        }                return ret;    }

考虑Java中现在有好几种不同的socket:同步阻塞Socket,同步非阻塞Socket,以及JDK7新添加的异步Socket,如果直接使用Java的Socket类,不方便在不同类型的socket之间切换使用。所以我自定义了PushServerSocket和PushClientSocket两个新接口:

// 对于服务器socket来说,只定义了必须的bind和accept,// 以及一个不会抛出异常的close。public interface PushServerSocket {    public void bind(InetSocketAddress serverAddr) throws IOException;    public PushClientSocket accept() throws IOException;    public void close();}
// 客户端socket接口的定义是C++的风格,因为原来的代码是C++写的,这么定义便于翻译原来的C++代码public interface PushClientSocket {    public String getIP();    public int getPort();        // 这里直接使用Selector其实是有问题的,注定了只能使用NIO的方式    // 后面会考虑修改    public SelectionKey registerSelector(Selector selector, int ops,             Object attachment) throws IOException;        public int send(byte[] buffer, int offset, int size) throws IOException;        public int recv(byte[] buffer, int offset, int size) throws IOException;        public boolean isOpen();        public boolean isConnected();        public void close();}

两者对应的NIO版本实现是PushServerSocketImpl和PushClientSocketImpl,代码实现比较简单,这里就不贴出来了。

回到Listener,来看doAccept:

    private void doAccept()    {        // Start a new thread        acceptorThread = new Thread(new Runnable()  {            public void run() {                while (blnRunning) {                    try {                        PushClientSocket clientSocket = serverSocket.accept();                                                Debug.debug("New client from " + clientSocket.getIP());                        // Start servicing the client connection                        if (!handleAcceptedSocket(clientSocket)) {                            clientSocket.close();                        }                    } catch (IOException e) {                        e.printStackTrace();                        return;                    }                }            }                    });                // Start the thread        acceptorThread.start();    }

这里服务器socket的accept方法实现是阻塞的,这样可以避免不停地轮询,因此在用NIO实现accept时要不能调用configureBlocking设置成非阻塞模式。

后面停止监听时直接调用服务器socket的close方法,accept方法会抛出异常从而跳出循环,结束监听线程的运行。

结束监听时不要忘记使用线程的join方法等待线程结束。

    public void stopListening() {        blnRunning = false;        // Close server socket        if (serverSocket != null) {            serverSocket.close();            serverSocket = null;        }                // Wait the thread to terminate        if (acceptorThread != null) {            try {                acceptorThread.join();            } catch (InterruptedException e) {                //e.printStackTrace();            }            acceptorThread = null;        }    }

Acceptor的实现相对复杂一些,需要记录访问的信息,做一些检查,然后再交给ClientFactory处理:

    protected boolean handleAcceptedSocket(PushClientSocket clientSocket) {        // 记录日志        ClientFactory clientFactoryImpl = serverImpl.getClientFactory();        ServerStats stats = serverImpl.getServerStats();        ServerOptions options = serverImpl.getServerOptions();        stats.addToCumul(ServerStats.Measures.VisitorsSYNs, 1);        // 检查是否达到最大允许访问数        if (clientFactoryImpl.getClientCount() >= options.getMaxConnections()) {            Debug.debug("Reach maximum clients allowed, deny it");            return false;        }        //检查IP是否被允许        if (!clientFactoryImpl.isAddressAllowed(clientSocket.getIP())) {            Debug.debug("IP refused: " + clientSocket.getIP());            return false;        }        // 处理socket        return clientFactoryImpl.createPhysicalConnection(clientSocket,                 false, listenerOptions);    }

MonitorAcceptor的实现比较简单,直接交给ClientFactory处理就可以。

    protected boolean handleAcceptedSocket(PushClientSocket clientSocket) {        return serverImpl.getClientFactory().createPhysicalConnection(                clientSocket, true, listenerOptions);    }

关于ClientFactory的处理逻辑后面的文章里细讲。

 

实现一个监听器功能是很容易的,所以可以说的东西不多。

 

《用Java写一个通用的服务器程序》02 监听器