/*
 * AioContext POSIX implementation: fd handler registration, userspace
 * polling, and event dispatch.
 *
 * NOTE(extraction): the original license/copyright header was garbled here;
 * restore it from the upstream file before redistributing.
 */
16#include "qemu/osdep.h"
17#include "block/block.h"
18#include "qemu/main-loop.h"
19#include "qemu/rcu.h"
20#include "qemu/rcu_queue.h"
21#include "qemu/sockets.h"
22#include "qemu/cutils.h"
23#include "trace.h"
24#include "aio-posix.h"
25
26
27#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
28
29bool aio_poll_disabled(AioContext *ctx)
30{
31 return qatomic_read(&ctx->poll_disable_cnt);
32}
33
34void aio_add_ready_handler(AioHandlerList *ready_list,
35 AioHandler *node,
36 int revents)
37{
38 QLIST_SAFE_REMOVE(node, node_ready);
39 node->pfd.revents = revents;
40 QLIST_INSERT_HEAD(ready_list, node, node_ready);
41}
42
43static void aio_add_poll_ready_handler(AioHandlerList *ready_list,
44 AioHandler *node)
45{
46 QLIST_SAFE_REMOVE(node, node_ready);
47 node->poll_ready = true;
48 QLIST_INSERT_HEAD(ready_list, node, node_ready);
49}
50
51static AioHandler *find_aio_handler(AioContext *ctx, int fd)
52{
53 AioHandler *node;
54
55 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
56 if (node->pfd.fd == fd) {
57 if (!QLIST_IS_INSERTED(node, node_deleted)) {
58 return node;
59 }
60 }
61 }
62
63 return NULL;
64}
65
/*
 * Unregister @node.  Returns true if the caller may g_free() the node now,
 * false if deletion was deferred (node stays on deleted_aio_handlers until
 * no reader holds list_lock).
 */
static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /*
     * Skip g_source_remove_poll() if the GSource is being torn down —
     * calling it on a destroyed source is not allowed, and glib discards
     * its poll records during destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;
    node->poll_ready = false;

    /* Already queued for deferred deletion: nothing more to do here */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* Readers are walking the list: defer the free to them */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }

    /*
     * No readers, so the node can be unlinked immediately and the caller
     * frees it.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}
98
/*
 * Register, replace, or (when io_read/io_write/io_poll are all NULL) remove
 * the handlers for @fd.  Replacement is RCU-style: a new AioHandler is
 * published and the old one removed afterwards, so concurrent readers always
 * see a consistent node.
 */
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    if (io_poll && !io_poll_ready) {
        io_poll = NULL; /* polling only makes sense with a ready callback */
    }

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clear events so the fdmon implementation stops watching the fd */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        /* +1 if the new node lacks io_poll, -1 if the old one lacked it */
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Allocate a replacement node and fill it in */
        new_node = g_new0(AioHandler, 1);

        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->io_poll_ready = io_poll_ready;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            /* Carry over pfd state (fd and pending revents) from the old node */
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /*
     * No need to order poll_disable_cnt writes against other updates:
     * the counter only avoids wasted polling before the syscall becomes
     * necessary.  Handler changes are rare, and a little extra polling
     * until the aio_notify() below is harmless.
     */
    qatomic_set(&ctx->poll_disable_cnt,
                qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx); /* wake the event loop so it picks up the change */

    if (deleted) {
        g_free(node);
    }
}
181
182void aio_set_fd_poll(AioContext *ctx, int fd,
183 IOHandler *io_poll_begin,
184 IOHandler *io_poll_end)
185{
186 AioHandler *node = find_aio_handler(ctx, fd);
187
188 if (!node) {
189 return;
190 }
191
192 node->io_poll_begin = io_poll_begin;
193 node->io_poll_end = io_poll_end;
194}
195
196void aio_set_event_notifier(AioContext *ctx,
197 EventNotifier *notifier,
198 bool is_external,
199 EventNotifierHandler *io_read,
200 AioPollFn *io_poll,
201 EventNotifierHandler *io_poll_ready)
202{
203 aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
204 (IOHandler *)io_read, NULL, io_poll,
205 (IOHandler *)io_poll_ready, notifier);
206}
207
208void aio_set_event_notifier_poll(AioContext *ctx,
209 EventNotifier *notifier,
210 EventNotifierHandler *io_poll_begin,
211 EventNotifierHandler *io_poll_end)
212{
213 aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
214 (IOHandler *)io_poll_begin,
215 (IOHandler *)io_poll_end);
216}
217
/*
 * Notify poll-capable handlers that a userspace polling phase is starting
 * (@started == true) or ending (@started == false).  No-op if the state is
 * unchanged.
 *
 * Returns true if any handler became ready while ending the phase; such
 * handlers are placed on @ready_list.
 */
