qemu/block/io_uring.c
/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "trace.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;

typedef struct LuringQueue {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* I/O queue for batched submission.  Protected by the AioContext lock. */
    LuringQueue io_q;

    /* I/O completion processing.  Only runs in the I/O thread.  */
    QEMUBH *completion_bh;
} LuringState;

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue.  The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Before Linux commit 9d93a3f5a0c ("io_uring: punt short reads to async
 * context") a buffered I/O request with the start of the file range in the
 * page cache could result in a short read.  Applications need to resubmit the
 * remaining read request.
 *
 * This is a slow path but recent kernels never take it.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position, accumulating across repeated short reads */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe, advancing the file offset past the bytes already read */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}

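/*
 * Worked example of the short-read arithmetic above (illustrative numbers):
 * a 16384-byte read at file offset 8192 completes with nread = 4096.
 * total_read becomes 4096, remaining is 12288, resubmit_qiov covers bytes
 * [4096, 16384) of the original qiov, and sqeq.off advances to 12288 so the
 * retried read continues exactly where the short read stopped.
 */
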
/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules the completion BH so that it can be called again in
 * a nested event loop.  When there are no events left to complete, the BH is
 * cancelled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;
    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them.  Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            /*
             * Only writev/readv/fsync requests on regular files or host block
             * devices are submitted. Therefore -EAGAIN is not expected but it's
             * known to happen sometimes with Linux SCSI. Submit again and hope
             * the request completes successfully.
             *
             * For more information, see:
             * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
             *
             * If the code is changed to submit other types of requests in the
             * future, then this workaround may need to be extended to deal with
             * genuine -EAGAIN results that should not be resubmitted
             * immediately.
             */
            if (ret == -EINTR || ret == -EAGAIN) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
        } else {
            /* Short read/write; only read and write requests get here */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad the unread tail with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }
    qemu_bh_cancel(s->completion_bh);
}

static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * If there are requests in flight, some of them may already have
         * completed, so try to process completions right away.
         */
        luring_process_completions(s);
    }
    return ret;
}

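/*
 * Worked example of the loop above (illustrative numbers): with 200 queued
 * requests and MAX_ENTRIES = 128, the inner loop preps sqes until
 * io_uring_get_sqe() returns NULL, io_uring_submit() then reports e.g. 128
 * submissions, so in_flight grows to 128 and in_queue drops to 72, and the
 * outer loop retries.  If the kernel refuses submission, blocked is set and
 * the remaining requests stay queued until a later ioq_submit() call.
 */
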
static void luring_process_completions_and_submit(LuringState *s)
{
    aio_context_acquire(s->aio_context);
    luring_process_completions(s);

    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    if (io_uring_cq_ready(&s->ring)) {
        luring_process_completions_and_submit(s);
        return true;
    }

    return false;
}

static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

void luring_io_plug(BlockDriverState *bs, LuringState *s)
{
    trace_luring_io_plug(s);
    s->io_q.plugged++;
}

void luring_io_unplug(BlockDriverState *bs, LuringState *s)
{
    assert(s->io_q.plugged);
    trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

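/*
 * Plug/unplug usage sketch (illustrative; the exact call chain depends on
 * the device emulation code).  An emulated device that dequeues several
 * requests at once can plug the queue, spawn one coroutine per request
 * (each of which ends up in luring_co_submit()), and then unplug, so all
 * the queued sqes are flushed in a single io_uring_submit() call:
 *
 *     luring_io_plug(bs, s);
 *     ... spawn request coroutines; each calls luring_co_submit() ...
 *     luring_io_unplug(bs, s);    <-- ioq_submit() runs here
 */
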
/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Preps the request's sqe, appends it to the pending queue, and submits the
 * queue when it is unplugged or has reached MAX_ENTRIES.
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                        __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) {
        ret = ioq_submit(s);
        trace_luring_do_submit_done(s, ret);
        return ret;
    }
    return 0;
}

int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
                                  uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    LuringAIOCB luringcb = {
        .co         = qemu_coroutine_self(),
        .ret        = -EINPROGRESS,
        .qiov       = qiov,
        .is_read    = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

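/*
 * Caller sketch (illustrative): a driver read path might call
 * luring_co_submit() from a coroutine_fn, assuming `s` was created with
 * luring_init() and attached to the current AioContext.  The hypothetical
 * example_co_preadv() name below is not part of this file:
 *
 *     static int coroutine_fn example_co_preadv(BlockDriverState *bs,
 *                                               LuringState *s, int fd,
 *                                               uint64_t offset,
 *                                               QEMUIOVector *qiov)
 *     {
 *         return luring_co_submit(bs, s, fd, offset, qiov, QEMU_AIO_READ);
 *     }
 *
 * The coroutine yields inside luring_co_submit() and is re-entered from
 * luring_process_completions() once luringcb.ret has been filled in.
 */
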
void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
                       s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                       qemu_luring_completion_cb, NULL, qemu_luring_poll_cb, s);
}

LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        /* io_uring_queue_init() returns a negative errno value on failure */
        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}

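/*
 * Lifecycle sketch (illustrative): a typical owner of a LuringState wires
 * the pieces in this file together roughly as follows:
 *
 *     Error *local_err = NULL;
 *     LuringState *s = luring_init(&local_err);
 *     if (!s) {
 *         ... report local_err and fail ...
 *     }
 *     luring_attach_aio_context(s, bdrv_get_aio_context(bs));
 *     ... I/O via luring_co_submit(), batched with plug/unplug ...
 *     luring_detach_aio_context(s, bdrv_get_aio_context(bs));
 *     luring_cleanup(s);
 */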