qemu/util/async.c
/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "qemu/rcu_queue.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

/* QEMUBH::flags values */
enum {
    /* Already enqueued and waiting for aio_bh_poll() */
    BH_PENDING   = (1 << 0),

    /* Invoke the callback */
    BH_SCHEDULED = (1 << 1),

    /* Delete without invoking callback */
    BH_DELETED   = (1 << 2),

    /* Delete after invoking callback */
    BH_ONESHOT   = (1 << 3),

    /* Schedule periodically when the event loop is idle */
    BH_IDLE      = (1 << 4),
};
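
/*
 * Typical flag lifecycle (as implemented below): scheduling a BH sets
 * BH_SCHEDULED and, via aio_bh_enqueue(), BH_PENDING; aio_bh_poll()
 * dequeues it and atomically clears BH_PENDING, BH_SCHEDULED and BH_IDLE
 * before deciding whether to run the callback.  BH_DELETED and BH_ONESHOT
 * survive the dequeue and tell aio_bh_poll() to g_free() the BH once it
 * has been processed.
 */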

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QSLIST_ENTRY(QEMUBH) next;
    unsigned flags;
};

/* Called concurrently from any thread */
static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
{
    AioContext *ctx = bh->ctx;
    unsigned old_flags;

    /*
     * The memory barrier implicit in atomic_fetch_or makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in aio_bh_poll().
     * 2. ctx is loaded before the callback has a chance to execute and bh
     *    could be freed.
     */
    old_flags = atomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
    if (!(old_flags & BH_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
    }

    aio_notify(ctx);
}

/* Only called from aio_bh_poll() and aio_ctx_finalize() */
static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
{
    QEMUBH *bh = QSLIST_FIRST_RCU(head);

    if (!bh) {
        return NULL;
    }

    QSLIST_REMOVE_HEAD(head, next);

    /*
     * The atomic_and is paired with aio_bh_enqueue().  The implicit memory
     * barrier ensures that the callback sees all writes done by the scheduling
     * thread.  It also ensures that the scheduling thread sees the cleared
     * flag before bh->cb has run, and thus will call aio_notify again if
     * necessary.
     */
    *flags = atomic_fetch_and(&bh->flags,
                              ~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
    return bh;
}

void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
}
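
/*
 * Usage sketch: a one-shot bottom half runs once in the target context and
 * is freed afterwards, so the caller never holds a QEMUBH pointer.
 * "my_cb" and "my_state" are placeholder names.
 *
 *     static void my_cb(void *opaque)
 *     {
 *         MyState *s = opaque;
 *         ...
 *     }
 *
 *     aio_bh_schedule_oneshot(ctx, my_cb, my_state);
 */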

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    return bh;
}
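
/*
 * Usage sketch: a long-lived bottom half is created once, re-armed with
 * qemu_bh_schedule() as often as needed, and finally released with
 * qemu_bh_delete(); the actual g_free() happens later, inside aio_bh_poll().
 * "my_cb" and "my_state" are placeholder names.
 *
 *     QEMUBH *bh = aio_bh_new(ctx, my_cb, my_state);
 *     qemu_bh_schedule(bh);
 *     ...
 *     qemu_bh_delete(bh);
 */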

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* Multiple invocations of aio_bh_poll() must not run concurrently. */
int aio_bh_poll(AioContext *ctx)
{
    BHListSlice slice;
    BHListSlice *s;
    int ret = 0;

    QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
    QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);

    while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) {
        QEMUBH *bh;
        unsigned flags;

        bh = aio_bh_dequeue(&s->bh_list, &flags);
        if (!bh) {
            QSIMPLEQ_REMOVE_HEAD(&ctx->bh_slice_list, next);
            continue;
        }

        if ((flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            /* Idle BHs don't count as progress */
            if (!(flags & BH_IDLE)) {
                ret = 1;
            }
            aio_bh_call(bh);
        }
        if (flags & (BH_DELETED | BH_ONESHOT)) {
            g_free(bh);
        }
    }

    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED | BH_IDLE);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_SCHEDULED);
}

/* This function is asynchronous: cancellation only takes effect if the
 * bottom half has not yet been dequeued by aio_bh_poll().
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    atomic_and(&bh->flags, ~BH_SCHEDULED);
}

/* This function is asynchronous.  The bottom half is actually deleted later,
 * from within aio_bh_poll().
 */
void qemu_bh_delete(QEMUBH *bh)
{
    aio_bh_enqueue(bh, BH_DELETED);
}

static int64_t aio_compute_bh_timeout(BHList *head, int timeout)
{
    QEMUBH *bh;

    QSLIST_FOREACH_RCU(bh, head, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            if (bh->flags & BH_IDLE) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    return timeout;
}

int64_t
aio_compute_timeout(AioContext *ctx)
{
    BHListSlice *s;
    int64_t deadline;
    int timeout = -1;

    timeout = aio_compute_bh_timeout(&ctx->bh_list, timeout);
    if (timeout == 0) {
        return 0;
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        timeout = aio_compute_bh_timeout(&s->bh_list, timeout);
        if (timeout == 0) {
            return 0;
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}
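
/*
 * Worked example of the computation above: with only an idle BH scheduled,
 * the BH scan yields 10000000 ns (10 ms); if the nearest timer deadline is
 * 2000000 ns (2 ms), qemu_soonest_timeout() picks 2 ms.  A non-idle
 * scheduled BH or an already-expired timer short-circuits to 0, meaning
 * "do not block"; a result of -1 means "block indefinitely".
 */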

static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) | 1);

    /*
     * Write ctx->notify_me before computing the timeout
     * (reading bottom half flags, etc.).  Pairs with
     * smp_mb in aio_notify().
     */
    smp_mb();

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    BHListSlice *s;

    /* Finish computing the timeout before clearing the flag.  */
    atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) & ~1);
    aio_notify_accept(ctx);

    QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
            return true;
        }
    }

    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
        QSLIST_FOREACH_RCU(bh, &s->bh_list, next) {
            if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
                return true;
            }
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;
    unsigned flags;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

#ifdef CONFIG_LINUX_IO_URING
    if (ctx->linux_io_uring) {
        luring_detach_aio_context(ctx->linux_io_uring, ctx);
        luring_cleanup(ctx->linux_io_uring);
        ctx->linux_io_uring = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    /* There must be no aio_bh_poll() calls going on */
    assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list));

    while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) {
        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(flags & BH_DELETED);

        g_free(bh);
    }

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
    aio_context_destroy(ctx);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    aio_context_use_g_source(ctx);
    g_source_ref(&ctx->source);
    return &ctx->source;
}
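
/*
 * Usage sketch: aio_get_g_source() takes a reference on behalf of the
 * caller, so the usual pattern is to attach the source to a GMainContext
 * and then drop that reference.  "main_ctx" is a placeholder for whatever
 * GMainContext the caller is running.
 *
 *     GSource *src = aio_get_g_source(ctx);
 *     g_source_attach(src, main_ctx);
 *     g_source_unref(src);
 */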

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init(errp);
        if (ctx->linux_aio) {
            laio_attach_aio_context(ctx->linux_aio, ctx);
        }
    }
    return ctx->linux_aio;
}

LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    assert(ctx->linux_aio);
    return ctx->linux_aio;
}
#endif

#ifdef CONFIG_LINUX_IO_URING
LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp)
{
    if (ctx->linux_io_uring) {
        return ctx->linux_io_uring;
    }

    ctx->linux_io_uring = luring_init(errp);
    if (!ctx->linux_io_uring) {
        return NULL;
    }

    luring_attach_aio_context(ctx->linux_io_uring, ctx);
    return ctx->linux_io_uring;
}

LuringState *aio_get_linux_io_uring(AioContext *ctx)
{
    assert(ctx->linux_io_uring);
    return ctx->linux_io_uring;
}
#endif

void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->flags before reading ctx->notify_me.  Pairs
     * with smp_mb in aio_ctx_prepare or aio_poll.
     */
    smp_mb();
    if (atomic_read(&ctx->notify_me)) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)
#ifdef WIN32
        || true
#endif
    ) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}
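
/*
 * Summary of the handshake implemented above: aio_ctx_prepare() makes
 * ctx->notify_me non-zero before computing its timeout, and aio_notify()
 * only kicks the event notifier when it sees notify_me set.  The paired
 * memory barriers (smp_mb() in both places) guarantee that either the
 * poller observes the newly scheduled work while computing the timeout,
 * or aio_notify() observes notify_me and wakes the poller up.
 * ctx->notified lets aio_notify_accept() clear the notifier lazily.
 */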

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

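    /*
     * aio_co_schedule() pushes onto the head of scheduled_coroutines, so
     * the detached list is in reverse scheduling order; reverse it once
     * more so the coroutines are entered in the order they were scheduled.
     */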
    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        atomic_set(&co->scheduled, NULL);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    QSLIST_INIT(&ctx->bh_list);
    QSIMPLEQ_INIT(&ctx->bh_slice_list);
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif

#ifdef CONFIG_LINUX_IO_URING
    ctx->linux_io_uring = NULL;
#endif

    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
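
/*
 * Usage sketch: callers typically create a context, hand it to a thread or
 * GMainContext that will run it (see the aio_get_g_source() example above),
 * and release their reference when done.  &error_abort is used here only
 * for brevity.
 *
 *     AioContext *ctx = aio_context_new(&error_abort);
 *     ...
 *     aio_context_unref(ctx);
 */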

void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = atomic_cmpxchg(&co->scheduled, NULL,
                                           __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    /* The coroutine might run and release the last ctx reference before we
     * invoke qemu_bh_schedule().  Take a reference to keep ctx alive until
     * we're done.
     */
    aio_context_ref(ctx);

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);

    aio_context_unref(ctx);
}
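
/*
 * Usage sketch: code running outside ctx can hand a coroutine to it.
 * "my_co_fn" and "my_data" are placeholder names; qemu_coroutine_create()
 * comes from "qemu/coroutine.h".
 *
 *     Coroutine *co = qemu_coroutine_create(my_co_fn, my_data);
 *     aio_co_schedule(ctx, co);
 *
 * A coroutine that yielded while waiting for I/O is typically resumed with
 * aio_co_wake(co), which re-enters it in the context it was running in.
 */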

void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = atomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}
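
/*
 * Usage sketch: ctx->lock is a recursive mutex, so acquire/release pairs
 * may nest.  Code touching state owned by another context typically wraps
 * the access like this ("do_something" is a placeholder):
 *
 *     aio_context_acquire(ctx);
 *     do_something(ctx);
 *     aio_context_release(ctx);
 */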