1
2
3
4
5#include "dsw_evdev.h"
6
7#ifdef DSW_SORT_DEQUEUED
8#include "dsw_sort.h"
9#endif
10
11#include <stdbool.h>
12#include <string.h>
13
14#include <rte_cycles.h>
15#include <rte_memcpy.h>
16#include <rte_random.h>
17
18static bool
19dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
20 int32_t credits)
21{
22 int32_t inflight_credits = port->inflight_credits;
23 int32_t missing_credits = credits - inflight_credits;
24 int32_t total_on_loan;
25 int32_t available;
26 int32_t acquired_credits;
27 int32_t new_total_on_loan;
28
29 if (likely(missing_credits <= 0)) {
30 port->inflight_credits -= credits;
31 return true;
32 }
33
34 total_on_loan =
35 __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED);
36 available = dsw->max_inflight - total_on_loan;
37 acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38
39 if (available < acquired_credits)
40 return false;
41
42
43
44
45
46 new_total_on_loan =
47 __atomic_add_fetch(&dsw->credits_on_loan, acquired_credits,
48 __ATOMIC_RELAXED);
49
50 if (unlikely(new_total_on_loan > dsw->max_inflight)) {
51
52 __atomic_sub_fetch(&dsw->credits_on_loan, acquired_credits,
53 __ATOMIC_RELAXED);
54 return false;
55 }
56
57 DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
58 acquired_credits);
59
60 port->inflight_credits += acquired_credits;
61 port->inflight_credits -= credits;
62
63 return true;
64}
65
66static void
67dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
68 int32_t credits)
69{
70 port->inflight_credits += credits;
71
72 if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
73 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
74 int32_t return_credits =
75 port->inflight_credits - leave_credits;
76
77 port->inflight_credits = leave_credits;
78
79 __atomic_sub_fetch(&dsw->credits_on_loan, return_credits,
80 __ATOMIC_RELAXED);
81
82 DSW_LOG_DP_PORT(DEBUG, port->id,
83 "Returned %d tokens to pool.\n",
84 return_credits);
85 }
86}
87
88static void
89dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
90 uint16_t num_forward, uint16_t num_release)
91{
92 port->new_enqueued += num_new;
93 port->forward_enqueued += num_forward;
94 port->release_enqueued += num_release;
95}
96
97static void
98dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
99{
100 source_port->queue_enqueued[queue_id]++;
101}
102
103static void
104dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
105{
106 port->dequeued += num;
107}
108
109static void
110dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
111{
112 source_port->queue_dequeued[queue_id]++;
113}
114
115static void
116dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
117{
118 if (dequeued > 0 && port->busy_start == 0)
119
120 port->busy_start = rte_get_timer_cycles();
121 else if (dequeued == 0 && port->busy_start > 0) {
122
123 uint64_t work_period =
124 rte_get_timer_cycles() - port->busy_start;
125 port->busy_cycles += work_period;
126 port->busy_start = 0;
127 }
128}
129
130static int16_t
131dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
132{
133 uint64_t passed = now - port->measurement_start;
134 uint64_t busy_cycles = port->busy_cycles;
135
136 if (port->busy_start > 0) {
137 busy_cycles += (now - port->busy_start);
138 port->busy_start = now;
139 }
140
141 int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
142
143 port->measurement_start = now;
144 port->busy_cycles = 0;
145
146 port->total_busy_cycles += busy_cycles;
147
148 return load;
149}
150
151static void
152dsw_port_load_update(struct dsw_port *port, uint64_t now)
153{
154 int16_t old_load;
155 int16_t period_load;
156 int16_t new_load;
157
158 old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED);
159
160 period_load = dsw_port_load_close_period(port, now);
161
162 new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
163 (DSW_OLD_LOAD_WEIGHT+1);
164
165 __atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED);
166
167
168
169
170 __atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED);
171}
172
173static void
174dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
175{
176 if (now < port->next_load_update)
177 return;
178
179 port->next_load_update = now + port->load_update_interval;
180
181 dsw_port_load_update(port, now);
182}
183
184static void
185dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
186{
187
188 while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
189 rte_pause();
190}
191
192static int
193dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
194{
195 return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
196}
197
198static void
199dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
200 uint8_t type, struct dsw_queue_flow *qfs,
201 uint8_t qfs_len)
202{
203 uint16_t port_id;
204 struct dsw_ctl_msg msg = {
205 .type = type,
206 .originating_port_id = source_port->id,
207 .qfs_len = qfs_len
208 };
209
210 memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
211
212 for (port_id = 0; port_id < dsw->num_ports; port_id++)
213 if (port_id != source_port->id)
214 dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
215}
216
217static __rte_always_inline bool
218dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
219 uint8_t queue_id, uint16_t flow_hash)
220{
221 uint16_t i;
222
223 for (i = 0; i < qfs_len; i++)
224 if (qfs[i].queue_id == queue_id &&
225 qfs[i].flow_hash == flow_hash)
226 return true;
227
228 return false;
229}
230
231static __rte_always_inline bool
232dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
233 uint16_t flow_hash)
234{
235 return dsw_is_queue_flow_in_ary(port->paused_flows,
236 port->paused_flows_len,
237 queue_id, flow_hash);
238}
239
240static void
241dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
242 uint8_t qfs_len)
243{
244 uint8_t i;
245
246 for (i = 0; i < qfs_len; i++) {
247 struct dsw_queue_flow *qf = &qfs[i];
248
249 DSW_LOG_DP_PORT(DEBUG, port->id,
250 "Pausing queue_id %d flow_hash %d.\n",
251 qf->queue_id, qf->flow_hash);
252
253 port->paused_flows[port->paused_flows_len] = *qf;
254 port->paused_flows_len++;
255 };
256}
257
258static void
259dsw_port_remove_paused_flow(struct dsw_port *port,
260 struct dsw_queue_flow *target_qf)
261{
262 uint16_t i;
263
264 for (i = 0; i < port->paused_flows_len; i++) {
265 struct dsw_queue_flow *qf = &port->paused_flows[i];
266
267 if (qf->queue_id == target_qf->queue_id &&
268 qf->flow_hash == target_qf->flow_hash) {
269 uint16_t last_idx = port->paused_flows_len-1;
270 if (i != last_idx)
271 port->paused_flows[i] =
272 port->paused_flows[last_idx];
273 port->paused_flows_len--;
274 break;
275 }
276 }
277}
278
279static void
280dsw_port_remove_paused_flows(struct dsw_port *port,
281 struct dsw_queue_flow *qfs, uint8_t qfs_len)
282{
283 uint8_t i;
284
285 for (i = 0; i < qfs_len; i++)
286 dsw_port_remove_paused_flow(port, &qfs[i]);
287
288}
289
/* Forward declaration; the definition appears later in this file. */
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw,
			   struct dsw_port *source_port);
