首页 > 代码库 > Mina、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

Mina、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

在上一篇博文中,有介绍到用换行符分割消息的方法。但是这种方法有个小问题,如果消息中本身就包含换行符,那将会将这条消息分割成两条,结果就不对了。

本文介绍另外一种消息分割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息。

上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,例如12即为0x0000000C,这个整数用来指定Body的长度(字节数)。当读完这么多字节的Body之后,又是下一条消息的Header。

下面分别用MINA、Netty、Twisted来实现对这种消息的切合和解码。

MINA:

MINA提供了PrefixedStringCodecFactory来对这种类型的消息进行编码解码,PrefixedStringCodecFactory默认Header的大小是4字节,当然也可以指定成1或2。

public class TcpServer {        public static void main(String[] args) throws IOException {          IoAcceptor acceptor = new NioSocketAcceptor();                    // 4字节的Header指定Body的字节数,对这种消息的处理          acceptor.getFilterChain().addLast("codec",                   new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));                    acceptor.setHandler(new TcpServerHandle());          acceptor.bind(new InetSocketAddress(8080));      }    }    class TcpServerHandle extends IoHandlerAdapter {        @Override      public void exceptionCaught(IoSession session, Throwable cause)              throws Exception {          cause.printStackTrace();      }        // 接收到新的数据      @Override      public void messageReceived(IoSession session, Object message)              throws Exception {            String msg = (String) message;          System.out.println("messageReceived:" + msg);                }        @Override      public void sessionCreated(IoSession session) throws Exception {          System.out.println("sessionCreated");      }        @Override      public void sessionClosed(IoSession session) throws Exception {          System.out.println("sessionClosed");      }  } 

Netty:

Netty使用LengthFieldBasedFrameDecoder来处理这种消息。下面代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5个参数,分别是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度,lengthFieldOffset为Header的位置,lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度,并不包含Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的全部内容,这里设为4表示去掉4字节的Header,只留下Body)。

public class TcpServer {        public static void main(String[] args) throws InterruptedException {          EventLoopGroup bossGroup = new NioEventLoopGroup();          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {              ServerBootstrap b = new ServerBootstrap();              b.group(bossGroup, workerGroup)                      .channel(NioServerSocketChannel.class)                      .childHandler(new ChannelInitializer<SocketChannel>() {                          @Override                          public void initChannel(SocketChannel ch)                                  throws Exception {                              ChannelPipeline pipeline = ch.pipeline();                                                            // LengthFieldBasedFrameDecoder按行分割消息,取出body                              pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));                              // 再按UTF-8编码转成字符串                              pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));                                                            pipeline.addLast(new TcpServerHandler());                          }                      });              ChannelFuture f = b.bind(8080).sync();              f.channel().closeFuture().sync();          } finally {              workerGroup.shutdownGracefully();              bossGroup.shutdownGracefully();          }      }    }    class TcpServerHandler extends ChannelInboundHandlerAdapter {        // 接收到新的数据      @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) {                    String message = (String) msg;          System.out.println("channelRead:" + message);      }        @Override      public void channelActive(ChannelHandlerContext ctx) {          System.out.println("channelActive");      }        @Override      public void channelInactive(ChannelHandlerContext ctx) {          System.out.println("channelInactive");      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {          cause.printStackTrace();          ctx.close();      }  }  

Twisted:

在Twisted中需要继承Int32StringReceiver,不再继承Protocol。Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而需要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。

# -*- coding:utf-8 –*-    from twisted.protocols.basic import Int32StringReceiver  from twisted.internet.protocol import Factory  from twisted.internet import reactor    class TcpServerHandle(Int32StringReceiver):        # 新的连接建立      def connectionMade(self):          print connectionMade        # 连接断开      def connectionLost(self, reason):          print connectionLost        # 接收到新的数据      def stringReceived(self, data):          print stringReceived: + data    factory = Factory()  factory.protocol = TcpServerHandle  reactor.listenTCP(8080, factory)  reactor.run()  

下面是Java编写的一个客户端测试程序:

public class TcpClient {        public static void main(String[] args) throws IOException {            Socket socket = null;          DataOutputStream out = null;            try {                socket = new Socket("localhost", 8080);              out = new DataOutputStream(socket.getOutputStream());                // 请求服务器              String data1 = "牛顿";              byte[] outputBytes1 = data1.getBytes("UTF-8");              out.writeInt(outputBytes1.length); // write header              out.write(outputBytes1); // write body                            String data2 = "爱因斯坦";              byte[] outputBytes2 = data2.getBytes("UTF-8");              out.writeInt(outputBytes2.length); // write header              out.write(outputBytes2); // write body                            out.flush();            } finally {              // 关闭连接              out.close();              socket.close();          }      }  } 

MINA服务器输出结果:

sessionCreated
messageReceived:牛顿
messageReceived:爱因斯坦
sessionClosed

Netty服务器输出结果:

channelActive
channelRead:牛顿
channelRead:爱因斯坦
channelInactive

Twisted服务器输出结果:

connectionMade
stringReceived:牛顿
stringReceived:爱因斯坦
connectionLost

Mina、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)