NIO Study Notes 5: Dissecting How NIO.2 (AIO) Encapsulates the Reactor Pattern
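We learned the Reactor pattern through NIO, but Java 7 introduced NIO.2, a new asynchronous framework. In the server implementation from the previous installment there is no visible trace of a Reactor. Yet Netty in Action says: "But notice that NIO.2 handles threading and the creation of the so-called event loop for you." So the pattern has not changed; it has only been wrapped up. Let's take the AIO (NIO.2) encapsulation apart.
First, the class diagram of AsynchronousServerSocketChannel:
The class diagram of AsynchronousSocketChannel:
The implementation is platform-specific and differs between JDK builds. This walkthrough uses the Windows implementation, where each channel is backed by a single class, so tracing the source is not much trouble.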
AsynchronousChannelGroup
The official description of this class is: "A grouping of asynchronous channels for the purpose of resource sharing." In other words, it lets asynchronous channels share resources.
"An asynchronous channel group encapsulates the mechanics required to handle the completion of I/O operations initiated by asynchronous channels that are bound to the group." That is, the group encapsulates the completion handling for the I/O operations of the channels bound to it.
"A group has an associated thread pool to which tasks are submitted to handle I/O events and dispatch to completion-handlers that consume the result of asynchronous operations performed on channels in the group. In addition to handling I/O events, the pooled threads may also execute other tasks required to support the execution of asynchronous I/O operations."
So every group has a thread pool, and that pool runs both the dispatching of I/O events and the handlers' business logic. If no channel group is created explicitly, a channel opened without a group argument is bound to the JVM's default group.
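Because a default group exists, any channel opened without an explicitly specified group uses that system-wide default. This holds for every kind of asynchronous channel.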
AsynchronousServerSocketChannel
/**
 * Opens an asynchronous server-socket channel.
 *
 * <p> The new channel is created by invoking the {@link
 * java.nio.channels.spi.AsynchronousChannelProvider#openAsynchronousServerSocketChannel
 * openAsynchronousServerSocketChannel} method on the {@link
 * java.nio.channels.spi.AsynchronousChannelProvider} object that created
 * the given group. If the group parameter is <tt>null</tt> then the
 * resulting channel is created by the system-wide default provider, and
 * bound to the <em>default group</em>.
 *
 * @param   group
 *          The group to which the newly constructed channel should be bound,
 *          or <tt>null</tt> for the default group
 *
 * @return  A new asynchronous server socket channel
 *
 * @throws  ShutdownChannelGroupException
 *          If the channel group is shutdown
 * @throws  IOException
 *          If an I/O error occurs
 */
public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group)
    throws IOException
{
    AsynchronousChannelProvider provider = (group == null) ?
        AsynchronousChannelProvider.provider() : group.provider();
    return provider.openAsynchronousServerSocketChannel(group);
}

/**
 * Opens an asynchronous server-socket channel.
 *
 * <p> This method returns an asynchronous server socket channel that is
 * bound to the <em>default group</em>. This method is equivalent to evaluating
 * the expression:
 * <blockquote><pre>
 * open((AsynchronousChannelGroup)null);
 * </pre></blockquote>
 *
 * @return  A new asynchronous server socket channel
 *
 * @throws  IOException
 *          If an I/O error occurs
 */
public static AsynchronousServerSocketChannel open()
    throws IOException
{
    return open(null);
}
Here the group parameter finally shows up. In practice we usually call the no-argument open(), so everything runs with the default configuration.
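To make the group parameter concrete, here is a minimal sketch that passes an explicit group to open(group) instead of relying on the default one. The class name ExplicitGroupDemo, the thread name prefix "demo-group-" and the pool size of 2 are assumptions made up for this illustration; the only JDK calls used are AsynchronousChannelGroup.withFixedThreadPool, open(group), bind, accept and connect. The sketch records which thread ran the accept handler, which should be one of the group's own pooled threads.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK or of the original article.
final class ExplicitGroupDemo {

    /** Returns the name of the thread that ran the accept CompletionHandler. */
    static String acceptThreadName() {
        try {
            AtomicInteger counter = new AtomicInteger();
            // An explicit group whose pooled threads carry a recognizable name.
            AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(2, r -> {
                Thread t = new Thread(r, "demo-group-" + counter.incrementAndGet());
                t.setDaemon(true);
                return t;
            });
            try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
                 AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {
                // Port 0 asks the OS for any free port on the loopback interface.
                server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                CompletableFuture<String> handlerThread = new CompletableFuture<>();
                server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    @Override
                    public void completed(AsynchronousSocketChannel ch, Void attachment) {
                        handlerThread.complete(Thread.currentThread().getName());
                        try { ch.close(); } catch (IOException ignored) { }
                    }
                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        handlerThread.completeExceptionally(exc);
                    }
                });
                client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                return handlerThread.get(5, TimeUnit.SECONDS);
            } finally {
                group.shutdownNow(); // closes the group's channels and stops its threads
            }
        } catch (Exception e) {
            throw new IllegalStateException("demo failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept handler ran on: " + acceptThreadName());
    }
}
```

With the no-argument open() the same handler would run on a thread of the default group instead, which the application neither names nor sizes directly.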
<Object> void java.nio.channels.AsynchronousServerSocketChannel.accept(Object attachment, CompletionHandler<AsynchronousSocketChannel, ? super Object> handler)
Its documentation says:
"When a new connection is accepted then the resulting AsynchronousSocketChannel will be bound to the same AsynchronousChannelGroup as this channel."
So when everything starts from an AsynchronousServerSocketChannel, every AsynchronousSocketChannel it accepts belongs to the same group as the server channel, and they all share one thread pool.
public void serve(int port) throws IOException {
    System.out.println("Listening for connections on port " + port);
    final AsynchronousServerSocketChannel serverChannel =
            AsynchronousServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(port);
    serverChannel.bind(address);
    final CountDownLatch latch = new CountDownLatch(1);
    serverChannel.accept(null,
            new CompletionHandler<AsynchronousSocketChannel, Object>() {
        @Override
        public void completed(final AsynchronousSocketChannel channel,
                              Object attachment) {
            // re-arm accept so the next client can connect
            serverChannel.accept(null, this);
            ByteBuffer buffer = ByteBuffer.allocate(100);
            channel.read(buffer, buffer, new EchoCompletionHandler(channel));
        }

        @Override
        public void failed(Throwable throwable, Object attachment) {
            try {
                serverChannel.close();
            } catch (IOException e) {
                // ignore on close
            } finally {
                latch.countDown();
            }
        }
    });
    try {
        latch.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
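Here accept is used to take client connections. Note the "recursive" call: each accept call accepts only one connection, so the handler calls serverChannel.accept again from inside completed() to keep listening.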
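The serve method refers to an EchoCompletionHandler that is not shown. What follows is a hypothetical reconstruction, inferred only from how it is used (constructed with the channel, passed to channel.read with the ByteBuffer as attachment); the real class may differ. The EchoDemo class, its roundTrip method and the 5 second timeouts are also assumptions added so that the sketch can be run against a loopback connection.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

// Hypothetical reconstruction: the article only shows how this class is used.
final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel channel;

    EchoCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void completed(Integer bytesRead, ByteBuffer buffer) {
        if (bytesRead < 0) {            // the peer closed the connection
            closeQuietly();
            return;
        }
        buffer.flip();                  // switch the buffer from filling to draining
        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesWritten, ByteBuffer buf) {
                if (buf.hasRemaining()) {
                    channel.write(buf, buf, this);   // partial write: send the rest
                } else {
                    buf.clear();
                    // everything echoed: issue the next read, again "recursively"
                    channel.read(buf, buf, EchoCompletionHandler.this);
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer buf) {
                closeQuietly();
            }
        });
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        closeQuietly();
    }

    private void closeQuietly() {
        try { channel.close(); } catch (IOException ignored) { }
    }
}

// Hypothetical driver: one echo server, one client, one message.
final class EchoDemo {
    static String roundTrip(String message) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void attachment) {
                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    ch.read(buffer, buffer, new EchoCompletionHandler(ch));
                }
                @Override
                public void failed(Throwable exc, Void attachment) { }
            });
            client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            ByteBuffer out = ByteBuffer.wrap(payload);
            while (out.hasRemaining()) {
                client.write(out).get(5, TimeUnit.SECONDS);
            }
            ByteBuffer in = ByteBuffer.allocate(payload.length);
            while (in.hasRemaining()) {
                if (client.read(in).get(5, TimeUnit.SECONDS) < 0) break;
            }
            return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new IllegalStateException("echo round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

The read and write handlers follow the same pattern as accept: each asynchronous operation completes once, so the handler has to issue the next one itself.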
WindowsAsynchronousServerSocketChannelImpl
Future<AsynchronousSocketChannel> implAccept(Object attachment,
    final CompletionHandler<AsynchronousSocketChannel,Object> handler)
{
    if (!isOpen()) {
        Throwable exc = new ClosedChannelException();
        if (handler == null)
            return CompletedFuture.withFailure(exc);
        Invoker.invokeIndirectly(this, handler, attachment, null, exc);
        return null;
    }
    if (isAcceptKilled())
        throw new RuntimeException("Accept not allowed due to cancellation");

    // ensure channel is bound to local address
    if (localAddress == null)
        throw new NotYetBoundException();

    // create the socket that will be accepted. The creation of the socket
    // is enclosed by a begin/end for the listener socket to ensure that
    // we check that the listener is open and also to prevent the I/O
    // port from being closed as the new socket is registered.
    WindowsAsynchronousSocketChannelImpl ch = null;
    IOException ioe = null;
    try {
        begin();
        ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
    } catch (IOException x) {
        ioe = x;
    } finally {
        end();
    }
    if (ioe != null) {
        if (handler == null)
            return CompletedFuture.withFailure(ioe);
        Invoker.invokeIndirectly(this, handler, attachment, null, ioe);
        return null;
    }

    // need calling context when there is security manager as
    // permission check may be done in a different thread without
    // any application call frames on the stack
    AccessControlContext acc = (System.getSecurityManager() == null) ?
        null : AccessController.getContext();

    PendingFuture<AsynchronousSocketChannel,Object> result =
        new PendingFuture<AsynchronousSocketChannel,Object>(this, handler, attachment);
    AcceptTask task = new AcceptTask(ch, acc, result);
    result.setContext(task);

    // check and set flag to prevent concurrent accepting
    if (!accepting.compareAndSet(false, true))
        throw new AcceptPendingException();

    // initiate I/O
    if (Iocp.supportsThreadAgnosticIo()) {
        task.run();
    } else {
        Invoker.invokeOnThreadInThreadPool(this, task);
    }
    return result;
}
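The accept operation is wrapped in an AcceptTask, which is either run directly (when Iocp.supportsThreadAgnosticIo() returns true) or handed to Invoker.invokeOnThreadInThreadPool(this, task):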
/**
 * Invokes the given task on the thread pool associated with the given
 * channel. If the current thread is in the thread pool then the task is
 * invoked directly.
 */
static void invokeOnThreadInThreadPool(Groupable channel,
                                       Runnable task)
{
    boolean invokeDirect;
    GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
    AsynchronousChannelGroupImpl targetGroup = channel.group();
    if (thisGroupAndInvokeCount == null) {
        invokeDirect = false;
    } else {
        invokeDirect = (thisGroupAndInvokeCount.group == targetGroup);
    }
    try {
        if (invokeDirect) {
            task.run();
        } else {
            targetGroup.executeOnPooledThread(task);
        }
    } catch (RejectedExecutionException ree) {
        throw new ShutdownChannelGroupException();
    }
}
Because each accept is packaged as a one-shot task, it is not hard to see why accept has to be called again from the handler.
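The "run directly if we are already on one of the group's threads, otherwise submit to the pool" decision can be imitated outside the JDK. The sketch below is only an analogy under stated assumptions: MiniGroup, CURRENT_GROUP and MiniGroupDemo are made-up names, a plain ExecutorService stands in for the group's thread pool, and a ThreadLocal plays the role that myGroupAndInvokeCount plays in the listing above. It is not the JDK's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a channel group; not a JDK class.
final class MiniGroup {
    // Marks every pooled thread with the group it belongs to.
    private static final ThreadLocal<MiniGroup> CURRENT_GROUP = new ThreadLocal<>();
    private final ExecutorService pool;

    MiniGroup(int threads) {
        pool = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(() -> {
                CURRENT_GROUP.set(this);   // bind the worker thread to this group
                r.run();
            }, "mini-group-worker");
            t.setDaemon(true);
            return t;
        });
    }

    /**
     * Runs the task directly when the caller is already one of this group's
     * threads, otherwise submits it to the pool. Returns true if it ran directly.
     */
    boolean invokeOnThreadInThreadPool(Runnable task) {
        if (CURRENT_GROUP.get() == this) {
            task.run();
            return true;
        }
        pool.execute(task);
        return false;
    }

    void shutdown() {
        pool.shutdown();
    }
}

final class MiniGroupDemo {
    /** Returns {outer call ran directly?, nested call ran directly?}. */
    static boolean[] run() {
        MiniGroup group = new MiniGroup(1);
        CompletableFuture<Boolean> nested = new CompletableFuture<>();
        // Called from the main thread, which is not a group thread: goes to the pool.
        boolean outerDirect = group.invokeOnThreadInThreadPool(() ->
                // Called from inside a pooled thread: runs on the spot.
                nested.complete(group.invokeOnThreadInThreadPool(() -> { })));
        boolean nestedDirect = nested.join();
        group.shutdown();
        return new boolean[] { outerDirect, nestedDirect };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("outer direct: " + r[0] + ", nested direct: " + r[1]);
    }
}
```

The nested call corresponds to the situation where a handler, already running on a pooled thread, issues the next operation: no extra hop through the pool's queue is needed.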
AcceptTask
/**
 * Initiates the accept operation.
 */
@Override
public void run() {
    long overlapped = 0L;

    try {
        // begin usage of listener socket
        begin();
        try {
            // begin usage of child socket (as it is registered with
            // completion port and so may be closed in the event that
            // the group is forcefully closed).
            channel.begin();

            synchronized (result) {
                overlapped = ioCache.add(result);

                int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
                if (n == IOStatus.UNAVAILABLE) {
                    return;
                }

                // connection accepted immediately
                finishAccept();

                // allow another accept before the result is set
                enableAccept();
                result.setResult(channel);
            }
        } finally {
            // end usage on child socket
            channel.end();
        }
    } catch (Throwable x) {
        // failed to initiate accept so release resources
        if (overlapped != 0L)
            ioCache.remove(overlapped);
        closeChildChannel();
        if (x instanceof ClosedChannelException)
            x = new AsynchronousCloseException();
        if (!(x instanceof IOException) && !(x instanceof SecurityException))
            x = new IOException(x);
        enableAccept();
        result.setFailure(x);
    } finally {
        // end of usage of listener socket
        end();
    }

    // accept completed immediately but may not have executed on
    // initiating thread in which case the operation may have been
    // cancelled.
    if (result.isCancelled()) {
        closeChildChannel();
    }

    // invoke completion handler
    Invoker.invokeIndirectly(result);
}
The result object wraps the callback we registered, our CompletionHandler, so let's follow the last line of the code above.
Invoker
1. invokeIndirectly
/**
 * Invoke handler with completed result. The handler is invoked indirectly,
 * via the channel group's thread pool.
 */
static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
    assert future.isDone();
    CompletionHandler<V,? super A> handler = future.handler();
    if (handler != null) {
        invokeIndirectly(future.channel(),
                         handler,
                         future.attachment(),
                         future.value(),
                         future.exception());
    }
}
2. invokeIndirectly (the overload that takes the handler and result explicitly)
/**
 * Invokes the handler indirectly via the channel group's thread pool.
 */
static <V,A> void invokeIndirectly(AsynchronousChannel channel,
                                   final CompletionHandler<V,? super A> handler,
                                   final A attachment,
                                   final V result,
                                   final Throwable exc)
{
    try {
        ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
            public void run() {
                GroupAndInvokeCount thisGroupAndInvokeCount =
                    myGroupAndInvokeCount.get();
                if (thisGroupAndInvokeCount != null)
                    thisGroupAndInvokeCount.setInvokeCount(1);
                invokeUnchecked(handler, attachment, result, exc);
            }
        });
    } catch (RejectedExecutionException ree) {
        throw new ShutdownChannelGroupException();
    }
}
3. invokeUnchecked
/**
 * Invoke handler without checking the thread identity or number of handlers
 * on the thread stack.
 */
static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
                                  A attachment,
                                  V value,
                                  Throwable exc)
{
    if (exc == null) {
        handler.completed(value, attachment);
    } else {
        handler.failed(exc, attachment);
    }

    // clear interrupt
    Thread.interrupted();
}
And here, at last, is the call to our handler.
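The last hop, from a finished operation to completed() or failed(), is small enough to replay in isolation. The following is a simplified sketch, not the JDK code: MiniInvoker and MiniInvokerDemo are made-up names, a single-threaded ExecutorService stands in for the group's pool, and the invoke-count bookkeeping and interrupt clearing of the real Invoker are left out. Only the public CompletionHandler interface is taken from the JDK.

```java
import java.io.IOException;
import java.nio.channels.CompletionHandler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified imitation of the dispatch path; not a JDK class.
final class MiniInvoker {

    /** Picks completed() or failed() depending on whether an exception is present. */
    static <V, A> void invokeUnchecked(CompletionHandler<V, ? super A> handler,
                                       A attachment, V value, Throwable exc) {
        if (exc == null) {
            handler.completed(value, attachment);
        } else {
            handler.failed(exc, attachment);
        }
    }

    /** Hands the handler invocation to the pool instead of running it on the caller. */
    static <V, A> void invokeIndirectly(Executor pool,
                                        CompletionHandler<V, ? super A> handler,
                                        A attachment, V value, Throwable exc) {
        pool.execute(() -> invokeUnchecked(handler, attachment, value, exc));
    }
}

final class MiniInvokerDemo {
    /** Dispatches one success and one failure and returns what the handler saw. */
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CompletionHandler<Integer, String> handler = new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer value, String attachment) {
                events.add("completed:" + value + ":" + attachment);
            }
            @Override
            public void failed(Throwable exc, String attachment) {
                events.add("failed:" + exc.getMessage() + ":" + attachment);
            }
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        MiniInvoker.invokeIndirectly(pool, handler, "conn-1", 42, null);
        MiniInvoker.invokeIndirectly(pool, handler, "conn-2", null, new IOException("boom"));
        // The single worker runs tasks in submission order, so this marker
        // completes only after both handler invocations have finished.
        CompletableFuture<Void> drained = new CompletableFuture<>();
        pool.execute(() -> drained.complete(null));
        drained.join();
        pool.shutdown();
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The attachment travels untouched from the call that started the operation to the handler, which is how the echo server above can pass its ByteBuffer along without any shared state.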
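So an encapsulation is still just an encapsulation: the Reactor is there underneath. Along the way we can also see how much resource sharing and optimization has been built in.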