292
293static void
294dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
295 uint8_t originating_port_id,
296 struct dsw_queue_flow *paused_qfs,
297 uint8_t qfs_len)
298{
299 struct dsw_ctl_msg cfm = {
300 .type = DSW_CTL_CFM,
301 .originating_port_id = port->id
302 };
303
304
305
306
307 dsw_port_flush_out_buffers(dsw, port);
308
309 dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
310
311
312
313
314 rte_smp_wmb();
315
316 dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
317}
318
319struct dsw_queue_flow_burst {
320 struct dsw_queue_flow queue_flow;
321 uint16_t count;
322};
323
324#define DSW_QF_TO_INT(_qf) \
325 ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
326
327static inline int
328dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
329{
330 const struct dsw_queue_flow *qf_a = v_qf_a;
331 const struct dsw_queue_flow *qf_b = v_qf_b;
332
333 return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
334}
335
336static uint16_t
337dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
338 struct dsw_queue_flow_burst *bursts)
339{
340 uint16_t i;
341 struct dsw_queue_flow_burst *current_burst = NULL;
342 uint16_t num_bursts = 0;
343
344
345
346
347
348 qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
349
350
351 for (i = 0; i < qfs_len; i++) {
352 if (i == 0 ||
353 dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) {
354 current_burst = &bursts[num_bursts];
355 current_burst->queue_flow = qfs[i];
356 current_burst->count = 0;
357 num_bursts++;
358 }
359 current_burst->count++;
360 }
361
362 return num_bursts;
363}
364
365static bool
366dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
367 int16_t load_limit)
368{
369 bool below_limit = false;
370 uint16_t i;
371
372 for (i = 0; i < dsw->num_ports; i++) {
373 int16_t measured_load =
374 __atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED);
375 int32_t immigration_load =
376 __atomic_load_n(&dsw->ports[i].immigration_load,
377 __ATOMIC_RELAXED);
378 int32_t load = measured_load + immigration_load;
379
380 load = RTE_MIN(load, DSW_MAX_LOAD);
381
382 if (load < load_limit)
383 below_limit = true;
384 port_loads[i] = load;
385 }
386 return below_limit;
387}
388
389static int16_t
390dsw_flow_load(uint16_t num_events, int16_t port_load)
391{
392 return ((int32_t)port_load * (int32_t)num_events) /
393 DSW_MAX_EVENTS_RECORDED;
394}
395
396static int16_t
397dsw_evaluate_migration(int16_t source_load, int16_t target_load,
398 int16_t flow_load)
399{
400 int32_t res_target_load;
401 int32_t imbalance;
402
403 if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
404 return -1;
405
406 imbalance = source_load - target_load;
407
408 if (imbalance < DSW_REBALANCE_THRESHOLD)
409 return -1;
410
411 res_target_load = target_load + flow_load;
412
413
414
415
416
417 if (res_target_load > source_load)
418 return -1;
419
420
421
422
423
424 return DSW_MAX_LOAD - res_target_load;
425}
426
427static bool
428dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
429{
430 struct dsw_queue *queue = &dsw->queues[queue_id];
431 uint16_t i;
432
433 for (i = 0; i < queue->num_serving_ports; i++)
434 if (queue->serving_ports[i] == port_id)
435 return true;
436
437 return false;
438}
439
440static bool
441dsw_select_emigration_target(struct dsw_evdev *dsw,
442 struct dsw_queue_flow_burst *bursts,
443 uint16_t num_bursts, uint8_t source_port_id,
444 int16_t *port_loads, uint16_t num_ports,
445 uint8_t *target_port_ids,
446 struct dsw_queue_flow *target_qfs,
447 uint8_t *targets_len)
448{
449 int16_t source_port_load = port_loads[source_port_id];
450 struct dsw_queue_flow *candidate_qf = NULL;
451 uint8_t candidate_port_id = 0;
452 int16_t candidate_weight = -1;
453 int16_t candidate_flow_load = -1;
454 uint16_t i;
455
456 if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
457 return false;
458
459 for (i = 0; i < num_bursts; i++) {
460 struct dsw_queue_flow_burst *burst = &bursts[i];
461 struct dsw_queue_flow *qf = &burst->queue_flow;
462 int16_t flow_load;
463 uint16_t port_id;
464
465 if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
466 qf->queue_id, qf->flow_hash))
467 continue;
468
469 flow_load = dsw_flow_load(burst->count, source_port_load);
470
471 for (port_id = 0; port_id < num_ports; port_id++) {
472 int16_t weight;
473
474 if (port_id == source_port_id)
475 continue;
476
477 if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
478 continue;
479
480 weight = dsw_evaluate_migration(source_port_load,
481 port_loads[port_id],
482 flow_load);
483
484 if (weight > candidate_weight) {
485 candidate_qf = qf;
486 candidate_port_id = port_id;
487 candidate_weight = weight;
488 candidate_flow_load = flow_load;
489 }
490 }
491 }
492
493 if (candidate_weight < 0)
494 return false;
495
496 DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
497 "flow_hash %d (with flow load %d) for migration "
498 "to port %d.\n", candidate_qf->queue_id,
499 candidate_qf->flow_hash,
500 DSW_LOAD_TO_PERCENT(candidate_flow_load),
501 candidate_port_id);
502
503 port_loads[candidate_port_id] += candidate_flow_load;
504 port_loads[source_port_id] -= candidate_flow_load;
505
506 target_port_ids[*targets_len] = candidate_port_id;
507 target_qfs[*targets_len] = *candidate_qf;
508 (*targets_len)++;
509
510 __atomic_add_fetch(&dsw->ports[candidate_port_id].immigration_load,
511 candidate_flow_load, __ATOMIC_RELAXED);
512
513 return true;
514}
515
516static void
517dsw_select_emigration_targets(struct dsw_evdev *dsw,
518 struct dsw_port *source_port,
519 struct dsw_queue_flow_burst *bursts,
520 uint16_t num_bursts, int16_t *port_loads)
521{
522 struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
523 uint8_t *target_port_ids = source_port->emigration_target_port_ids;
524 uint8_t *targets_len = &source_port->emigration_targets_len;
525 uint16_t i;
526
527 for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
528 bool found;
529
530 found = dsw_select_emigration_target(dsw, bursts, num_bursts,
531 source_port->id,
532 port_loads, dsw->num_ports,
533 target_port_ids,
534 target_qfs,
535 targets_len);
536 if (!found)
537 break;
538 }
539
540 if (*targets_len == 0)
541 DSW_LOG_DP_PORT(DEBUG, source_port->id,
542 "For the %d flows considered, no target port "
543 "was found.\n", num_bursts);
544}
545
546static uint8_t
547dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
548{
549 struct dsw_queue *queue = &dsw->queues[queue_id];
550 uint8_t port_id;
551
552 if (queue->num_serving_ports > 1)
553 port_id = queue->flow_to_port_map[flow_hash];
554 else
555
556
557
558 port_id = queue->serving_ports[0];
559
560 DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
561 "to port %d.\n", queue_id, flow_hash, port_id);
562
563 return port_id;
564}
565
566static void
567dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
568 uint8_t dest_port_id)
569{
570 struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
571 uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
572 struct rte_event *buffer = source_port->out_buffer[dest_port_id];
573 uint16_t enqueued = 0;
574
575 if (*buffer_len == 0)
576 return;
577
578
579
580
581 do {
582 enqueued +=
583 rte_event_ring_enqueue_burst(dest_port->in_ring,
584 buffer+enqueued,
585 *buffer_len-enqueued,
586 NULL);
587 } while (unlikely(enqueued != *buffer_len));
588
589 (*buffer_len) = 0;
590}
591
592static uint16_t
593dsw_port_get_parallel_flow_id(struct dsw_port *port)
594{
595 uint16_t flow_id = port->next_parallel_flow_id;
596
597 port->next_parallel_flow_id =
598 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
599
600 return flow_id;
601}
602
603static void
604dsw_port_buffer_paused(struct dsw_port *port,
605 const struct rte_event *paused_event)
606{
607 port->paused_events[port->paused_events_len] = *paused_event;
608 port->paused_events_len++;
609}
610
611static void
612dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
613 uint8_t dest_port_id, const struct rte_event *event)
614{
615 struct rte_event *buffer = source_port->out_buffer[dest_port_id];
616 uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
617
618 if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
619 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
620
621 buffer[*buffer_len] = *event;
622
623 (*buffer_len)++;
624}
625
626#define DSW_FLOW_ID_BITS (24)
627static uint16_t
628dsw_flow_id_hash(uint32_t flow_id)
629{
630 uint16_t hash = 0;
631 uint16_t offset = 0;
632
633 do {
634 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
635 offset += DSW_MAX_FLOWS_BITS;
636 } while (offset < DSW_FLOW_ID_BITS);
637
638 return hash;
639}
640
641static void
642dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
643 struct rte_event event)
644{
645 uint8_t dest_port_id;
646
647 event.flow_id = dsw_port_get_parallel_flow_id(source_port);
648
649 dest_port_id = dsw_schedule(dsw, event.queue_id,
650 dsw_flow_id_hash(event.flow_id));
651
652 dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
653}
654
655static void
656dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
657 const struct rte_event *event)
658{
659 uint16_t flow_hash;
660 uint8_t dest_port_id;
661
662 if (unlikely(dsw->queues[event->queue_id].schedule_type ==
663 RTE_SCHED_TYPE_PARALLEL)) {
664 dsw_port_buffer_parallel(dsw, source_port, *event);
665 return;
666 }
667
668 flow_hash = dsw_flow_id_hash(event->flow_id);
669
670 if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
671 flow_hash))) {
672 dsw_port_buffer_paused(source_port, event);
673 return;
674 }
675
676 dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
677
678 dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
679}
680
681static void
682dsw_port_flush_paused_events(struct dsw_evdev *dsw,
683 struct dsw_port *source_port,
684 const struct dsw_queue_flow *qf)
685{
686 uint16_t paused_events_len = source_port->paused_events_len;
687 struct rte_event paused_events[paused_events_len];
688 uint8_t dest_port_id;
689 uint16_t i;
690
691 if (paused_events_len == 0)
692 return;
693
694 if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
695 return;
696
697 rte_memcpy(paused_events, source_port->paused_events,
698 paused_events_len * sizeof(struct rte_event));
699
700 source_port->paused_events_len = 0;
701
702 dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
703
704 for (i = 0; i < paused_events_len; i++) {
705 struct rte_event *event = &paused_events[i];
706 uint16_t flow_hash;
707
708 flow_hash = dsw_flow_id_hash(event->flow_id);
709
710 if (event->queue_id == qf->queue_id &&
711 flow_hash == qf->flow_hash)
712 dsw_port_buffer_non_paused(dsw, source_port,
713 dest_port_id, event);
714 else
715 dsw_port_buffer_paused(source_port, event);
716 }
717}
718
719static void
720dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
721{
722 uint64_t flow_migration_latency;
723
724 flow_migration_latency =
725 (rte_get_timer_cycles() - port->emigration_start);
726 port->emigration_latency += (flow_migration_latency * finished);
727 port->emigrations += finished;
728}
729
730static void
731dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
732 uint8_t schedule_type)
733{
734 uint8_t i;
735 struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
736 uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
737 uint8_t left_qfs_len = 0;
738 uint8_t finished;
739
740 for (i = 0; i < port->emigration_targets_len; i++) {
741 struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
742 uint8_t queue_id = qf->queue_id;
743 uint8_t queue_schedule_type =
744 dsw->queues[queue_id].schedule_type;
745 uint16_t flow_hash = qf->flow_hash;
746
747 if (queue_schedule_type != schedule_type) {
748 left_port_ids[left_qfs_len] =
749 port->emigration_target_port_ids[i];
750 left_qfs[left_qfs_len] = *qf;
751 left_qfs_len++;
752 continue;
753 }
754
755 DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
756 "queue_id %d flow_hash %d.\n", queue_id,
757 flow_hash);
758
759 if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
760 dsw_port_remove_paused_flow(port, qf);
761 dsw_port_flush_paused_events(dsw, port, qf);
762 }
763 }
764
765 finished = port->emigration_targets_len - left_qfs_len;
766
767 if (finished > 0)
768 dsw_port_emigration_stats(port, finished);
769
770 for (i = 0; i < left_qfs_len; i++) {
771 port->emigration_target_port_ids[i] = left_port_ids[i];
772 port->emigration_target_qfs[i] = left_qfs[i];
773 }
774 port->emigration_targets_len = left_qfs_len;
775
776 if (port->emigration_targets_len == 0) {
777 port->migration_state = DSW_MIGRATION_STATE_IDLE;
778 port->seen_events_len = 0;
779 }
780}
781
782static void
783dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
784 struct dsw_port *source_port)
785{
786 uint8_t i;
787
788 for (i = 0; i < source_port->emigration_targets_len; i++) {
789 struct dsw_queue_flow *qf =
790 &source_port->emigration_target_qfs[i];
791 uint8_t queue_id = qf->queue_id;
792
793 if (dsw->queues[queue_id].schedule_type ==
794 RTE_SCHED_TYPE_PARALLEL) {
795 uint8_t dest_port_id =
796 source_port->emigration_target_port_ids[i];
797 uint16_t flow_hash = qf->flow_hash;
798
799
800 dsw->queues[queue_id].flow_to_port_map[flow_hash] =
801 dest_port_id;
802 }
803 }
804
805 rte_smp_wmb();
806
807 dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
808}
809
810static void
811dsw_port_consider_emigration(struct dsw_evdev *dsw,
812 struct dsw_port *source_port,
813 uint64_t now)
814{
815 bool any_port_below_limit;
816 struct dsw_queue_flow *seen_events = source_port->seen_events;
817 uint16_t seen_events_len = source_port->seen_events_len;
818 struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
819 uint16_t num_bursts;
820 int16_t source_port_load;
821 int16_t port_loads[dsw->num_ports];
822
823 if (now < source_port->next_emigration)
824 return;
825
826 if (dsw->num_ports == 1)
827 return;
828
829 if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
830 return;
831
832 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
833
834
835
836
837
838 source_port->next_emigration = now +
839 source_port->migration_interval / 2 +
840 rte_rand() % source_port->migration_interval;
841
842 if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
843 DSW_LOG_DP_PORT(DEBUG, source_port->id,
844 "Emigration already in progress.\n");
845 return;
846 }
847
848
849
850
851
852 if (source_port->in_buffer_len > 0) {
853 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
854 "events in the input buffer.\n");
855 return;
856 }
857
858 source_port_load =
859 __atomic_load_n(&source_port->load, __ATOMIC_RELAXED);
860 if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
861 DSW_LOG_DP_PORT(DEBUG, source_port->id,
862 "Load %d is below threshold level %d.\n",
863 DSW_LOAD_TO_PERCENT(source_port_load),
864 DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
865 return;
866 }
867
868
869
870
871 any_port_below_limit =
872 dsw_retrieve_port_loads(dsw, port_loads,
873 DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
874 if (!any_port_below_limit) {
875 DSW_LOG_DP_PORT(DEBUG, source_port->id,
876 "Candidate target ports are all too highly "
877 "loaded.\n");
878 return;
879 }
880
881 num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
882 bursts);
883
884
885
886
887 if (num_bursts < 2) {
888 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
889 "queue_id %d flow_hash %d has been seen.\n",
890 bursts[0].queue_flow.queue_id,
891 bursts[0].queue_flow.flow_hash);
892 return;
893 }
894
895 dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
896 port_loads);
897
898 if (source_port->emigration_targets_len == 0)
899 return;
900
901 source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
902 source_port->emigration_start = rte_get_timer_cycles();
903
904
905
906
907
908 dsw_port_move_parallel_flows(dsw, source_port);
909
910
911 if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
912 return;
913
914
915
916
917 dsw_port_flush_out_buffers(dsw, source_port);
918
919 dsw_port_add_paused_flows(source_port,
920 source_port->emigration_target_qfs,
921 source_port->emigration_targets_len);
922
923 dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
924 source_port->emigration_target_qfs,
925 source_port->emigration_targets_len);
926 source_port->cfm_cnt = 0;
927}
928
/* Declaration of dsw_port_flush_paused_events(), used by the unpause
 * handler below.
 */
