首页 > 代码库 > virtIO前后端notify机制详解

virtIO前后端notify机制详解

2016-11-15


 本来这是在前端驱动后期分析的,但是这部分内容比较多,且分析了后端notify前端的机制,所以还是单独拿出一节分析比较好!

还是拿网络驱动部分做案例,网络驱动部分有两个队列,(忽略控制队列):接收队列和发送队列;每个队列都对应一个virtqueue,两个队列之间是互不影响的。

前后端利用virtqueue的方式如下图所示:

技术分享

这里再详细的描述下,当两个queue都需要客户机填充buffer,ReceiveQueue需要客户机 driver提前填充分配好的空buffer,然后记录到availRing,并在恰当的时机通知后端,当外部网络有数据包到达时,qemu后端就从availRing 中获取一个buffer,然后填充数据,完事后记录buffer head index到usedRing.最后在恰当的时机通知客户机(向客户机注入中断),客户机接收到信号便知道有数据包到达,这里只需要从usedRing 中获取到index,然后取data数组的第i个元素即可。因为在客户机填充buffer的时候把逻辑buffer的指针保存在data数组中。

而SendQueue同样需要客户机去填充,只不过这里是当客户机需要发送数据包时,把数据包构造成逻辑buffer,然后填充到send Queue,并在恰当的时机通知后端,qemu后端收到通知就知道那个队列有请求到达,如果当前没有处理其他数据包就着手处理这个数据包。具体就同样是从AvailRing中取出buffer head index,然后从描述符表中get到buffer,这时就需要从buffer中copy数据了,因为要把数据包从host发送出去,然后更新usedRing。最后同样要在恰当的时机通知客户机。注意这里客户机同样需要从usedRing 中get index,但是这里主要是用于delay notify,因为数据包由客户机构造,其占用的buffer并不能重复使用,只是每次有数据包就把其构造成buffer而已。

 

以上便是基本的使用sendqueue和receive的原理,但是还有一点上面我没有提到,就是通知的那个恰当的时机,那么这个恰当的实际究竟是什么时候呢??在virtIO中有两种方式控制前后端的notify.

1、flags字段

2、事件触发

1、在vring_avail和vring_used的flags字段,控制前后端的通信。vring_used中的flags用于通知driver端,当add一个buffer的时候不用notify后端。而vring_avail中的flags用于通知qemu端,当消费一个buffer的时候不用interrupt 客户机。

2、在virtIO中又加入了另一种机制,需要由driver和qemu自己判断是否需要通知,也就是设置一个限额,当一端添加buffer或者消费buffer的数量达到指定数目,就触发事件,从而发生notify或者interrupt。在有这种机制的情况下就忽略了前面所说的flags。

 

这里我们以receiveQueue为例,分析下前后端的delay notify机制。

在front driver端:

客户机driver通过NAPI接收数据时,会在可用buffer不足的时候调用函数添加,具体就是try_fill_recv:

static bool try_fill_recv(struct receive_queue *rq, gfp_t gfp)
{
    struct virtnet_info *vi = rq->vq->vdev->priv;
    int err;
    bool oom;
    /*循环,每循环一次添加一个buffer,一直到填充满,即描述符表满*/
    do {
        if (vi->mergeable_rx_bufs)
            err = add_recvbuf_mergeable(rq, gfp);
        else if (vi->big_packets)
            err = add_recvbuf_big(rq, gfp);
        else
            err = add_recvbuf_small(rq, gfp);
        oom = err == -ENOMEM;
        if (err)
            break;
        ++rq->num;
    } while (rq->vq->num_free);
    if (unlikely(rq->num > rq->max))
        rq->max = rq->num;
    /*通知后端*/
    virtqueue_kick(rq->vq);
    return !oom;
}

 

至于添加的是哪种类型的buffer,我们这里并不关心,循环结束就调用virtqueue_kick(rq->vq)函数,此时参数是接收队列的virtqueue,

