linux/net/xdp/xsk_queue.h
/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128

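/* Producer/consumer indices shared with user space. Each index is
 * placed on its own cache line so that the producer and the consumer
 * do not false-share.
 */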
struct xdp_ring {
        u32 producer ____cacheline_aligned_in_smp;
        u32 consumer ____cacheline_aligned_in_smp;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
        struct xdp_ring ptrs;
        struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
        struct xdp_ring ptrs;
        u64 desc[0] ____cacheline_aligned_in_smp;
};

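/* Kernel-side bookkeeping for one ring. The *_head/*_tail members are
 * cached copies of the shared producer/consumer indices: prod_head is
 * where new entries are reserved, prod_tail is what has been published
 * to user space, cons_head marks how far entries are known to be
 * available, and cons_tail is the next entry to consume. The cached
 * copies are synchronized with the shared struct xdp_ring only when
 * needed, to limit cache-line bouncing.
 */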
struct xsk_queue {
        u64 chunk_mask;
        u64 size;
        u32 ring_mask;
        u32 nentries;
        u32 prod_head;
        u32 prod_tail;
        u32 cons_head;
        u32 cons_tail;
        struct xdp_ring *ring;
        u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * rings, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it ensures that the data is written before the
 * producer pointer is updated. If this barrier were missing, the
 * consumer could observe the producer pointer being set and thus load
 * the data before the producer has written the new data. The consumer
 * would in this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer has actually been read. Without this barrier,
 * some architectures could return old data, since speculative loads
 * are not discarded: the CPU does not know there is a dependency
 * between ->producer and the data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. If ->consumer indicates there is no room
 * in the buffer to store $data, we do not store it, so no explicit
 * barrier is needed.
 *
 * (D) ensures that the load of the data is completed before the store
 * of the consumer pointer becomes visible. Without this memory
 * barrier, the producer could observe the consumer pointer being set
 * and overwrite the data with a new value before the consumer got the
 * chance to read the old value. The consumer would thus miss reading
 * the old entry and very likely read the new entry twice, once right
 * now and again after circling through the ring.
 */
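
/* The helpers below annotate their barriers with these labels: the
 * consumer side (xskq_peek_addr()/xskq_peek_desc()) issues (D) before
 * publishing ->consumer and (C) after re-reading ->producer, while the
 * producer side issues (B) in the produce/flush helpers before
 * publishing ->producer and relies on the control dependency (A) in
 * the free-space check.
 */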

/* Common functions operating on both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
        return q ? q->invalid_descs : 0;
}

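/* Return how many entries are available to consume, capped at dcnt.
 * The cached producer index is refreshed from the shared ring only
 * when the cached view shows no entries.
 */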
static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
        u32 entries = q->prod_tail - q->cons_tail;

        if (entries == 0) {
                /* Refresh the local pointer */
                q->prod_tail = READ_ONCE(q->ring->producer);
                entries = q->prod_tail - q->cons_tail;
        }

        return (entries > dcnt) ? dcnt : entries;
}

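/* Return the number of free entries between @producer and the consumer
 * index. The cached consumer index is refreshed only when fewer than
 * dcnt entries appear free.
 */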
static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
        u32 free_entries = q->nentries - (producer - q->cons_tail);

        if (free_entries >= dcnt)
                return free_entries;

        /* Refresh the local tail pointer */
        q->cons_tail = READ_ONCE(q->ring->consumer);
        return q->nentries - (producer - q->cons_tail);
}

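/* Return true if at least cnt addresses are ready to be consumed,
 * refreshing the cached producer index if the cached view falls short.
 */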
static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
        u32 entries = q->prod_tail - q->cons_tail;

        if (entries >= cnt)
                return true;

        /* Refresh the local pointer. */
        q->prod_tail = READ_ONCE(q->ring->producer);
        entries = q->prod_tail - q->cons_tail;

        return entries >= cnt;
}

/* UMEM queue */

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
        if (addr >= q->size) {
                q->invalid_descs++;
                return false;
        }

        return true;
}

static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
{
        while (q->cons_tail != q->cons_head) {
                struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
                unsigned int idx = q->cons_tail & q->ring_mask;

                *addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
                if (xskq_is_valid_addr(q, *addr))
                        return addr;

                q->cons_tail++;
        }

        return NULL;
}

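/* Peek at the next valid address without consuming it. When the cached
 * batch is exhausted, the consumer index is published (D) and a new
 * batch of up to RX_BATCH_SIZE entries is pulled in. The caller
 * consumes the entry with xskq_discard_addr(); a typical loop, for
 * illustration only:
 *
 *        while (xskq_peek_addr(q, &addr)) {
 *                ... use addr ...
 *                xskq_discard_addr(q);
 *        }
 */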
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
        if (q->cons_tail == q->cons_head) {
                smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

                /* Order consumer and data */
                smp_rmb(); /* C, matches B */
        }

        return xskq_validate_addr(q, addr);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
        q->cons_tail++;
}

static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
{
        struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

        if (xskq_nb_free(q, q->prod_tail, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        ring->desc[q->prod_tail++ & q->ring_mask] = addr;

        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        WRITE_ONCE(q->ring->producer, q->prod_tail);
        return 0;
}

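/* Lazy variant of xskq_produce_addr(): the address is written at the
 * cached prod_head, but the write barrier and the update of the shared
 * producer index are deferred until xskq_produce_flush_addr_n(), so a
 * batch of addresses can be published with a single smp_wmb().
 */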
static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
{
        struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

        if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
                return -ENOSPC;

        /* A, matches D */
        ring->desc[q->prod_head++ & q->ring_mask] = addr;
        return 0;
}

static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
                                             u32 nb_entries)
{
        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        q->prod_tail += nb_entries;
        WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline int xskq_reserve_addr(struct xsk_queue *q)
{
        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        q->prod_head++;
        return 0;
}

/* Rx/Tx queue */

static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
{
        if (!xskq_is_valid_addr(q, d->addr))
                return false;

        if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
            d->options) {
                q->invalid_descs++;
                return false;
        }

        return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
                                                  struct xdp_desc *desc)
{
        while (q->cons_tail != q->cons_head) {
                struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
                unsigned int idx = q->cons_tail & q->ring_mask;

                *desc = READ_ONCE(ring->desc[idx]);
                if (xskq_is_valid_desc(q, desc))
                        return desc;

                q->cons_tail++;
        }

        return NULL;
}

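/* Peek at the next valid descriptor without consuming it; the
 * descriptor-ring counterpart of xskq_peek_addr(). The caller consumes
 * the entry with xskq_discard_desc().
 */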
static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
                                              struct xdp_desc *desc)
{
        if (q->cons_tail == q->cons_head) {
                smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

                /* Order consumer and data */
                smp_rmb(); /* C, matches B */
        }

        return xskq_validate_desc(q, desc);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
        q->cons_tail++;
}

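/* Write one descriptor at the cached prod_head without publishing it.
 * Descriptors queued this way become visible to user space only once
 * xskq_produce_flush_desc() has issued the write barrier (B) and
 * updated the shared producer index. Sketch of the intended pattern,
 * for illustration only:
 *
 *        for each frame to post:
 *                xskq_produce_batch_desc(q, addr, len);
 *        xskq_produce_flush_desc(q);
 */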
static inline int xskq_produce_batch_desc(struct xsk_queue *q,
                                          u64 addr, u32 len)
{
        struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
        unsigned int idx;

        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        idx = (q->prod_head++) & q->ring_mask;
        ring->desc[idx].addr = addr;
        ring->desc[idx].len = len;

        return 0;
}

static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        q->prod_tail = q->prod_head;
        WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
        return xskq_nb_avail(q, q->nentries) == q->nentries;
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
        return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */