qemu/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h>
#endif

struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    int deleted;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};
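/* Handlers are unlinked lazily: while a walk of the list is in progress
 * (ctx->walking_handlers != 0), removal only sets ->deleted, and the node
 * is unlinked and freed later, once no walk is in progress.
 */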

#ifdef CONFIG_EPOLL_CREATE1

/* The fd count threshold above which we switch from ppoll() to epoll() */
#define EPOLL_ENABLE_THRESHOLD 64

static void aio_epoll_disable(AioContext *ctx)
{
    ctx->epoll_available = false;
    if (!ctx->epoll_enabled) {
        return;
    }
    ctx->epoll_enabled = false;
    close(ctx->epollfd);
}

static inline int epoll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? EPOLLIN : 0) |
           (pfd_events & G_IO_OUT ? EPOLLOUT : 0) |
           (pfd_events & G_IO_HUP ? EPOLLHUP : 0) |
           (pfd_events & G_IO_ERR ? EPOLLERR : 0);
}
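/* For example, a read handler registered below has pfd.events =
 * G_IO_IN | G_IO_HUP | G_IO_ERR, which this maps to
 * EPOLLIN | EPOLLHUP | EPOLLERR.
 */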

static bool aio_epoll_try_enable(AioContext *ctx)
{
    AioHandler *node;
    struct epoll_event event;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int r;
        if (node->deleted || !node->pfd.events) {
            continue;
        }
        event.events = epoll_events_from_pfd(node->pfd.events);
        event.data.ptr = node;
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
        if (r) {
            return false;
        }
    }
    ctx->epoll_enabled = true;
    return true;
}
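/* Enabling is all-or-nothing: if epoll_ctl() rejects any fd, the caller
 * (aio_epoll_check_poll) invokes aio_epoll_disable(), which clears
 * epoll_available, so this context permanently falls back to qemu_poll_ns().
 */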

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
    struct epoll_event event;
    int r;

    if (!ctx->epoll_enabled) {
        return;
    }
    if (!node->pfd.events) {
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_DEL, node->pfd.fd, &event);
        if (r) {
            aio_epoll_disable(ctx);
        }
    } else {
        event.data.ptr = node;
        event.events = epoll_events_from_pfd(node->pfd.events);
        if (is_new) {
            r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
            if (r) {
                aio_epoll_disable(ctx);
            }
        } else {
            r = epoll_ctl(ctx->epollfd, EPOLL_CTL_MOD, node->pfd.fd, &event);
            if (r) {
                aio_epoll_disable(ctx);
            }
        }
    }
}

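/* Wait for events with nanosecond precision.  epoll_wait() only takes a
 * millisecond timeout, so for a positive timeout we first qemu_poll_ns() on
 * the epollfd itself.  epoll_wait() is therefore only entered when events
 * are already pending or when the timeout is 0 or negative, so the implicit
 * int64_t-to-int truncation of its timeout argument is harmless.
 */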
static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    AioHandler *node;
    int i, ret = 0;
    struct epoll_event events[128];

    assert(npfd == 1);
    assert(pfds[0].fd == ctx->epollfd);
    if (timeout > 0) {
        ret = qemu_poll_ns(pfds, npfd, timeout);
    }
    if (timeout <= 0 || ret > 0) {
        ret = epoll_wait(ctx->epollfd, events,
                         sizeof(events) / sizeof(events[0]),
                         timeout);
        if (ret <= 0) {
            goto out;
        }
        for (i = 0; i < ret; i++) {
            int ev = events[i].events;
            node = events[i].data.ptr;
            node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                (ev & EPOLLOUT ? G_IO_OUT : 0) |
                (ev & EPOLLHUP ? G_IO_HUP : 0) |
                (ev & EPOLLERR ? G_IO_ERR : 0);
        }
    }
out:
    return ret;
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    /* Fall back to ppoll when external clients are disabled. */
    return !aio_external_disabled(ctx) && ctx->epoll_enabled;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    if (!ctx->epoll_available) {
        return false;
    }
    if (aio_epoll_enabled(ctx)) {
        return true;
    }
    if (npfd >= EPOLL_ENABLE_THRESHOLD) {
        if (aio_epoll_try_enable(ctx)) {
            return true;
        } else {
            aio_epoll_disable(ctx);
        }
    }
    return false;
}

#else

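/* Stubs for hosts without epoll_create1(): aio_epoll_check_poll() always
 * returns false, so aio_poll() only ever takes the qemu_poll_ns() path and
 * aio_epoll() itself is never reached.
 */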
static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
}

static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    assert(false);
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    return false;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    return false;
}

#endif

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd && !node->deleted) {
            return node;
        }
    }

    return NULL;
}

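/* Register or update the read/write handlers for an fd, or remove them when
 * both io_read and io_write are NULL.  A minimal usage sketch, where
 * my_read_cb and my_state are hypothetical caller-side names:
 *
 *     aio_set_fd_handler(ctx, fd, true, my_read_cb, NULL, my_state);
 *     ...
 *     aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL);
 *
 * The second call unregisters the fd again.
 */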
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        void *opaque)
{
    AioHandler *node;
    bool is_new = false;
    bool deleted = false;

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node == NULL) {
            /* No handler registered for this fd; nothing to remove.  Bail
             * out before aio_epoll_update() dereferences node.
             */
            return;
        }

        g_source_remove_poll(&ctx->source, &node->pfd);

        /* If the lock is held, just mark the node as deleted */
        if (ctx->walking_handlers) {
            node->deleted = 1;
            node->pfd.revents = 0;
        } else {
            /* Otherwise, delete it for real.  We can't just mark it as
             * deleted because deleted nodes are only cleaned up after
             * releasing the walking_handlers lock.
             */
            QLIST_REMOVE(node, node);
            deleted = true;
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
            is_new = true;
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->opaque = opaque;
        node->is_external = is_external;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    }

    aio_epoll_update(ctx, node, is_new);
    aio_notify(ctx);
    if (deleted) {
        g_free(node);
    }
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       is_external, (IOHandler *)io_read, NULL, notifier);
}

bool aio_prepare(AioContext *ctx)
{
    return false;
}

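/* Return true if some registered handler has revents that its callbacks
 * would consume, i.e. whether a dispatch pass would make progress without
 * blocking.
 */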
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            return true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            return true;
        }
    }

    return false;
}

bool aio_dispatch(AioContext *ctx)
{
    AioHandler *node;
    bool progress = false;

    /*
     * If there are callbacks left that have been queued, we need to call
     * them.  Do not wait in poll() for more events in this case, because it
     * is possible that the caller does not need a complete flush (as is the
     * case for aio_poll loops).
     */
    if (aio_bh_poll(ctx)) {
        progress = true;
    }

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    node = QLIST_FIRST(&ctx->aio_handlers);
    while (node) {
        AioHandler *tmp;
        int revents;

        ctx->walking_handlers++;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_read) {
            node->io_read(node->opaque);

            /* aio_notify() does not count as progress */
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }
        if (!node->deleted &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        tmp = node;
        node = QLIST_NEXT(node, node);

        ctx->walking_handlers--;

        if (!ctx->walking_handlers && tmp->deleted) {
            QLIST_REMOVE(tmp, node);
            g_free(tmp);
        }
    }

    /* Run our timers */
    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

/* These thread-local variables are used only in a small part of aio_poll
 * around the call to the poll() system call.  In particular they are not
 * used while aio_poll is performing callbacks, which makes it much easier
 * to think about reentrancy!
 *
 * Stack-allocated arrays would be perfect but they have size limitations;
 * heap allocation is expensive enough that we want to reuse arrays across
 * calls to aio_poll().  And because poll() has to be called without holding
 * any lock, the arrays cannot be stored in AioContext.  Thread-local data
 * has none of the disadvantages of these three options.
 */
static __thread GPollFD *pollfds;
static __thread AioHandler **nodes;
static __thread unsigned npfd, nalloc;
static __thread Notifier pollfds_cleanup_notifier;

static void pollfds_cleanup(Notifier *n, void *unused)
{
    g_assert(npfd == 0);
    g_free(pollfds);
    g_free(nodes);
    nalloc = 0;
}

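/* Append node to the thread-local pollfds/nodes arrays, growing them
 * geometrically (8, 16, 32, ...) so that repeated aio_poll() calls amortize
 * the allocations; the first growth also registers the thread-exit cleanup
 * notifier above.
 */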
static void add_pollfd(AioHandler *node)
{
    if (npfd == nalloc) {
        if (nalloc == 0) {
            pollfds_cleanup_notifier.notify = pollfds_cleanup;
            qemu_thread_atexit_add(&pollfds_cleanup_notifier);
            nalloc = 8;
        } else {
            g_assert(nalloc <= INT_MAX);
            nalloc *= 2;
        }
        pollfds = g_renew(GPollFD, pollfds, nalloc);
        nodes = g_renew(AioHandler *, nodes, nalloc);
    }
    nodes[npfd] = node;
    pollfds[npfd] = (GPollFD) {
        .fd = node->pfd.fd,
        .events = node->pfd.events,
    };
    npfd++;
}

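/* Poll the AioContext once.  With blocking == true this waits for the next
 * event (fd activity, bottom half or timer) before dispatching; with false
 * it only dispatches what is already pending.  A hedged sketch of a
 * flush-style caller, assuming a caller-defined completion condition "done":
 *
 *     while (!done) {
 *         aio_poll(ctx, true);
 *     }
 */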
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int i, ret;
    bool progress;
    int64_t timeout;

    aio_context_acquire(ctx);
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    ctx->walking_handlers++;

    assert(npfd == 0);

    /* fill pollfds */
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->pfd.events
            && !aio_epoll_enabled(ctx)
            && aio_node_check(ctx, node->is_external)) {
            add_pollfd(node);
        }
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;

    /* wait until next event */
    if (timeout) {
        aio_context_release(ctx);
    }
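    /* With epoll enabled, the per-handler pollfds are not used: the array is
     * reset to a single GPollFD for the epollfd, and aio_epoll() fans the
     * results back out into each node's revents via event.data.ptr.
     */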
    if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
        AioHandler epoll_handler;

        epoll_handler.pfd.fd = ctx->epollfd;
        epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
        npfd = 0;
        add_pollfd(&epoll_handler);
        ret = aio_epoll(ctx, pollfds, npfd, timeout);
    } else {
        ret = qemu_poll_ns(pollfds, npfd, timeout);
    }
    if (blocking) {
        atomic_sub(&ctx->notify_me, 2);
    }
    if (timeout) {
        aio_context_acquire(ctx);
    }

    aio_notify_accept(ctx);

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        for (i = 0; i < npfd; i++) {
            nodes[i]->pfd.revents = pollfds[i].revents;
        }
    }

    npfd = 0;
    ctx->walking_handlers--;

    /* Run dispatch even if there were no readable fds to run timers */
    if (aio_dispatch(ctx)) {
        progress = true;
    }

    aio_context_release(ctx);

    return progress;
}

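/* One-time setup for a new AioContext.  If epoll_create1() fails,
 * epoll_available stays false and this context only ever polls with
 * qemu_poll_ns().
 */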
void aio_context_setup(AioContext *ctx, Error **errp)
{
#ifdef CONFIG_EPOLL_CREATE1
    assert(!ctx->epollfd);
    ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (ctx->epollfd == -1) {
        ctx->epoll_available = false;
    } else {
        ctx->epoll_available = true;
    }
#endif
}