qemu/block/linux-aio.c
/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this, we will get EAGAIN from io_submit, which is communicated
 *      to the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int n;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct qemu_laio_state {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

static void ioq_submit(struct qemu_laio_state *s);

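/*
 * Combine the res and res2 words of a completion event into the single
 * value handed back to the request: the byte count on success or a
 * negative errno on failure.
 */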
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -EINVAL;
            }
        }
    }
    laiocb->common.cb(laiocb->common.opaque, ret);

    qemu_aio_unref(laiocb);
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in qemu_laio_state.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
        do {
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(s, laiocb);
    }

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

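/* Handler for the completion eventfd: clear the notifier and let the BH do
 * the actual completion processing.
 */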
static void qemu_laio_completion_cb(EventNotifier *e)
{
    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_bh_schedule(s->completion_bh);
    }
}

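/*
 * Asynchronous cancellation: try io_cancel() on an in-flight request.  If
 * the kernel refuses, the callback runs later from the normal completion
 * path; otherwise it is invoked here with -ECANCELED.
 */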
static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

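/* Reset the submission queue to its empty, unplugged, unblocked state. */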
static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->n = 0;
    io_q->blocked = false;
}

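/*
 * Push pending requests to the kernel in batches of up to MAX_QUEUED_IO
 * iocbs.  On -EAGAIN, or when only part of a batch is accepted, the
 * remaining requests stay on io_q.pending and io_q.blocked is set so the
 * completion path retries later; any other io_submit() error is fatal.
 */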
static void ioq_submit(struct qemu_laio_state *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_QUEUED_IO];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (len == MAX_QUEUED_IO) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            abort();
        }

        s->io_q.n -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.n > 0);
}

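/*
 * Plug the queue: while at least one plug reference is held, laio_submit()
 * only queues requests (up to MAX_QUEUED_IO) instead of issuing them
 * immediately, so they can go to the kernel as one batch.
 */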
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
{
    struct qemu_laio_state *s = aio_ctx;

    s->io_q.plugged++;
}

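/*
 * Counterpart of laio_io_plug(): drop one plug reference when @unplug is
 * true and flush pending requests once the queue is no longer plugged.
 * Calling with @unplug false flushes the current batch without unplugging.
 */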
void laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
{
    struct qemu_laio_state *s = aio_ctx;

    assert(s->io_q.plugged > 0 || !unplug);

    if (unplug && --s->io_q.plugged > 0) {
        return;
    }

    if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

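/*
 * Build a preadv/pwritev iocb for @qiov at the given sector offset and
 * either queue it (while plugged or blocked) or submit it right away.
 * Returns NULL for unsupported request types.
 *
 * Typical call sequence from a block driver (sketch only; variable names
 * are illustrative and error handling is omitted):
 *
 *     void *aio = laio_init();
 *     laio_attach_aio_context(aio, aio_context);
 *     acb = laio_submit(bs, aio, fd, sector_num, qiov, nb_sectors,
 *                       cb, opaque, QEMU_AIO_READ);
 */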
BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        goto out_free_aiocb;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.n++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
        ioq_submit(s);
    }
    return &laiocb->common;

out_free_aiocb:
    qemu_aio_unref(laiocb);
    return NULL;
}

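/* Stop watching the completion eventfd and delete the completion BH. */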
void laio_detach_aio_context(void *s_, AioContext *old_context)
{
    struct qemu_laio_state *s = s_;

    aio_set_event_notifier(old_context, &s->e, false, NULL);
    qemu_bh_delete(s->completion_bh);
}

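/* Create the completion BH and register the completion eventfd handler in
 * @new_context.
 */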
void laio_attach_aio_context(void *s_, AioContext *new_context)
{
    struct qemu_laio_state *s = s_;

    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb);
}

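/*
 * Allocate per-device state: a completion eventfd and a kernel AIO context
 * sized for MAX_EVENTS in-flight requests.  Returns NULL on failure.
 */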
void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

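/* Tear down the state created by laio_init(). */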
void laio_cleanup(void *s_)
{
    struct qemu_laio_state *s = s_;

    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}