首页 > 代码库 > Netty3 源码分析 - ChannelFuture

Netty3 源码分析 - ChannelFuture

Netty3 源码分析 - ChannelFuture

ChannelFuture抽象的是Channel中异步IO操作的结果。在Netty中,所有的IO操作是异步的,意味着任何IO调用会立刻返回,而不是等到操作真正的执行完成。相反,会返回一个ChannelFuture 对象,在IO完成之后通过其得到结果状态。ChannelFuture 要么完成要么未完成,当IO操作开始执行会创建一个新的future对象,初始状态时uncompleted (不是成功,失败,也不是取消)因为IO操作还木有完成,一旦IO操作完成(成功,失败,或者被取消).这个CHannelFuture对象会被标记为completed ,并且有具体的信息,要注意的是即使失败或取消也属于完成状态。如下图:


有很多方法可以供我们检查IO操作是否完成,等待完成,提携IO操作的结果,并且可以增加ChannelFutureListeners  ,在IO操作完成的时候获得通知,这是观察者模式。建议的方式是利用 addListener(ChannelFutureListener) 而不是 await() 来等待IO操作的完成,addListener(ChannelFutureListener) 是非阻塞的只是为这个ChannelFuture增加了一个ChannelFutureListener,而且在与这个future关联的IO操作完成后,IO线程会通知这些观察者。因为非阻塞, ChannelFutureListener可以获得最后的性能和资源利用率,但是如果你不熟悉事件驱动编程(event-driven programming)的话,会发现实现一个时序逻辑(a sequential logic )会很棘手。与之相反的是,await() 是阻塞的操作,用await() 实现一个时序逻辑简单些,但是调用线程会毫无必要的阻塞在那里,直达IO操作完成,而且在线程间通信( inter-thread notification)的代价非常大。而且会有发生死锁的情况。
ChannelHandler 中的事件处理方法通常会被IO线程调用,除非pipeline中有个ExecutionHandler 。如果await() 方法被事件处理方法调用,它等待的IO操作可能永远不会完成,所以await() 方法会阻塞它所等待的IO操作,这是一个死锁。所以不要在ChannelHandler中调用await()。
 // BAD - NEVER DO THIS
 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     if (e.getMessage() instanceof GoodByeMessage) {
         ChannelFuture future = e.getChannel().close();
         future.awaitUninterruptibly();
         // Perform post-closure operation
         // ...
     }
 }

 // GOOD
 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     if (e.getMessage() instanceof GoodByeMessage) {
         ChannelFuture future = e.getChannel().close();
         future.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture future) {
                 // Perform post-closure operation
                 // ...
             }
         });
     }
 }
虽然上面说到了await()方法的弊端,但是也有场景适合它,此时就要确保不会再IO线程中调用它,否则为了防止死锁会抛出 IllegalStateException 。
Do not confuse I/O timeout and await timeout
The timeout value you specify with 为方法await(long), await(long, TimeUnit), awaitUninterruptibly(long), 或者awaitUninterruptibly(long, TimeUnit)指定的超时时间值和IO超时没有任何关系。如果IO操作超时,这个future的状态时 ‘completed with failure,‘ 比如,连接超时这个选项应该为传输层设置而不是为这个future设置:
 // BAD - NEVER DO THIS
 ClientBootstrap b = ...;
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly(10, TimeUnit.SECONDS);
 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     // You might get a NullPointerException here because the future
     // might not be completed yet.
     f.getCause().printStackTrace();
 } else {
     // Connection established successfully
 }

 // GOOD
 ClientBootstrap b = ...;
 // Configure the connect timeout option.
 b.setOption("connectTimeoutMillis", 10000);
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly();

 // Now we are sure the future is completed.
 assert f.isDone();

 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     f.getCause().printStackTrace();
 } else {
     // Connection established successfully
 }
 
详细的源码注释看这里 github。







Netty3 源码分析 - ChannelFuture