qemu/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#ifdef CONFIG_EPOLL
#include <sys/epoll.h>
#endif

struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    int deleted;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};

#ifdef CONFIG_EPOLL

/* The fd-count threshold at which we switch to epoll */
#define EPOLL_ENABLE_THRESHOLD 64

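/* Permanently stop using epoll for this context: clear epoll_available so
 * it is never retried, and close the epoll fd if it was in use. */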
static void aio_epoll_disable(AioContext *ctx)
{
    ctx->epoll_available = false;
    if (!ctx->epoll_enabled) {
        return;
    }
    ctx->epoll_enabled = false;
    close(ctx->epollfd);
}

static inline int epoll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? EPOLLIN : 0) |
           (pfd_events & G_IO_OUT ? EPOLLOUT : 0) |
           (pfd_events & G_IO_HUP ? EPOLLHUP : 0) |
           (pfd_events & G_IO_ERR ? EPOLLERR : 0);
}

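/* Register every active handler with the epoll instance.  Returns false if
 * any epoll_ctl() call fails; in that case the caller disables epoll and
 * the context keeps using ppoll. */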
static bool aio_epoll_try_enable(AioContext *ctx)
{
    AioHandler *node;
    struct epoll_event event;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int r;
        if (node->deleted || !node->pfd.events) {
            continue;
        }
        event.events = epoll_events_from_pfd(node->pfd.events);
        event.data.ptr = node;
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
        if (r) {
            return false;
        }
    }
    ctx->epoll_enabled = true;
    return true;
}

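/* Mirror a handler change into the epoll set: delete entries whose event
 * mask became empty, add new handlers, modify existing ones.  Any
 * epoll_ctl() failure disables epoll for the context. */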
static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
    struct epoll_event event;
    int r;

    if (!ctx->epoll_enabled) {
        return;
    }
    if (!node->pfd.events) {
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_DEL, node->pfd.fd, &event);
        if (r) {
            aio_epoll_disable(ctx);
        }
    } else {
        event.data.ptr = node;
        event.events = epoll_events_from_pfd(node->pfd.events);
        if (is_new) {
            r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
            if (r) {
                aio_epoll_disable(ctx);
            }
        } else {
            r = epoll_ctl(ctx->epollfd, EPOLL_CTL_MOD, node->pfd.fd, &event);
            if (r) {
                aio_epoll_disable(ctx);
            }
        }
    }
}

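/* Wait for events using epoll.  The caller passes a single GPollFD for the
 * epoll fd itself: a positive timeout is honoured with nanosecond precision
 * by qemu_poll_ns() on that fd, after which epoll_wait() only harvests the
 * ready handlers; zero (non-blocking) and negative (infinite) timeouts are
 * passed to epoll_wait() directly. */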
static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    AioHandler *node;
    int i, ret = 0;
    struct epoll_event events[128];

    assert(npfd == 1);
    assert(pfds[0].fd == ctx->epollfd);
    if (timeout > 0) {
        ret = qemu_poll_ns(pfds, npfd, timeout);
    }
    if (timeout <= 0 || ret > 0) {
        ret = epoll_wait(ctx->epollfd, events,
                         sizeof(events) / sizeof(events[0]),
                         timeout);
        if (ret <= 0) {
            goto out;
        }
        for (i = 0; i < ret; i++) {
            int ev = events[i].events;
            node = events[i].data.ptr;
            node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                (ev & EPOLLOUT ? G_IO_OUT : 0) |
                (ev & EPOLLHUP ? G_IO_HUP : 0) |
                (ev & EPOLLERR ? G_IO_ERR : 0);
        }
    }
out:
    return ret;
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    /* Fall back to ppoll when external clients are disabled. */
    return !aio_external_disabled(ctx) && ctx->epoll_enabled;
}

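/* Decide whether this polling iteration should use epoll.  epoll is enabled
 * lazily, the first time the number of polled fds reaches
 * EPOLL_ENABLE_THRESHOLD. */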
static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    if (!ctx->epoll_available) {
        return false;
    }
    if (aio_epoll_enabled(ctx)) {
        return true;
    }
    if (npfd >= EPOLL_ENABLE_THRESHOLD) {
        if (aio_epoll_try_enable(ctx)) {
            return true;
        } else {
            aio_epoll_disable(ctx);
        }
    }
    return false;
}

#else

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
}

static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    assert(false);
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    return false;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    return false;
}

#endif

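/* Return the handler registered for fd, skipping nodes that are pending
 * deletion, or NULL if there is none. */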
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd && !node->deleted) {
            return node;
        }
    }

    return NULL;
}

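/* Register, update or remove the handlers for a file descriptor.  Passing
 * NULL for both io_read and io_write removes the handler.  A minimal usage
 * sketch (callback and state names are hypothetical, not part of this file):
 *
 *     aio_set_fd_handler(ctx, fd, true, my_read_ready, NULL, my_state);
 *     ...
 *     aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL);
 */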
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        void *opaque)
{
    AioHandler *node;
    bool is_new = false;
    bool deleted = false;

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node) {
            g_source_remove_poll(&ctx->source, &node->pfd);

            /* If the lock is held, just mark the node as deleted */
            if (ctx->walking_handlers) {
                node->deleted = 1;
                node->pfd.revents = 0;
            } else {
                /* Otherwise, delete it for real.  We can't just mark it as
                 * deleted because deleted nodes are only cleaned up after
                 * releasing the walking_handlers lock.
                 */
                QLIST_REMOVE(node, node);
                deleted = true;
            }
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
            is_new = true;
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->opaque = opaque;
        node->is_external = is_external;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    }

    aio_epoll_update(ctx, node, is_new);
    aio_notify(ctx);
    if (deleted) {
        g_free(node);
    }
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       is_external, (IOHandler *)io_read, NULL, notifier);
}

bool aio_prepare(AioContext *ctx)
{
    return false;
}

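/* Return true if any handler has received events that its read or write
 * callback would consume, i.e. a subsequent aio_dispatch() has work to do. */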
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            return true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            return true;
        }
    }

    return false;
}

bool aio_dispatch(AioContext *ctx)
{
    AioHandler *node;
    bool progress = false;

    /*
     * If there are callbacks left that have been queued, we need to call them.
     * Do not call select in this case, because it is possible that the caller
     * does not need a complete flush (as is the case for aio_poll loops).
     */
    if (aio_bh_poll(ctx)) {
        progress = true;
    }

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    node = QLIST_FIRST(&ctx->aio_handlers);
    while (node) {
        AioHandler *tmp;
        int revents;

        ctx->walking_handlers++;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            node->io_read) {
            node->io_read(node->opaque);

            /* aio_notify() does not count as progress */
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }
        if (!node->deleted &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        tmp = node;
        node = QLIST_NEXT(node, node);

        ctx->walking_handlers--;

        if (!ctx->walking_handlers && tmp->deleted) {
            QLIST_REMOVE(tmp, node);
            g_free(tmp);
        }
    }

    /* Run our timers */
    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

/* These thread-local variables are used only in a small part of aio_poll
 * around the call to the poll() system call.  In particular they are not
 * used while aio_poll is performing callbacks, which makes it much easier
 * to think about reentrancy!
 *
 * Stack-allocated arrays would be perfect but they have size limitations;
 * heap allocation is expensive enough that we want to reuse arrays across
 * calls to aio_poll().  And because poll() has to be called without holding
 * any lock, the arrays cannot be stored in AioContext.  Thread-local data
 * has none of the disadvantages of these three options.
 */
static __thread GPollFD *pollfds;
static __thread AioHandler **nodes;
static __thread unsigned npfd, nalloc;
static __thread Notifier pollfds_cleanup_notifier;

static void pollfds_cleanup(Notifier *n, void *unused)
{
    g_assert(npfd == 0);
    g_free(pollfds);
    g_free(nodes);
    nalloc = 0;
}

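/* Append node to the thread-local pollfds[]/nodes[] arrays, growing them
 * geometrically on demand and registering the atexit cleanup notifier on
 * first use. */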
static void add_pollfd(AioHandler *node)
{
    if (npfd == nalloc) {
        if (nalloc == 0) {
            pollfds_cleanup_notifier.notify = pollfds_cleanup;
            qemu_thread_atexit_add(&pollfds_cleanup_notifier);
            nalloc = 8;
        } else {
            g_assert(nalloc <= INT_MAX);
            nalloc *= 2;
        }
        pollfds = g_renew(GPollFD, pollfds, nalloc);
        nodes = g_renew(AioHandler *, nodes, nalloc);
    }
    nodes[npfd] = node;
    pollfds[npfd] = (GPollFD) {
        .fd = node->pfd.fd,
        .events = node->pfd.events,
    };
    npfd++;
}

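/* Run one iteration of the event loop: poll the registered file descriptors
 * (with ppoll or, when enabled, epoll), then dispatch bottom halves, ready
 * fd handlers and expired timers.  Returns true if any progress was made. */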
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int i, ret;
    bool progress;
    int64_t timeout;

    aio_context_acquire(ctx);
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    ctx->walking_handlers++;

    assert(npfd == 0);

    /* fill pollfds */
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->pfd.events
            && !aio_epoll_enabled(ctx)
            && aio_node_check(ctx, node->is_external)) {
            add_pollfd(node);
        }
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;

    /* wait until next event */
    if (timeout) {
        aio_context_release(ctx);
    }
    if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
        AioHandler epoll_handler;

        epoll_handler.pfd.fd = ctx->epollfd;
        epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
        npfd = 0;
        add_pollfd(&epoll_handler);
        ret = aio_epoll(ctx, pollfds, npfd, timeout);
    } else {
        ret = qemu_poll_ns(pollfds, npfd, timeout);
    }
    if (blocking) {
        atomic_sub(&ctx->notify_me, 2);
    }
    if (timeout) {
        aio_context_acquire(ctx);
    }

    aio_notify_accept(ctx);

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        for (i = 0; i < npfd; i++) {
            nodes[i]->pfd.revents = pollfds[i].revents;
        }
    }

    npfd = 0;
    ctx->walking_handlers--;

    /* Run dispatch even if there were no readable fds to run timers */
    if (aio_dispatch(ctx)) {
        progress = true;
    }

    aio_context_release(ctx);

    return progress;
}

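/* Per-context setup: create the epoll instance if the host supports it;
 * if epoll_create1() fails, the context permanently falls back to ppoll. */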
void aio_context_setup(AioContext *ctx, Error **errp)
{
#ifdef CONFIG_EPOLL
    assert(!ctx->epollfd);
    ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (ctx->epollfd == -1) {
        ctx->epoll_available = false;
    } else {
        ctx->epoll_available = true;
    }
#endif
}