qemu/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"

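/* One registered file descriptor.  A handler is never freed while the
 * handler list is being walked (ctx->walking_handlers is non-zero); it
 * is only marked with ->deleted and reclaimed later by the walker.
 */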
struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    AioFlushHandler *io_flush;
    int deleted;
    void *opaque;
    QLIST_ENTRY(AioHandler) node;
};

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd && !node->deleted) {
            return node;
        }
    }

    return NULL;
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioFlushHandler *io_flush,
                        void *opaque)
{
    AioHandler *node;

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node) {
            g_source_remove_poll(&ctx->source, &node->pfd);

            /* If the lock is held, just mark the node as deleted */
            if (ctx->walking_handlers) {
                node->deleted = 1;
                node->pfd.revents = 0;
            } else {
                /* Otherwise, delete it for real.  We can't just mark it as
                 * deleted because deleted nodes are only cleaned up after
                 * releasing the walking_handlers lock.
                 */
                QLIST_REMOVE(node, node);
                g_free(node);
            }
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_malloc0(sizeof(AioHandler));
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->io_flush = io_flush;
        node->opaque = opaque;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP : 0);
        node->pfd.events |= (io_write ? G_IO_OUT : 0);
    }

    aio_notify(ctx);
}

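/* Example (hypothetical caller, not part of this file): register a read
 * handler plus an io_flush callback for a socket, then later remove the
 * handler by passing NULL for every callback.  "my_read", "my_flush" and
 * "my_state" are placeholders.
 *
 *     static void my_read(void *opaque) { ... consume data ... }
 *     static int my_flush(void *opaque) { return 1; }   (non-zero: AIO pending)
 *
 *     aio_set_fd_handler(ctx, sockfd, my_read, NULL, my_flush, my_state);
 *     ...
 *     aio_set_fd_handler(ctx, sockfd, NULL, NULL, NULL, NULL);
 */
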
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            EventNotifierHandler *io_read,
                            AioFlushEventNotifierHandler *io_flush)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       (IOHandler *)io_read, NULL,
                       (AioFlushHandler *)io_flush, notifier);
}

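/* Example (hypothetical, for illustration): wire up an EventNotifier so
 * that event_notifier_set() called from another thread wakes up aio_poll()
 * and runs "my_notifier_read" in this context.  Passing NULL for io_flush
 * means the notifier alone never counts as a pending AIO operation.
 *
 *     static void my_notifier_read(EventNotifier *e)
 *     {
 *         event_notifier_test_and_clear(e);
 *         ... process whatever the other thread queued ...
 *     }
 *
 *     aio_set_event_notifier(ctx, &notifier, my_notifier_read, NULL);
 */
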
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int revents;

        /*
         * FIXME: right now we cannot get G_IO_HUP and G_IO_ERR because
         * main-loop.c is still select based (due to the slirp legacy).
         * If main-loop.c ever switches to poll, G_IO_ERR should be
         * tested too.  Dispatching G_IO_ERR to both handlers should be
         * okay, since handlers need to be ready for spurious wakeups.
         */
        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            return true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            return true;
        }
    }

    return false;
}

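/* Sketch (assumption: mirrors the GSource glue in async.c, where the
 * GSource is embedded as the first member of AioContext): the check
 * callback of the AioContext's GSource can use aio_pending() to decide
 * whether a dispatch pass is needed, without blocking.
 *
 *     static gboolean aio_ctx_check(GSource *source)
 *     {
 *         return aio_pending((AioContext *) source);
 *     }
 */
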
bool aio_poll(AioContext *ctx, bool blocking)
{
    static struct timeval tv0;
    AioHandler *node;
    fd_set rdfds, wrfds;
    int max_fd = -1;
    int ret;
    bool busy, progress;

    progress = false;

    /*
     * If there are queued callbacks left, we need to call them.
     * Do not call select in this case, because it is possible that the caller
     * does not need a complete flush (as is the case for qemu_aio_wait loops).
     */
    if (aio_bh_poll(ctx)) {
        blocking = false;
        progress = true;
    }

    /*
     * Then dispatch any pending callbacks from the GSource.
     *
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    node = QLIST_FIRST(&ctx->aio_handlers);
    while (node) {
        AioHandler *tmp;
        int revents;

        ctx->walking_handlers++;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        /* See comment in aio_pending.  */
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            node->io_read(node->opaque);
            progress = true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        tmp = node;
        node = QLIST_NEXT(node, node);

        ctx->walking_handlers--;

        if (!ctx->walking_handlers && tmp->deleted) {
            QLIST_REMOVE(tmp, node);
            g_free(tmp);
        }
    }

    if (progress && !blocking) {
        return true;
    }

    ctx->walking_handlers++;

    FD_ZERO(&rdfds);
    FD_ZERO(&wrfds);

    /* fill fd sets */
    busy = false;
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        /* Use io_flush to probe each fd: if it reports no pending AIO
         * operations, skip the fd entirely.  If no fd at all is busy we
         * bail out below, otherwise qemu_aio_wait() would block
         * indefinitely.
         */
        if (!node->deleted && node->io_flush) {
            if (node->io_flush(node->opaque) == 0) {
                continue;
            }
            busy = true;
        }
        if (!node->deleted && node->io_read) {
            FD_SET(node->pfd.fd, &rdfds);
            max_fd = MAX(max_fd, node->pfd.fd + 1);
        }
        if (!node->deleted && node->io_write) {
            FD_SET(node->pfd.fd, &wrfds);
            max_fd = MAX(max_fd, node->pfd.fd + 1);
        }
    }

    ctx->walking_handlers--;

    /* No AIO operations?  Get us out of here */
    if (!busy) {
        return progress;
    }

    /* wait until next event */
    ret = select(max_fd, &rdfds, &wrfds, NULL, blocking ? NULL : &tv0);

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        /* we have to walk very carefully in case
         * aio_set_fd_handler is called while we're walking */
        node = QLIST_FIRST(&ctx->aio_handlers);
        while (node) {
            AioHandler *tmp;

            ctx->walking_handlers++;

            if (!node->deleted &&
                FD_ISSET(node->pfd.fd, &rdfds) &&
                node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if (!node->deleted &&
                FD_ISSET(node->pfd.fd, &wrfds) &&
                node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            tmp = node;
            node = QLIST_NEXT(node, node);

            ctx->walking_handlers--;

            if (!ctx->walking_handlers && tmp->deleted) {
                QLIST_REMOVE(tmp, node);
                g_free(tmp);
            }
        }
    }

    assert(progress || busy);
    return true;
}

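/* Example (hypothetical caller): the classic drain loop.  As long as some
 * io_flush callback reports pending AIO, a blocking aio_poll() either makes
 * progress or waits, so looping until it returns false flushes everything.
 *
 *     while (aio_poll(ctx, true)) {
 *         ... a bottom half or fd callback ran, or AIO is still pending ...
 *     }
 */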