qemu/block/io_uring.c
/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "trace.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

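/*
 * Per-request state.  A LuringAIOCB lives on the stack of the submitting
 * coroutine (see luring_co_submit()) and remains valid until the request
 * completes and the coroutine is woken.
 */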
typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;

typedef struct LuringQueue {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* I/O queue for batched submission.  Protected by AioContext lock. */
    LuringQueue io_q;

    /* I/O completion processing.  Only runs in I/O thread.  */
    QEMUBH *completion_bh;
} LuringState;

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue.  The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Before Linux commit 9d93a3f5a0c ("io_uring: punt short reads to async
 * context") a buffered I/O request with the start of the file range in the
 * page cache could result in a short read.  Applications need to resubmit the
 * remaining read request.
 *
 * This is a slow path but recent kernels never take it.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position; total_read accumulates across resubmissions */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe: advance the file offset past the bytes already read */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}
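
/*
 * Worked example for the slow path above (illustrative): a 64 KiB read that
 * completes with nread == 4096 is resubmitted with sqeq.off advanced by 4096
 * and a 60 KiB iovec covering the remainder.  Because total_read accumulates
 * across resubmissions, the completion handler can tell when all qiov->size
 * bytes have finally arrived.
 */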

/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules the completion BH so that it can be called again
 * from a nested event loop.  When there are no events left to complete, the
 * BH is canceled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;
    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them.  Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            if (ret == -EINTR) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
        } else {
            /* Short read/write; only read and write requests reach here */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }
    qemu_bh_cancel(s->completion_bh);
}

static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * Some in-flight requests may already have completed; try to reap
         * them right away.
         */
        luring_process_completions(s);
    }
    return ret;
}


static void luring_process_completions_and_submit(LuringState *s)
{
    aio_context_acquire(s->aio_context);
    luring_process_completions(s);

    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    if (io_uring_cq_ready(&s->ring)) {
        luring_process_completions_and_submit(s);
        return true;
    }

    return false;
}

static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

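/*
 * Batching: while s->io_q.plugged is non-zero, luring_do_submit() only
 * queues requests; the actual io_uring_submit() happens on the final
 * luring_io_unplug() or once in_flight + in_queue reaches MAX_ENTRIES.
 * This lets callers complete a whole batch of requests with a single
 * system call.
 */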
void luring_io_plug(BlockDriverState *bs, LuringState *s)
{
    trace_luring_io_plug(s);
    s->io_q.plugged++;
}

void luring_io_unplug(BlockDriverState *bs, LuringState *s)
{
    assert(s->io_q.plugged);
    trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Preps the sqe embedded in @luringcb and appends it to the pending submit
 * queue.  The queued sqes are copied into the ring and submitted by
 * ioq_submit().
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                        __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) {
        ret = ioq_submit(s);
        trace_luring_do_submit_done(s, ret);
        return ret;
    }
    return 0;
}

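/**
 * luring_co_submit:
 *
 * Submits a request from coroutine context and yields until it completes.
 * Returns 0 on success, a negative errno value on failure.  An illustrative
 * call from a hypothetical caller (only the parameter names below are from
 * this file):
 *
 *     ret = luring_co_submit(bs, s, fd, offset, qiov, QEMU_AIO_READ);
 *     if (ret < 0) {
 *         ...handle negative errno...
 *     }
 */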
int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
                                  uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    LuringAIOCB luringcb = {
        .co         = qemu_coroutine_self(),
        .ret        = -EINPROGRESS,
        .qiov       = qiov,
        .is_read    = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
                       s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                       qemu_luring_completion_cb, NULL, qemu_luring_poll_cb, s);
}

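/*
 * Typical lifecycle of a LuringState (illustrative sketch using this file's
 * public entry points, error handling abbreviated):
 *
 *     LuringState *s = luring_init(errp);
 *     luring_attach_aio_context(s, ctx);
 *     ...luring_co_submit() from coroutine context...
 *     luring_detach_aio_context(s, ctx);
 *     luring_cleanup(s);
 */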
LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        /* io_uring_queue_init() returns a negative errno value on failure */
        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}