Selector:Java NIO 非阻塞 IO 实现的关键。

阻塞 IO 是指 JDK 1.4 之前版本面向流的 IO:服务端需要为请求建立一堆线程并阻塞等待(每个请求占用一个线程);客户端发送请求后,先询问服务端是否有线程响应,如果没有则会一直等待或者请求遭到拒绝,如果有的话,客户端线程会等待请求结束后才继续执行。







// Minimal single-threaded event loop driven by a Selector.
selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
// A channel has to be in non-blocking mode before it is registered with a selector.
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(port));
ssc.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    // select() blocks until an event wakes it up.
    int selected = selector.select();
    if (selected > 0) {
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            // The selector never removes keys from the selected set itself;
            // remove the key up front so it is not processed again on the
            // next pass even if a handler below throws.
            selectedKeys.remove();

            if (!key.isValid()) {
                continue;
            }
            // A key can be ready for several operations at once, so each
            // operation is tested independently instead of with else-if.
            if (key.isAcceptable()) {
                // handle accept event
            }
            // A previous handler may have cancelled the key, hence the re-check.
            if (key.isValid() && key.isReadable()) {
                // handle read event
            }
            if (key.isValid() && key.isWritable()) {
                // handle write event
            }
        }
    }
}

select() 阻塞后,可以通过以下三种方式被唤醒:注册在 selector 上的 socket 有事件发生;selector.select(timeOut) 超时;或者调用 selector.wakeup() 主动唤醒。




1. Selector.open()


Selector.java-----public static Selector open() throws IOException {      return SelectorProvider.provider().openSelector();  }




SelectorProvider.java-----  public static SelectorProvider provider() {synchronized (lock) {	if (provider != null)	return provider;	return (SelectorProvider)AccessController	.doPrivileged(new PrivilegedAction() {		public Object run() {			if (loadProviderFromProperty())			return provider;			if (loadProviderAsService())			return provider;			provider = sun.nio.ch.DefaultSelectorProvider.create();			return provider;		}		});}}

 其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;



// WindowsSelectorProvider.java
/** Creates the Windows selector implementation bound to this provider. */
public AbstractSelector openSelector() throws IOException {
    return new WindowsSelectorImpl(this);
}

// WindowsSelectorImpl.java
/**
 * Allocates the poll array and sets up the wakeup pipe: the pipe's source
 * fd is added to the poll array, and the sink fd is kept so that it can be
 * written to later in order to make the source readable.
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    // Register the wakeup source fd at index 0 of the poll array.
    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}




Pipe.java----public static Pipe open() throws IOException {return SelectorProvider.provider().openPipe();}




SelectorProvider.java----public Pipe openPipe() throws IOException {	return new PipeImpl(this);}




// PipeImpl.java
/**
 * Runs the Initializer as a privileged action to set up the pipe.
 *
 * @param sp the provider handed on to the Initializer
 * @throws IOException the cause of the PrivilegedActionException raised
 *         by the Initializer, unwrapped and rethrown
 */
PipeImpl(final SelectorProvider sp) throws IOException {
    try {
        AccessController.doPrivileged(new Initializer(sp));
    } catch (PrivilegedActionException x) {
        // Unwrap so callers see the underlying IOException.
        throw (IOException)x.getCause();
    }
}

 创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法



PipeImpl.Initializer-----public Object run() throws IOException {	ServerSocketChannel ssc = null;	SocketChannel sc1 = null;	SocketChannel sc2 = null;	try {		// loopback address		InetAddress lb = InetAddress.getByName("");		assert(lb.isLoopbackAddress());		// bind ServerSocketChannel to a port on the loopback address		ssc = ServerSocketChannel.open();		ssc.socket().bind(new InetSocketAddress(lb, 0));		// Establish connection (assumes connections are eagerly		// accepted)		InetSocketAddress sa			= new InetSocketAddress(lb, ssc.socket().getLocalPort());		sc1 = SocketChannel.open(sa);		ByteBuffer bb = ByteBuffer.allocate(8);		long secret = rnd.nextLong();		bb.putLong(secret).flip();		sc1.write(bb);		// Get a connection and verify it is legitimate		for (;;) {			sc2 = ssc.accept();			bb.clear();			sc2.read(bb);			bb.rewind();			if (bb.getLong() == secret)				break;			sc2.close();		}		// Create source and sink channels		source = new SourceChannelImpl(sp, sc1);		sink = new SinkChannelImpl(sp, sc2);	} catch (IOException e) {		try {			if (sc1 != null)				sc1.close();			if (sc2 != null)				sc2.close();		} catch (IOException e2) { }		IOException x = new IOException("Unable to establish"										+ " loopback connection");		x.initCause(e);		throw x;	} finally {		try {			if (ssc != null)				ssc.close();		} catch (IOException e2) { }	}	return null;}

source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))



PollArrayWrapper.java----private AllocatedNativeObject pollArray; // The fd array// Adds Windows wakeup socket at a given index.void addWakeupSocket(int fdVal, int index) {	putDescriptor(index, fdVal);	putEventOps(index, POLLIN);}// Access methods for fd structuresvoid putDescriptor(int i, int fd) {	pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);}void putEventOps(int i, int event) {	pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);}




// AllocatedNativeObject.java
class AllocatedNativeObject extends NativeObject {

