1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16#include "qemu/osdep.h"
17#include "block/block.h"
18#include "qemu/main-loop.h"
19#include "qemu/rcu.h"
20#include "qemu/rcu_queue.h"
21#include "qemu/sockets.h"
22#include "qemu/cutils.h"
23#include "trace.h"
24#include "aio-posix.h"
25
26
/*
 * Interval, in nanoseconds, added to the current time whenever a polled
 * handler's poll_idle_timeout deadline is (re)armed.  A handler whose
 * deadline has passed is taken off the polling list by
 * remove_idle_poll_handlers().
 */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
28
29bool aio_poll_disabled(AioContext *ctx)
30{
31 return qatomic_read(&ctx->poll_disable_cnt);
32}
33
34void aio_add_ready_handler(AioHandlerList *ready_list,
35 AioHandler *node,
36 int revents)
37{
38 QLIST_SAFE_REMOVE(node, node_ready);
39 node->pfd.revents = revents;
40 QLIST_INSERT_HEAD(ready_list, node, node_ready);
41}
42
43static AioHandler *find_aio_handler(AioContext *ctx, int fd)
44{
45 AioHandler *node;
46
47 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
48 if (node->pfd.fd == fd) {
49 if (!QLIST_IS_INSERTED(node, node_deleted)) {
50 return node;
51 }
52 }
53 }
54
55 return NULL;
56}
57
58static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
59{
60
61
62
63
64
65 if (!g_source_is_destroyed(&ctx->source)) {
66 g_source_remove_poll(&ctx->source, &node->pfd);
67 }
68
69 node->pfd.revents = 0;
70
71
72 if (QLIST_IS_INSERTED(node, node_deleted)) {
73 return false;
74 }
75
76
77 if (qemu_lockcnt_count(&ctx->list_lock)) {
78 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
79 return false;
80 }
81
82
83
84
85 QLIST_SAFE_REMOVE(node, node_poll);
86 QLIST_REMOVE(node, node);
87 return true;
88}
89
90void aio_set_fd_handler(AioContext *ctx,
91 int fd,
92 bool is_external,
93 IOHandler *io_read,
94 IOHandler *io_write,
95 AioPollFn *io_poll,
96 void *opaque)
97{
98 AioHandler *node;
99 AioHandler *new_node = NULL;
100 bool is_new = false;
101 bool deleted = false;
102 int poll_disable_change;
103
104 qemu_lockcnt_lock(&ctx->list_lock);
105
106 node = find_aio_handler(ctx, fd);
107
108
109 if (!io_read && !io_write && !io_poll) {
110 if (node == NULL) {
111 qemu_lockcnt_unlock(&ctx->list_lock);
112 return;
113 }
114
115 node->pfd.events = 0;
116
117 poll_disable_change = -!node->io_poll;
118 } else {
119 poll_disable_change = !io_poll - (node && !node->io_poll);
120 if (node == NULL) {
121 is_new = true;
122 }
123
124 new_node = g_new0(AioHandler, 1);
125
126
127 new_node->io_read = io_read;
128 new_node->io_write = io_write;
129 new_node->io_poll = io_poll;
130 new_node->opaque = opaque;
131 new_node->is_external = is_external;
132
133 if (is_new) {
134 new_node->pfd.fd = fd;
135 } else {
136 new_node->pfd = node->pfd;
137 }
138 g_source_add_poll(&ctx->source, &new_node->pfd);
139
140 new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
141 new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
142
143 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
144 }
145
146
147
148
149
150
151
152 qatomic_set(&ctx->poll_disable_cnt,
153 qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
154
155 ctx->fdmon_ops->update(ctx, node, new_node);
156 if (node) {
157 deleted = aio_remove_fd_handler(ctx, node);
158 }
159 qemu_lockcnt_unlock(&ctx->list_lock);
160 aio_notify(ctx);
161
162 if (deleted) {
163 g_free(node);
164 }
165}
166
167void aio_set_fd_poll(AioContext *ctx, int fd,
168 IOHandler *io_poll_begin,
169 IOHandler *io_poll_end)
170{
171 AioHandler *node = find_aio_handler(ctx, fd);
172
173 if (!node) {
174 return;
175 }
176
177 node->io_poll_begin = io_poll_begin;
178 node->io_poll_end = io_poll_end;
179}
180
181void aio_set_event_notifier(AioContext *ctx,
182 EventNotifier *notifier,
183 bool is_external,
184 EventNotifierHandler *io_read,
185 AioPollFn *io_poll)
186{
187 aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
188 (IOHandler *)io_read, NULL, io_poll, notifier);
189}
190
191void aio_set_event_notifier_poll(AioContext *ctx,
192 EventNotifier *notifier,
193 EventNotifierHandler *io_poll_begin,
194 EventNotifierHandler *io_poll_end)
195{
196 aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
197 (IOHandler *)io_poll_begin,
198 (IOHandler *)io_poll_end);
199}
200
201static bool poll_set_started(AioContext *ctx, bool started)
202{
203 AioHandler *node;
204 bool progress = false;
205
206 if (started == ctx->poll_started) {
207 return false;
208 }
209
210 ctx->poll_started = started;
211
212 qemu_lockcnt_inc(&ctx->list_lock);
213 QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
214 IOHandler *fn;
215
216 if (QLIST_IS_INSERTED(node, node_deleted)) {
217 continue;
218 }
219
220 if (started) {
221 fn = node->io_poll_begin;
222 } else {
223 fn = node->io_poll_end;
224 }
225
226 if (fn) {
227 fn(node->opaque);
228 }
229
230
231 if (!started) {
232 progress = node->io_poll(node->opaque) || progress;
233 }
234 }
235 qemu_lockcnt_dec(&ctx->list_lock);
236
237 return progress;
238}
239
240
241bool aio_prepare(AioContext *ctx)
242{
243
244 poll_set_started(ctx, false);
245
246 return false;
247}
248
249bool aio_pending(AioContext *ctx)
250{
251 AioHandler *node;
252 bool result = false;
253
254
255
256
257
258 qemu_lockcnt_inc(&ctx->list_lock);
259
260 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
261 int revents;
262
263 revents = node->pfd.revents & node->pfd.events;
264 if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
265 aio_node_check(ctx, node->is_external)) {
266 result = true;
267 break;
268 }
269 if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
270 aio_node_check(ctx, node->is_external)) {
271 result = true;
272 break;
273 }
274 }
275 qemu_lockcnt_dec(&ctx->list_lock);
276
277 return result;
278}
279
280static void aio_free_deleted_handlers(AioContext *ctx)
281{
282 AioHandler *node;
283
284 if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
285 return;
286 }
287 if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
288 return;
289 }
290
291 while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
292 QLIST_REMOVE(node, node);
293 QLIST_REMOVE(node, node_deleted);
294 QLIST_SAFE_REMOVE(node, node_poll);
295 g_free(node);
296 }
297
298 qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
299}
300
301static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
302{
303 bool progress = false;
304 int revents;
305
306 revents = node->pfd.revents & node->pfd.events;
307 node->pfd.revents = 0;
308
309
310
311
312
313
314
315 if (!QLIST_IS_INSERTED(node, node_deleted) &&
316 !QLIST_IS_INSERTED(node, node_poll) &&
317 node->io_poll) {
318 trace_poll_add(ctx, node, node->pfd.fd, revents);
319 if (ctx->poll_started && node->io_poll_begin) {
320 node->io_poll_begin(node->opaque);
321 }
322 QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
323 }
324
325 if (!QLIST_IS_INSERTED(node, node_deleted) &&
326 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
327 aio_node_check(ctx, node->is_external) &&
328 node->io_read) {
329 node->io_read(node->opaque);
330
331
332 if (node->opaque != &ctx->notifier) {
333 progress = true;
334 }
335 }
336 if (!QLIST_IS_INSERTED(node, node_deleted) &&
337 (revents & (G_IO_OUT | G_IO_ERR)) &&
338 aio_node_check(ctx, node->is_external) &&
339 node->io_write) {
340 node->io_write(node->opaque);
341 progress = true;
342 }
343
344 return progress;
345}
346
347
348
349
350
351static bool aio_dispatch_ready_handlers(AioContext *ctx,
352 AioHandlerList *ready_list)
353{
354 bool progress = false;
355 AioHandler *node;
356
357 while ((node = QLIST_FIRST(ready_list))) {
358 QLIST_REMOVE(node, node_ready);
359 progress = aio_dispatch_handler(ctx, node) || progress;
360 }
361
362 return progress;
363}
364
365
366static bool aio_dispatch_handlers(AioContext *ctx)
367{
368 AioHandler *node, *tmp;
369 bool progress = false;
370
371 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
372 progress = aio_dispatch_handler(ctx, node) || progress;
373 }
374
375 return progress;
376}
377
378void aio_dispatch(AioContext *ctx)
379{
380 qemu_lockcnt_inc(&ctx->list_lock);
381 aio_bh_poll(ctx);
382 aio_dispatch_handlers(ctx);
383 aio_free_deleted_handlers(ctx);
384 qemu_lockcnt_dec(&ctx->list_lock);
385
386 timerlistgroup_run_timers(&ctx->tlg);
387}
388
389static bool run_poll_handlers_once(AioContext *ctx,
390 int64_t now,
391 int64_t *timeout)
392{
393 bool progress = false;
394 AioHandler *node;
395 AioHandler *tmp;
396
397 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
398 if (aio_node_check(ctx, node->is_external) &&
399 node->io_poll(node->opaque)) {
400 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
401
402
403
404
405
406 *timeout = 0;
407 if (node->opaque != &ctx->notifier) {
408 progress = true;
409 }
410 }
411
412
413 }
414
415 return progress;
416}
417
418static bool fdmon_supports_polling(AioContext *ctx)
419{
420 return ctx->fdmon_ops->need_wait != aio_poll_disabled;
421}
422
423static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
424{
425 AioHandler *node;
426 AioHandler *tmp;
427 bool progress = false;
428
429
430
431
432
433
434
435 if (!fdmon_supports_polling(ctx)) {
436 return false;
437 }
438
439 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
440 if (node->poll_idle_timeout == 0LL) {
441 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
442 } else if (now >= node->poll_idle_timeout) {
443 trace_poll_remove(ctx, node, node->pfd.fd);
444 node->poll_idle_timeout = 0LL;
445 QLIST_SAFE_REMOVE(node, node_poll);
446 if (ctx->poll_started && node->io_poll_end) {
447 node->io_poll_end(node->opaque);
448
449
450
451
452
453
454 progress = node->io_poll(node->opaque) || progress;
455 }
456 }
457 }
458
459 return progress;
460}
461
462
463
464
465
466
467
468
469
470
471
472static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
473{
474 bool progress;
475 int64_t start_time, elapsed_time;
476
477 assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
478
479 trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
480
481
482
483
484
485
486
487
488
489 RCU_READ_LOCK_GUARD();
490
491 start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
492 do {
493 progress = run_poll_handlers_once(ctx, start_time, timeout);
494 elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
495 max_ns = qemu_soonest_timeout(*timeout, max_ns);
496 assert(!(max_ns && progress));
497 } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
498
499 if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
500 *timeout = 0;
501 progress = true;
502 }
503
504
505
506
507 if (*timeout != -1) {
508 *timeout -= MIN(*timeout, elapsed_time);
509 }
510
511 trace_run_poll_handlers_end(ctx, progress, *timeout);
512 return progress;
513}
514
515
516
517
518
519
520
521
522
523
524static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
525{
526 int64_t max_ns;
527
528 if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
529 return false;
530 }
531
532 max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
533 if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
534 poll_set_started(ctx, true);
535
536 if (run_poll_handlers(ctx, max_ns, timeout)) {
537 return true;
538 }
539 }
540
541 if (poll_set_started(ctx, false)) {
542 *timeout = 0;
543 return true;
544 }
545
546 return false;
547}
548
549bool aio_poll(AioContext *ctx, bool blocking)
550{
551 AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
552 int ret = 0;
553 bool progress;
554 bool use_notify_me;
555 int64_t timeout;
556 int64_t start = 0;
557
558
559
560
561
562
563
564
565
566
567 assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
568 qemu_get_aio_context() : ctx));
569
570 qemu_lockcnt_inc(&ctx->list_lock);
571
572 if (ctx->poll_max_ns) {
573 start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
574 }
575
576 timeout = blocking ? aio_compute_timeout(ctx) : 0;
577 progress = try_poll_mode(ctx, &timeout);
578 assert(!(timeout && progress));
579
580
581
582
583
584
585
586
587
588 use_notify_me = timeout != 0;
589 if (use_notify_me) {
590 qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
591
592
593
594
595 smp_mb();
596
597
598 if (qatomic_read(&ctx->notified)) {
599 timeout = 0;
600 }
601 }
602
603
604
605
606 if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
607 ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
608 }
609
610 if (use_notify_me) {
611
612 qatomic_store_release(&ctx->notify_me,
613 qatomic_read(&ctx->notify_me) - 2);
614 }
615
616 aio_notify_accept(ctx);
617
618
619 if (ctx->poll_max_ns) {
620 int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
621
622 if (block_ns <= ctx->poll_ns) {
623
624 } else if (block_ns > ctx->poll_max_ns) {
625
626 int64_t old = ctx->poll_ns;
627
628 if (ctx->poll_shrink) {
629 ctx->poll_ns /= ctx->poll_shrink;
630 } else {
631 ctx->poll_ns = 0;
632 }
633
634 trace_poll_shrink(ctx, old, ctx->poll_ns);
635 } else if (ctx->poll_ns < ctx->poll_max_ns &&
636 block_ns < ctx->poll_max_ns) {
637
638 int64_t old = ctx->poll_ns;
639 int64_t grow = ctx->poll_grow;
640
641 if (grow == 0) {
642 grow = 2;
643 }
644
645 if (ctx->poll_ns) {
646 ctx->poll_ns *= grow;
647 } else {
648 ctx->poll_ns = 4000;
649 }
650
651 if (ctx->poll_ns > ctx->poll_max_ns) {
652 ctx->poll_ns = ctx->poll_max_ns;
653 }
654
655 trace_poll_grow(ctx, old, ctx->poll_ns);
656 }
657 }
658
659 progress |= aio_bh_poll(ctx);
660
661 if (ret > 0) {
662 progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
663 }
664
665 aio_free_deleted_handlers(ctx);
666
667 qemu_lockcnt_dec(&ctx->list_lock);
668
669 progress |= timerlistgroup_run_timers(&ctx->tlg);
670
671 return progress;
672}
673
674void aio_context_setup(AioContext *ctx)
675{
676 ctx->fdmon_ops = &fdmon_poll_ops;
677 ctx->epollfd = -1;
678
679
680 if (fdmon_io_uring_setup(ctx)) {
681 return;
682 }
683
684 fdmon_epoll_setup(ctx);
685}
686
687void aio_context_destroy(AioContext *ctx)
688{
689 fdmon_io_uring_destroy(ctx);
690 fdmon_epoll_disable(ctx);
691 aio_free_deleted_handlers(ctx);
692}
693
694void aio_context_use_g_source(AioContext *ctx)
695{
696
697
698
699
700
701
702 fdmon_io_uring_destroy(ctx);
703 aio_free_deleted_handlers(ctx);
704}
705
706void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
707 int64_t grow, int64_t shrink, Error **errp)
708{
709
710
711
712 ctx->poll_max_ns = max_ns;
713 ctx->poll_ns = 0;
714 ctx->poll_grow = grow;
715 ctx->poll_shrink = shrink;
716
717 aio_notify(ctx);
718}
719