/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "qemu/rcu_queue.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

enum {
    /* Already enqueued and waiting for aio_bh_poll() */
    BH_PENDING   = (1 << 0),

    /* Invoke the callback */
    BH_SCHEDULED = (1 << 1),

    /* Delete without invoking callback */
    BH_DELETED   = (1 << 2),

    /* Delete after invoking callback */
    BH_ONESHOT   = (1 << 3),

    /* Schedule periodically when the event loop is idle */
    BH_IDLE      = (1 << 4),
};

struct QEMUBH {
    AioContext *ctx;
    const char *name;
    QEMUBHFunc *cb;
    void *opaque;
    QSLIST_ENTRY(QEMUBH) next;
    unsigned flags;
};

/* Called concurrently from any thread */
static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
{
    AioContext *ctx = bh->ctx;
    unsigned old_flags;

    /*
     * The memory barrier implicit in qatomic_fetch_or makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before the callback has a chance to execute and bh
     *    could be freed.
     */
    old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
    if (!(old_flags & BH_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
    }

    aio_notify(ctx);
}

/* Only called from aio_bh_poll() and aio_ctx_finalize() */
static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
{
    QEMUBH *bh = QSLIST_FIRST_RCU(head);

    if (!bh) {
        return NULL;
    }

    QSLIST_REMOVE_HEAD(head, next);

    /*
     * The qatomic_fetch_and is paired with aio_bh_enqueue().  The implicit
     * memory barrier ensures that the callback sees all writes done by the
     * scheduling thread.  It also ensures that the scheduling thread sees
     * the cleared flag before bh->cb has run, and thus will call aio_notify
     * again if necessary.
     */
    *flags = qatomic_fetch_and(&bh->flags,
                               ~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
    return bh;
}

void aio_bh_schedule_oneshot_full(AioContext *ctx, QEMUBHFunc *cb,
                                  void *opaque, const char *name)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
        .name = name,
    };
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
}
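
/*
 * Usage sketch (illustrative only, not part of this file): callers normally
 * go through the aio_bh_schedule_oneshot() wrapper in the aio header, which
 * fills in the name automatically.  The callback runs once in ctx's event
 * loop and the QEMUBH is freed afterwards by aio_bh_poll().  my_cb and
 * my_state are hypothetical names:
 *
 *     static void my_cb(void *opaque)
 *     {
 *         ...
 *     }
 *
 *     aio_bh_schedule_oneshot(ctx, my_cb, my_state);
 */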

QEMUBH *aio_bh_new_full(AioContext *ctx, QEMUBHFunc *cb, void *opaque,
                        const char *name)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
        .name = name,
    };
    return bh;
}
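
/*
 * Lifecycle sketch (illustrative only): a long-lived BH is created once via
 * the aio_bh_new() wrapper, scheduled any number of times, and finally
 * deleted.  my_cb and my_state are hypothetical names:
 *
 *     QEMUBH *bh = aio_bh_new(ctx, my_cb, my_state);
 *     qemu_bh_schedule(bh);   // my_cb(my_state) runs in ctx's event loop
 *     ...
 *     qemu_bh_delete(bh);     // memory is freed later, by aio_bh_poll()
 */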

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* Multiple occurrences of aio_bh_poll cannot be called concurrently. */
int aio_bh_poll(AioContext *ctx)
{
    BHListSlice slice;
    BHListSlice *s;
    int ret = 0;

    QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
    QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);

    while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) {
        QEMUBH *bh;
        unsigned flags;

        bh = aio_bh_dequeue(&s->bh_list, &flags);
        if (!bh) {
            QSIMPLEQ_REMOVE_HEAD(&ctx->bh_slice_list, next);
            continue;
        }

        if ((flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            /* Idle BHs don't count as progress */
            if (!(flags & BH_IDLE)) {
                ret = 1;
            }
            aio_bh_call(bh);
        }
        if (flags & (BH_DELETED | BH_ONESHOT)) {
            g_free(bh);
        }
    }

    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_IDLE);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED);
}

/* This func is async.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    qatomic_and(&bh->flags, ~BH_SCHEDULED);
}

/* This func is async.  The bottom half will do the delete action at the
 * final end.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_DELETED);
}

static int64_t aio_compute_bh_timeout(BHList *head, int timeout)
{
    QEMUBH *bh;

    QSLIST_FOREACH_RCU(bh, head, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            if (bh->flags & BH_IDLE) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    return timeout;
}

int64_t
aio_compute_timeout(AioContext *ctx)
{
    BHListSlice *s;
    int64_t deadline;
    int timeout = -1;

    timeout = aio_compute_bh_timeout(&ctx->bh_list, timeout);
    if (timeout == 0) {
        return 0;
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        timeout = aio_compute_bh_timeout(&s->bh_list, timeout);
        if (timeout == 0) {
            return 0;
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}
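
/*
 * Worked example (values in nanoseconds, per the code above): with one idle
 * BH pending and the nearest timer 5 ms away, aio_compute_bh_timeout()
 * proposes 10000000 ns and timerlistgroup_deadline_ns() returns 5000000 ns,
 * so qemu_soonest_timeout() yields 5000000 ns.  A non-idle scheduled BH
 * would instead short-circuit the whole computation to 0.
 */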

static gboolean
aio_ctx_prepare(GSource *source, gint *timeout)
{
    AioContext *ctx = (AioContext *) source;

    qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) | 1);

    /*
     * Write ctx->notify_me before computing the timeout
     * (reading bottom half flags, etc.).  Pairs with
     * smp_mb in aio_notify().
     */
    smp_mb();

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}
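
/*
 * Note on the GSource contract (standard GLib semantics, not specific to
 * this file): prepare() returns TRUE when the source is ready to be
 * dispatched without polling, and *timeout is the poll timeout in
 * milliseconds, with -1 meaning wait indefinitely.  That is why a zero
 * timeout doubles as "ready now" in the return statement above.
 */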

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    BHListSlice *s;

    /* Finish computing the timeout before clearing the flag.  */
    qatomic_store_release(&ctx->notify_me, qatomic_read(&ctx->notify_me) & ~1);
    aio_notify_accept(ctx);

    QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            return true;
        }
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        QSLIST_FOREACH_RCU(bh, &s->bh_list, next) {
            if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
                return true;
            }
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource *source,
                 GSourceFunc callback,
                 gpointer user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    unsigned flags;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

#ifdef CONFIG_LINUX_IO_URING
    if (ctx->linux_io_uring) {
        luring_detach_aio_context(ctx->linux_io_uring, ctx);
        luring_cleanup(ctx->linux_io_uring);
        ctx->linux_io_uring = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    /* There must be no aio_bh_poll() calls going on */
    assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list));

    while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) {
        /*
         * qemu_bh_delete() must have been called on BHs in this AioContext.
         * A leaked BH usually means something still expects its callback to
         * run, so fix the BH's lifecycle rather than silently freeing it
         * here.
         */
        if (unlikely(!(flags & BH_DELETED))) {
            fprintf(stderr, "%s: BH '%s' leaked, aborting...\n",
                    __func__, bh->name);
            abort();
        }

        g_free(bh);
    }

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
    aio_context_destroy(ctx);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    aio_context_use_g_source(ctx);
    g_source_ref(&ctx->source);
    return &ctx->source;
}

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}
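
/*
 * Usage sketch (illustrative only): the pool is typically used through
 * thread_pool_submit_aio() to run a blocking function off the event loop.
 * worker_fn, arg, done_cb and opaque are hypothetical names:
 *
 *     ThreadPool *pool = aio_get_thread_pool(ctx);
 *     thread_pool_submit_aio(pool, worker_fn, arg, done_cb, opaque);
 */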

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init(errp);
        if (ctx->linux_aio) {
            laio_attach_aio_context(ctx->linux_aio, ctx);
        }
    }
    return ctx->linux_aio;
}

LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    assert(ctx->linux_aio);
    return ctx->linux_aio;
}
#endif

#ifdef CONFIG_LINUX_IO_URING
LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp)
{
    if (ctx->linux_io_uring) {
        return ctx->linux_io_uring;
    }

    ctx->linux_io_uring = luring_init(errp);
    if (!ctx->linux_io_uring) {
        return NULL;
    }

    luring_attach_aio_context(ctx->linux_io_uring, ctx);
    return ctx->linux_io_uring;
}

LuringState *aio_get_linux_io_uring(AioContext *ctx)
{
    assert(ctx->linux_io_uring);
    return ctx->linux_io_uring;
}
#endif

void aio_notify(AioContext *ctx)
{
    /*
     * Write e.g. bh->flags before writing ctx->notified.  Pairs with smp_mb
     * in aio_notify_accept.
     */
    smp_wmb();
    qatomic_set(&ctx->notified, true);

    /*
     * Write ctx->notified before reading ctx->notify_me.  Pairs
     * with smp_mb in aio_ctx_prepare or aio_poll.
     */
    smp_mb();
    if (qatomic_read(&ctx->notify_me)) {
        event_notifier_set(&ctx->notifier);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    qatomic_set(&ctx->notified, false);

    /*
     * Clear ctx->notified before reading e.g. bh->flags.  Pairs with smp_wmb
     * in aio_notify.
     */
    smp_mb();
}
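
/*
 * Informal summary of the handshake above: aio_notify() only writes to the
 * EventNotifier when another thread is inside the poll, i.e. has set
 * ctx->notify_me in prepare().  ctx->notified is the logical "there is
 * work" flag, consumed by aio_notify_accept().  The paired barriers ensure
 * the BH flag writes and these two flags are observed in a consistent order
 * on both sides.
 */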

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void aio_context_notifier_cb(EventNotifier *e)
{
    AioContext *ctx = container_of(e, AioContext, notifier);

    event_notifier_test_and_clear(&ctx->notifier);
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool aio_context_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return qatomic_read(&ctx->notified);
}

static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        qatomic_set(&co->scheduled, NULL);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    QSLIST_INIT(&ctx->bh_list);
    QSIMPLEQ_INIT(&ctx->bh_slice_list);
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           aio_context_notifier_cb,
                           aio_context_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif

#ifdef CONFIG_LINUX_IO_URING
    ctx->linux_io_uring = NULL;
#endif

    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    ctx->aio_max_batch = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
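
/*
 * Usage sketch (illustrative only): an AioContext can be driven by a GLib
 * main loop by attaching its GSource; these are standard GLib calls:
 *
 *     AioContext *ctx = aio_context_new(&error_abort);
 *     GSource *src = aio_get_g_source(ctx);   // takes a reference
 *     g_source_attach(src, g_main_context_default());
 *     g_source_unref(src);
 */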

void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL,
                                            __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    /*
     * The coroutine might run and release the last ctx reference before we
     * invoke qemu_bh_schedule().  Take a reference to keep ctx alive until
     * we're done.
     */
    aio_context_ref(ctx);

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);

    aio_context_unref(ctx);
}
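
/*
 * Usage sketch (illustrative only): schedule a fresh coroutine to run in
 * another thread's AioContext; my_entry and my_arg are hypothetical names:
 *
 *     Coroutine *co = qemu_coroutine_create(my_entry, my_arg);
 *     aio_co_schedule(target_ctx, co);   // entered from target_ctx's loop
 */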

typedef struct AioCoRescheduleSelf {
    Coroutine *co;
    AioContext *new_ctx;
} AioCoRescheduleSelf;

static void aio_co_reschedule_self_bh(void *opaque)
{
    AioCoRescheduleSelf *data = opaque;
    aio_co_schedule(data->new_ctx, data->co);
}

void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
{
    AioContext *old_ctx = qemu_get_current_aio_context();

    if (old_ctx != new_ctx) {
        AioCoRescheduleSelf data = {
            .co = qemu_coroutine_self(),
            .new_ctx = new_ctx,
        };
        /*
         * We can't directly schedule the coroutine in the target context
         * because this would be racy: The other thread could try to enter
         * the coroutine before it has yielded in this one.
         */
        aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
        qemu_coroutine_yield();
    }
}
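
/*
 * Usage sketch (illustrative only): a coroutine can temporarily hop to
 * another context and back, e.g. to call code that must run in the main
 * loop; do_main_loop_work() is a hypothetical function and old_ctx must be
 * saved by the caller beforehand:
 *
 *     aio_co_reschedule_self(qemu_get_aio_context());
 *     do_main_loop_work();
 *     aio_co_reschedule_self(old_ctx);
 */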

void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /*
     * Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = qatomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}

static __thread AioContext *my_aiocontext;

AioContext *qemu_get_current_aio_context(void)
{
    if (my_aiocontext) {
        return my_aiocontext;
    }
    if (qemu_mutex_iothread_locked()) {
        /* Possibly in a vCPU thread.  */
        return qemu_get_aio_context();
    }
    return NULL;
}

void qemu_set_current_aio_context(AioContext *ctx)
{
    assert(!my_aiocontext);
    my_aiocontext = ctx;
}