qemu/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"

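/*
 * One AioHandler tracks a single registered file descriptor: the GPollFD
 * handed to the poll loop, the read/write callbacks, and bookkeeping used
 * while the handler list is being walked.  "deleted" marks a node for
 * deferred removal (it may only be freed once no walk is in progress), and
 * "pollfds_idx" records this node's slot in the pollfds array built by
 * aio_poll().
 */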
struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    int deleted;
    int pollfds_idx;
    void *opaque;
    QLIST_ENTRY(AioHandler) node;
};

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd && !node->deleted) {
            return node;
        }
    }

    return NULL;
}

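/*
 * Register, update, or remove the handler for a file descriptor.  Passing
 * NULL for both io_read and io_write removes an existing handler; otherwise
 * a node is allocated on first use and its callbacks and poll events are
 * refreshed in place.  aio_notify() is called so a concurrent aio_poll()
 * picks up the change.
 */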
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        void *opaque)
{
    AioHandler *node;

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node) {
            g_source_remove_poll(&ctx->source, &node->pfd);

            /* If the lock is held, just mark the node as deleted */
            if (ctx->walking_handlers) {
                node->deleted = 1;
                node->pfd.revents = 0;
            } else {
                /* Otherwise, delete it for real.  We can't just mark it as
                 * deleted because deleted nodes are only cleaned up after
                 * releasing the walking_handlers lock.
                 */
                QLIST_REMOVE(node, node);
                g_free(node);
            }
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_malloc0(sizeof(AioHandler));
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->opaque = opaque;
        node->pollfds_idx = -1;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    }

    aio_notify(ctx);
}

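/*
 * Convenience wrapper: an EventNotifier is polled like any other fd, so
 * registering its handler reduces to aio_set_fd_handler() on the notifier's
 * file descriptor, with the notifier itself as the opaque pointer.
 */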
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            EventNotifierHandler *io_read)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       (IOHandler *)io_read, NULL, notifier);
}

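/*
 * Return true if at least one registered handler has poll results pending
 * that its read or write callback would consume.
 */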
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            return true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            return true;
        }
    }

    return false;
}

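/*
 * Invoke the callbacks of every handler whose fd is ready, then run expired
 * timers.  The walking_handlers counter is held non-zero around each
 * callback so that aio_set_fd_handler() defers node removal instead of
 * freeing a node out from under the walk; deferred nodes are reaped here
 * once the counter drops back to zero.  Returns true if any callback or
 * timer made progress (the ctx->notifier dummy read does not count).
 */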
static bool aio_dispatch(AioContext *ctx)
{
    AioHandler *node;
    bool progress = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    node = QLIST_FIRST(&ctx->aio_handlers);
    while (node) {
        AioHandler *tmp;
        int revents;

        ctx->walking_handlers++;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            node->io_read) {
            node->io_read(node->opaque);

            /* aio_notify() does not count as progress */
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }
        if (!node->deleted &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        tmp = node;
        node = QLIST_NEXT(node, node);

        ctx->walking_handlers--;

        if (!ctx->walking_handlers && tmp->deleted) {
            QLIST_REMOVE(tmp, node);
            g_free(tmp);
        }
    }

    /* Run our timers */
    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

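/*
 * Run one iteration of the event loop: flush queued bottom halves, dispatch
 * anything already pending, then poll the registered fds (blocking up to
 * the next timer deadline if "blocking" is true) and dispatch again.
 * Returns true if any progress was made.
 */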
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int ret;
    bool progress;

    progress = false;

    /*
     * If there are callbacks left that have been queued, we need to call
     * them.  Do not block in poll in this case, because it is possible that
     * the caller does not need a complete flush (as is the case for
     * qemu_aio_wait loops).
     */
    if (aio_bh_poll(ctx)) {
        blocking = false;
        progress = true;
    }

    if (aio_dispatch(ctx)) {
        progress = true;
    }

    if (progress && !blocking) {
        return true;
    }

    ctx->walking_handlers++;

    g_array_set_size(ctx->pollfds, 0);

    /* fill pollfds */
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        node->pollfds_idx = -1;
        if (!node->deleted && node->pfd.events) {
            GPollFD pfd = {
                .fd = node->pfd.fd,
                .events = node->pfd.events,
            };
            node->pollfds_idx = ctx->pollfds->len;
            g_array_append_val(ctx->pollfds, pfd);
        }
    }

    ctx->walking_handlers--;

    /* wait until next event */
    ret = qemu_poll_ns((GPollFD *)ctx->pollfds->data,
                       ctx->pollfds->len,
                       blocking ? timerlistgroup_deadline_ns(&ctx->tlg) : 0);

    /* if any fds became ready, copy their revents back to the handlers */
    if (ret > 0) {
        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
            if (node->pollfds_idx != -1) {
                GPollFD *pfd = &g_array_index(ctx->pollfds, GPollFD,
                                              node->pollfds_idx);
                node->pfd.revents = pfd->revents;
            }
        }
    }

    /* Run dispatch even if there were no ready fds, so that timers fire */
    if (aio_dispatch(ctx)) {
        progress = true;
    }

    return progress;
}

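/*
 * Usage sketch (illustrative only, not part of the original file; the
 * callback, state, and fd names below are hypothetical):
 *
 *     static void my_read_cb(void *opaque)
 *     {
 *         MyState *s = opaque;
 *         ... consume data from s->fd ...
 *     }
 *
 *     aio_set_fd_handler(ctx, s->fd, my_read_cb, NULL, s);
 *     while (aio_poll(ctx, true)) {
 *         ;  // keep looping while callbacks, timers, or BHs make progress
 *     }
 *     aio_set_fd_handler(ctx, s->fd, NULL, NULL, NULL);   // unregister
 */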