dpdk/drivers/event/dsw/dsw_event.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2018 Ericsson AB
   3 */
   4
   5#include "dsw_evdev.h"
   6
   7#ifdef DSW_SORT_DEQUEUED
   8#include "dsw_sort.h"
   9#endif
  10
  11#include <stdbool.h>
  12#include <string.h>
  13
  14#include <rte_cycles.h>
  15#include <rte_memcpy.h>
  16#include <rte_random.h>
  17
  18static bool
  19dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
  20                         int32_t credits)
  21{
  22        int32_t inflight_credits = port->inflight_credits;
  23        int32_t missing_credits = credits - inflight_credits;
  24        int32_t total_on_loan;
  25        int32_t available;
  26        int32_t acquired_credits;
  27        int32_t new_total_on_loan;
  28
  29        if (likely(missing_credits <= 0)) {
  30                port->inflight_credits -= credits;
  31                return true;
  32        }
  33
  34        total_on_loan =
  35                __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED);
  36        available = dsw->max_inflight - total_on_loan;
  37        acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
  38
  39        if (available < acquired_credits)
  40                return false;
  41
  42        /* This is a race, no locks are involved, and thus some other
  43         * thread can allocate tokens in between the check and the
  44         * allocation.
  45         */
  46        new_total_on_loan =
  47            __atomic_add_fetch(&dsw->credits_on_loan, acquired_credits,
  48                               __ATOMIC_RELAXED);
  49
  50        if (unlikely(new_total_on_loan > dsw->max_inflight)) {
  51                /* Some other port took the last credits */
  52                __atomic_sub_fetch(&dsw->credits_on_loan, acquired_credits,
  53                                   __ATOMIC_RELAXED);
  54                return false;
  55        }
  56
  57        DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
  58                        acquired_credits);
  59
  60        port->inflight_credits += acquired_credits;
  61        port->inflight_credits -= credits;
  62
  63        return true;
  64}
  65
  66static void
  67dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
  68                        int32_t credits)
  69{
  70        port->inflight_credits += credits;
  71
  72        if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
  73                int32_t leave_credits = DSW_PORT_MIN_CREDITS;
  74                int32_t return_credits =
  75                        port->inflight_credits - leave_credits;
  76
  77                port->inflight_credits = leave_credits;
  78
  79                __atomic_sub_fetch(&dsw->credits_on_loan, return_credits,
  80                                   __ATOMIC_RELAXED);
  81
  82                DSW_LOG_DP_PORT(DEBUG, port->id,
  83                                "Returned %d tokens to pool.\n",
  84                                return_credits);
  85        }
  86}
  87
  88static void
  89dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
  90                       uint16_t num_forward, uint16_t num_release)
  91{
  92        port->new_enqueued += num_new;
  93        port->forward_enqueued += num_forward;
  94        port->release_enqueued += num_release;
  95}
  96
  97static void
  98dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
  99{
 100        source_port->queue_enqueued[queue_id]++;
 101}
 102
 103static void
 104dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
 105{
 106        port->dequeued += num;
 107}
 108
 109static void
 110dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
 111{
 112        source_port->queue_dequeued[queue_id]++;
 113}
 114
 115static void
 116dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
 117{
 118        if (dequeued > 0 && port->busy_start == 0)
 119                /* work period begins */
 120                port->busy_start = rte_get_timer_cycles();
 121        else if (dequeued == 0 && port->busy_start > 0) {
 122                /* work period ends */
 123                uint64_t work_period =
 124                        rte_get_timer_cycles() - port->busy_start;
 125                port->busy_cycles += work_period;
 126                port->busy_start = 0;
 127        }
 128}
 129
 130static int16_t
 131dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
 132{
 133        uint64_t passed = now - port->measurement_start;
 134        uint64_t busy_cycles = port->busy_cycles;
 135
 136        if (port->busy_start > 0) {
 137                busy_cycles += (now - port->busy_start);
 138                port->busy_start = now;
 139        }
 140
 141        int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
 142
 143        port->measurement_start = now;
 144        port->busy_cycles = 0;
 145
 146        port->total_busy_cycles += busy_cycles;
 147
 148        return load;
 149}
 150
 151static void
 152dsw_port_load_update(struct dsw_port *port, uint64_t now)
 153{
 154        int16_t old_load;
 155        int16_t period_load;
 156        int16_t new_load;
 157
 158        old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED);
 159
 160        period_load = dsw_port_load_close_period(port, now);
 161
 162        new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
 163                (DSW_OLD_LOAD_WEIGHT+1);
 164
 165        __atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED);
 166
 167        /* The load of the recently immigrated flows should hopefully
 168         * be reflected the load estimate by now.
 169         */
 170        __atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED);
 171}
 172
 173static void
 174dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
 175{
 176        if (now < port->next_load_update)
 177                return;
 178
 179        port->next_load_update = now + port->load_update_interval;
 180
 181        dsw_port_load_update(port, now);
 182}
 183
 184static void
 185dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
 186{
 187        /* there's always room on the ring */
 188        while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
 189                rte_pause();
 190}
 191
 192static int
 193dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
 194{
 195        return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
 196}
 197
 198static void
 199dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
 200                       uint8_t type, struct dsw_queue_flow *qfs,
 201                       uint8_t qfs_len)
 202{
 203        uint16_t port_id;
 204        struct dsw_ctl_msg msg = {
 205                .type = type,
 206                .originating_port_id = source_port->id,
 207                .qfs_len = qfs_len
 208        };
 209
 210        memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
 211
 212        for (port_id = 0; port_id < dsw->num_ports; port_id++)
 213                if (port_id != source_port->id)
 214                        dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
 215}
 216
 217static __rte_always_inline bool
 218dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
 219                         uint8_t queue_id, uint16_t flow_hash)
 220{
 221        uint16_t i;
 222
 223        for (i = 0; i < qfs_len; i++)
 224                if (qfs[i].queue_id == queue_id &&
 225                    qfs[i].flow_hash == flow_hash)
 226                        return true;
 227
 228        return false;
 229}
 230
 231static __rte_always_inline bool
 232dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
 233                        uint16_t flow_hash)
 234{
 235        return dsw_is_queue_flow_in_ary(port->paused_flows,
 236                                        port->paused_flows_len,
 237                                        queue_id, flow_hash);
 238}
 239
 240static void
 241dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
 242                          uint8_t qfs_len)
 243{
 244        uint8_t i;
 245
 246        for (i = 0; i < qfs_len; i++) {
 247                struct dsw_queue_flow *qf = &qfs[i];
 248
 249                DSW_LOG_DP_PORT(DEBUG, port->id,
 250                                "Pausing queue_id %d flow_hash %d.\n",
 251                                qf->queue_id, qf->flow_hash);
 252
 253                port->paused_flows[port->paused_flows_len] = *qf;
 254                port->paused_flows_len++;
 255        };
 256}
 257
 258static void
 259dsw_port_remove_paused_flow(struct dsw_port *port,
 260                            struct dsw_queue_flow *target_qf)
 261{
 262        uint16_t i;
 263
 264        for (i = 0; i < port->paused_flows_len; i++) {
 265                struct dsw_queue_flow *qf = &port->paused_flows[i];
 266
 267                if (qf->queue_id == target_qf->queue_id &&
 268                    qf->flow_hash == target_qf->flow_hash) {
 269                        uint16_t last_idx = port->paused_flows_len-1;
 270                        if (i != last_idx)
 271                                port->paused_flows[i] =
 272                                        port->paused_flows[last_idx];
 273                        port->paused_flows_len--;
 274                        break;
 275                }
 276        }
 277}
 278
 279static void
 280dsw_port_remove_paused_flows(struct dsw_port *port,
 281                             struct dsw_queue_flow *qfs, uint8_t qfs_len)
 282{
 283        uint8_t i;
 284
 285        for (i = 0; i < qfs_len; i++)
 286                dsw_port_remove_paused_flow(port, &qfs[i]);
 287
 288}
 289
 290static void
 291dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
 292
 293static void
 294dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
 295                            uint8_t originating_port_id,
 296                            struct dsw_queue_flow *paused_qfs,
 297                            uint8_t qfs_len)
 298{
 299        struct dsw_ctl_msg cfm = {
 300                .type = DSW_CTL_CFM,
 301                .originating_port_id = port->id
 302        };
 303
 304        /* There might be already-scheduled events belonging to the
 305         * paused flow in the output buffers.
 306         */
 307        dsw_port_flush_out_buffers(dsw, port);
 308
 309        dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
 310
 311        /* Make sure any stores to the original port's in_ring is seen
 312         * before the ctl message.
 313         */
 314        rte_smp_wmb();
 315
 316        dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
 317}
 318
 319struct dsw_queue_flow_burst {
 320        struct dsw_queue_flow queue_flow;
 321        uint16_t count;
 322};
 323
 324#define DSW_QF_TO_INT(_qf)                                      \
 325        ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
 326
 327static inline int
 328dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
 329{
 330        const struct dsw_queue_flow *qf_a = v_qf_a;
 331        const struct dsw_queue_flow *qf_b = v_qf_b;
 332
 333        return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
 334}
 335
 336static uint16_t
 337dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
 338                       struct dsw_queue_flow_burst *bursts)
 339{
 340        uint16_t i;
 341        struct dsw_queue_flow_burst *current_burst = NULL;
 342        uint16_t num_bursts = 0;
 343
 344        /* We don't need the stable property, and the list is likely
 345         * large enough for qsort() to outperform dsw_stable_sort(),
 346         * so we use qsort() here.
 347         */
 348        qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
 349
 350        /* arrange the (now-consecutive) events into bursts */
 351        for (i = 0; i < qfs_len; i++) {
 352                if (i == 0 ||
 353                    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
 354                        current_burst = &bursts[num_bursts];
 355                        current_burst->queue_flow = qfs[i];
 356                        current_burst->count = 0;
 357                        num_bursts++;
 358                }
 359                current_burst->count++;
 360        }
 361
 362        return num_bursts;
 363}
 364
 365static bool
 366dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
 367                        int16_t load_limit)
 368{
 369        bool below_limit = false;
 370        uint16_t i;
 371
 372        for (i = 0; i < dsw->num_ports; i++) {
 373                int16_t measured_load =
 374                        __atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED);
 375                int32_t immigration_load =
 376                        __atomic_load_n(&dsw->ports[i].immigration_load,
 377                                        __ATOMIC_RELAXED);
 378                int32_t load = measured_load + immigration_load;
 379
 380                load = RTE_MIN(load, DSW_MAX_LOAD);
 381
 382                if (load < load_limit)
 383                        below_limit = true;
 384                port_loads[i] = load;
 385        }
 386        return below_limit;
 387}
 388
 389static int16_t
 390dsw_flow_load(uint16_t num_events, int16_t port_load)
 391{
 392        return ((int32_t)port_load * (int32_t)num_events) /
 393                DSW_MAX_EVENTS_RECORDED;
 394}
 395
 396static int16_t
 397dsw_evaluate_migration(int16_t source_load, int16_t target_load,
 398                       int16_t flow_load)
 399{
 400        int32_t res_target_load;
 401        int32_t imbalance;
 402
 403        if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
 404                return -1;
 405
 406        imbalance = source_load - target_load;
 407
 408        if (imbalance < DSW_REBALANCE_THRESHOLD)
 409                return -1;
 410
 411        res_target_load = target_load + flow_load;
 412
 413        /* If the estimated load of the target port will be higher
 414         * than the source port's load, it doesn't make sense to move
 415         * the flow.
 416         */
 417        if (res_target_load > source_load)
 418                return -1;
 419
 420        /* The more idle the target will be, the better. This will
 421         * make migration prefer moving smaller flows, and flows to
 422         * lightly loaded ports.
 423         */
 424        return DSW_MAX_LOAD - res_target_load;
 425}
 426
 427static bool
 428dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
 429{
 430        struct dsw_queue *queue = &dsw->queues[queue_id];
 431        uint16_t i;
 432
 433        for (i = 0; i < queue->num_serving_ports; i++)
 434                if (queue->serving_ports[i] == port_id)
 435                        return true;
 436
 437        return false;
 438}
 439
 440static bool
 441dsw_select_emigration_target(struct dsw_evdev *dsw,
 442                            struct dsw_queue_flow_burst *bursts,
 443                            uint16_t num_bursts, uint8_t source_port_id,
 444                            int16_t *port_loads, uint16_t num_ports,
 445                            uint8_t *target_port_ids,
 446                            struct dsw_queue_flow *target_qfs,
 447                            uint8_t *targets_len)
 448{
 449        int16_t source_port_load = port_loads[source_port_id];
 450        struct dsw_queue_flow *candidate_qf = NULL;
 451        uint8_t candidate_port_id = 0;
 452        int16_t candidate_weight = -1;
 453        int16_t candidate_flow_load = -1;
 454        uint16_t i;
 455
 456        if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
 457                return false;
 458
 459        for (i = 0; i < num_bursts; i++) {
 460                struct dsw_queue_flow_burst *burst = &bursts[i];
 461                struct dsw_queue_flow *qf = &burst->queue_flow;
 462                int16_t flow_load;
 463                uint16_t port_id;
 464
 465                if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
 466                                             qf->queue_id, qf->flow_hash))
 467                        continue;
 468
 469                flow_load = dsw_flow_load(burst->count, source_port_load);
 470
 471                for (port_id = 0; port_id < num_ports; port_id++) {
 472                        int16_t weight;
 473
 474                        if (port_id == source_port_id)
 475                                continue;
 476
 477                        if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
 478                                continue;
 479
 480                        weight = dsw_evaluate_migration(source_port_load,
 481                                                        port_loads[port_id],
 482                                                        flow_load);
 483
 484                        if (weight > candidate_weight) {
 485                                candidate_qf = qf;
 486                                candidate_port_id = port_id;
 487                                candidate_weight = weight;
 488                                candidate_flow_load = flow_load;
 489                        }
 490                }
 491        }
 492
 493        if (candidate_weight < 0)
 494                return false;
 495
 496        DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
 497                        "flow_hash %d (with flow load %d) for migration "
 498                        "to port %d.\n", candidate_qf->queue_id,
 499                        candidate_qf->flow_hash,
 500                        DSW_LOAD_TO_PERCENT(candidate_flow_load),
 501                        candidate_port_id);
 502
 503        port_loads[candidate_port_id] += candidate_flow_load;
 504        port_loads[source_port_id] -= candidate_flow_load;
 505
 506        target_port_ids[*targets_len] = candidate_port_id;
 507        target_qfs[*targets_len] = *candidate_qf;
 508        (*targets_len)++;
 509
 510        __atomic_add_fetch(&dsw->ports[candidate_port_id].immigration_load,
 511                           candidate_flow_load, __ATOMIC_RELAXED);
 512
 513        return true;
 514}
 515
 516static void
 517dsw_select_emigration_targets(struct dsw_evdev *dsw,
 518                              struct dsw_port *source_port,
 519                              struct dsw_queue_flow_burst *bursts,
 520                              uint16_t num_bursts, int16_t *port_loads)
 521{
 522        struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
 523        uint8_t *target_port_ids = source_port->emigration_target_port_ids;
 524        uint8_t *targets_len = &source_port->emigration_targets_len;
 525        uint16_t i;
 526
 527        for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
 528                bool found;
 529
 530                found = dsw_select_emigration_target(dsw, bursts, num_bursts,
 531                                                     source_port->id,
 532                                                     port_loads, dsw->num_ports,
 533                                                     target_port_ids,
 534                                                     target_qfs,
 535                                                     targets_len);
 536                if (!found)
 537                        break;
 538        }
 539
 540        if (*targets_len == 0)
 541                DSW_LOG_DP_PORT(DEBUG, source_port->id,
 542                                "For the %d flows considered, no target port "
 543                                "was found.\n", num_bursts);
 544}
 545
 546static uint8_t
 547dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
 548{
 549        struct dsw_queue *queue = &dsw->queues[queue_id];
 550        uint8_t port_id;
 551
 552        if (queue->num_serving_ports > 1)
 553                port_id = queue->flow_to_port_map[flow_hash];
 554        else
 555                /* A single-link queue, or atomic/ordered/parallel but
 556                 * with just a single serving port.
 557                 */
 558                port_id = queue->serving_ports[0];
 559
 560        DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
 561                   "to port %d.\n", queue_id, flow_hash, port_id);
 562
 563        return port_id;
 564}
 565
 566static void
 567dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
 568                           uint8_t dest_port_id)
 569{
 570        struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
 571        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
 572        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
 573        uint16_t enqueued = 0;
 574
 575        if (*buffer_len == 0)
 576                return;
 577
 578        /* The rings are dimensioned to fit all in-flight events (even
 579         * on a single ring), so looping will work.
 580         */
 581        do {
 582                enqueued +=
 583                        rte_event_ring_enqueue_burst(dest_port->in_ring,
 584                                                     buffer+enqueued,
 585                                                     *buffer_len-enqueued,
 586                                                     NULL);
 587        } while (unlikely(enqueued != *buffer_len));
 588
 589        (*buffer_len) = 0;
 590}
 591
 592static uint16_t
 593dsw_port_get_parallel_flow_id(struct dsw_port *port)
 594{
 595        uint16_t flow_id = port->next_parallel_flow_id;
 596
 597        port->next_parallel_flow_id =
 598                (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
 599
 600        return flow_id;
 601}
 602
 603static void
 604dsw_port_buffer_paused(struct dsw_port *port,
 605                       const struct rte_event *paused_event)
 606{
 607        port->paused_events[port->paused_events_len] = *paused_event;
 608        port->paused_events_len++;
 609}
 610
 611static void
 612dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
 613                           uint8_t dest_port_id, const struct rte_event *event)
 614{
 615        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
 616        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
 617
 618        if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
 619                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
 620
 621        buffer[*buffer_len] = *event;
 622
 623        (*buffer_len)++;
 624}
 625
 626#define DSW_FLOW_ID_BITS (24)
 627static uint16_t
 628dsw_flow_id_hash(uint32_t flow_id)
 629{
 630        uint16_t hash = 0;
 631        uint16_t offset = 0;
 632
 633        do {
 634                hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
 635                offset += DSW_MAX_FLOWS_BITS;
 636        } while (offset < DSW_FLOW_ID_BITS);
 637
 638        return hash;
 639}
 640
 641static void
 642dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
 643                         struct rte_event event)
 644{
 645        uint8_t dest_port_id;
 646
 647        event.flow_id = dsw_port_get_parallel_flow_id(source_port);
 648
 649        dest_port_id = dsw_schedule(dsw, event.queue_id,
 650                                    dsw_flow_id_hash(event.flow_id));
 651
 652        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
 653}
 654
 655static void
 656dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
 657                      const struct rte_event *event)
 658{
 659        uint16_t flow_hash;
 660        uint8_t dest_port_id;
 661
 662        if (unlikely(dsw->queues[event->queue_id].schedule_type ==
 663                     RTE_SCHED_TYPE_PARALLEL)) {
 664                dsw_port_buffer_parallel(dsw, source_port, *event);
 665                return;
 666        }
 667
 668        flow_hash = dsw_flow_id_hash(event->flow_id);
 669
 670        if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
 671                                             flow_hash))) {
 672                dsw_port_buffer_paused(source_port, event);
 673                return;
 674        }
 675
 676        dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
 677
 678        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
 679}
 680
 681static void
 682dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 683                             struct dsw_port *source_port,
 684                             const struct dsw_queue_flow *qf)
 685{
 686        uint16_t paused_events_len = source_port->paused_events_len;
 687        struct rte_event paused_events[paused_events_len];
 688        uint8_t dest_port_id;
 689        uint16_t i;
 690
 691        if (paused_events_len == 0)
 692                return;
 693
 694        if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
 695                return;
 696
 697        rte_memcpy(paused_events, source_port->paused_events,
 698                   paused_events_len * sizeof(struct rte_event));
 699
 700        source_port->paused_events_len = 0;
 701
 702        dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
 703
 704        for (i = 0; i < paused_events_len; i++) {
 705                struct rte_event *event = &paused_events[i];
 706                uint16_t flow_hash;
 707
 708                flow_hash = dsw_flow_id_hash(event->flow_id);
 709
 710                if (event->queue_id == qf->queue_id &&
 711                    flow_hash == qf->flow_hash)
 712                        dsw_port_buffer_non_paused(dsw, source_port,
 713                                                   dest_port_id, event);
 714                else
 715                        dsw_port_buffer_paused(source_port, event);
 716        }
 717}
 718
 719static void
 720dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
 721{
 722        uint64_t flow_migration_latency;
 723
 724        flow_migration_latency =
 725                (rte_get_timer_cycles() - port->emigration_start);
 726        port->emigration_latency += (flow_migration_latency * finished);
 727        port->emigrations += finished;
 728}
 729
 730static void
 731dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
 732                        uint8_t schedule_type)
 733{
 734        uint8_t i;
 735        struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
 736        uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
 737        uint8_t left_qfs_len = 0;
 738        uint8_t finished;
 739
 740        for (i = 0; i < port->emigration_targets_len; i++) {
 741                struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
 742                uint8_t queue_id = qf->queue_id;
 743                uint8_t queue_schedule_type =
 744                        dsw->queues[queue_id].schedule_type;
 745                uint16_t flow_hash = qf->flow_hash;
 746
 747                if (queue_schedule_type != schedule_type) {
 748                        left_port_ids[left_qfs_len] =
 749                                port->emigration_target_port_ids[i];
 750                        left_qfs[left_qfs_len] = *qf;
 751                        left_qfs_len++;
 752                        continue;
 753                }
 754
 755                DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
 756                                "queue_id %d flow_hash %d.\n", queue_id,
 757                                flow_hash);
 758
 759                if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
 760                        dsw_port_remove_paused_flow(port, qf);
 761                        dsw_port_flush_paused_events(dsw, port, qf);
 762                }
 763        }
 764
 765        finished = port->emigration_targets_len - left_qfs_len;
 766
 767        if (finished > 0)
 768                dsw_port_emigration_stats(port, finished);
 769
 770        for (i = 0; i < left_qfs_len; i++) {
 771                port->emigration_target_port_ids[i] = left_port_ids[i];
 772                port->emigration_target_qfs[i] = left_qfs[i];
 773        }
 774        port->emigration_targets_len = left_qfs_len;
 775
 776        if (port->emigration_targets_len == 0) {
 777                port->migration_state = DSW_MIGRATION_STATE_IDLE;
 778                port->seen_events_len = 0;
 779        }
 780}
 781
 782static void
 783dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
 784                             struct dsw_port *source_port)
 785{
 786        uint8_t i;
 787
 788        for (i = 0; i < source_port->emigration_targets_len; i++) {
 789                struct dsw_queue_flow *qf =
 790                        &source_port->emigration_target_qfs[i];
 791                uint8_t queue_id = qf->queue_id;
 792
 793                if (dsw->queues[queue_id].schedule_type ==
 794                    RTE_SCHED_TYPE_PARALLEL) {
 795                        uint8_t dest_port_id =
 796                                source_port->emigration_target_port_ids[i];
 797                        uint16_t flow_hash = qf->flow_hash;
 798
 799                        /* Single byte-sized stores are always atomic. */
 800                        dsw->queues[queue_id].flow_to_port_map[flow_hash] =
 801                                dest_port_id;
 802                }
 803        }
 804
 805        rte_smp_wmb();
 806
 807        dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
 808}
 809
 810static void
 811dsw_port_consider_emigration(struct dsw_evdev *dsw,
 812                             struct dsw_port *source_port,
 813                             uint64_t now)
 814{
 815        bool any_port_below_limit;
 816        struct dsw_queue_flow *seen_events = source_port->seen_events;
 817        uint16_t seen_events_len = source_port->seen_events_len;
 818        struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
 819        uint16_t num_bursts;
 820        int16_t source_port_load;
 821        int16_t port_loads[dsw->num_ports];
 822
 823        if (now < source_port->next_emigration)
 824                return;
 825
 826        if (dsw->num_ports == 1)
 827                return;
 828
 829        if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
 830                return;
 831
 832        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
 833
 834        /* Randomize interval to avoid having all threads considering
 835         * emigration at the same in point in time, which might lead
 836         * to all choosing the same target port.
 837         */
 838        source_port->next_emigration = now +
 839                source_port->migration_interval / 2 +
 840                rte_rand() % source_port->migration_interval;
 841
 842        if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
 843                DSW_LOG_DP_PORT(DEBUG, source_port->id,
 844                                "Emigration already in progress.\n");
 845                return;
 846        }
 847
 848        /* For simplicity, avoid migration in the unlikely case there
 849         * is still events to consume in the in_buffer (from the last
 850         * emigration).
 851         */
 852        if (source_port->in_buffer_len > 0) {
 853                DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
 854                                "events in the input buffer.\n");
 855                return;
 856        }
 857
 858        source_port_load =
 859                __atomic_load_n(&source_port->load, __ATOMIC_RELAXED);
 860        if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
 861                DSW_LOG_DP_PORT(DEBUG, source_port->id,
 862                      "Load %d is below threshold level %d.\n",
 863                      DSW_LOAD_TO_PERCENT(source_port_load),
 864                      DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
 865                return;
 866        }
 867
 868        /* Avoid starting any expensive operations (sorting etc), in
 869         * case of a scenario with all ports above the load limit.
 870         */
 871        any_port_below_limit =
 872                dsw_retrieve_port_loads(dsw, port_loads,
 873                                        DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
 874        if (!any_port_below_limit) {
 875                DSW_LOG_DP_PORT(DEBUG, source_port->id,
 876                                "Candidate target ports are all too highly "
 877                                "loaded.\n");
 878                return;
 879        }
 880
 881        num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
 882                                            bursts);
 883
 884        /* For non-big-little systems, there's no point in moving the
 885         * only (known) flow.
 886         */
 887        if (num_bursts < 2) {
 888                DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
 889                                "queue_id %d flow_hash %d has been seen.\n",
 890                                bursts[0].queue_flow.queue_id,
 891                                bursts[0].queue_flow.flow_hash);
 892                return;
 893        }
 894
 895        dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
 896                                      port_loads);
 897
 898        if (source_port->emigration_targets_len == 0)
 899                return;
 900
 901        source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
 902        source_port->emigration_start = rte_get_timer_cycles();
 903
 904        /* No need to go through the whole pause procedure for
 905         * parallel queues, since atomic/ordered semantics need not to
 906         * be maintained.
 907         */
 908        dsw_port_move_parallel_flows(dsw, source_port);
 909
 910        /* All flows were on PARALLEL queues. */
 911        if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
 912                return;
 913
 914        /* There might be 'loopback' events already scheduled in the
 915         * output buffers.
 916         */
 917        dsw_port_flush_out_buffers(dsw, source_port);
 918
 919        dsw_port_add_paused_flows(source_port,
 920                                  source_port->emigration_target_qfs,
 921                                  source_port->emigration_targets_len);
 922
 923        dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
 924                               source_port->emigration_target_qfs,
 925                               source_port->emigration_targets_len);
 926        source_port->cfm_cnt = 0;
 927}
 928
 929static void
 930dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 931                             struct dsw_port *source_port,
 932                             const struct dsw_queue_flow *qf);
 933
 934static void
 935dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
 936                              uint8_t originating_port_id,
 937                              struct dsw_queue_flow *paused_qfs,
 938                              uint8_t qfs_len)
 939{
 940        uint16_t i;
 941        struct dsw_ctl_msg cfm = {
 942                .type = DSW_CTL_CFM,
 943                .originating_port_id = port->id
 944        };
 945
 946        dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
 947
 948        rte_smp_rmb();
 949
 950        dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
 951
 952        for (i = 0; i < qfs_len; i++) {
 953                struct dsw_queue_flow *qf = &paused_qfs[i];
 954
 955                if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
 956                        port->immigrations++;
 957
 958                dsw_port_flush_paused_events(dsw, port, qf);
 959        }
 960}
 961
 962#define FORWARD_BURST_SIZE (32)
 963
 964static void
 965dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
 966                                struct rte_event_ring *dest_ring,
 967                                uint8_t queue_id,
 968                                uint16_t flow_hash)
 969{
 970        uint16_t events_left;
 971
 972        /* Control ring message should been seen before the ring count
 973         * is read on the port's in_ring.
 974         */
 975        rte_smp_rmb();
 976
 977        events_left = rte_event_ring_count(source_port->in_ring);
 978
 979        while (events_left > 0) {
 980                uint16_t in_burst_size =
 981                        RTE_MIN(FORWARD_BURST_SIZE, events_left);
 982                struct rte_event in_burst[in_burst_size];
 983                uint16_t in_len;
 984                uint16_t i;
 985
 986                in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
 987                                                      in_burst,
 988                                                      in_burst_size, NULL);
 989                /* No need to care about bursting forwarded events (to
 990                 * the destination port's in_ring), since migration
 991                 * doesn't happen very often, and also the majority of
 992                 * the dequeued events will likely *not* be forwarded.
 993                 */
 994                for (i = 0; i < in_len; i++) {
 995                        struct rte_event *e = &in_burst[i];
 996                        if (e->queue_id == queue_id &&
 997                            dsw_flow_id_hash(e->flow_id) == flow_hash) {
 998                                while (rte_event_ring_enqueue_burst(dest_ring,
 999                                                                    e, 1,
1000                                                                    NULL) != 1)
1001                                        rte_pause();
1002                        } else {
1003                                uint16_t last_idx = source_port->in_buffer_len;
1004                                source_port->in_buffer[last_idx] = *e;
1005                                source_port->in_buffer_len++;
1006                        }
1007                }
1008
1009                events_left -= in_len;
1010        }
1011}
1012
1013static void
1014dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1015                               struct dsw_port *source_port)
1016{
1017        uint8_t i;
1018
1019        dsw_port_flush_out_buffers(dsw, source_port);
1020
1021        rte_smp_wmb();
1022
1023        for (i = 0; i < source_port->emigration_targets_len; i++) {
1024                struct dsw_queue_flow *qf =
1025                        &source_port->emigration_target_qfs[i];
1026                uint8_t dest_port_id =
1027                        source_port->emigration_target_port_ids[i];
1028                struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1029
1030                dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1031                        dest_port_id;
1032
1033                dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
1034                                                qf->queue_id, qf->flow_hash);
1035        }
1036
1037        /* Flow table update and migration destination port's enqueues
1038         * must be seen before the control message.
1039         */
1040        rte_smp_wmb();
1041
1042        dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1043                               source_port->emigration_target_qfs,
1044                               source_port->emigration_targets_len);
1045        source_port->cfm_cnt = 0;
1046        source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1047}
1048
1049static void
1050dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1051{
1052        port->cfm_cnt++;
1053
1054        if (port->cfm_cnt == (dsw->num_ports-1)) {
1055                switch (port->migration_state) {
1056                case DSW_MIGRATION_STATE_PAUSING:
1057                        DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
1058                                        "migration state.\n");
1059                        port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
1060                        break;
1061                case DSW_MIGRATION_STATE_UNPAUSING:
1062                        dsw_port_end_emigration(dsw, port,
1063                                                RTE_SCHED_TYPE_ATOMIC);
1064                        break;
1065                default:
1066                        RTE_ASSERT(0);
1067                        break;
1068                }
1069        }
1070}
1071
1072static void
1073dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1074{
1075        struct dsw_ctl_msg msg;
1076
1077        if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1078                switch (msg.type) {
1079                case DSW_CTL_PAUS_REQ:
1080                        dsw_port_handle_pause_flows(dsw, port,
1081                                                    msg.originating_port_id,
1082                                                    msg.qfs, msg.qfs_len);
1083                        break;
1084                case DSW_CTL_UNPAUS_REQ:
1085                        dsw_port_handle_unpause_flows(dsw, port,
1086                                                      msg.originating_port_id,
1087                                                      msg.qfs, msg.qfs_len);
1088                        break;
1089                case DSW_CTL_CFM:
1090                        dsw_port_handle_confirm(dsw, port);
1091                        break;
1092                }
1093        }
1094}
1095
1096static void
1097dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1098{
1099        /* To pull the control ring reasonbly often on busy ports,
1100         * each dequeued/enqueued event is considered an 'op' too.
1101         */
1102        port->ops_since_bg_task += (num_events+1);
1103}
1104
1105static void
1106dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1107{
1108        if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
1109                     port->pending_releases == 0))
1110                dsw_port_move_emigrating_flows(dsw, port);
1111
1112        /* Polling the control ring is relatively inexpensive, and
1113         * polling it often helps bringing down migration latency, so
1114         * do this for every iteration.
1115         */
1116        dsw_port_ctl_process(dsw, port);
1117
1118        /* To avoid considering migration and flushing output buffers
1119         * on every dequeue/enqueue call, the scheduler only performs
1120         * such 'background' tasks every nth
1121         * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1122         */
1123        if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1124                uint64_t now;
1125
1126                now = rte_get_timer_cycles();
1127
1128                port->last_bg = now;
1129
1130                /* Logic to avoid having events linger in the output
1131                 * buffer too long.
1132                 */
1133                dsw_port_flush_out_buffers(dsw, port);
1134
1135                dsw_port_consider_load_update(port, now);
1136
1137                dsw_port_consider_emigration(dsw, port, now);
1138
1139                port->ops_since_bg_task = 0;
1140        }
1141}
1142
1143static void
1144dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1145{
1146        uint16_t dest_port_id;
1147
1148        for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1149                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1150}
1151
1152uint16_t
1153dsw_event_enqueue(void *port, const struct rte_event *ev)
1154{
1155        return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1156}
1157
1158static __rte_always_inline uint16_t
1159dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1160                                const struct rte_event events[],
1161                                uint16_t events_len, bool op_types_known,
1162                                uint16_t num_new, uint16_t num_release,
1163                                uint16_t num_non_release)
1164{
1165        struct dsw_evdev *dsw = source_port->dsw;
1166        bool enough_credits;
1167        uint16_t i;
1168
1169        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1170                        "events to port %d.\n", events_len, source_port->id);
1171
1172        dsw_port_bg_process(dsw, source_port);
1173
1174        /* XXX: For performance (=ring efficiency) reasons, the
1175         * scheduler relies on internal non-ring buffers instead of
1176         * immediately sending the event to the destination ring. For
1177         * a producer that doesn't intend to produce or consume any
1178         * more events, the scheduler provides a way to flush the
1179         * buffer, by means of doing an enqueue of zero events. In
1180         * addition, a port cannot be left "unattended" (e.g. unused)
1181         * for long periods of time, since that would stall
1182         * migration. Eventdev API extensions to provide a cleaner way
1183         * to archieve both of these functions should be
1184         * considered.
1185         */
1186        if (unlikely(events_len == 0)) {
1187                dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1188                dsw_port_flush_out_buffers(dsw, source_port);
1189                return 0;
1190        }
1191
1192        dsw_port_note_op(source_port, events_len);
1193
1194        if (!op_types_known)
1195                for (i = 0; i < events_len; i++) {
1196                        switch (events[i].op) {
1197                        case RTE_EVENT_OP_RELEASE:
1198                                num_release++;
1199                                break;
1200                        case RTE_EVENT_OP_NEW:
1201                                num_new++;
1202                                /* Falls through. */
1203                        default:
1204                                num_non_release++;
1205                                break;
1206                        }
1207                }
1208
1209        /* Technically, we could allow the non-new events up to the
1210         * first new event in the array into the system, but for
1211         * simplicity reasons, we deny the whole burst if the port is
1212         * above the water mark.
1213         */
1214        if (unlikely(num_new > 0 &&
1215                     __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) >
1216                     source_port->new_event_threshold))
1217                return 0;
1218
1219        enough_credits = dsw_port_acquire_credits(dsw, source_port,
1220                                                  num_non_release);
1221        if (unlikely(!enough_credits))
1222                return 0;
1223
1224        source_port->pending_releases -= num_release;
1225
1226        dsw_port_enqueue_stats(source_port, num_new,
1227                               num_non_release-num_new, num_release);
1228
1229        for (i = 0; i < events_len; i++) {
1230                const struct rte_event *event = &events[i];
1231
1232                if (likely(num_release == 0 ||
1233                           event->op != RTE_EVENT_OP_RELEASE))
1234                        dsw_port_buffer_event(dsw, source_port, event);
1235                dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1236        }
1237
1238        DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1239                        "accepted.\n", num_non_release);
1240
1241        return (num_non_release + num_release);
1242}
1243
1244uint16_t
1245dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1246                        uint16_t events_len)
1247{
1248        struct dsw_port *source_port = port;
1249
1250        if (unlikely(events_len > source_port->enqueue_depth))
1251                events_len = source_port->enqueue_depth;
1252
1253        return dsw_event_enqueue_burst_generic(source_port, events,
1254                                               events_len, false, 0, 0, 0);
1255}
1256
1257uint16_t
1258dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1259                            uint16_t events_len)
1260{
1261        struct dsw_port *source_port = port;
1262
1263        if (unlikely(events_len > source_port->enqueue_depth))
1264                events_len = source_port->enqueue_depth;
1265
1266        return dsw_event_enqueue_burst_generic(source_port, events,
1267                                               events_len, true, events_len,
1268                                               0, events_len);
1269}
1270
1271uint16_t
1272dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1273                                uint16_t events_len)
1274{
1275        struct dsw_port *source_port = port;
1276
1277        if (unlikely(events_len > source_port->enqueue_depth))
1278                events_len = source_port->enqueue_depth;
1279
1280        return dsw_event_enqueue_burst_generic(source_port, events,
1281                                               events_len, true, 0, 0,
1282                                               events_len);
1283}
1284
1285uint16_t
1286dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1287{
1288        return dsw_event_dequeue_burst(port, events, 1, wait);
1289}
1290
1291static void
1292dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1293                            uint16_t num)
1294{
1295        uint16_t i;
1296
1297        dsw_port_dequeue_stats(port, num);
1298
1299        for (i = 0; i < num; i++) {
1300                uint16_t l_idx = port->seen_events_idx;
1301                struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1302                struct rte_event *event = &events[i];
1303                qf->queue_id = event->queue_id;
1304                qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1305
1306                port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1307
1308                dsw_port_queue_dequeued_stats(port, event->queue_id);
1309        }
1310
1311        if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1312                port->seen_events_len =
1313                        RTE_MIN(port->seen_events_len + num,
1314                                DSW_MAX_EVENTS_RECORDED);
1315}
1316
1317#ifdef DSW_SORT_DEQUEUED
1318
1319#define DSW_EVENT_TO_INT(_event)                                \
1320        ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1321
1322static inline int
1323dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1324{
1325        const struct rte_event *event_a = v_event_a;
1326        const struct rte_event *event_b = v_event_b;
1327
1328        return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1329}
1330#endif
1331
1332static uint16_t
1333dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1334                       uint16_t num)
1335{
1336        if (unlikely(port->in_buffer_len > 0)) {
1337                uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1338
1339                rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1340                           dequeued * sizeof(struct rte_event));
1341
1342                port->in_buffer_start += dequeued;
1343                port->in_buffer_len -= dequeued;
1344
1345                if (port->in_buffer_len == 0)
1346                        port->in_buffer_start = 0;
1347
1348                return dequeued;
1349        }
1350
1351        return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1352}
1353
1354uint16_t
1355dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1356                        uint64_t wait __rte_unused)
1357{
1358        struct dsw_port *source_port = port;
1359        struct dsw_evdev *dsw = source_port->dsw;
1360        uint16_t dequeued;
1361
1362        source_port->pending_releases = 0;
1363
1364        dsw_port_bg_process(dsw, source_port);
1365
1366        if (unlikely(num > source_port->dequeue_depth))
1367                num = source_port->dequeue_depth;
1368
1369        dequeued = dsw_port_dequeue_burst(source_port, events, num);
1370
1371        source_port->pending_releases = dequeued;
1372
1373        dsw_port_load_record(source_port, dequeued);
1374
1375        dsw_port_note_op(source_port, dequeued);
1376
1377        if (dequeued > 0) {
1378                DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1379                                dequeued);
1380
1381                dsw_port_return_credits(dsw, source_port, dequeued);
1382
1383                /* One potential optimization one might think of is to
1384                 * add a migration state (prior to 'pausing'), and
1385                 * only record seen events when the port is in this
1386                 * state (and transit to 'pausing' when enough events
1387                 * have been gathered). However, that schema doesn't
1388                 * seem to improve performance.
1389                 */
1390                dsw_port_record_seen_events(port, events, dequeued);
1391        } else /* Zero-size dequeue means a likely idle port, and thus
1392                * we can afford trading some efficiency for a slightly
1393                * reduced event wall-time latency.
1394                */
1395                dsw_port_flush_out_buffers(dsw, port);
1396
1397#ifdef DSW_SORT_DEQUEUED
1398        dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1399#endif
1400
1401        return dequeued;
1402}
1403