qemu/util/aio-posix.c
/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#include "aio-posix.h"

/* Stop userspace polling on a handler if it isn't active for some time */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)

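/*
 * Used as the ->need_wait() callback by fd monitoring implementations that
 * cannot combine fd monitoring with userspace polling (see
 * fdmon_supports_polling()).  Returns true when at least one handler was
 * registered without an io_poll() callback, so polling alone cannot cover
 * every fd and a blocking wait is still required.
 */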
bool aio_poll_disabled(AioContext *ctx)
{
    return atomic_read(&ctx->poll_disable_cnt);
}

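/*
 * Called by fd monitoring implementations to queue a handler whose fd became
 * ready.  Handlers on @ready_list are later dispatched by
 * aio_dispatch_ready_handlers() without rescanning the full handler list.
 */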
void aio_add_ready_handler(AioHandlerList *ready_list,
                           AioHandler *node,
                           int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

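/*
 * Return the handler registered for @fd, skipping nodes that have already
 * been marked deleted.
 */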
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

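/*
 * Detach @node from the GSource and, if possible, unlink it from the handler
 * lists.  Returns true if the caller may free the node immediately; returns
 * false if freeing is deferred (the node stays queued for
 * aio_free_deleted_handlers()) because a walk of the handler list is still in
 * progress.
 */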
static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;

    /* If the fd monitor has already marked it deleted, leave it alone */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}

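/*
 * Register, update or remove the handler for file descriptor @fd.  Passing
 * NULL for io_read, io_write and io_poll removes the handler.  Updates insert
 * a freshly allocated node and retire the old one, since other threads may be
 * walking the RCU handler list concurrently.
 */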
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /*
         * Always allocate a fresh node; if a handler already exists for this
         * fd, it is removed below and replaced by the new node.
         */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    atomic_set(&ctx->poll_disable_cnt,
               atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}

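/*
 * Set the callbacks invoked when userspace polling starts and stops for the
 * handler registered on @fd.  Does nothing if no handler is registered.
 */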
void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

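/* EventNotifier variants of aio_set_fd_handler() and aio_set_fd_poll() */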
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}

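/*
 * Switch userspace polling on or off and notify handlers through their
 * io_poll_begin()/io_poll_end() callbacks.  Returns true if a final
 * io_poll() call made progress while polling was being turned off.
 */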
static bool poll_set_started(AioContext *ctx, bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started) {
            progress = node->io_poll(node->opaque) || progress;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}


bool aio_prepare(AioContext *ctx)
{
    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, false);

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

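/*
 * Free nodes queued on ctx->deleted_aio_handlers.  Freeing only happens when
 * we are the last walker of the handler list (the list_lock count drops to
 * zero); otherwise the outermost walker does it.
 */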
static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}

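/*
 * Invoke @node's read/write callbacks according to the revents recorded by
 * the fd monitor.  Returns true if progress was made; the ctx->notifier
 * EventNotifier firing does not count as progress.
 */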
static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    /*
     * Start polling AioHandlers when they become ready because activity is
     * likely to continue.  Note that starvation is theoretically possible when
     * fdmon_supports_polling(), but only until the fd fires for the first
     * time.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}

/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}

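/*
 * Call every io_poll() handler once.  When any handler reports activity,
 * *timeout is reset to 0 so the caller re-evaluates the polling window.
 * Returns true if a handler other than the ctx->notifier EventNotifier made
 * progress.
 */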
static bool run_poll_handlers_once(AioContext *ctx,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

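/*
 * Does the fd monitoring implementation support userspace polling alongside
 * fd monitoring?  Implementations that use the generic aio_poll_disabled()
 * ->need_wait() callback do not.
 */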
static bool fdmon_supports_polling(AioContext *ctx)
{
    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}

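/*
 * Stop userspace polling for handlers that have not reported activity for
 * POLL_IDLE_INTERVAL_NS.  Returns true if the final io_poll() call for a
 * removed handler made progress.
 */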
static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * File descriptor monitoring implementations without userspace polling
     * support suffer from starvation when a subset of handlers is polled
     * because fds will not be processed in a timely fashion.  Don't remove
     * idle poll handlers.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Never mind re-adding the handler in the rare case where
                 * this causes progress.
                 */
                progress = node->io_poll(node->opaque) || progress;
            }
        }
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that ctx->notify_me must be non-zero so this function can detect
 * aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(ctx->notify_me);
    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *    polling succeeds.
 *
 * ctx->notify_me must be non-zero so this function can detect aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        poll_set_started(ctx, true);

        if (run_poll_handlers(ctx, max_ns, timeout)) {
            return true;
        }
    }

    if (poll_set_started(ctx, false)) {
        *timeout = 0;
        return true;
    }

    return false;
}

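/*
 * Run one iteration of the event loop: poll in userspace if enabled, wait for
 * fd activity (blocking according to @blocking and the computed timeout),
 * then dispatch bottom halves, ready fd handlers and expired timers.
 * Returns true if progress was made.
 */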
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    int ret = 0;
    bool progress;
    int64_t timeout;
    int64_t start = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     */
    assert(in_aio_context_home_thread(ctx));

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before computing the timeout
         * (reading bottom half flags, etc.).  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();
    }

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &timeout);
    assert(!(timeout && progress));

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (blocking) {
        /* Finish the poll before clearing the flag.  */
        atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
        aio_notify_accept(ctx);
    }

    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }

    progress |= aio_bh_poll(ctx);

    if (ret > 0) {
        progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
    }

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

void aio_context_setup(AioContext *ctx)
{
    ctx->fdmon_ops = &fdmon_poll_ops;
    ctx->epollfd = -1;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}

void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}