qemu/util/aio-win32.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM Corp., 2008
 * Copyright Red Hat Inc., 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/main-loop.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"
#include "qemu/error-report.h"

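/*
 * A handler registered with this AioContext.  Socket handlers use
 * io_read/io_write and are dispatched from the select() results that
 * aio_prepare() records in pfd.revents; event-notifier handlers use
 * io_notify and are dispatched when their HANDLE is signaled by
 * WaitForMultipleObjects() in aio_poll().
 */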
struct AioHandler {
    EventNotifier *e;
    IOHandler *io_read;
    IOHandler *io_write;
    EventNotifierHandler *io_notify;
    GPollFD pfd;
    int deleted;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};

static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /*
     * If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    /* If aio_poll is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        node->deleted = 1;
        node->pfd.revents = 0;
    } else {
        /* Otherwise, delete it for real.  We can't just mark it as
         * deleted because deleted nodes are only cleaned up after
         * releasing the list_lock.
         */
        QLIST_REMOVE(node, node);
        g_free(node);
    }
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *old_node;
    AioHandler *node = NULL;
    SOCKET s;

    if (!fd_is_socket(fd)) {
        error_report("fd=%d is not a socket, AIO implementation is missing", fd);
        return;
    }

    s = _get_osfhandle(fd);

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
        if (old_node->pfd.fd == s && !old_node->deleted) {
            break;
        }
    }

    if (io_read || io_write) {
        HANDLE event;
        long bitmask = 0;

        /* Alloc and insert if it's not already there */
        node = g_new0(AioHandler, 1);
        node->pfd.fd = s;

        node->pfd.events = 0;
        if (io_read) {
            node->pfd.events |= G_IO_IN;
        }
        if (io_write) {
            node->pfd.events |= G_IO_OUT;
        }

        node->e = &ctx->notifier;

        /* Update handler with latest information */
        node->opaque = opaque;
        node->io_read = io_read;
        node->io_write = io_write;
        node->is_external = is_external;

        if (io_read) {
            bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
        }

        if (io_write) {
            bitmask |= FD_WRITE | FD_CONNECT;
        }

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
        event = event_notifier_get_handle(&ctx->notifier);
        qemu_socket_select(fd, event, bitmask, NULL);
    }
    if (old_node) {
        aio_remove_fd_handler(ctx, old_node);
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

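/*
 * Usage sketch (illustrative only, not part of this file): a caller
 * typically registers a read callback on a connected socket and later
 * removes it by passing NULL callbacks.  "MyState", "my_read_cb" and
 * "my_state" are hypothetical names used only for this example:
 *
 *     static void my_read_cb(void *opaque)
 *     {
 *         MyState *my_state = opaque;
 *         // drain my_state->sockfd until recv() would block
 *     }
 *
 *     aio_set_fd_handler(ctx, sockfd, false, my_read_cb, NULL, NULL, NULL,
 *                        my_state);
 *
 *     // ... later, unregister the handler:
 *     aio_set_fd_handler(ctx, sockfd, false, NULL, NULL, NULL, NULL, NULL);
 */
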
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *e,
                            bool is_external,
                            EventNotifierHandler *io_notify,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    AioHandler *node;

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->e == e && !node->deleted) {
            break;
        }
    }

    /* Are we deleting the fd handler? */
    if (!io_notify) {
        if (node) {
            aio_remove_fd_handler(ctx, node);
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->e = e;
            node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
            node->pfd.events = G_IO_IN;
            node->is_external = is_external;
            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_notify = io_notify;
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

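/*
 * Usage sketch (illustrative only): an EventNotifier is registered with a
 * single callback and unregistered by passing NULL as io_notify.
 * "my_notifier" and "my_notify_cb" are hypothetical names:
 *
 *     aio_set_event_notifier(ctx, &my_notifier, false, my_notify_cb,
 *                            NULL, NULL);
 *     // ... later:
 *     aio_set_event_notifier(ctx, &my_notifier, false, NULL, NULL, NULL);
 */
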
void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    /* Not implemented */
}

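/*
 * Poll the registered sockets with a zero-timeout select() and record the
 * results in each handler's pfd.revents.  Returns true if any socket is
 * already readable or writable, so the caller knows it must dispatch even
 * if no event handle is signaled.
 */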
bool aio_prepare(AioContext *ctx)
{
    static struct timeval tv0;
    AioHandler *node;
    bool have_select_revents = false;
    fd_set rfds, wfds;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    /* fill fd sets */
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->io_read) {
            FD_SET ((SOCKET)node->pfd.fd, &rfds);
        }
        if (node->io_write) {
            FD_SET ((SOCKET)node->pfd.fd, &wfds);
        }
    }

    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
            node->pfd.revents = 0;
            if (FD_ISSET(node->pfd.fd, &rfds)) {
                node->pfd.revents |= G_IO_IN;
                have_select_revents = true;
            }

            if (FD_ISSET(node->pfd.fd, &wfds)) {
                node->pfd.revents |= G_IO_OUT;
                have_select_revents = true;
            }
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return have_select_revents;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->pfd.revents && node->io_notify) {
            result = true;
            break;
        }

        if ((node->pfd.revents & G_IO_IN) && node->io_read) {
            result = true;
            break;
        }
        if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
            result = true;
            break;
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return result;
}

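/*
 * Run the callbacks of every handler that is ready: io_notify for handlers
 * whose event HANDLE matches "event" or that have pending revents, and
 * io_read/io_write for socket handlers whose revents were filled in by
 * aio_prepare().  Deleted nodes are reaped here when the list_lock can be
 * acquired.  Returns true if progress was made (the context's own notifier
 * does not count).
 */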
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
    AioHandler *node;
    bool progress = false;
    AioHandler *tmp;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents = node->pfd.revents;

        if (!node->deleted &&
            (revents || event_notifier_get_handle(node->e) == event) &&
            node->io_notify) {
            node->pfd.revents = 0;
            node->io_notify(node->e);

            /* aio_notify() does not count as progress */
            if (node->e != &ctx->notifier) {
                progress = true;
            }
        }

        if (!node->deleted &&
            (node->io_read || node->io_write)) {
            node->pfd.revents = 0;
            if ((revents & G_IO_IN) && node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if ((revents & G_IO_OUT) && node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            /* if the next select() will return an event, we have progressed */
            if (event == event_notifier_get_handle(&ctx->notifier)) {
                WSANETWORKEVENTS ev;
                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
                if (ev.lNetworkEvents) {
                    progress = true;
                }
            }
        }

        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
    qemu_lockcnt_dec(&ctx->list_lock);
    timerlistgroup_run_timers(&ctx->tlg);
}

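/*
 * Run one iteration of the event loop for this AioContext: check for ready
 * sockets, wait on the registered event handles (only when "blocking" and
 * nothing is already pending), then dispatch bottom halves, ready handlers
 * and timers.  Returns true if any progress was made.
 */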
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    HANDLE events[MAXIMUM_WAIT_OBJECTS];
    bool progress, have_select_revents, first;
    unsigned count;
    int timeout;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before computing the timeout
         * (reading bottom half flags, etc.).  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();
    }

    qemu_lockcnt_inc(&ctx->list_lock);
    have_select_revents = aio_prepare(ctx);

    /* collect the event handles to wait on */
    count = 0;
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_notify
            && aio_node_check(ctx, node->is_external)) {
            assert(count < MAXIMUM_WAIT_OBJECTS);
            events[count++] = event_notifier_get_handle(node->e);
        }
    }

    first = true;

    /* ctx->notifier is always registered.  */
    assert(count > 0);

    /* Multiple iterations, all of them non-blocking except the first,
     * may be necessary to process all pending events.  After the first
     * WaitForMultipleObjects call ctx->notify_me will be decremented.
     */
    do {
        HANDLE event;
        int ret;

        timeout = blocking && !have_select_revents
            ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
        if (blocking) {
            assert(first);
            qatomic_store_release(&ctx->notify_me,
                                  qatomic_read(&ctx->notify_me) - 2);
            aio_notify_accept(ctx);
        }

        if (first) {
            progress |= aio_bh_poll(ctx);
            first = false;
        }

        /* if we have any signaled events, dispatch event */
        event = NULL;
        if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
            event = events[ret - WAIT_OBJECT_0];
            events[ret - WAIT_OBJECT_0] = events[--count];
        } else if (!have_select_revents) {
            break;
        }

        have_select_revents = false;
        blocking = false;

        progress |= aio_dispatch_handlers(ctx, event);
    } while (count > 0);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);
    return progress;
}

void aio_context_setup(AioContext *ctx)
{
}

void aio_context_destroy(AioContext *ctx)
{
}

void aio_context_use_g_source(AioContext *ctx)
{
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    if (max_ns) {
        error_setg(errp, "AioContext polling is not implemented on Windows");
    }
}

void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                Error **errp)
{
}