static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
			     struct dsw_port *source_port,
			     const struct dsw_queue_flow *qf);
933
934static void
935dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
936 uint8_t originating_port_id,
937 struct dsw_queue_flow *paused_qfs,
938 uint8_t qfs_len)
939{
940 uint16_t i;
941 struct dsw_ctl_msg cfm = {
942 .type = DSW_CTL_CFM,
943 .originating_port_id = port->id
944 };
945
946 dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
947
948 rte_smp_rmb();
949
950 dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
951
952 for (i = 0; i < qfs_len; i++) {
953 struct dsw_queue_flow *qf = &paused_qfs[i];
954
955 if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
956 port->immigrations++;
957
958 dsw_port_flush_paused_events(dsw, port, qf);
959 }
960}
961
962#define FORWARD_BURST_SIZE (32)
963
964static void
965dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
966 struct rte_event_ring *dest_ring,
967 uint8_t queue_id,
968 uint16_t flow_hash)
969{
970 uint16_t events_left;
971
972
973
974
975 rte_smp_rmb();
976
977 events_left = rte_event_ring_count(source_port->in_ring);
978
979 while (events_left > 0) {
980 uint16_t in_burst_size =
981 RTE_MIN(FORWARD_BURST_SIZE, events_left);
982 struct rte_event in_burst[in_burst_size];
983 uint16_t in_len;
984 uint16_t i;
985
986 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
987 in_burst,
988 in_burst_size, NULL);
989
990
991
992
993
994 for (i = 0; i < in_len; i++) {
995 struct rte_event *e = &in_burst[i];
996 if (e->queue_id == queue_id &&
997 dsw_flow_id_hash(e->flow_id) == flow_hash) {
998 while (rte_event_ring_enqueue_burst(dest_ring,
999 e, 1,
1000 NULL) != 1)
1001 rte_pause();
1002 } else {
1003 uint16_t last_idx = source_port->in_buffer_len;
1004 source_port->in_buffer[last_idx] = *e;
1005 source_port->in_buffer_len++;
1006 }
1007 }
1008
1009 events_left -= in_len;
1010 }
1011}
1012
1013static void
1014dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1015 struct dsw_port *source_port)
1016{
1017 uint8_t i;
1018
1019 dsw_port_flush_out_buffers(dsw, source_port);
1020
1021 rte_smp_wmb();
1022
1023 for (i = 0; i < source_port->emigration_targets_len; i++) {
1024 struct dsw_queue_flow *qf =
1025 &source_port->emigration_target_qfs[i];
1026 uint8_t dest_port_id =
1027 source_port->emigration_target_port_ids[i];
1028 struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1029
1030 dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1031 dest_port_id;
1032
1033 dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
1034 qf->queue_id, qf->flow_hash);
1035 }
1036
1037
1038
1039
1040 rte_smp_wmb();
1041
1042 dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1043 source_port->emigration_target_qfs,
1044 source_port->emigration_targets_len);
1045 source_port->cfm_cnt = 0;
1046 source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1047}
1048
1049static void
1050dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1051{
1052 port->cfm_cnt++;
1053
1054 if (port->cfm_cnt == (dsw->num_ports-1)) {
1055 switch (port->migration_state) {
1056 case DSW_MIGRATION_STATE_PAUSING:
1057 DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
1058 "migration state.\n");
1059 port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
1060 break;
1061 case DSW_MIGRATION_STATE_UNPAUSING:
1062 dsw_port_end_emigration(dsw, port,
1063 RTE_SCHED_TYPE_ATOMIC);
1064 break;
1065 default:
1066 RTE_ASSERT(0);
1067 break;
1068 }
1069 }
1070}
1071
1072static void
1073dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1074{
1075 struct dsw_ctl_msg msg;
1076
1077 if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1078 switch (msg.type) {
1079 case DSW_CTL_PAUS_REQ:
1080 dsw_port_handle_pause_flows(dsw, port,
1081 msg.originating_port_id,
1082 msg.qfs, msg.qfs_len);
1083 break;
1084 case DSW_CTL_UNPAUS_REQ:
1085 dsw_port_handle_unpause_flows(dsw, port,
1086 msg.originating_port_id,
1087 msg.qfs, msg.qfs_len);
1088 break;
1089 case DSW_CTL_CFM:
1090 dsw_port_handle_confirm(dsw, port);
1091 break;
1092 }
1093 }
1094}
1095
1096static void
1097dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1098{
1099
1100
1101
1102 port->ops_since_bg_task += (num_events+1);
1103}
1104
1105static void
1106dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1107{
1108 if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
1109 port->pending_releases == 0))
1110 dsw_port_move_emigrating_flows(dsw, port);
1111
1112
1113
1114
1115
1116 dsw_port_ctl_process(dsw, port);
1117
1118
1119
1120
1121
1122
1123 if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1124 uint64_t now;
1125
1126 now = rte_get_timer_cycles();
1127
1128 port->last_bg = now;
1129
1130
1131
1132
1133 dsw_port_flush_out_buffers(dsw, port);
1134
1135 dsw_port_consider_load_update(port, now);
1136
1137 dsw_port_consider_emigration(dsw, port, now);
1138
1139 port->ops_since_bg_task = 0;
1140 }
1141}
1142
1143static void
1144dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1145{
1146 uint16_t dest_port_id;
1147
1148 for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1149 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1150}
1151
/* Enqueue a single event via the burst interface. A NULL 'ev' is passed
 * on as a zero-length burst.
 */
uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
	uint16_t count = unlikely(ev == NULL) ? 0 : 1;

	return dsw_event_enqueue_burst(port, ev, count);
}
1157
1158static __rte_always_inline uint16_t
1159dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1160 const struct rte_event events[],
1161 uint16_t events_len, bool op_types_known,
1162 uint16_t num_new, uint16_t num_release,
1163 uint16_t num_non_release)
1164{
1165 struct dsw_evdev *dsw = source_port->dsw;
1166 bool enough_credits;
1167 uint16_t i;
1168
1169 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1170 "events to port %d.\n", events_len, source_port->id);
1171
1172 dsw_port_bg_process(dsw, source_port);
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186 if (unlikely(events_len == 0)) {
1187 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1188 dsw_port_flush_out_buffers(dsw, source_port);
1189 return 0;
1190 }
1191
1192 dsw_port_note_op(source_port, events_len);
1193
1194 if (!op_types_known)
1195 for (i = 0; i < events_len; i++) {
1196 switch (events[i].op) {
1197 case RTE_EVENT_OP_RELEASE:
1198 num_release++;
1199 break;
1200 case RTE_EVENT_OP_NEW:
1201 num_new++;
1202
1203 default:
1204 num_non_release++;
1205 break;
1206 }
1207 }
1208
1209
1210
1211
1212
1213
1214 if (unlikely(num_new > 0 &&
1215 __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) >
1216 source_port->new_event_threshold))
1217 return 0;
1218
1219 enough_credits = dsw_port_acquire_credits(dsw, source_port,
1220 num_non_release);
1221 if (unlikely(!enough_credits))
1222 return 0;
1223
1224 source_port->pending_releases -= num_release;
1225
1226 dsw_port_enqueue_stats(source_port, num_new,
1227 num_non_release-num_new, num_release);
1228
1229 for (i = 0; i < events_len; i++) {
1230 const struct rte_event *event = &events[i];
1231
1232 if (likely(num_release == 0 ||
1233 event->op != RTE_EVENT_OP_RELEASE))
1234 dsw_port_buffer_event(dsw, source_port, event);
1235 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1236 }
1237
1238 DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1239 "accepted.\n", num_non_release);
1240
1241 return (num_non_release + num_release);
1242}
1243
1244uint16_t
1245dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1246 uint16_t events_len)
1247{
1248 struct dsw_port *source_port = port;
1249
1250 if (unlikely(events_len > source_port->enqueue_depth))
1251 events_len = source_port->enqueue_depth;
1252
1253 return dsw_event_enqueue_burst_generic(source_port, events,
1254 events_len, false, 0, 0, 0);
1255}
1256
1257uint16_t
1258dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1259 uint16_t events_len)
1260{
1261 struct dsw_port *source_port = port;
1262
1263 if (unlikely(events_len > source_port->enqueue_depth))
1264 events_len = source_port->enqueue_depth;
1265
1266 return dsw_event_enqueue_burst_generic(source_port, events,
1267 events_len, true, events_len,
1268 0, events_len);
1269}
1270
1271uint16_t
1272dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1273 uint16_t events_len)
1274{
1275 struct dsw_port *source_port = port;
1276
1277 if (unlikely(events_len > source_port->enqueue_depth))
1278 events_len = source_port->enqueue_depth;
1279
1280 return dsw_event_enqueue_burst_generic(source_port, events,
1281 events_len, true, 0, 0,
1282 events_len);
1283}
1284
/* Dequeue at most one event via the burst interface. */
uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
	const uint16_t max_events = 1;

	return dsw_event_dequeue_burst(port, events, max_events, wait);
}
1290
1291static void
1292dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1293 uint16_t num)
1294{
1295 uint16_t i;
1296
1297 dsw_port_dequeue_stats(port, num);
1298
1299 for (i = 0; i < num; i++) {
1300 uint16_t l_idx = port->seen_events_idx;
1301 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1302 struct rte_event *event = &events[i];
1303 qf->queue_id = event->queue_id;
1304 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1305
1306 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1307
1308 dsw_port_queue_dequeued_stats(port, event->queue_id);
1309 }
1310
1311 if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1312 port->seen_events_len =
1313 RTE_MIN(port->seen_events_len + num,
1314 DSW_MAX_EVENTS_RECORDED);
1315}
1316
1317#ifdef DSW_SORT_DEQUEUED
1318
1319#define DSW_EVENT_TO_INT(_event) \
1320 ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1321
1322static inline int
1323dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1324{
1325 const struct rte_event *event_a = v_event_a;
1326 const struct rte_event *event_b = v_event_b;
1327
1328 return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1329}
1330#endif
1331
1332static uint16_t
1333dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1334 uint16_t num)
1335{
1336 if (unlikely(port->in_buffer_len > 0)) {
1337 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1338
1339 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1340 dequeued * sizeof(struct rte_event));
1341
1342 port->in_buffer_start += dequeued;
1343 port->in_buffer_len -= dequeued;
1344
1345 if (port->in_buffer_len == 0)
1346 port->in_buffer_start = 0;
1347
1348 return dequeued;
1349 }
1350
1351 return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1352}
1353
1354uint16_t
1355dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1356 uint64_t wait __rte_unused)
1357{
1358 struct dsw_port *source_port = port;
1359 struct dsw_evdev *dsw = source_port->dsw;
1360 uint16_t dequeued;
1361
1362 source_port->pending_releases = 0;
1363
1364 dsw_port_bg_process(dsw, source_port);
1365
1366 if (unlikely(num > source_port->dequeue_depth))
1367 num = source_port->dequeue_depth;
1368
1369 dequeued = dsw_port_dequeue_burst(source_port, events, num);
1370
1371 source_port->pending_releases = dequeued;
1372
1373 dsw_port_load_record(source_port, dequeued);
1374
1375 dsw_port_note_op(source_port, dequeued);
1376
1377 if (dequeued > 0) {
1378 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1379 dequeued);
1380
1381 dsw_port_return_credits(dsw, source_port, dequeued);
1382
1383
1384
1385
1386
1387
1388
1389
1390 dsw_port_record_seen_events(port, events, dequeued);
1391 } else
1392
1393
1394
1395 dsw_port_flush_out_buffers(dsw, port);
1396
1397#ifdef DSW_SORT_DEQUEUED
1398 dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1399#endif
1400
1401 return dequeued;
1402}
1403