首页 > 代码库 > kcp源码走读

kcp源码走读

我看的是golang版本的kcp源码,下载地址:https://github.com/skywind3000/kcp

一个发送数据接收数据的基本流程如下
//发送端有一段数据buffer需要发送,于是调用send函数

kcp1.Send(buf.Bytes())//send函数将buffer分片成kcp的数据包格式,存在待发送队列中

kcp1.flush()//将发送队列中的数据通过下层协议(UDP)进行发送

//kcp2接收到下层协议(UDP)传进来的数据底层数据buffer

kcp2.Input(buffer[:hr], true, false)//调用Input函数将buffer转换成kcp的数据包格式

hr = int32(kcp2.Recv(buffer[:10]))//kcp2将接收的kcp数据包还原成kcp1发送的buffer数据

 


 
func (kcp *KCP) Recv(buffer []byte) (n int)

    // merge fragment
    count := 0
    for k := range kcp.rcv_queue {
        seg := &kcp.rcv_queue[k]
        copy(buffer, seg.data)
        buffer = buffer[len(seg.data):]
        n += len(seg.data)
        count++
        kcp.delSegment(*seg)
        if seg.frg == 0 {
            break
        }
    }
    if count > 0 {
        kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
    }

count = 0
    for k := range kcp.rcv_buf {
        seg := &kcp.rcv_buf[k]
        if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
            kcp.rcv_nxt++
            count++
        } else {
            break
        }
    }

    if count > 0 {
        kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
        kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
    }
// fast recover
    if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
        // ready to send back IKCP_CMD_WINS in ikcp_flush
        // tell remote my window size
        kcp.probe |= IKCP_ASK_TELL
    }

recv函数将接收消息队列中的数据包还原成原来的消息格式,通过buffer返回给调用者

还会把rcv_buf中的与接收序号相匹配的数据拷贝到rcv_queue中。这里注意到在Input->parse_data函数中有同样的处理,这里之所以需要重复处理是因为kcp.rcv_queue的大小可能会发生改变,len(kcp.rcv_queue) < int(kcp.rcv_wnd)条件有可能重新成立。

fast_recover标识的意思是快速告知对端我又有窗口大小空出来了,因为在Input函数中有可能窗口会满了,此时发送给对端的是窗口满消息,而在recv过后,因为取走了消息,可用接收窗口又变大了,此时需要快速告知对端可以继续发消息了。

 

 

func (kcp *KCP) Send(buffer []byte) int {
    var count int
    if len(buffer) == 0 {
        return -1
    }

    // append to previous segment in streaming mode (if possible)
    if kcp.stream != 0 {
        n := len(kcp.snd_queue)
        if n > 0 {
            seg := &kcp.snd_queue[n-1]
            if len(seg.data) < int(kcp.mss) {
                capacity := int(kcp.mss) - len(seg.data)
                extend := capacity
                if len(buffer) < capacity {
                    extend = len(buffer)
                }

                // grow slice, the underlying cap is guaranteed to
                // be larger than kcp.mss
                oldlen := len(seg.data)
                seg.data = http://www.mamicode.com/seg.data[:oldlen+extend]
                copy(seg.data[oldlen:], buffer)
                buffer = buffer[extend:]
            }
        }

        if len(buffer) == 0 {
            return 0
        }
    }

    if len(buffer) <= int(kcp.mss) {
        count = 1
    } else {
        count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
    }

    if count > 255 {
        return -2
    }

    if count == 0 {
        count = 1
    }

    for i := 0; i < count; i++ {
        var size int
        if len(buffer) > int(kcp.mss) {
            size = int(kcp.mss)
        } else {
            size = len(buffer)
        }
        seg := kcp.newSegment(size)
        copy(seg.data, buffer[:size])
        if kcp.stream == 0 { // message mode
            seg.frg = uint8(count - i - 1)
        } else { // stream mode
            seg.frg = 0
        }
        kcp.snd_queue = append(kcp.snd_queue, seg)
        buffer = buffer[size:]
    }
    return 0
}

send函数主要功能是把用户需要发送的字符数组转化成kcp的数据包。如果用户的数据超过一个MSS,还会对数据进行分片。这里有两种分片的方式,消息方式和流方式。消息方式把用户数据分片,为每个分片设置ID,将分片后的数据一个一个地存入发送队列种,接收方通过id解析回原来的包,消息方式一个分片的数据量可能不能达到MSS(最大分片大小)。流方式则是会检测每个发送队列里的分片是否达到最大mss,如果没有达到就会用新的数据填充分片。流方式的网络速度优于消息方式,但是流方式接收方接收时是一个分片一个分片地接收,而消息方式kcp接收函数会自己把原本属于一个数据的分片重组回来。

kcp源码走读