qemu/util/async.c
<<
>>
Prefs
   1/*
   2 * Data plane event loop
   3 *
   4 * Copyright (c) 2003-2008 Fabrice Bellard
   5 * Copyright (c) 2009-2017 QEMU contributors
   6 *
   7 * Permission is hereby granted, free of charge, to any person obtaining a copy
   8 * of this software and associated documentation files (the "Software"), to deal
   9 * in the Software without restriction, including without limitation the rights
  10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 * copies of the Software, and to permit persons to whom the Software is
  12 * furnished to do so, subject to the following conditions:
  13 *
  14 * The above copyright notice and this permission notice shall be included in
  15 * all copies or substantial portions of the Software.
  16 *
  17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23 * THE SOFTWARE.
  24 */
  25
  26#include "qemu/osdep.h"
  27#include "qapi/error.h"
  28#include "block/aio.h"
  29#include "block/thread-pool.h"
  30#include "qemu/main-loop.h"
  31#include "qemu/atomic.h"
  32#include "qemu/rcu_queue.h"
  33#include "block/raw-aio.h"
  34#include "qemu/coroutine_int.h"
  35#include "trace.h"
  36
  37/***********************************************************/
  38/* bottom halves (can be seen as timers which expire ASAP) */
  39
  40/* QEMUBH::flags values */
  41enum {
  42    /* Already enqueued and waiting for aio_bh_poll() */
  43    BH_PENDING   = (1 << 0),
  44
  45    /* Invoke the callback */
  46    BH_SCHEDULED = (1 << 1),
  47
  48    /* Delete without invoking callback */
  49    BH_DELETED   = (1 << 2),
  50
  51    /* Delete after invoking callback */
  52    BH_ONESHOT   = (1 << 3),
  53
  54    /* Schedule periodically when the event loop is idle */
  55    BH_IDLE      = (1 << 4),
  56};
  57
  58struct QEMUBH {
  59    AioContext *ctx;
  60    const char *name;
  61    QEMUBHFunc *cb;
  62    void *opaque;
  63    QSLIST_ENTRY(QEMUBH) next;
  64    unsigned flags;
  65};
  66
  67/* Called concurrently from any thread */
  68static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
  69{
  70    AioContext *ctx = bh->ctx;
  71    unsigned old_flags;
  72
  73    /*
  74     * The memory barrier implicit in qatomic_fetch_or makes sure that:
  75     * 1. idle & any writes needed by the callback are done before the
  76     *    locations are read in the aio_bh_poll.
  77     * 2. ctx is loaded before the callback has a chance to execute and bh
  78     *    could be freed.
  79     */
  80    old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
  81    if (!(old_flags & BH_PENDING)) {
  82        QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
  83    }
  84
  85    aio_notify(ctx);
  86}
  87
  88/* Only called from aio_bh_poll() and aio_ctx_finalize() */
  89static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
  90{
  91    QEMUBH *bh = QSLIST_FIRST_RCU(head);
  92
  93    if (!bh) {
  94        return NULL;
  95    }
  96
  97    QSLIST_REMOVE_HEAD(head, next);
  98
  99    /*
 100     * The qatomic_and is paired with aio_bh_enqueue().  The implicit memory
 101     * barrier ensures that the callback sees all writes done by the scheduling
 102     * thread.  It also ensures that the scheduling thread sees the cleared
 103     * flag before bh->cb has run, and thus will call aio_notify again if
 104     * necessary.
 105     */
 106    *flags = qatomic_fetch_and(&bh->flags,
 107                              ~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
 108    return bh;
 109}
 110
 111void aio_bh_schedule_oneshot_full(AioContext *ctx, QEMUBHFunc *cb,
 112                                  void *opaque, const char *name)
 113{
 114    QEMUBH *bh;
 115    bh = g_new(QEMUBH, 1);
 116    *bh = (QEMUBH){
 117        .ctx = ctx,
 118        .cb = cb,
 119        .opaque = opaque,
 120        .name = name,
 121    };
 122    aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
 123}
 124
 125QEMUBH *aio_bh_new_full(AioContext *ctx, QEMUBHFunc *cb, void *opaque,
 126                        const char *name)
 127{
 128    QEMUBH *bh;
 129    bh = g_new(QEMUBH, 1);
 130    *bh = (QEMUBH){
 131        .ctx = ctx,
 132        .cb = cb,
 133        .opaque = opaque,
 134        .name = name,
 135    };
 136    return bh;
 137}
 138
 139void aio_bh_call(QEMUBH *bh)
 140{
 141    bh->cb(bh->opaque);
 142}
 143
 144/* Multiple occurrences of aio_bh_poll cannot be called concurrently. */
 145int aio_bh_poll(AioContext *ctx)
 146{
 147    BHListSlice slice;
 148    BHListSlice *s;
 149    int ret = 0;
 150
 151    QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
 152    QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);
 153
 154    while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) {
 155        QEMUBH *bh;
 156        unsigned flags;
 157
 158        bh = aio_bh_dequeue(&s->bh_list, &flags);
 159        if (!bh) {
 160            QSIMPLEQ_REMOVE_HEAD(&ctx->bh_slice_list, next);
 161            continue;
 162        }
 163
 164        if ((flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
 165            /* Idle BHs don't count as progress */
 166            if (!(flags & BH_IDLE)) {
 167                ret = 1;
 168            }
 169            aio_bh_call(bh);
 170        }
 171        if (flags & (BH_DELETED | BH_ONESHOT)) {
 172            g_free(bh);
 173        }
 174    }
 175
 176    return ret;
 177}
 178
 179void qemu_bh_schedule_idle(QEMUBH *bh)
 180{
 181    aio_bh_enqueue(bh, BH_SCHEDULED | BH_IDLE);
 182}
 183
 184void qemu_bh_schedule(QEMUBH *bh)
 185{
 186    aio_bh_enqueue(bh, BH_SCHEDULED);
 187}
 188
 189/* This func is async.
 190 */
 191void qemu_bh_cancel(QEMUBH *bh)
 192{
 193    qatomic_and(&bh->flags, ~BH_SCHEDULED);
 194}
 195
 196/* This func is async.The bottom half will do the delete action at the finial
 197 * end.
 198 */
 199void qemu_bh_delete(QEMUBH *bh)
 200{
 201    aio_bh_enqueue(bh, BH_DELETED);
 202}
 203
 204static int64_t aio_compute_bh_timeout(BHList *head, int timeout)
 205{
 206    QEMUBH *bh;
 207
 208    QSLIST_FOREACH_RCU(bh, head, next) {
 209        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
 210            if (bh->flags & BH_IDLE) {
 211                /* idle bottom halves will be polled at least
 212                 * every 10ms */
 213                timeout = 10000000;
 214            } else {
 215                /* non-idle bottom halves will be executed
 216                 * immediately */
 217                return 0;
 218            }
 219        }
 220    }
 221
 222    return timeout;
 223}
 224
 225int64_t
 226aio_compute_timeout(AioContext *ctx)
 227{
 228    BHListSlice *s;
 229    int64_t deadline;
 230    int timeout = -1;
 231
 232    timeout = aio_compute_bh_timeout(&ctx->bh_list, timeout);
 233    if (timeout == 0) {
 234        return 0;
 235    }
 236
 237    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
 238        timeout = aio_compute_bh_timeout(&s->bh_list, timeout);
 239        if (timeout == 0) {
 240            return 0;
 241        }
 242    }
 243
 244    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
 245    if (deadline == 0) {
 246        return 0;
 247    } else {
 248        return qemu_soonest_timeout(timeout, deadline);
 249    }
 250}
 251
 252static gboolean
 253aio_ctx_prepare(GSource *source, gint    *timeout)
 254{
 255    AioContext *ctx = (AioContext *) source;
 256
 257    qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) | 1);
 258
 259    /*
 260     * Write ctx->notify_me before computing the timeout
 261     * (reading bottom half flags, etc.).  Pairs with
 262     * smp_mb in aio_notify().
 263     */
 264    smp_mb();
 265
 266    /* We assume there is no timeout already supplied */
 267    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));
 268
 269    if (aio_prepare(ctx)) {
 270        *timeout = 0;
 271    }
 272
 273    return *timeout == 0;
 274}
 275
 276static gboolean
 277aio_ctx_check(GSource *source)
 278{
 279    AioContext *ctx = (AioContext *) source;
 280    QEMUBH *bh;
 281    BHListSlice *s;
 282
 283    /* Finish computing the timeout before clearing the flag.  */
 284    qatomic_store_release(&ctx->notify_me, qatomic_read(&ctx->notify_me) & ~1);
 285    aio_notify_accept(ctx);
 286
 287    QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
 288        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
 289            return true;
 290        }
 291    }
 292
 293    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
 294        QSLIST_FOREACH_RCU(bh, &s->bh_list, next) {
 295            if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
 296                return true;
 297            }
 298        }
 299    }
 300    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
 301}
 302
 303static gboolean
 304aio_ctx_dispatch(GSource     *source,
 305                 GSourceFunc  callback,
 306                 gpointer     user_data)
 307{
 308    AioContext *ctx = (AioContext *) source;
 309
 310    assert(callback == NULL);
 311    aio_dispatch(ctx);
 312    return true;
 313}
 314
 315static void
 316aio_ctx_finalize(GSource     *source)
 317{
 318    AioContext *ctx = (AioContext *) source;
 319    QEMUBH *bh;
 320    unsigned flags;
 321
 322    thread_pool_free(ctx->thread_pool);
 323
 324#ifdef CONFIG_LINUX_AIO
 325    if (ctx->linux_aio) {
 326        laio_detach_aio_context(ctx->linux_aio, ctx);
 327        laio_cleanup(ctx->linux_aio);
 328        ctx->linux_aio = NULL;
 329    }
 330#endif
 331
 332#ifdef CONFIG_LINUX_IO_URING
 333    if (ctx->linux_io_uring) {
 334        luring_detach_aio_context(ctx->linux_io_uring, ctx);
 335        luring_cleanup(ctx->linux_io_uring);
 336        ctx->linux_io_uring = NULL;
 337    }
 338#endif
 339
 340    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
 341    qemu_bh_delete(ctx->co_schedule_bh);
 342
 343    /* There must be no aio_bh_poll() calls going on */
 344    assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list));
 345
 346    while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) {
 347        /*
 348         * qemu_bh_delete() must have been called on BHs in this AioContext. In
 349         * many cases memory leaks, hangs, or inconsistent state occur when a
 350         * BH is leaked because something still expects it to run.
 351         *
 352         * If you hit this, fix the lifecycle of the BH so that
 353         * qemu_bh_delete() and any associated cleanup is called before the
 354         * AioContext is finalized.
 355         */
 356        if (unlikely(!(flags & BH_DELETED))) {
 357            fprintf(stderr, "%s: BH '%s' leaked, aborting...\n",
 358                    __func__, bh->name);
 359            abort();
 360        }
 361
 362        g_free(bh);
 363    }
 364
 365    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
 366    event_notifier_cleanup(&ctx->notifier);
 367    qemu_rec_mutex_destroy(&ctx->lock);
 368    qemu_lockcnt_destroy(&ctx->list_lock);
 369    timerlistgroup_deinit(&ctx->tlg);
 370    aio_context_destroy(ctx);
 371}
 372
 373static GSourceFuncs aio_source_funcs = {
 374    aio_ctx_prepare,
 375    aio_ctx_check,
 376    aio_ctx_dispatch,
 377    aio_ctx_finalize
 378};
 379
 380GSource *aio_get_g_source(AioContext *ctx)
 381{
 382    aio_context_use_g_source(ctx);
 383    g_source_ref(&ctx->source);
 384    return &ctx->source;
 385}
 386
 387ThreadPool *aio_get_thread_pool(AioContext *ctx)
 388{
 389    if (!ctx->thread_pool) {
 390        ctx->thread_pool = thread_pool_new(ctx);
 391    }
 392    return ctx->thread_pool;
 393}
 394
 395#ifdef CONFIG_LINUX_AIO
 396LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
 397{
 398    if (!ctx->linux_aio) {
 399        ctx->linux_aio = laio_init(errp);
 400        if (ctx->linux_aio) {
 401            laio_attach_aio_context(ctx->linux_aio, ctx);
 402        }
 403    }
 404    return ctx->linux_aio;
 405}
 406
 407LinuxAioState *aio_get_linux_aio(AioContext *ctx)
 408{
 409    assert(ctx->linux_aio);
 410    return ctx->linux_aio;
 411}
 412#endif
 413
 414#ifdef CONFIG_LINUX_IO_URING
 415LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp)
 416{
 417    if (ctx->linux_io_uring) {
 418        return ctx->linux_io_uring;
 419    }
 420
 421    ctx->linux_io_uring = luring_init(errp);
 422    if (!ctx->linux_io_uring) {
 423        return NULL;
 424    }
 425
 426    luring_attach_aio_context(ctx->linux_io_uring, ctx);
 427    return ctx->linux_io_uring;
 428}
 429
 430LuringState *aio_get_linux_io_uring(AioContext *ctx)
 431{
 432    assert(ctx->linux_io_uring);
 433    return ctx->linux_io_uring;
 434}
 435#endif
 436
 437void aio_notify(AioContext *ctx)
 438{
 439    /*
 440     * Write e.g. bh->flags before writing ctx->notified.  Pairs with smp_mb in
 441     * aio_notify_accept.
 442     */
 443    smp_wmb();
 444    qatomic_set(&ctx->notified, true);
 445
 446    /*
 447     * Write ctx->notified before reading ctx->notify_me.  Pairs
 448     * with smp_mb in aio_ctx_prepare or aio_poll.
 449     */
 450    smp_mb();
 451    if (qatomic_read(&ctx->notify_me)) {
 452        event_notifier_set(&ctx->notifier);
 453    }
 454}
 455
 456void aio_notify_accept(AioContext *ctx)
 457{
 458    qatomic_set(&ctx->notified, false);
 459
 460    /*
 461     * Write ctx->notified before reading e.g. bh->flags.  Pairs with smp_wmb
 462     * in aio_notify.
 463     */
 464    smp_mb();
 465}
 466
 467static void aio_timerlist_notify(void *opaque, QEMUClockType type)
 468{
 469    aio_notify(opaque);
 470}
 471
 472static void aio_context_notifier_cb(EventNotifier *e)
 473{
 474    AioContext *ctx = container_of(e, AioContext, notifier);
 475
 476    event_notifier_test_and_clear(&ctx->notifier);
 477}
 478
 479/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
 480static bool aio_context_notifier_poll(void *opaque)
 481{
 482    EventNotifier *e = opaque;
 483    AioContext *ctx = container_of(e, AioContext, notifier);
 484
 485    return qatomic_read(&ctx->notified);
 486}
 487
 488static void co_schedule_bh_cb(void *opaque)
 489{
 490    AioContext *ctx = opaque;
 491    QSLIST_HEAD(, Coroutine) straight, reversed;
 492
 493    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
 494    QSLIST_INIT(&straight);
 495
 496    while (!QSLIST_EMPTY(&reversed)) {
 497        Coroutine *co = QSLIST_FIRST(&reversed);
 498        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
 499        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
 500    }
 501
 502    while (!QSLIST_EMPTY(&straight)) {
 503        Coroutine *co = QSLIST_FIRST(&straight);
 504        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
 505        trace_aio_co_schedule_bh_cb(ctx, co);
 506        aio_context_acquire(ctx);
 507
 508        /* Protected by write barrier in qemu_aio_coroutine_enter */
 509        qatomic_set(&co->scheduled, NULL);
 510        qemu_aio_coroutine_enter(ctx, co);
 511        aio_context_release(ctx);
 512    }
 513}
 514
 515AioContext *aio_context_new(Error **errp)
 516{
 517    int ret;
 518    AioContext *ctx;
 519
 520    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
 521    QSLIST_INIT(&ctx->bh_list);
 522    QSIMPLEQ_INIT(&ctx->bh_slice_list);
 523    aio_context_setup(ctx);
 524
 525    ret = event_notifier_init(&ctx->notifier, false);
 526    if (ret < 0) {
 527        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
 528        goto fail;
 529    }
 530    g_source_set_can_recurse(&ctx->source, true);
 531    qemu_lockcnt_init(&ctx->list_lock);
 532
 533    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
 534    QSLIST_INIT(&ctx->scheduled_coroutines);
 535
 536    aio_set_event_notifier(ctx, &ctx->notifier,
 537                           false,
 538                           aio_context_notifier_cb,
 539                           aio_context_notifier_poll);
 540#ifdef CONFIG_LINUX_AIO
 541    ctx->linux_aio = NULL;
 542#endif
 543
 544#ifdef CONFIG_LINUX_IO_URING
 545    ctx->linux_io_uring = NULL;
 546#endif
 547
 548    ctx->thread_pool = NULL;
 549    qemu_rec_mutex_init(&ctx->lock);
 550    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 551
 552    ctx->poll_ns = 0;
 553    ctx->poll_max_ns = 0;
 554    ctx->poll_grow = 0;
 555    ctx->poll_shrink = 0;
 556
 557    ctx->aio_max_batch = 0;
 558
 559    return ctx;
 560fail:
 561    g_source_destroy(&ctx->source);
 562    return NULL;
 563}
 564
 565void aio_co_schedule(AioContext *ctx, Coroutine *co)
 566{
 567    trace_aio_co_schedule(ctx, co);
 568    const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL,
 569                                           __func__);
 570
 571    if (scheduled) {
 572        fprintf(stderr,
 573                "%s: Co-routine was already scheduled in '%s'\n",
 574                __func__, scheduled);
 575        abort();
 576    }
 577
 578    /* The coroutine might run and release the last ctx reference before we
 579     * invoke qemu_bh_schedule().  Take a reference to keep ctx alive until
 580     * we're done.
 581     */
 582    aio_context_ref(ctx);
 583
 584    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
 585                              co, co_scheduled_next);
 586    qemu_bh_schedule(ctx->co_schedule_bh);
 587
 588    aio_context_unref(ctx);
 589}
 590
 591typedef struct AioCoRescheduleSelf {
 592    Coroutine *co;
 593    AioContext *new_ctx;
 594} AioCoRescheduleSelf;
 595
 596static void aio_co_reschedule_self_bh(void *opaque)
 597{
 598    AioCoRescheduleSelf *data = opaque;
 599    aio_co_schedule(data->new_ctx, data->co);
 600}
 601
 602void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
 603{
 604    AioContext *old_ctx = qemu_get_current_aio_context();
 605
 606    if (old_ctx != new_ctx) {
 607        AioCoRescheduleSelf data = {
 608            .co = qemu_coroutine_self(),
 609            .new_ctx = new_ctx,
 610        };
 611        /*
 612         * We can't directly schedule the coroutine in the target context
 613         * because this would be racy: The other thread could try to enter the
 614         * coroutine before it has yielded in this one.
 615         */
 616        aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
 617        qemu_coroutine_yield();
 618    }
 619}
 620
 621void aio_co_wake(struct Coroutine *co)
 622{
 623    AioContext *ctx;
 624
 625    /* Read coroutine before co->ctx.  Matches smp_wmb in
 626     * qemu_coroutine_enter.
 627     */
 628    smp_read_barrier_depends();
 629    ctx = qatomic_read(&co->ctx);
 630
 631    aio_co_enter(ctx, co);
 632}
 633
 634void aio_co_enter(AioContext *ctx, struct Coroutine *co)
 635{
 636    if (ctx != qemu_get_current_aio_context()) {
 637        aio_co_schedule(ctx, co);
 638        return;
 639    }
 640
 641    if (qemu_in_coroutine()) {
 642        Coroutine *self = qemu_coroutine_self();
 643        assert(self != co);
 644        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
 645    } else {
 646        aio_context_acquire(ctx);
 647        qemu_aio_coroutine_enter(ctx, co);
 648        aio_context_release(ctx);
 649    }
 650}
 651
 652void aio_context_ref(AioContext *ctx)
 653{
 654    g_source_ref(&ctx->source);
 655}
 656
 657void aio_context_unref(AioContext *ctx)
 658{
 659    g_source_unref(&ctx->source);
 660}
 661
 662void aio_context_acquire(AioContext *ctx)
 663{
 664    qemu_rec_mutex_lock(&ctx->lock);
 665}
 666
 667void aio_context_release(AioContext *ctx)
 668{
 669    qemu_rec_mutex_unlock(&ctx->lock);
 670}
 671
 672static __thread AioContext *my_aiocontext;
 673
 674AioContext *qemu_get_current_aio_context(void)
 675{
 676    if (my_aiocontext) {
 677        return my_aiocontext;
 678    }
 679    if (qemu_mutex_iothread_locked()) {
 680        /* Possibly in a vCPU thread.  */
 681        return qemu_get_aio_context();
 682    }
 683    return NULL;
 684}
 685
 686void qemu_set_current_aio_context(AioContext *ctx)
 687{
 688    assert(!my_aiocontext);
 689    my_aiocontext = ctx;
 690}
 691