/*
 * QEMU event loop: AioContext, bottom halves (BHs) and coroutine scheduling.
 */
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "qemu/rcu_queue.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "qemu/coroutine-tls.h"
#include "sysemu/cpu-timers.h"
#include "trace.h"
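
/*
 * Bottom half (BH) bookkeeping.  QEMUBH::flags is updated with atomic
 * read-modify-write operations because BHs can be scheduled, cancelled and
 * deleted from any thread.  BH_PENDING means the BH sits on ctx->bh_list
 * waiting for aio_bh_poll(); the other bits tell aio_bh_poll() what to do
 * with the BH once it has been dequeued.
 */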
enum {
    /* Already enqueued and waiting for aio_bh_poll() */
    BH_PENDING   = (1 << 0),

    /* Invoke the callback */
    BH_SCHEDULED = (1 << 1),

    /* Delete without invoking callback */
    BH_DELETED   = (1 << 2),

    /* Delete after invoking callback */
    BH_ONESHOT   = (1 << 3),

    /* Idle BH: polled within 10 ms rather than immediately,
     * see aio_compute_bh_timeout() */
    BH_IDLE      = (1 << 4),
};

struct QEMUBH {
    AioContext *ctx;
    const char *name;
    QEMUBHFunc *cb;
    void *opaque;
    QSLIST_ENTRY(QEMUBH) next;
    unsigned flags;
};
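
/* Called concurrently from any thread */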
static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
{
    AioContext *ctx = bh->ctx;
    unsigned old_flags;

    /*
     * Set BH_PENDING and the new flags in one atomic step.  Only the caller
     * that first sets BH_PENDING links the BH into ctx->bh_list; concurrent
     * or repeated scheduling merely updates the flags of the already-pending
     * BH.
     */
    old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
    if (!(old_flags & BH_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
    }

    aio_notify(ctx);

    /* Kick the vCPU out of its icount time slice so the new BH is not delayed. */
    icount_notify_exit();
}
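
/* Only called from aio_bh_poll() and aio_ctx_finalize() */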
static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
{
    QEMUBH *bh = QSLIST_FIRST_RCU(head);

    if (!bh) {
        return NULL;
    }

    QSLIST_REMOVE_HEAD(head, next);

    /*
     * Clear BH_PENDING and the one-shot scheduling bits in a single atomic
     * step and hand the old flags back to the caller, which decides whether
     * to invoke the callback and/or free the BH.  An aio_bh_enqueue() that
     * runs after this point sees BH_PENDING cleared and re-inserts the BH.
     */
    *flags = qatomic_fetch_and(&bh->flags,
                               ~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
    return bh;
}

void aio_bh_schedule_oneshot_full(AioContext *ctx, QEMUBHFunc *cb,
                                  void *opaque, const char *name)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
        .name = name,
    };
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
}

QEMUBH *aio_bh_new_full(AioContext *ctx, QEMUBHFunc *cb, void *opaque,
                        const char *name)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
        .name = name,
    };
    return bh;
}

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}
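
/*
 * Execute pending bottom halves.  Multiple occurrences of aio_bh_poll()
 * cannot run concurrently on the same AioContext: the on-stack slice is
 * linked into ctx->bh_slice_list until it has been drained.
 */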
int aio_bh_poll(AioContext *ctx)
{
    BHListSlice slice;
    BHListSlice *s;
    int ret = 0;

    /* Synchronizes with QSLIST_INSERT_HEAD_ATOMIC in aio_bh_enqueue(). */
    QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);

    /*
     * Newer GCC warns (-Wdangling-pointer=) that the address of the on-stack
     * 'slice' is stored in ctx->bh_slice_list.  The list is emptied again
     * before this function returns, so suppress the false positive.
     */
#if !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wdangling-pointer="
#endif
    QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);
#if !defined(__clang__)
#pragma GCC diagnostic pop
#endif

    while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) {
        QEMUBH *bh;
        unsigned flags;

        bh = aio_bh_dequeue(&s->bh_list, &flags);
        if (!bh) {
            QSIMPLEQ_REMOVE_HEAD(&ctx->bh_slice_list, next);
            continue;
        }

        if ((flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            /* Idle BHs don't count as progress */
            if (!(flags & BH_IDLE)) {
                ret = 1;
            }
            aio_bh_call(bh);
        }
        if (flags & (BH_DELETED | BH_ONESHOT)) {
            g_free(bh);
        }
    }

    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_IDLE);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED);
}
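
/*
 * Cancel the callback if it has not run yet.  Cancellation is asynchronous:
 * only BH_SCHEDULED is cleared, the BH itself stays allocated and may be
 * rescheduled later.
 */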
void qemu_bh_cancel(QEMUBH *bh)
{
    qatomic_and(&bh->flags, ~BH_SCHEDULED);
}
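
/*
 * Deletion is asynchronous as well: the BH is only marked BH_DELETED here
 * and is freed by aio_bh_poll() (or aio_ctx_finalize()) once dequeued.
 */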
void qemu_bh_delete(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_DELETED);
}
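
/*
 * Return the poll timeout (in ns) implied by the BHs on @head: 0 if a
 * regular BH is scheduled, at most 10 ms if only idle BHs are scheduled,
 * otherwise the caller's @timeout unchanged.
 */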
static int64_t aio_compute_bh_timeout(BHList *head, int timeout)
{
    QEMUBH *bh;

    QSLIST_FOREACH_RCU(bh, head, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            if (bh->flags & BH_IDLE) {
                /*
                 * Idle BHs are polled at least every 10 ms; don't let the
                 * event loop sleep longer than that.
                 */
                timeout = 10000000;
            } else {
                /* Non-idle BHs make the event loop return immediately */
                return 0;
            }
        }
    }

    return timeout;
}
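
/*
 * Compute how long the event loop may block: 0 if work is already pending,
 * otherwise the soonest of the BH-imposed limit and the next timer deadline
 * (-1 means "block indefinitely").
 */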
int64_t
aio_compute_timeout(AioContext *ctx)
{
    BHListSlice *s;
    int64_t deadline;
    int timeout = -1;

    timeout = aio_compute_bh_timeout(&ctx->bh_list, timeout);
    if (timeout == 0) {
        return 0;
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        timeout = aio_compute_bh_timeout(&s->bh_list, timeout);
        if (timeout == 0) {
            return 0;
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}

static gboolean
aio_ctx_prepare(GSource *source, gint *timeout)
{
    AioContext *ctx = (AioContext *) source;

    qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) | 1);

    /*
     * Write ctx->notify_me before computing the timeout
     * (reading bottom half flags, etc.).  Pairs with
     * smp_mb() in aio_notify().
     */
    smp_mb();

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    BHListSlice *s;

    /* Finish computing the timeout before clearing the flag.  */
    qatomic_store_release(&ctx->notify_me, qatomic_read(&ctx->notify_me) & ~1);
    aio_notify_accept(ctx);

    QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            return true;
        }
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        QSLIST_FOREACH_RCU(bh, &s->bh_list, next) {
            if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
                return true;
            }
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource *source,
                 GSourceFunc callback,
                 gpointer user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    unsigned flags;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

#ifdef CONFIG_LINUX_IO_URING
    if (ctx->linux_io_uring) {
        luring_detach_aio_context(ctx->linux_io_uring, ctx);
        luring_cleanup(ctx->linux_io_uring);
        ctx->linux_io_uring = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    /* There must be no aio_bh_poll() calls going on */
    assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list));

    while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) {
        /*
         * qemu_bh_delete() must have been called on BHs in this AioContext.
         * In other words, any BH that is still on the list at this point
         * without BH_DELETED set is a leak in the owning code.
         */
        if (unlikely(!(flags & BH_DELETED))) {
            fprintf(stderr, "%s: BH '%s' leaked, aborting...\n",
                    __func__, bh->name);
            abort();
        }

        g_free(bh);
    }

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
    aio_context_destroy(ctx);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    aio_context_use_g_source(ctx);
    g_source_ref(&ctx->source);
    return &ctx->source;
}

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init(errp);
        if (ctx->linux_aio) {
            laio_attach_aio_context(ctx->linux_aio, ctx);
        }
    }
    return ctx->linux_aio;
}

LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    assert(ctx->linux_aio);
    return ctx->linux_aio;
}
#endif

#ifdef CONFIG_LINUX_IO_URING
LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp)
{
    if (ctx->linux_io_uring) {
        return ctx->linux_io_uring;
    }

    ctx->linux_io_uring = luring_init(errp);
    if (!ctx->linux_io_uring) {
        return NULL;
    }

    luring_attach_aio_context(ctx->linux_io_uring, ctx);
    return ctx->linux_io_uring;
}

LuringState *aio_get_linux_io_uring(AioContext *ctx)
{
    assert(ctx->linux_io_uring);
    return ctx->linux_io_uring;
}
#endif
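
/*
 * Wake up the event loop for @ctx: make the pending work visible via
 * ctx->notified and, if a thread is about to block in the poll phase
 * (ctx->notify_me set), kick it through the context's EventNotifier.
 */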
void aio_notify(AioContext *ctx)
{
    /*
     * Write e.g. ctx->bh_list before writing ctx->notified.
     * Pairs with smp_mb() in aio_notify_accept().
     */
    smp_wmb();
    qatomic_set(&ctx->notified, true);

    /*
     * Write ctx->notified (and also ctx->bh_list) before reading
     * ctx->notify_me.  Pairs with smp_mb() in aio_ctx_prepare() and
     * aio_poll().
     */
    smp_mb();
    if (qatomic_read(&ctx->notify_me)) {
        event_notifier_set(&ctx->notifier);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    qatomic_set(&ctx->notified, false);

    /*
     * Clear ctx->notified before reading e.g. bh->flags.  Pairs with
     * smp_wmb() in aio_notify(), so that new work added after the clear
     * is either seen here or re-signalled through ctx->notified.
     */
    smp_mb();
}

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void aio_context_notifier_cb(EventNotifier *e)
{
    AioContext *ctx = container_of(e, AioContext, notifier);

    event_notifier_test_and_clear(&ctx->notifier);
}
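
/* Returns true when aio_notify() was called (e.g. a BH was scheduled) */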
static bool aio_context_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return qatomic_read(&ctx->notified);
}

static void aio_context_notifier_poll_ready(EventNotifier *e)
{
    /* Do nothing, we just wanted to kick the event loop */
}
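
/*
 * BH callback that enters coroutines queued with aio_co_schedule().  The
 * lock-free list is LIFO, so reverse it first to run coroutines in the
 * order they were scheduled.
 */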
static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        qatomic_set(&co->scheduled, NULL);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    QSLIST_INIT(&ctx->bh_list);
    QSIMPLEQ_INIT(&ctx->bh_slice_list);
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           aio_context_notifier_cb,
                           aio_context_notifier_poll,
                           aio_context_notifier_poll_ready);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif

#ifdef CONFIG_LINUX_IO_URING
    ctx->linux_io_uring = NULL;
#endif

    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    ctx->aio_max_batch = 0;

    ctx->thread_pool_min = 0;
    ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
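
/*
 * Queue @co to run from @ctx's event loop.  May be called from any thread;
 * the coroutine itself is entered later by co_schedule_bh_cb().
 */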
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL,
                                            __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    /*
     * The coroutine might run and release the last ctx reference before we
     * invoke qemu_bh_schedule().  Take a reference to keep ctx alive until
     * we've finished.
     */
    aio_context_ref(ctx);

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);

    aio_context_unref(ctx);
}

typedef struct AioCoRescheduleSelf {
    Coroutine *co;
    AioContext *new_ctx;
} AioCoRescheduleSelf;

static void aio_co_reschedule_self_bh(void *opaque)
{
    AioCoRescheduleSelf *data = opaque;
    aio_co_schedule(data->new_ctx, data->co);
}
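
/*
 * Move the calling coroutine from its current AioContext to @new_ctx.  The
 * coroutine yields here and is re-entered from @new_ctx's event loop.
 */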
void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
{
    AioContext *old_ctx = qemu_get_current_aio_context();

    if (old_ctx != new_ctx) {
        AioCoRescheduleSelf data = {
            .co = qemu_coroutine_self(),
            .new_ctx = new_ctx,
        };
        /*
         * We can't directly schedule the coroutine in the target context
         * because this would be racy: the other thread could try to enter
         * the coroutine before it has yielded in this one.
         */
        aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
        qemu_coroutine_yield();
    }
}
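
/*
 * Re-enter a coroutine in the AioContext it was last running in (co->ctx),
 * scheduling it there if called from a different thread.
 */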
void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /*
     * Read coroutine before co->ctx.  Matches smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    ctx = qatomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}
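
/* Per-thread record of the AioContext this thread is currently running
 * (declared with the coroutine-safe TLS helpers). */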
QEMU_DEFINE_STATIC_CO_TLS(AioContext *, my_aiocontext)

AioContext *qemu_get_current_aio_context(void)
{
    AioContext *ctx = get_my_aiocontext();
    if (ctx) {
        return ctx;
    }
    if (qemu_mutex_iothread_locked()) {
        /* Possibly in a vCPU thread.  */
        return qemu_get_aio_context();
    }
    return NULL;
}

void qemu_set_current_aio_context(AioContext *ctx)
{
    assert(!get_my_aiocontext());
    set_my_aiocontext(ctx);
}

void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
                                        int64_t max, Error **errp)
{
    if (min > max || !max || min > INT_MAX || max > INT_MAX) {
        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
        return;
    }

    ctx->thread_pool_min = min;
    ctx->thread_pool_max = max;

    if (ctx->thread_pool) {
        thread_pool_update_params(ctx->thread_pool, ctx);
    }
}