qemu/util/fdmon-io_uring.c
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Linux io_uring file descriptor monitoring
 *
 * The Linux io_uring API supports file descriptor monitoring with a few
 * advantages over existing APIs like poll(2) and epoll(7):
 *
 * 1. Userspace polling of events is possible because the completion queue (cq
 *    ring) is shared between the kernel and userspace.  This allows
 *    applications that rely on userspace polling to also monitor file
 *    descriptors in the same userspace polling loop.
 *
 * 2. Submission and completion are batched and done together in a single
 *    system call.  This minimizes the number of system calls.
 *
 * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than
 *    poll(2).
 *
 * 4. Nanosecond timeouts are supported so it requires fewer syscalls than
 *    epoll(7), whose epoll_wait(2) timeout only has millisecond resolution.
 *
 * This code only monitors file descriptors and does not do asynchronous disk
 * I/O.  Implementing disk I/O efficiently has other requirements and should
 * use a separate io_uring so it does not make sense to unify the code.
 *
 * File descriptor monitoring is implemented using the following operations:
 *
 * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
 * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored.  When
 *    the poll mask changes for a file descriptor it is first removed and then
 *    re-added with the new poll mask, so this operation is also used as part
 *    of modifying an existing monitored file descriptor.
 * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
 *    for events.  This operation self-cancels if another event completes
 *    before the timeout.
 *
 * io_uring calls the submission queue the "sq ring" and the completion queue
 * the "cq ring".  Ring entries are called "sqe" and "cqe", respectively.
 *
 * The code is structured so that sq/cq rings are only modified within
 * fdmon_io_uring_wait().  Changes to AioHandlers are made by enqueuing them on
 * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
 * and/or IORING_OP_POLL_REMOVE sqes for them.
 */
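
/*
 * For orientation, the bare liburing flow that this file builds on looks
 * roughly like the sketch below (illustrative only; error handling is
 * omitted and "fd"/"my_handler" are placeholders):
 *
 *     struct io_uring ring;
 *     struct io_uring_sqe *sqe;
 *     struct io_uring_cqe *cqe;
 *
 *     io_uring_queue_init(128, &ring, 0);       // create sq/cq rings
 *     sqe = io_uring_get_sqe(&ring);            // grab a free sqe
 *     io_uring_prep_poll_add(sqe, fd, POLLIN);  // one-shot poll request
 *     io_uring_sqe_set_data(sqe, my_handler);   // user_data echoed in cqe
 *     io_uring_submit_and_wait(&ring, 1);       // submit + wait, one syscall
 *     io_uring_peek_cqe(&ring, &cqe);           // cqe->res holds revents
 *     io_uring_cqe_seen(&ring, cqe);            // consume the cqe
 *     io_uring_queue_exit(&ring);
 */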

#include "qemu/osdep.h"
#include <poll.h>
#include "qemu/rcu_queue.h"
#include "aio-posix.h"

enum {
    FDMON_IO_URING_ENTRIES  = 128, /* sq/cq ring size */

    /* AioHandler::flags */
    FDMON_IO_URING_PENDING  = (1 << 0), /* enqueued on ctx->submit_list */
    FDMON_IO_URING_ADD      = (1 << 1), /* IORING_OP_POLL_ADD needed */
    FDMON_IO_URING_REMOVE   = (1 << 2), /* IORING_OP_POLL_REMOVE needed */
};

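/*
 * AioHandler stores GPollFD (G_IO_*) event bits while IORING_OP_POLL_ADD
 * takes a poll(2)-style mask; the helpers below translate between the two.
 * On Linux the numeric values usually coincide, but converting explicitly
 * avoids relying on that.
 */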
static inline int poll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? POLLIN : 0) |
           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
           (pfd_events & G_IO_ERR ? POLLERR : 0);
}

static inline int pfd_events_from_poll(int poll_events)
{
    return (poll_events & POLLIN ? G_IO_IN : 0) |
           (poll_events & POLLOUT ? G_IO_OUT : 0) |
           (poll_events & POLLHUP ? G_IO_HUP : 0) |
           (poll_events & POLLERR ? G_IO_ERR : 0);
}

/*
 * Returns an sqe for submitting a request.  May only be called within
 * fdmon_io_uring_wait().
 */
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int ret;

    if (likely(sqe)) {
        return sqe;
    }

    /* No free sqes left, submit pending sqes first */
    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    /* The sq ring was full, so the pending entries must have been submitted */
    assert(ret > 1);
    sqe = io_uring_get_sqe(ring);
    assert(sqe);
    return sqe;
}

/* Atomically enqueue an AioHandler for sq ring submission */
static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
{
    unsigned old_flags;

    old_flags = qatomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
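    /*
     * Only the caller that transitions FDMON_IO_URING_PENDING from 0 to 1
     * inserts the node, so concurrent enqueue() calls cannot insert the same
     * node twice.
     */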
    if (!(old_flags & FDMON_IO_URING_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
    }
}

/* Dequeue an AioHandler for sq ring submission.  Called by fill_sq_ring(). */
static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
{
    AioHandler *node = QSLIST_FIRST(head);

    if (!node) {
        return NULL;
    }

    /* Doesn't need to be atomic since fill_sq_ring() moves the list */
    QSLIST_REMOVE_HEAD(head, node_submitted);

    /*
     * Don't clear FDMON_IO_URING_REMOVE.  It's sticky so it can serve two
     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
     * telling process_cqe() to delete the AioHandler when its
     * IORING_OP_POLL_ADD completes.
     */
    *flags = qatomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
                                              FDMON_IO_URING_ADD));
    return node;
}

static void fdmon_io_uring_update(AioContext *ctx,
                                  AioHandler *old_node,
                                  AioHandler *new_node)
{
    if (new_node) {
        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
    }

    if (old_node) {
        /*
         * Deletion is tricky because IORING_OP_POLL_ADD and
         * IORING_OP_POLL_REMOVE are async.  We need to wait for the original
         * IORING_OP_POLL_ADD to complete before this handler can be freed
         * safely.
         *
         * It's possible that the file descriptor becomes ready and the
         * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
         * submitted, too.
         *
         * Mark this handler deleted right now but don't place it on
         * ctx->deleted_aio_handlers yet.  Instead, manually fudge the list
         * entry so that QLIST_IS_INSERTED() thinks this handler has been
         * inserted and the rest of the code recognizes this AioHandler as
         * deleted.
         *
         * Once the original IORING_OP_POLL_ADD completes we enqueue the
         * handler on the real ctx->deleted_aio_handlers list to be freed.
         */
        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
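
        /*
         * QLIST_IS_INSERTED(node, node_deleted) just checks
         * node_deleted.le_prev != NULL (see qemu/queue.h), so pointing
         * le_prev at our own le_next marks this handler as "inserted"
         * without putting it on any real list.
         */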
        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;

        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
    }
}

static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    int events = poll_events_from_pfd(node->pfd.events);

    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
    io_uring_sqe_set_data(sqe, node);
}

static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);

    /* The poll request to cancel is identified by its user_data, i.e. node */
    io_uring_prep_poll_remove(sqe, node);
}

/* Add a timeout that self-cancels when another cqe becomes ready */
static void add_timeout_sqe(AioContext *ctx, struct __kernel_timespec *ts,
                            int64_t ns)
{
    struct io_uring_sqe *sqe;

    ts->tv_sec = ns / NANOSECONDS_PER_SECOND;
    ts->tv_nsec = ns % NANOSECONDS_PER_SECOND;

    sqe = get_sqe(ctx);

    /*
     * The kernel reads *ts at sqe submission time, not here, so ts must
     * point to caller-owned storage that stays alive until the sqe has been
     * submitted.  A count of 1 makes the timeout complete as soon as one
     * other cqe arrives, i.e. it self-cancels.
     */
    io_uring_prep_timeout(sqe, ts, 1, 0);
}


/* Add sqes from ctx->submit_list for submission */
static void fill_sq_ring(AioContext *ctx)
{
    AioHandlerSList submit_list;
    AioHandler *node;
    unsigned flags;

    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);

    while ((node = dequeue(&submit_list, &flags))) {
        /*
         * Order matters in case both flags were set: submit the POLL_ADD
         * first so the POLL_REMOVE that follows can cancel it, letting
         * process_cqe() delete the AioHandler.
         */
        if (flags & FDMON_IO_URING_ADD) {
            add_poll_add_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_REMOVE) {
            add_poll_remove_sqe(ctx, node);
        }
    }
}

/* Returns true if a handler became ready */
static bool process_cqe(AioContext *ctx,
                        AioHandlerList *ready_list,
                        struct io_uring_cqe *cqe)
{
    AioHandler *node = io_uring_cqe_get_data(cqe);
    unsigned flags;

    /* poll_timeout and poll_remove have a zero user_data field */
    if (!node) {
        return false;
    }

    /*
     * Deletion can only happen when IORING_OP_POLL_ADD completes.  If we race
     * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
     * bit before IORING_OP_POLL_REMOVE is submitted.
     */
    flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }

    /* cqe->res contains the poll(2) revents mask on success */
    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));

    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
    add_poll_add_sqe(ctx, node);
    return true;
}

static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_cqe *cqe;
    unsigned num_cqes = 0;
    unsigned num_ready = 0;
    unsigned head;

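    /*
     * io_uring_for_each_cqe() walks the ready cqes without consuming them;
     * io_uring_cq_advance() below then releases them all to the kernel in
     * one step.
     */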
    io_uring_for_each_cqe(ring, head, cqe) {
        if (process_cqe(ctx, ready_list, cqe)) {
            num_ready++;
        }

        num_cqes++;
    }

    io_uring_cq_advance(ring, num_cqes);
    return num_ready;
}

static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                               int64_t timeout)
{
    unsigned wait_nr = 1; /* block until at least one cqe is ready */
    struct __kernel_timespec ts; /* lives until the timeout sqe is submitted */
    int ret;

    /* Fall back while external clients are disabled */
    if (qatomic_read(&ctx->external_disable_cnt)) {
        return fdmon_poll_ops.wait(ctx, ready_list, timeout);
    }

    if (timeout == 0) {
        wait_nr = 0; /* non-blocking */
    } else if (timeout > 0) {
        add_timeout_sqe(ctx, &ts, timeout);
    }
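
    /*
     * Note that wait_nr stays 1 even when a timeout sqe was added: the
     * IORING_OP_TIMEOUT produces its own cqe when the deadline expires, so
     * the wait below cannot block past the requested timeout.
     */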

    fill_sq_ring(ctx);

    do {
        ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
    } while (ret == -EINTR);

    assert(ret >= 0);

    return process_cq_ring(ctx, ready_list);
}

static bool fdmon_io_uring_need_wait(AioContext *ctx)
{
    /* Have io_uring events completed? */
    if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Are there pending sqes to submit? */
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Do we need to process AioHandlers for io_uring changes? */
    if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
        return true;
    }

    /* Are we falling back to fdmon-poll? */
    return qatomic_read(&ctx->external_disable_cnt);
}

static const FDMonOps fdmon_io_uring_ops = {
    .update = fdmon_io_uring_update,
    .wait = fdmon_io_uring_wait,
    .need_wait = fdmon_io_uring_need_wait,
};

bool fdmon_io_uring_setup(AioContext *ctx)
{
    int ret;

    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
    if (ret != 0) {
        return false;
    }

    QSLIST_INIT(&ctx->submit_list);
    ctx->fdmon_ops = &fdmon_io_uring_ops;
    return true;
}
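
/*
 * In QEMU, fdmon_io_uring_setup() is called from aio_context_setup() in
 * util/aio-posix.c when CONFIG_LINUX_IO_URING is defined; if ring setup
 * fails here, the AioContext simply keeps its default fdmon backend.
 */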

void fdmon_io_uring_destroy(AioContext *ctx)
{
    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
        AioHandler *node;

        io_uring_queue_exit(&ctx->fdmon_io_uring);

        /* Move handlers due to be removed onto the deleted list */
        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
            unsigned flags = qatomic_fetch_and(&node->flags,
                    ~(FDMON_IO_URING_PENDING |
                      FDMON_IO_URING_ADD |
                      FDMON_IO_URING_REMOVE));

            if (flags & FDMON_IO_URING_REMOVE) {
                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
                                      node, node_deleted);
            }

            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
        }

        ctx->fdmon_ops = &fdmon_poll_ops;
    }
}
 362