接下来就调用到了virtqueue_kick_prepare函数,该函数判断当前应不应该通知后端。先看下函数的代码:

 1 bool virtqueue_kick_prepare(struct virtqueue *_vq)
 2 {
 3     struct vring_virtqueue *vq = to_vvq(_vq);
 4     u16 new, old;
 5     bool needs_kick;
 6 
 7     START_USE(vq);
 8     /* We need to expose available array entries before checking avail
 9      * event. */
10     virtio_mb(vq->weak_barriers);
11 
12     old = vq->vring.avail->idx - vq->num_added;
13     new = vq->vring.avail->idx;
14     vq->num_added = 0;
15 
16 #ifdef DEBUG
17     if (vq->last_add_time_valid) {
18         WARN_ON(ktime_to_ms(ktime_sub(ktime_get(),
19                           vq->last_add_time)) > 100);
20     }
21     vq->last_add_time_valid = false;
22 #endif
23 
24     if (vq->event) {
25         needs_kick = vring_need_event(vring_avail_event(&vq->vring),
26                           new, old);
27     } else {
28         needs_kick = !(vq->vring.used->flags & VRING_USED_F_NO_NOTIFY);
29     }
30     END_USE(vq);
31     return needs_kick;

 

 

这里面涉及到几个变量,old是add_sg之前的avail.idx,而new是当前的avail.idx,还有一个是vring_avail_event(&vq->vring),看具体的实现:

1 #define vring_avail_event(vr) (*(__u16 *)&(vr)->used->ring[(vr)->num])

 

可以看到这里是VRingUsed中的ring数组最后一项的值,该值在后端驱动从virtqueue中pop一个elem之前设置成相应队列的下一个将要使用的index,即last_avail_index。

看下vring_need_event函数:

1 static inline int vring_need_event(__u16 event_idx, __u16 new_idx, __u16 old)
2 {
3     /* Note: Xen has similar logic for notification hold-off
4      * in include/xen/interface/io/ring.h with req_event and req_prod
5      * corresponding to event_idx + 1 and new_idx respectively.
6      * Note also that req_event and req_prod in Xen start at 1,
7      * event indexes in virtio start at 0. */
8     return (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old);
9 }

 

前后端通过对比 (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old)来判断是否需要notify后端,这在数据量比较大的时候显得很实用。在初始状态下,即在qemu一个buffer还没有使用的情况下,event_idx必然是0,那么此时这里的判断肯定为真,所以notify后端。后端收到通知就从virtqueue中pop buffer,同时在此之前需要设置event_idx,代码见qemu virtio.c的virtqueue_pop函数:

 1 void *virtqueue_pop(VirtQueue *vq, size_t sz)
 2 {
 3     ......
 4 
 5     i = head = virtqueue_get_head(vq, vq->last_avail_idx++);
 6     if (virtio_vdev_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX))    {
 7         vring_set_avail_event(vq, vq->last_avail_idx);
 8     }
10     ......
11 }

 

如果是初始化状态,即当前是首次执行virtqueue_pop函数,last_avail_idx=0,在++后就成了1,然后设置此值到UsedRing.ring[]数组的最后一项:

1 static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val)
2 {
3     hwaddr pa;
4     if (!vq->notification) {
5         return;
6     }
7     pa = vq->vring.used + offsetof(VRingUsed, ring[vq->vring.num]);
8     virtio_stw_phys(vq->vdev, pa, val);
9 }

 

设置成功后就执行pop之后的处理,写入数据完成后,调用后端的 virtio_notify(vdev, q->rx_vq)函数。该函数执行前同样需要判断是否需要notify,具体函数为virtio_should_notify

bool virtio_should_notify(VirtIODevice *vdev, VirtQueue *vq)
{
    uint16_t old, new;
    bool v;
    /* We need to expose used array entries before checking used event. */
    smp_mb();
    /* Always notify when queue is empty (when feature acknowledge) */
    if (virtio_vdev_has_feature(vdev, VIRTIO_F_NOTIFY_ON_EMPTY) &&
        !vq->inuse && virtio_queue_empty(vq)) {
        return true;
    }

    if (!virtio_vdev_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX)) {
        return !(vring_avail_flags(vq) & VRING_AVAIL_F_NO_INTERRUPT);
    }

    v = vq->signalled_used_valid;
    vq->signalled_used_valid = true;
    old = vq->signalled_used;
    new = vq->signalled_used = vq->used_idx;
    return !v || vring_need_event(vring_get_used_event(vq), new, old);
}

 