static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list,
                             bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started && node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);
            progress = true;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}
258
259
/*
 * GSource prepare callback: stop userspace polling before glib blocks in
 * its own poll().  Always returns false so glib performs the real wait.
 */
bool aio_prepare(AioContext *ctx)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);

    /* TODO what to do with this list? */
    poll_set_started(ctx, &ready_list, false);

    return false;
}
270
271bool aio_pending(AioContext *ctx)
272{
273 AioHandler *node;
274 bool result = false;
275
276
277
278
279
280 qemu_lockcnt_inc(&ctx->list_lock);
281
282 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
283 int revents;
284
285
286 revents = node->pfd.revents & node->pfd.events;
287 if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
288 aio_node_check(ctx, node->is_external)) {
289 result = true;
290 break;
291 }
292 if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
293 aio_node_check(ctx, node->is_external)) {
294 result = true;
295 break;
296 }
297 }
298 qemu_lockcnt_dec(&ctx->list_lock);
299
300 return result;
301}
302
/* Free handlers whose deletion was deferred while readers held list_lock */
static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested; let the outermost walker do the freeing */
    }

    /* No other readers now, so the deferred nodes can be freed for real */
    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
323
/*
 * Dispatch one handler's pending events.  Returns true if progress was made
 * (events from the ctx->notifier dummy wakeup fd do not count as progress).
 */
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    bool poll_ready;
    int revents;

    /* Consume the pending state before running callbacks */
    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    poll_ready = node->poll_ready;
    node->poll_ready = false;

    /*
     * Start polling this handler now that it became ready, since activity
     * is likely to continue.  If polling had already been started,
     * io_poll_begin is invoked so the handler can suppress fd notifications
     * while it is being polled.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }
    /* Readiness reported by ->io_poll() rather than by the fd itself */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        poll_ready && revents == 0 &&
        aio_node_check(ctx, node->is_external) &&
        node->io_poll_ready) {
        node->io_poll_ready(node->opaque);

        /*
         * Return early since revents was zero.  A wakeup via ctx->notifier
         * does not count as progress.
         */
        return node->opaque != &ctx->notifier;
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* ctx->notifier wakeups do not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}
385
386
387
388
389
390static bool aio_dispatch_ready_handlers(AioContext *ctx,
391 AioHandlerList *ready_list)
392{
393 bool progress = false;
394 AioHandler *node;
395
396 while ((node = QLIST_FIRST(ready_list))) {
397 QLIST_REMOVE(node, node_ready);
398 progress = aio_dispatch_handler(ctx, node) || progress;
399 }
400
401 return progress;
402}
403
404
405static bool aio_dispatch_handlers(AioContext *ctx)
406{
407 AioHandler *node, *tmp;
408 bool progress = false;
409
410 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
411 progress = aio_dispatch_handler(ctx, node) || progress;
412 }
413
414 return progress;
415}
416
/*
 * Run one non-blocking dispatch pass: bottom halves, fd handlers, deferred
 * handler cleanup, then expired timers.
 */
void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    /* Timers run outside list_lock; they may add/remove fd handlers */
    timerlistgroup_run_timers(&ctx->tlg);
}
427
/*
 * One pass over the polling handlers.  Handlers that report readiness are
 * moved to @ready_list, their idle deadline is refreshed, and *timeout is
 * cleared so the caller stops polling promptly.
 */
static bool run_poll_handlers_once(AioContext *ctx,
                                   AioHandlerList *ready_list,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);

            /* Activity seen: push back this handler's idle deadline */
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling succeeded; make the caller leave the polling loop
             * immediately so the next polling interval can be adjusted.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}
459
460static bool fdmon_supports_polling(AioContext *ctx)
461{
462 return ctx->fdmon_ops->need_wait != aio_poll_disabled;
463}
464
/*
 * Drop handlers from the polling set once they have been idle for
 * POLL_IDLE_INTERVAL_NS.  Handlers that turn out to be ready during the
 * final check are placed on @ready_list.
 */
