qemu/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"

struct AioHandler
{
    GPollFD pfd;                /* fd and event mask handed to g_poll() */
    IOHandler *io_read;         /* invoked when the fd is readable */
    IOHandler *io_write;        /* invoked when the fd is writable */
    AioFlushHandler *io_flush;  /* returns non-zero while requests are pending */
    int deleted;                /* removal deferred while handlers are walked */
    int pollfds_idx;            /* index into ctx->pollfds, or -1 */
    void *opaque;               /* passed back to the callbacks */
    QLIST_ENTRY(AioHandler) node;
};

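/* Return the handler registered for @fd, or NULL if there is none or it
 * has been marked as deleted.
 */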
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd && !node->deleted) {
            return node;
        }
    }

    return NULL;
}

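/* Register @io_read/@io_write as the handlers for @fd, replacing any
 * previous registration.  Passing NULL for both removes the handler;
 * removal is deferred if the handler list is currently being walked.
 */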
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioFlushHandler *io_flush,
                        void *opaque)
{
    AioHandler *node;

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node) {
            g_source_remove_poll(&ctx->source, &node->pfd);

            /* If the lock is held, just mark the node as deleted */
            if (ctx->walking_handlers) {
                node->deleted = 1;
                node->pfd.revents = 0;
            } else {
                /* Otherwise, delete it for real.  We can't just mark it as
                 * deleted because deleted nodes are only cleaned up after
                 * releasing the walking_handlers lock.
                 */
                QLIST_REMOVE(node, node);
                g_free(node);
            }
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_malloc0(sizeof(AioHandler));
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->io_flush = io_flush;
        node->opaque = opaque;
        node->pollfds_idx = -1;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    }

    aio_notify(ctx);
}

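/* Example usage (a sketch; "my_read_cb", "my_flush_cb" and the state
 * pointer "s" are hypothetical names, not part of this file):
 *
 *     static void my_read_cb(void *opaque)  { ...consume data... }
 *     static int  my_flush_cb(void *opaque) { return s->in_flight > 0; }
 *
 *     aio_set_fd_handler(ctx, fd, my_read_cb, NULL, my_flush_cb, s);
 *     ...
 *     aio_set_fd_handler(ctx, fd, NULL, NULL, NULL, NULL);
 *
 * The final call, with both callbacks NULL, unregisters the fd.
 */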
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            EventNotifierHandler *io_read,
                            AioFlushEventNotifierHandler *io_flush)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       (IOHandler *)io_read, NULL,
                       (AioFlushHandler *)io_flush, notifier);
}

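/* Return true if an already-polled event is waiting to be dispatched,
 * i.e. a call to aio_dispatch() would invoke at least one callback.
 */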
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            return true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            return true;
        }
    }

    return false;
}

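/* Invoke the read/write callbacks of every handler whose revents match
 * its event mask, clearing revents as we go.  Return true if any
 * callback ran.
 */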
static bool aio_dispatch(AioContext *ctx)
{
    AioHandler *node;
    bool progress = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    node = QLIST_FIRST(&ctx->aio_handlers);
    while (node) {
        AioHandler *tmp;
        int revents;

        ctx->walking_handlers++;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            node->io_read) {
            node->io_read(node->opaque);
            progress = true;
        }
        if (!node->deleted &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        tmp = node;
        node = QLIST_NEXT(node, node);

        ctx->walking_handlers--;

        if (!ctx->walking_handlers && tmp->deleted) {
            QLIST_REMOVE(tmp, node);
            g_free(tmp);
        }
    }
    return progress;
}

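/* Run one iteration of the event loop: flush scheduled bottom halves,
 * dispatch pending events, and (while requests are in flight) g_poll()
 * the registered fds, blocking when @blocking is true.
 */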
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int ret;
    bool busy, progress;

    progress = false;

    /*
     * If there are callbacks left that have been queued, we need to call them.
     * Do not call g_poll in this case, because it is possible that the caller
     * does not need a complete flush (as is the case for qemu_aio_wait loops).
     */
    if (aio_bh_poll(ctx)) {
        blocking = false;
        progress = true;
    }

    if (aio_dispatch(ctx)) {
        progress = true;
    }

    if (progress && !blocking) {
        return true;
    }

    ctx->walking_handlers++;

    g_array_set_size(ctx->pollfds, 0);

    /* fill pollfds */
    busy = false;
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        node->pollfds_idx = -1;

        /* Skip fds whose io_flush callback reports no pending AIO
         * operations: polling them could make qemu_aio_wait() block
         * indefinitely even though no request will ever complete.
         */
        if (!node->deleted && node->io_flush) {
            if (node->io_flush(node->opaque) == 0) {
                continue;
            }
            busy = true;
        }
        if (!node->deleted && node->pfd.events) {
            GPollFD pfd = {
                .fd = node->pfd.fd,
                .events = node->pfd.events,
            };
            node->pollfds_idx = ctx->pollfds->len;
            g_array_append_val(ctx->pollfds, pfd);
        }
    }

    ctx->walking_handlers--;

    /* No AIO operations?  Get us out of here */
    if (!busy) {
        return progress;
    }

    /* wait until next event */
    ret = g_poll((GPollFD *)ctx->pollfds->data,
                 ctx->pollfds->len,
                 blocking ? -1 : 0);

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
            if (node->pollfds_idx != -1) {
                GPollFD *pfd = &g_array_index(ctx->pollfds, GPollFD,
                                              node->pollfds_idx);
                node->pfd.revents = pfd->revents;
            }
        }
        if (aio_dispatch(ctx)) {
            progress = true;
        }
    }

    assert(progress || busy);
    return true;
}

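/* Typical use (a sketch; "done" is a hypothetical flag that one of the
 * completion callbacks would set):
 *
 *     while (!done) {
 *         aio_poll(ctx, true);
 *     }
 *
 * Each iteration dispatches ready bottom halves and fd events, blocking
 * in g_poll() while requests are in flight.  This is the same pattern the
 * qemu_aio_wait() loops mentioned above rely on.
 */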