qemu/util/async.c
/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

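/*
 * The three flags track the life cycle of a bottom half:
 *  - scheduled: the callback will be invoked by the next aio_bh_poll()
 *  - idle:      the BH does not force an immediate poll; idle BHs are only
 *               guaranteed to run every 10 ms (see aio_compute_timeout)
 *  - deleted:   the BH is garbage and will be unlinked and freed by
 *               aio_bh_poll() once it is no longer scheduled
 */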
struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;
    bool idle;
    bool deleted;
};

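/* Create a one-shot bottom half: it is born both scheduled and deleted, so
 * aio_bh_poll() runs the callback once and then frees the BH automatically.
 */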
void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    bh->scheduled = 1;
    bh->deleted = 1;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

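/* Create a bottom half attached to @ctx.  The callback does not run until
 * qemu_bh_schedule() is called, and the caller must eventually release the
 * BH with qemu_bh_delete().
 */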
QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    return bh;
}

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* Multiple occurrences of aio_bh_poll cannot be called concurrently.
 * The count in ctx->list_lock is incremented before the call, and is
 * not affected by the call.
 */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;
    bool deleted = false;

    ret = 0;
    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
        next = atomic_rcu_read(&bh->next);
        /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
         * implicit memory barrier ensures that the callback sees all writes
         * done by the scheduling thread.  It also ensures that the scheduling
         * thread sees the zero before bh->cb has run, and thus will call
         * aio_notify again if necessary.
         */
        if (atomic_xchg(&bh->scheduled, 0)) {
            /* Idle BHs don't count as progress */
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            aio_bh_call(bh);
        }
        if (bh->deleted) {
            deleted = true;
        }
    }

    /* remove deleted bhs */
    if (!deleted) {
        return ret;
    }

    if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted && !bh->scheduled) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
    }
    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in the aio_bh_poll.
     */
    atomic_mb_set(&bh->scheduled, 1);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    ctx = bh->ctx;
    bh->idle = 0;
    /* The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) == 0) {
        aio_notify(ctx);
    }
}

/* This function is asynchronous: the callback may still run once if the
 * event loop has already picked the bottom half up.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    bh->scheduled = 0;
}

/* This function is asynchronous.  The bottom half is not freed here; it is
 * unlinked and freed later, at the end of aio_bh_poll().
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}
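/* A minimal usage sketch of the BH API above; my_cb and my_state are
 * hypothetical caller-side names, not part of this file:
 *
 *     QEMUBH *bh = aio_bh_new(ctx, my_cb, my_state);
 *     qemu_bh_schedule(bh);    // my_cb(my_state) runs from aio_bh_poll()
 *     ...
 *     qemu_bh_delete(bh);      // actually freed later by aio_bh_poll()
 */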
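/* Compute how long the event loop may sleep, in nanoseconds: 0 if a
 * non-idle BH is scheduled, at most 10 ms if only idle BHs are scheduled,
 * otherwise the deadline of the nearest timer (-1 if there is none).
 */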
int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
         bh = atomic_rcu_read(&bh->next)) {
        if (bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}

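/* aio_ctx_prepare/check/dispatch/finalize implement GSourceFuncs, so that
 * an AioContext can be driven as a GSource by a GLib main loop (see
 * aio_source_funcs and aio_get_g_source below).
 */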
static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_or(&ctx->notify_me, 1);

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    atomic_and(&ctx->notify_me, ~1);
    aio_notify_accept(ctx);

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    qemu_lockcnt_lock(&ctx->list_lock);
    assert(!qemu_lockcnt_count(&ctx->list_lock));
    while (ctx->first_bh) {
        QEMUBH *next = ctx->first_bh->next;

        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(ctx->first_bh->deleted);

        g_free(ctx->first_bh);
        ctx->first_bh = next;
    }
    qemu_lockcnt_unlock(&ctx->list_lock);

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
}

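/* A hedged sketch of how the GSource side is typically consumed; the choice
 * of GMainContext is up to the caller, not this file:
 *
 *     GSource *src = aio_get_g_source(ctx);
 *     g_source_attach(src, NULL);   // default GMainContext holds its own ref
 *     g_source_unref(src);          // drop the ref taken by aio_get_g_source
 */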
static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}

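/* The thread pool (and, below, the Linux AIO state) is created lazily on
 * first use.
 */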
ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init();
        laio_attach_aio_context(ctx->linux_aio, ctx);
    }
    return ctx->linux_aio;
}
#endif

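/* Kick the event loop so it re-evaluates its poll conditions.  This is a
 * no-op unless someone is about to block (ctx->notify_me is set by
 * aio_ctx_prepare above, or by aio_poll), which keeps BH scheduling cheap.
 */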
void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

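/* Bottom half that enters the coroutines queued by aio_co_schedule().
 * QSLIST_INSERT_HEAD_ATOMIC builds the list in LIFO order, so it is
 * reversed first and the coroutines run in the order they were scheduled.
 */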
static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);
        qemu_coroutine_enter(co);
        aio_context_release(ctx);
    }
}

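/* Allocate and initialize a new AioContext.  On failure the partially
 * constructed GSource is destroyed and NULL is returned with @errp set.
 */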
AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           (EventNotifierHandler *)
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif
    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}

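/* Queue @co to run in @ctx and kick that context's event loop.  Both the
 * list insertion and the BH scheduling are atomic, so this can be called
 * from any thread.
 */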
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);
}

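/* Restart a coroutine in the AioContext it was last running in, either
 * directly or, from another context, via aio_co_schedule().
 */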
void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = atomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

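/* ctx->lock is a recursive mutex, so acquire/release pairs may nest within
 * a thread; the same lock is taken around coroutine entry above.
 */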
void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}