qemu/util/aio-win32.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM Corp., 2008
 * Copyright Red Hat Inc., 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"

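/*
 * State for one registered file descriptor (a SOCKET on Windows) or
 * EventNotifier.  Nodes are only unlinked while no aio_poll() walk is in
 * progress; otherwise they are flagged as deleted and reaped later.
 */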
struct AioHandler {
    EventNotifier *e;
    IOHandler *io_read;
    IOHandler *io_write;
    EventNotifierHandler *io_notify;
    GPollFD pfd;
    int deleted;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};

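/*
 * Unlink and free @node, or merely mark it as deleted if concurrent
 * aio_poll() walkers still hold the list_lock count.  Deferred nodes are
 * freed by aio_dispatch_handlers() once the walk is over.
 */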
static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If aio_poll is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        node->deleted = 1;
        node->pfd.revents = 0;
    } else {
        /* Otherwise, delete it for real.  Merely marking it as deleted
         * would leave it around until the next list walk reaps it, so
         * free it now while nobody is using it.
         */
        QLIST_REMOVE(node, node);
        g_free(node);
    }
}

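/*
 * Register or replace the read/write handlers for a socket.  On Windows
 * the fd is a SOCKET; readiness is reported by binding the socket to the
 * AioContext's notifier event with WSAEventSelect(), so socket activity
 * wakes up WaitForMultipleObjects() in aio_poll().  Passing NULL for both
 * io_read and io_write removes the handler.  io_poll is accepted for API
 * compatibility but is not used on Windows.
 */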
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    /* fd is a SOCKET in our case */
    AioHandler *old_node;
    AioHandler *node = NULL;

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
        if (old_node->pfd.fd == fd && !old_node->deleted) {
            break;
        }
    }

    if (io_read || io_write) {
        HANDLE event;
        long bitmask = 0;

        /* Alloc and insert if it's not already there */
        node = g_new0(AioHandler, 1);
        node->pfd.fd = fd;

        node->pfd.events = 0;
        if (io_read) {
            node->pfd.events |= G_IO_IN;
        }
        if (io_write) {
            node->pfd.events |= G_IO_OUT;
        }

        node->e = &ctx->notifier;

        /* Update handler with latest information */
        node->opaque = opaque;
        node->io_read = io_read;
        node->io_write = io_write;
        node->is_external = is_external;

        if (io_read) {
            bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
        }

        if (io_write) {
            bitmask |= FD_WRITE | FD_CONNECT;
        }

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
        event = event_notifier_get_handle(&ctx->notifier);
        WSAEventSelect(node->pfd.fd, event, bitmask);
    }
    if (old_node) {
        aio_remove_fd_handler(ctx, old_node);
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    /* Not implemented */
}

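/*
 * Register or remove the handler that runs when @e is signalled.  The
 * notifier's HANDLE is added to the GSource's poll set so that the glib
 * main loop can also wait on it.  Passing a NULL io_notify removes the
 * handler.
 */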
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *e,
                            bool is_external,
                            EventNotifierHandler *io_notify,
                            AioPollFn *io_poll)
{
    AioHandler *node;

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->e == e && !node->deleted) {
            break;
        }
    }

    /* Are we deleting the fd handler? */
    if (!io_notify) {
        if (node) {
            g_source_remove_poll(&ctx->source, &node->pfd);

            aio_remove_fd_handler(ctx, node);
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->e = e;
            node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
            node->pfd.events = G_IO_IN;
            node->is_external = is_external;
            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_notify = io_notify;
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    /* Not implemented */
}

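/*
 * Probe all registered sockets with a zero-timeout select() and record the
 * resulting G_IO_IN/G_IO_OUT bits in each node's pfd.revents.  Returns true
 * if any socket is ready, so aio_poll() knows not to block even when no
 * event HANDLE is signalled.
 */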
bool aio_prepare(AioContext *ctx)
{
    static struct timeval tv0;
    AioHandler *node;
    bool have_select_revents = false;
    fd_set rfds, wfds;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    /* fill fd sets */
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->io_read) {
            FD_SET ((SOCKET)node->pfd.fd, &rfds);
        }
        if (node->io_write) {
            FD_SET ((SOCKET)node->pfd.fd, &wfds);
        }
    }

    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
            node->pfd.revents = 0;
            if (FD_ISSET(node->pfd.fd, &rfds)) {
                node->pfd.revents |= G_IO_IN;
                have_select_revents = true;
            }

            if (FD_ISSET(node->pfd.fd, &wfds)) {
                node->pfd.revents |= G_IO_OUT;
                have_select_revents = true;
            }
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return have_select_revents;
}

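/* Return true if any handler has an event pending that aio_dispatch()
 * would act on.
 */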
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->pfd.revents && node->io_notify) {
            result = true;
            break;
        }

        if ((node->pfd.revents & G_IO_IN) && node->io_read) {
            result = true;
            break;
        }
        if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
            result = true;
            break;
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return result;
}

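/*
 * Invoke the handlers whose events fired.  @event is the signalled HANDLE
 * returned by WaitForMultipleObjects() (or INVALID_HANDLE_VALUE / NULL when
 * only select() revents are being dispatched).  The caller must hold a
 * list_lock count; deferred-deleted nodes are reaped here when possible.
 * Returns true if any real work (other than aio_notify) was done.
 */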
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
    AioHandler *node;
    bool progress = false;
    AioHandler *tmp;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents = node->pfd.revents;

        if (!node->deleted &&
            (revents || event_notifier_get_handle(node->e) == event) &&
            node->io_notify) {
            node->pfd.revents = 0;
            node->io_notify(node->e);

            /* aio_notify() does not count as progress */
            if (node->e != &ctx->notifier) {
                progress = true;
            }
        }

        if (!node->deleted &&
            (node->io_read || node->io_write)) {
            node->pfd.revents = 0;
            if ((revents & G_IO_IN) && node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if ((revents & G_IO_OUT) && node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            /* if the next select() will return an event, we have progressed */
            if (event == event_notifier_get_handle(&ctx->notifier)) {
                WSANETWORKEVENTS ev;
                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
                if (ev.lNetworkEvents) {
                    progress = true;
                }
            }
        }

        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    return progress;
}

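/* Run bottom halves, then handlers with pending revents, then timers. */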
void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
    qemu_lockcnt_dec(&ctx->list_lock);
    timerlistgroup_run_timers(&ctx->tlg);
}

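/*
 * Wait for and dispatch pending events.  Blocking waits use
 * WaitForMultipleObjects() on the registered event HANDLEs; socket
 * readiness discovered by aio_prepare() forces a zero timeout.  Only one
 * HANDLE is reported per wait, so the loop keeps iterating (non-blocking
 * after the first pass) until no more events are signalled.  Returns true
 * if progress was made.
 */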
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
    bool progress, have_select_revents, first;
    int count;
    int timeout;

    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    qemu_lockcnt_inc(&ctx->list_lock);
    have_select_revents = aio_prepare(ctx);

    /* collect the event HANDLEs to wait on */
    count = 0;
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_notify
            && aio_node_check(ctx, node->is_external)) {
            events[count++] = event_notifier_get_handle(node->e);
        }
    }

    first = true;

    /* ctx->notifier is always registered.  */
    assert(count > 0);

    /* Multiple iterations, all of them non-blocking except the first,
     * may be necessary to process all pending events.  After the first
     * WaitForMultipleObjects call ctx->notify_me will be decremented.
     */
    do {
        HANDLE event;
        int ret;

        timeout = blocking && !have_select_revents
            ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
        if (blocking) {
            assert(first);
            assert(in_aio_context_home_thread(ctx));
            atomic_sub(&ctx->notify_me, 2);
            aio_notify_accept(ctx);
        }

        if (first) {
            progress |= aio_bh_poll(ctx);
            first = false;
        }

        /* if we have any signaled events, dispatch event */
        event = NULL;
        if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
            event = events[ret - WAIT_OBJECT_0];
            events[ret - WAIT_OBJECT_0] = events[--count];
        } else if (!have_select_revents) {
            break;
        }

        have_select_revents = false;
        blocking = false;

        progress |= aio_dispatch_handlers(ctx, event);
    } while (count > 0);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);
    return progress;
}

void aio_context_setup(AioContext *ctx)
{
}

void aio_context_destroy(AioContext *ctx)
{
}

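/* Adaptive polling is only implemented by the POSIX backend, so any
 * non-zero polling interval is rejected here.
 */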
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    if (max_ns) {
        error_setg(errp, "AioContext polling is not implemented on Windows");
    }
}