首页 > 代码库 > [编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

netty Future是基于jdk Future扩展,以监听完成任务触发执行
Promise是对Future修改任务数据
DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise
主要是分析await是如何等侍结果的

技术分享

 

public interface Future<V> extends java.util.concurrent.Future<V> {   Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);}public interface Promise<V> extends Future<V> {    Promise<V> setSuccess(V result);    boolean trySuccess(V result);    Promise<V> setFailure(Throwable cause);    boolean tryFailure(Throwable cause);    boolean setUncancellable();}public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {    @Override    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {        return await0(unit.toNanos(timeout), true);    }    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {        //已完成任务直接忽略        if (isDone()) {            return true;        }        //没有等侍时间返回处理记录        if (timeoutNanos <= 0) {            return isDone();        }        //已中断抛异常        if (interruptable && Thread.interrupted()) {            throw new InterruptedException(toString());        }        //checkDeadLock();        //netty 认为是当前线程是死锁状态        EventExecutor e = executor();        if (e != null && e.inEventLoop()) {            throw new BlockingOperationException(toString());        }                long startTime = System.nanoTime();        long waitTime = timeoutNanos;        boolean interrupted = false;         try {            for (;;) {                synchronized (this) {                    if (isDone()) {                        return true;                    }                    //最大检查次数为 Short.MAX_VALUE                    //很奇怪的逻辑,处理完后又自减                    if (waiters == Short.MAX_VALUE) {                        throw new IllegalStateException("too many waiters: " + this);                    }                    ++waiters;                    try {                        //阻塞的代码只是一行参数1是milliseconds,参数2是辅助用的大于0时milliseconds+1,如果是0的话会无限制阻塞                        wait(waitTime / 1000000, (int) (waitTime % 1000000));                    } catch (InterruptedException e) {                        if (interruptable) {                            throw e;                        } else {                            interrupted = true;                        }                    } finally {                        waiters--;                    }                }                //这里是double check跟并发无影响的逻辑放在synchronized外面                if (isDone()) {                    return true;                } else {                    waitTime = timeoutNanos - (System.nanoTime() - startTime);                    if (waitTime <= 0) {                        return isDone();                    }                }            }        } finally {            if (interrupted) {                Thread.currentThread().interrupt();            }        }    }}public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {    private final Channel channel;    public DefaultChannelPromise(Channel channel) {        this.channel = channel;    }    public DefaultChannelPromise(Channel channel, EventExecutor executor) {        super(executor);        this.channel = channel;    }}

 

[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现