文档章节

Linux内核中RPS/RFS代码分析

刺猬一号
 刺猬一号
发布于 2017/05/23 01:38
字数 1773
阅读 85
收藏 0

netdev_rx_queue表示对应的接收队列,很多网卡硬件上已经支持多个队列,此时就会有多个netdev_rx_queue队列,这个结构是挂在net_device,初始化接收队列的函数:netif_alloc_rx_queues

netif_alloc_rx_queues
 

static int netif_alloc_rx_queues(struct net_device *dev)
{
/*获取接收队列的个数*/
    unsigned int i, count = netdev_extended(dev)->rps_data.num_rx_queues;
    struct netdev_rx_queue *rx;

    BUG_ON(count < 1);
 /*分配netdev_rx_queue 空间*/
    rx = kcalloc(count, sizeof(struct netdev_rx_queue), GFP_KERNEL);
    if (!rx) {
        pr_err("netdev: Unable to allocate %u rx queues.\n", count);
        return -ENOMEM;
    }
 /* netdev_rx_queue 和net_device关联起来。*/
    netdev_extended(dev)->rps_data._rx = rx;
 /*对netdev_rx_queue 中net_device进行赋值操作*/
    for (i = 0; i < count; i++)
        rx[i].dev = dev;
    return 0;
}

    struct netdev_rx_queue {
    /*保存当前队列的rps map*/
    struct rps_map *rps_map;
/* //每个设备的队列保存了一个rps_dev_flow_table */    
struct rps_dev_flow_table *rps_flow_table;
//对应的kobject
    struct kobject kobj; 
/*所属的net_device*/
    struct net_device *dev;
} ____cacheline_aligned_in_smp;


struct rps_map {
/*CPU的个数,也就是CPU数组的个数*/
    unsigned int len;
    struct rcu_head rcu;
/*保存了CPU的ID*/
    u16 cpus[0];
};

get_rps_cpu

static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
         struct rps_dev_flow **rflowp)
{
    struct ipv6hdr *ip6;
    struct iphdr *ip;
    struct netdev_rx_queue *rxqueue;
    struct rps_map *map;
    struct rps_dev_flow_table *flow_table;
    struct rps_sock_flow_table *sock_flow_table;
    struct netdev_rps_info *rpinfo = &netdev_extended(dev)->rps_data;
    int cpu = -1;
    int tcpu;
    u8 ip_proto;
    u32 addr1, addr2, ports, ihl;

    rcu_read_lock();

    if (skb_rx_queue_recorded(skb)) {
/*获取设备对应的rx队列。*/
        u16 index = skb_get_rx_queue(skb);
        if (unlikely(index >= rpinfo->num_rx_queues)) {
            WARN_ONCE(rpinfo->num_rx_queues > 1, "%s received packet "
                "on queue %u, but number of RX queues is %u\n",
                dev->name, index, rpinfo->num_rx_queues);
            goto done;
        }
        rxqueue = rpinfo->_rx + index;
    } else
        rxqueue = rpinfo->_rx;

    if (!rxqueue->rps_map && !rxqueue->rps_flow_table)
        goto done;

    if (skb->rxhash) //如果硬件已经计算过,则直接跳过,不需要计算HASH值
        goto got_hash; /* Skip hash computation on packet header */

