qemu/block/linux-aio.c
/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"

#include <libaio.h>

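/*
 * Typical call sequence from a block driver (an illustrative sketch only,
 * not lifted from any particular caller; variable names are made up):
 *
 *     void *aio = laio_init();
 *     laio_attach_aio_context(aio, aio_context);
 *     ...
 *     acb = laio_submit(bs, aio, fd, sector_num, qiov, nb_sectors,
 *                       cb, opaque, QEMU_AIO_READ);
 *     ...
 *     laio_detach_aio_context(aio, aio_context);
 *     laio_cleanup(aio);
 */
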
/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

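/*
 * Maximum number of iocbs handed to a single io_submit() call by
 * ioq_submit(); a plugged queue is also flushed once this many requests
 * are pending.
 */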
#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int n;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct qemu_laio_state {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

static void ioq_submit(struct qemu_laio_state *s);

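/*
 * Reassemble the 64-bit completion status (a byte count on success or a
 * negative errno on failure) from the two result fields of a struct
 * io_event.
 */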
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -EINVAL;
            }
        }
    }
    laiocb->common.cb(laiocb->common.opaque, ret);

    qemu_aio_unref(laiocb);
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in qemu_laio_state.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
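        /*
         * A zero timeout makes io_getevents() non-blocking: it returns
         * whatever has completed so far, up to MAX_EVENTS, without waiting
         * for min_nr events to arrive.
         */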
        do {
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(s, laiocb);
    }

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

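/*
 * Read handler for the completion event notifier.  The actual processing is
 * deferred to the completion BH so that it also works when invoked from a
 * nested event loop.
 */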
static void qemu_laio_completion_cb(EventNotifier *e)
{
    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_bh_schedule(s->completion_bh);
    }
}

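/*
 * Asynchronous cancellation: try io_cancel().  If the kernel cannot cancel
 * the request, it is only marked -ECANCELED here and its callback is invoked
 * later from the normal completion path.
 */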
static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->n = 0;
    io_q->blocked = false;
}

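/*
 * Submit pending requests in batches of up to MAX_QUEUED_IO iocbs.
 * io_submit() may accept only a prefix of a batch; the accepted requests are
 * split off the pending list, and the queue is marked blocked whenever
 * requests are left over (EAGAIN or a partial submission).
 */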
static void ioq_submit(struct qemu_laio_state *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_QUEUED_IO];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (len == MAX_QUEUED_IO) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            abort();
        }

        s->io_q.n -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.n > 0);
}

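/*
 * Plug/unplug request batching.  While the queue is plugged, laio_submit()
 * only appends requests to the pending list; the final unplug (or reaching
 * MAX_QUEUED_IO pending requests) flushes the whole batch via ioq_submit().
 */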
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
{
    struct qemu_laio_state *s = aio_ctx;

    s->io_q.plugged++;
}

void laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
{
    struct qemu_laio_state *s = aio_ctx;

    assert(s->io_q.plugged > 0 || !unplug);

    if (unplug && --s->io_q.plugged > 0) {
        return;
    }

    if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

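/*
 * Build an iocb for the request and queue it.  The queue is flushed
 * immediately unless it is plugged (and still below MAX_QUEUED_IO entries)
 * or blocked by a previous partial submission.
 */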
BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        goto out_free_aiocb;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.n++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
        ioq_submit(s);
    }
    return &laiocb->common;

out_free_aiocb:
    qemu_aio_unref(laiocb);
    return NULL;
}

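/*
 * AioContext attach/detach hooks: the completion event notifier handler and
 * the completion BH follow the AioContext currently driving this state.
 */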
void laio_detach_aio_context(void *s_, AioContext *old_context)
{
    struct qemu_laio_state *s = s_;

    aio_set_event_notifier(old_context, &s->e, false, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(void *s_, AioContext *new_context)
{
    struct qemu_laio_state *s = s_;

    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb);
}

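/*
 * Allocate the AIO state: an event notifier for completion signalling and a
 * kernel io_context sized for MAX_EVENTS in-flight requests.  Returns NULL
 * on failure.
 */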
void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

void laio_cleanup(void *s_)
{
    struct qemu_laio_state *s = s_;

    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}