    // Simply forwards to the NativeObject constructor, which allocates the memory.
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }

    // ... (rest of the class is not shown in this excerpt)
}

// NativeObject.java
/**
 * Allocates native memory through unsafe.allocateMemory.
 *
 * @param size        number of usable bytes requested
 * @param pageAligned when true, one extra page is allocated so that
 *                    {@code address} can be moved onto a page boundary
 */
protected NativeObject(int size, boolean pageAligned) {
    if (!pageAligned) {
        this.allocationAddress = unsafe.allocateMemory(size);
        this.address = this.allocationAddress;
    } else {
        int ps = pageSize();
        long a = unsafe.allocateMemory(size + ps);
        // allocationAddress keeps the raw address returned by the allocator.
        this.allocationAddress = a;
        // Advance into the over-allocated region to a multiple of ps
        // (the mask arithmetic assumes ps is a power of two).
        this.address = a + ps - (a & (ps - 1));
    }
}

 从以上可以看到pollArray是通过unsafe.allocateMemory(size + ps)分配的一块系统内存



2. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


AbstractSelectableChannel.java --> register() --> SelectorImpl.java----protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment){	if (!(ch instanceof SelChImpl))		throw new IllegalSelectorException();	SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);	k.attach(attachment);	synchronized (publicKeys) {		implRegister(k);	}	k.interestOps(ops);	return k;}




WindowsSelectorImpl.java----protected void implRegister(SelectionKeyImpl ski) {	growIfNeeded();	channelArray[totalChannels] = ski;	ski.setIndex(totalChannels);	fdMap.put(ski);	keys.add(ski);	pollWrapper.addEntry(totalChannels, ski);	totalChannels++;}PollArrayWrapper.java----void addEntry(int index, SelectionKeyImpl ski) {	putDescriptor(index, ski.channel.getFDVal());}




3. selector.select();


SelectorImpl.java----public int select(long timeout) throws IOException{	if (timeout < 0)		throw new IllegalArgumentException("Negative timeout");	return lockAndDoSelect((timeout == 0) ? -1 : timeout);}private int lockAndDoSelect(long timeout) throws IOException {	synchronized (this) {		if (!isOpen())			throw new ClosedSelectorException();		synchronized (publicKeys) {			synchronized (publicSelectedKeys) {				return doSelect(timeout);			}		}	}}




WindowsSelectorImpl.java----protected int doSelect(long timeout) throws IOException {	if (channelArray == null)		throw new ClosedSelectorException();	this.timeout = timeout; // set selector timeout	processDeregisterQueue();	if (interruptTriggered) {		resetWakeupSocket();		return 0;	}	// Calculate number of helper threads needed for poll. If necessary	// threads are created here and start waiting on startLock	adjustThreadsCount();	finishLock.reset(); // reset finishLock	// Wakeup helper threads, waiting on startLock, so they start polling.	// Redundant threads will exit here after wakeup.	startLock.startThreads();	// do polling in the main thread. Main thread is responsible for	// first MAX_SELECTABLE_FDS entries in pollArray.	try {		begin();		try {			subSelector.poll();		} catch (IOException e) {			finishLock.setException(e); // Save this exception		}		// Main thread is out of poll(). Wakeup others and wait for them		if (threads.size() > 0)			finishLock.waitForHelperThreads();	  } finally {		  end();	  }	// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.	finishLock.checkForException();	processDeregisterQueue();	int updated = updateSelectedKeys();	// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.	resetWakeupSocket();	return updated;}private int poll() throws IOException{ // poll for the main thread	return poll0(pollWrapper.pollArrayAddress,				 Math.min(totalChannels, MAX_SELECTABLE_FDS),				 readFds, writeFds, exceptFds, timeout);}

 // Native poll entry point, called from poll() with the poll array address, the number of fds to check, the three result fd arrays and the timeout.
 private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);



WindowsSelectorImpl.c----Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,                                   jlong pollAddress, jint numfds,                                   jintArray returnReadFds, jintArray returnWriteFds,                                   jintArray returnExceptFds, jlong timeout){								   	// 代码.... 此处省略一万字	/* Call select */    if ((result = select(0 , &readfds, &writefds, &exceptfds, tv)) == SOCKET_ERROR) {			// 代码.... 此处省略一万字				for (i = 0; i < numfds; i++) {			// 代码.... 此处省略一万字		}													 	}}


到这里已经比较清楚了,退出阻塞的方式有:注册(register)在 selector 上的 socketChannel 处于就绪状态(放在 pollArray 中的 socketChannel 的 FD 就绪),或者第 1 节中放在 pollArray 中的 wakeupSourceFd 就绪。前者(socketChannel 就绪唤醒)印证了文章开始的 阻塞 -> 事件驱动 -> 唤醒 的过程,后者(wakeupSourceFd)就是下面要看的主动 wakeup。



4. selector.wakeup()


WindowsSelectorImpl.java----public Selector wakeup() {	synchronized (interruptLock) {		if (!interruptTriggered) {			setWakeupSocket();			interruptTriggered = true;		}	}	return this;}// Sets Windows wakeup socket to a signaled state.private void setWakeupSocket() {	setWakeupSocket0(wakeupSinkFd);}private native void setWakeupSocket0(int wakeupSinkFd);



WindowsSelectorImpl.c----Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,                                                jint scoutFd){    /* Write one byte into the pipe */    send(scoutFd, (char*)&POLLIN, 1, 0);}