    switch (skb->protocol) { /*根据不同的IP协议获取源IP和目的IP*/
    case __constant_htons(ETH_P_IP):
        if (!pskb_may_pull(skb, sizeof(*ip)))
            goto done;

        ip = (struct iphdr *) skb->data;
        ip_proto = ip->protocol;
        addr1 = ip->saddr;
        addr2 = ip->daddr;
        ihl = ip->ihl;
        break;
    case __constant_htons(ETH_P_IPV6):
        if (!pskb_may_pull(skb, sizeof(*ip6)))
            goto done;

        ip6 = (struct ipv6hdr *) skb->data;
        ip_proto = ip6->nexthdr;
        addr1 = ip6->saddr.s6_addr32[3];
        addr2 = ip6->daddr.s6_addr32[3];
        ihl = (40 >> 2);
        break;
    default:
        goto done;
    }
    ports = 0;
    switch (ip_proto) {
    case IPPROTO_TCP:
    case IPPROTO_UDP:
    case IPPROTO_DCCP:
    case IPPROTO_ESP:
    case IPPROTO_AH:
    case IPPROTO_SCTP:
    case IPPROTO_UDPLITE:
        if (pskb_may_pull(skb, (ihl * 4) + 4))
            ports = *((u32 *) (skb->data + (ihl * 4))); /*获取四层协议的端口号,tcp头的前4个字节就是源和目的端口,因此这里跳过ip头得到tcp头的前4个字节*/
        break;

    default:
        break;
    }
 /*根据获取到的SIP和DIP,PORT计算HSAH值,*/
    skb->rxhash = jhash_3words(addr1, addr2, ports, hashrnd) >> 16;
    if (!skb->rxhash)
        skb->rxhash = 1;

got_hash:
/* rps_sock_flow_table和rps_dev_flow_table 是为了解决RFS而添加的两张表,rps_sock_flow_table是一个全局的hash表,这个表针对socket的,映射了socket对应的cpu,这里的cpu就是应用层期待软中断所在的cpu ,rps_dev_flow_table,这个是针对设备的,每个设备队列都含有一个rps_dev_flow_table(这个表主要是保存了上次处理相同链接上的skb所在的cpu),这个hash表中每一个元素包含了一个cpu id,一个tail queue的计数器*/
    flow_table = rcu_dereference(rxqueue->rps_flow_table);
    sock_flow_table = rcu_dereference(rps_sock_flow_table);
    if (flow_table && sock_flow_table) {
        u16 next_cpu;
        struct rps_dev_flow *rflow;

        rflow = &flow_table->flows[skb->rxhash & flow_table->mask];
        tcpu = rflow->cpu;

        next_cpu = sock_flow_table->ents[skb->rxhash &
         sock_flow_table->mask];

        /*首先会得到两个flow table,一个是sock_flow_table,另一个是设备的rps_flow_table(skb对应的设备队列中对应的flow table),这里的逻辑是这样子的取出来两个cpu,一个是根据rps计算数据包前一次被调度过的cpu(tcpu),一个是应用程序期望的cpu(next_cpu),然后比较这两个值,如果 1 tcpu未设置(等于RPS_NO_CPU) 2 tcpu是离线的 3 tcpu的input_queue_head大于rps_flow_table中的last_qtail 的话就调度这个skb到next_cpu.而这里第三点input_queue_head大于rps_flow_table则说明在当前的dev flow table中的数据包已经发送完毕,否则的话为了避免乱序就还是继续使用tcpu
         * If the desired CPU (where last recvmsg was done) is
         * different from current CPU (one in the rx-queue flow
         * table entry), switch if one of the following holds:
         * - Current CPU is unset (equal to RPS_NO_CPU).
         * - Current CPU is offline.
         * - The current CPU's queue tail has advanced beyond the
         * last packet that was enqueued using this table entry.
         * This guarantees that all previous packets for the flow
         * have been dequeued, thus preserving in order delivery.
         */
        if (unlikely(tcpu != next_cpu) &&
         (tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
         ((int)(per_cpu(softnet_data, tcpu).input_queue_head -
         rflow->last_qtail)) >= 0)) {
            tcpu = rflow->cpu = next_cpu;
            if (tcpu != RPS_NO_CPU)
                rflow->last_qtail = per_cpu(softnet_data,
                 tcpu).input_queue_head;
        }
        if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
            *rflowp = rflow;
 /*设置返回的CPU*/
            cpu = tcpu;
            goto done;
        }
    }
/*当第一次进来时tcpu是RPS_NO_CPU,并且next_cpu也是RPS_NO_CPU,此时会导致跳过rfs处理,而是直接使用rps的处理, */
    map = rcu_dereference(rxqueue->rps_map);
    if (map) {
        tcpu = map->cpus[((u32) (skb->rxhash * map->len)) >> 16];
/*如果cpu是online的,则返回计算出的这个cpu */
        if (cpu_online(tcpu)) {
            cpu = tcpu;
            goto done;
        }
    }

done:
    rcu_read_unlock();
    return cpu;
}

/*将skb挂在到对应cpu的input queue上的, enqueue_to_backlog接受一个skb和cpu为参数,通过cpu来判断skb如何处理。要么加入所属的input_pkt_queue中,要么schecule 软中断*/

enqueue_to_backlog

