qemu/async.c
/*
 * QEMU System Emulator
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu-common.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;
    bool idle;
    bool deleted;
};

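/* Typical bottom-half lifecycle, as a minimal caller sketch (the handler and
 * its opaque pointer below are hypothetical, not part of this file):
 *
 *     static void my_bh_cb(void *opaque)      // runs inside aio_bh_poll()
 *     {
 *         ...
 *     }
 *
 *     QEMUBH *bh = aio_bh_new(ctx, my_bh_cb, opaque);
 *     qemu_bh_schedule(bh);    // callback runs on the next aio_bh_poll()
 *     ...
 *     qemu_bh_delete(bh);      // memory is freed later by aio_bh_poll()
 */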
QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_malloc0(sizeof(QEMUBH));
    bh->ctx = ctx;
    bh->cb = cb;
    bh->opaque = opaque;
    qemu_mutex_lock(&ctx->bh_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_mutex_unlock(&ctx->bh_lock);
    return bh;
}

/* Multiple invocations of aio_bh_poll must not run concurrently */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;

    ctx->walking_bh++;

    ret = 0;
    for (bh = ctx->first_bh; bh; bh = next) {
        /* Make sure that fetching bh happens before accessing its members */
        smp_read_barrier_depends();
        next = bh->next;
        if (!bh->deleted && bh->scheduled) {
            bh->scheduled = 0;
            /* Paired with the write barrier in bh schedule, so that bh->idle
             * and any data the callback needs are only read after
             * bh->scheduled has been observed.
             */
            smp_rmb();
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            bh->cb(bh->opaque);
        }
    }

    ctx->walking_bh--;

    /* remove deleted bhs */
    if (!ctx->walking_bh) {
        qemu_mutex_lock(&ctx->bh_lock);
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_mutex_unlock(&ctx->bh_lock);
    }

    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    if (bh->scheduled) {
        return;
    }
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in the aio_bh_poll.
     */
    smp_wmb();
    bh->scheduled = 1;
}

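/* Note: the memory barrier below and the aio_notify() call are what allow a
 * bottom half to be scheduled from a thread other than the one running the
 * AioContext; the list itself is only modified under bh_lock, in aio_bh_new()
 * and in aio_bh_poll()'s cleanup pass.
 */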
void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    if (bh->scheduled) {
        return;
    }
    ctx = bh->ctx;
    bh->idle = 0;
    /* Make sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    smp_mb();
    bh->scheduled = 1;
    aio_notify(ctx);
}


/* This function is asynchronous: it only clears the scheduled flag and does
 * not wait for a callback that is already running.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    bh->scheduled = 0;
}

/* This function is asynchronous: the bottom half is only marked as deleted
 * here; the memory is actually freed at the end of a later aio_bh_poll run,
 * once no one is walking the list.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}

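/* Illustrative numbers (an example, not taken from the code below): with only
 * an idle bottom half pending, the timeout is 10000000 ns (10 ms); if a timer
 * deadline of 3000000 ns is also pending, qemu_soonest_timeout() picks the
 * smaller value and the caller sleeps for at most 3 ms.  Any non-idle
 * scheduled bottom half short-circuits the computation to 0.
 */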
int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (!bh->deleted && bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}

static gboolean
aio_ctx_prepare(GSource *source, gint *timeout)
{
    AioContext *ctx = (AioContext *) source;

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (!bh->deleted && bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);
    aio_set_event_notifier(ctx, &ctx->notifier, NULL);
    event_notifier_cleanup(&ctx->notifier);
    rfifolock_destroy(&ctx->lock);
    qemu_mutex_destroy(&ctx->bh_lock);
    g_array_free(ctx->pollfds, TRUE);
    timerlistgroup_deinit(&ctx->tlg);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

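/* A minimal sketch of hooking an AioContext into a GLib main loop (the
 * surrounding main-loop setup is assumed, not shown here):
 *
 *     GSource *src = aio_get_g_source(ctx);
 *     g_source_attach(src, g_main_context_default());
 *     g_source_unref(src);   // the main context now holds its own reference
 */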
GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

void aio_set_dispatching(AioContext *ctx, bool dispatching)
{
    ctx->dispatching = dispatching;
    if (!dispatching) {
        /* Write ctx->dispatching before reading e.g. bh->scheduled.
         * Optimization: this is only needed when we're entering the "unsafe"
         * phase where other threads must call event_notifier_set.
         */
        smp_mb();
    }
}

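/* The smp_mb() in aio_notify() pairs with the one in aio_set_dispatching():
 * one side writes bh->scheduled (or other pending work) and then reads
 * ctx->dispatching, the other writes ctx->dispatching and then reads the
 * pending work.  Either aio_notify() sees dispatching == false and sets the
 * notifier, or the thread running aio_poll() sees the newly scheduled work
 * when it checks again before blocking, so a wakeup cannot be lost.
 */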
void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->dispatching.  */
    smp_mb();
    if (!ctx->dispatching) {
        event_notifier_set(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque)
{
    aio_notify(opaque);
}

static void aio_rfifolock_cb(void *opaque)
{
    /* Kick owner thread in case they are blocked in aio_poll() */
    aio_notify(opaque);
}

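/* A minimal creation/teardown sketch (error handling shown with the generic
 * Error API; how the error is reported or propagated is up to the caller):
 *
 *     Error *local_err = NULL;
 *     AioContext *ctx = aio_context_new(&local_err);
 *     if (!ctx) {
 *         // inspect or propagate local_err, then error_free(local_err)
 *     }
 *     ...
 *     aio_context_unref(ctx);   // drops the reference returned by creation
 */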
AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;
    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        g_source_destroy(&ctx->source);
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        return NULL;
    }
    aio_set_event_notifier(ctx, &ctx->notifier,
                           (EventNotifierHandler *)
                           event_notifier_test_and_clear);
    ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
    ctx->thread_pool = NULL;
    qemu_mutex_init(&ctx->bh_lock);
    rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    return ctx;
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

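/* A minimal sketch of serializing access from another thread (the work done
 * inside the critical section is hypothetical):
 *
 *     aio_context_acquire(ctx);
 *     // ... operate on data structures owned by this AioContext ...
 *     aio_context_release(ctx);
 *
 * The lock is a recursive FIFO lock (RFifoLock); contenders kick the owner
 * out of aio_poll() via aio_rfifolock_cb() above.
 */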
void aio_context_acquire(AioContext *ctx)
{
    rfifolock_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    rfifolock_unlock(&ctx->lock);
}