qemu/util/async.c
/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;
    bool idle;
    bool deleted;
};

void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    bh->scheduled = 1;
    bh->deleted = 1;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}
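
/* Example (illustrative sketch, not part of the original file; the callback
 * and opaque names are hypothetical): a one-shot bottom half is typically
 * used to defer a completion into ctx's home thread.
 *
 *     static void complete_in_ctx(void *opaque)
 *     {
 *         MyJob *job = opaque;              // hypothetical type
 *         job->cb(job, job->ret);
 *     }
 *
 *     aio_bh_schedule_oneshot(ctx, complete_in_ctx, job);
 *
 * Because .deleted is set at creation time, the bottom half is freed by
 * aio_bh_poll after it has run once; the caller never sees the QEMUBH.
 */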

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    return bh;
}
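
/* Example (illustrative sketch, hypothetical names): the usual lifecycle of
 * a long-lived bottom half created with aio_bh_new.
 *
 *     s->bh = aio_bh_new(ctx, my_bh_cb, s);   // create once
 *     ...
 *     qemu_bh_schedule(s->bh);                // arm it, possibly many times
 *     ...
 *     qemu_bh_delete(s->bh);                  // memory is freed later,
 *                                             // by aio_bh_poll
 */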

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* Multiple occurrences of aio_bh_poll cannot be called concurrently.
 * The count in ctx->list_lock is incremented before the call, and is
 * not affected by the call.
 */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;
    bool deleted = false;

    ret = 0;
    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
        next = atomic_rcu_read(&bh->next);
        /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
         * implicit memory barrier ensures that the callback sees all writes
         * done by the scheduling thread.  It also ensures that the scheduling
         * thread sees the zero before bh->cb has run, and thus will call
         * aio_notify again if necessary.
         */
        if (atomic_xchg(&bh->scheduled, 0)) {
            /* Idle BHs don't count as progress */
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            aio_bh_call(bh);
        }
        if (bh->deleted) {
            deleted = true;
        }
    }

    /* remove deleted bhs */
    if (!deleted) {
        return ret;
    }

    if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted && !bh->scheduled) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
    }
    return ret;
}
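
/* Sketch of the ctx->list_lock discipline used above (a reading of the
 * lockcnt API in util/lockcnt.c, not additional code):
 *
 *     // reader, e.g. the callers of aio_bh_poll:
 *     qemu_lockcnt_inc(&ctx->list_lock);       // pin the list
 *     ... walk ctx->first_bh with atomic_rcu_read() ...
 *     qemu_lockcnt_dec(&ctx->list_lock);
 *
 *     // writer that wants to unlink and free nodes:
 *     if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
 *         ... count reached zero and the lock is held,
 *             so it is safe to g_free() deleted BHs ...
 *         qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
 *     }
 */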

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in the aio_bh_poll.
     */
    atomic_mb_set(&bh->scheduled, 1);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    ctx = bh->ctx;
    bh->idle = 0;
    /* The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) == 0) {
        aio_notify(ctx);
    }
}


/* This function is asynchronous.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    atomic_mb_set(&bh->scheduled, 0);
}

/* This function is asynchronous.  The actual deletion happens later, when
 * aio_bh_poll next sweeps the list of bottom halves.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}

int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
         bh = atomic_rcu_read(&bh->next)) {
        if (bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}

static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_or(&ctx->notify_me, 1);

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    atomic_and(&ctx->notify_me, ~1);
    aio_notify_accept(ctx);

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    qemu_lockcnt_lock(&ctx->list_lock);
    assert(!qemu_lockcnt_count(&ctx->list_lock));
    while (ctx->first_bh) {
        QEMUBH *next = ctx->first_bh->next;

        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(ctx->first_bh->deleted);

        g_free(ctx->first_bh);
        ctx->first_bh = next;
    }
    qemu_lockcnt_unlock(&ctx->list_lock);

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}
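
/* Example (illustrative sketch): the returned GSource is meant to be
 * attached to a GLib main context, which is how the QEMU main loop and
 * iothreads drive an AioContext.
 *
 *     GSource *src = aio_get_g_source(ctx);    // takes a reference
 *     g_source_attach(src, g_main_context_default());
 *     g_source_unref(src);                     // the context holds its own ref
 */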

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}
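
/* Example (illustrative sketch, assuming the thread_pool_submit_aio()
 * interface declared in block/thread-pool.h; "my_worker" and "my_done"
 * are hypothetical):
 *
 *     static int my_worker(void *arg)            // runs in a pool thread
 *     {
 *         return do_blocking_work(arg);          // hypothetical blocking call
 *     }
 *
 *     static void my_done(void *opaque, int ret) // completion, runs in ctx
 *     {
 *         ...
 *     }
 *
 *     ThreadPool *pool = aio_get_thread_pool(ctx);
 *     thread_pool_submit_aio(pool, my_worker, arg, my_done, opaque);
 */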

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init();
        laio_attach_aio_context(ctx->linux_aio, ctx);
    }
    return ctx->linux_aio;
}
#endif

void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        atomic_set(&co->scheduled, NULL);
        qemu_coroutine_enter(co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           (EventNotifierHandler *)
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif
    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
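
/* Example (illustrative sketch): a dedicated event-loop thread in the style
 * of iothread.c; "stopping" is a hypothetical flag.
 *
 *     Error *local_err = NULL;
 *     AioContext *ctx = aio_context_new(&local_err);
 *     if (!ctx) {
 *         error_report_err(local_err);
 *         return;
 *     }
 *     while (!atomic_read(&stopping)) {
 *         aio_poll(ctx, true);                 // block until there is work
 *     }
 *     aio_context_unref(ctx);
 */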

void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = atomic_cmpxchg(&co->scheduled, NULL,
                                           __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);
}

void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = atomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}
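
/* Example (illustrative sketch, hypothetical names): a coroutine yields
 * while waiting for a completion that may fire in another thread, and the
 * completion hands control back with aio_co_wake.
 *
 *     static void my_complete(void *opaque)        // may run in any thread
 *     {
 *         MyRequest *req = opaque;                 // hypothetical type
 *         req->done = true;
 *         aio_co_wake(req->co);                    // re-enters in co's AioContext
 *     }
 *
 *     static void coroutine_fn my_wait(MyRequest *req)
 *     {
 *         req->co = qemu_coroutine_self();
 *         start_async_op(my_complete, req);        // hypothetical
 *         while (!req->done) {
 *             qemu_coroutine_yield();
 *         }
 *     }
 */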

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}
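
/* Example (illustrative sketch): code running outside ctx's home thread
 * wraps operations on objects bound to that AioContext in the context lock.
 *
 *     aio_context_acquire(ctx);
 *     ... operate on block devices, timers, BHs owned by ctx ...
 *     aio_context_release(ctx);
 *
 * The lock is a recursive mutex, so callbacks that run while it is held
 * may acquire it again.
 */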