qemu/util/qemu-coroutine-lock.c
<<
>>
Prefs
   1/*
   2 * coroutine queues and locks
   3 *
   4 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
   5 *
   6 * Permission is hereby granted, free of charge, to any person obtaining a copy
   7 * of this software and associated documentation files (the "Software"), to deal
   8 * in the Software without restriction, including without limitation the rights
   9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10 * copies of the Software, and to permit persons to whom the Software is
  11 * furnished to do so, subject to the following conditions:
  12 *
  13 * The above copyright notice and this permission notice shall be included in
  14 * all copies or substantial portions of the Software.
  15 *
  16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  19 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22 * THE SOFTWARE.
  23 *
  24 * The lock-free mutex implementation is based on OSv
  25 * (core/lfmutex.cc, include/lockfree/mutex.hh).
  26 * Copyright (C) 2013 Cloudius Systems, Ltd.
  27 */
  28
  29#include "qemu/osdep.h"
  30#include "qemu/coroutine.h"
  31#include "qemu/coroutine_int.h"
  32#include "qemu/processor.h"
  33#include "qemu/queue.h"
  34#include "block/aio.h"
  35#include "trace.h"
  36
/*
 * Initialize @queue: set up its list of waiting coroutines
 * (queue->entries) so that it can be used by the other CoQueue functions.
 */
void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}
  41
  42void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock,
  43                                          CoQueueWaitFlags flags)
  44{
  45    Coroutine *self = qemu_coroutine_self();
  46    if (flags & CO_QUEUE_WAIT_FRONT) {
  47        QSIMPLEQ_INSERT_HEAD(&queue->entries, self, co_queue_next);
  48    } else {
  49        QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
  50    }
  51
  52    if (lock) {
  53        qemu_lockable_unlock(lock);
  54    }
  55
  56    /* There is no race condition here.  Other threads will call
  57     * aio_co_schedule on our AioContext, which can reenter this
  58     * coroutine but only after this yield and after the main loop
  59     * has gone through the next iteration.
  60     */
  61    qemu_coroutine_yield();
  62    assert(qemu_in_coroutine());
  63
  64    /* TODO: OSv implements wait morphing here, where the wakeup
  65     * primitive automatically places the woken coroutine on the
  66     * mutex's queue.  This avoids the thundering herd effect.
  67     * This could be implemented for CoMutexes, but not really for
  68     * other cases of QemuLockable.
  69     */
  70    if (lock) {
  71        qemu_lockable_lock(lock);
  72    }
  73}
  74
  75bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
  76{
  77    Coroutine *next;
  78
  79    next = QSIMPLEQ_FIRST(&queue->entries);
  80    if (!next) {
  81        return false;
  82    }
  83
  84    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
  85    if (lock) {
  86        qemu_lockable_unlock(lock);
  87    }
  88    aio_co_wake(next);
  89    if (lock) {
  90        qemu_lockable_lock(lock);
  91    }
  92    return true;
  93}
  94
/*
 * Coroutine-context variant of qemu_co_enter_next_impl(): wake the first
 * coroutine waiting on @queue without dropping any lock around the wake.
 *
 * Returns the result of qemu_co_enter_next_impl(), i.e. true if a
 * coroutine was woken and false if the queue was empty.
 */
bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    /* No unlock/lock needed in coroutine context.  */
    return qemu_co_enter_next_impl(queue, NULL);
}
 100
 101void qemu_co_enter_all_impl(CoQueue *queue, QemuLockable *lock)
 102{
 103    while (qemu_co_enter_next_impl(queue, lock)) {
 104        /* just loop */
 105    }
 106}
 107
/*
 * Coroutine-context variant of qemu_co_enter_all_impl(): wake all
 * coroutines waiting on @queue without dropping any lock around the wakes.
 */
void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    /* No unlock/lock needed in coroutine context.  */
    qemu_co_enter_all_impl(queue, NULL);
}
 113
 114bool qemu_co_queue_empty(CoQueue *queue)
 115{
 116    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
 117}
 118
 119/* The wait records are handled with a multiple-producer, single-consumer
 120 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 121 * because pop_waiter() can only be called while mutex->handoff is zero.
 122 * This can happen in three cases:
 123 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 124 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 125 *   not take part in the handoff.
 126 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 127 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 128 *   the cmpxchg (it will see either 0 or the next sequence value) and
 129 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 130 *   woken up someone.
 131 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 132 *   In this case another iteration starts with mutex->handoff == 0;
 133 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 134 *   qemu_co_mutex_unlock will go back to case (1).
 135 *
 136 * The following functions manage this queue.
 137 */