static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
             unsigned int *qtail)
{
    struct softnet_data *queue;
    unsigned long flags;
   /*根据传递过来的CPU,获取softnet_data结构体*/
    queue = &per_cpu(softnet_data, cpu);

    local_irq_save(flags);
    __get_cpu_var(netdev_rx_stat).total++;

    spin_lock(&queue->input_pkt_queue.lock);
    if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
        if (queue->input_pkt_queue.qlen) {
enqueue:/*将数据包添加到input_pkt_queue队列中*/
            __skb_queue_tail(&queue->input_pkt_queue, skb);
            *qtail = queue->input_queue_head +
                 queue->input_pkt_queue.qlen;

            spin_unlock_irqrestore(&queue->input_pkt_queue.lock,
             flags);
            return NET_RX_SUCCESS;
        }

        /* Schedule NAPI for backlog device 可以调度软中断*/
        if (napi_schedule_prep(&queue->backlog)) {
            if (cpu != smp_processor_id()) {/*判断该SKB是否该CPU处理*/
                struct rps_remote_softirq_cpus *rcpus =
                 &__get_cpu_var(rps_remote_softirq_cpus);

                cpu_set(cpu, rcpus->mask[rcpus->select]);
                __raise_softirq_irqoff(NET_RX_SOFTIRQ);
            } else
/*应该当前cpu处理,则直接schedule 软中断,这里可以看到传递进去的是backlog */
                ____napi_schedule(queue, &queue->backlog);
        }
        goto enqueue;
    }

    spin_unlock(&queue->input_pkt_queue.lock);

    __get_cpu_var(netdev_rx_stat).dropped++;
    local_irq_restore(flags);

    kfree_skb(skb);
    return NET_RX_DROP;
}

enqueue_to_backlog

static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
             unsigned int *qtail)
{
    struct softnet_data *queue;
    unsigned long flags;
   /*根据传递过来的CPU,获取softnet_data结构体*/
    queue = &per_cpu(softnet_data, cpu);

    local_irq_save(flags);
    __get_cpu_var(netdev_rx_stat).total++;

    spin_lock(&queue->input_pkt_queue.lock);
    if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
        if (queue->input_pkt_queue.qlen) {
enqueue:/*将数据包添加到input_pkt_queue队列中*/
            __skb_queue_tail(&queue->input_pkt_queue, skb);
            *qtail = queue->input_queue_head +
                 queue->input_pkt_queue.qlen;

            spin_unlock_irqrestore(&queue->input_pkt_queue.lock,
             flags);
            return NET_RX_SUCCESS;
        }

        /* Schedule NAPI for backlog device 可以调度软中断*/
        if (napi_schedule_prep(&queue->backlog)) {
            if (cpu != smp_processor_id()) {/*判断该SKB是否该CPU处理*/
                struct rps_remote_softirq_cpus *rcpus =
                 &__get_cpu_var(rps_remote_softirq_cpus);

                cpu_set(cpu, rcpus->mask[rcpus->select]);
                __raise_softirq_irqoff(NET_RX_SOFTIRQ);
            } else
/*应该当前cpu处理,则直接schedule 软中断,这里可以看到传递进去的是backlog */
                ____napi_schedule(queue, &queue->backlog);
        }
        goto enqueue;
    }

    spin_unlock(&queue->input_pkt_queue.lock);

    __get_cpu_var(netdev_rx_stat).dropped++;
    local_irq_restore(flags);

    kfree_skb(skb);
    return NET_RX_DROP;
}

inet_recvmsg

int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
         size_t size, int flags)
{
    struct sock *sk = sock->sk;
    int addr_len = 0;
    int err;

    inet_rps_record_flow(sk);//设置HASH表

    err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
                 flags & ~MSG_DONTWAIT, &addr_len);
    if (err >= 0)
        msg->msg_namelen = addr_len;
    return err;
}

这个函数主要是得到全局的rps_sock_flow_table,然后调用rps_record_sock_flow来对rps_sock_flow_table进行设置,这里会将socket的sk_rxhash传递进去当作hash的索引,而这个sk_rxhash其实就是skb里面的rxhash,skb的rxhash就是rps中设置的hash值,这个值是根据四元组进行hash的。这里用这个当索引一个是为了相同的socket都能落入一个index。而且下面的软中断上下文也比较容易存取这个hash表

inet_rps_record_flow

点击(此处)折叠或打开

