dpdk/drivers/event/opdl/opdl_ring.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2017 Intel Corporation
   3 */
   4
   5#include <stdbool.h>
   6#include <stddef.h>
   7#include <stdint.h>
   8#include <stdio.h>
   9
  10#include <rte_string_fns.h>
  11#include <rte_branch_prediction.h>
  12#include <rte_debug.h>
  13#include <rte_lcore.h>
  14#include <rte_log.h>
  15#include <rte_malloc.h>
  16#include <rte_memcpy.h>
  17#include <rte_memory.h>
  18#include <rte_memzone.h>
  19#include <rte_atomic.h>
  20
  21#include "opdl_ring.h"
  22#include "opdl_log.h"
  23
  24#define LIB_NAME "opdl_ring"
  25
  26#define OPDL_NAME_SIZE 64
  27
  28
  29#define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
  30#define OPDL_FLOWID_MASK (0xFFFFF)
  31#define OPDL_OPA_MASK    (0xFF)
  32#define OPDL_OPA_OFFSET  (0x38)
  33
  34/* Types of dependency between stages */
  35enum dep_type {
  36        DEP_NONE = 0,  /* no dependency */
  37        DEP_DIRECT,  /* stage has direct dependency */
  38        DEP_INDIRECT,  /* in-direct dependency through other stage(s) */
  39        DEP_SELF,  /* stage dependency on itself, used to detect loops */
  40};
  41
  42/* Shared section of stage state.
  43 * Care is needed when accessing and the layout is important, especially to
  44 * limit the adjacent cache-line HW prefetcher from impacting performance.
  45 */
  46struct shared_state {
  47        /* Last known minimum sequence number of dependencies, used for multi
  48         * thread operation
  49         */
  50        uint32_t available_seq;
  51        char _pad1[RTE_CACHE_LINE_SIZE * 3];
  52        uint32_t head;  /* Head sequence number (for multi thread operation) */
  53        char _pad2[RTE_CACHE_LINE_SIZE * 3];
  54        struct opdl_stage *stage;  /* back pointer */
  55        uint32_t tail;  /* Tail sequence number */
  56        char _pad3[RTE_CACHE_LINE_SIZE * 2];
  57} __rte_cache_aligned;
  58
  59/* A structure to keep track of "unfinished" claims. This is only used for
  60 * stages that are threadsafe. Each lcore accesses its own instance of this
  61 * structure to record the entries it has claimed. This allows one lcore to make
  62 * multiple claims without being blocked by another. When disclaiming it moves
  63 * forward the shared tail when the shared tail matches the tail value recorded
  64 * here.
  65 */
  66struct claim_manager {
  67        uint32_t num_to_disclaim;
  68        uint32_t num_claimed;
  69        uint32_t mgr_head;
  70        uint32_t mgr_tail;
  71        struct {
  72                uint32_t head;
  73                uint32_t tail;
  74        } claims[OPDL_DISCLAIMS_PER_LCORE];
  75} __rte_cache_aligned;
  76
  77/* Context for each stage of opdl_ring.
  78 * Calculations on sequence numbers need to be done with other uint32_t values
  79 * so that results are modulus 2^32, and not undefined.
  80 */
  81struct opdl_stage {
  82        struct opdl_ring *t;  /* back pointer, set at init */
  83        uint32_t num_slots;  /* Number of slots for entries, set at init */
  84        uint32_t index;  /* ID for this stage, set at init */
  85        bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
  86        /* Last known min seq number of dependencies for used for single thread
  87         * operation
  88         */
  89        uint32_t available_seq;
  90        uint32_t head;  /* Current head for single-thread operation */
  91        uint32_t nb_instance;  /* Number of instances */
  92        uint32_t instance_id;  /* ID of this stage instance */
  93        uint16_t num_claimed;  /* Number of slots claimed */
  94        uint16_t num_event;             /* Number of events */
  95        uint32_t seq;                   /* sequence number  */
  96        uint32_t num_deps;  /* Number of direct dependencies */
  97        /* Keep track of all dependencies, used during init only */
  98        enum dep_type *dep_tracking;
  99        /* Direct dependencies of this stage */
 100        struct shared_state **deps;
 101        /* Other stages read this! */
 102        struct shared_state shared __rte_cache_aligned;
 103        /* For managing disclaims in multi-threaded processing stages */
 104        struct claim_manager pending_disclaims[RTE_MAX_LCORE]
 105                                               __rte_cache_aligned;
 106        uint32_t shadow_head;  /* Shadow head for single-thread operation */
 107        uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
 108        uint32_t pos;           /* Atomic scan position */
 109} __rte_cache_aligned;
 110
 111/* Context for opdl_ring */
 112struct opdl_ring {
 113        char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
 114        int socket;  /* NUMA socket that memory is allocated on */
 115        uint32_t num_slots;  /* Number of slots for entries */
 116        uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
 117        uint32_t slot_size;  /* Size of each slot in bytes */
 118        uint32_t num_stages;  /* Number of stages that have been added */
 119        uint32_t max_num_stages;  /* Max number of stages */
 120        /* Stages indexed by ID */
 121        struct opdl_stage *stages;
 122        /* Memory for storing slot data */
 123        uint8_t slots[0] __rte_cache_aligned;
 124};
 125
 126
 127/* Return input stage of a opdl_ring */
 128static __rte_always_inline struct opdl_stage *
 129input_stage(const struct opdl_ring *t)
 130{
 131        return &t->stages[0];
 132}
 133
 134/* Check if a stage is the input stage */
 135static __rte_always_inline bool
 136is_input_stage(const struct opdl_stage *s)
 137{
 138        return s->index == 0;
 139}
 140
 141/* Get slot pointer from sequence number */
 142static __rte_always_inline void *
 143get_slot(const struct opdl_ring *t, uint32_t n)
 144{
 145        return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
 146}
 147
 148/* Find how many entries are available for processing */
 149static __rte_always_inline uint32_t
 150available(const struct opdl_stage *s)
 151{
 152        if (s->threadsafe == true) {
 153                uint32_t n = __atomic_load_n(&s->shared.available_seq,
 154                                __ATOMIC_ACQUIRE) -
 155                                __atomic_load_n(&s->shared.head,
 156                                __ATOMIC_ACQUIRE);
 157
 158                /* Return 0 if available_seq needs to be updated */
 159                return (n <= s->num_slots) ? n : 0;
 160        }
 161
 162        /* Single threaded */
 163        return s->available_seq - s->head;
 164}
 165
 166/* Read sequence number of dependencies and find minimum */
 167static __rte_always_inline void
 168update_available_seq(struct opdl_stage *s)
 169{
 170        uint32_t i;
 171        uint32_t this_tail = s->shared.tail;
 172        uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
 173        /* Input stage sequence numbers are greater than the sequence numbers of
 174         * its dependencies so an offset of t->num_slots is needed when
 175         * calculating available slots and also the condition which is used to
 176         * determine the dependencies minimum sequence number must be reverted.
 177         */
 178        uint32_t wrap;
 179
 180        if (is_input_stage(s)) {
 181                wrap = s->num_slots;
 182                for (i = 1; i < s->num_deps; i++) {
 183                        uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
 184                                        __ATOMIC_ACQUIRE);
 185                        if ((this_tail - seq) > (this_tail - min_seq))
 186                                min_seq = seq;
 187                }
 188        } else {
 189                wrap = 0;
 190                for (i = 1; i < s->num_deps; i++) {
 191                        uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
 192                                        __ATOMIC_ACQUIRE);
 193                        if ((seq - this_tail) < (min_seq - this_tail))
 194                                min_seq = seq;
 195                }
 196        }
 197
 198        if (s->threadsafe == false)
 199                s->available_seq = min_seq + wrap;
 200        else
 201                __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
 202                                __ATOMIC_RELEASE);
 203}
 204
 205/* Wait until the number of available slots reaches number requested */
 206static __rte_always_inline void
 207wait_for_available(struct opdl_stage *s, uint32_t n)
 208{
 209        while (available(s) < n) {
 210                rte_pause();
 211                update_available_seq(s);
 212        }
 213}
 214
 215/* Return number of slots to process based on number requested and mode */
 216static __rte_always_inline uint32_t
 217num_to_process(struct opdl_stage *s, uint32_t n, bool block)
 218{
 219        /* Don't read tail sequences of dependencies if not needed */
 220        if (available(s) >= n)
 221                return n;
 222
 223        update_available_seq(s);
 224
 225        if (block == false) {
 226                uint32_t avail = available(s);
 227
 228                if (avail == 0) {
 229                        rte_pause();
 230                        return 0;
 231                }
 232                return (avail <= n) ? avail : n;
 233        }
 234
 235        if (unlikely(n > s->num_slots)) {
 236                PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
 237                                n, s->num_slots);
 238                return 0;  /* Avoid infinite loop */
 239        }
 240        /* blocking */
 241        wait_for_available(s, n);
 242        return n;
 243}
 244
 245/* Copy entries in to slots with wrap-around */
 246static __rte_always_inline void
 247copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
 248                uint32_t num_entries)
 249{
 250        uint32_t slot_size = t->slot_size;
 251        uint32_t slot_index = start & t->mask;
 252
 253        if (slot_index + num_entries <= t->num_slots) {
 254                rte_memcpy(get_slot(t, start), entries,
 255                                num_entries * slot_size);
 256        } else {
 257                uint32_t split = t->num_slots - slot_index;
 258
 259                rte_memcpy(get_slot(t, start), entries, split * slot_size);
 260                rte_memcpy(get_slot(t, 0),
 261                                RTE_PTR_ADD(entries, split * slot_size),
 262                                (num_entries - split) * slot_size);
 263        }
 264}
 265
 266/* Copy entries out from slots with wrap-around */
 267static __rte_always_inline void
 268copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
 269                uint32_t num_entries)
 270{
 271        uint32_t slot_size = t->slot_size;
 272        uint32_t slot_index = start & t->mask;
 273
 274        if (slot_index + num_entries <= t->num_slots) {
 275                rte_memcpy(entries, get_slot(t, start),
 276                                num_entries * slot_size);
 277        } else {
 278                uint32_t split = t->num_slots - slot_index;
 279
 280                rte_memcpy(entries, get_slot(t, start), split * slot_size);
 281                rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
 282                                get_slot(t, 0),
 283                                (num_entries - split) * slot_size);
 284        }
 285}
 286
 287/* Input function optimised for single thread */
 288static __rte_always_inline uint32_t
 289opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
 290                uint32_t num_entries, bool block)
 291{
 292        struct opdl_stage *s = input_stage(t);
 293        uint32_t head = s->head;
 294
 295        num_entries = num_to_process(s, num_entries, block);
 296        if (num_entries == 0)
 297                return 0;
 298
 299        copy_entries_in(t, head, entries, num_entries);
 300
 301        s->head += num_entries;
 302        __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
 303
 304        return num_entries;
 305}
 306
 307/* Convert head and tail of claim_manager into valid index */
 308static __rte_always_inline uint32_t
 309claim_mgr_index(uint32_t n)
 310{
 311        return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
 312}
 313
 314/* Check if there are available slots in claim_manager */
 315static __rte_always_inline bool
 316claim_mgr_available(struct claim_manager *mgr)
 317{
 318        return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
 319                        true : false;
 320}
 321
 322/* Record a new claim. Only use after first checking an entry is available */
 323static __rte_always_inline void
 324claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
 325{
 326        if ((mgr->mgr_head != mgr->mgr_tail) &&
 327                        (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
 328                        tail)) {
 329                /* Combine with previous claim */
 330                mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
 331        } else {
 332                mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
 333                mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
 334                mgr->mgr_head++;
 335        }
 336
 337        mgr->num_claimed += (head - tail);
 338}
 339
 340/* Read the oldest recorded claim */
 341static __rte_always_inline bool
 342claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
 343{
 344        if (mgr->mgr_head == mgr->mgr_tail)
 345                return false;
 346
 347        *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
 348        *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
 349        return true;
 350}
 351
 352/* Remove the oldest recorded claim. Only use after first reading the entry */
 353static __rte_always_inline void
 354claim_mgr_remove(struct claim_manager *mgr)
 355{
 356        mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
 357                        mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
 358        mgr->mgr_tail++;
 359}
 360
 361/* Update tail in the oldest claim. Only use after first reading the entry */
 362static __rte_always_inline void
 363claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
 364{
 365        mgr->num_claimed -= num_entries;
 366        mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
 367}
 368
 369static __rte_always_inline void
 370opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
 371                uint32_t num_entries, bool block)
 372{
 373        struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
 374        uint32_t head;
 375        uint32_t tail;
 376
 377        while (num_entries) {
 378                bool ret = claim_mgr_read(disclaims, &tail, &head);
 379
 380                if (ret == false)
 381                        break;  /* nothing is claimed */
 382                /* There should be no race condition here. If shared.tail
 383                 * matches, no other core can update it until this one does.
 384                 */
 385                if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
 386                                tail) {
 387                        if (num_entries >= (head - tail)) {
 388                                claim_mgr_remove(disclaims);
 389                                __atomic_store_n(&s->shared.tail, head,
 390                                                __ATOMIC_RELEASE);
 391                                num_entries -= (head - tail);
 392                        } else {
 393                                claim_mgr_move_tail(disclaims, num_entries);
 394                                __atomic_store_n(&s->shared.tail,
 395                                                num_entries + tail,
 396                                                __ATOMIC_RELEASE);
 397                                num_entries = 0;
 398                        }
 399                } else if (block == false)
 400                        break;  /* blocked by other thread */
 401                /* Keep going until num_entries are disclaimed. */
 402                rte_pause();
 403        }
 404
 405        disclaims->num_to_disclaim = num_entries;
 406}
 407
 408/* Move head atomically, returning number of entries available to process and
 409 * the original value of head. For non-input stages, the claim is recorded
 410 * so that the tail can be updated later by opdl_stage_disclaim().
 411 */
 412static __rte_always_inline void
 413move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
 414                uint32_t *old_head, bool block, bool claim_func)
 415{
 416        uint32_t orig_num_entries = *num_entries;
 417        uint32_t ret;
 418        struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
 419
 420        /* Attempt to disclaim any outstanding claims */
 421        opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
 422                        false);
 423
 424        *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
 425        while (true) {
 426                bool success;
 427                /* If called by opdl_ring_input(), claim does not need to be
 428                 * recorded, as there will be no disclaim.
 429                 */
 430                if (claim_func) {
 431                        /* Check that the claim can be recorded */
 432                        ret = claim_mgr_available(disclaims);
 433                        if (ret == false) {
 434                                /* exit out if claim can't be recorded */
 435                                *num_entries = 0;
 436                                return;
 437                        }
 438                }
 439
 440                *num_entries = num_to_process(s, orig_num_entries, block);
 441                if (*num_entries == 0)
 442                        return;
 443
 444                success = __atomic_compare_exchange_n(&s->shared.head, old_head,
 445                                *old_head + *num_entries,
 446                                true,  /* may fail spuriously */
 447                                __ATOMIC_RELEASE,  /* memory order on success */
 448                                __ATOMIC_ACQUIRE);  /* memory order on fail */
 449                if (likely(success))
 450                        break;
 451                rte_pause();
 452        }
 453
 454        if (claim_func)
 455                /* Store the claim record */
 456                claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
 457}
 458
 459/* Input function that supports multiple threads */
 460static __rte_always_inline uint32_t
 461opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
 462                uint32_t num_entries, bool block)
 463{
 464        struct opdl_stage *s = input_stage(t);
 465        uint32_t old_head;
 466
 467        move_head_atomically(s, &num_entries, &old_head, block, false);
 468        if (num_entries == 0)
 469                return 0;
 470
 471        copy_entries_in(t, old_head, entries, num_entries);
 472
 473        /* If another thread started inputting before this one, but hasn't
 474         * finished, we need to wait for it to complete to update the tail.
 475         */
 476        rte_wait_until_equal_32(&s->shared.tail, old_head, __ATOMIC_ACQUIRE);
 477
 478        __atomic_store_n(&s->shared.tail, old_head + num_entries,
 479                        __ATOMIC_RELEASE);
 480
 481        return num_entries;
 482}
 483
 484static __rte_always_inline uint32_t
 485opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
 486                uint8_t this_lcore)
 487{
 488        return ((nb_p_lcores <= 1) ? 0 :
 489                        (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
 490                        nb_p_lcores);
 491}
 492
 493/* Claim slots to process, optimised for single-thread operation */
 494static __rte_always_inline uint32_t
 495opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 496                uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
 497{
 498        uint32_t i = 0, j = 0,  offset;
 499        uint32_t opa_id   = 0;
 500        uint32_t flow_id  = 0;
 501        uint64_t event    = 0;
 502        void *get_slots;
 503        struct rte_event *ev;
 504        RTE_SET_USED(seq);
 505        struct opdl_ring *t = s->t;
 506        uint8_t *entries_offset = (uint8_t *)entries;
 507
 508        if (!atomic) {
 509
 510                offset = opdl_first_entry_id(s->seq, s->nb_instance,
 511                                s->instance_id);
 512
 513                num_entries = s->nb_instance * num_entries;
 514
 515                num_entries = num_to_process(s, num_entries, block);
 516
 517                for (; offset < num_entries; offset += s->nb_instance) {
 518                        get_slots = get_slot(t, s->head + offset);
 519                        memcpy(entries_offset, get_slots, t->slot_size);
 520                        entries_offset += t->slot_size;
 521                        i++;
 522                }
 523        } else {
 524                num_entries = num_to_process(s, num_entries, block);
 525
 526                for (j = 0; j < num_entries; j++) {
 527                        ev = (struct rte_event *)get_slot(t, s->head+j);
 528
 529                        event  = __atomic_load_n(&(ev->event),
 530                                        __ATOMIC_ACQUIRE);
 531
 532                        opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
 533                        flow_id  = OPDL_FLOWID_MASK & event;
 534
 535                        if (opa_id >= s->queue_id)
 536                                continue;
 537
 538                        if ((flow_id % s->nb_instance) == s->instance_id) {
 539                                memcpy(entries_offset, ev, t->slot_size);
 540                                entries_offset += t->slot_size;
 541                                i++;
 542                        }
 543                }
 544        }
 545        s->shadow_head = s->head;
 546        s->head += num_entries;
 547        s->num_claimed = num_entries;
 548        s->num_event = i;
 549        s->pos = 0;
 550
 551        /* automatically disclaim entries if number of rte_events is zero */
 552        if (unlikely(i == 0))
 553                opdl_stage_disclaim(s, 0, false);
 554
 555        return i;
 556}
 557
 558/* Thread-safe version of function to claim slots for processing */
 559static __rte_always_inline uint32_t
 560opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
 561                uint32_t num_entries, uint32_t *seq, bool block)
 562{
 563        uint32_t old_head;
 564        struct opdl_ring *t = s->t;
 565        uint32_t i = 0, offset;
 566        uint8_t *entries_offset = (uint8_t *)entries;
 567
 568        if (seq == NULL) {
 569                PMD_DRV_LOG(ERR, "Invalid seq PTR");
 570                return 0;
 571        }
 572        offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
 573        num_entries = offset + (s->nb_instance * num_entries);
 574
 575        move_head_atomically(s, &num_entries, &old_head, block, true);
 576
 577        for (; offset < num_entries; offset += s->nb_instance) {
 578                memcpy(entries_offset, get_slot(t, s->head + offset),
 579                        t->slot_size);
 580                entries_offset += t->slot_size;
 581                i++;
 582        }
 583
 584        *seq = old_head;
 585
 586        return i;
 587}
 588
 589/* Claim and copy slot pointers, optimised for single-thread operation */
 590static __rte_always_inline uint32_t
 591opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
 592                uint32_t num_entries, uint32_t *seq, bool block)
 593{
 594        num_entries = num_to_process(s, num_entries, block);
 595        if (num_entries == 0)
 596                return 0;
 597        copy_entries_out(s->t, s->head, entries, num_entries);
 598        if (seq != NULL)
 599                *seq = s->head;
 600        s->head += num_entries;
 601        return num_entries;
 602}
 603
 604/* Thread-safe version of function to claim and copy pointers to slots */
 605static __rte_always_inline uint32_t
 606opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
 607                uint32_t num_entries, uint32_t *seq, bool block)
 608{
 609        uint32_t old_head;
 610
 611        move_head_atomically(s, &num_entries, &old_head, block, true);
 612        if (num_entries == 0)
 613                return 0;
 614        copy_entries_out(s->t, old_head, entries, num_entries);
 615        if (seq != NULL)
 616                *seq = old_head;
 617        return num_entries;
 618}
 619
 620static __rte_always_inline void
 621opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
 622                uint32_t num_entries)
 623{
 624        uint32_t old_tail = s->shared.tail;
 625
 626        if (unlikely(num_entries > (s->head - old_tail))) {
 627                PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
 628                                num_entries, s->head - old_tail);
 629                num_entries = s->head - old_tail;
 630        }
 631        __atomic_store_n(&s->shared.tail, num_entries + old_tail,
 632                        __ATOMIC_RELEASE);
 633}
 634
 635uint32_t
 636opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
 637                bool block)
 638{
 639        if (input_stage(t)->threadsafe == false)
 640                return opdl_ring_input_singlethread(t, entries, num_entries,
 641                                block);
 642        else
 643                return opdl_ring_input_multithread(t, entries, num_entries,
 644                                block);
 645}
 646
 647uint32_t
 648opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
 649                const void *entries, uint32_t num_entries, bool block)
 650{
 651        uint32_t head = s->head;
 652
 653        num_entries = num_to_process(s, num_entries, block);
 654
 655        if (num_entries == 0)
 656                return 0;
 657
 658        copy_entries_in(t, head, entries, num_entries);
 659
 660        s->head += num_entries;
 661        __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
 662
 663        return num_entries;
 664
 665}
 666
 667uint32_t
 668opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
 669                void *entries, uint32_t num_entries, bool block)
 670{
 671        uint32_t head = s->head;
 672
 673        num_entries = num_to_process(s, num_entries, block);
 674        if (num_entries == 0)
 675                return 0;
 676
 677        copy_entries_out(t, head, entries, num_entries);
 678
 679        s->head += num_entries;
 680        __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
 681
 682        return num_entries;
 683}
 684
 685uint32_t
 686opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
 687{
 688        /* return (num_to_process(s, num_entries, false)); */
 689
 690        if (available(s) >= num_entries)
 691                return num_entries;
 692
 693        update_available_seq(s);
 694
 695        uint32_t avail = available(s);
 696
 697        if (avail == 0) {
 698                rte_pause();
 699                return 0;
 700        }
 701        return (avail <= num_entries) ? avail : num_entries;
 702}
 703
 704uint32_t
 705opdl_stage_claim(struct opdl_stage *s, void *entries,
 706                uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
 707{
 708        if (s->threadsafe == false)
 709                return opdl_stage_claim_singlethread(s, entries, num_entries,
 710                                seq, block, atomic);
 711        else
 712                return opdl_stage_claim_multithread(s, entries, num_entries,
 713                                seq, block);
 714}
 715
 716uint32_t
 717opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
 718                uint32_t num_entries, uint32_t *seq, bool block)
 719{
 720        if (s->threadsafe == false)
 721                return opdl_stage_claim_copy_singlethread(s, entries,
 722                                num_entries, seq, block);
 723        else
 724                return opdl_stage_claim_copy_multithread(s, entries,
 725                                num_entries, seq, block);
 726}
 727
 728void
 729opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
 730                bool block)
 731{
 732
 733        if (s->threadsafe == false) {
 734                opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
 735        } else {
 736                struct claim_manager *disclaims =
 737                        &s->pending_disclaims[rte_lcore_id()];
 738
 739                if (unlikely(num_entries > s->num_slots)) {
 740                        PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
 741                                        num_entries, disclaims->num_claimed);
 742                        num_entries = disclaims->num_claimed;
 743                }
 744
 745                num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
 746                                disclaims->num_claimed);
 747                opdl_stage_disclaim_multithread_n(s, num_entries, block);
 748        }
 749}
 750
 751int
 752opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
 753{
 754        if (num_entries != s->num_event) {
 755                rte_errno = EINVAL;
 756                return 0;
 757        }
 758        if (s->threadsafe == false) {
 759                __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
 760                s->seq += s->num_claimed;
 761                s->shadow_head = s->head;
 762                s->num_claimed = 0;
 763        } else {
 764                struct claim_manager *disclaims =
 765                                &s->pending_disclaims[rte_lcore_id()];
 766                opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
 767                                block);
 768        }
 769        return num_entries;
 770}
 771
 772uint32_t
 773opdl_ring_available(struct opdl_ring *t)
 774{
 775        return opdl_stage_available(&t->stages[0]);
 776}
 777
 778uint32_t
 779opdl_stage_available(struct opdl_stage *s)
 780{
 781        update_available_seq(s);
 782        return available(s);
 783}
 784
 785void
 786opdl_ring_flush(struct opdl_ring *t)
 787{
 788        struct opdl_stage *s = input_stage(t);
 789
 790        wait_for_available(s, s->num_slots);
 791}
 792
 793/******************** Non performance sensitive functions ********************/
 794
 795/* Initial setup of a new stage's context */
 796static int
 797init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
 798                bool is_input)
 799{
 800        uint32_t available = (is_input) ? t->num_slots : 0;
 801
 802        s->t = t;
 803        s->num_slots = t->num_slots;
 804        s->index = t->num_stages;
 805        s->threadsafe = threadsafe;
 806        s->shared.stage = s;
 807
 808        /* Alloc memory for deps */
 809        s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
 810                        t->max_num_stages * sizeof(enum dep_type),
 811                        0, t->socket);
 812        if (s->dep_tracking == NULL)
 813                return -ENOMEM;
 814
 815        s->deps = rte_zmalloc_socket(LIB_NAME,
 816                        t->max_num_stages * sizeof(struct shared_state *),
 817                        0, t->socket);
 818        if (s->deps == NULL) {
 819                rte_free(s->dep_tracking);
 820                return -ENOMEM;
 821        }
 822
 823        s->dep_tracking[s->index] = DEP_SELF;
 824
 825        if (threadsafe == true)
 826                s->shared.available_seq = available;
 827        else
 828                s->available_seq = available;
 829
 830        return 0;
 831}
 832
 833/* Add direct or indirect dependencies between stages */
 834static int
 835add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
 836                enum dep_type type)
 837{
 838        struct opdl_ring *t = dependent->t;
 839        uint32_t i;
 840
 841        /* Add new direct dependency */
 842        if ((type == DEP_DIRECT) &&
 843                        (dependent->dep_tracking[dependency->index] ==
 844                                        DEP_NONE)) {
 845                PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
 846                                t->name, dependent->index, dependency->index);
 847                dependent->dep_tracking[dependency->index] = DEP_DIRECT;
 848        }
 849
 850        /* Add new indirect dependency or change direct to indirect */
 851        if ((type == DEP_INDIRECT) &&
 852                        ((dependent->dep_tracking[dependency->index] ==
 853                        DEP_NONE) ||
 854                        (dependent->dep_tracking[dependency->index] ==
 855                        DEP_DIRECT))) {
 856                PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
 857                                t->name, dependent->index, dependency->index);
 858                dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
 859        }
 860
 861        /* Shouldn't happen... */
 862        if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
 863                        (dependent != input_stage(t))) {
 864                PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
 865                                t->name, dependent->index);
 866                return -EINVAL;
 867        }
 868
 869        /* Keep going to dependencies of the dependency, until input stage */
 870        if (dependency != input_stage(t))
 871                for (i = 0; i < dependency->num_deps; i++) {
 872                        int ret = add_dep(dependent, dependency->deps[i]->stage,
 873                                        DEP_INDIRECT);
 874
 875                        if (ret < 0)
 876                                return ret;
 877                }
 878
 879        /* Make list of sequence numbers for direct dependencies only */
 880        if (type == DEP_DIRECT)
 881                for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
 882                        if (dependent->dep_tracking[i] == DEP_DIRECT) {
 883                                if ((i == 0) && (dependent->num_deps > 1))
 884                                        rte_panic("%s:%u depends on > input",
 885                                                        t->name,
 886                                                        dependent->index);
 887                                dependent->deps[dependent->num_deps++] =
 888                                                &t->stages[i].shared;
 889                        }
 890
 891        return 0;
 892}
 893
 894struct opdl_ring *
 895opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
 896                uint32_t max_num_stages, int socket)
 897{
 898        struct opdl_ring *t;
 899        char mz_name[RTE_MEMZONE_NAMESIZE];
 900        int mz_flags = 0;
 901        struct opdl_stage *st = NULL;
 902        const struct rte_memzone *mz = NULL;
 903        size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
 904                        (num_slots * slot_size));
 905
 906        /* Compile time checking */
 907        RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
 908                        0);
 909        RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
 910                        RTE_CACHE_LINE_MASK) != 0);
 911        RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
 912                        RTE_CACHE_LINE_MASK) != 0);
 913        RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
 914
 915        /* Parameter checking */
 916        if (name == NULL) {
 917                PMD_DRV_LOG(ERR, "name param is NULL");
 918                return NULL;
 919        }
 920        if (!rte_is_power_of_2(num_slots)) {
 921                PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
 922                                num_slots, name);
 923                return NULL;
 924        }
 925
 926        /* Alloc memory for stages */
 927        st = rte_zmalloc_socket(LIB_NAME,
 928                max_num_stages * sizeof(struct opdl_stage),
 929                RTE_CACHE_LINE_SIZE, socket);
 930        if (st == NULL)
 931                goto exit_fail;
 932
 933        snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
 934
 935        /* Alloc memory for memzone */
 936        mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
 937        if (mz == NULL)
 938                goto exit_fail;
 939
 940        t = mz->addr;
 941
 942        /* Initialise opdl_ring queue */
 943        memset(t, 0, sizeof(*t));
 944        strlcpy(t->name, name, sizeof(t->name));
 945        t->socket = socket;
 946        t->num_slots = num_slots;
 947        t->mask = num_slots - 1;
 948        t->slot_size = slot_size;
 949        t->max_num_stages = max_num_stages;
 950        t->stages = st;
 951
 952        PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
 953                        t->name, t, num_slots, socket, slot_size);
 954
 955        return t;
 956
 957exit_fail:
 958        PMD_DRV_LOG(ERR, "Cannot reserve memory");
 959        rte_free(st);
 960        rte_memzone_free(mz);
 961
 962        return NULL;
 963}
 964
 965void *
 966opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
 967{
 968        return get_slot(t, index);
 969}
 970
 971bool
 972opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 973                uint32_t index, bool atomic)
 974{
 975        uint32_t i = 0, offset;
 976        struct opdl_ring *t = s->t;
 977        struct rte_event *ev_orig = NULL;
 978        bool ev_updated = false;
 979        uint64_t ev_temp    = 0;
 980        uint64_t ev_update  = 0;
 981
 982        uint32_t opa_id   = 0;
 983        uint32_t flow_id  = 0;
 984        uint64_t event    = 0;
 985
 986        if (index > s->num_event) {
 987                PMD_DRV_LOG(ERR, "index is overflow");
 988                return ev_updated;
 989        }
 990
 991        ev_temp = ev->event & OPDL_EVENT_MASK;
 992
 993        if (!atomic) {
 994                offset = opdl_first_entry_id(s->seq, s->nb_instance,
 995                                s->instance_id);
 996                offset += index*s->nb_instance;
 997                ev_orig = get_slot(t, s->shadow_head+offset);
 998                if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
 999                        ev_orig->event = ev->event;
1000                        ev_updated = true;
1001                }
1002                if (ev_orig->u64 != ev->u64) {
1003                        ev_orig->u64 = ev->u64;
1004                        ev_updated = true;
1005                }
1006
1007        } else {
1008                for (i = s->pos; i < s->num_claimed; i++) {
1009                        ev_orig = (struct rte_event *)
1010                                get_slot(t, s->shadow_head+i);
1011
1012                        event  = __atomic_load_n(&(ev_orig->event),
1013                                        __ATOMIC_ACQUIRE);
1014
1015                        opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1016                        flow_id  = OPDL_FLOWID_MASK & event;
1017
1018                        if (opa_id >= s->queue_id)
1019                                continue;
1020
1021                        if ((flow_id % s->nb_instance) == s->instance_id) {
1022                                ev_update = s->queue_id;
1023                                ev_update = (ev_update << OPDL_OPA_OFFSET)
1024                                        | ev->event;
1025
1026                                s->pos = i + 1;
1027
1028                                if ((event & OPDL_EVENT_MASK) !=
1029                                                ev_temp) {
1030                                        __atomic_store_n(&(ev_orig->event),
1031                                                        ev_update,
1032                                                        __ATOMIC_RELEASE);
1033                                        ev_updated = true;
1034                                }
1035                                if (ev_orig->u64 != ev->u64) {
1036                                        ev_orig->u64 = ev->u64;
1037                                        ev_updated = true;
1038                                }
1039
1040                                break;
1041                        }
1042                }
1043
1044        }
1045
1046        return ev_updated;
1047}
1048
1049int
1050opdl_ring_get_socket(const struct opdl_ring *t)
1051{
1052        return t->socket;
1053}
1054
1055uint32_t
1056opdl_ring_get_num_slots(const struct opdl_ring *t)
1057{
1058        return t->num_slots;
1059}
1060
1061const char *
1062opdl_ring_get_name(const struct opdl_ring *t)
1063{
1064        return t->name;
1065}
1066
1067/* Check dependency list is valid for a given opdl_ring */
1068static int
1069check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1070                uint32_t num_deps)
1071{
1072        unsigned int i;
1073
1074        for (i = 0; i < num_deps; ++i) {
1075                if (!deps[i]) {
1076                        PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1077                        return -EINVAL;
1078                }
1079                if (t != deps[i]->t) {
1080                        PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1081                                        i, deps[i]->t->name, t->name);
1082                        return -EINVAL;
1083                }
1084        }
1085
1086        return 0;
1087}
1088
1089struct opdl_stage *
1090opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1091{
1092        struct opdl_stage *s;
1093
1094        /* Parameter checking */
1095        if (!t) {
1096                PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1097                return NULL;
1098        }
1099        if (t->num_stages == t->max_num_stages) {
1100                PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1101                                t->name, t->max_num_stages);
1102                return NULL;
1103        }
1104
1105        s = &t->stages[t->num_stages];
1106
1107        if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1108                PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1109                                &s->shared, t->name);
1110
1111        if (init_stage(t, s, threadsafe, is_input) < 0) {
1112                PMD_DRV_LOG(ERR, "Cannot reserve memory");
1113                return NULL;
1114        }
1115        t->num_stages++;
1116
1117        return s;
1118}
1119
1120uint32_t
1121opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1122                uint32_t nb_instance, uint32_t instance_id,
1123                struct opdl_stage *deps[],
1124                uint32_t num_deps)
1125{
1126        uint32_t i;
1127        int ret = 0;
1128
1129        if ((num_deps > 0) && (!deps)) {
1130                PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1131                return -1;
1132        }
1133        ret = check_deps(t, deps, num_deps);
1134        if (ret < 0)
1135                return ret;
1136
1137        for (i = 0; i < num_deps; i++) {
1138                ret = add_dep(s, deps[i], DEP_DIRECT);
1139                if (ret < 0)
1140                        return ret;
1141        }
1142
1143        s->nb_instance = nb_instance;
1144        s->instance_id = instance_id;
1145
1146        return ret;
1147}
1148
/* Public wrapper around input_stage(): returns the stage it yields for 't'. */
struct opdl_stage *
opdl_ring_get_input_stage(const struct opdl_ring *t)
{
	struct opdl_stage *in = input_stage(t);

	return in;
}
1154
1155int
1156opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1157                uint32_t num_deps)
1158{
1159        unsigned int i;
1160        int ret;
1161
1162        if ((num_deps == 0) || (!deps)) {
1163                PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1164                return -EINVAL;
1165        }
1166
1167        ret = check_deps(s->t, deps, num_deps);
1168        if (ret < 0)
1169                return ret;
1170
1171        /* Update deps */
1172        for (i = 0; i < num_deps; i++)
1173                s->deps[i] = &deps[i]->shared;
1174        s->num_deps = num_deps;
1175
1176        return 0;
1177}
1178
1179struct opdl_ring *
1180opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1181{
1182        return s->t;
1183}
1184
1185void
1186opdl_stage_set_queue_id(struct opdl_stage *s,
1187                uint32_t queue_id)
1188{
1189        s->queue_id = queue_id;
1190}
1191
1192void
1193opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1194{
1195        uint32_t i;
1196
1197        if (t == NULL) {
1198                fprintf(f, "NULL OPDL!\n");
1199                return;
1200        }
1201        fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1202                        t->name, t->num_slots, t->mask, t->slot_size,
1203                        t->num_stages, t->socket);
1204        for (i = 0; i < t->num_stages; i++) {
1205                uint32_t j;
1206                const struct opdl_stage *s = &t->stages[i];
1207
1208                fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1209                                t->name, i, (s->threadsafe) ? "true" : "false",
1210                                (s->threadsafe) ? s->shared.head : s->head,
1211                                (s->threadsafe) ? s->shared.available_seq :
1212                                s->available_seq,
1213                                s->shared.tail, (s->num_deps > 0) ?
1214                                s->deps[0]->stage->index : 0);
1215                for (j = 1; j < s->num_deps; j++)
1216                        fprintf(f, ",%u", s->deps[j]->stage->index);
1217                fprintf(f, "\n");
1218        }
1219        fflush(f);
1220}
1221
1222void
1223opdl_ring_free(struct opdl_ring *t)
1224{
1225        uint32_t i;
1226        const struct rte_memzone *mz;
1227        char mz_name[RTE_MEMZONE_NAMESIZE];
1228
1229        if (t == NULL) {
1230                PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1231                return;
1232        }
1233
1234        PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1235
1236        for (i = 0; i < t->num_stages; ++i) {
1237                rte_free(t->stages[i].deps);
1238                rte_free(t->stages[i].dep_tracking);
1239        }
1240
1241        rte_free(t->stages);
1242
1243        snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1244        mz = rte_memzone_lookup(mz_name);
1245        if (rte_memzone_free(mz) != 0)
1246                PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1247}
1248
1249/* search a opdl_ring from its name */
1250struct opdl_ring *
1251opdl_ring_lookup(const char *name)
1252{
1253        const struct rte_memzone *mz;
1254        char mz_name[RTE_MEMZONE_NAMESIZE];
1255
1256        snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1257
1258        mz = rte_memzone_lookup(mz_name);
1259        if (mz == NULL)
1260                return NULL;
1261
1262        return mz->addr;
1263}
1264
1265void
1266opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1267{
1268        s->threadsafe = threadsafe;
1269}
1270