该函数逻辑和前端driver总的判断函数大致类似,但是还是有些不同,首先,如果队列为空即当前没有可用buffer了,那么必然会notify前端;

接着判断是否支持这样事件触发式的方式即VIRTIO_RING_F_EVENT_IDX,如果不支持,就通过flags字段来判断。而如果支持,就通过事件触发来通知。

这里有两个条件:第一个是v = vq->signalled_used_valid和vring_need_event(vring_get_used_event(vq), new, old)

 v = vq->signalled_used_valid在初始化的时候被设置成false,表示还没有向前端做任何通知,而后再每次的virtio_should_notify中就会设置成true,并更新vq->signalled_used = vq->used_idx;所以如果是首次尝试通知前端,则总能成功,否则需要判断vring_need_event(vring_get_used_event(vq), new, old),该函数具体是根前面逻辑是一样的,正如前面所说,这是第一次尝试通知,所以总能成功。而vring_get_used_event(vq)是VRingAvail.ring[]数组的最后一项的值,该值在客户机driver中被设置

 

在次回到linux driver中,就会从usedRing中取buffer,同样每取出一个buffer就会设置used_event,代码见virtio_ring.c的virtqueue_get_buf函数,设置的值是vq->last_used_idx,记录客户机处理位置。

 1 void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len)
 2 {
 3     
 4     ......
 5     if (!(vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
 6         vring_used_event(&vq->vring) = vq->last_used_idx;
 7         virtio_mb(vq->weak_barriers);
 8     }
 9     ......
10     
11 }

 

到目前为止,基本一次完整的交互已经完成了,但是由于是初次交互,前后端的delay机制都没起作用,判断条件中使用到的event_idx已经更新了,假如说首次add 8个buffer,然后通知了后端,并且后端使用了三个buffer并首次notify前端,此时 后端向第4个buffer中写数据,last_avail_idx=4(从0开始),那么used_event=4,此时前端发现可用buffer不足,需要添加,那么本次添加了5个,即new=8+5=13,old=8,new-old=5,而此时new-used_event-1=8,条件不满足,所以此时前端driver添加的buffer就不用notify后端。而话说这段时间后端又处理好了第二个数据包,使用了3个buffer。但不幸,前端还在处理第二个buffer,即last_used_idx=2,则used_event=2;对于后端来讲new-old=3,new-used_event-1=3,条件不满足,所以也不用通知。这样delay notify的机制便显示出效果了。笔者认为这其实本质上就是一场速度的对决,为了保证公平,即使一方处理快,也不能任意向另一端发送数据,只能待对方处理的差不多了你才能发,这样发送一方可以歇歇,而接受一方也不会因为处理不及而丢弃,从而造成浪费!哈哈,真是无规矩不成方圆!

 

后记:

到此,virtIO部分已经分析的差不多了,分析期间真实感觉到了自己知识的匮乏,其间多次向开发者求助,并均得到认真回复,在此在此感谢这些优秀的开发者。有时候看内核代码就感觉工程师和硬件在干仗,站在工程师的角度,需要尽其所能榨取硬件的性能。大到实现算法的优化,小到分析程序执行流的概率,从而针对编译做优化。站在硬件的角度,你处理不好,我就不给你工作。而从这方面,工程师自然是完胜,并且还在不遗余力的朝着胜利的另一个境界挺近,即征服硬件!哈哈,不过谁都知道,这是一场没有胜负的战争,工程师自然优秀,但是,因为工程师内部的竞争,这样战斗将永无休止!!唉,瞎扯淡了,各位朋友,下篇文章见!

virtIO前后端notify机制详解