linux/samples/bpf/xsk_fwd.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/* Copyright(c) 2020 Intel Corporation. */
   3
   4#define _GNU_SOURCE
   5#include <poll.h>
   6#include <pthread.h>
   7#include <signal.h>
   8#include <sched.h>
   9#include <stdio.h>
  10#include <stdlib.h>
  11#include <string.h>
  12#include <sys/mman.h>
  13#include <sys/resource.h>
  14#include <sys/socket.h>
  15#include <sys/types.h>
  16#include <time.h>
  17#include <unistd.h>
  18#include <getopt.h>
  19#include <netinet/ether.h>
  20#include <net/if.h>
  21
  22#include <linux/bpf.h>
  23#include <linux/if_link.h>
  24#include <linux/if_xdp.h>
  25
  26#include <bpf/libbpf.h>
  27#include <bpf/xsk.h>
  28#include <bpf/bpf.h>
  29
  30#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
  31
  32typedef __u64 u64;
  33typedef __u32 u32;
  34typedef __u16 u16;
  35typedef __u8  u8;
  36
  37/* This program illustrates the packet forwarding between multiple AF_XDP
  38 * sockets in multi-threaded environment. All threads are sharing a common
  39 * buffer pool, with each socket having its own private buffer cache.
  40 *
  41 * Example 1: Single thread handling two sockets. The packets received by socket
  42 * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
  43 * QB), while the packets received by socket B are forwarded to socket A. The
  44 * thread is running on CPU core X:
  45 *
  46 *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
  47 *
  48 * Example 2: Two threads, each handling two sockets. The thread running on CPU
  49 * core X forwards all the packets received by socket A to socket B, and all the
  50 * packets received by socket B to socket A. The thread running on CPU core Y is
  51 * performing the same packet forwarding between sockets C and D:
  52 *
  53 *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
  54 *         -c CX -c CY
  55 */
  56
  57/*
  58 * Buffer pool and buffer cache
  59 *
  60 * For packet forwarding, the packet buffers are typically allocated from the
  61 * pool for packet reception and freed back to the pool for further reuse once
  62 * the packet transmission is completed.
  63 *
  64 * The buffer pool is shared between multiple threads. In order to minimize the
  65 * access latency to the shared buffer pool, each thread creates one (or
  66 * several) buffer caches, which, unlike the buffer pool, are private to the
  67 * thread that creates them and therefore cannot be shared with other threads.
  68 * The access to the shared pool is only needed either (A) when the cache gets
  69 * empty due to repeated buffer allocations and it needs to be replenished from
  70 * the pool, or (B) when the cache gets full due to repeated buffer free and it
  71 * needs to be flushed back to the pool.
  72 *
  73 * In a packet forwarding system, a packet received on any input port can
  74 * potentially be transmitted on any output port, depending on the forwarding
  75 * configuration. For AF_XDP sockets, for this to work with zero-copy of the
  76 * packet buffers, it is required that the buffer pool memory fits into the
  77 * UMEM area shared by all the sockets.
  78 */
  79
  80struct bpool_params {
  81        u32 n_buffers;
  82        u32 buffer_size;
  83        int mmap_flags;
  84
  85        u32 n_users_max;
  86        u32 n_buffers_per_slab;
  87};
  88
  89/* This buffer pool implementation organizes the buffers into equally sized
  90 * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
  91 * pool that are completely filled with buffer pointers (full slabs).
  92 *
  93 * Each buffer cache has a slab for buffer allocation and a slab for buffer
  94 * free, with both of these slabs initially empty. When the cache's allocation
  95 * slab goes empty, it is swapped with one of the available full slabs from the
  96 * pool, if any is available. When the cache's free slab goes full, it is
  97 * swapped for one of the empty slabs from the pool, which is guaranteed to
  98 * succeed.
  99 *
 100 * Partially filled slabs never get traded between the cache and the pool
 101 * (except when the cache itself is destroyed), which enables fast operation
 102 * through pointer swapping.
 103 */
 104struct bpool {
 105        struct bpool_params params;
 106        pthread_mutex_t lock;
 107        void *addr;
 108
 109        u64 **slabs;
 110        u64 **slabs_reserved;
 111        u64 *buffers;
 112        u64 *buffers_reserved;
 113
 114        u64 n_slabs;
 115        u64 n_slabs_reserved;
 116        u64 n_buffers;
 117
 118        u64 n_slabs_available;
 119        u64 n_slabs_reserved_available;
 120
 121        struct xsk_umem_config umem_cfg;
 122        struct xsk_ring_prod umem_fq;
 123        struct xsk_ring_cons umem_cq;
 124        struct xsk_umem *umem;
 125};
 126
 127static struct bpool *
 128bpool_init(struct bpool_params *params,
 129           struct xsk_umem_config *umem_cfg)
 130{
 131        struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
 132        u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
 133        u64 slabs_size, slabs_reserved_size;
 134        u64 buffers_size, buffers_reserved_size;
 135        u64 total_size, i;
 136        struct bpool *bp;
 137        u8 *p;
 138        int status;
 139
 140        /* mmap prep. */
 141        if (setrlimit(RLIMIT_MEMLOCK, &r))
 142                return NULL;
 143
 144        /* bpool internals dimensioning. */
 145        n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
 146                params->n_buffers_per_slab;
 147        n_slabs_reserved = params->n_users_max * 2;
 148        n_buffers = n_slabs * params->n_buffers_per_slab;
 149        n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
 150
 151        slabs_size = n_slabs * sizeof(u64 *);
 152        slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
 153        buffers_size = n_buffers * sizeof(u64);
 154        buffers_reserved_size = n_buffers_reserved * sizeof(u64);
 155
 156        total_size = sizeof(struct bpool) +
 157                slabs_size + slabs_reserved_size +
 158                buffers_size + buffers_reserved_size;
 159
 160        /* bpool memory allocation. */
 161        p = calloc(total_size, sizeof(u8));
 162        if (!p)
 163                return NULL;
 164
 165        /* bpool memory initialization. */
 166        bp = (struct bpool *)p;
 167        memcpy(&bp->params, params, sizeof(*params));
 168        bp->params.n_buffers = n_buffers;
 169
 170        bp->slabs = (u64 **)&p[sizeof(struct bpool)];
 171        bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
 172                slabs_size];
 173        bp->buffers = (u64 *)&p[sizeof(struct bpool) +
 174                slabs_size + slabs_reserved_size];
 175        bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
 176                slabs_size + slabs_reserved_size + buffers_size];
 177
 178        bp->n_slabs = n_slabs;
 179        bp->n_slabs_reserved = n_slabs_reserved;
 180        bp->n_buffers = n_buffers;
 181
 182        for (i = 0; i < n_slabs; i++)
 183                bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
 184        bp->n_slabs_available = n_slabs;
 185
 186        for (i = 0; i < n_slabs_reserved; i++)
 187                bp->slabs_reserved[i] = &bp->buffers_reserved[i *
 188                        params->n_buffers_per_slab];
 189        bp->n_slabs_reserved_available = n_slabs_reserved;
 190
 191        for (i = 0; i < n_buffers; i++)
 192                bp->buffers[i] = i * params->buffer_size;
 193
 194        /* lock. */
 195        status = pthread_mutex_init(&bp->lock, NULL);
 196        if (status) {
 197                free(p);
 198                return NULL;
 199        }
 200
 201        /* mmap. */
 202        bp->addr = mmap(NULL,
 203                        n_buffers * params->buffer_size,
 204                        PROT_READ | PROT_WRITE,
 205                        MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
 206                        -1,
 207                        0);
 208        if (bp->addr == MAP_FAILED) {
 209                pthread_mutex_destroy(&bp->lock);
 210                free(p);
 211                return NULL;
 212        }
 213
 214        /* umem. */
 215        status = xsk_umem__create(&bp->umem,
 216                                  bp->addr,
 217                                  bp->params.n_buffers * bp->params.buffer_size,
 218                                  &bp->umem_fq,
 219                                  &bp->umem_cq,
 220                                  umem_cfg);
 221        if (status) {
 222                munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
 223                pthread_mutex_destroy(&bp->lock);
 224                free(p);
 225                return NULL;
 226        }
 227        memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
 228
 229        return bp;
 230}
 231
 232static void
 233bpool_free(struct bpool *bp)
 234{
 235        if (!bp)
 236                return;
 237
 238        xsk_umem__delete(bp->umem);
 239        munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
 240        pthread_mutex_destroy(&bp->lock);
 241        free(bp);
 242}
 243
 244struct bcache {
 245        struct bpool *bp;
 246
 247        u64 *slab_cons;
 248        u64 *slab_prod;
 249
 250        u64 n_buffers_cons;
 251        u64 n_buffers_prod;
 252};
 253
 254static u32
 255bcache_slab_size(struct bcache *bc)
 256{
 257        struct bpool *bp = bc->bp;
 258
 259        return bp->params.n_buffers_per_slab;
 260}
 261
 262static struct bcache *
 263bcache_init(struct bpool *bp)
 264{
 265        struct bcache *bc;
 266
 267        bc = calloc(1, sizeof(struct bcache));
 268        if (!bc)
 269                return NULL;
 270
 271        bc->bp = bp;
 272        bc->n_buffers_cons = 0;
 273        bc->n_buffers_prod = 0;
 274
 275        pthread_mutex_lock(&bp->lock);
 276        if (bp->n_slabs_reserved_available == 0) {
 277                pthread_mutex_unlock(&bp->lock);
 278                free(bc);
 279                return NULL;
 280        }
 281
 282        bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
 283        bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
 284        bp->n_slabs_reserved_available -= 2;
 285        pthread_mutex_unlock(&bp->lock);
 286
 287        return bc;
 288}
 289
 290static void
 291bcache_free(struct bcache *bc)
 292{
 293        struct bpool *bp;
 294
 295        if (!bc)
 296                return;
 297
 298        /* In order to keep this example simple, the case of freeing any
 299         * existing buffers from the cache back to the pool is ignored.
 300         */
 301
 302        bp = bc->bp;
 303        pthread_mutex_lock(&bp->lock);
 304        bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
 305        bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
 306        bp->n_slabs_reserved_available += 2;
 307        pthread_mutex_unlock(&bp->lock);
 308
 309        free(bc);
 310}
 311
 312/* To work correctly, the implementation requires that the *n_buffers* input
 313 * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
 314 * is typically the case, with one exception taking place when large number of
 315 * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
 316 */
 317static inline u32
 318bcache_cons_check(struct bcache *bc, u32 n_buffers)
 319{
 320        struct bpool *bp = bc->bp;
 321        u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
 322        u64 n_buffers_cons = bc->n_buffers_cons;
 323        u64 n_slabs_available;
 324        u64 *slab_full;
 325
 326        /*
 327         * Consumer slab is not empty: Use what's available locally. Do not
 328         * look for more buffers from the pool when the ask can only be
 329         * partially satisfied.
 330         */
 331        if (n_buffers_cons)
 332                return (n_buffers_cons < n_buffers) ?
 333                        n_buffers_cons :
 334                        n_buffers;
 335
 336        /*
 337         * Consumer slab is empty: look to trade the current consumer slab
 338         * (full) for a full slab from the pool, if any is available.
 339         */
 340        pthread_mutex_lock(&bp->lock);
 341        n_slabs_available = bp->n_slabs_available;
 342        if (!n_slabs_available) {
 343                pthread_mutex_unlock(&bp->lock);
 344                return 0;
 345        }
 346
 347        n_slabs_available--;
 348        slab_full = bp->slabs[n_slabs_available];
 349        bp->slabs[n_slabs_available] = bc->slab_cons;
 350        bp->n_slabs_available = n_slabs_available;
 351        pthread_mutex_unlock(&bp->lock);
 352
 353        bc->slab_cons = slab_full;
 354        bc->n_buffers_cons = n_buffers_per_slab;
 355        return n_buffers;
 356}
 357
 358static inline u64
 359bcache_cons(struct bcache *bc)
 360{
 361        u64 n_buffers_cons = bc->n_buffers_cons - 1;
 362        u64 buffer;
 363
 364        buffer = bc->slab_cons[n_buffers_cons];
 365        bc->n_buffers_cons = n_buffers_cons;
 366        return buffer;
 367}
 368
 369static inline void
 370bcache_prod(struct bcache *bc, u64 buffer)
 371{
 372        struct bpool *bp = bc->bp;
 373        u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
 374        u64 n_buffers_prod = bc->n_buffers_prod;
 375        u64 n_slabs_available;
 376        u64 *slab_empty;
 377
 378        /*
 379         * Producer slab is not yet full: store the current buffer to it.
 380         */
 381        if (n_buffers_prod < n_buffers_per_slab) {
 382                bc->slab_prod[n_buffers_prod] = buffer;
 383                bc->n_buffers_prod = n_buffers_prod + 1;
 384                return;
 385        }
 386
 387        /*
 388         * Producer slab is full: trade the cache's current producer slab
 389         * (full) for an empty slab from the pool, then store the current
 390         * buffer to the new producer slab. As one full slab exists in the
 391         * cache, it is guaranteed that there is at least one empty slab
 392         * available in the pool.
 393         */
 394        pthread_mutex_lock(&bp->lock);
 395        n_slabs_available = bp->n_slabs_available;
 396        slab_empty = bp->slabs[n_slabs_available];
 397        bp->slabs[n_slabs_available] = bc->slab_prod;
 398        bp->n_slabs_available = n_slabs_available + 1;
 399        pthread_mutex_unlock(&bp->lock);
 400
 401        slab_empty[0] = buffer;
 402        bc->slab_prod = slab_empty;
 403        bc->n_buffers_prod = 1;
 404}
 405
 406/*
 407 * Port
 408 *
 409 * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
 410 * packet forwarding to happen with no packet buffer copy, all the sockets need
 411 * to share the same UMEM area, which is used as the buffer pool memory.
 412 */
 413#ifndef MAX_BURST_RX
 414#define MAX_BURST_RX 64
 415#endif
 416
 417#ifndef MAX_BURST_TX
 418#define MAX_BURST_TX 64
 419#endif
 420
 421struct burst_rx {
 422        u64 addr[MAX_BURST_RX];
 423        u32 len[MAX_BURST_RX];
 424};
 425
 426struct burst_tx {
 427        u64 addr[MAX_BURST_TX];
 428        u32 len[MAX_BURST_TX];
 429        u32 n_pkts;
 430};
 431
 432struct port_params {
 433        struct xsk_socket_config xsk_cfg;
 434        struct bpool *bp;
 435        const char *iface;
 436        u32 iface_queue;
 437};
 438
 439struct port {
 440        struct port_params params;
 441
 442        struct bcache *bc;
 443
 444        struct xsk_ring_cons rxq;
 445        struct xsk_ring_prod txq;
 446        struct xsk_ring_prod umem_fq;
 447        struct xsk_ring_cons umem_cq;
 448        struct xsk_socket *xsk;
 449        int umem_fq_initialized;
 450
 451        u64 n_pkts_rx;
 452        u64 n_pkts_tx;
 453};
 454
 455static void
 456port_free(struct port *p)
 457{
 458        if (!p)
 459                return;
 460
 461        /* To keep this example simple, the code to free the buffers from the
 462         * socket's receive and transmit queues, as well as from the UMEM fill
 463         * and completion queues, is not included.
 464         */
 465
 466        if (p->xsk)
 467                xsk_socket__delete(p->xsk);
 468
 469        bcache_free(p->bc);
 470
 471        free(p);
 472}
 473
 474static struct port *
 475port_init(struct port_params *params)
 476{
 477        struct port *p;
 478        u32 umem_fq_size, pos = 0;
 479        int status, i;
 480
 481        /* Memory allocation and initialization. */
 482        p = calloc(sizeof(struct port), 1);
 483        if (!p)
 484                return NULL;
 485
 486        memcpy(&p->params, params, sizeof(p->params));
 487        umem_fq_size = params->bp->umem_cfg.fill_size;
 488
 489        /* bcache. */
 490        p->bc = bcache_init(params->bp);
 491        if (!p->bc ||
 492            (bcache_slab_size(p->bc) < umem_fq_size) ||
 493            (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
 494                port_free(p);
 495                return NULL;
 496        }
 497
 498        /* xsk socket. */
 499        status = xsk_socket__create_shared(&p->xsk,
 500                                           params->iface,
 501                                           params->iface_queue,
 502                                           params->bp->umem,
 503                                           &p->rxq,
 504                                           &p->txq,
 505                                           &p->umem_fq,
 506                                           &p->umem_cq,
 507                                           &params->xsk_cfg);
 508        if (status) {
 509                port_free(p);
 510                return NULL;
 511        }
 512
 513        /* umem fq. */
 514        xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
 515
 516        for (i = 0; i < umem_fq_size; i++)
 517                *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
 518                        bcache_cons(p->bc);
 519
 520        xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
 521        p->umem_fq_initialized = 1;
 522
 523        return p;
 524}
 525
 526static inline u32
 527port_rx_burst(struct port *p, struct burst_rx *b)
 528{
 529        u32 n_pkts, pos, i;
 530
 531        /* Free buffers for FQ replenish. */
 532        n_pkts = ARRAY_SIZE(b->addr);
 533
 534        n_pkts = bcache_cons_check(p->bc, n_pkts);
 535        if (!n_pkts)
 536                return 0;
 537
 538        /* RXQ. */
 539        n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
 540        if (!n_pkts) {
 541                if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
 542                        struct pollfd pollfd = {
 543                                .fd = xsk_socket__fd(p->xsk),
 544                                .events = POLLIN,
 545                        };
 546
 547                        poll(&pollfd, 1, 0);
 548                }
 549                return 0;
 550        }
 551
 552        for (i = 0; i < n_pkts; i++) {
 553                b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
 554                b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
 555        }
 556
 557        xsk_ring_cons__release(&p->rxq, n_pkts);
 558        p->n_pkts_rx += n_pkts;
 559
 560        /* UMEM FQ. */
 561        for ( ; ; ) {
 562                int status;
 563
 564                status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
 565                if (status == n_pkts)
 566                        break;
 567
 568                if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
 569                        struct pollfd pollfd = {
 570                                .fd = xsk_socket__fd(p->xsk),
 571                                .events = POLLIN,
 572                        };
 573
 574                        poll(&pollfd, 1, 0);
 575                }
 576        }
 577
 578        for (i = 0; i < n_pkts; i++)
 579                *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
 580                        bcache_cons(p->bc);
 581
 582        xsk_ring_prod__submit(&p->umem_fq, n_pkts);
 583
 584        return n_pkts;
 585}
 586
 587static inline void
 588port_tx_burst(struct port *p, struct burst_tx *b)
 589{
 590        u32 n_pkts, pos, i;
 591        int status;
 592
 593        /* UMEM CQ. */
 594        n_pkts = p->params.bp->umem_cfg.comp_size;
 595
 596        n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
 597
 598        for (i = 0; i < n_pkts; i++) {
 599                u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
 600
 601                bcache_prod(p->bc, addr);
 602        }
 603
 604        xsk_ring_cons__release(&p->umem_cq, n_pkts);
 605
 606        /* TXQ. */
 607        n_pkts = b->n_pkts;
 608
 609        for ( ; ; ) {
 610                status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
 611                if (status == n_pkts)
 612                        break;
 613
 614                if (xsk_ring_prod__needs_wakeup(&p->txq))
 615                        sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
 616                               NULL, 0);
 617        }
 618
 619        for (i = 0; i < n_pkts; i++) {
 620                xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
 621                xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
 622        }
 623
 624        xsk_ring_prod__submit(&p->txq, n_pkts);
 625        if (xsk_ring_prod__needs_wakeup(&p->txq))
 626                sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
 627        p->n_pkts_tx += n_pkts;
 628}
 629
 630/*
 631 * Thread
 632 *
 633 * Packet forwarding threads.
 634 */
 635#ifndef MAX_PORTS_PER_THREAD
 636#define MAX_PORTS_PER_THREAD 16
 637#endif
 638
 639struct thread_data {
 640        struct port *ports_rx[MAX_PORTS_PER_THREAD];
 641        struct port *ports_tx[MAX_PORTS_PER_THREAD];
 642        u32 n_ports_rx;
 643        struct burst_rx burst_rx;
 644        struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
 645        u32 cpu_core_id;
 646        int quit;
 647};
 648
 649static void swap_mac_addresses(void *data)
 650{
 651        struct ether_header *eth = (struct ether_header *)data;
 652        struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
 653        struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
 654        struct ether_addr tmp;
 655
 656        tmp = *src_addr;
 657        *src_addr = *dst_addr;
 658        *dst_addr = tmp;
 659}
 660
 661static void *
 662thread_func(void *arg)
 663{
 664        struct thread_data *t = arg;
 665        cpu_set_t cpu_cores;
 666        u32 i;
 667
 668        CPU_ZERO(&cpu_cores);
 669        CPU_SET(t->cpu_core_id, &cpu_cores);
 670        pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
 671
 672        for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
 673                struct port *port_rx = t->ports_rx[i];
 674                struct port *port_tx = t->ports_tx[i];
 675                struct burst_rx *brx = &t->burst_rx;
 676                struct burst_tx *btx = &t->burst_tx[i];
 677                u32 n_pkts, j;
 678
 679                /* RX. */
 680                n_pkts = port_rx_burst(port_rx, brx);
 681                if (!n_pkts)
 682                        continue;
 683
 684                /* Process & TX. */
 685                for (j = 0; j < n_pkts; j++) {
 686                        u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
 687                        u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
 688                                                     addr);
 689
 690                        swap_mac_addresses(pkt);
 691
 692                        btx->addr[btx->n_pkts] = brx->addr[j];
 693                        btx->len[btx->n_pkts] = brx->len[j];
 694                        btx->n_pkts++;
 695
 696                        if (btx->n_pkts == MAX_BURST_TX) {
 697                                port_tx_burst(port_tx, btx);
 698                                btx->n_pkts = 0;
 699                        }
 700                }
 701        }
 702
 703        return NULL;
 704}
 705
 706/*
 707 * Process
 708 */
 709static const struct bpool_params bpool_params_default = {
 710        .n_buffers = 64 * 1024,
 711        .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
 712        .mmap_flags = 0,
 713
 714        .n_users_max = 16,
 715        .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
 716};
 717
 718static const struct xsk_umem_config umem_cfg_default = {
 719        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
 720        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
 721        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
 722        .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
 723        .flags = 0,
 724};
 725
 726static const struct port_params port_params_default = {
 727        .xsk_cfg = {
 728                .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
 729                .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
 730                .libbpf_flags = 0,
 731                .xdp_flags = XDP_FLAGS_DRV_MODE,
 732                .bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
 733        },
 734
 735        .bp = NULL,
 736        .iface = NULL,
 737        .iface_queue = 0,
 738};
 739
 740#ifndef MAX_PORTS
 741#define MAX_PORTS 64
 742#endif
 743
 744#ifndef MAX_THREADS
 745#define MAX_THREADS 64
 746#endif
 747
 748static struct bpool_params bpool_params;
 749static struct xsk_umem_config umem_cfg;
 750static struct bpool *bp;
 751
 752static struct port_params port_params[MAX_PORTS];
 753static struct port *ports[MAX_PORTS];
 754static u64 n_pkts_rx[MAX_PORTS];
 755static u64 n_pkts_tx[MAX_PORTS];
 756static int n_ports;
 757
 758static pthread_t threads[MAX_THREADS];
 759static struct thread_data thread_data[MAX_THREADS];
 760static int n_threads;
 761
 762static void
 763print_usage(char *prog_name)
 764{
 765        const char *usage =
 766                "Usage:\n"
 767                "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
 768                "\n"
 769                "-c CORE        CPU core to run a packet forwarding thread\n"
 770                "               on. May be invoked multiple times.\n"
 771                "\n"
 772                "-b SIZE        Number of buffers in the buffer pool shared\n"
 773                "               by all the forwarding threads. Default: %u.\n"
 774                "\n"
 775                "-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
 776                "               pair specifies one forwarding port. May be\n"
 777                "               invoked multiple times.\n"
 778                "\n"
 779                "-q QUEUE       Network interface queue for RX and TX. Each\n"
 780                "               (INTERFACE, QUEUE) pair specified one\n"
 781                "               forwarding port. Default: %u. May be invoked\n"
 782                "               multiple times.\n"
 783                "\n";
 784        printf(usage,
 785               prog_name,
 786               bpool_params_default.n_buffers,
 787               port_params_default.iface_queue);
 788}
 789
 790static int
 791parse_args(int argc, char **argv)
 792{
 793        struct option lgopts[] = {
 794                { NULL,  0, 0, 0 }
 795        };
 796        int opt, option_index;
 797
 798        /* Parse the input arguments. */
 799        for ( ; ;) {
 800                opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
 801                if (opt == EOF)
 802                        break;
 803
 804                switch (opt) {
 805                case 'b':
 806                        bpool_params.n_buffers = atoi(optarg);
 807                        break;
 808
 809                case 'c':
 810                        if (n_threads == MAX_THREADS) {
 811                                printf("Max number of threads (%d) reached.\n",
 812                                       MAX_THREADS);
 813                                return -1;
 814                        }
 815
 816                        thread_data[n_threads].cpu_core_id = atoi(optarg);
 817                        n_threads++;
 818                        break;
 819
 820                case 'i':
 821                        if (n_ports == MAX_PORTS) {
 822                                printf("Max number of ports (%d) reached.\n",
 823                                       MAX_PORTS);
 824                                return -1;
 825                        }
 826
 827                        port_params[n_ports].iface = optarg;
 828                        port_params[n_ports].iface_queue = 0;
 829                        n_ports++;
 830                        break;
 831
 832                case 'q':
 833                        if (n_ports == 0) {
 834                                printf("No port specified for queue.\n");
 835                                return -1;
 836                        }
 837                        port_params[n_ports - 1].iface_queue = atoi(optarg);
 838                        break;
 839
 840                default:
 841                        printf("Illegal argument.\n");
 842                        return -1;
 843                }
 844        }
 845
 846        optind = 1; /* reset getopt lib */
 847
 848        /* Check the input arguments. */
 849        if (!n_ports) {
 850                printf("No ports specified.\n");
 851                return -1;
 852        }
 853
 854        if (!n_threads) {
 855                printf("No threads specified.\n");
 856                return -1;
 857        }
 858
 859        if (n_ports % n_threads) {
 860                printf("Ports cannot be evenly distributed to threads.\n");
 861                return -1;
 862        }
 863
 864        return 0;
 865}
 866
 867static void
 868print_port(u32 port_id)
 869{
 870        struct port *port = ports[port_id];
 871
 872        printf("Port %u: interface = %s, queue = %u\n",
 873               port_id, port->params.iface, port->params.iface_queue);
 874}
 875
 876static void
 877print_thread(u32 thread_id)
 878{
 879        struct thread_data *t = &thread_data[thread_id];
 880        u32 i;
 881
 882        printf("Thread %u (CPU core %u): ",
 883               thread_id, t->cpu_core_id);
 884
 885        for (i = 0; i < t->n_ports_rx; i++) {
 886                struct port *port_rx = t->ports_rx[i];
 887                struct port *port_tx = t->ports_tx[i];
 888
 889                printf("(%s, %u) -> (%s, %u), ",
 890                       port_rx->params.iface,
 891                       port_rx->params.iface_queue,
 892                       port_tx->params.iface,
 893                       port_tx->params.iface_queue);
 894        }
 895
 896        printf("\n");
 897}
 898
 899static void
 900print_port_stats_separator(void)
 901{
 902        printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
 903               "----",
 904               "------------",
 905               "-------------",
 906               "------------",
 907               "-------------");
 908}
 909
 910static void
 911print_port_stats_header(void)
 912{
 913        print_port_stats_separator();
 914        printf("| %4s | %12s | %13s | %12s | %13s |\n",
 915               "Port",
 916               "RX packets",
 917               "RX rate (pps)",
 918               "TX packets",
 919               "TX_rate (pps)");
 920        print_port_stats_separator();
 921}
 922
 923static void
 924print_port_stats_trailer(void)
 925{
 926        print_port_stats_separator();
 927        printf("\n");
 928}
 929
 930static void
 931print_port_stats(int port_id, u64 ns_diff)
 932{
 933        struct port *p = ports[port_id];
 934        double rx_pps, tx_pps;
 935
 936        rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
 937        tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
 938
 939        printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
 940               port_id,
 941               p->n_pkts_rx,
 942               rx_pps,
 943               p->n_pkts_tx,
 944               tx_pps);
 945
 946        n_pkts_rx[port_id] = p->n_pkts_rx;
 947        n_pkts_tx[port_id] = p->n_pkts_tx;
 948}
 949
 950static void
 951print_port_stats_all(u64 ns_diff)
 952{
 953        int i;
 954
 955        print_port_stats_header();
 956        for (i = 0; i < n_ports; i++)
 957                print_port_stats(i, ns_diff);
 958        print_port_stats_trailer();
 959}
 960
 961static int quit;
 962
 963static void
 964signal_handler(int sig)
 965{
 966        quit = 1;
 967}
 968
 969static void remove_xdp_program(void)
 970{
 971        int i;
 972
 973        for (i = 0 ; i < n_ports; i++)
 974                bpf_set_link_xdp_fd(if_nametoindex(port_params[i].iface), -1,
 975                                    port_params[i].xsk_cfg.xdp_flags);
 976}
 977
 978int main(int argc, char **argv)
 979{
 980        struct timespec time;
 981        u64 ns0;
 982        int i;
 983
 984        /* Parse args. */
 985        memcpy(&bpool_params, &bpool_params_default,
 986               sizeof(struct bpool_params));
 987        memcpy(&umem_cfg, &umem_cfg_default,
 988               sizeof(struct xsk_umem_config));
 989        for (i = 0; i < MAX_PORTS; i++)
 990                memcpy(&port_params[i], &port_params_default,
 991                       sizeof(struct port_params));
 992
 993        if (parse_args(argc, argv)) {
 994                print_usage(argv[0]);
 995                return -1;
 996        }
 997
 998        /* Buffer pool initialization. */
 999        bp = bpool_init(&bpool_params, &umem_cfg);
1000        if (!bp) {
1001                printf("Buffer pool initialization failed.\n");
1002                return -1;
1003        }
1004        printf("Buffer pool created successfully.\n");
1005
1006        /* Ports initialization. */
1007        for (i = 0; i < MAX_PORTS; i++)
1008                port_params[i].bp = bp;
1009
1010        for (i = 0; i < n_ports; i++) {
1011                ports[i] = port_init(&port_params[i]);
1012                if (!ports[i]) {
1013                        printf("Port %d initialization failed.\n", i);
1014                        return -1;
1015                }
1016                print_port(i);
1017        }
1018        printf("All ports created successfully.\n");
1019
1020        /* Threads. */
1021        for (i = 0; i < n_threads; i++) {
1022                struct thread_data *t = &thread_data[i];
1023                u32 n_ports_per_thread = n_ports / n_threads, j;
1024
1025                for (j = 0; j < n_ports_per_thread; j++) {
1026                        t->ports_rx[j] = ports[i * n_ports_per_thread + j];
1027                        t->ports_tx[j] = ports[i * n_ports_per_thread +
1028                                (j + 1) % n_ports_per_thread];
1029                }
1030
1031                t->n_ports_rx = n_ports_per_thread;
1032
1033                print_thread(i);
1034        }
1035
1036        for (i = 0; i < n_threads; i++) {
1037                int status;
1038
1039                status = pthread_create(&threads[i],
1040                                        NULL,
1041                                        thread_func,
1042                                        &thread_data[i]);
1043                if (status) {
1044                        printf("Thread %d creation failed.\n", i);
1045                        return -1;
1046                }
1047        }
1048        printf("All threads created successfully.\n");
1049
1050        /* Print statistics. */
1051        signal(SIGINT, signal_handler);
1052        signal(SIGTERM, signal_handler);
1053        signal(SIGABRT, signal_handler);
1054
1055        clock_gettime(CLOCK_MONOTONIC, &time);
1056        ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
1057        for ( ; !quit; ) {
1058                u64 ns1, ns_diff;
1059
1060                sleep(1);
1061                clock_gettime(CLOCK_MONOTONIC, &time);
1062                ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
1063                ns_diff = ns1 - ns0;
1064                ns0 = ns1;
1065
1066                print_port_stats_all(ns_diff);
1067        }
1068
1069        /* Threads completion. */
1070        printf("Quit.\n");
1071        for (i = 0; i < n_threads; i++)
1072                thread_data[i].quit = 1;
1073
1074        for (i = 0; i < n_threads; i++)
1075                pthread_join(threads[i], NULL);
1076
1077        for (i = 0; i < n_ports; i++)
1078                port_free(ports[i]);
1079
1080        bpool_free(bp);
1081
1082        remove_xdp_program();
1083
1084        return 0;
1085}
1086