qemu/block/linux-aio.c
<<
>>
Prefs
   1/*
   2 * Linux native AIO support.
   3 *
   4 * Copyright (C) 2009 IBM, Corp.
   5 * Copyright (C) 2009 Red Hat, Inc.
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2 or later.
   8 * See the COPYING file in the top-level directory.
   9 */
  10#include "qemu/osdep.h"
  11#include "qemu-common.h"
  12#include "block/aio.h"
  13#include "qemu/queue.h"
  14#include "block/block.h"
  15#include "block/raw-aio.h"
  16#include "qemu/event_notifier.h"
  17#include "qemu/coroutine.h"
  18#include "qapi/error.h"
  19
  20#include <libaio.h>
  21
  22/*
  23 * Queue size (per-device).
  24 *
  25 * XXX: eventually we need to communicate this to the guest and/or make it
  26 *      tunable by the guest.  If we get more outstanding requests at a time
  27 *      than this we will get EAGAIN from io_submit which is communicated to
  28 *      the guest as an I/O error.
  29 */
  30#define MAX_EVENTS 128
  31
  32struct qemu_laiocb {
  33    BlockAIOCB common;
  34    Coroutine *co;
  35    LinuxAioState *ctx;
  36    struct iocb iocb;
  37    ssize_t ret;
  38    size_t nbytes;
  39    QEMUIOVector *qiov;
  40    bool is_read;
  41    QSIMPLEQ_ENTRY(qemu_laiocb) next;
  42};
  43
  44typedef struct {
  45    int plugged;
  46    unsigned int in_queue;
  47    unsigned int in_flight;
  48    bool blocked;
  49    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
  50} LaioQueue;
  51
  52struct LinuxAioState {
  53    AioContext *aio_context;
  54
  55    io_context_t ctx;
  56    EventNotifier e;
  57
  58    /* io queue for submit at batch.  Protected by AioContext lock. */
  59    LaioQueue io_q;
  60
  61    /* I/O completion processing.  Only runs in I/O thread.  */
  62    QEMUBH *completion_bh;
  63    int event_idx;
  64    int event_max;
  65};
  66
  67static void ioq_submit(LinuxAioState *s);
  68
  69static inline ssize_t io_event_ret(struct io_event *ev)
  70{
  71    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
  72}
  73
  74/*
  75 * Completes an AIO request (calls the callback and frees the ACB).
  76 */
  77static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
  78{
  79    int ret;
  80
  81    ret = laiocb->ret;
  82    if (ret != -ECANCELED) {
  83        if (ret == laiocb->nbytes) {
  84            ret = 0;
  85        } else if (ret >= 0) {
  86            /* Short reads mean EOF, pad with zeros. */
  87            if (laiocb->is_read) {
  88                qemu_iovec_memset(laiocb->qiov, ret, 0,
  89                    laiocb->qiov->size - ret);
  90            } else {
  91                ret = -ENOSPC;
  92            }
  93        }
  94    }
  95
  96    laiocb->ret = ret;
  97    if (laiocb->co) {
  98        /* If the coroutine is already entered it must be in ioq_submit() and
  99         * will notice laio->ret has been filled in when it eventually runs
 100         * later.  Coroutines cannot be entered recursively so avoid doing
 101         * that!
 102         */
 103        if (!qemu_coroutine_entered(laiocb->co)) {
 104            aio_co_wake(laiocb->co);
 105        }
 106    } else {
 107        laiocb->common.cb(laiocb->common.opaque, ret);
 108        qemu_aio_unref(laiocb);
 109    }
 110}
 111
 112/**
 113 * aio_ring buffer which is shared between userspace and kernel.
 114 *
 115 * This copied from linux/fs/aio.c, common header does not exist
 116 * but AIO exists for ages so we assume ABI is stable.
 117 */
 118struct aio_ring {
 119    unsigned    id;    /* kernel internal index number */
 120    unsigned    nr;    /* number of io_events */
 121    unsigned    head;  /* Written to by userland or by kernel. */
 122    unsigned    tail;
 123
 124    unsigned    magic;
 125    unsigned    compat_features;
 126    unsigned    incompat_features;
 127    unsigned    header_length;  /* size of aio_ring */
 128
 129    struct io_event io_events[0];
 130};
 131
 132/**
 133 * io_getevents_peek:
 134 * @ctx: AIO context
 135 * @events: pointer on events array, output value
 136
 137 * Returns the number of completed events and sets a pointer
 138 * on events array.  This function does not update the internal
 139 * ring buffer, only reads head and tail.  When @events has been
 140 * processed io_getevents_commit() must be called.
 141 */
 142static inline unsigned int io_getevents_peek(io_context_t ctx,
 143                                             struct io_event **events)
 144{
 145    struct aio_ring *ring = (struct aio_ring *)ctx;
 146    unsigned int head = ring->head, tail = ring->tail;
 147    unsigned int nr;
 148
 149    nr = tail >= head ? tail - head : ring->nr - head;
 150    *events = ring->io_events + head;
 151    /* To avoid speculative loads of s->events[i] before observing tail.
 152       Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
 153    smp_rmb();
 154
 155    return nr;
 156}
 157
 158/**
 159 * io_getevents_commit:
 160 * @ctx: AIO context
 161 * @nr: the number of events on which head should be advanced
 162 *
 163 * Advances head of a ring buffer.
 164 */
 165static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
 166{
 167    struct aio_ring *ring = (struct aio_ring *)ctx;
 168
 169    if (nr) {
 170        ring->head = (ring->head + nr) % ring->nr;
 171    }
 172}
 173
 174/**
 175 * io_getevents_advance_and_peek:
 176 * @ctx: AIO context
 177 * @events: pointer on events array, output value
 178 * @nr: the number of events on which head should be advanced
 179 *
 180 * Advances head of a ring buffer and returns number of elements left.
 181 */
 182static inline unsigned int
 183io_getevents_advance_and_peek(io_context_t ctx,
 184                              struct io_event **events,
 185                              unsigned int nr)
 186{
 187    io_getevents_commit(ctx, nr);
 188    return io_getevents_peek(ctx, events);
 189}
 190
 191/**
 192 * qemu_laio_process_completions:
 193 * @s: AIO state
 194 *
 195 * Fetches completed I/O requests and invokes their callbacks.
 196 *
 197 * The function is somewhat tricky because it supports nested event loops, for
 198 * example when a request callback invokes aio_poll().  In order to do this,
 199 * indices are kept in LinuxAioState.  Function schedules BH completion so it
 200 * can be called again in a nested event loop.  When there are no events left
 201 * to complete the BH is being canceled.
 202 */
 203static void qemu_laio_process_completions(LinuxAioState *s)
 204{
 205    struct io_event *events;
 206
 207    /* Reschedule so nested event loops see currently pending completions */
 208    qemu_bh_schedule(s->completion_bh);
 209
 210    while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
 211                                                         s->event_idx))) {
 212        for (s->event_idx = 0; s->event_idx < s->event_max; ) {
 213            struct iocb *iocb = events[s->event_idx].obj;
 214            struct qemu_laiocb *laiocb =
 215                container_of(iocb, struct qemu_laiocb, iocb);
 216
 217            laiocb->ret = io_event_ret(&events[s->event_idx]);
 218
 219            /* Change counters one-by-one because we can be nested. */
 220            s->io_q.in_flight--;
 221            s->event_idx++;
 222            qemu_laio_process_completion(laiocb);
 223        }
 224    }
 225
 226    qemu_bh_cancel(s->completion_bh);
 227
 228    /* If we are nested we have to notify the level above that we are done
 229     * by setting event_max to zero, upper level will then jump out of it's
 230     * own `for` loop.  If we are the last all counters droped to zero. */
 231    s->event_max = 0;
 232    s->event_idx = 0;
 233}
 234
 235static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
 236{
 237    aio_context_acquire(s->aio_context);
 238    qemu_laio_process_completions(s);
 239
 240    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
 241        ioq_submit(s);
 242    }
 243    aio_context_release(s->aio_context);
 244}
 245
 246static void qemu_laio_completion_bh(void *opaque)
 247{
 248    LinuxAioState *s = opaque;
 249
 250    qemu_laio_process_completions_and_submit(s);
 251}
 252
 253static void qemu_laio_completion_cb(EventNotifier *e)
 254{
 255    LinuxAioState *s = container_of(e, LinuxAioState, e);
 256
 257    if (event_notifier_test_and_clear(&s->e)) {
 258        qemu_laio_process_completions_and_submit(s);
 259    }
 260}
 261
 262static bool qemu_laio_poll_cb(void *opaque)
 263{
 264    EventNotifier *e = opaque;
 265    LinuxAioState *s = container_of(e, LinuxAioState, e);
 266    struct io_event *events;
 267
 268    if (!io_getevents_peek(s->ctx, &events)) {
 269        return false;
 270    }
 271
 272    qemu_laio_process_completions_and_submit(s);
 273    return true;
 274}
 275
 276static void laio_cancel(BlockAIOCB *blockacb)
 277{
 278    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
 279    struct io_event event;
 280    int ret;
 281
 282    if (laiocb->ret != -EINPROGRESS) {
 283        return;
 284    }
 285    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
 286    laiocb->ret = -ECANCELED;
 287    if (ret != 0) {
 288        /* iocb is not cancelled, cb will be called by the event loop later */
 289        return;
 290    }
 291
 292    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
 293}
 294
 295static const AIOCBInfo laio_aiocb_info = {
 296    .aiocb_size         = sizeof(struct qemu_laiocb),
 297    .cancel_async       = laio_cancel,
 298};
 299
 300static void ioq_init(LaioQueue *io_q)
 301{
 302    QSIMPLEQ_INIT(&io_q->pending);
 303    io_q->plugged = 0;
 304    io_q->in_queue = 0;
 305    io_q->in_flight = 0;
 306    io_q->blocked = false;
 307}
 308
 309static void ioq_submit(LinuxAioState *s)
 310{
 311    int ret, len;
 312    struct qemu_laiocb *aiocb;
 313    struct iocb *iocbs[MAX_EVENTS];
 314    QSIMPLEQ_HEAD(, qemu_laiocb) completed;
 315
 316    do {
 317        if (s->io_q.in_flight >= MAX_EVENTS) {
 318            break;
 319        }
 320        len = 0;
 321        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
 322            iocbs[len++] = &aiocb->iocb;
 323            if (s->io_q.in_flight + len >= MAX_EVENTS) {
 324                break;
 325            }
 326        }
 327
 328        ret = io_submit(s->ctx, len, iocbs);
 329        if (ret == -EAGAIN) {
 330            break;
 331        }
 332        if (ret < 0) {
 333            /* Fail the first request, retry the rest */
 334            aiocb = QSIMPLEQ_FIRST(&s->io_q.pending);
 335            QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
 336            s->io_q.in_queue--;
 337            aiocb->ret = ret;
 338            qemu_laio_process_completion(aiocb);
 339            continue;
 340        }
 341
 342        s->io_q.in_flight += ret;
 343        s->io_q.in_queue  -= ret;
 344        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
 345        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
 346    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
 347    s->io_q.blocked = (s->io_q.in_queue > 0);
 348
 349    if (s->io_q.in_flight) {
 350        /* We can try to complete something just right away if there are
 351         * still requests in-flight. */
 352        qemu_laio_process_completions(s);
 353        /*
 354         * Even we have completed everything (in_flight == 0), the queue can
 355         * have still pended requests (in_queue > 0).  We do not attempt to
 356         * repeat submission to avoid IO hang.  The reason is simple: s->e is
 357         * still set and completion callback will be called shortly and all
 358         * pended requests will be submitted from there.
 359         */
 360    }
 361}
 362
 363void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
 364{
 365    s->io_q.plugged++;
 366}
 367
 368void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
 369{
 370    assert(s->io_q.plugged);
 371    if (--s->io_q.plugged == 0 &&
 372        !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
 373        ioq_submit(s);
 374    }
 375}
 376
 377static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
 378                          int type)
 379{
 380    LinuxAioState *s = laiocb->ctx;
 381    struct iocb *iocbs = &laiocb->iocb;
 382    QEMUIOVector *qiov = laiocb->qiov;
 383
 384    switch (type) {
 385    case QEMU_AIO_WRITE:
 386        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
 387        break;
 388    case QEMU_AIO_READ:
 389        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
 390        break;
 391    /* Currently Linux kernel does not support other operations */
 392    default:
 393        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
 394                        __func__, type);
 395        return -EIO;
 396    }
 397    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
 398
 399    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
 400    s->io_q.in_queue++;
 401    if (!s->io_q.blocked &&
 402        (!s->io_q.plugged ||
 403         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
 404        ioq_submit(s);
 405    }
 406
 407    return 0;
 408}
 409
 410int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
 411                                uint64_t offset, QEMUIOVector *qiov, int type)
 412{
 413    int ret;
 414    struct qemu_laiocb laiocb = {
 415        .co         = qemu_coroutine_self(),
 416        .nbytes     = qiov->size,
 417        .ctx        = s,
 418        .ret        = -EINPROGRESS,
 419        .is_read    = (type == QEMU_AIO_READ),
 420        .qiov       = qiov,
 421    };
 422
 423    ret = laio_do_submit(fd, &laiocb, offset, type);
 424    if (ret < 0) {
 425        return ret;
 426    }
 427
 428    if (laiocb.ret == -EINPROGRESS) {
 429        qemu_coroutine_yield();
 430    }
 431    return laiocb.ret;
 432}
 433
 434BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
 435        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
 436        BlockCompletionFunc *cb, void *opaque, int type)
 437{
 438    struct qemu_laiocb *laiocb;
 439    off_t offset = sector_num * BDRV_SECTOR_SIZE;
 440    int ret;
 441
 442    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
 443    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
 444    laiocb->ctx = s;
 445    laiocb->ret = -EINPROGRESS;
 446    laiocb->is_read = (type == QEMU_AIO_READ);
 447    laiocb->qiov = qiov;
 448
 449    ret = laio_do_submit(fd, laiocb, offset, type);
 450    if (ret < 0) {
 451        qemu_aio_unref(laiocb);
 452        return NULL;
 453    }
 454
 455    return &laiocb->common;
 456}
 457
 458void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
 459{
 460    aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
 461    qemu_bh_delete(s->completion_bh);
 462    s->aio_context = NULL;
 463}
 464
 465void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
 466{
 467    s->aio_context = new_context;
 468    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
 469    aio_set_event_notifier(new_context, &s->e, false,
 470                           qemu_laio_completion_cb,
 471                           qemu_laio_poll_cb);
 472}
 473
 474LinuxAioState *laio_init(Error **errp)
 475{
 476    int rc;
 477    LinuxAioState *s;
 478
 479    s = g_malloc0(sizeof(*s));
 480    rc = event_notifier_init(&s->e, false);
 481    if (rc < 0) {
 482        error_setg_errno(errp, -rc, "failed to to initialize event notifier");
 483        goto out_free_state;
 484    }
 485
 486    rc = io_setup(MAX_EVENTS, &s->ctx);
 487    if (rc < 0) {
 488        error_setg_errno(errp, -rc, "failed to create linux AIO context");
 489        goto out_close_efd;
 490    }
 491
 492    ioq_init(&s->io_q);
 493
 494    return s;
 495
 496out_close_efd:
 497    event_notifier_cleanup(&s->e);
 498out_free_state:
 499    g_free(s);
 500    return NULL;
 501}
 502
 503void laio_cleanup(LinuxAioState *s)
 504{
 505    event_notifier_cleanup(&s->e);
 506
 507    if (io_destroy(s->ctx) != 0) {
 508        fprintf(stderr, "%s: destroy AIO context %p failed\n",
 509                        __func__, &s->ctx);
 510    }
 511    g_free(s);
 512}
 513