首页 > 代码库 > [编织消息框架][设计协议]解决粘包半包(下)

[编织消息框架][设计协议]解决粘包半包(下)

接下来介绍netty如何切割分包

学习目的,了解处理业务,方便以后脱离依赖

读者如果不感兴趣或看不懂可以先忽略,难度比较大

LengthFieldBasedFrameDecoder.class

    public LengthFieldBasedFrameDecoder(            ByteOrder byteOrder,     //大小端模式 默认大端 ByteOrder BIG_ENDIAN            int maxFrameLength,      //包Frame netty叫帧概念 最大上限            int lengthFieldOffset,     //包长度信息偏移多少bytes            int lengthFieldLength,    //包长度单位为bytes            int lengthAdjustment,     //包附加信息占多少位,如包尾部checksum 也可以不用设置            int initialBytesToStrip,//忽悠包头信息,返给上层时会去掉这部份bytes            boolean failFast)        //解包出错,控制抛异常,默认为true 无需关心这选项

解读netty源码

分四部份

1.netty解码介绍

2.边界判断

3.计算逻辑

4.切割包

第一部份

先看下LengthFieldBasedFrameDecoder继承类之间的关系

技术分享

ByteToMessageDecoder处理比较复杂,先不考虑

其实解码只要工作方法是

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
//公开调用方法 out 是解码后保存返回,为什么是个数组?原因有可能出现粘包情况多次解码,合并结果一次返回上层业务protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {    //调用实现解码方法    Object decoded = decode(ctx, in);    if (decoded != null) {        out.add(decoded);    }}

提示:如果自己实现解码 继承ByteToMessageDecoder类,逻辑写在decode(ChannelHandlerContext ctx, ByteBuf in)方法即可

第三部份

技术分享
 1     //计算包长度偏移坐标 已读坐标+lengthFieldOffset参数 2     int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; 3     //算出包长度 4     long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); 5      6     //包实际长度  加上开始忽悠的 lengthFieldEndOffset参数 加上 lengthAdjustment 参数 7     frameLength += lengthAdjustment + lengthFieldEndOffset; 8      9     //跳过忽悠头部 initialBytesToStrip参数10     in.skipBytes(initialBytesToStrip);11         12     //length 单位是byte 如2 是占一个short长度 4占length长度 8占long长度13     protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {14         //转换大小端模式15         buf = buf.order(order);16         long frameLength;17         switch (length) {18         case 1:19             frameLength = buf.getUnsignedByte(offset);20             break;21         case 2:22             frameLength = buf.getUnsignedShort(offset);23             break;24         case 3:25             frameLength = buf.getUnsignedMedium(offset);26             break;27         case 4:28             frameLength = buf.getUnsignedInt(offset);29             break;30         case 8:31             frameLength = buf.getLong(offset);32             break;33         default:34             throw new DecoderException(35                     "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");36         }37         return frameLength;38     }
计算逻辑

第四部份

技术分享
 1 int frameLengthInt = (int) frameLength; 2     // extract frame 3     int readerIndex = in.readerIndex(); 4     //计算实际包长度 上面逻辑已跑过 in.skipBytes(initialBytesToStrip) 所以要减去 initialBytesToStrip 5     int actualFrameLength = frameLengthInt - initialBytesToStrip; 6     //切割包  7     ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); 8     //记录已读坐标 9     in.readerIndex(readerIndex + actualFrameLength);10         11         12     //切割实现,直接调用ByteBuf 方法切割13     protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {14         return buffer.retainedSlice(index, length);15     }16     17     AbstractUnpooledSlicedByteBuf.class18     //深追ByteBuf切割实现,发现返回的ByteBuf只是保留引用,只是简单记录了下offsetStart adjustment writerIndex19     //如果内存吃紧ByteBuf回收会产生意想不到bug20     AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {21         super(length);22         checkSliceOutOfBounds(index, length, buffer);23 24         if (buffer instanceof AbstractUnpooledSlicedByteBuf) {25             this.buffer = ((AbstractUnpooledSlicedByteBuf) buffer).buffer;26             adjustment = ((AbstractUnpooledSlicedByteBuf) buffer).adjustment + index;27         } else if (buffer instanceof DuplicatedByteBuf) {28             this.buffer = buffer.unwrap();29             adjustment = index;30         } else {31             this.buffer = buffer;32             adjustment = index;33         }34 35         initLength(length);36         writerIndex(length);37     }
切割包

第二部份

现在回过头来看边界判断,主要是看处理出错逻辑部份

技术分享
 1 long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); 2         //解码出错 3         if (frameLength < 0) { 4             //无法计算包内容时netty策略选择跳过 包头信息跟包长度大小 5             in.skipBytes(lengthFieldEndOffset); 6             throw new CorruptedFrameException( 7                     "negative pre-adjustment length field: " + frameLength); 8         } 9 10          frameLength += lengthAdjustment + lengthFieldEndOffset;11         //奇怪的逻辑,上面已加上 lengthFieldEndOffset 除非 lengthAdjustment 参数或 lengthFieldEndOffset参数设置为负值12         //一般没什么复杂情况最好不要设置负值13         if (frameLength < lengthFieldEndOffset) {14             in.skipBytes(lengthFieldEndOffset);15             throw new CorruptedFrameException(16                     "Adjusted frame length (" + frameLength + ") is less " +17                     "than lengthFieldEndOffset: " + lengthFieldEndOffset);18         }19         20         // never overflows because it‘s less than maxFrameLength21         int frameLengthInt = (int) frameLength;22         //缓冲剩余可读bytes少于包长度,说明出现半包情况,忽略处理23         if (in.readableBytes() < frameLengthInt) {24             return null;25         }26         //如果包长度少于 initialBytesToStrip 参数会抛异常并且跳过包内容27         //没什么特殊情况最好不要设置initialBytesToStrip参数28         if (initialBytesToStrip > frameLengthInt) {29             in.skipBytes(frameLengthInt);30             throw new CorruptedFrameException(31                     "Adjusted frame length (" + frameLength + ") is less " +32                     "than initialBytesToStrip: " + initialBytesToStrip);33         }
