1
2
3
4
5#include <stdlib.h>
6
7#include <rte_common.h>
8#include <rte_cycles.h>
9#include <rte_lcore.h>
10#include <rte_ring.h>
11
12#include <rte_table_acl.h>
13#include <rte_table_array.h>
14#include <rte_table_hash.h>
15#include <rte_table_lpm.h>
16#include <rte_table_lpm_ipv6.h>
17
18#include "obj.h"
19#include "thread.h"
20
21#ifndef THREAD_PIPELINES_MAX
22#define THREAD_PIPELINES_MAX 256
23#endif
24
25#ifndef THREAD_MSGQ_SIZE
26#define THREAD_MSGQ_SIZE 64
27#endif
28
29#ifndef THREAD_TIMER_PERIOD_MS
30#define THREAD_TIMER_PERIOD_MS 100
31#endif
32
33
34
35
36
37
38#ifndef PIPELINE_INSTR_QUANTA
39#define PIPELINE_INSTR_QUANTA 1000
40#endif
41
42
43
44
45struct thread {
46 struct rte_ring *msgq_req;
47 struct rte_ring *msgq_rsp;
48
49 uint32_t enabled;
50};
51
52static struct thread thread[RTE_MAX_LCORE];
53
54
55
56
57struct pipeline_data {
58 struct rte_swx_pipeline *p;
59 uint64_t timer_period;
60 uint64_t time_next;
61};
62
63struct thread_data {
64 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
65 uint32_t n_pipelines;
66
67 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
68 struct rte_ring *msgq_req;
69 struct rte_ring *msgq_rsp;
70 uint64_t timer_period;
71 uint64_t time_next;
72 uint64_t time_next_min;
73} __rte_cache_aligned;
74
75static struct thread_data thread_data[RTE_MAX_LCORE];
76
77
78
79
80static void
81thread_free(void)
82{
83 uint32_t i;
84
85 for (i = 0; i < RTE_MAX_LCORE; i++) {
86 struct thread *t = &thread[i];
87
88 if (!rte_lcore_is_enabled(i))
89 continue;
90
91
92 rte_ring_free(t->msgq_req);
93
94 rte_ring_free(t->msgq_rsp);
95 }
96}
97
98int
99thread_init(void)
100{
101 uint32_t i;
102
103 RTE_LCORE_FOREACH_WORKER(i) {
104 char name[NAME_MAX];
105 struct rte_ring *msgq_req, *msgq_rsp;
106 struct thread *t = &thread[i];
107 struct thread_data *t_data = &thread_data[i];
108 uint32_t cpu_id = rte_lcore_to_socket_id(i);
109
110
111 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
112
113 msgq_req = rte_ring_create(name,
114 THREAD_MSGQ_SIZE,
115 cpu_id,
116 RING_F_SP_ENQ | RING_F_SC_DEQ);
117
118 if (msgq_req == NULL) {
119 thread_free();
120 return -1;
121 }
122
123 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
124
125 msgq_rsp = rte_ring_create(name,
126 THREAD_MSGQ_SIZE,
127 cpu_id,
128 RING_F_SP_ENQ | RING_F_SC_DEQ);
129
130 if (msgq_rsp == NULL) {
131 thread_free();
132 return -1;
133 }
134
135
136 t->msgq_req = msgq_req;
137 t->msgq_rsp = msgq_rsp;
138 t->enabled = 1;
139
140
141 t_data->n_pipelines = 0;
142 t_data->msgq_req = msgq_req;
143 t_data->msgq_rsp = msgq_rsp;
144 t_data->timer_period =
145 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
146 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
147 t_data->time_next_min = t_data->time_next;
148 }
149
150 return 0;
151}
152
153static inline int
154thread_is_running(uint32_t thread_id)
155{
156 enum rte_lcore_state_t thread_state;
157
158 thread_state = rte_eal_get_lcore_state(thread_id);
159 return (thread_state == RUNNING) ? 1 : 0;
160}
161
162
163
164
165enum thread_req_type {
166 THREAD_REQ_PIPELINE_ENABLE = 0,
167 THREAD_REQ_PIPELINE_DISABLE,
168 THREAD_REQ_MAX
169};
170
171struct thread_msg_req {
172 enum thread_req_type type;
173
174 union {
175 struct {
176 struct rte_swx_pipeline *p;
177 uint32_t timer_period_ms;
178 } pipeline_enable;
179
180 struct {
181 struct rte_swx_pipeline *p;
182 } pipeline_disable;
183 };
184};
185
186struct thread_msg_rsp {
187 int status;
188};
189
190
191
192
193static struct thread_msg_req *
194thread_msg_alloc(void)
195{
196 size_t size = RTE_MAX(sizeof(struct thread_msg_req),
197 sizeof(struct thread_msg_rsp));
198
199 return calloc(1, size);
200}
201
202static void
203thread_msg_free(struct thread_msg_rsp *rsp)
204{
205 free(rsp);
206}
207
208static struct thread_msg_rsp *
209thread_msg_send_recv(uint32_t thread_id,
210 struct thread_msg_req *req)
211{
212 struct thread *t = &thread[thread_id];
213 struct rte_ring *msgq_req = t->msgq_req;
214 struct rte_ring *msgq_rsp = t->msgq_rsp;
215 struct thread_msg_rsp *rsp;
216 int status;
217
218
219 do {
220 status = rte_ring_sp_enqueue(msgq_req, req);
221 } while (status == -ENOBUFS);
222
223
224 do {
225 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
226 } while (status != 0);
227
228 return rsp;
229}
230
231int
232thread_pipeline_enable(uint32_t thread_id,
233 struct obj *obj,
234 const char *pipeline_name)
235{
236 struct pipeline *p = pipeline_find(obj, pipeline_name);
237 struct thread *t;
238 struct thread_msg_req *req;
239 struct thread_msg_rsp *rsp;
240 int status;
241
242
243 if ((thread_id >= RTE_MAX_LCORE) ||
244 (p == NULL))
245 return -1;
246
247 t = &thread[thread_id];
248 if (t->enabled == 0)
249 return -1;
250
251 if (!thread_is_running(thread_id)) {
252 struct thread_data *td = &thread_data[thread_id];
253 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
254
255 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
256 return -1;
257
258
259 td->p[td->n_pipelines] = p->p;
260
261 tdp->p = p->p;
262 tdp->timer_period =
263 (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
264 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
265
266 td->n_pipelines++;
267
268
269 p->thread_id = thread_id;
270 p->enabled = 1;
271
272 return 0;
273 }
274
275
276 req = thread_msg_alloc();
277 if (req == NULL)
278 return -1;
279
280
281 req->type = THREAD_REQ_PIPELINE_ENABLE;
282 req->pipeline_enable.p = p->p;
283 req->pipeline_enable.timer_period_ms = p->timer_period_ms;
284
285
286 rsp = thread_msg_send_recv(thread_id, req);
287
288
289 status = rsp->status;
290
291
292 thread_msg_free(rsp);
293
294
295 if (status)
296 return status;
297
298 p->thread_id = thread_id;
299 p->enabled = 1;
300
301 return 0;
302}
303
304int
305thread_pipeline_disable(uint32_t thread_id,
306 struct obj *obj,
307 const char *pipeline_name)
308{
309 struct pipeline *p = pipeline_find(obj, pipeline_name);
310 struct thread *t;
311 struct thread_msg_req *req;
312 struct thread_msg_rsp *rsp;
313 int status;
314
315
316 if ((thread_id >= RTE_MAX_LCORE) ||
317 (p == NULL))
318 return -1;
319
320 t = &thread[thread_id];
321 if (t->enabled == 0)
322 return -1;
323
324 if (p->enabled == 0)
325 return 0;
326
327 if (p->thread_id != thread_id)
328 return -1;
329
330 if (!thread_is_running(thread_id)) {
331 struct thread_data *td = &thread_data[thread_id];
332 uint32_t i;
333
334 for (i = 0; i < td->n_pipelines; i++) {
335 struct pipeline_data *tdp = &td->pipeline_data[i];
336
337 if (tdp->p != p->p)
338 continue;
339
340
341 if (i < td->n_pipelines - 1) {
342 struct rte_swx_pipeline *pipeline_last =
343 td->p[td->n_pipelines - 1];
344 struct pipeline_data *tdp_last =
345 &td->pipeline_data[td->n_pipelines - 1];
346
347 td->p[i] = pipeline_last;
348 memcpy(tdp, tdp_last, sizeof(*tdp));
349 }
350
351 td->n_pipelines--;
352
353
354 p->enabled = 0;
355
356 break;
357 }
358
359 return 0;
360 }
361
362
363 req = thread_msg_alloc();
364 if (req == NULL)
365 return -1;
366
367
368 req->type = THREAD_REQ_PIPELINE_DISABLE;
369 req->pipeline_disable.p = p->p;
370
371
372 rsp = thread_msg_send_recv(thread_id, req);
373
374
375 status = rsp->status;
376
377
378 thread_msg_free(rsp);
379
380
381 if (status)
382 return status;
383
384 p->enabled = 0;
385
386 return 0;
387}
388
389
390
391
392static inline struct thread_msg_req *
393thread_msg_recv(struct rte_ring *msgq_req)
394{
395 struct thread_msg_req *req;
396
397 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
398
399 if (status != 0)
400 return NULL;
401
402 return req;
403}
404
405static inline void
406thread_msg_send(struct rte_ring *msgq_rsp,
407 struct thread_msg_rsp *rsp)
408{
409 int status;
410
411 do {
412 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
413 } while (status == -ENOBUFS);
414}
415
416static struct thread_msg_rsp *
417thread_msg_handle_pipeline_enable(struct thread_data *t,
418 struct thread_msg_req *req)
419{
420 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
421 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
422
423
424 if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
425 rsp->status = -1;
426 return rsp;
427 }
428
429 t->p[t->n_pipelines] = req->pipeline_enable.p;
430
431 p->p = req->pipeline_enable.p;
432 p->timer_period = (rte_get_tsc_hz() *
433 req->pipeline_enable.timer_period_ms) / 1000;
434 p->time_next = rte_get_tsc_cycles() + p->timer_period;
435
436 t->n_pipelines++;
437
438
439 rsp->status = 0;
440 return rsp;
441}
442
443static struct thread_msg_rsp *
444thread_msg_handle_pipeline_disable(struct thread_data *t,
445 struct thread_msg_req *req)
446{
447 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
448 uint32_t n_pipelines = t->n_pipelines;
449 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
450 uint32_t i;
451
452
453 for (i = 0; i < n_pipelines; i++) {
454 struct pipeline_data *p = &t->pipeline_data[i];
455
456 if (p->p != pipeline)
457 continue;
458
459 if (i < n_pipelines - 1) {
460 struct rte_swx_pipeline *pipeline_last =
461 t->p[n_pipelines - 1];
462 struct pipeline_data *p_last =
463 &t->pipeline_data[n_pipelines - 1];
464
465 t->p[i] = pipeline_last;
466 memcpy(p, p_last, sizeof(*p));
467 }
468
469 t->n_pipelines--;
470
471 rsp->status = 0;
472 return rsp;
473 }
474
475
476 rsp->status = 0;
477 return rsp;
478}
479
480static void
481thread_msg_handle(struct thread_data *t)
482{
483 for ( ; ; ) {
484 struct thread_msg_req *req;
485 struct thread_msg_rsp *rsp;
486
487 req = thread_msg_recv(t->msgq_req);
488 if (req == NULL)
489 break;
490
491 switch (req->type) {
492 case THREAD_REQ_PIPELINE_ENABLE:
493 rsp = thread_msg_handle_pipeline_enable(t, req);
494 break;
495
496 case THREAD_REQ_PIPELINE_DISABLE:
497 rsp = thread_msg_handle_pipeline_disable(t, req);
498 break;
499
500 default:
501 rsp = (struct thread_msg_rsp *) req;
502 rsp->status = -1;
503 }
504
505 thread_msg_send(t->msgq_rsp, rsp);
506 }
507}
508
509
510
511
512int
513thread_main(void *arg __rte_unused)
514{
515 struct thread_data *t;
516 uint32_t thread_id, i;
517
518 thread_id = rte_lcore_id();
519 t = &thread_data[thread_id];
520
521
522 for (i = 0; ; i++) {
523 uint32_t j;
524
525
526 for (j = 0; j < t->n_pipelines; j++)
527 rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
528
529
530 if ((i & 0xF) == 0) {
531 uint64_t time = rte_get_tsc_cycles();
532 uint64_t time_next_min = UINT64_MAX;
533
534 if (time < t->time_next_min)
535 continue;
536
537
538 {
539 uint64_t time_next = t->time_next;
540
541 if (time_next <= time) {
542 thread_msg_handle(t);
543 time_next = time + t->timer_period;
544 t->time_next = time_next;
545 }
546
547 if (time_next < time_next_min)
548 time_next_min = time_next;
549 }
550
551 t->time_next_min = time_next_min;
552 }
553 }
554
555 return 0;
556}
557