首页 > 代码库 > NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

1、粘包与段包

 

粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
造成的可能原因:

    发送端需要等缓冲区满才发送出去,造成粘包

    接收方不及时接收缓冲区的包,造成多个包接收

 

断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。

 

2、消息传输的格式

 

消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束。

消息头+消息体    即固定长度的消息,前几个字节为消息头,后面的是消息头。

在MINA中用的是

消息长度+消息体 即前4个字节用于存储消息的长度,用于判断当前消息什么时候结束。

 

3、编码与解码

 

   在网络中,信息的传输都是通过字节的形式传输的,而我们在编写自己的代码时,则都是具体的对象,那么要想我们的对象能够在网络中传输,就需要编码与解码。

 

   编码:即把我们的消息编码成二进制形式,能以字节的形式在网络中传输。

   解码:即把我们收到的字节解码成我们代码中的对象。

   在MINA中对象的编码与解码用的都是JDK提供的ObjectOutputStream来实现的。

 

4、MINA中消息的处理实现

 

消息的接受处理,我们常用的是TCP协议,而TCP协议会分片的,在下面的代码中,具体功能就是循环从通道里面读取数据,直到没有数据可读,或者buffer满了,然后就把接受到的数据发给解码工厂进行处理。

 

4.1、消息的接收

 

 

[java] view plain copy
 
 print?
  1. //class AbstractPollingIoProcessor  
  2. private void read(S session) {  
  3.         IoSessionConfig config = session.getConfig();  
  4.         int bufferSize = config.getReadBufferSize();  
  5.         IoBuffer buf = IoBuffer.allocate(bufferSize);  
  6.   
  7.         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();  
  8.   
  9.         try {  
  10.             int readBytes = 0;  
  11.             int ret;  
  12.   
  13.             try {  
  14.                 //是否有分片 tcp传输会有分片,即把大消息分片成多个小消息再传输  
  15.                 if (hasFragmentation) {  
  16.             //read方法非阻塞,没有读到数据的时候返回0  
  17.                     while ((ret = read(session, buf)) > 0) {  
  18.                         readBytes += ret;  
  19.                         //buffer 满了  
  20.                         if (!buf.hasRemaining()) {  
  21.                             break;  
  22.                         }  
  23.                     }  
  24.                 } else {  
  25.                     ret = read(session, buf);  
  26.   
  27.                     if (ret > 0) {  
  28.                         readBytes = ret;  
  29.                     }  
  30.                 }  
  31.             } finally {  
  32.                 buf.flip();  
  33.             }  
  34.   
  35.             if (readBytes > 0) {  
  36.                 IoFilterChain filterChain = session.getFilterChain();  
  37.         //处理消息  
  38.                 filterChain.fireMessageReceived(buf);  
  39.                 buf = null;  
  40.   
  41.                 if (hasFragmentation) {  
  42.                     if (readBytes << 1 < config.getReadBufferSize()) {  
  43.                         session.decreaseReadBufferSize();  
  44.                     } else if (readBytes == config.getReadBufferSize()) {  
  45.                         session.increaseReadBufferSize();  
  46.                     }  
  47.                 }  
  48.             }  
  49.   
  50.             if (ret < 0) {  
  51.                 scheduleRemove(session);  
  52.             }  
  53.         } catch (Throwable e) {  
  54.             if (e instanceof IOException) {  
  55.                 if (!(e instanceof PortUnreachableException)  
  56.                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())  
  57.                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {  
  58.                     scheduleRemove(session);  
  59.                 }  
  60.             }  
  61.   
  62.             IoFilterChain filterChain = session.getFilterChain();  
  63.             filterChain.fireExceptionCaught(e);  
  64.         }  
  65.     }  



 

 

 

 

 

 

4.2、解码与编码

 

 

