qemu/util/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#include "aio-posix.h"

/* Stop userspace polling on a handler if it isn't active for some time */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)

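/* True while at least one handler is registered without an ->io_poll()
 * callback, in which case userspace polling cannot make progress for that fd.
 */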
bool aio_poll_disabled(AioContext *ctx)
{
    return qatomic_read(&ctx->poll_disable_cnt);
}

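/* Queue @node on @ready_list with @revents so that its callbacks run during
 * the next dispatch phase.
 */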
void aio_add_ready_handler(AioHandlerList *ready_list,
                           AioHandler *node,
                           int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

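/* Queue @node on @ready_list because its ->io_poll() callback reported that
 * work is ready; ->io_poll_ready() will be invoked at dispatch time.
 */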
static void aio_add_poll_ready_handler(AioHandlerList *ready_list,
                                       AioHandler *node)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->poll_ready = true;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

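/* Return the handler registered for @fd, ignoring handlers already marked as
 * deleted, or NULL if none exists.
 */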
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

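/* Unlink @node from the handler lists.  Returns true if the caller must free
 * it; returns false if freeing is deferred to aio_free_deleted_handlers().
 */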
static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;
    node->poll_ready = false;

    /* If the fd monitor has already marked it deleted, leave it alone */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}

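/* Register, update or remove the handler for @fd.  Passing NULL for all of
 * @io_read, @io_write and @io_poll removes an existing handler.
 *
 * Usage sketch (hypothetical callback and variable names):
 *
 *   aio_set_fd_handler(ctx, sockfd, true, my_read_cb, NULL, NULL, NULL, s);
 *   ...
 *   aio_set_fd_handler(ctx, sockfd, true, NULL, NULL, NULL, NULL, NULL);
 */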
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    if (io_poll && !io_poll_ready) {
        io_poll = NULL; /* polling only makes sense if there is a handler */
    }

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clear the events in order to unregister the fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->io_poll_ready = io_poll_ready;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify() below takes effect is not an issue.
     */
    qatomic_set(&ctx->poll_disable_cnt,
               qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}

static void aio_set_fd_poll(AioContext *ctx, int fd,
                            IOHandler *io_poll_begin,
                            IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll,
                       (IOHandler *)io_poll_ready, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}

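/* Call ->io_poll_begin() or ->io_poll_end() on all polling handlers when
 * entering or leaving poll mode.  Returns true if a final poll after leaving
 * poll mode found a ready handler (queued on @ready_list).
 */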
static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list,
                             bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started && node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);
            progress = true;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}


bool aio_prepare(AioContext *ctx)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);

    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, &ready_list, false);
    /* TODO what to do with this list? */

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        /* TODO should this check poll ready? */
        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

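/* Free handlers that were queued for deletion, unless another thread or a
 * nested caller is still walking the handler list.
 */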
static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}

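/* Run the callbacks for one handler based on its revents/poll_ready state.
 * Returns true if progress was made (aio_notify() activity does not count).
 */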
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    bool poll_ready;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    poll_ready = node->poll_ready;
    node->poll_ready = false;

    /*
     * Start polling AioHandlers when they become ready because activity is
     * likely to continue.  Note that starvation is theoretically possible when
     * fdmon_supports_polling(), but only until the fd fires for the first
     * time.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        poll_ready && revents == 0 &&
        aio_node_check(ctx, node->is_external) &&
        node->io_poll_ready) {
        /*
         * Remove temporarily to avoid infinite loops when ->io_poll_ready()
         * calls aio_poll() before clearing the condition that made the poll
         * handler become ready.
         */
        QLIST_SAFE_REMOVE(node, node_poll);

        node->io_poll_ready(node->opaque);

        if (!QLIST_IS_INSERTED(node, node_poll)) {
            QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
        }

        /*
         * Return early since revents was zero. aio_notify() does not count as
         * progress.
         */
        return node->opaque != &ctx->notifier;
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}

/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

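/* Dispatch pending bottom halves, fd handlers and timers.  Called from the
 * AioContext's GSource dispatch function when glib drives the event loop.
 */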
void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}

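/* Invoke ->io_poll() once for every polling handler.  Handlers that report
 * readiness are queued on @ready_list and *timeout is cleared so the caller
 * does not block.
 */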
static bool run_poll_handlers_once(AioContext *ctx,
                                   AioHandlerList *ready_list,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);

            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

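/* True if the fdmon implementation provides its own need_wait() callback
 * instead of the generic aio_poll_disabled() check.
 */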
static bool fdmon_supports_polling(AioContext *ctx)
{
    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}

static bool remove_idle_poll_handlers(AioContext *ctx,
                                      AioHandlerList *ready_list,
                                      int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * File descriptor monitoring implementations without userspace polling
     * support suffer from starvation when a subset of handlers is polled
     * because fds will not be processed in a timely fashion.  Don't remove
     * idle poll handlers.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Don't bother re-adding the handler in the rare case where
                 * this causes progress.
                 */
                if (node->io_poll(node->opaque)) {
                    aio_add_poll_ready_handler(ready_list, node);
                    progress = true;
                }
            }
        }
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @ready_list: the list to place ready handlers on
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
                              int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, ready_list,
                                          start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, ready_list,
                                  start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @ready_list: list to add handlers that need to be run
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *    polling succeeds.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
                          int64_t *timeout)
{
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        /*
         * Enable poll mode. It pairs with the poll_set_started() in
         * aio_poll() which disables poll mode.
         */
        poll_set_started(ctx, ready_list, true);

        if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) {
            return true;
        }
    }
    return false;
}

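/* Run one iteration of the event loop: try userspace polling first, then wait
 * for fd activity via the fdmon implementation if needed, and finally dispatch
 * bottom halves, ready fd handlers and timers.  Returns true if progress was
 * made.
 */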
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    bool progress;
    bool use_notify_me;
    int64_t timeout;
    int64_t start = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &ready_list, &timeout);
    assert(!(timeout && progress));

    /*
     * aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    use_notify_me = timeout != 0;
    if (use_notify_me) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before reading ctx->notified.  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();

        /* Don't block if aio_notify() was called */
        if (qatomic_read(&ctx->notified)) {
            timeout = 0;
        }
    }

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        /*
         * Disable poll mode.  Poll mode must be disabled before calling
         * ctx->fdmon_ops->wait() so that a guest notification can wake up
         * the IO thread when work becomes pending; otherwise hangs or
         * unnecessary latency can result.
         */
        if (poll_set_started(ctx, &ready_list, false)) {
            timeout = 0;
            progress = true;
        }

        ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (use_notify_me) {
        /* Finish the poll before clearing the flag.  */
        qatomic_store_release(&ctx->notify_me,
                             qatomic_read(&ctx->notify_me) - 2);
    }

    aio_notify_accept(ctx);

    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }

    progress |= aio_bh_poll(ctx);
    progress |= aio_dispatch_ready_handlers(ctx, &ready_list);

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

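/* Select the fd monitoring implementation for this AioContext, preferring
 * io_uring when it is available.
 */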
void aio_context_setup(AioContext *ctx)
{
    ctx->fdmon_ops = &fdmon_poll_ops;
    ctx->epollfd = -1;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}

void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_use_g_source(AioContext *ctx)
{
    /*
     * Disable io_uring when the glib main loop is used because it doesn't
     * support mixed glib/aio_poll() usage. It relies on aio_poll() being
     * called regularly so that changes to the monitored file descriptors are
     * submitted, otherwise a list of pending fd handlers builds up.
     */
    fdmon_io_uring_destroy(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}

void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                Error **errp)
{
    /*
     * No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->aio_max_batch = max_batch;

    aio_notify(ctx);
}