首页 > 代码库 > Java NIO——Selector机制源码分析---转

Java NIO——Selector机制源码分析---转

一直不明白pipe是如何唤醒selector的,所以又去看了jdk的源码(openjdk下载),整理了如下:

以Java nio自带demo : OperationServer.java   OperationClient.java(见附件)

其中server端的核心代码:

public void initSelector() {        try {            selector = SelectorProvider.provider().openSelector();            this.serverChannel1 = ServerSocketChannel.open();            serverChannel1.configureBlocking(false);            InetSocketAddress isa = new InetSocketAddress("localhost", this.port1);            serverChannel1.socket().bind(isa);            serverChannel1.register(selector, SelectionKey.OP_ACCEPT);        } catch (IOException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }}

从头开始,

先看看SelectorProvider.provider()做了什么:

public static SelectorProvider provider() {        synchronized (lock) {            if (provider != null)                return provider;            return AccessController.doPrivileged(                new PrivilegedAction<SelectorProvider>() {                    public SelectorProvider run() {                            if (loadProviderFromProperty())                                return provider;                            if (loadProviderAsService())                                return provider;                            provider = sun.nio.ch.DefaultSelectorProvider.create();                            return provider;                        }                    });        }    }

其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;

if (provider != nullreturn provider;

保证了整个server程序中只有一个WindowsSelectorProvider对象;

再看看WindowsSelectorProvider. openSelector():

public AbstractSelector openSelector() throws IOException {        return new WindowsSelectorImpl(this);    }new WindowsSelectorImpl(SelectorProvider)代码:WindowsSelectorImpl(SelectorProvider sp) throws IOException {        super(sp);        pollWrapper = new PollArrayWrapper(INIT_CAP);        wakeupPipe = Pipe.open();        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();        // Disable the Nagle algorithm so that the wakeup is more immediate        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();        (sink.sc).socket().setTcpNoDelay(true);        wakeupSinkFd = ((SelChImpl)sink).getFDVal();        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);    }

其中Pipe.open()是关键,这个方法的调用过程是:

Java代码  
public static Pipe open() throws IOException {        return SelectorProvider.provider().openPipe();}SelectorProvider 中:public Pipe openPipe() throws IOException {        return new PipeImpl(this);}

 

再看看怎么new PipeImpl()的:

Java代码
PipeImpl(SelectorProvider sp) {        long pipeFds = IOUtil.makePipe(true);        int readFd = (int) (pipeFds >>> 32);        int writeFd = (int) pipeFds;        FileDescriptor sourcefd = new FileDescriptor();        IOUtil.setfdVal(sourcefd, readFd);        source = new SourceChannelImpl(sp, sourcefd);        FileDescriptor sinkfd = new FileDescriptor();        IOUtil.setfdVal(sinkfd, writeFd);        sink = new SinkChannelImpl(sp, sinkfd); }

 

 

其中IOUtil.makePipe(true)是个native方法:

/**

     * Returns two file descriptors for a pipe encoded in a long.

     * The read end of the pipe is returned in the high 32 bits,

     * while the write end is returned in the low 32 bits.

     */

staticnativelong makePipe(boolean blocking);

具体实现:

 

JNIEXPORT jlong JNICALLJava_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking){    int fd[2];    if (pipe(fd) < 0) {        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");        return 0;    }    if (blocking == JNI_FALSE) {        if ((configureBlocking(fd[0], JNI_FALSE) < 0)            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");            close(fd[0]);            close(fd[1]);            return 0;        }    }    return ((jlong) fd[0] << 32) | (jlong) fd[1];}static intconfigureBlocking(int fd, jboolean blocking){    int flags = fcntl(fd, F_GETFL);    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);}

 

正如这段注释:

/**

     * Returns two file descriptors for a pipe encoded in a long.

     * The read end of the pipe is returned in the high 32 bits,

     * while the write end is returned in the low 32 bits.

     */

High32位存放的是通道read端的文件描述符FD(file descriptor),low 32 bits存放的是write端的文件描述符。所以取到makepipe()返回值后要做移位处理。

 

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

这行代码把返回的pipe的write端的FD放在了pollWrapper中(后面会发现,这么做是为了实现selector的wakeup())

 

ServerSocketChannel.open()的实现:

