首页 > 代码库 > xsocket分析一,启动

xsocket分析一,启动

从一个简单的服务器Hander分析Xsocket的启动,首先定义一个简单的EchoHandler继承IDataHandler

public class EchoHandler implements IDataHandler{    public boolean onData(INonBlockingConnection nbc)                throws IOException,                BufferUnderflowException,                MaxReadSizeExceededException {         String data = nbc.readStringByDelimiter("\r\n");         nbc.write(data + "\r\n");         return true;      }}

然后利用这个IDataHandler启动服务器

public class XsocketTest {    public static void main(String[] args) throws UnknownHostException, IOException {        // TODO Auto-generated method stub         IServer srv = new Server(8090, new EchoHandler());                 // run it within the current thread.         srv.run();  // the call will not return                 // ... or start it by using a dedicated thread        //srv.start(); // returns after the server has been started    }}

 这样就实现了一个将客户端发送的字符串回发给客户端的服务器,接口还是很简单的。

从Server的构造开始看起,Server继承自IServer,类描述是接收新的连接,将INonBlockingConnection的处理代理给 分配给Server的 Handler。Server包含Dispatcher,Dispatcher负责执行IO操作,每个Connection都会被分配给一个Dispatcher。为了处理应用相关的事件如onData、onConnected等,Server的Handler的相应的回调函数会被调用。在Server启动阶段,Handler支持的回调函数会通过反射进行解析,回调函数是通过实现不同的接口如IConnectHandler、IDataHandler等进行标记的。通常一个服务器Handler会实现多个Handler接口。这段描述基本介绍了XSocket的整体结构,主要的概念都讲到了,后面还会具体分析。

Server重载了多个构造函数,上面那种构造方式调用了

    protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext, boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) throws UnknownHostException, IOException {                        defaultWorkerPool = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);        workerpool = defaultWorkerPool;                if (sslContext != null) {            acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);                    } else {            acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);        }                    localHostname = acceptor.getLocalAddress().getHostName();        localPort = acceptor.getLocalPort();                setHandler(handler);    }

 

 

主要参数是端口,Ihandler,是否支持ssl,工作线程池参数,除了我们赋予的其他都是默认值。首先创建工作线程池WorkPool,WorkerPool继承自ThreadPoolExecutor,WorkPool构造函数需要一个在任务执行之前存放任务的队列,这里是WorkerPoolAwareQueue extends LinkedBlockingQueue<Runnable>,这个WorkerPoolAwareQueue重写了offer函数加入了日志,并在线程池没有空闲线程且没有达到最大线程数时返回false来强制启动一个新的工作线程而不是放到任务队列中等待,算是一个小的优化。

然后是创建Acceptor。这里创建了一个LifeCycleHandler(重要,后面还会讲),实现了IIoAcceptorCallback接口,接口里定义了onCoonected()等方法在Acceptor绑定到socket或者接收到请求时回调。ConnectionUtils提供了一些集合、ByteBuffer操作的便捷方法,其中的静态变量IoProvider定义了Socket IO的各种参数,createAcceptor函数调用

IoAcceptor acceptor = new IoAcceptor(callback, address, backlog, isReuseAddress);

 

IoAcceptor构造函数如下

    public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {        this.callback = callback; //这个callback就是刚刚说的LifeCycleHandler        this.sslContext = sslContext;        this.sslOn = sslOn;        LOG.fine("try to bind server on " + address);        // create a new server socket        serverChannel = ServerSocketChannel.open();//构造serverChannel        assert (serverChannel != null);                serverChannel.configureBlocking(true);        serverChannel.socket().setSoTimeout(0);  // accept method never times out        serverChannel.socket().setReuseAddress(isReuseAddress);  //在channel关闭但还存活时是否能够重用端口                        try {            serverChannel.socket().bind(address, backlog);            //构建IoSocketDispatcherPool            dispatcherPool = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());                    } catch (BindException be) {            serverChannel.close();            LOG.warning("could not bind server to " + address + ". Reason: " + DataConverter.toString(be));            throw be;        }    }

 

 在构造函数中创建了IoSocketDispatcherPool,同时在构建该Pool时实例化了两个IoSocketDispatcher并运行

for (int i = currentRunning; i < size; i++) {    IoUnsynchronizedMemoryManager memoryManager = null;    if (preallocation) {
//内存管理,采用预分配内存的方法先不管 memoryManager
= IoUnsynchronizedMemoryManager.createPreallocatedMemoryManager(preallocationSize, bufferMinsize, useDirect); } else {memoryManager = IoUnsynchronizedMemoryManager.createNonPreallocatedMemoryManager(useDirect); } IoSocketDispatcher dispatcher = new IoSocketDispatcher(memoryManager, name + "#" + i); dispatchers.addLast(dispatcher); Thread t = new Thread(dispatcher); t.setDaemon(true); t.start(); for (IIoDispatcherPoolListener listener : listeners) { listener.onDispatcherAdded(dispatcher); }}

 

 IoSocketDispatcher构造函数调用了selector = Selector.open(); run方法如下

    public void run() {        // set thread name and attach dispatcher id to thread        Thread.currentThread().setName(name);        THREADBOUND_ID.set(id);                DIRECT_CALL_COUNTER.set(0);        if (LOG.isLoggable(Level.FINE)) {            LOG.fine("selector " + name + " listening ...");        }        int handledTasks = 0;        while(isOpen.get()) {            try {                int eventCount = selector.select(5000);                             handledTasks = performRegisterHandlerTasks(); //处理注册任务,重要                handledTasks += performKeyUpdateTasks(); //处理key更新任务,重要                if (eventCount > 0) {                    handleReadWriteKeys(); //处理读写事件,重要                }                handledTasks += performDeregisterHandlerTasks();                            checkForLooping(eventCount + handledTasks, lastTimeWokeUp);                            } catch (Throwable e) {                // eat and log exception                if (LOG.isLoggable(Level.FINE)) {                    LOG.fine("[" + Thread.currentThread().getName() + "] exception occured while processing. Reason " + DataConverter.toString(e));                }            }        }        for (IoSocketHandler socketHandler : getRegistered()) {            socketHandler.onDeregisteredEvent();        }        try {            selector.close();        } catch (Exception e) {            // eat and log exception            if (LOG.isLoggable(Level.FINE)) {                LOG.fine("error occured by close selector within tearDown " + DataConverter.toString(e));            }        }    }

 

再回到最初的main方法,其中调用了srv.run(); run函数最后调用了Acceptor的accept函数

    private void accept() {        while (isOpen.get()) {            try {                // blocking accept call                SocketChannel channel = serverChannel.accept();                // create IoSocketHandler                IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();                IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);                // notify call back                callback.onConnectionAccepted(ioHandler);                acceptedConnections++;            } catch (Exception e) {                // if acceptor is running (<socket>.close() causes that any                // thread currently blocked in accept() will throw a SocketException)                if (serverChannel.isOpen()) {                    LOG.warning("error occured while accepting connection: " + DataConverter.toString(e));                }            }        }    }

 

 重要就完成server的启动,主要启动了一个Acceptor线程和两个Dispatcher线程。

 

xsocket分析一,启动