基本处理
技术分享
 1         //包长度大于 maxFrameLength 2         //看下netty处理策略 3         if (frameLength > maxFrameLength) { 4             //算出超出范围 5             long discard = frameLength - in.readableBytes(); 6             //记录包长度 7             tooLongFrameLength = frameLength; 8  9             //有两种情况处理10             11             //存在粘包情况,直接 in.skipBytes((int) frameLength);跳过当前包 什么也不用记录12             if (discard < 0) {13                 // buffer contains more bytes then the frameLength so we can discard all now14                 in.skipBytes((int) frameLength);15             } else {16                 //当 discard>=0 17                 //说明没存在粘包情况,有可能出现半包情况,in.skipBytes(in.readableBytes()); 清空所有接收数据18                 //设置discardingTooLongFrame,bytesToDiscard记录超出范围,下次解码时用上19                 // Enter the discard mode and discard everything received so far.20                 discardingTooLongFrame = true;21                 bytesToDiscard = discard;22                 in.skipBytes(in.readableBytes());23             }24             failIfNecessary(true);25             return null;26         }27         28         //当上次包出现超过最大包限制,要忽悠上个包半包情况29         if (discardingTooLongFrame) {30             long bytesToDiscard = this.bytesToDiscard;31             //剩余半包长度32             int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());33             //忽略剩余半包长度34             in.skipBytes(localBytesToDiscard);35             //记录忽略已忽略多少bytes36             bytesToDiscard -= localBytesToDiscard;37             this.bytesToDiscard = bytesToDiscard;38             //如果this.bytesToDiscard==0 说明已全部忽略上一个包 会重置 discardingTooLongFrame39             failIfNecessary(false);40         }41         42     private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {43         if (bytesToDiscard == 0) {44             // Reset to the initial state and tell the handlers that45             // the frame was too large.46             long tooLongFrameLength = this.tooLongFrameLength;47             this.tooLongFrameLength = 0;48             discardingTooLongFrame = false;49             if (!failFast ||50                 failFast && firstDetectionOfTooLongFrame) {51                 fail(tooLongFrameLength);52             }53         } else {54             // Keep discarding and notify handlers if necessary.55             if (failFast && firstDetectionOfTooLongFrame) {56                 fail(tooLongFrameLength);57             }58         }59     }
大于maxFrameLength策略

当大于maxFrameLength 处理核心忽略当前包,如果存在粘包直接忽略,如果存在半包记录剩余包大小bytesToDiscard,下次循环解码时不停跳过bytesToDiscard

 

关于分包处理

为何需要分包?比如要传送10G大小的文件,如果把数据全读进内存再发送出去,明显是不合理的,这时要把10G大小的数据切割分批读,分批发送出去

为什么不做在应用协议里?因业太多业务上的逻辑,底层是无法控制的,再说是特定的功能,没必要添加支持,看作成业务上一个小功能实现就行了

[编织消息框架][设计协议]解决粘包半包(下)