首页 > 代码库 > Netty入门实例及分析
Netty入门实例及分析
什么是netty?下面是官方文档的简介:
The Netty project is an effort to providean asynchronous event-driven network application framework and tools for rapid development of maintainable high performance and high scalability protocol servers and clients. In other words, Netty isa NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP/IP socket server.
下面写一个简单的实例:
1.客户端细节分析
ChannelFactory是创建一个通道(和一次具体的通信实体关联如网络套接字)的主要接口,比如NioServerSocketChannelFactory 会创建一个Channel,有基于NIO的服务套接字作为底层的通信实体。一旦一个新的通道创建,那么对应的ChannelPipeline就会开始处理相关的ChannelEvents。
NioClientSocketChannelFactory会创建一个客户端的基于NIO的SocketChannel,利用非阻塞IO模型来高效处理这些并发的连接。其中有两种类型的线程, boss thread 和 worker thread,每个NioClientSocketChannelFactory 有一个boss thread,它主要是有请求要发出时试图进行一次连接,连接成功后,将这个连接的通道交付给一个worker thread,接下来这个worker thread 为一个或多个通道执行非阻塞的读写服务。
ClientBootstrap只是一个辅助函数,不会分配或者管理任何资源,管理资源是由构造器中指定的ChannelFactory完成的。所以从同一个ChannelFactory衍生出多个ClientBootstrap是可以的,从而为不同的Channel应用不同的设置。connect()方法会根据指定的SocketAddress试图建立连接,如果本地地址没有设置,就会自动分配,等价于:
ClientBootstrap b = ....;
b.connect(remoteAddress, b.getOption("localAddress"));
静态方法 Channels.pipeline(ChannelHandler... handlers)用参数所指定的ChannelHandler 来创建一个新的ChannelPipeline,当然它们是有顺序的,我们也可以自己一个一个的添加。
public static ChannelPipeline pipeline(ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException( "handlers");
}
ChannelPipeline newPipeline = pipeline ();
for (int i = 0; i < handlers. length; i ++) {
ChannelHandler h = handlers[i];
if (h == null) {
break;
}
newPipeline.addLast(ConversionUtil. toString(i), h);
}
return newPipeline;
}
2. 服务器端细节分析
服务器端构建的基本流程和客户端类似,只是这里的ChannelFactory,Bootstrap 都要满足作为server的特性。
NioServerSocketChannelFactory创建服务器端的,基于NIO的ServerSocketChannel,仍然是非阻塞模式。
每个绑定的ServerSocketChannel 有自身的boos thread,比如说打开监听了两个端口 80,443,那么就会有两个boss thread,各自负责各自端口的连接请求,直到那个端口解绑定,然后将接受的连接请求交给worker thread去处理。
这里是面向连接传输的ClientBootstrap 和 ServerBootstrap ,如果想用UDP的话就选 ConnectionlessBootstrap。
3. ChannelHandler的常见用法就会根据具体的事件类型做出具体的处理,牵扯到读写管道,而且有上下流的情况。
一个简单的netty例子:
TimeClientl.java
import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; public class TimeClient { public static void main(String[] args) { String host = args[0]; int port = Integer.parseInt(args[1]); ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new TimeClientHandler2()); } }); bootstrap.connect(new InetSocketAddress(host, port)); // } }
TimeClientHandler.java
import java.util.Date; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class TimeClientHandler extends SimpleChannelHandler{ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buffer = (ChannelBuffer)e.getMessage(); long currentTimeMills = buffer.readInt() * 1000L; System.out.println(new Date(currentTimeMills)); e.getChannel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); Channel c = e.getChannel(); c.close(); } }
TimeServer.java
import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; public class TimeServer { public static ChannelGroup allChannels = new DefaultChannelGroup("time-server"); public static void main(String[] args) { ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new TimeServerHandler2(), new TimeEncoder()); } }); bootstrap.setOption("reuseAddr", true); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); Channel channel = bootstrap.bind(new InetSocketAddress(8080)); allChannels.add(channel); //waitForShutdownCommand(); this is a imaginary logic:for instance //when there is accepted connection we close this server ; if(allChannels.size() >=2){ ChannelGroupFuture f = allChannels.close(); f.awaitUninterruptibly(); factory.releaseExternalResources(); } } }
TimeServerHandler.java
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class TimeServerHandler extends SimpleChannelHandler{ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { Channel ch = e.getChannel(); ChannelBuffer time = ChannelBuffers.buffer(4); //sizeof int time.writeInt((int)(System.currentTimeMillis()/1000L + 2208988800L)); ChannelFuture cf = ch.write(time); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Channel ch = future.getChannel(); ch.close(); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); Channel c = e.getChannel(); c.close(); } }
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。