qemu/util/async.c
/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;
    bool idle;
    bool deleted;
};

void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    bh->scheduled = 1;
    bh->deleted = 1;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    return bh;
}
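
/*
 * Illustrative sketch (not part of upstream QEMU): how a caller might use
 * aio_bh_schedule_oneshot() to run a piece of work once in the context's
 * home thread.  The names example_oneshot_cb and example_schedule_cleanup
 * are hypothetical.  The callback runs from aio_bh_poll() and the QEMUBH is
 * freed automatically afterwards, because the one-shot helper marks it both
 * scheduled and deleted.
 */
static void example_oneshot_cb(void *opaque)
{
    /* Runs once in ctx's thread; opaque is whatever the caller passed in. */
    g_free(opaque);
}

static void G_GNUC_UNUSED example_schedule_cleanup(AioContext *ctx)
{
    char *payload = g_strdup("cleanup me");

    /* No QEMUBH handle is returned; the BH cannot be cancelled or reused. */
    aio_bh_schedule_oneshot(ctx, example_oneshot_cb, payload);
}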

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* aio_bh_poll must not be called concurrently with itself.  The caller
 * increments the count in ctx->list_lock before the call; the count is
 * not affected by the call itself.
 */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;
    bool deleted = false;

    ret = 0;
    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
        next = atomic_rcu_read(&bh->next);
        /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
         * implicit memory barrier ensures that the callback sees all writes
         * done by the scheduling thread.  It also ensures that the scheduling
         * thread sees the zero before bh->cb has run, and thus will call
         * aio_notify again if necessary.
         */
        if (atomic_xchg(&bh->scheduled, 0)) {
            /* Idle BHs don't count as progress */
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            aio_bh_call(bh);
        }
        if (bh->deleted) {
            deleted = true;
        }
    }

    /* remove deleted bhs */
    if (!deleted) {
        return ret;
    }

    if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted && !bh->scheduled) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
    }
    return ret;
}
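
/*
 * Illustrative sketch (not part of upstream QEMU): the calling convention
 * described in the comment above aio_bh_poll().  Callers such as
 * aio_dispatch() hold a reference count on ctx->list_lock around the walk,
 * which keeps BHs from being freed while the list is traversed without
 * taking the lock.  The function name example_dispatch_bhs is hypothetical.
 */
static int G_GNUC_UNUSED example_dispatch_bhs(AioContext *ctx)
{
    int progress;

    qemu_lockcnt_inc(&ctx->list_lock);   /* pin the BH list */
    progress = aio_bh_poll(ctx);         /* run scheduled bottom halves */
    qemu_lockcnt_dec(&ctx->list_lock);   /* allow deferred frees again */

    return progress;
}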

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in aio_bh_poll.
     */
    atomic_mb_set(&bh->scheduled, 1);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    ctx = bh->ctx;
    bh->idle = 0;
    /* The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) == 0) {
        aio_notify(ctx);
    }
}


/* Cancellation is asynchronous: this only clears the scheduled flag, so a
 * callback that has already started running is not interrupted.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    atomic_mb_set(&bh->scheduled, 0);
}

/* Deletion is asynchronous: the bottom half is only marked as deleted here;
 * aio_bh_poll removes it from the list and frees it later, once it is no
 * longer scheduled.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}
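
/*
 * Illustrative sketch (not part of upstream QEMU): the typical lifecycle of
 * a persistent bottom half.  The owner creates it once with aio_bh_new(),
 * schedules it whenever there is work to do, and deletes it during teardown;
 * the memory is reclaimed later by aio_bh_poll().  All names prefixed with
 * example_ are hypothetical.
 */
typedef struct ExampleDevice {
    AioContext *ctx;
    QEMUBH *bh;
    bool stopping;
} ExampleDevice;

static void example_device_bh(void *opaque)
{
    ExampleDevice *dev = opaque;

    if (dev->stopping) {
        return;
    }
    /* ... process completed work for dev ... */
}

static void G_GNUC_UNUSED example_device_init(ExampleDevice *dev,
                                              AioContext *ctx)
{
    dev->ctx = ctx;
    dev->bh = aio_bh_new(ctx, example_device_bh, dev);
}

static void G_GNUC_UNUSED example_device_kick(ExampleDevice *dev)
{
    /* May be called from any thread; repeated kicks coalesce until the BH
     * actually runs. */
    qemu_bh_schedule(dev->bh);
}

static void G_GNUC_UNUSED example_device_fini(ExampleDevice *dev)
{
    dev->stopping = true;
    qemu_bh_cancel(dev->bh);   /* drop a pending, not-yet-run invocation */
    qemu_bh_delete(dev->bh);   /* freed later by aio_bh_poll() */
    dev->bh = NULL;
}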

int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
         bh = atomic_rcu_read(&bh->next)) {
        if (bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}

static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_or(&ctx->notify_me, 1);

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    atomic_and(&ctx->notify_me, ~1);
    aio_notify_accept(ctx);

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    qemu_lockcnt_lock(&ctx->list_lock);
    assert(!qemu_lockcnt_count(&ctx->list_lock));
    while (ctx->first_bh) {
        QEMUBH *next = ctx->first_bh->next;

        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(ctx->first_bh->deleted);

        g_free(ctx->first_bh);
        ctx->first_bh = next;
    }
    qemu_lockcnt_unlock(&ctx->list_lock);

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
    aio_context_destroy(ctx);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}
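
/*
 * Illustrative sketch (not part of upstream QEMU): how the GSource returned
 * by aio_get_g_source() can be attached to a GLib main context, so that the
 * prepare/check/dispatch callbacks above drive the AioContext from a plain
 * GLib main loop.  example_attach_to_glib is a hypothetical name.
 */
static guint G_GNUC_UNUSED example_attach_to_glib(AioContext *ctx,
                                                  GMainContext *gcontext)
{
    GSource *source = aio_get_g_source(ctx);   /* takes a reference */
    guint id = g_source_attach(source, gcontext);

    /* The main context now holds its own reference; drop ours. */
    g_source_unref(source);
    return id;
}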

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init(errp);
        if (ctx->linux_aio) {
            laio_attach_aio_context(ctx->linux_aio, ctx);
        }
    }
    return ctx->linux_aio;
}

LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    assert(ctx->linux_aio);
    return ctx->linux_aio;
}
#endif
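
#ifdef CONFIG_LINUX_AIO
/*
 * Illustrative sketch (not part of upstream QEMU): how a caller might lazily
 * set up Linux AIO for a context and fall back when it is unavailable.
 * aio_setup_linux_aio() fills *errp and returns NULL on failure, so the
 * caller decides whether that is fatal.  example_try_linux_aio is a
 * hypothetical name.
 */
static bool G_GNUC_UNUSED example_try_linux_aio(AioContext *ctx)
{
    Error *local_err = NULL;
    LinuxAioState *s = aio_setup_linux_aio(ctx, &local_err);

    if (!s) {
        /* Not fatal here: report the reason and let the caller fall back. */
        error_report_err(local_err);
        return false;
    }
    return true;
}
#endif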

void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    /* aio_co_schedule() pushes onto the list head, so the coroutines arrive
     * here in LIFO order; reverse the list to enter them in FIFO order.
     */
    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        atomic_set(&co->scheduled, NULL);
        qemu_coroutine_enter(co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           (EventNotifierHandler *)
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif
    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
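
/*
 * Illustrative sketch (not part of upstream QEMU): creating an AioContext
 * and driving it with aio_poll(), roughly the shape of the loop an IOThread
 * runs for its context.  example_loop and its 'running' flag are
 * hypothetical; a real IOThread also registers the context as the calling
 * thread's current context before polling.
 */
static void G_GNUC_UNUSED example_loop(bool *running)
{
    AioContext *ctx = aio_context_new(&error_abort);

    while (atomic_read(running)) {
        /* Blocks until a BH, timer or fd handler is ready, then runs it. */
        aio_poll(ctx, true);
    }

    aio_context_unref(ctx);
}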

void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = atomic_cmpxchg(&co->scheduled, NULL,
                                           __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);
}

void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = atomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}
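
/*
 * Illustrative sketch (not part of upstream QEMU): handing a coroutine over
 * to another AioContext.  The coroutine entry point yields, and the caller
 * then reschedules it in its target context with aio_co_schedule(); it
 * resumes in that context's home thread via the co_schedule_bh bottom half
 * above.  Names prefixed with example_ are hypothetical, and a real user
 * must make sure the coroutine has actually yielded before scheduling it
 * again.
 */
typedef struct ExampleJob {
    Coroutine *co;
    AioContext *target_ctx;
} ExampleJob;

static void coroutine_fn example_job_entry(void *opaque)
{
    ExampleJob *job = opaque;

    /* Park here until the coroutine is scheduled again. */
    qemu_coroutine_yield();

    /* Now running in job->target_ctx's thread. */
}

static void G_GNUC_UNUSED example_job_start(ExampleJob *job, AioContext *target)
{
    job->co = qemu_coroutine_create(example_job_entry, job);
    job->target_ctx = target;

    /* Run the coroutine in the current thread until it yields... */
    qemu_coroutine_enter(job->co);

    /* ...then hand it over to the target context. */
    aio_co_schedule(target, job->co);
}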

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}

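/*
 * Illustrative sketch (not part of upstream QEMU): the acquire/release
 * pattern used when another thread needs to touch state that belongs to a
 * context, such as its timers.  The lock is a recursive mutex, so the
 * context's own thread may re-acquire it from within handlers.
 * example_arm_timer_from_other_thread and its arguments are hypothetical.
 */
static void G_GNUC_UNUSED example_arm_timer_from_other_thread(AioContext *ctx,
                                                              QEMUTimer *timer,
                                                              int64_t expire_ns)
{
    aio_context_acquire(ctx);
    /* Safe to touch state owned by ctx's thread while the lock is held. */
    timer_mod(timer, expire_ns);
    aio_context_release(ctx);
}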