static bool remove_idle_poll_handlers(AioContext *ctx,
                                      AioHandlerList *ready_list,
                                      int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * fd monitoring implementations without userspace polling support
     * would starve fds that are removed from the polling set, because
     * those fds would no longer be serviced in a timely fashion.  Keep
     * everything in the polling set in that case.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            /* First sighting: start this handler's idle countdown */
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Never mind re-adding the handler in the rare case where
                 * this makes progress.
                 */
                if (node->io_poll(node->opaque)) {
                    aio_add_poll_ready_handler(ready_list, node);
                    progress = true;
                }
            }
        }
    }

    return progress;
}
508
509
510
511
512
513
514
515
516
517
518
519
/* run_poll_handlers:
 * @ctx: the AioContext
 * @ready_list: ready handlers are appended here
 * @max_ns: maximum time to poll for, in nanoseconds
 * @timeout: in/out blocking-wait timeout; reduced by the time spent polling
 *
 * Busy-poll the ->io_poll() callbacks for up to @max_ns.
 *
 * The caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
                              int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections, producing many rcu_read_lock()/rcu_read_unlock() pairs
     * with their expensive memory synchronization.  Amortize that cost by
     * holding the RCU read lock across the whole polling loop.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, ready_list,
                                          start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        /* progress implies *timeout == 0, which implies max_ns == 0 */
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, ready_list,
                                  start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /*
     * If time passed with no successful polling, shrink *timeout so the
     * overall deadline the caller computed stays the same.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}
565
566
567
568
569
570
571
572
573
574
575
/* try_poll_mode:
 * @ctx: the AioContext
 * @ready_list: list to add handlers that need to be run
 * @timeout: in/out timeout for the subsequent blocking wait; set to 0 when
 *     polling made progress so the caller does not block
 *
 * Attempt userspace polling instead of a blocking syscall, if allowed.
 *
 * The caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
                          int64_t *timeout)
{
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

    /* Never poll longer than the caller's timeout allows */
    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        poll_set_started(ctx, ready_list, true);

        if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) {
            return true;
        }
    }

    /* Ending the polling phase may itself discover ready handlers */
    if (poll_set_started(ctx, ready_list, false)) {
        *timeout = 0;
        return true;
    }

    return false;
}
601
/*
 * Run one iteration of the event loop: optionally busy-poll, wait for fd
 * events (blocking up to the computed timeout when @blocking), then dispatch
 * bottom halves, ready fd handlers, and timers.
 *
 * Returns true if any progress was made.
 */
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    bool progress;
    bool use_notify_me;
    int64_t timeout;
    int64_t start = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext
     * (or an aio_poll concurrent with a GSource prepare/check/dispatch
     * callback).  We rely on this below to avoid slow locked accesses to
     * ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread.
     * iohandler_ctx is special in that it runs in the main thread, but
     * that thread's context is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));

    qemu_lockcnt_inc(&ctx->list_lock);

    /* Record the start time only if we will adapt the polling interval */
    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &ready_list, &timeout);
    assert(!(timeout && progress));

    /*
     * aio_notify() can avoid the expensive event_notifier_set() if
     * everything (file descriptors, bottom halves, timers) will be
     * re-evaluated before the next blocking poll().  That is already the
     * case when timeout == 0, so only advertise notify_me for a blocking
     * wait.
     */
    use_notify_me = timeout != 0;
    if (use_notify_me) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before reading ctx->notified.  Pairs with
         * the smp_mb() in aio_notify().
         */
        smp_mb();

        /* Don't block if aio_notify() was called */
        if (qatomic_read(&ctx->notified)) {
            timeout = 0;
        }
    }

    /*
     * When polling is allowed, a non-blocking aio_poll needs no syscall:
     * the polling pass above already checked the handlers.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (use_notify_me) {
        /* Finish the poll before clearing the flag */
        qatomic_store_release(&ctx->notify_me,
                              qatomic_read(&ctx->notify_me) - 2);
    }

    aio_notify_accept(ctx);

    /* Adjust the self-tuning polling interval */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }

    progress |= aio_bh_poll(ctx);
    progress |= aio_dispatch_ready_handlers(ctx, &ready_list);

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}
722
/* Initialize fd monitoring for @ctx, preferring the fastest backend */
void aio_context_setup(AioContext *ctx)
{
    /* Fallback: poll(2)-based monitoring with no kernel-side state */
    ctx->fdmon_ops = &fdmon_poll_ops;
    ctx->epollfd = -1;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}
735
/* Tear down fd monitoring backends and free any deferred handlers */
void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
    aio_free_deleted_handlers(ctx);
}
742
void aio_context_use_g_source(AioContext *ctx)
{
    /*
     * Disable io_uring when the glib main loop is used because it doesn't
     * support mixed glib/aio_poll usage.  io_uring relies on aio_poll
     * being called regularly so that changes to the monitored file
     * descriptors are submitted; under glib a backlog of pending fd
     * handler changes would build up instead.
     */
    fdmon_io_uring_destroy(ctx);
    aio_free_deleted_handlers(ctx);
}
754
/*
 * Configure the adaptive polling parameters (see aio_poll()'s interval
 * tuning).  The current interval is reset to 0 so tuning restarts.
 */
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /*
     * No thread synchronization here: it doesn't matter if an incorrect
     * value is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}
768
/* Configure the maximum number of requests batched before submission */
void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                Error **errp)
{
    /*
     * No thread synchronization here: it doesn't matter if an incorrect
     * value is used once.
     */
    ctx->aio_max_batch = max_batch;

    aio_notify(ctx);
}
780