首页 > 代码库 > Nio学习5——对NIO.2(AIO) Reactor模式封装的拆解

Nio学习5——对NIO.2(AIO) Reactor模式封装的拆解


我们通过nio学习了Reactor模式,但是在java7中又出现了NIO.2,新的异步框架出来了,在上节中的服务端视线中看不到Reactor的影子了,但是Netty in action中写到:But notice that NIO.2 handles threading and the creation of the so-called event loop for you.所以模式还是没变,只是封装了而已!那让我们来分解下AIO(NIO.2)的封装吧!

首先看下AsynchronousServerSocketChannel类结构图:


AsynchronousSocketChannel的类结构图:


不同的jdk版本会有不同的实现,这里用的是windows的实现,只有一个类,所以跟踪源码不是很麻烦。

AsynchronousChannelGroup

               这个类的官方解释是:A grouping of asynchronous channels for the purpose of resource sharing(实现异步通道的资源共享).

An asynchronous channel group encapsulates the mechanics required to handle the completion of I/O operations initiated byasynchronous channels that are bound to the group(通道组封装了加入组中的那些通道的IO操作).A group has an associated thread pool to which tasks are submitted to handle I/O events and dispatch tocompletion-handlers that consume the result of asynchronous operations performed on channels in the group. In addition to handling I/O events, the pooled threads may also execute other tasks required to support the execution of asynchronous I/O operations.(每个组都有一个线程池,这个线程池可以运行IO事件的分发和Handler的业务处理,如果没有显式的创建通道组,那么 在创建通道的时候没有指定group参数的时候会用jvm默认的通道组)

因为有了默认的通道组,所以如果没有显式制定的话,都会用系统默认的那一套,这对于所有的通道都一样。

AsynchronousServerSocketChannel

/**
     * Opens an asynchronous server-socket channel.
     *
     * <p> The new channel is created by invoking the {@link
     * java.nio.channels.spi.AsynchronousChannelProvider#openAsynchronousServerSocketChannel
     * openAsynchronousServerSocketChannel} method on the {@link
     * java.nio.channels.spi.AsynchronousChannelProvider} object that created
     * the given group. If the group parameter is <tt>null</tt> then the
     * resulting channel is created by the system-wide default provider, and
     * bound to the <em>default group</em>.
     *
     * @param   group
     *          The group to which the newly constructed channel should be bound,
     *          or <tt>null</tt> for the default group
     *
     * @return  A new asynchronous server socket channel
     *
     * @throws  ShutdownChannelGroupException
     *          If the channel group is shutdown
     * @throws  IOException
     *          If an I/O error occurs
     */
    public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group)
        throws IOException
    {
        AsynchronousChannelProvider provider = (group == null) ?
            AsynchronousChannelProvider.provider() : group.provider();
        return provider.openAsynchronousServerSocketChannel(group);
    }

    /**
     * Opens an asynchronous server-socket channel.
     *
     * <p> This method returns an asynchronous server socket channel that is
     * bound to the <em>default group</em>. This method is equivalent to evaluating
     * the expression:
     * <blockquote><pre>
     * open((AsynchronousChannelGroup)null);
     * </pre></blockquote>
     *
     * @return  A new asynchronous server socket channel
     *
     * @throws  IOException
     *          If an I/O error occurs
     */
    public static AsynchronousServerSocketChannel open()
        throws IOException
    {
        return open(null);
    }

这里可以看到group参数的影子了吧,但是我们一般都是用无参的open所以,所以都是默认配置!

<Object> void java.nio.channels.AsynchronousServerSocketChannel.accept(Object attachment,CompletionHandler<AsynchronousSocketChannel, ? superObject> handler)

看下解释:         

                 When a new connection is accepted then the resulting AsynchronousSocketChannel will be bound to the sameAsynchronousChannelGroup as this channel. 

所以当一切都是以AsynchronousServerSocketChannel为开头的时候,所有他创建的AsynchronousSocketChannel都是和他一个group的所以他们共享一个线程池。

