qemu/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h>
#endif

struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    int deleted;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};

#ifdef CONFIG_EPOLL_CREATE1

/* The fd count threshold above which we switch to epoll */
#define EPOLL_ENABLE_THRESHOLD 64

static void aio_epoll_disable(AioContext *ctx)
{
    ctx->epoll_available = false;
    if (!ctx->epoll_enabled) {
        return;
    }
    ctx->epoll_enabled = false;
    close(ctx->epollfd);
}

static inline int epoll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? EPOLLIN : 0) |
           (pfd_events & G_IO_OUT ? EPOLLOUT : 0) |
           (pfd_events & G_IO_HUP ? EPOLLHUP : 0) |
           (pfd_events & G_IO_ERR ? EPOLLERR : 0);
}

static bool aio_epoll_try_enable(AioContext *ctx)
{
    AioHandler *node;
    struct epoll_event event;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int r;
        if (node->deleted || !node->pfd.events) {
            continue;
        }
        event.events = epoll_events_from_pfd(node->pfd.events);
        event.data.ptr = node;
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
        if (r) {
            return false;
        }
    }
    ctx->epoll_enabled = true;
    return true;
}

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
    struct epoll_event event;
    int r;
    int ctl;

    if (!ctx->epoll_enabled) {
        return;
    }
    if (!node->pfd.events) {
        ctl = EPOLL_CTL_DEL;
    } else {
        event.data.ptr = node;
        event.events = epoll_events_from_pfd(node->pfd.events);
        ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    }

    r = epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);
    if (r) {
        aio_epoll_disable(ctx);
    }
}

static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    AioHandler *node;
    int i, ret = 0;
    struct epoll_event events[128];

    assert(npfd == 1);
    assert(pfds[0].fd == ctx->epollfd);
    if (timeout > 0) {
        ret = qemu_poll_ns(pfds, npfd, timeout);
    }
    if (timeout <= 0 || ret > 0) {
        ret = epoll_wait(ctx->epollfd, events,
                         sizeof(events) / sizeof(events[0]),
                         timeout);
        if (ret <= 0) {
            goto out;
        }
        for (i = 0; i < ret; i++) {
            int ev = events[i].events;
            node = events[i].data.ptr;
            node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                (ev & EPOLLOUT ? G_IO_OUT : 0) |
                (ev & EPOLLHUP ? G_IO_HUP : 0) |
                (ev & EPOLLERR ? G_IO_ERR : 0);
        }
    }
out:
    return ret;
}
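
/*
 * Note on the timeout handling above: qemu_poll_ns() takes a nanosecond
 * timeout while epoll_wait() takes milliseconds.  This appears safe only
 * because, whenever timeout > 0, we first block in qemu_poll_ns() on the
 * epollfd itself, so epoll_wait() runs once events are already pending
 * and returns immediately.  A timeout of 0 (non-blocking) or -1 (block
 * forever) means the same thing to both interfaces, so it can be passed
 * straight through.
 */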

static bool aio_epoll_enabled(AioContext *ctx)
{
    /* Fall back to ppoll when external clients are disabled. */
    return !aio_external_disabled(ctx) && ctx->epoll_enabled;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    if (!ctx->epoll_available) {
        return false;
    }
    if (aio_epoll_enabled(ctx)) {
        return true;
    }
    if (npfd >= EPOLL_ENABLE_THRESHOLD) {
        if (aio_epoll_try_enable(ctx)) {
            return true;
        } else {
            aio_epoll_disable(ctx);
        }
    }
    return false;
}
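
/*
 * In short: epoll is only attempted when the epoll instance could be
 * created, and it is switched on lazily once a single poll would cover
 * at least EPOLL_ENABLE_THRESHOLD file descriptors.  If registering the
 * existing handlers fails in aio_epoll_try_enable(), aio_epoll_disable()
 * turns epoll off for this context for good and we stay on ppoll.
 */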

#else

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
}

static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    assert(false);
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    return false;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    return false;
}

#endif

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd)
            if (!node->deleted)
                return node;
    }

    return NULL;
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        void *opaque)
{
    AioHandler *node;
    bool is_new = false;
    bool deleted = false;

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node == NULL) {
            return;
        }

        g_source_remove_poll(&ctx->source, &node->pfd);

        /* If the lock is held, just mark the node as deleted */
        if (ctx->walking_handlers) {
            node->deleted = 1;
            node->pfd.revents = 0;
        } else {
            /* Otherwise, delete it for real.  We can't just mark it as
             * deleted because deleted nodes are only cleaned up after
             * releasing the walking_handlers lock.
             */
            QLIST_REMOVE(node, node);
            deleted = true;
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
            is_new = true;
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->opaque = opaque;
        node->is_external = is_external;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    }

    aio_epoll_update(ctx, node, is_new);
    aio_notify(ctx);
    if (deleted) {
        g_free(node);
    }
}
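
/*
 * Usage sketch (the callback and opaque names here are made up for
 * illustration, not taken from this file):
 *
 *     aio_set_fd_handler(ctx, fd, true, my_read_cb, NULL, my_state);
 *
 * registers a read handler for fd, while
 *
 *     aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL);
 *
 * removes it again, since passing NULL for both io_read and io_write is
 * the "delete" case handled at the top of the function.
 */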

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       is_external, (IOHandler *)io_read, NULL, notifier);
}

bool aio_prepare(AioContext *ctx)
{
    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            return true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            return true;
        }
    }

    return false;
}

bool aio_dispatch(AioContext *ctx)
{
    AioHandler *node;
    bool progress = false;

    /*
     * If there are callbacks left that have been queued, we need to call them.
     * Do not call select in this case, because it is possible that the caller
     * does not need a complete flush (as is the case for aio_poll loops).
     */
    if (aio_bh_poll(ctx)) {
        progress = true;
    }

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    node = QLIST_FIRST(&ctx->aio_handlers);
    while (node) {
        AioHandler *tmp;
        int revents;

        ctx->walking_handlers++;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_read) {
            node->io_read(node->opaque);

            /* aio_notify() does not count as progress */
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }
        if (!node->deleted &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        tmp = node;
        node = QLIST_NEXT(node, node);

        ctx->walking_handlers--;

        if (!ctx->walking_handlers && tmp->deleted) {
            QLIST_REMOVE(tmp, node);
            g_free(tmp);
        }
    }

    /* Run our timers */
    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}
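
/*
 * Dispatch order, as implemented above: queued bottom halves run first
 * (aio_bh_poll), then any fd handlers whose revents match the events
 * they registered for, and finally expired timers.  Read callbacks on
 * ctx->notifier (i.e. plain aio_notify() wakeups) are deliberately not
 * counted as progress.
 */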
 356
 357/* These thread-local variables are used only in a small part of aio_poll
 358 * around the call to the poll() system call.  In particular they are not
 359 * used while aio_poll is performing callbacks, which makes it much easier
 360 * to think about reentrancy!
 361 *
 362 * Stack-allocated arrays would be perfect but they have size limitations;
 363 * heap allocation is expensive enough that we want to reuse arrays across
 364 * calls to aio_poll().  And because poll() has to be called without holding
 365 * any lock, the arrays cannot be stored in AioContext.  Thread-local data
 366 * has none of the disadvantages of these three options.
 367 */
 368static __thread GPollFD *pollfds;
 369static __thread AioHandler **nodes;
 370static __thread unsigned npfd, nalloc;
 371static __thread Notifier pollfds_cleanup_notifier;
 372
 373static void pollfds_cleanup(Notifier *n, void *unused)
 374{
 375    g_assert(npfd == 0);
 376    g_free(pollfds);
 377    g_free(nodes);
 378    nalloc = 0;
 379}
 380
 381static void add_pollfd(AioHandler *node)
 382{
 383    if (npfd == nalloc) {
 384        if (nalloc == 0) {
 385            pollfds_cleanup_notifier.notify = pollfds_cleanup;
 386            qemu_thread_atexit_add(&pollfds_cleanup_notifier);
 387            nalloc = 8;
 388        } else {
 389            g_assert(nalloc <= INT_MAX);
 390            nalloc *= 2;
 391        }
 392        pollfds = g_renew(GPollFD, pollfds, nalloc);
 393        nodes = g_renew(AioHandler *, nodes, nalloc);
 394    }
 395    nodes[npfd] = node;
 396    pollfds[npfd] = (GPollFD) {
 397        .fd = node->pfd.fd,
 398        .events = node->pfd.events,
 399    };
 400    npfd++;
 401}

bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int i, ret;
    bool progress;
    int64_t timeout;

    aio_context_acquire(ctx);
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    ctx->walking_handlers++;

    assert(npfd == 0);

    /* fill pollfds */

    if (!aio_epoll_enabled(ctx)) {
        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
            if (!node->deleted && node->pfd.events
                && aio_node_check(ctx, node->is_external)) {
                add_pollfd(node);
            }
        }
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;

    /* wait until next event */
    if (timeout) {
        aio_context_release(ctx);
    }
    if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
        AioHandler epoll_handler;

        epoll_handler.pfd.fd = ctx->epollfd;
        epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
        npfd = 0;
        add_pollfd(&epoll_handler);
        ret = aio_epoll(ctx, pollfds, npfd, timeout);
    } else {
        ret = qemu_poll_ns(pollfds, npfd, timeout);
    }
    if (blocking) {
        atomic_sub(&ctx->notify_me, 2);
    }
    if (timeout) {
        aio_context_acquire(ctx);
    }

    aio_notify_accept(ctx);

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        for (i = 0; i < npfd; i++) {
            nodes[i]->pfd.revents = pollfds[i].revents;
        }
    }

    npfd = 0;
    ctx->walking_handlers--;

    /* Run dispatch even if there were no readable fds to run timers */
    if (aio_dispatch(ctx)) {
        progress = true;
    }

    aio_context_release(ctx);

    return progress;
}
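
/*
 * Typical caller pattern (a sketch, not code from this file): an event
 * loop thread blocks and dispatches repeatedly with
 *
 *     while (!done) {
 *         aio_poll(ctx, true);
 *     }
 *
 * whereas callers that only want to flush already-pending work without
 * sleeping pass blocking == false.
 */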

void aio_context_setup(AioContext *ctx)
{
#ifdef CONFIG_EPOLL_CREATE1
    assert(!ctx->epollfd);
    ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (ctx->epollfd == -1) {
        fprintf(stderr, "Failed to create epoll instance: %s\n",
                strerror(errno));
        ctx->epoll_available = false;
    } else {
        ctx->epoll_available = true;
    }
#endif
}