首页 > 代码库 > [编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现
[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现
netty Future是基于jdk Future扩展,以监听完成任务触发执行
Promise是对Future修改任务数据
DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise
主要是分析await是如何等侍结果的
public interface Future<V> extends java.util.concurrent.Future<V> { Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);}public interface Promise<V> extends Future<V> { Promise<V> setSuccess(V result); boolean trySuccess(V result); Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); boolean setUncancellable();}public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { //已完成任务直接忽略 if (isDone()) { return true; } //没有等侍时间返回处理记录 if (timeoutNanos <= 0) { return isDone(); } //已中断抛异常 if (interruptable && Thread.interrupted()) { throw new InterruptedException(toString()); } //checkDeadLock(); //netty 认为是当前线程是死锁状态 EventExecutor e = executor(); if (e != null && e.inEventLoop()) { throw new BlockingOperationException(toString()); } long startTime = System.nanoTime(); long waitTime = timeoutNanos; boolean interrupted = false; try { for (;;) { synchronized (this) { if (isDone()) { return true; } //最大检查次数为 Short.MAX_VALUE //很奇怪的逻辑,处理完后又自减 if (waiters == Short.MAX_VALUE) { throw new IllegalStateException("too many waiters: " + this); } ++waiters; try { //阻塞的代码只是一行参数1是milliseconds,参数2是辅助用的大于0时milliseconds+1,如果是0的话会无限制阻塞 wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } finally { waiters--; } } //这里是double check跟并发无影响的逻辑放在synchronized外面 if (isDone()) { return true; } else { waitTime = timeoutNanos - (System.nanoTime() - startTime); if (waitTime <= 0) { return isDone(); } } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }}public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; public DefaultChannelPromise(Channel channel) { this.channel = channel; } public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = channel; }}
[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。