1
2
3
4
5#include <stdlib.h>
6
7#include <rte_common.h>
8#include <rte_cycles.h>
9#include <rte_lcore.h>
10#include <rte_ring.h>
11
12#include <rte_table_acl.h>
13#include <rte_table_array.h>
14#include <rte_table_hash.h>
15#include <rte_table_lpm.h>
16#include <rte_table_lpm_ipv6.h>
17
18#include "obj.h"
19#include "thread.h"
20
21#ifndef THREAD_PIPELINES_MAX
22#define THREAD_PIPELINES_MAX 256
23#endif
24
25#ifndef THREAD_MSGQ_SIZE
26#define THREAD_MSGQ_SIZE 64
27#endif
28
29#ifndef THREAD_TIMER_PERIOD_MS
30#define THREAD_TIMER_PERIOD_MS 100
31#endif
32
33
34
35
36struct thread {
37 struct rte_ring *msgq_req;
38 struct rte_ring *msgq_rsp;
39
40 uint32_t enabled;
41};
42
43static struct thread thread[RTE_MAX_LCORE];
44
45
46
47
48struct pipeline_data {
49 struct rte_swx_pipeline *p;
50 uint64_t timer_period;
51 uint64_t time_next;
52};
53
54struct thread_data {
55 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
56 uint32_t n_pipelines;
57
58 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
59 struct rte_ring *msgq_req;
60 struct rte_ring *msgq_rsp;
61 uint64_t timer_period;
62 uint64_t time_next;
63 uint64_t time_next_min;
64} __rte_cache_aligned;
65
66static struct thread_data thread_data[RTE_MAX_LCORE];
67
68
69
70
71static void
72thread_free(void)
73{
74 uint32_t i;
75
76 for (i = 0; i < RTE_MAX_LCORE; i++) {
77 struct thread *t = &thread[i];
78
79 if (!rte_lcore_is_enabled(i))
80 continue;
81
82
83 if (t->msgq_req)
84 rte_ring_free(t->msgq_req);
85
86 if (t->msgq_rsp)
87 rte_ring_free(t->msgq_rsp);
88 }
89}
90
91int
92thread_init(void)
93{
94 uint32_t i;
95
96 RTE_LCORE_FOREACH_WORKER(i) {
97 char name[NAME_MAX];
98 struct rte_ring *msgq_req, *msgq_rsp;
99 struct thread *t = &thread[i];
100 struct thread_data *t_data = &thread_data[i];
101 uint32_t cpu_id = rte_lcore_to_socket_id(i);
102
103
104 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
105
106 msgq_req = rte_ring_create(name,
107 THREAD_MSGQ_SIZE,
108 cpu_id,
109 RING_F_SP_ENQ | RING_F_SC_DEQ);
110
111 if (msgq_req == NULL) {
112 thread_free();
113 return -1;
114 }
115
116 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
117
118 msgq_rsp = rte_ring_create(name,
119 THREAD_MSGQ_SIZE,
120 cpu_id,
121 RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123 if (msgq_rsp == NULL) {
124 thread_free();
125 return -1;
126 }
127
128
129 t->msgq_req = msgq_req;
130 t->msgq_rsp = msgq_rsp;
131 t->enabled = 1;
132
133
134 t_data->n_pipelines = 0;
135 t_data->msgq_req = msgq_req;
136 t_data->msgq_rsp = msgq_rsp;
137 t_data->timer_period =
138 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
139 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
140 t_data->time_next_min = t_data->time_next;
141 }
142
143 return 0;
144}
145
146static inline int
147thread_is_running(uint32_t thread_id)
148{
149 enum rte_lcore_state_t thread_state;
150
151 thread_state = rte_eal_get_lcore_state(thread_id);
152 return (thread_state == RUNNING) ? 1 : 0;
153}
154
155
156
157
158enum thread_req_type {
159 THREAD_REQ_PIPELINE_ENABLE = 0,
160 THREAD_REQ_PIPELINE_DISABLE,
161 THREAD_REQ_MAX
162};
163
164struct thread_msg_req {
165 enum thread_req_type type;
166
167 union {
168 struct {
169 struct rte_swx_pipeline *p;
170 uint32_t timer_period_ms;
171 } pipeline_enable;
172
173 struct {
174 struct rte_swx_pipeline *p;
175 } pipeline_disable;
176 };
177};
178
179struct thread_msg_rsp {
180 int status;
181};
182
183
184
185
186static struct thread_msg_req *
187thread_msg_alloc(void)
188{
189 size_t size = RTE_MAX(sizeof(struct thread_msg_req),
190 sizeof(struct thread_msg_rsp));
191
192 return calloc(1, size);
193}
194
195static void
196thread_msg_free(struct thread_msg_rsp *rsp)
197{
198 free(rsp);
199}
200
201static struct thread_msg_rsp *
202thread_msg_send_recv(uint32_t thread_id,
203 struct thread_msg_req *req)
204{
205 struct thread *t = &thread[thread_id];
206 struct rte_ring *msgq_req = t->msgq_req;
207 struct rte_ring *msgq_rsp = t->msgq_rsp;
208 struct thread_msg_rsp *rsp;
209 int status;
210
211
212 do {
213 status = rte_ring_sp_enqueue(msgq_req, req);
214 } while (status == -ENOBUFS);
215
216
217 do {
218 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
219 } while (status != 0);
220
221 return rsp;
222}
223
224int
225thread_pipeline_enable(uint32_t thread_id,
226 struct obj *obj,
227 const char *pipeline_name)
228{
229 struct pipeline *p = pipeline_find(obj, pipeline_name);
230 struct thread *t;
231 struct thread_msg_req *req;
232 struct thread_msg_rsp *rsp;
233 int status;
234
235
236 if ((thread_id >= RTE_MAX_LCORE) ||
237 (p == NULL))
238 return -1;
239
240 t = &thread[thread_id];
241 if (t->enabled == 0)
242 return -1;
243
244 if (!thread_is_running(thread_id)) {
245 struct thread_data *td = &thread_data[thread_id];
246 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
247
248 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
249 return -1;
250
251
252 td->p[td->n_pipelines] = p->p;
253
254 tdp->p = p->p;
255 tdp->timer_period =
256 (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
257 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
258
259 td->n_pipelines++;
260
261
262 p->thread_id = thread_id;
263 p->enabled = 1;
264
265 return 0;
266 }
267
268
269 req = thread_msg_alloc();
270 if (req == NULL)
271 return -1;
272
273
274 req->type = THREAD_REQ_PIPELINE_ENABLE;
275 req->pipeline_enable.p = p->p;
276 req->pipeline_enable.timer_period_ms = p->timer_period_ms;
277
278
279 rsp = thread_msg_send_recv(thread_id, req);
280
281
282 status = rsp->status;
283
284
285 thread_msg_free(rsp);
286
287
288 if (status)
289 return status;
290
291 p->thread_id = thread_id;
292 p->enabled = 1;
293
294 return 0;
295}
296
297int
298thread_pipeline_disable(uint32_t thread_id,
299 struct obj *obj,
300 const char *pipeline_name)
301{
302 struct pipeline *p = pipeline_find(obj, pipeline_name);
303 struct thread *t;
304 struct thread_msg_req *req;
305 struct thread_msg_rsp *rsp;
306 int status;
307
308
309 if ((thread_id >= RTE_MAX_LCORE) ||
310 (p == NULL))
311 return -1;
312
313 t = &thread[thread_id];
314 if (t->enabled == 0)
315 return -1;
316
317 if (p->enabled == 0)
318 return 0;
319
320 if (p->thread_id != thread_id)
321 return -1;
322
323 if (!thread_is_running(thread_id)) {
324 struct thread_data *td = &thread_data[thread_id];
325 uint32_t i;
326
327 for (i = 0; i < td->n_pipelines; i++) {
328 struct pipeline_data *tdp = &td->pipeline_data[i];
329
330 if (tdp->p != p->p)
331 continue;
332
333
334 if (i < td->n_pipelines - 1) {
335 struct rte_swx_pipeline *pipeline_last =
336 td->p[td->n_pipelines - 1];
337 struct pipeline_data *tdp_last =
338 &td->pipeline_data[td->n_pipelines - 1];
339
340 td->p[i] = pipeline_last;
341 memcpy(tdp, tdp_last, sizeof(*tdp));
342 }
343
344 td->n_pipelines--;
345
346
347 p->enabled = 0;
348
349 break;
350 }
351
352 return 0;
353 }
354
355
356 req = thread_msg_alloc();
357 if (req == NULL)
358 return -1;
359
360
361 req->type = THREAD_REQ_PIPELINE_DISABLE;
362 req->pipeline_disable.p = p->p;
363
364
365 rsp = thread_msg_send_recv(thread_id, req);
366
367
368 status = rsp->status;
369
370
371 thread_msg_free(rsp);
372
373
374 if (status)
375 return status;
376
377 p->enabled = 0;
378
379 return 0;
380}
381
382
383
384
385static inline struct thread_msg_req *
386thread_msg_recv(struct rte_ring *msgq_req)
387{
388 struct thread_msg_req *req;
389
390 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
391
392 if (status != 0)
393 return NULL;
394
395 return req;
396}
397
398static inline void
399thread_msg_send(struct rte_ring *msgq_rsp,
400 struct thread_msg_rsp *rsp)
401{
402 int status;
403
404 do {
405 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
406 } while (status == -ENOBUFS);
407}
408
409static struct thread_msg_rsp *
410thread_msg_handle_pipeline_enable(struct thread_data *t,
411 struct thread_msg_req *req)
412{
413 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
414 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
415
416
417 if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
418 rsp->status = -1;
419 return rsp;
420 }
421
422 t->p[t->n_pipelines] = req->pipeline_enable.p;
423
424 p->p = req->pipeline_enable.p;
425 p->timer_period = (rte_get_tsc_hz() *
426 req->pipeline_enable.timer_period_ms) / 1000;
427 p->time_next = rte_get_tsc_cycles() + p->timer_period;
428
429 t->n_pipelines++;
430
431
432 rsp->status = 0;
433 return rsp;
434}
435
436static struct thread_msg_rsp *
437thread_msg_handle_pipeline_disable(struct thread_data *t,
438 struct thread_msg_req *req)
439{
440 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
441 uint32_t n_pipelines = t->n_pipelines;
442 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
443 uint32_t i;
444
445
446 for (i = 0; i < n_pipelines; i++) {
447 struct pipeline_data *p = &t->pipeline_data[i];
448
449 if (p->p != pipeline)
450 continue;
451
452 if (i < n_pipelines - 1) {
453 struct rte_swx_pipeline *pipeline_last =
454 t->p[n_pipelines - 1];
455 struct pipeline_data *p_last =
456 &t->pipeline_data[n_pipelines - 1];
457
458 t->p[i] = pipeline_last;
459 memcpy(p, p_last, sizeof(*p));
460 }
461
462 t->n_pipelines--;
463
464 rsp->status = 0;
465 return rsp;
466 }
467
468
469 rsp->status = 0;
470 return rsp;
471}
472
473static void
474thread_msg_handle(struct thread_data *t)
475{
476 for ( ; ; ) {
477 struct thread_msg_req *req;
478 struct thread_msg_rsp *rsp;
479
480 req = thread_msg_recv(t->msgq_req);
481 if (req == NULL)
482 break;
483
484 switch (req->type) {
485 case THREAD_REQ_PIPELINE_ENABLE:
486 rsp = thread_msg_handle_pipeline_enable(t, req);
487 break;
488
489 case THREAD_REQ_PIPELINE_DISABLE:
490 rsp = thread_msg_handle_pipeline_disable(t, req);
491 break;
492
493 default:
494 rsp = (struct thread_msg_rsp *) req;
495 rsp->status = -1;
496 }
497
498 thread_msg_send(t->msgq_rsp, rsp);
499 }
500}
501
502
503
504
505int
506thread_main(void *arg __rte_unused)
507{
508 struct thread_data *t;
509 uint32_t thread_id, i;
510
511 thread_id = rte_lcore_id();
512 t = &thread_data[thread_id];
513
514
515 for (i = 0; ; i++) {
516 uint32_t j;
517
518
519 for (j = 0; j < t->n_pipelines; j++)
520 rte_swx_pipeline_run(t->p[j], 1000000);
521
522
523 if ((i & 0xF) == 0) {
524 uint64_t time = rte_get_tsc_cycles();
525 uint64_t time_next_min = UINT64_MAX;
526
527 if (time < t->time_next_min)
528 continue;
529
530
531 {
532 uint64_t time_next = t->time_next;
533
534 if (time_next <= time) {
535 thread_msg_handle(t);
536 time_next = time + t->timer_period;
537 t->time_next = time_next;
538 }
539
540 if (time_next < time_next_min)
541 time_next_min = time_next;
542 }
543
544 t->time_next_min = time_next_min;
545 }
546 }
547
548 return 0;
549}
550