/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "qemu/rcu_queue.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

/* QEMUBH::flags values */
enum {
    /* Already enqueued and waiting for aio_bh_poll() */
    BH_PENDING   = (1 << 0),

    /* Invoke the callback */
    BH_SCHEDULED = (1 << 1),

    /* Delete without invoking callback */
    BH_DELETED   = (1 << 2),

    /* Delete after invoking callback */
    BH_ONESHOT   = (1 << 3),

    /* Schedule periodically when the event loop is idle */
    BH_IDLE      = (1 << 4),
};

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QSLIST_ENTRY(QEMUBH) next;
    unsigned flags;
};
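
/*
 * Illustrative sketch only (not used by this file): a typical bottom half
 * lifecycle as seen by a caller.  "my_cb" and "my_state" are hypothetical
 * caller-side names.
 *
 *     static void my_cb(void *opaque)
 *     {
 *         MyState *my_state = opaque;
 *         ...   // runs in the event loop of the AioContext given to aio_bh_new()
 *     }
 *
 *     QEMUBH *bh = aio_bh_new(ctx, my_cb, my_state);
 *     qemu_bh_schedule(bh);   // my_cb() runs as soon as the loop iterates
 *     ...
 *     qemu_bh_delete(bh);     // asynchronous; freed later by aio_bh_poll()
 */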

/* Called concurrently from any thread */
static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
{
    AioContext *ctx = bh->ctx;
    unsigned old_flags;

    /*
     * The memory barrier implicit in qatomic_fetch_or makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before the callback has a chance to execute and bh
     *    could be freed.
     */
    old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
    if (!(old_flags & BH_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
    }

    aio_notify(ctx);
}

/* Only called from aio_bh_poll() and aio_ctx_finalize() */
static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
{
    QEMUBH *bh = QSLIST_FIRST_RCU(head);

    if (!bh) {
        return NULL;
    }

    QSLIST_REMOVE_HEAD(head, next);

    /*
     * The qatomic_and is paired with aio_bh_enqueue().  The implicit memory
     * barrier ensures that the callback sees all writes done by the scheduling
     * thread.  It also ensures that the scheduling thread sees the cleared
     * flag before bh->cb has run, and thus will call aio_notify again if
     * necessary.
     */
    *flags = qatomic_fetch_and(&bh->flags,
                              ~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
    return bh;
}

void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
}
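
/*
 * Illustrative sketch only: aio_bh_schedule_oneshot() is the fire-and-forget
 * variant, safe to call from any thread.  "my_done_cb" and "my_state" are
 * hypothetical caller names.
 *
 *     aio_bh_schedule_oneshot(ctx, my_done_cb, my_state);
 *     // my_done_cb() runs once in ctx's event loop and the BH is freed
 *     // automatically (BH_ONESHOT), so no qemu_bh_delete() is needed.
 */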

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    return bh;
}

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* Concurrent invocations of aio_bh_poll() on the same AioContext are not allowed. */
int aio_bh_poll(AioContext *ctx)
{
    BHListSlice slice;
    BHListSlice *s;
    int ret = 0;

    QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
    QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);

    while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) {
        QEMUBH *bh;
        unsigned flags;

        bh = aio_bh_dequeue(&s->bh_list, &flags);
        if (!bh) {
            QSIMPLEQ_REMOVE_HEAD(&ctx->bh_slice_list, next);
            continue;
        }

        if ((flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            /* Idle BHs don't count as progress */
            if (!(flags & BH_IDLE)) {
                ret = 1;
            }
            aio_bh_call(bh);
        }
        if (flags & (BH_DELETED | BH_ONESHOT)) {
            g_free(bh);
        }
    }

    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_IDLE);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED);
}

/* This function is asynchronous: it only clears the scheduled flag, so a
 * callback that the event loop has already picked up may still run.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    qatomic_and(&bh->flags, ~BH_SCHEDULED);
}

/* This function is asynchronous.  The bottom half is only freed later, when
 * aio_bh_poll() (or aio_ctx_finalize()) processes it.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_DELETED);
}

static int64_t aio_compute_bh_timeout(BHList *head, int timeout)
{
    QEMUBH *bh;

    QSLIST_FOREACH_RCU(bh, head, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            if (bh->flags & BH_IDLE) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    return timeout;
}

int64_t
aio_compute_timeout(AioContext *ctx)
{
    BHListSlice *s;
    int64_t deadline;
    int timeout = -1;

    timeout = aio_compute_bh_timeout(&ctx->bh_list, timeout);
    if (timeout == 0) {
        return 0;
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        timeout = aio_compute_bh_timeout(&s->bh_list, timeout);
        if (timeout == 0) {
            return 0;
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}
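
/*
 * Worked example (illustrative, not normative): with only an idle BH pending,
 * aio_compute_bh_timeout() returns 10000000 (10 ms in ns); if a timer deadline
 * of 3000000 ns is also pending, qemu_soonest_timeout() picks 3000000.  A
 * negative timeout means "block indefinitely", 0 means "do not block at all".
 */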

static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) | 1);

    /*
     * Write ctx->notify_me before computing the timeout
     * (reading bottom half flags, etc.).  Pairs with
     * smp_mb in aio_notify().
     */
    smp_mb();

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    BHListSlice *s;

    /* Finish computing the timeout before clearing the flag.  */
    qatomic_store_release(&ctx->notify_me, qatomic_read(&ctx->notify_me) & ~1);
    aio_notify_accept(ctx);

    QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            return true;
        }
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        QSLIST_FOREACH_RCU(bh, &s->bh_list, next) {
            if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
                return true;
            }
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    unsigned flags;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

#ifdef CONFIG_LINUX_IO_URING
    if (ctx->linux_io_uring) {
        luring_detach_aio_context(ctx->linux_io_uring, ctx);
        luring_cleanup(ctx->linux_io_uring);
        ctx->linux_io_uring = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    /* There must be no aio_bh_poll() calls going on */
    assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list));

    while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) {
        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(flags & BH_DELETED);

        g_free(bh);
    }

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
    aio_context_destroy(ctx);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    aio_context_use_g_source(ctx);
    g_source_ref(&ctx->source);
    return &ctx->source;
}

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init(errp);
        if (ctx->linux_aio) {
            laio_attach_aio_context(ctx->linux_aio, ctx);
        }
    }
    return ctx->linux_aio;
}

LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    assert(ctx->linux_aio);
    return ctx->linux_aio;
}
#endif

#ifdef CONFIG_LINUX_IO_URING
LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp)
{
    if (ctx->linux_io_uring) {
        return ctx->linux_io_uring;
    }

    ctx->linux_io_uring = luring_init(errp);
    if (!ctx->linux_io_uring) {
        return NULL;
    }

    luring_attach_aio_context(ctx->linux_io_uring, ctx);
    return ctx->linux_io_uring;
}

LuringState *aio_get_linux_io_uring(AioContext *ctx)
{
    assert(ctx->linux_io_uring);
    return ctx->linux_io_uring;
}
#endif

void aio_notify(AioContext *ctx)
{
    /*
     * Write e.g. bh->flags before writing ctx->notified.  Pairs with smp_mb in
     * aio_notify_accept.
     */
    smp_wmb();
    qatomic_set(&ctx->notified, true);

    /*
     * Write ctx->notified before reading ctx->notify_me.  Pairs
     * with smp_mb in aio_ctx_prepare or aio_poll.
     */
    smp_mb();
    if (qatomic_read(&ctx->notify_me)) {
        event_notifier_set(&ctx->notifier);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    qatomic_set(&ctx->notified, false);

    /*
     * Write ctx->notified before reading e.g. bh->flags.  Pairs with smp_wmb
     * in aio_notify.
     */
    smp_mb();
}
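
/*
 * Note (summary of the barrier comments above): aio_notify() and the
 * notify_me/notified pair form a lost-wakeup-free handshake.  Either the
 * notifier observes notify_me set and kicks the event notifier, or the
 * polling thread, which set notify_me before computing its timeout and before
 * aio_notify_accept(), observes the newly written BH flags or ctx->notified
 * and does not go back to sleep.
 */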

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void aio_context_notifier_cb(EventNotifier *e)
{
    AioContext *ctx = container_of(e, AioContext, notifier);

    event_notifier_test_and_clear(&ctx->notifier);
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool aio_context_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return qatomic_read(&ctx->notified);
}

static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        qatomic_set(&co->scheduled, NULL);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    QSLIST_INIT(&ctx->bh_list);
    QSIMPLEQ_INIT(&ctx->bh_slice_list);
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           aio_context_notifier_cb,
                           aio_context_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif

#ifdef CONFIG_LINUX_IO_URING
    ctx->linux_io_uring = NULL;
#endif

    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
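
/*
 * Illustrative sketch only: how an iothread-style user could drive a freshly
 * created AioContext from its own GLib main context.  Error handling and the
 * "running" flag are hypothetical.
 *
 *     AioContext *ctx = aio_context_new(&error_abort);
 *     GMainContext *gctx = g_main_context_new();
 *     GSource *src = aio_get_g_source(ctx);
 *
 *     g_source_attach(src, gctx);
 *     g_source_unref(src);
 *     while (running) {
 *         g_main_context_iteration(gctx, true);   // prepare/check/dispatch
 *     }
 */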

void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL,
                                           __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    /* The coroutine might run and release the last ctx reference before we
     * invoke qemu_bh_schedule().  Take a reference to keep ctx alive until
     * we're done.
     */
    aio_context_ref(ctx);

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);

    aio_context_unref(ctx);
}
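
/*
 * Illustrative sketch only: handing a coroutine over to another AioContext,
 * e.g. on completion of work in a different thread.  "my_complete" and
 * "target_ctx" are hypothetical caller names.
 *
 *     void my_complete(AioContext *target_ctx, Coroutine *co)
 *     {
 *         aio_co_schedule(target_ctx, co);   // safe from any thread; co is
 *                                            // entered from target_ctx's loop
 *     }
 */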

typedef struct AioCoRescheduleSelf {
    Coroutine *co;
    AioContext *new_ctx;
} AioCoRescheduleSelf;

static void aio_co_reschedule_self_bh(void *opaque)
{
    AioCoRescheduleSelf *data = opaque;
    aio_co_schedule(data->new_ctx, data->co);
}

void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
{
    AioContext *old_ctx = qemu_get_current_aio_context();

    if (old_ctx != new_ctx) {
        AioCoRescheduleSelf data = {
            .co = qemu_coroutine_self(),
            .new_ctx = new_ctx,
        };
        /*
         * We can't directly schedule the coroutine in the target context
         * because this would be racy: The other thread could try to enter the
         * coroutine before it has yielded in this one.
         */
        aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
        qemu_coroutine_yield();
    }
}
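
/*
 * Illustrative sketch only: a coroutine temporarily moving itself to the main
 * loop and back.  "my_do_in_main_loop" and "my_ctx" are hypothetical names for
 * the caller and its home context.
 *
 *     void coroutine_fn my_do_in_main_loop(AioContext *my_ctx)
 *     {
 *         aio_co_reschedule_self(qemu_get_aio_context());
 *         ...   // now running in the main loop's AioContext
 *         aio_co_reschedule_self(my_ctx);
 *     }
 */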

void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = qatomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}
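
/*
 * Illustrative sketch only: the acquire/release pattern used by code that
 * operates on an AioContext it does not currently run in.  "do_something_with"
 * is a hypothetical placeholder.
 *
 *     aio_context_acquire(ctx);
 *     do_something_with(ctx);        // e.g. submit I/O bound to ctx
 *     aio_context_release(ctx);
 *
 * The lock is recursive (qemu_rec_mutex), so nested acquire/release pairs on
 * the same thread are allowed.
 */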