[java] view plain copy
 
 print?
  1. //class AbstractIoBuffer  
  2.  public Object getObject(final ClassLoader classLoader) throws ClassNotFoundException {  
  3.     //首先判断当前buffer中消息长度是否完整,不完整的话直接返回  
  4.         if (!prefixedDataAvailable(4)) {  
  5.             throw new BufferUnderflowException();  
  6.         }  
  7.   
  8.     //消息长度  
  9.         int length = getInt();  
  10.         if (length <= 4) {  
  11.             throw new BufferDataException("Object length should be greater than 4: " + length);  
  12.         }  
  13.   
  14.         int oldLimit = limit();  
  15.     //limit到消息结尾处  
  16.         limit(position() + length);  
  17.         try {  
  18.             ObjectInputStream in = new ObjectInputStream(asInputStream()) {  
  19.                 @Override  
  20.                 protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {  
  21.                     int type = read();  
  22.                     if (type < 0) {  
  23.                         throw new EOFException();  
  24.                     }  
  25.                     switch (type) {  
  26.                     case 0: // NON-Serializable class or Primitive types  
  27.                         return super.readClassDescriptor();  
  28.                     case 1: // Serializable class  
  29.                         String className = readUTF();  
  30.                         Class<?> clazz = Class.forName(className, true, classLoader);  
  31.                         return ObjectStreamClass.lookup(clazz);  
  32.                     default:  
  33.                         throw new StreamCorruptedException("Unexpected class descriptor type: " + type);  
  34.                     }  
  35.                 }  
  36.   
  37.                 @Override  
  38.                 protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {  
  39.                     String name = desc.getName();  
  40.                     try {  
  41.                         return Class.forName(name, false, classLoader);  
  42.                     } catch (ClassNotFoundException ex) {  
  43.                         return super.resolveClass(desc);  
  44.                     }  
  45.                 }  
  46.             };  
  47.             return in.readObject();  
  48.         } catch (IOException e) {  
  49.             throw new BufferDataException(e);  
  50.         } finally {  
  51.             limit(oldLimit);  
  52.         }  
  53.     }  
  54.   
  55. //判断当前消息是否完整   
  56. public boolean prefixedDataAvailable(int prefixLength, int maxDataLength) {  
  57.         if (remaining() < prefixLength) {  
  58.             return false;  
  59.         }  
  60.   
  61.         int dataLength;  
  62.         switch (prefixLength) {  
  63.         case 1:  
  64.             dataLength = getUnsigned(position());  
  65.             break;  
  66.         case 2:  
  67.             dataLength = getUnsignedShort(position());  
  68.             break;  
  69.         case 4:  
  70.             dataLength = getInt(position());  
  71.             break;  
  72.         default:  
  73.             throw new IllegalArgumentException("prefixLength: " + prefixLength);  
  74.         }  
  75.   
  76.         if (dataLength < 0 || dataLength > maxDataLength) {  
  77.             throw new BufferDataException("dataLength: " + dataLength);  
  78.         }  
  79.     //判断当前消息是否完整   
  80.         return remaining() - prefixLength >= dataLength;  
  81.     }  
  82.   
  83. //编码  
  84.  public IoBuffer putObject(Object o) {  
  85.         int oldPos = position();  
  86.         skip(4); // Make a room for the length field.预留4个字节用于存储消息长度  
  87.         try {  
  88.             ObjectOutputStream out = new ObjectOutputStream(asOutputStream()) {  
  89.                 @Override  
  90.                 protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {  
  91.                     try {  
  92.                         Class<?> clz = Class.forName(desc.getName());  
  93.                         if (!Serializable.class.isAssignableFrom(clz)) { // NON-Serializable class  
  94.                             write(0);  
  95.                             super.writeClassDescriptor(desc);  
  96.                         } else { // Serializable class  
  97.                             write(1);  
  98.                             writeUTF(desc.getName());  
  99.                         }  
  100.                     } catch (ClassNotFoundException ex) { // Primitive types  
  101.                         write(0);  
  102.                         super.writeClassDescriptor(desc);  
  103.                     }  
  104.                 }  
  105.             };  
  106.             out.writeObject(o);  
  107.             out.flush();  
  108.         } catch (IOException e) {  
  109.             throw new BufferDataException(e);  
  110.         }  
  111.   
  112.         // Fill the length field  
  113.         int newPos = position();  
  114.         position(oldPos);  
  115.     //存储消息长度  
  116.         putInt(newPos - oldPos - 4);  
  117.         position(newPos);  
  118.         return this;  
  119.     }  



 

 

 

 

 

 

4.3、断包与粘包处理

 