public static ServerSocketChannel open() throws IOException {        return SelectorProvider.provider().openServerSocketChannel();}SelectorProvider:public ServerSocketChannel openServerSocketChannel() throws IOException {        return new ServerSocketChannelImpl(this);}

 

可见创建的ServerSocketChannelImpl也有WindowsSelectorImpl的引用。

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {        super(sp);        this.fd =  Net.serverSocket(true);    //打开一个socket,返回FD        this.fdVal = IOUtil.fdVal(fd);        this.state = ST_INUSE;}

 

然后通过serverChannel1.register(selector, SelectionKey.OP_ACCEPT);把selector和channel绑定在一起,也就是把new ServerSocketChannel时创建的FD与selector绑定在了一起。

到此,server端已启动完成了,主要创建了以下对象:

WindowsSelectorProvider:单例

WindowsSelectorImpl中包含:

    pollWrapper:保存selector上注册的FD,包括pipe的write端FD和ServerSocketChannel所用的FD

    wakeupPipe:通道(其实就是两个FD,一个read,一个write)

 

再到Server 中的run():

selector.select();主要调用了WindowsSelectorImpl中的这个方法:

 

protected int doSelect(long timeout) throws IOException {        if (channelArray == null)            throw new ClosedSelectorException();        this.timeout = timeout; // set selector timeout        processDeregisterQueue();        if (interruptTriggered) {            resetWakeupSocket();            return 0;        }        // Calculate number of helper threads needed for poll. If necessary        // threads are created here and start waiting on startLock        adjustThreadsCount();        finishLock.reset(); // reset finishLock        // Wakeup helper threads, waiting on startLock, so they start polling.        // Redundant threads will exit here after wakeup.        startLock.startThreads();        // do polling in the main thread. Main thread is responsible for        // first MAX_SELECTABLE_FDS entries in pollArray.        try {            begin();            try {                subSelector.poll();            } catch (IOException e) {                finishLock.setException(e); // Save this exception            }            // Main thread is out of poll(). Wakeup others and wait for them            if (threads.size() > 0)                finishLock.waitForHelperThreads();          } finally {              end();          }        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.        finishLock.checkForException();        processDeregisterQueue();        int updated = updateSelectedKeys();        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.        resetWakeupSocket();        return updated;    }

 

 其中subSelector.poll()是核心,也就是轮训pollWrapper中保存的FD;具体实现是调用native方法poll0:

private int poll() throws IOException{ // poll for the main thread            return poll0(pollWrapper.pollArrayAddress,                         Math.min(totalChannels, MAX_SELECTABLE_FDS),                         readFds, writeFds, exceptFds, timeout);        }private native int poll0(long pollAddress, int numfds,             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);// These arrays will hold result of native select().            // The first element of each array is the number of selected sockets.        // Other elements are file descriptors of selected sockets.        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存发生read的FD        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生write的FD        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生except的FD

 

这个poll0()会监听pollWrapper中的FD有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。比如,由于pollWrapper中保存的也有ServerSocketChannel的FD,所以只要ClientSocket发一份数据到ServerSocket,那么poll0()就会返回;又由于pollWrapper中保存的也有pipe的write端的FD,所以只要pipe的write端向FD发一份数据,也会造成poll0()返回;如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,所有在OperationServer的run()里要用while (true) {,这样就可以保证在selector接收到数据并处理完后继续监听poll();

这时再来看看WindowsSelectorImpl. Wakeup():

 

public Selector wakeup() {        synchronized (interruptLock) {            if (!interruptTriggered) {                setWakeupSocket();                interruptTriggered = true;            }        }        return this;    }// Sets Windows wakeup socket to a signaled state.    private void setWakeupSocket() {        setWakeupSocket0(wakeupSinkFd);    }private native void setWakeupSocket0(int wakeupSinkFd);JNIEXPORT void JNICALLJava_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,                                                jint scoutFd){    /* Write one byte into the pipe */    const char byte = 1;    send(scoutFd, &byte, 1, 0);}

可见wakeup()是通过pipe的write 端send(scoutFd, &byte, 1, 0),发生一个字节1,来唤醒poll()。所以在需要的时候就可以调用selector.wakeup()来唤醒selector。

原文:http://goon.iteye.com/blog/1775421