static inline void inet_rps_record_flow(struct sock *sk)
{
    struct rps_sock_flow_table *sock_flow_table;

    rcu_read_lock();
    sock_flow_table = rcu_dereference(rps_sock_flow_table);
    rps_record_sock_flow(sock_flow_table, inet_sk_rxhash(sk));
    rcu_read_unlock();
}

rps_record_sock_flow

点击(此处)折叠或打开

static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
                    u32 hash)
{
    if (table && hash) {
/*获取索引*/
        unsigned int cpu, index = hash & table->mask;

        /* We only give a hint, preemption can change cpu under us 获取CPU */
        cpu = raw_smp_processor_id();
 /*保存对应的cpu,如果等于当前cpu,则说明已经设置过了*/
        if (table->ents[index] != cpu)
            table->ents[index] = cpu;
    }
}


 

    图:内核代码流程

© 著作权归作者所有

刺猬一号
粉丝 12
博文 373
码字总数 616361
作品 0
深圳
私信 提问
最新带谷歌技术的 Linux 内核亮相

最新带谷歌技术的Linux内核亮相 2.6.35 版Linux内核发行。这一最新版本的22个显著改进包括集成了对未来的Intel图形芯片,AMD Radeon 芯片电源管理和谷歌贡献的多核处理器下网络性能提升技术的...

xyxzfj
2010/08/05
2.1K
6
Google 向新版 Linux 内核贡献技术

本月初刚刚发布的 Linux 2.6.35 版内核中,包含了 RPS 和 RFS 这两项由 Google 贡献的新技术。 RPS 的全称是 Receive Packet Steering,这项技术将流入的数据包分布给所有可用的 CPU 去处理,...

红薯
2010/08/12
1K
0
Linux RPS/RFS 实现原理浅析

本文快速解析一下RPS/RFS的基本原理。 RPS-Receive Packet Steering 下面这个就是RPS的原理: 其实就是一个软件对CPU负载重分发的机制。其使能的作用点在CPU开始处理软中断的开始,即下面的地...

dog250
2018/04/21
0
0
Tips: 升级 Ubuntu 10.04 内核到最新的 2.6.35 版

前段时间 Linux 发布了最新版的 2.6.35 最新内核,增加了一些新特性。在 Ubuntu 10.10 Alpha 3 中官方已启用了最新的内核,但在目前的 10.04 Lucid 中内核依旧没有采用新版本。不过如果你有需...

红薯
2010/08/12
905
0
socket选项 SO_REUSEPORT

前言 本篇用于记录学习SOREUSEPORT的笔记和心得,末尾还会提供一个bindp小工具也能为已有的程序享受这个新的特性。 当前Linux网络应用程序问题 运行在Linux系统上网络应用程序,为了利用多核...

miffa
2015/03/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Angular 英雄编辑器

应用程序现在有了基本的标题。 接下来你要创建一个新的组件来显示英雄信息并且把这个组件放到应用程序的外壳里去。 创建英雄组件 使用 Angular CLI 创建一个名为 heroes 的新组件。 ng gener...

honeymoose
今天
5
0
Kernel DMA

为什么会有DMA(直接内存访问)?我们知道通常情况下,内存数据跟外设之间的通信是通过cpu来传递的。cpu运行io指令将数据从内存拷贝到外设的io端口,或者从外设的io端口拷贝到内存。由于外设...

yepanl
今天
6
0
hive

一、hive的定义: Hive是一个SQL解析引擎,将SQL语句转译成MR Job,然后再在Hadoop平台上运行,达到快速开发的目的 Hive中的表是纯逻辑表,就只是表的定义,即表的元数据。本质就是Hadoop的目...

霉男纸
今天
5
0
二、Spring Cloud—Eureka(Greenwich.SR1)

注:本系列文章所用工具及版本如下:开发工具(IDEA 2018.3.5),Spring Boot(2.1.3.RELEASE),Spring Cloud(Greenwich.SR1),Maven(3.6.0),JDK(1.8) Eureka: Eureka是Netflix开发...

倪伟伟
昨天
15
0
eclipse常用插件

amaterasUML https://takezoe.github.io/amateras-update-site/ https://github.com/takezoe/amateras-modeler modelGoon https://www.cnblogs.com/aademeng/articles/6890266.html......

大头鬼_yc
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部