#define RTE_MEM 1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stddef.h>
#include <limits.h>
#include <inttypes.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <sched.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>
#include <rte_atomic.h>
#include <rte_atomic_64.h>
#include <rte_log.h>
#include <rte_common.h>
#include <rte_branch_prediction.h>

#include "lthread_api.h"
#include "lthread_int.h"
#include "lthread_sched.h"
#include "lthread_objcache.h"
#include "lthread_timer.h"
#include "lthread_mutex.h"
#include "lthread_cond.h"
#include "lthread_tls.h"
#include "lthread_diag.h"

/*
 * This file implements the lthread scheduler.
 * The scheduler is the function lthread_run(), which must be run
 * on every lcore that is to host lthreads.
 */

static rte_atomic16_t num_schedulers;
static rte_atomic16_t active_schedulers;

/* one scheduler per lcore */
RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;

struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];

diag_callback diag_cb;

uint64_t diag_mask;

/* constructor */
RTE_INIT(lthread_sched_ctor)
{
	memset(schedcore, 0, sizeof(schedcore));
	rte_atomic16_init(&num_schedulers);
	rte_atomic16_set(&num_schedulers, 1);
	rte_atomic16_init(&active_schedulers);
	rte_atomic16_set(&active_schedulers, 0);
	diag_cb = NULL;
}

enum sched_alloc_phase {
	SCHED_ALLOC_OK,
	SCHED_ALLOC_QNODE_POOL,
	SCHED_ALLOC_READY_QUEUE,
	SCHED_ALLOC_PREADY_QUEUE,
	SCHED_ALLOC_LTHREAD_CACHE,
	SCHED_ALLOC_STACK_CACHE,
	SCHED_ALLOC_PERLT_CACHE,
	SCHED_ALLOC_TLS_CACHE,
	SCHED_ALLOC_COND_CACHE,
	SCHED_ALLOC_MUTEX_CACHE,
};

static int
_lthread_sched_alloc_resources(struct lthread_sched *new_sched)
{
	int alloc_status;

	do {
		/* Initialize per scheduler queue node pool */
		alloc_status = SCHED_ALLOC_QNODE_POOL;
		new_sched->qnode_pool =
			_qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
		if (new_sched->qnode_pool == NULL)
			break;

		/* Initialize per scheduler local ready queue */
		alloc_status = SCHED_ALLOC_READY_QUEUE;
		new_sched->ready = _lthread_queue_create("ready queue");
		if (new_sched->ready == NULL)
			break;

		/* Initialize per scheduler local peer ready queue */
		alloc_status = SCHED_ALLOC_PREADY_QUEUE;
		new_sched->pready = _lthread_queue_create("pready queue");
		if (new_sched->pready == NULL)
			break;

		/* Initialize per scheduler local free lthread cache */
		alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
		new_sched->lthread_cache =
			_lthread_objcache_create("lthread cache",
						sizeof(struct lthread),
						LTHREAD_PREALLOC);
		if (new_sched->lthread_cache == NULL)
			break;

		/* Initialize per scheduler local free stack cache */
		alloc_status = SCHED_ALLOC_STACK_CACHE;
		new_sched->stack_cache =
			_lthread_objcache_create("stack_cache",
						sizeof(struct lthread_stack),
						LTHREAD_PREALLOC);
		if (new_sched->stack_cache == NULL)
			break;

		/* Initialize per scheduler local free per-lthread data cache */
		alloc_status = SCHED_ALLOC_PERLT_CACHE;
		new_sched->per_lthread_cache =
			_lthread_objcache_create("per_lt cache",
						RTE_PER_LTHREAD_SECTION_SIZE,
						LTHREAD_PREALLOC);
		if (new_sched->per_lthread_cache == NULL)
			break;

		/* Initialize per scheduler local free TLS cache */
		alloc_status = SCHED_ALLOC_TLS_CACHE;
		new_sched->tls_cache =
			_lthread_objcache_create("TLS cache",
						sizeof(struct lthread_tls),
						LTHREAD_PREALLOC);
		if (new_sched->tls_cache == NULL)
			break;

		/* Initialize per scheduler local free condition variable cache */
		alloc_status = SCHED_ALLOC_COND_CACHE;
		new_sched->cond_cache =
			_lthread_objcache_create("cond cache",
						sizeof(struct lthread_cond),
						LTHREAD_PREALLOC);
		if (new_sched->cond_cache == NULL)
			break;

		/* Initialize per scheduler local free mutex cache */
		alloc_status = SCHED_ALLOC_MUTEX_CACHE;
		new_sched->mutex_cache =
			_lthread_objcache_create("mutex cache",
						sizeof(struct lthread_mutex),
						LTHREAD_PREALLOC);
		if (new_sched->mutex_cache == NULL)
			break;

		alloc_status = SCHED_ALLOC_OK;
	} while (0);

	/* unwind whatever was allocated if any step failed */
	switch (alloc_status) {
	case SCHED_ALLOC_MUTEX_CACHE:
		_lthread_objcache_destroy(new_sched->cond_cache);
		/* fall through */
	case SCHED_ALLOC_COND_CACHE:
		_lthread_objcache_destroy(new_sched->tls_cache);
		/* fall through */
	case SCHED_ALLOC_TLS_CACHE:
		_lthread_objcache_destroy(new_sched->per_lthread_cache);
		/* fall through */
	case SCHED_ALLOC_PERLT_CACHE:
		_lthread_objcache_destroy(new_sched->stack_cache);
		/* fall through */
	case SCHED_ALLOC_STACK_CACHE:
		_lthread_objcache_destroy(new_sched->lthread_cache);
		/* fall through */
	case SCHED_ALLOC_LTHREAD_CACHE:
		_lthread_queue_destroy(new_sched->pready);
		/* fall through */
	case SCHED_ALLOC_PREADY_QUEUE:
		_lthread_queue_destroy(new_sched->ready);
		/* fall through */
	case SCHED_ALLOC_READY_QUEUE:
		_qnode_pool_destroy(new_sched->qnode_pool);
		/* fall through */
	case SCHED_ALLOC_QNODE_POOL:
		/* fall through */
	case SCHED_ALLOC_OK:
		break;
	}
	return alloc_status;
}

/*
 * Create a scheduler on the current lcore
 */
struct lthread_sched *_lthread_sched_create(size_t stack_size)
{
	int status;
	struct lthread_sched *new_sched;
	unsigned lcoreid = rte_lcore_id();

	RTE_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);

	if (stack_size == 0)
		stack_size = LTHREAD_MAX_STACK_SIZE;

	new_sched =
	     rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());
	if (new_sched == NULL) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate memory for scheduler\n");
		return NULL;
	}

	_lthread_key_pool_init();

	new_sched->stack_size = stack_size;
	new_sched->birth = rte_rdtsc();
	THIS_SCHED = new_sched;

	status = _lthread_sched_alloc_resources(new_sched);
	if (status != SCHED_ALLOC_OK) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate resources for scheduler code = %d\n",
			status);
		rte_free(new_sched);
		return NULL;
	}

	bzero(&new_sched->ctx, sizeof(struct ctx));

	new_sched->lcore_id = lcoreid;

	schedcore[lcoreid] = new_sched;

	new_sched->run_flag = 1;

	DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);

	rte_wmb();
	return new_sched;
}

/*
 * Set the number of schedulers in the system
 */
int lthread_num_schedulers_set(int num)
{
	rte_atomic16_set(&num_schedulers, num);
	return (int)rte_atomic16_read(&num_schedulers);
}
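
/*
 * Illustrative sketch (not part of this file): the count should be set from
 * the initial lcore before any scheduler is launched, and should equal the
 * number of lcores that will call lthread_run(), so that the start/stop
 * synchronisation below has the correct target.
 *
 *	lthread_num_schedulers_set((int)rte_lcore_count());
 */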

/*
 * Return the number of schedulers currently active
 */
int lthread_active_schedulers(void)
{
	return (int)rte_atomic16_read(&active_schedulers);
}

/*
 * Shut down the scheduler running on the specified lcore
 */
void lthread_scheduler_shutdown(unsigned lcoreid)
{
	uint64_t coreid = (uint64_t) lcoreid;

	if (coreid < LTHREAD_MAX_LCORES) {
		if (schedcore[coreid] != NULL)
			schedcore[coreid]->run_flag = 0;
	}
}

/*
 * Shut down all schedulers
 */
void lthread_scheduler_shutdown_all(void)
{
	uint64_t i;

	/*
	 * Wait for all schedulers to have started.
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();

	for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
		if (schedcore[i] != NULL)
			schedcore[i]->run_flag = 0;
	}
}
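
/*
 * Illustrative sketch (not part of this file): shutdown is usually requested
 * from within an lthread once the workload is complete. Each scheduler then
 * drops out of its lthread_run() loop as soon as its queues drain and it has
 * no blocked threads. The function name workload_master() is hypothetical.
 *
 *	static void workload_master(void *arg __rte_unused)
 *	{
 *		// ... spawn workers, wait for them to finish ...
 *		lthread_scheduler_shutdown_all();
 *		lthread_exit(NULL);
 *	}
 */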

/*
 * Resume a suspended lthread
 */
static __rte_always_inline void
_lthread_resume(struct lthread *lt);
static inline void _lthread_resume(struct lthread *lt)
{
	struct lthread_sched *sched = THIS_SCHED;
	struct lthread_stack *s;
	uint64_t state = lt->state;
#if LTHREAD_DIAG
	int init = 0;
#endif

	sched->current_lthread = lt;

	if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
		/* if detached we can free the thread now */
		if (state & BIT(ST_LT_DETACH)) {
			_lthread_free(lt);
			sched->current_lthread = NULL;
			return;
		}
	}

	if (state & BIT(ST_LT_INIT)) {
		/* first time this thread has been run */
		/* assign thread to this scheduler */
		lt->sched = THIS_SCHED;

		/* allocate stack */
		s = _stack_alloc();

		lt->stack_container = s;
		_lthread_set_stack(lt, s->stack, s->stack_size);

		/* allocate memory for TLS used by this thread */
		_lthread_tls_alloc(lt);

		lt->state = BIT(ST_LT_READY);
#if LTHREAD_DIAG
		init = 1;
#endif
	}

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);

	/* switch to the lthread's context */
	ctx_switch(&lt->ctx, &sched->ctx);

	/* If posting to a queue that could be read by another lcore
	 * we defer the queue write till now to ensure the context has been
	 * saved before the other core tries to resume it.
	 * This applies to blocking on mutex, cond, and to set_affinity.
	 */
	if (lt->pending_wr_queue != NULL) {
		struct lthread_queue *dest = lt->pending_wr_queue;

		lt->pending_wr_queue = NULL;

		/* queue the current thread to the specified queue */
		_lthread_queue_insert_mp(dest, lt);
	}

	sched->current_lthread = NULL;
}

/*
 * Handle sleep timer expiry
 */
void
_sched_timer_cb(struct rte_timer *tim, void *arg)
{
	struct lthread *lt = (struct lthread *) arg;
	uint64_t state = lt->state;

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);

	rte_timer_stop(tim);

	if (lt->state & BIT(ST_LT_CANCELLED))
		(THIS_SCHED)->nb_blocked_threads--;

	lt->state = state | BIT(ST_LT_EXPIRED);
	_lthread_resume(lt);
	lt->state = state & CLEARBIT(ST_LT_EXPIRED);
}

/*
 * Return non-zero when the scheduler has been asked to stop and has
 * no more work: both ready queues are empty and nothing is blocked.
 */
static inline int _lthread_sched_isdone(struct lthread_sched *sched)
{
	return (sched->run_flag == 0) &&
			(_lthread_queue_empty(sched->ready)) &&
			(_lthread_queue_empty(sched->pready)) &&
			(sched->nb_blocked_threads == 0);
}

/*
 * Wait for all schedulers to start
 */
static inline void _lthread_schedulers_sync_start(void)
{
	rte_atomic16_inc(&active_schedulers);

	/* wait for lthread schedulers
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();

}

/*
 * Wait for all schedulers to stop
 */
static inline void _lthread_schedulers_sync_stop(void)
{
	rte_atomic16_dec(&active_schedulers);
	rte_atomic16_dec(&num_schedulers);

	/* wait for schedulers
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) > 0)
		sched_yield();

}

/*
 * Run the lthread scheduler.
 * This loop is the heart of the system.
 */
void lthread_run(void)
{

	struct lthread_sched *sched = THIS_SCHED;
	struct lthread *lt = NULL;

	RTE_LOG(INFO, LTHREAD,
		"starting scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));

	/* if more than one, wait for all schedulers to start */
	_lthread_schedulers_sync_start();

	/*
	 * This is the main scheduling loop.
	 * So long as there are tasks in existence we run this loop.
	 * We check for:
	 *   expired timers,
	 *   the local ready queue,
	 *   and the peer ready queue,
	 *
	 * and resume lthreads ad infinitum.
	 */
	while (!_lthread_sched_isdone(sched)) {

		rte_timer_manage();

		lt = _lthread_queue_poll(sched->ready);
		if (lt != NULL)
			_lthread_resume(lt);

		lt = _lthread_queue_poll(sched->pready);
		if (lt != NULL)
			_lthread_resume(lt);
	}

	/* if more than one, wait for all schedulers to stop */
	_lthread_schedulers_sync_stop();

	/* detach our lcore */
	(THIS_SCHED) = NULL;

	RTE_LOG(INFO, LTHREAD,
		"stopping scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));
	fflush(stdout);
}
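
/*
 * Illustrative sketch (not part of this file): lthread_run() does not create
 * the scheduler itself; in this library it is created lazily by the first
 * lthread API call on the lcore (typically lthread_create()). A typical
 * application therefore launches one scheduler per lcore roughly as below.
 * The names sched_main() and initial_lthread() are hypothetical.
 *
 *	static void initial_lthread(void *arg)
 *	{
 *		// per-lcore work: spawn more lthreads, process packets, ...
 *		lthread_exit(NULL);
 *	}
 *
 *	static int sched_main(void *arg)
 *	{
 *		struct lthread *lt;
 *
 *		// creating the first lthread also creates this lcore's scheduler
 *		lthread_create(&lt, -1, initial_lthread, arg);
 *		lthread_run();		// returns once shutdown is requested
 *		return 0;
 *	}
 *
 *	// in main(), after rte_eal_init():
 *	lthread_num_schedulers_set((int)rte_lcore_count());
 *	rte_eal_mp_remote_launch(sched_main, NULL, CALL_MAIN);
 *					// CALL_MASTER on older DPDK releases
 *	rte_eal_mp_wait_lcore();
 */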

/*
 * Return the scheduler for the specified lcore
 */
struct lthread_sched *_lthread_sched_get(unsigned int lcore_id)
{
	struct lthread_sched *res = NULL;

	if (lcore_id < LTHREAD_MAX_LCORES)
		res = schedcore[lcore_id];

	return res;
}

/*
 * Migrate the current lthread to the scheduler running
 * on the specified lcore.
 */
int lthread_set_affinity(unsigned lcoreid)
{
	struct lthread *lt = THIS_LTHREAD;
	struct lthread_sched *dest_sched;

	if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
		return POSIX_ERRNO(EINVAL);

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);

	dest_sched = schedcore[lcoreid];

	if (unlikely(dest_sched == NULL))
		return POSIX_ERRNO(EINVAL);

	if (likely(dest_sched != THIS_SCHED)) {
		lt->sched = dest_sched;
		lt->pending_wr_queue = dest_sched->pready;
		_affinitize();
		return 0;
	}
	return 0;
}
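
/*
 * Illustrative sketch (not part of this file): lthread_set_affinity() is
 * called from inside a running lthread; the thread suspends, is posted to
 * the destination scheduler's peer ready queue (pready), and continues on
 * that lcore when it is next resumed. The name pinned_worker() is
 * hypothetical.
 *
 *	static void pinned_worker(void *arg)
 *	{
 *		unsigned target_lcore = *(unsigned *)arg;
 *
 *		if (lthread_set_affinity(target_lcore) == 0) {
 *			// from here on we are running on target_lcore
 *		}
 *		lthread_exit(NULL);
 *	}
 */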