public void serve(int port) throws IOException {
		System.out.println("Listening for connections on port " + port);
		final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel
				.open();
		InetSocketAddress address = new InetSocketAddress(port);
		serverChannel.bind(address);
		final CountDownLatch latch = new CountDownLatch(1);
		serverChannel.accept(null,
				new CompletionHandler<AsynchronousSocketChannel, Object>() {
					@Override
					public void completed(
							final AsynchronousSocketChannel channel,
							Object attachment) {
						serverChannel.accept(null, this);
						ByteBuffer buffer = ByteBuffer.allocate(100);
						channel.read(buffer, buffer, new EchoCompletionHandler(
								channel));
					}

					@Override
					public void failed(Throwable throwable, Object attachment) {
						try {
							serverChannel.close();
						} catch (IOException e) {
							// ingnore on close
						} finally {
							latch.countDown();
						}
					}
				});
		try {
			latch.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

这里利用accept来接受客户端的请求,注意accept的递归调用!

WindowsAsynchronousServerSocketChannelImpl

Future<AsynchronousSocketChannel> implAccept(Object attachment,
        final CompletionHandler<AsynchronousSocketChannel,Object> handler)
    {
        if (!isOpen()) {
            Throwable exc = new ClosedChannelException();
            if (handler == null)
                return CompletedFuture.withFailure(exc);
            Invoker.invokeIndirectly(this, handler, attachment, null, exc);
            return null;
        }
        if (isAcceptKilled())
            throw new RuntimeException("Accept not allowed due to cancellation");

        // ensure channel is bound to local address
        if (localAddress == null)
            throw new NotYetBoundException();

        // create the socket that will be accepted. The creation of the socket
        // is enclosed by a begin/end for the listener socket to ensure that
        // we check that the listener is open and also to prevent the I/O
        // port from being closed as the new socket is registered.
        WindowsAsynchronousSocketChannelImpl ch = null;
        IOException ioe = null;
        try {
            begin();
            ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
        } catch (IOException x) {
            ioe = x;
        } finally {
            end();
        }
        if (ioe != null) {
            if (handler == null)
                return CompletedFuture.withFailure(ioe);
            Invoker.invokeIndirectly(this, handler, attachment, null, ioe);
            return null;
        }

        // need calling context when there is security manager as
        // permission check may be done in a different thread without
        // any application call frames on the stack
        AccessControlContext acc = (System.getSecurityManager() == null) ?
            null : AccessController.getContext();

        PendingFuture<AsynchronousSocketChannel,Object> result =
            new PendingFuture<AsynchronousSocketChannel,Object>(this, handler, attachment);
        AcceptTask task = new AcceptTask(ch, acc, result);
        result.setContext(task);

        // check and set flag to prevent concurrent accepting
        if (!accepting.compareAndSet(false, true))
            throw new AcceptPendingException();

        // initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            task.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this, task);
        }
        return result;
    }

看到accept操作封装成了AcceptTask,然后在Invoker.invokeOnThreadInThreadPool(this, task)中进行了调用

 /**
     * Invokes the given task on the thread pool associated with the given
     * channel. If the current thread is in the thread pool then the task is
     * invoked directly.
     */
    static void invokeOnThreadInThreadPool(Groupable channel,
                                           Runnable task)
    {
        boolean invokeDirect;
        GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
        AsynchronousChannelGroupImpl targetGroup = channel.group();
        if (thisGroupAndInvokeCount == null) {
            invokeDirect = false;
        } else {
            invokeDirect = (thisGroupAndInvokeCount.group == targetGroup);
        }
        try {
            if (invokeDirect) {
                task.run();
            } else {
                targetGroup.executeOnPooledThread(task);
            }
        } catch (RejectedExecutionException ree) {
            throw new ShutdownChannelGroupException();
        }
    }

因为把他封装成了task,这里也就不难明白为什么要递归调用了吧!

AcceptTask

/**
         * Initiates the accept operation.
         */
        @Override
        public void run() {
            long overlapped = 0L;

            try {
                // begin usage of listener socket
                begin();
                try {
                    // begin usage of child socket (as it is registered with
                    // completion port and so may be closed in the event that
                    // the group is forcefully closed).
                    channel.begin();

                    synchronized (result) {
                        overlapped = ioCache.add(result);

                        int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
                        if (n == IOStatus.UNAVAILABLE) {
                            return;
                        }

                        // connection accepted immediately
                        finishAccept();

                        // allow another accept before the result is set
                        enableAccept();
                        result.setResult(channel);
                    }
                } finally {
                    // end usage on child socket
                    channel.end();
                }
            } catch (Throwable x) {
                // failed to initiate accept so release resources
                if (overlapped != 0L)
                    ioCache.remove(overlapped);
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                enableAccept();
                result.setFailure(x);
            } finally {
                // end of usage of listener socket
                end();
            }

            // accept completed immediately but may not have executed on
            // initiating thread in which case the operation may have been
            // cancelled.
            if (result.isCancelled()) {
                closeChildChannel();
            }

            // invoke completion handler
            Invoker.invokeIndirectly(result);
        }

result里面封装了我们注册的回调函数CompleteHandler,所以继续看上面代码最后一行

Invoker

1 .  invokeIndirectly 

/**
     * Invoke handler with completed result. The handler is invoked indirectly,
     * via the channel group's thread pool.
     */
    static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
        assert future.isDone();
        CompletionHandler<V,? super A> handler = future.handler();
        if (handler != null) {
            invokeIndirectly(future.channel(),
                             handler,
                             future.attachment(),
                             future.value(),
                             future.exception());
        }
    }
2. invokeUnchecked

 /**
     * Invokes the handler indirectly via the channel group's thread pool.
     */
    static <V,A> void invokeIndirectly(AsynchronousChannel channel,
                                       final CompletionHandler<V,? super A> handler,
                                       final A attachment,
                                       final V result,
                                       final Throwable exc)
    {
        try {
            ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
                public void run() {
                    GroupAndInvokeCount thisGroupAndInvokeCount =
                        myGroupAndInvokeCount.get();
                    if (thisGroupAndInvokeCount != null)
                        thisGroupAndInvokeCount.setInvokeCount(1);
                    invokeUnchecked(handler, attachment, result, exc);
                }
            });
        } catch (RejectedExecutionException ree) {
            throw new ShutdownChannelGroupException();
        }
    }

3.

 /**
     * Invoke handler without checking the thread identity or number of handlers
     * on the thread stack.
     */
    static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
                                      A attachment,
                                      V value,
                                      Throwable exc)
    {
        if (exc == null) {
            handler.completed(value, attachment);
        } else {
            handler.failed(exc, attachment);
        }

        // clear interrupt
        Thread.interrupted();
    }

到这里看到了我们的handler的调用了吧!

所以封装毕竟是封装啊,但是可以看到很多资源的公用啊,做的优化~