首页 > 代码库 > Java Netty (2)

Java Netty (2)

通过一个实例来说明 Netty 的使用。服务器同时监听 5 个端口,5 个客户端线程分别连接其中一个端口。客户端连接上服务器以后就向服务器发送消息;服务器接收到消息后,等待随机的时间,再向客户端返回消息;客户端接收到消息以后,同样等待随机的时间,再向服务器发送消息,这样一直循环下去。

项目结构:

技术分享

NettyServer.java:

package Server;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;  import org.jboss.netty.channel.*;  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;  import org.jboss.netty.handler.execution.ExecutionHandler;import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;import Util.Constant;public class NettyServer {        public static String host = "127.0.0.1";        // 创建1个线程池    static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));        public static void main(String[] args) {        // ChannelFactory        final ChannelFactory channelFactory = new NioServerSocketChannelFactory(                  // Boss线程池,处理Socket请求                Executors.newCachedThreadPool(),                  // Worker线程池,由于使用的是NIO,1个Worker线程可以管理多个Channel                Executors.newCachedThreadPool());         // ServerBootstrap        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);        ServerPipelineFactory serverPipelineFactory = new ServerPipelineFactory(executionHandler);        bootstrap.setPipelineFactory(serverPipelineFactory);                 // 禁用nagle算法        bootstrap.setOption("child.tcpNoDelay", true);          // 启用TCP保活检测        bootstrap.setOption("child.keepAlive", true);                 // 监听5个端口        bootstrap.bind(new InetSocketAddress(Constant.p1));        System.out.println("Listening port " + Constant.p1 + "...");        bootstrap.bind(new InetSocketAddress(Constant.p2));        System.out.println("Listening port " + Constant.p2 + "...");        bootstrap.bind(new InetSocketAddress(Constant.p3));        System.out.println("Listening port " + Constant.p3 + "...");        bootstrap.bind(new InetSocketAddress(Constant.p4));        System.out.println("Listening port " + Constant.p4 + "...");        bootstrap.bind(new InetSocketAddress(Constant.p5));        
System.out.println("Listening port " + Constant.p5 + "...");    }}

ServerPipelineFactory.java:

package Server;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.handler.codec.string.StringDecoder;import org.jboss.netty.handler.codec.string.StringEncoder;import org.jboss.netty.handler.execution.ExecutionHandler;public class ServerPipelineFactory implements ChannelPipelineFactory {        private final ExecutionHandler executionHandler;         public ServerPipelineFactory(ExecutionHandler executionHandler){        this.executionHandler = executionHandler;    }    @Override    public ChannelPipeline getPipeline() throws Exception {        // TODO Auto-generated method stub        return Channels.pipeline(                 new StringEncoder(),                    new StringDecoder(),                 // 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前                executionHandler,                // 业务逻辑handler                new MyServerHandler());    } }

MyServerHandler.java:

package Server;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ChannelStateEvent;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;import Util.Tool;public class MyServerHandler extends SimpleChannelHandler{        @SuppressWarnings("static-access")    @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        System.out.println("Server received:" + e.getMessage());        // 休息随机秒后发送消息        Thread th = Thread.currentThread();        int interval = Tool.getInterval(100);        th.sleep(interval*1000);        e.getChannel().write("from Server: Hello!");        super.messageReceived(ctx, e);    }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {          e.getCause().printStackTrace();          Channel ch = e.getChannel();          ch.close();         super.exceptionCaught(ctx, e);    }         @Override      public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {        System.out.println("A client connected!");        super.channelConnected(ctx, e);     }}

NettyClient.java:

package Client;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ClientBootstrap;  import org.jboss.netty.channel.*;  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;  import org.jboss.netty.handler.execution.ExecutionHandler;import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;import Util.Constant;public class NettyClient extends Thread{        public static String host = "127.0.0.1";    ClientBootstrap bootstrap;    int port;        // 创建1个线程池    static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));        public NettyClient(int port) {        this.port = port;        // ChannelFactory        final ChannelFactory channelFactory = new NioClientSocketChannelFactory(                  // Boss线程池                Executors.newCachedThreadPool(),                  // Worker线程池                Executors.newCachedThreadPool());          // ServerBootstrap        bootstrap = new ClientBootstrap(channelFactory);                ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(executionHandler);        bootstrap.setPipelineFactory(clientPipelineFactory);        bootstrap.setOption("tcpNoDelay" ,true);          bootstrap.setOption("keepAlive", true);          bootstrap.connect(new InetSocketAddress(port));     }        public void run(){        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));        // 开始试图连接        System.out.println("Connecting to port " + port + "...");        // 等待直到连接关闭或失败        future.getChannel().getCloseFuture().awaitUninterruptibly();         // 关闭线程池准备退出        bootstrap.releaseExternalResources();    }        public static void main(String[] args) {        NettyClient nc1 = new NettyClient(Constant.p1);        NettyClient nc2 = new NettyClient(Constant.p2);        NettyClient nc3 = new NettyClient(Constant.p3);        
NettyClient nc4 = new NettyClient(Constant.p4);        NettyClient nc5 = new NettyClient(Constant.p5);                nc1.start();        nc2.start();        nc3.start();        nc4.start();        nc5.start();    }}

ClientPipelineFactory.java:

package Client;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.handler.codec.string.StringDecoder;import org.jboss.netty.handler.codec.string.StringEncoder;import org.jboss.netty.handler.execution.ExecutionHandler;public class ClientPipelineFactory implements ChannelPipelineFactory {        private final ExecutionHandler executionHandler;         public ClientPipelineFactory(ExecutionHandler executionHandler){        this.executionHandler = executionHandler;    }    @Override    public ChannelPipeline getPipeline() throws Exception {        // TODO Auto-generated method stub        return Channels.pipeline(                 new StringEncoder(),                    new StringDecoder(),                 // 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前                executionHandler,                // 业务逻辑handler                new MyClientHandler());    } }

MyClientHandler.java:

package Client;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.ChannelStateEvent;import org.jboss.netty.channel.ExceptionEvent;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;import org.jboss.netty.channel.SimpleChannelUpstreamHandler;import Util.Tool;public class MyClientHandler extends SimpleChannelHandler{              // 连接到服务端时,发出消息    @Override    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {         System.out.println("Connected to Server!");        e.getChannel().write("from Client: Hello! " + System.currentTimeMillis());         super.channelConnected(ctx, e);    }        @SuppressWarnings("static-access")    @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        System.out.println("Client Received:" + e.getMessage());        // 休息随机秒后发送消息        Thread th = Thread.currentThread();        int interval = Tool.getInterval(5);        th.sleep(interval*1000);        e.getChannel().write("from Client: Hello! "  + System.currentTimeMillis());        super.messageReceived(ctx, e);    }            @Override    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {          e.getCause().printStackTrace();          Channel ch = e.getChannel();          ch.close();         super.exceptionCaught(ctx, e);    } }

Constant.java:

package Util;public class Constant {    final static int start = 10000;    public static int p1 = start + 1;    public static int p2 = start + 2;    public static int p3 = start + 3;    public static int p4 = start + 4;    public static int p5 = start + 5;}

Tool.java:

package Util;import java.util.Random;public class Tool {        static Random rand = new Random();        public static int getInterval(int max){        return rand.nextInt(max);    }}

 

Java Netty (2)