[java] view plain copy
 
 print?
  1. // class CumulativeProtocolDecoder  
  2.  public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  3.     //是否有分片,tcp 有分片  
  4.         if (!session.getTransportMetadata().hasFragmentation()) {  
  5.             while (in.hasRemaining()) {  
  6.                 if (!doDecode(session, in, out)) {  
  7.                     break;  
  8.                 }  
  9.             }  
  10.   
  11.             return;  
  12.         }  
  13.   
  14.     // 1、断包处理  
  15.     // 2、处理粘包  
  16.         boolean usingSessionBuffer = true;  
  17.     //session中是否有断包情况(上次处理后),断包保存在session中  
  18.         IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);  
  19.         // If we have a session buffer, append data to that; otherwise  
  20.         // use the buffer read from the network directly.  
  21.         if (buf != null) {//有断包,则把当前包拼接到断包里面  
  22.             boolean appended = false;  
  23.             // Make sure that the buffer is auto-expanded.  
  24.             if (buf.isAutoExpand()) {  
  25.                 try {  
  26.                     buf.put(in);  
  27.                     appended = true;  
  28.                 } catch (IllegalStateException e) {  
  29.                     // A user called derivation method (e.g. slice()),  
  30.                     // which disables auto-expansion of the parent buffer.  
  31.                 } catch (IndexOutOfBoundsException e) {  
  32.                     // A user disabled auto-expansion.  
  33.                 }  
  34.             }  
  35.   
  36.             if (appended) {  
  37.                 buf.flip();  
  38.             } else {  
  39.                 // Reallocate the buffer if append operation failed due to  
  40.                 // derivation or disabled auto-expansion.  
  41.                 buf.flip();  
  42.                 IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);  
  43.                 newBuf.order(buf.order());  
  44.                 newBuf.put(buf);  
  45.                 newBuf.put(in);  
  46.                 newBuf.flip();  
  47.                 buf = newBuf;  
  48.   
  49.                 // Update the session attribute.  
  50.                 session.setAttribute(BUFFER, buf);  
  51.             }  
  52.         } else {  
  53.             buf = in;  
  54.             usingSessionBuffer = false;  
  55.         }  
  56.   
  57.     //2 粘包处理,可能buffer中有多个消息,需要多次处理(解码)每个消息,直到消息处理完,或者剩下的消息不是一个完整的消息或者buffer没有数据了  
  58.   
  59.         for (;;) {  
  60.             int oldPos = buf.position();  
  61.             boolean decoded = doDecode(session, buf, out);  
  62.             if (decoded) {//解码 成功  
  63.                 if (buf.position() == oldPos) {  
  64.                     throw new IllegalStateException("doDecode() can‘t return true when buffer is not consumed.");  
  65.                 }  
  66.         //buffer空了  
  67.                 if (!buf.hasRemaining()) {//buffer没有数据了  
  68.                     break;  
  69.                 }  
  70.             } else {//剩下的消息不是一个完整的消息,断包出现了  
  71.                 break;  
  72.             }  
  73.         }  
  74.   
  75.         // if there is any data left that cannot be decoded, we store  
  76.         // it in a buffer in the session and next time this decoder is  
  77.         // invoked the session buffer gets appended to  
  78.         if (buf.hasRemaining()) {//剩下的消息不是一个完整的消息,断包出现了  
  79.         //如果断包已经保存在session中,则更新buffer,没有的话,就把剩下的断包保存在session中  
  80.             if (usingSessionBuffer && buf.isAutoExpand()) {  
  81.                 buf.compact();  
  82.             } else {  
  83.                 storeRemainingInSession(buf, session);  
  84.             }  
  85.         } else {  
  86.             if (usingSessionBuffer) {  
  87.                 removeSessionBuffer(session);  
  88.             }  
  89.         }  
  90.     }  



 

 

 

 

 

 

 

[java] view plain copy
 
 print?
    1. //class  ObjectSerializationDecoder  
    2.  protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
    3.     //首先判断当前buffer中消息长度是否完整,不完整的话直接返回  
    4.         if (!in.prefixedDataAvailable(4, maxObjectSize)) {  
    5.             return false;  
    6.         }  
    7.   
    8.         out.write(in.getObject(classLoader));  
    9.         return true;  
    10.     }  

NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码