qemu/util/aio-win32.c
<<
>>
Prefs
   1/*
   2 * QEMU aio implementation
   3 *
   4 * Copyright IBM Corp., 2008
   5 * Copyright Red Hat Inc., 2012
   6 *
   7 * Authors:
   8 *  Anthony Liguori   <aliguori@us.ibm.com>
   9 *  Paolo Bonzini     <pbonzini@redhat.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2.  See
  12 * the COPYING file in the top-level directory.
  13 *
  14 * Contributions after 2012-01-13 are licensed under the terms of the
  15 * GNU GPL, version 2 or (at your option) any later version.
  16 */
  17
  18#include "qemu/osdep.h"
  19#include "block/block.h"
  20#include "qemu/main-loop.h"
  21#include "qemu/queue.h"
  22#include "qemu/sockets.h"
  23#include "qapi/error.h"
  24#include "qemu/rcu_queue.h"
  25#include "qemu/error-report.h"
  26
  27struct AioHandler {
  28    EventNotifier *e;
  29    IOHandler *io_read;
  30    IOHandler *io_write;
  31    EventNotifierHandler *io_notify;
  32    GPollFD pfd;
  33    int deleted;
  34    void *opaque;
  35    QLIST_ENTRY(AioHandler) node;
  36};
  37
  38static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
  39{
  40    /*
  41     * If the GSource is in the process of being destroyed then
  42     * g_source_remove_poll() causes an assertion failure.  Skip
  43     * removal in that case, because glib cleans up its state during
  44     * destruction anyway.
  45     */
  46    if (!g_source_is_destroyed(&ctx->source)) {
  47        g_source_remove_poll(&ctx->source, &node->pfd);
  48    }
  49
  50    /* If aio_poll is in progress, just mark the node as deleted */
  51    if (qemu_lockcnt_count(&ctx->list_lock)) {
  52        node->deleted = 1;
  53        node->pfd.revents = 0;
  54    } else {
  55        /* Otherwise, delete it for real.  We can't just mark it as
  56         * deleted because deleted nodes are only cleaned up after
  57         * releasing the list_lock.
  58         */
  59        QLIST_REMOVE(node, node);
  60        g_free(node);
  61    }
  62}
  63
  64void aio_set_fd_handler(AioContext *ctx,
  65                        int fd,
  66                        IOHandler *io_read,
  67                        IOHandler *io_write,
  68                        AioPollFn *io_poll,
  69                        IOHandler *io_poll_ready,
  70                        void *opaque)
  71{
  72    AioHandler *old_node;
  73    AioHandler *node = NULL;
  74    SOCKET s;
  75
  76    if (!fd_is_socket(fd)) {
  77        error_report("fd=%d is not a socket, AIO implementation is missing", fd);
  78        return;
  79    }
  80
  81    s = _get_osfhandle(fd);
  82
  83    qemu_lockcnt_lock(&ctx->list_lock);
  84    QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
  85        if (old_node->pfd.fd == s && !old_node->deleted) {
  86            break;
  87        }
  88    }
  89
  90    if (io_read || io_write) {
  91        HANDLE event;
  92        long bitmask = 0;
  93
  94        /* Alloc and insert if it's not already there */
  95        node = g_new0(AioHandler, 1);
  96        node->pfd.fd = s;
  97
  98        node->pfd.events = 0;
  99        if (node->io_read) {
 100            node->pfd.events |= G_IO_IN;
 101        }
 102        if (node->io_write) {
 103            node->pfd.events |= G_IO_OUT;
 104        }
 105
 106        node->e = &ctx->notifier;
 107
 108        /* Update handler with latest information */
 109        node->opaque = opaque;
 110        node->io_read = io_read;
 111        node->io_write = io_write;
 112
 113        if (io_read) {
 114            bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
 115        }
 116
 117        if (io_write) {
 118            bitmask |= FD_WRITE | FD_CONNECT;
 119        }
 120
 121        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 122        event = event_notifier_get_handle(&ctx->notifier);
 123        qemu_socket_select(fd, event, bitmask, NULL);
 124    }
 125    if (old_node) {
 126        aio_remove_fd_handler(ctx, old_node);
 127    }
 128
 129    qemu_lockcnt_unlock(&ctx->list_lock);
 130    aio_notify(ctx);
 131}
 132
 133void aio_set_event_notifier(AioContext *ctx,
 134                            EventNotifier *e,
 135                            EventNotifierHandler *io_notify,
 136                            AioPollFn *io_poll,
 137                            EventNotifierHandler *io_poll_ready)
 138{
 139    AioHandler *node;
 140
 141    qemu_lockcnt_lock(&ctx->list_lock);
 142    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
 143        if (node->e == e && !node->deleted) {
 144            break;
 145        }
 146    }
 147
 148    /* Are we deleting the fd handler? */
 149    if (!io_notify) {
 150        if (node) {
 151            aio_remove_fd_handler(ctx, node);
 152        }
 153    } else {
 154        if (node == NULL) {
 155            /* Alloc and insert if it's not already there */
 156            node = g_new0(AioHandler, 1);
 157            node->e = e;
 158            node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
 159            node->pfd.events = G_IO_IN;
 160            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 161
 162            g_source_add_poll(&ctx->source, &node->pfd);
 163        }
 164        /* Update handler with latest information */
 165        node->io_notify = io_notify;
 166    }
 167
 168    qemu_lockcnt_unlock(&ctx->list_lock);
 169    aio_notify(ctx);
 170}
 171
 172void aio_set_event_notifier_poll(AioContext *ctx,
 173                                 EventNotifier *notifier,
 174                                 EventNotifierHandler *io_poll_begin,
 175                                 EventNotifierHandler *io_poll_end)
 176{
 177    /* Not implemented */
 178}
 179
 180bool aio_prepare(AioContext *ctx)
 181{
 182    static struct timeval tv0;
 183    AioHandler *node;
 184    bool have_select_revents = false;
 185    fd_set rfds, wfds;
 186
 187    /*
 188     * We have to walk very carefully in case aio_set_fd_handler is
 189     * called while we're walking.
 190     */
 191    qemu_lockcnt_inc(&ctx->list_lock);
 192
 193    /* fill fd sets */
 194    FD_ZERO(&rfds);
 195    FD_ZERO(&wfds);
 196    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
 197        if (node->io_read) {
 198            FD_SET ((SOCKET)node->pfd.fd, &rfds);
 199        }
 200        if (node->io_write) {
 201            FD_SET ((SOCKET)node->pfd.fd, &wfds);
 202        }
 203    }
 204
 205    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
 206        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
 207            node->pfd.revents = 0;
 208            if (FD_ISSET(node->pfd.fd, &rfds)) {
 209                node->pfd.revents |= G_IO_IN;
 210                have_select_revents = true;
 211            }
 212
 213            if (FD_ISSET(node->pfd.fd, &wfds)) {
 214                node->pfd.revents |= G_IO_OUT;
 215                have_select_revents = true;
 216            }
 217        }
 218    }
 219
 220    qemu_lockcnt_dec(&ctx->list_lock);
 221    return have_select_revents;
 222}
 223
 224bool aio_pending(AioContext *ctx)
 225{
 226    AioHandler *node;
 227    bool result = false;
 228
 229    /*
 230     * We have to walk very carefully in case aio_set_fd_handler is
 231     * called while we're walking.
 232     */
 233    qemu_lockcnt_inc(&ctx->list_lock);
 234    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
 235        if (node->pfd.revents && node->io_notify) {
 236            result = true;
 237            break;
 238        }
 239
 240        if ((node->pfd.revents & G_IO_IN) && node->io_read) {
 241            result = true;
 242            break;
 243        }
 244        if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
 245            result = true;
 246            break;
 247        }
 248    }
 249
 250    qemu_lockcnt_dec(&ctx->list_lock);
 251    return result;
 252}
 253
 254static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 255{
 256    AioHandler *node;
 257    bool progress = false;
 258    AioHandler *tmp;
 259
 260    /*
 261     * We have to walk very carefully in case aio_set_fd_handler is
 262     * called while we're walking.
 263     */
 264    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
 265        int revents = node->pfd.revents;
 266
 267        if (!node->deleted &&
 268            (revents || event_notifier_get_handle(node->e) == event) &&
 269            node->io_notify) {
 270            node->pfd.revents = 0;
 271            node->io_notify(node->e);
 272
 273            /* aio_notify() does not count as progress */
 274            if (node->e != &ctx->notifier) {
 275                progress = true;
 276            }
 277        }
 278
 279        if (!node->deleted &&
 280            (node->io_read || node->io_write)) {
 281            node->pfd.revents = 0;
 282            if ((revents & G_IO_IN) && node->io_read) {
 283                node->io_read(node->opaque);
 284                progress = true;
 285            }
 286            if ((revents & G_IO_OUT) && node->io_write) {
 287                node->io_write(node->opaque);
 288                progress = true;
 289            }
 290
 291            /* if the next select() will return an event, we have progressed */
 292            if (event == event_notifier_get_handle(&ctx->notifier)) {
 293                WSANETWORKEVENTS ev;
 294                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
 295                if (ev.lNetworkEvents) {
 296                    progress = true;
 297                }
 298            }
 299        }
 300
 301        if (node->deleted) {
 302            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
 303                QLIST_REMOVE(node, node);
 304                g_free(node);
 305                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
 306            }
 307        }
 308    }
 309
 310    return progress;
 311}
 312
 313void aio_dispatch(AioContext *ctx)
 314{
 315    qemu_lockcnt_inc(&ctx->list_lock);
 316    aio_bh_poll(ctx);
 317    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
 318    qemu_lockcnt_dec(&ctx->list_lock);
 319    timerlistgroup_run_timers(&ctx->tlg);
 320}
 321
 322bool aio_poll(AioContext *ctx, bool blocking)
 323{
 324    AioHandler *node;
 325    HANDLE events[MAXIMUM_WAIT_OBJECTS];
 326    bool progress, have_select_revents, first;
 327    unsigned count;
 328    int timeout;
 329
 330    /*
 331     * There cannot be two concurrent aio_poll calls for the same AioContext (or
 332     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
 333     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
 334     *
 335     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
 336     * is special in that it runs in the main thread, but that thread's context
 337     * is qemu_aio_context.
 338     */
 339    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
 340                                      qemu_get_aio_context() : ctx));
 341    progress = false;
 342
 343    /* aio_notify can avoid the expensive event_notifier_set if
 344     * everything (file descriptors, bottom halves, timers) will
 345     * be re-evaluated before the next blocking poll().  This is
 346     * already true when aio_poll is called with blocking == false;
 347     * if blocking == true, it is only true after poll() returns,
 348     * so disable the optimization now.
 349     */
 350    if (blocking) {
 351        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
 352        /*
 353         * Write ctx->notify_me before computing the timeout
 354         * (reading bottom half flags, etc.).  Pairs with
 355         * smp_mb in aio_notify().
 356         */
 357        smp_mb();
 358    }
 359
 360    qemu_lockcnt_inc(&ctx->list_lock);
 361    have_select_revents = aio_prepare(ctx);
 362
 363    /* fill fd sets */
 364    count = 0;
 365    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
 366        if (!node->deleted && node->io_notify) {
 367            assert(count < MAXIMUM_WAIT_OBJECTS);
 368            events[count++] = event_notifier_get_handle(node->e);
 369        }
 370    }
 371
 372    first = true;
 373
 374    /* ctx->notifier is always registered.  */
 375    assert(count > 0);
 376
 377    /* Multiple iterations, all of them non-blocking except the first,
 378     * may be necessary to process all pending events.  After the first
 379     * WaitForMultipleObjects call ctx->notify_me will be decremented.
 380     */
 381    do {
 382        HANDLE event;
 383        int ret;
 384
 385        timeout = blocking && !have_select_revents
 386            ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
 387        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
 388        if (blocking) {
 389            assert(first);
 390            qatomic_store_release(&ctx->notify_me,
 391                                  qatomic_read(&ctx->notify_me) - 2);
 392            aio_notify_accept(ctx);
 393        }
 394
 395        if (first) {
 396            progress |= aio_bh_poll(ctx);
 397            first = false;
 398        }
 399
 400        /* if we have any signaled events, dispatch event */
 401        event = NULL;
 402        if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
 403            event = events[ret - WAIT_OBJECT_0];
 404            events[ret - WAIT_OBJECT_0] = events[--count];
 405        } else if (!have_select_revents) {
 406            break;
 407        }
 408
 409        have_select_revents = false;
 410        blocking = false;
 411
 412        progress |= aio_dispatch_handlers(ctx, event);
 413    } while (count > 0);
 414
 415    qemu_lockcnt_dec(&ctx->list_lock);
 416
 417    progress |= timerlistgroup_run_timers(&ctx->tlg);
 418    return progress;
 419}
 420
 421void aio_context_setup(AioContext *ctx)
 422{
 423}
 424
 425void aio_context_destroy(AioContext *ctx)
 426{
 427}
 428
 429void aio_context_use_g_source(AioContext *ctx)
 430{
 431}
 432
 433void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
 434                                 int64_t grow, int64_t shrink, Error **errp)
 435{
 436    if (max_ns) {
 437        error_setg(errp, "AioContext polling is not implemented on Windows");
 438    }
 439}
 440
 441void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
 442                                Error **errp)
 443{
 444}
 445