/*
 * One entry in a CoMutex wait queue.  push_waiter() fills in @co and links
 * the record into mutex->from_push; move_waiters() later relinks it into
 * mutex->to_pop.
 */
typedef struct CoWaitRecord {
    Coroutine *co;                      /* the coroutine that is waiting */
    QSLIST_ENTRY(CoWaitRecord) next;    /* link in from_push or to_pop */
} CoWaitRecord;
 142
/*
 * Producer side of the wait queue: record the calling coroutine in @w and
 * insert @w at the head of mutex->from_push using the atomic insert macro.
 * The storage for @w is provided by the caller.
 */
static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}
 148
 149static void move_waiters(CoMutex *mutex)
 150{
 151    QSLIST_HEAD(, CoWaitRecord) reversed;
 152    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
 153    while (!QSLIST_EMPTY(&reversed)) {
 154        CoWaitRecord *w = QSLIST_FIRST(&reversed);
 155        QSLIST_REMOVE_HEAD(&reversed, next);
 156        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
 157    }
 158}
 159
 160static CoWaitRecord *pop_waiter(CoMutex *mutex)
 161{
 162    CoWaitRecord *w;
 163
 164    if (QSLIST_EMPTY(&mutex->to_pop)) {
 165        move_waiters(mutex);
 166        if (QSLIST_EMPTY(&mutex->to_pop)) {
 167            return NULL;
 168        }
 169    }
 170    w = QSLIST_FIRST(&mutex->to_pop);
 171    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
 172    return w;
 173}
 174
 175static bool has_waiters(CoMutex *mutex)
 176{
 177    return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
 178}
 179
/*
 * Initialize @mutex by clearing the whole structure to zero bytes, which
 * leaves every counter at 0 and every pointer/list head zeroed.
 */
void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}
 184
/*
 * Hand @mutex over to @co: record @co's AioContext in mutex->ctx and then
 * wake @co with aio_co_wake().
 */
static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}
 194
/*
 * Contended path of qemu_co_mutex_lock(): queue the calling coroutine as a
 * waiter on @mutex and yield until it is woken.
 *
 * @ctx is the caller's AioContext.  It is stored in mutex->ctx only when
 * the caller wins the hand-off and pops its own wait record; in that case
 * the function returns right away, without yielding and without emitting
 * the lock_return trace event.
 */
static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;             /* our wait record, queued by push_waiter() */
    unsigned old_handoff;       /* hand-off token observed after queueing */

    trace_qemu_co_mutex_lock_entry(mutex, self);
    push_waiter(mutex, &w);

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = qatomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        /* &w was pushed above, so the pop is expected to return a record;
         * the result is dereferenced without a NULL check.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves!  */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        /* Somebody else is first in line: wake them, then wait our turn. */
        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}
 230
