首页 > 代码库 > 内核接收分组理解

内核接收分组理解

背景:
       内核接收分组的方式有两种:第一种:传统方式,使用中断的方式;第二种:NAPI,使用中断和轮询结合的方式。
 
中断方式:
       下图为一个分组到达NIC之后,该分组穿过内核到达网络层函数的路径。
技术分享
 
此图的下半部分为中断处理,上半部分为软中断。在中断处理中,函数net_interupt是
设备驱动程序的中断处理程序,它将确认此中断是否由接收到分组引发的,如果确实如此,
则控制权移交到函数net_rx。函数net_rx也是特定于NIC,首先创建一个新的套接字缓冲区,
分组的内容接下来从NIC传输到缓冲区(进入到物理内存中),然后使用内核源码中针对
各个传输类型的库函数来分析首部的数据。函数netif_rx函数不是特定于网络驱动函数,该
函数位于net/core/dev.c,调用该函数,标志着控制由特定于网卡的代码转移到了网络层的
通用接口部分。此函数的作用在于,将接收分组放置到一个特定于CPU的等待队列上,并
退出中断上下文。内核使用softnet_data来管理进出流量,其定义入下:
 
2352 /*2353  * Incoming packets are placed on per-cpu queues2354  */2355 struct softnet_data {2356     struct list_head    poll_list;2357     struct sk_buff_head process_queue;2358   2359     /* stats */2360     unsigned int        processed;2361     unsigned int        time_squeeze;2362     unsigned int        cpu_collision;2363     unsigned int        received_rps;2364 #ifdef CONFIG_RPS2365     struct softnet_data *rps_ipi_list;2366 #endif2367 #ifdef CONFIG_NET_FLOW_LIMIT2368     struct sd_flow_limit __rcu *flow_limit;2369 #endif2370     struct Qdisc        *output_queue;2371     struct Qdisc        **output_queue_tailp;2372     struct sk_buff      *completion_queue;23732374 #ifdef CONFIG_RPS2375     /* Elements below can be accessed between CPUs for RPS */2376     struct call_single_data csd ____cacheline_aligned_in_smp;2377     struct softnet_data *rps_ipi_next;2378     unsigned int        cpu;2379     unsigned int        input_queue_head;2380     unsigned int        input_queue_tail;2381 #endif2382     unsigned int        dropped;2383     struct sk_buff_head input_pkt_queue;   //对所有进入分组建立一个链表。2384     struct napi_struct  backlog;23852386 };
第2383行的input_pkt_queue即是CPU的等待队列。netif_rx的实现如下:
3329 static int netif_rx_internal(struct sk_buff *skb)3330 {3331     int ret;33323333     net_timestamp_check(netdev_tstamp_prequeue, skb);33343335     trace_netif_rx(skb);3336 #ifdef CONFIG_RPS  //RPS 和 RFS 相关代码3337     if (static_key_false(&rps_needed)) {3338         struct rps_dev_flow voidflow, *rflow = &voidflow;3339         int cpu;33403341         preempt_disable();3342         rcu_read_lock();33433344         cpu = get_rps_cpu(skb->dev, skb, &rflow);   //选择合适的CPU id3345         if (cpu < 0)3346             cpu = smp_processor_id();33473348         ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);33493350         rcu_read_unlock();3351         preempt_enable();3352     } else3353 #endif3354     {3355         unsigned int qtail;3356         ret = enqueue_to_backlog(skb, get_cpu(), &qtail);     //将skb入队3357         put_cpu();3358     }3359     return ret;3360 }33613362 /**3363  *  netif_rx    -   post buffer to the network code3364  *  @skb: buffer to post3365  *3366  *  This function receives a packet from a device driver and queues it for3367  *  the upper (protocol) levels to process.  It always succeeds. The buffer3368  *  may be dropped during processing for congestion control or by the3369  *  protocol layers.3370  *3371  *  return values:3372  *  NET_RX_SUCCESS  (no congestion)3373  *  NET_RX_DROP     (packet was dropped)3374  *3375  */33763377 int netif_rx(struct sk_buff *skb)3378 {3379     trace_netif_rx_entry(skb);33803381     return netif_rx_internal(skb);3382 }
入队函数 enqueue_to_backlog的实现如下:
技术分享
3286 static int enqueue_to_backlog(struct sk_buff *skb, int cpu,3287                   unsigned int *qtail)3288 {3289     struct softnet_data *sd;3290     unsigned long flags;3291     unsigned int qlen;32923293     sd = &per_cpu(softnet_data, cpu);  //获得此cpu上的softnet_data32943295     local_irq_save(flags);  32963297     rps_lock(sd);3298     qlen = skb_queue_len(&sd->input_pkt_queue);3299     if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {3300         if (qlen) {3301 enqueue:3302             __skb_queue_tail(&sd->input_pkt_queue, skb);3303             input_queue_tail_incr_save(sd, qtail);3304             rps_unlock(sd);3305             local_irq_restore(flags);3306             return NET_RX_SUCCESS;3307         }33083309         /* Schedule NAPI for backlog device3310          * We can use non atomic operation since we own the queue lock3311          */3312         if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {   3313             if (!rps_ipi_queued(sd))3314                 ____napi_schedule(sd, &sd->backlog); //只有qlen是0的时候才执行到这里3315         }3316         goto enqueue;3317     }33183319     sd->dropped++;3320     rps_unlock(sd);33213322     local_irq_restore(flags);33233324     atomic_long_inc(&skb->dev->rx_dropped);3325     kfree_skb(skb);3326     return NET_RX_DROP;3327 }
View Code

 

NAPI
NAPI是混合了中断和轮询机制,当一个新的分组到达,而前一个分组依然在处理,这时内核
并不需要产生中断,内核会继续处理并在处理完毕后将中断开启。这样内核就利用了中断和轮询的好处,
只有有分组到达的时候,才会进行轮询。
NAPI存在两个优势
1.减少了CPU使用率,因为更少的中断。
2.处理各种设备更加公平
只有设备满足如下两个条件时,才能实现NAPI:
1.设备必须能够保留多个接收的分组
2.设备必须能够禁用用于分组接收的IRQ。而且,发送分组或其他可能通过IRQ进行的操作,都仍然
必须是启用的。
其运行概览如下:
技术分享
如上所示,各个设备在进入poll list前需要禁用IRQ,而设备中的分组都处理完毕后重新开启IRQ。poll list使用
napi_struct来管理设备,其定义如下:
 
技术分享
296 /*297  * Structure for NAPI scheduling similar to tasklet but with weighting298  */299 struct napi_struct {300     /* The poll_list must only be managed by the entity which301      * changes the state of the NAPI_STATE_SCHED bit.  This means302      * whoever atomically sets that bit can add this napi_struct303      * to the per-cpu poll_list, and whoever clears that bit304      * can remove from the list right before clearing the bit.305      */306     struct list_head    poll_list;    //用作链表元素307308     unsigned long       state;309     int         weight;        //设备权重310     unsigned int        gro_count;311     int         (*poll)(struct napi_struct *, int);   //设备提供的轮询函数312 #ifdef CONFIG_NETPOLL313     spinlock_t      poll_lock;314     int         poll_owner;315 #endif316     struct net_device   *dev;317     struct sk_buff      *gro_list;318     struct sk_buff      *skb;319     struct hrtimer      timer;320     struct list_head    dev_list;321     struct hlist_node   napi_hash_node;322     unsigned int        napi_id;323 };    
View Code
变量state可以为NAPI_STATE_SCHED或NAPI_STATE_DISABLE, 前者表示设备将在
内核的下一次循环时被轮询,后者表示轮询已经结束且没有更多的分组等待处理,但
设备并没有从poll list移除。
 
支持NAPI的NIC需要修改中断处理程序,将此设备放置在poll list上。示例代码如下:
技术分享
2215 static irqreturn_t e100_intr(int irq, void *dev_id)2216 {  2217     struct net_device *netdev = dev_id;2218     struct nic *nic = netdev_priv(netdev);2219     u8 stat_ack = ioread8(&nic->csr->scb.stat_ack);2220       2221     netif_printk(nic, intr, KERN_DEBUG, nic->netdev,2222              "stat_ack = 0x%02X\n", stat_ack);2223    2224     if (stat_ack == stat_ack_not_ours ||    /* Not our interrupt */2225        stat_ack == stat_ack_not_present)    /* Hardware is ejected */2226         return IRQ_NONE;2227    2228     /* Ack interrupt(s) */2229     iowrite8(stat_ack, &nic->csr->scb.stat_ack);2230    2231     /* We hit Receive No Resource (RNR); restart RU after cleaning */2232     if (stat_ack & stat_ack_rnr)2233         nic->ru_running = RU_SUSPENDED;22342235     if (likely(napi_schedule_prep(&nic->napi))) { //设置state为NAPI_STATE_SCHED2236         e100_disable_irq(nic);2237         __napi_schedule(&nic->napi); //将设备添加到 poll list,并开启软中断。2238     }22392240     return IRQ_HANDLED;2241 }
View Code
函数__napi_schedule的定义入下:
技术分享
3013 /* Called with irq disabled */3014 static inline void ____napi_schedule(struct softnet_data *sd,3015                      struct napi_struct *napi)3016 {3017     list_add_tail(&napi->poll_list, &sd->poll_list);3018     __raise_softirq_irqoff(NET_RX_SOFTIRQ);3019 } 4375 /**4376  * __napi_schedule - schedule for receive4377  * @n: entry to schedule4378  *4379  * The entry‘s receive function will be scheduled to run.4380  * Consider using __napi_schedule_irqoff() if hard irqs are masked.4381  */4382 void __napi_schedule(struct napi_struct *n)4383 {4384     unsigned long flags;43854386     local_irq_save(flags);4387     ____napi_schedule(this_cpu_ptr(&softnet_data), n);4388     local_irq_restore(flags);4389 }4390 EXPORT_SYMBOL(__napi_schedule);
View Code
设备除了对中断处理进行修改,还需要提供一个poll函数,使用此函数从NIC中获取分组。示例代码如下:
技术分享
特定于硬件的方法 hyper_do_poll 从NIC中获取分组,返回值work_done为处理的分组数目。当分组处理完后,
会调用netif_rx_complete将此设备从poll list中移除。ixgbe网卡的poll函数为ixgbe_poll, 而对于非NAPI的函数,
内核提供默认的处理函数process_backlog。
     
软中断处理相关
无论是NAPI接口还是非NAPI最后都是使用 net_rx_action 作为软中断处理函数。因此整个流程如下:
技术分享
上图中有些函数名已经发生变更,但是流程依然如此。在最新的3.19内核代码中,非NAPI的调用流程如下:
neif_rx会调用enqueue_to_backlog 将skb存入softnet_data,并调用____napi_schedule函数。
netif_rx===>netif_rx_internal===>enqueue_to_backlog===>____napi_schedule===>net_rx_action===>process_backlog===>__netif_receive_skb
e100网卡的NAPI调用流程入下:
e100_intr===>__napi_schedule===>net_rx_action===>e100_poll===>e100_rx_clean===>e100_rx_indicate===>netif_receive_skb
 
 
 

内核接收分组理解