qemu/util/aio-win32.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM Corp., 2008
 * Copyright Red Hat Inc., 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/main-loop.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"

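/*
 * One registered handler.  A node either tracks a socket (io_read/io_write,
 * readiness routed through WSAEventSelect() and confirmed with a
 * zero-timeout select()) or an EventNotifier HANDLE (io_notify, waited on
 * with WaitForMultipleObjects()).  Nodes are unlinked lazily: while
 * aio_poll() is walking the list they are only marked as deleted and freed
 * at the end of the walk.
 */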
struct AioHandler {
    EventNotifier *e;
    IOHandler *io_read;
    IOHandler *io_write;
    EventNotifierHandler *io_notify;
    GPollFD pfd;
    int deleted;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};

static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /*
     * If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    /* If aio_poll is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        node->deleted = 1;
        node->pfd.revents = 0;
    } else {
        /* Otherwise, delete it for real.  We can't just mark it as
         * deleted because deleted nodes are only cleaned up while
         * no one is walking the handlers list.
         */
        QLIST_REMOVE(node, node);
        g_free(node);
    }
}

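/*
 * Register, update or remove the read/write callbacks for a socket.  The
 * socket's network events are routed to the AioContext's notifier handle
 * via WSAEventSelect(), and actual readiness is re-checked with select()
 * in aio_prepare().  Passing NULL for both io_read and io_write removes
 * the handler.
 *
 * Hypothetical usage sketch (the names "sockfd", "my_read_cb" and
 * "my_opaque" are illustrative only):
 *
 *     aio_set_fd_handler(ctx, sockfd, true, my_read_cb, NULL,
 *                        NULL, NULL, my_opaque);
 */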
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    /* fd is a SOCKET in our case */
    AioHandler *old_node;
    AioHandler *node = NULL;

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
        if (old_node->pfd.fd == fd && !old_node->deleted) {
            break;
        }
    }

    if (io_read || io_write) {
        HANDLE event;
        long bitmask = 0;

        /* Alloc and insert if it's not already there */
        node = g_new0(AioHandler, 1);
        node->pfd.fd = fd;

        node->pfd.events = 0;
        if (io_read) {
            node->pfd.events |= G_IO_IN;
        }
        if (io_write) {
            node->pfd.events |= G_IO_OUT;
        }

        node->e = &ctx->notifier;

        /* Update handler with latest information */
        node->opaque = opaque;
        node->io_read = io_read;
        node->io_write = io_write;
        node->is_external = is_external;

        if (io_read) {
            bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
        }

        if (io_write) {
            bitmask |= FD_WRITE | FD_CONNECT;
        }

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
        event = event_notifier_get_handle(&ctx->notifier);
        WSAEventSelect(node->pfd.fd, event, bitmask);
    }
    if (old_node) {
        aio_remove_fd_handler(ctx, old_node);
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    /* Not implemented */
}

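/*
 * Register or remove the callback for an EventNotifier.  The notifier's
 * Windows HANDLE is what aio_poll() hands to WaitForMultipleObjects(), and
 * the same GPollFD is exported to glib through g_source_add_poll().  A NULL
 * io_notify removes the handler.
 *
 * Hypothetical usage sketch ("my_notifier" and "my_notify_cb" are
 * illustrative only):
 *
 *     aio_set_event_notifier(ctx, &my_notifier, false, my_notify_cb,
 *                            NULL, NULL);
 */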
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *e,
                            bool is_external,
                            EventNotifierHandler *io_notify,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    AioHandler *node;

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->e == e && !node->deleted) {
            break;
        }
    }

    /* Are we deleting the fd handler? */
    if (!io_notify) {
        if (node) {
            aio_remove_fd_handler(ctx, node);
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->e = e;
            node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
            node->pfd.events = G_IO_IN;
            node->is_external = is_external;
            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_notify = io_notify;
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    /* Not implemented */
}

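/*
 * Check the registered sockets with a zero-timeout select() and record the
 * result in each node's pfd.revents.  Called from the GSource prepare phase
 * and from aio_poll(); returns true if at least one socket is already
 * readable or writable.
 */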
bool aio_prepare(AioContext *ctx)
{
    static struct timeval tv0;
    AioHandler *node;
    bool have_select_revents = false;
    fd_set rfds, wfds;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    /* fill fd sets */
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->io_read) {
            FD_SET ((SOCKET)node->pfd.fd, &rfds);
        }
        if (node->io_write) {
            FD_SET ((SOCKET)node->pfd.fd, &wfds);
        }
    }

    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
            node->pfd.revents = 0;
            if (FD_ISSET(node->pfd.fd, &rfds)) {
                node->pfd.revents |= G_IO_IN;
                have_select_revents = true;
            }

            if (FD_ISSET(node->pfd.fd, &wfds)) {
                node->pfd.revents |= G_IO_OUT;
                have_select_revents = true;
            }
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return have_select_revents;
}

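/*
 * Return true if any handler still has revents recorded (by aio_prepare()
 * or by the glib poll of the notifier handles) that a callback has yet to
 * consume.
 */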
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->pfd.revents && node->io_notify) {
            result = true;
            break;
        }

        if ((node->pfd.revents & G_IO_IN) && node->io_read) {
            result = true;
            break;
        }
        if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
            result = true;
            break;
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return result;
}

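/*
 * Invoke the callbacks that match either the signaled HANDLE passed in
 * "event" or the revents recorded earlier.  Deleted nodes encountered
 * during the walk are freed once no other walker remains.  Returns true if
 * anything other than the context's own notifier made progress.
 */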
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
    AioHandler *node;
    bool progress = false;
    AioHandler *tmp;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents = node->pfd.revents;

        if (!node->deleted &&
            (revents || event_notifier_get_handle(node->e) == event) &&
            node->io_notify) {
            node->pfd.revents = 0;
            node->io_notify(node->e);

            /* aio_notify() does not count as progress */
            if (node->e != &ctx->notifier) {
                progress = true;
            }
        }

        if (!node->deleted &&
            (node->io_read || node->io_write)) {
            node->pfd.revents = 0;
            if ((revents & G_IO_IN) && node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if ((revents & G_IO_OUT) && node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            /* if the next select() will return an event, we have progressed */
            if (event == event_notifier_get_handle(&ctx->notifier)) {
                WSANETWORKEVENTS ev;
                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
                if (ev.lNetworkEvents) {
                    progress = true;
                }
            }
        }

        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    return progress;
}

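/* Dispatch pending bottom halves, handlers and timers without blocking. */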
void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
    qemu_lockcnt_dec(&ctx->list_lock);
    timerlistgroup_run_timers(&ctx->tlg);
}

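/*
 * Wait for and dispatch events.  Only the first WaitForMultipleObjects()
 * call may block; each signaled HANDLE is removed from the wait array and
 * the loop keeps iterating without blocking until no signaled objects (or
 * leftover select() revents) remain.  Returns true if any progress was made.
 */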
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
    bool progress, have_select_revents, first;
    int count;
    int timeout;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before computing the timeout
         * (reading bottom half flags, etc.).  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();
    }

    qemu_lockcnt_inc(&ctx->list_lock);
    have_select_revents = aio_prepare(ctx);

    /* fill fd sets */
    count = 0;
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_notify
            && aio_node_check(ctx, node->is_external)) {
            events[count++] = event_notifier_get_handle(node->e);
        }
    }

    first = true;

    /* ctx->notifier is always registered.  */
    assert(count > 0);

    /* Multiple iterations, all of them non-blocking except the first,
     * may be necessary to process all pending events.  After the first
     * WaitForMultipleObjects call ctx->notify_me will be decremented.
     */
    do {
        HANDLE event;
        int ret;

        timeout = blocking && !have_select_revents
            ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
        if (blocking) {
            assert(first);
            qatomic_store_release(&ctx->notify_me,
                                  qatomic_read(&ctx->notify_me) - 2);
            aio_notify_accept(ctx);
        }

        if (first) {
            progress |= aio_bh_poll(ctx);
            first = false;
        }

        /* if we have any signaled events, dispatch event */
        event = NULL;
        if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
            event = events[ret - WAIT_OBJECT_0];
            events[ret - WAIT_OBJECT_0] = events[--count];
        } else if (!have_select_revents) {
            break;
        }

        have_select_revents = false;
        blocking = false;

        progress |= aio_dispatch_handlers(ctx, event);
    } while (count > 0);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);
    return progress;
}

void aio_context_setup(AioContext *ctx)
{
}

void aio_context_destroy(AioContext *ctx)
{
}

void aio_context_use_g_source(AioContext *ctx)
{
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    if (max_ns) {
        error_setg(errp, "AioContext polling is not implemented on Windows");
    }
}

void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                Error **errp)
{
}