/*
 * Acquire @mutex on behalf of the calling coroutine.
 *
 * mutex->locked is used as a counter: the fast path moves it from 0 to 1
 * with a compare-and-swap; otherwise, after an optional bounded spin, it is
 * incremented and the caller goes through the slow path.  On return the
 * caller is recorded as mutex->holder and its locks_held count has been
 * incremented.
 */
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;     /* waiters: value of mutex->locked before our update */

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = qatomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        /* Spin only while the counter was exactly 1 when we looked, and
         * for fewer than 1000 iterations in total (i is not reset when
         * jumping back to retry_fast_path).
         */
        while (waiters == 1 && ++i < 1000) {
            /* Stop spinning if the mutex is recorded as held from our
             * own AioContext.
             */
            if (qatomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (qatomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        /* Register ourselves in the counter; a previous value of 0 means
         * the mutex was released in the meantime and we now own it.
         */
        waiters = qatomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended.  */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}
 270
/*
 * Release @mutex, which must be held by the calling coroutine (asserted
 * through mutex->holder).
 *
 * Clears mutex->ctx and mutex->holder, decrements the caller's locks_held
 * and the mutex->locked counter.  If the counter shows that other lock()
 * calls are in progress, either wakes the next queued waiter or runs the
 * hand-off protocol so that a concurrent lock() takes over the wakeup.
 */
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (qatomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            /* A waiter is queued: pass the mutex on to it and stop.  */
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        /* Publish the token; 0 is reserved for "no hand-off pending".  */
        our_handoff = mutex->sequence;
        qatomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (qatomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
        /* We took the token back: loop and try to pop a waiter again.  */
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
 325
/*
 * A pending request queued on CoRwlock.tickets by a coroutine that could
 * not take the lock immediately.
 */
struct CoRwTicket {
    bool read;                          /* true: reader, false: writer */
    Coroutine *co;                      /* coroutine to wake when admitted */
    QSIMPLEQ_ENTRY(CoRwTicket) next;    /* link in lock->tickets */
};
 331
/*
 * Initialize @lock: set up its internal mutex, mark it as having no owners
 * (owners == 0) and initialize the ticket queue.
 */
void qemu_co_rwlock_init(CoRwlock *lock)
{
    qemu_co_mutex_init(&lock->mutex);
    lock->owners = 0;
    QSIMPLEQ_INIT(&lock->tickets);
}
 338
 339/* Releases the internal CoMutex.  */
 340static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
 341{
 342    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
 343    Coroutine *co = NULL;
 344
 345    /*
 346     * Setting lock->owners here prevents rdlock and wrlock from
 347     * sneaking in between unlock and wake.
 348     */
 349
 350    if (tkt) {
 351        if (tkt->read) {
 352            if (lock->owners >= 0) {
 353                lock->owners++;
 354                co = tkt->co;
 355            }
 356        } else {
 357            if (lock->owners == 0) {
 358                lock->owners = -1;
 359                co = tkt->co;
 360            }
 361        }
 362    }
 363
 364    if (co) {
 365        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
 366        qemu_co_mutex_unlock(&lock->mutex);
 367        aio_co_wake(co);
 368    } else {
 369        qemu_co_mutex_unlock(&lock->mutex);
 370    }
 371}
 372
 373void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)
 374{
 375    Coroutine *self = qemu_coroutine_self();
 376
 377    qemu_co_mutex_lock(&lock->mutex);
 378    /* For fairness, wait if a writer is in line.  */
 379    if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
 380        lock->owners++;
 381        qemu_co_mutex_unlock(&lock->mutex);
 382    } else {
 383        CoRwTicket my_ticket = { true, self };
 384
 385        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
 386        qemu_co_mutex_unlock(&lock->mutex);
 387        qemu_coroutine_yield();
 388        assert(lock->owners >= 1);
 389
 390        /* Possibly wake another reader, which will wake the next in line.  */
 391        qemu_co_mutex_lock(&lock->mutex);
 392        qemu_co_rwlock_maybe_wake_one(lock);
 393    }
 394
 395    self->locks_held++;
 396}
 397
 398void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)
 399{
 400    Coroutine *self = qemu_coroutine_self();
 401
 402    assert(qemu_in_coroutine());
 403    self->locks_held--;
 404
 405    qemu_co_mutex_lock(&lock->mutex);
 406    if (lock->owners > 0) {
 407        lock->owners--;
 408    } else {
 409        assert(lock->owners == -1);
 410        lock->owners = 0;
 411    }
 412
 413    qemu_co_rwlock_maybe_wake_one(lock);
 414}
 415
/*
 * Turn a write lock held on @lock into a read lock.
 *
 * The lock must currently be write-locked (owners == -1, asserted); it is
 * changed to a single reader (owners == 1).  The call to
 * qemu_co_rwlock_maybe_wake_one() then releases the internal mutex.
 */
void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->owners == -1);
    lock->owners = 1;

    /* Possibly wake another reader, which will wake the next in line.  */
    qemu_co_rwlock_maybe_wake_one(lock);
}
 425
 426void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)
 427{
 428    Coroutine *self = qemu_coroutine_self();
 429
 430    qemu_co_mutex_lock(&lock->mutex);
 431    if (lock->owners == 0) {
 432        lock->owners = -1;
 433        qemu_co_mutex_unlock(&lock->mutex);
 434    } else {
 435        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
 436
 437        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
 438        qemu_co_mutex_unlock(&lock->mutex);
 439        qemu_coroutine_yield();
 440        assert(lock->owners == -1);
 441    }
 442
 443    self->locks_held++;
 444}
 445
 446void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)
 447{
 448    qemu_co_mutex_lock(&lock->mutex);
 449    assert(lock->owners > 0);
 450    /* For fairness, wait if a writer is in line.  */
 451    if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
 452        lock->owners = -1;
 453        qemu_co_mutex_unlock(&lock->mutex);
 454    } else {
 455        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
 456
 457        lock->owners--;
 458        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
 459        qemu_co_rwlock_maybe_wake_one(lock);
 460        qemu_coroutine_yield();
 461        assert(lock->owners == -1);
 462    }
 463}
 464