首页 > 代码库 > netty--NioEventLoop滴干活
netty--NioEventLoop滴干活
netty是最近项目要用到的nio框架,找了各种资料,发现称赞它的有点多,所以决定用它:其实也就二选一嘛,mina或netty或自己写。对于mina,也不熟,不过看各种介绍,貌似netty干活还是很不错的,尤其是最新的4.x和5.x重构后,且使用结构清晰就先了解了解了。
首先要把应用跑起来啦(官网的例子比较多),我这是一个关于mqtt的一个例子:
1 m_bossGroup = new NioEventLoopGroup(); 2 m_workerGroup = new NioEventLoopGroup(); 3 4 final NettyMQTTHandler handler = new NettyMQTTHandler(); 5 handler.setMessaging(messaging); 6 7 ServerBootstrap b = new ServerBootstrap(); 8 b.group(m_bossGroup, m_workerGroup) 9 .channel(NioServerSocketChannel.class) 10 .childHandler(new ChannelInitializer<SocketChannel>() { 11 @Override12 public void initChannel(SocketChannel ch) throws Exception {13 ChannelPipeline pipeline = ch.pipeline();14 //pipeline.addFirst("metrics", new BytesMetricsHandler(m_metricsCollector));15 pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));16 pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimoutHandler());17 //pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));18 pipeline.addLast("decoder", new MQTTDecoder());19 pipeline.addLast("encoder", new MQTTEncoder());20 pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));21 pipeline.addLast("handler", handler);22 }23 })24 .option(ChannelOption.SO_BACKLOG, 128)25 .option(ChannelOption.SO_REUSEADDR, true)26 .childOption(ChannelOption.SO_KEEPALIVE, true); 27 try { 28 // Bind and start to accept incoming connections.29 ChannelFuture f = b.bind(Constants.PORT);30 LOG.info("Server binded");31 f.sync();32 } catch (InterruptedException ex) {33 LOG.error(null, ex);34 }
再回想下,我们自己写serversocket的时候是怎么写的呢(这是一个笨拙的实例代码):
ServerSocket socket; channel = ServerSocketChannel.open(); // 打开通道 socket = channel.socket(); //得到与通到相关的socket对象 socket.bind(new InetSocketAddress(port)); //将scoket榜定在制定的端口上 //配置通到使用非阻塞模式,在非阻塞模式下,可以编写多道程序同时避免使用复杂的多线程 channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_ACCEPT); try { while (true) { this.selector.select(); Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); this.handleKey(key); } } } catch (IOException ex) { ex.printStackTrace(); }
原理还是那些,channel.open(),然后register key,然后遍历,再然后才进行handleKey()的干活。
那netty的写法为什么那么潇洒呢,怀着这个莫名的疑问,我先不管它的结构什么的,直接进行search,发现了这么个东东:
1 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {2 super(parent, threadFactory, false);3 if (selectorProvider == null) {4 throw new NullPointerException("selectorProvider");5 }6 provider = selectorProvider;7 selector = openSelector();8 }
其中第8行从名称上来看,有点点意思了,往下看:
1 private Selector openSelector() {2 final Selector selector;3 try {4 selector = provider.openSelector();
其中的provider就是我们熟悉的:java.nio.channels.spi.SelectorProvider类。
所以这个就是做了selector.open的工作。
接下来能看到NioEventLoop:
1 protected void run() {2 for (;;) {3 oldWakenUp = wakenUp.getAndSet(false);4 try {5 if (hasTasks()) {6 selectNow();7 } else {8 select();
再继续看,该类中处理的selectedKey:
1 final NioUnsafe unsafe = ch.unsafe(); 2 if (!k.isValid()) { 3 // close the channel if the key is not valid anymore 4 unsafe.close(unsafe.voidPromise()); 5 return; 6 } 7 8 try { 9 int readyOps = k.readyOps();10 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {11 unsafe.read();12 if (!ch.isOpen()) {13 // Connection already closed - no need to handle write.14 return;15 }16 }17 if ((readyOps & SelectionKey.OP_WRITE) != 0) {18 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write19 ch.unsafe().forceFlush();20 }21 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {22 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking23 // See https://github.com/netty/netty/issues/92424 int ops = k.interestOps();25 ops &= ~SelectionKey.OP_CONNECT;26 k.interestOps(ops);27 28 unsafe.finishConnect();29 }30 } catch (CancelledKeyException e) {31 unsafe.close(unsafe.voidPromise());32 }33
现在明白了吧,其实netty也是走这么一套逻辑。
然后再网上看,逻辑是这样:
NioEventLoopGroup extends MultithreadEventExecutorGroup,其初始化了n个单线程的线程池(children = new SingleThreadEventExecutor[nThreads];)
每个单线程的对象child[i]=NioEventLoop对象,每个NioEventLoop有一个Selector字段。
其run方法是该group都需要干活的具体业务逻辑代码。
后续再加上别的类说明。