qemu/util/async.c
/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

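/*
 * Editorial note on the list below: bottom halves hang off ctx->first_bh on a
 * singly-linked list.  Readers (aio_bh_poll, aio_compute_timeout,
 * aio_ctx_check) walk it without taking the lock, using atomic_rcu_read,
 * while insertion and reclamation are serialized by ctx->list_lock.
 */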
struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;     /* set while the BH is pending; cleared before cb runs */
    bool idle;          /* idle BHs only force polling every 10 ms */
    bool deleted;       /* marked for removal; freed in aio_bh_poll() */
};

void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Mark the BH as both scheduled and deleted: it runs once and is then
     * reclaimed by aio_bh_poll() without further action by the caller.
     */
    bh->scheduled = 1;
    bh->deleted = 1;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    return bh;
}
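
/*
 * Illustrative usage sketch (editorial, not part of the original file; my_cb
 * and my_state are hypothetical names).  A long-lived BH is created once,
 * scheduled whenever work is pending, and deleted by its owner:
 *
 *     QEMUBH *bh = aio_bh_new(ctx, my_cb, my_state);
 *     qemu_bh_schedule(bh);    // my_cb(my_state) runs soon in ctx's thread
 *     ...
 *     qemu_bh_delete(bh);      // freed later, inside aio_bh_poll()
 */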

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* aio_bh_poll must not be called concurrently from multiple threads.
 * The count in ctx->list_lock is incremented before the call, and is
 * not affected by the call.
 */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;
    bool deleted = false;

    ret = 0;
    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
        next = atomic_rcu_read(&bh->next);
        /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
         * implicit memory barrier ensures that the callback sees all writes
         * done by the scheduling thread.  It also ensures that the scheduling
         * thread sees the zero before bh->cb has run, and thus will call
         * aio_notify again if necessary.
         */
        if (atomic_xchg(&bh->scheduled, 0)) {
            /* Idle BHs don't count as progress */
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            aio_bh_call(bh);
        }
        if (bh->deleted) {
            deleted = true;
        }
    }

    /* remove deleted bhs */
    if (!deleted) {
        return ret;
    }

    if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted && !bh->scheduled) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
    }
    return ret;
}
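
/*
 * Editorial note: per the comment above aio_bh_poll, callers hold a reference
 * on ctx->list_lock around the call.  In the dispatch paths of
 * util/aio-posix.c and util/aio-win32.c this typically looks roughly like:
 *
 *     qemu_lockcnt_inc(&ctx->list_lock);
 *     progress |= aio_bh_poll(ctx);
 *     ...
 *     qemu_lockcnt_dec(&ctx->list_lock);
 */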

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in aio_bh_poll.
     */
    atomic_mb_set(&bh->scheduled, 1);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    ctx = bh->ctx;
    bh->idle = 0;
    /* The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) == 0) {
        aio_notify(ctx);
    }
}


/* This function is asynchronous: it only clears the scheduled flag, so a
 * callback that aio_bh_poll has already picked up may still run once more.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    atomic_mb_set(&bh->scheduled, 0);
}

/* This function is asynchronous: the bottom half is only marked as deleted
 * here; it is actually unlinked and freed the next time aio_bh_poll runs.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}

int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
         bh = atomic_rcu_read(&bh->next)) {
        if (bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}
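
/*
 * Editorial example of the return value (in nanoseconds): with only an idle
 * BH scheduled and the nearest timer 50 ms away, the result is
 * qemu_soonest_timeout(10000000, 50000000) == 10000000, i.e. wait at most
 * 10 ms; a scheduled non-idle BH always yields 0 (dispatch immediately).
 */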

static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_or(&ctx->notify_me, 1);

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    atomic_and(&ctx->notify_me, ~1);
    aio_notify_accept(ctx);

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    qemu_lockcnt_lock(&ctx->list_lock);
    assert(!qemu_lockcnt_count(&ctx->list_lock));
    while (ctx->first_bh) {
        QEMUBH *next = ctx->first_bh->next;

        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(ctx->first_bh->deleted);

        g_free(ctx->first_bh);
        ctx->first_bh = next;
    }
    qemu_lockcnt_unlock(&ctx->list_lock);

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
    aio_context_destroy(ctx);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}
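
/*
 * Illustrative sketch (editorial, not upstream code): the returned GSource is
 * already referenced, so a caller that hooks the AioContext into a glib main
 * loop can hand that reference straight to the main context:
 *
 *     GSource *src = aio_get_g_source(ctx);
 *     g_source_attach(src, g_main_context_default());
 *     g_source_unref(src);   // the GMainContext now holds its own reference
 */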

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init(errp);
        if (ctx->linux_aio) {
            laio_attach_aio_context(ctx->linux_aio, ctx);
        }
    }
    return ctx->linux_aio;
}

LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    assert(ctx->linux_aio);
    return ctx->linux_aio;
}
#endif

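/*
 * Editorial summary of the notification handshake implemented below:
 * aio_notify() kicks ctx->notifier only while another thread has advertised
 * that it may block (ctx->notify_me, set in aio_ctx_prepare and in aio_poll),
 * and records the kick in ctx->notified.  The woken thread later calls
 * aio_notify_accept() to consume the event so that the next poll does not
 * return immediately on a stale wakeup.
 */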
void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    /* aio_co_schedule() prepends to the list, so it arrives here in reverse
     * scheduling order; rebuild it head-first to enter the coroutines FIFO.
     */
    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        atomic_set(&co->scheduled, NULL);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           (EventNotifierHandler *)
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif
    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}

void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = atomic_cmpxchg(&co->scheduled, NULL,
                                           __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);
}
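
/*
 * Illustrative sketch (editorial; worker_ctx and request_co are hypothetical
 * names).  A thread that does not own a coroutine's AioContext can still wake
 * it, either by naming the context explicitly or via aio_co_wake() below,
 * which reads the context from co->ctx:
 *
 *     aio_co_schedule(worker_ctx, request_co);   // always defers to a BH
 *     aio_co_wake(request_co);                   // uses co->ctx; may enter
 *                                                // the coroutine directly
 */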

void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = atomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        /* Already running inside a coroutine of this context: queue the
         * target on our wakeup list instead of nesting coroutine entries.
         */
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}
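
/*
 * Illustrative sketch (editorial; iothread_ctx and blk_do_something are
 * hypothetical names): code running outside an AioContext's home thread takes
 * the recursive lock around operations on objects bound to that context:
 *
 *     aio_context_acquire(iothread_ctx);
 *     blk_do_something();                 // hypothetical operation in ctx
 *     aio_context_release(iothread_ctx);
 */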