dpdk/lib/distributor/rte_distributor_single.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_distributor_list, rte_distributor_single);

static struct rte_tailq_elem rte_distributor_tailq = {
        .name = "RTE_DISTRIBUTOR",
};
EAL_REGISTER_TAILQ(rte_distributor_tailq)

/**** APIs called by workers ****/

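/*
 * The worker/distributor handshake uses a single 64-bit word per worker
 * (bufptr64): the low RTE_DISTRIB_FLAG_BITS bits carry the
 * RTE_DISTRIB_GET_BUF / RTE_DISTRIB_RETURN_BUF flags and the upper bits
 * carry an mbuf pointer shifted left by RTE_DISTRIB_FLAG_BITS. Workers set
 * a flag to post a request or a return; the distributor replies by storing
 * a value whose flag bits are clear.
 */
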
void
rte_distributor_request_pkt_single(struct rte_distributor_single *d,
                unsigned worker_id, struct rte_mbuf *oldpkt)
{
        union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
        int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
                        | RTE_DISTRIB_GET_BUF;
        RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
                ==, 0, __ATOMIC_RELAXED);

        /* Sync with distributor on GET_BUF flag. */
        __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
}

struct rte_mbuf *
rte_distributor_poll_pkt_single(struct rte_distributor_single *d,
                unsigned worker_id)
{
        union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
        /* Sync with distributor. Acquire bufptr64. */
        if (__atomic_load_n(&buf->bufptr64, __ATOMIC_ACQUIRE)
                & RTE_DISTRIB_GET_BUF)
                return NULL;

        /* since bufptr64 is signed, this should be an arithmetic shift */
        int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
        return (struct rte_mbuf *)((uintptr_t)ret);
}

struct rte_mbuf *
rte_distributor_get_pkt_single(struct rte_distributor_single *d,
                unsigned worker_id, struct rte_mbuf *oldpkt)
{
        struct rte_mbuf *ret;
        rte_distributor_request_pkt_single(d, worker_id, oldpkt);
        while ((ret = rte_distributor_poll_pkt_single(d, worker_id)) == NULL)
                rte_pause();
        return ret;
}

int
rte_distributor_return_pkt_single(struct rte_distributor_single *d,
                unsigned worker_id, struct rte_mbuf *oldpkt)
{
        union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
        uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
                        | RTE_DISTRIB_RETURN_BUF;
        RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
                ==, 0, __ATOMIC_RELAXED);

        /* Sync with distributor on RETURN_BUF flag. */
        __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
        return 0;
}

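/*
 * Illustrative worker loop built on the API above; a sketch only, not part
 * of this library. The worker_id assignment, the quit flag and the
 * handle_packet() helper are application-side assumptions, not defined here.
 *
 *      static int
 *      worker_main(void *arg)
 *      {
 *              struct rte_distributor_single *d = arg;
 *              unsigned int worker_id = ...;  (assigned by the application)
 *              struct rte_mbuf *pkt;
 *
 *              pkt = rte_distributor_get_pkt_single(d, worker_id, NULL);
 *              while (!quit) {
 *                      handle_packet(pkt);
 *                      (hand the finished mbuf back and wait for the next one)
 *                      pkt = rte_distributor_get_pkt_single(d, worker_id, pkt);
 *              }
 *              (on shutdown, give the final mbuf back to the distributor)
 *              rte_distributor_return_pkt_single(d, worker_id, pkt);
 *              return 0;
 *      }
 */
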
/**** APIs called on distributor core ***/

/* as name suggests, adds a packet to the backlog for a particular worker */
static int
add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
{
        if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
                return -1;

        bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
                        = item;
        return 0;
}

/* takes the next packet for a worker off the backlog */
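/* note: callers check bl->count first; this helper assumes a non-empty backlog */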
static int64_t
backlog_pop(struct rte_distributor_backlog *bl)
{
        bl->count--;
        return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
}

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor_single *d,
                unsigned *ret_start, unsigned *ret_count)
{
        /* store returns in a circular buffer - code is branch-free */
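        /*
         * The !!(oldbuf) masks make a zero oldbuf a no-op: the first unused
         * slot is written but neither index moves. Once the ring holds
         * RTE_DISTRIB_RETURNS_MASK entries, start advances instead of count,
         * dropping the oldest stored return.
         */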
        d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
                        = (void *)oldbuf;
        *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
        *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
}

static inline void
handle_worker_shutdown(struct rte_distributor_single *d, unsigned int wkr)
{
        d->in_flight_tags[wkr] = 0;
        d->in_flight_bitmask &= ~(1UL << wkr);
        /* Sync with worker. Release bufptr64. */
        __atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
        if (unlikely(d->backlog[wkr].count != 0)) {
                /* On return of a packet, we need to move the
                 * queued packets for this core elsewhere.
                 * Easiest solution is to set things up for
                 * a recursive call. That will cause those
                 * packets to be queued up for the next free
                 * core, i.e. it will return as soon as a
                 * core becomes free to accept the first
                 * packet, as subsequent ones will be added to
                 * the backlog for that core.
                 */
                struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
                unsigned i;
                struct rte_distributor_backlog *bl = &d->backlog[wkr];

                for (i = 0; i < bl->count; i++) {
                        unsigned idx = (bl->start + i) &
                                        RTE_DISTRIB_BACKLOG_MASK;
                        pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
                                        RTE_DISTRIB_FLAG_BITS));
                }
                /* recursive call.
                 * Note that the tags were set before the first-level call
                 * to rte_distributor_process_single().
                 */
                rte_distributor_process_single(d, pkts, i);
                bl->count = bl->start = 0;
        }
}

/* this function is called when process() fn is called without any new
 * packets. It goes through all the workers and clears any returned packets
 * to do a partial flush.
 */
static int
process_returns(struct rte_distributor_single *d)
{
        unsigned wkr;
        unsigned flushed = 0;
        unsigned ret_start = d->returns.start,
                        ret_count = d->returns.count;

        for (wkr = 0; wkr < d->num_workers; wkr++) {
                uintptr_t oldbuf = 0;
                /* Sync with worker. Acquire bufptr64. */
                const int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
                                                        __ATOMIC_ACQUIRE);

                if (data & RTE_DISTRIB_GET_BUF) {
                        flushed++;
                        if (d->backlog[wkr].count)
                                /* Sync with worker. Release bufptr64. */
                                __atomic_store_n(&(d->bufs[wkr].bufptr64),
                                        backlog_pop(&d->backlog[wkr]),
                                        __ATOMIC_RELEASE);
                        else {
                                /* Sync with worker on GET_BUF flag. */
                                __atomic_store_n(&(d->bufs[wkr].bufptr64),
                                        RTE_DISTRIB_GET_BUF,
                                        __ATOMIC_RELEASE);
                                d->in_flight_tags[wkr] = 0;
                                d->in_flight_bitmask &= ~(1UL << wkr);
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                } else if (data & RTE_DISTRIB_RETURN_BUF) {
                        handle_worker_shutdown(d, wkr);
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                }

                store_return(oldbuf, d, &ret_start, &ret_count);
        }

        d->returns.start = ret_start;
        d->returns.count = ret_count;

        return flushed;
}

/* process a set of packets to distribute them to workers */
int
rte_distributor_process_single(struct rte_distributor_single *d,
                struct rte_mbuf **mbufs, unsigned num_mbufs)
{
        unsigned next_idx = 0;
        unsigned wkr = 0;
        struct rte_mbuf *next_mb = NULL;
        int64_t next_value = 0;
        uint32_t new_tag = 0;
        unsigned ret_start = d->returns.start,
                        ret_count = d->returns.count;

        if (unlikely(num_mbufs == 0))
                return process_returns(d);

        while (next_idx < num_mbufs || next_mb != NULL) {
                uintptr_t oldbuf = 0;
                /* Sync with worker. Acquire bufptr64. */
                int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
                                                __ATOMIC_ACQUIRE);

                if (!next_mb) {
                        next_mb = mbufs[next_idx++];
                        next_value = (((int64_t)(uintptr_t)next_mb)
                                        << RTE_DISTRIB_FLAG_BITS);
                        /*
                         * Users are expected to set the tag value of each
                         * mbuf before calling rte_distributor_process_single().
                         * User-defined tags are used to identify flows or
                         * sessions.
                         */
                        new_tag = next_mb->hash.usr;

                        /*
                         * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
                         * then the size of match has to be expanded.
                         */
                        uint64_t match = 0;
                        unsigned i;
                        /*
                         * to scan for a match use "xor" and "not" to get a 0/1
                         * value, then use shifting to merge to single "match"
                         * variable, where a one-bit indicates a match for the
                         * worker given by the bit-position
                         */
                        for (i = 0; i < d->num_workers; i++)
                                match |= ((uint64_t)!(d->in_flight_tags[i] ^ new_tag)
                                        << i);

                        /* Only turned-on bits are considered as match */
                        match &= d->in_flight_bitmask;

                        if (match) {
                                next_mb = NULL;
                                unsigned worker = __builtin_ctzl(match);
                                if (add_to_backlog(&d->backlog[worker],
                                                next_value) < 0)
                                        next_idx--;
                        }
                }

                if ((data & RTE_DISTRIB_GET_BUF) &&
                                (d->backlog[wkr].count || next_mb)) {

                        if (d->backlog[wkr].count)
                                /* Sync with worker. Release bufptr64. */
                                __atomic_store_n(&(d->bufs[wkr].bufptr64),
                                                backlog_pop(&d->backlog[wkr]),
                                                __ATOMIC_RELEASE);

                        else {
                                /* Sync with worker. Release bufptr64. */
                                __atomic_store_n(&(d->bufs[wkr].bufptr64),
                                                next_value,
                                                __ATOMIC_RELEASE);
                                d->in_flight_tags[wkr] = new_tag;
                                d->in_flight_bitmask |= (1UL << wkr);
                                next_mb = NULL;
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                } else if (data & RTE_DISTRIB_RETURN_BUF) {
                        handle_worker_shutdown(d, wkr);
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                }

                /* store returns in a circular buffer */
                store_return(oldbuf, d, &ret_start, &ret_count);

                if (++wkr == d->num_workers)
                        wkr = 0;
        }
        /* to finish, check all workers for backlog and schedule work for them
         * if they are ready */
        for (wkr = 0; wkr < d->num_workers; wkr++)
                if (d->backlog[wkr].count &&
                                /* Sync with worker. Acquire bufptr64. */
                                (__atomic_load_n(&(d->bufs[wkr].bufptr64),
                                __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {

                        int64_t oldbuf = d->bufs[wkr].bufptr64 >>
                                        RTE_DISTRIB_FLAG_BITS;

                        store_return(oldbuf, d, &ret_start, &ret_count);

                        /* Sync with worker. Release bufptr64. */
                        __atomic_store_n(&(d->bufs[wkr].bufptr64),
                                backlog_pop(&d->backlog[wkr]),
                                __ATOMIC_RELEASE);
                }

        d->returns.start = ret_start;
        d->returns.count = ret_count;
        return num_mbufs;
}

/* return to the caller packets that have been returned from workers */
int
rte_distributor_returned_pkts_single(struct rte_distributor_single *d,
                struct rte_mbuf **mbufs, unsigned max_mbufs)
{
        struct rte_distributor_returned_pkts *returns = &d->returns;
        unsigned retval = (max_mbufs < returns->count) ?
                        max_mbufs : returns->count;
        unsigned i;

        for (i = 0; i < retval; i++) {
                unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
                mbufs[i] = returns->mbufs[idx];
        }
        returns->start += i;
        returns->count -= i;

        return retval;
}

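/*
 * Illustrative distributor-core loop; a sketch only, not part of this
 * library. The rx_burst_pkts() helper, the burst[]/done[] arrays, BURST_SIZE
 * and the quit flag are application-side assumptions, and each mbuf's
 * hash.usr tag is assumed to have been set before the process call.
 *
 *      struct rte_mbuf *burst[BURST_SIZE];
 *      struct rte_mbuf *done[BURST_SIZE];
 *
 *      while (!quit) {
 *              unsigned int nb_rx = rx_burst_pkts(burst, BURST_SIZE);
 *
 *              (hand new packets to workers; this also services returns)
 *              rte_distributor_process_single(d, burst, nb_rx);
 *
 *              (collect mbufs that workers have handed back)
 *              int nb_done = rte_distributor_returned_pkts_single(d, done,
 *                              BURST_SIZE);
 *              (... transmit or free the nb_done mbufs ...)
 *      }
 *
 *      (on shutdown, drain anything still in flight or backlogged)
 *      rte_distributor_flush_single(d);
 */
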
/* return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned
total_outstanding(const struct rte_distributor_single *d)
{
        unsigned wkr, total_outstanding;

        total_outstanding = __builtin_popcountl(d->in_flight_bitmask);

        for (wkr = 0; wkr < d->num_workers; wkr++)
                total_outstanding += d->backlog[wkr].count;

        return total_outstanding;
}

/* flush the distributor, so that there are no outstanding packets in flight or
 * queued up. */
int
rte_distributor_flush_single(struct rte_distributor_single *d)
{
        const unsigned flushed = total_outstanding(d);

        while (total_outstanding(d) > 0)
                rte_distributor_process_single(d, NULL, 0);

        return flushed;
}

/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns_single(struct rte_distributor_single *d)
{
        d->returns.start = d->returns.count = 0;
#ifndef __OPTIMIZE__
        memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
#endif
}

/* creates a distributor instance */
struct rte_distributor_single *
rte_distributor_create_single(const char *name,
                unsigned socket_id,
                unsigned num_workers)
{
        struct rte_distributor_single *d;
        struct rte_distributor_list *distributor_list;
        char mz_name[RTE_MEMZONE_NAMESIZE];
        const struct rte_memzone *mz;

        /* compilation-time checks */
        RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
        RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
        RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
                                sizeof(d->in_flight_bitmask) * CHAR_BIT);

        if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
                rte_errno = EINVAL;
                return NULL;
        }

        snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
        mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
        if (mz == NULL) {
                rte_errno = ENOMEM;
                return NULL;
        }

        d = mz->addr;
        strlcpy(d->name, name, sizeof(d->name));
        d->num_workers = num_workers;

        distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
                                          rte_distributor_list);

        rte_mcfg_tailq_write_lock();
        TAILQ_INSERT_TAIL(distributor_list, d, next);
        rte_mcfg_tailq_write_unlock();

        return d;
}

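/*
 * Illustrative creation call; a sketch only. The name "pkt_dist" and the
 * worker count of 4 are arbitrary example values; rte_socket_id() is
 * assumed to be called from an EAL thread after rte_eal_init().
 *
 *      struct rte_distributor_single *d =
 *              rte_distributor_create_single("pkt_dist",
 *                              rte_socket_id(), 4);
 *      if (d == NULL)
 *              rte_exit(EXIT_FAILURE, "cannot create distributor\n");
 */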