qemu/linux-aio.c
/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu-common.h"
#include "qemu-aio.h"
#include "block_int.h"
#include "block/raw-posix-aio.h"

#include <sys/eventfd.h>
#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this, we will get EAGAIN from io_submit, which is communicated
 *      to the guest as an I/O error.
 */
#define MAX_EVENTS 128

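/*
 * Per-request state: QEMU's generic ACB plus the kernel iocb, the expected
 * transfer size, the AsyncContext that submitted the request and a list
 * link used while the completion is queued.
 */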
struct qemu_laiocb {
    BlockDriverAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    int async_context_id;
    QLIST_ENTRY(qemu_laiocb) node;
};

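/*
 * Per-device state: the kernel AIO context, the eventfd used for completion
 * notification, the number of requests in flight and the list of completed
 * requests whose callback could not be run yet.
 */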
struct qemu_laio_state {
    io_context_t ctx;
    int efd;
    int count;
    QLIST_HEAD(, qemu_laiocb) completed_reqs;
};

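/*
 * Combine the res2 (high) and res (low) halves of an io_event into the
 * request's return value: byte count on success, negative errno on error.
 */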
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 * Be sure to be in the right AsyncContext before calling this function.
 */
static void qemu_laio_process_completion(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    int ret;

    s->count--;

    ret = laiocb->ret;
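    /* A partial transfer (ret >= 0 but != nbytes) is reported as -EINVAL. */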
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes)
            ret = 0;
        else if (ret >= 0)
            ret = -EINVAL;

        laiocb->common.cb(laiocb->common.opaque, ret);
    }

    qemu_aio_release(laiocb);
}

/*
 * Processes all queued AIO requests, i.e. requests that have returned from
 * the OS but whose callback has not been called yet. Requests whose callback
 * cannot be called in the current AsyncContext remain in the queue.
 *
 * Returns 1 if at least one request could be completed, 0 otherwise.
 */
static int qemu_laio_process_requests(void *opaque)
{
    struct qemu_laio_state *s = opaque;
    struct qemu_laiocb *laiocb, *next;
    int res = 0;

    QLIST_FOREACH_SAFE (laiocb, &s->completed_reqs, node, next) {
        if (laiocb->async_context_id == get_async_context_id()) {
            qemu_laio_process_completion(s, laiocb);
            QLIST_REMOVE(laiocb, node);
            res = 1;
        }
    }

    return res;
}

/*
 * Puts a request in the completion queue so that its callback is called the
 * next time it is possible. If we are already in the right AsyncContext, the
 * request is completed immediately instead.
 */
static void qemu_laio_enqueue_completed(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    if (laiocb->async_context_id == get_async_context_id()) {
        qemu_laio_process_completion(s, laiocb);
    } else {
        QLIST_INSERT_HEAD(&s->completed_reqs, laiocb, node);
    }
}

static void qemu_laio_completion_cb(void *opaque)
{
    struct qemu_laio_state *s = opaque;

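    /*
     * Each completed request bumps the eventfd counter.  Drain the counter
     * and then fetch at least that many completion events from the kernel.
     */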
    while (1) {
        struct io_event events[MAX_EVENTS];
        uint64_t val;
        ssize_t ret;
        struct timespec ts = { 0 };
        int nevents, i;

        do {
            ret = read(s->efd, &val, sizeof(val));
        } while (ret == -1 && errno == EINTR);

        if (ret == -1 && errno == EAGAIN)
            break;

        if (ret != 8)
            break;

        do {
            nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts);
        } while (nevents == -EINTR);

        for (i = 0; i < nevents; i++) {
            struct iocb *iocb = events[i].obj;
            struct qemu_laiocb *laiocb =
                    container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[i]);
            qemu_laio_enqueue_completed(s, laiocb);
        }
    }
}

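/*
 * Flush callback registered via qemu_aio_set_fd_handler() in laio_init():
 * non-zero means there are still requests in flight, so the AIO layer must
 * keep polling our eventfd.
 */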
static int qemu_laio_flush_cb(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    return (s->count > 0) ? 1 : 0;
}

static void laio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS)
        return;

    /*
     * Note that as of Linux 2.6.31 neither the block device code nor any
     * filesystem implements cancellation of AIO requests.
     * Thus the polling loop below is the normal code path.
     */
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    if (ret == 0) {
        laiocb->ret = -ECANCELED;
        return;
    }

    /*
     * We have to wait for the iocb to finish.
     *
     * The only way to get the iocb status update is by polling the io context.
     * We might be able to do this slightly more efficiently by removing the
     * O_NONBLOCK flag.
     */
    while (laiocb->ret == -EINPROGRESS)
        qemu_laio_completion_cb(laiocb->ctx);
}

static AIOPool laio_pool = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel             = laio_cancel,
};

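/*
 * Submit a single read or write request.  sector_num and nb_sectors are in
 * 512-byte sectors; aio_ctx is the qemu_laio_state returned by laio_init().
 */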
BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_pool, bs, cb, opaque);
    if (!laiocb)
        return NULL;
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->async_context_id = get_async_context_id();

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        goto out_free_aiocb;
    }
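    /* Have the kernel signal our eventfd when this request completes. */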
    io_set_eventfd(&laiocb->iocb, s->efd);
    s->count++;

    if (io_submit(s->ctx, 1, &iocbs) < 0)
        goto out_dec_count;
    return &laiocb->common;

out_dec_count:
    s->count--;
out_free_aiocb:
    qemu_aio_release(laiocb);
    return NULL;
}

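/*
 * Set up the per-device AIO state: allocate it, create a non-blocking
 * eventfd for completion notification, create the kernel AIO context and
 * register the eventfd with QEMU's AIO event loop.
 */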
void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = qemu_mallocz(sizeof(*s));
    QLIST_INIT(&s->completed_reqs);
    s->efd = eventfd(0, 0);
    if (s->efd == -1)
        goto out_free_state;
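    /* Keep the eventfd non-blocking so the completion handler can drain it
     * until read() returns EAGAIN. */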
    fcntl(s->efd, F_SETFL, O_NONBLOCK);

    if (io_setup(MAX_EVENTS, &s->ctx) != 0)
        goto out_close_efd;

    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
        qemu_laio_flush_cb, qemu_laio_process_requests, s);

    return s;

out_close_efd:
    close(s->efd);
out_free_state:
    qemu_free(s);
    return NULL;
}