qemu/util/qemu-coroutine-lock.c
/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * The lock-free mutex implementation is based on OSv
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 * Copyright (C) 2013 Cloudius Systems, Ltd.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"

void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}

void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *self = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* There is no race condition here.  Other threads will call
     * aio_co_schedule on our AioContext, which can reenter this
     * coroutine but only after this yield and after the main loop
     * has gone through the next iteration.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* TODO: OSv implements wait morphing here, where the wakeup
     * primitive automatically places the woken coroutine on the
     * mutex's queue.  This avoids the thundering herd effect.
     * This could be implemented for CoMutexes, but not really for
     * other cases of QemuLockable.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}

static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
    Coroutine *next;

    if (QSIMPLEQ_EMPTY(&queue->entries)) {
        return false;
    }

    while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
        QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
        aio_co_wake(next);
        if (single) {
            break;
        }
    }
    return true;
}

bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    return qemu_co_queue_do_restart(queue, true);
}

void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    qemu_co_queue_do_restart(queue, false);
}

bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *next;

    next = QSIMPLEQ_FIRST(&queue->entries);
    if (!next) {
        return false;
    }

    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
    if (lock) {
        qemu_lockable_unlock(lock);
    }
    aio_co_wake(next);
    if (lock) {
        qemu_lockable_lock(lock);
    }
    return true;
}

bool qemu_co_queue_empty(CoQueue *queue)
{
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}

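/* Illustrative usage sketch for the CoQueue API above: a coroutine waits for
 * a condition under a lock while another coroutine wakes it.  The SharedState
 * type and its field names are hypothetical (the lock and queue would be set
 * up elsewhere with qemu_co_mutex_init() and qemu_co_queue_init()).
 *
 *     typedef struct SharedState {
 *         CoMutex lock;
 *         CoQueue waiters;
 *         bool ready;
 *     } SharedState;
 *
 *     static void coroutine_fn consumer_co(void *opaque)
 *     {
 *         SharedState *s = opaque;
 *         qemu_co_mutex_lock(&s->lock);
 *         while (!s->ready) {
 *             // drops s->lock, yields, re-acquires s->lock after wakeup
 *             qemu_co_queue_wait(&s->waiters, &s->lock);
 *         }
 *         qemu_co_mutex_unlock(&s->lock);
 *     }
 *
 *     static void coroutine_fn producer_co(void *opaque)
 *     {
 *         SharedState *s = opaque;
 *         qemu_co_mutex_lock(&s->lock);
 *         s->ready = true;
 *         qemu_co_queue_restart_all(&s->waiters);   // wake every waiter
 *         qemu_co_mutex_unlock(&s->lock);
 *     }
 */
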
/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *   not take part in the handoff.
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *   woken up someone.
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 *   In this case another iteration starts with mutex->handoff == 0;
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 *   qemu_co_mutex_unlock will go back to case (1).
 *
 * The following functions manage this queue.
 */
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;

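/* Add a wait record for the current coroutine to the lock-free push list
 * (the multiple-producer side of the queue).
 */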
static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}

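/* Drain the LIFO push list and re-stack it onto to_pop, so that waiters
 * come out in FIFO order.  Only called by the single consumer.
 */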
static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}

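/* Remove and return the oldest waiter, refilling to_pop from the push list
 * if necessary.  Returns NULL if there are no waiters at all.
 */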
static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}

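/* True if a wait record is queued on either list. */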
static bool has_waiters(CoMutex *mutex)
{
    return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
}

void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}

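/* Wake up @co and record the AioContext it will run in, so that the
 * spinning fast path in qemu_co_mutex_lock can stop spinning when the
 * owner runs in the caller's own context.
 */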
static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}

static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    w.co = self;
    push_waiter(mutex, &w);

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = atomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves!  */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}

void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (atomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (atomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        waiters = atomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended.  */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}

void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (atomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        atomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}

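/* Illustrative usage sketch: unlike QemuMutex, a contended CoMutex suspends
 * only the calling coroutine, so it can be held across yield points such as
 * blocking I/O.  The MyDevice type and do_io() below are hypothetical.
 *
 *     static void coroutine_fn flush_co(void *opaque)
 *     {
 *         MyDevice *d = opaque;
 *         qemu_co_mutex_lock(&d->lock);    // fast path: a single cmpxchg
 *         do_io(d);                        // may yield while holding d->lock
 *         qemu_co_mutex_unlock(&d->lock);  // may hand off to a waiter
 *     }
 */
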
void qemu_co_rwlock_init(CoRwlock *lock)
{
    memset(lock, 0, sizeof(*lock));
    qemu_co_queue_init(&lock->queue);
    qemu_co_mutex_init(&lock->mutex);
}

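/* Acquire the lock for shared (read) access.  Readers queue behind any
 * pending writer so that writers are not starved.
 */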
void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line.  */
    while (lock->pending_writer) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex.  */
    self->locks_held++;
}

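/* Release the lock, whether it was taken for reading or for writing.
 * A writer still holds lock->mutex from wrlock/upgrade; a reader
 * re-acquires it briefly to update the reader count.
 */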
void qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    if (!lock->reader) {
        /* The critical section started in qemu_co_rwlock_wrlock.  */
        qemu_co_queue_restart_all(&lock->queue);
    } else {
        self->locks_held--;

        qemu_co_mutex_lock(&lock->mutex);
        lock->reader--;
        assert(lock->reader >= 0);
        /* Wakeup only one waiting writer */
        if (!lock->reader) {
            qemu_co_queue_next(&lock->queue);
        }
    }
    qemu_co_mutex_unlock(&lock->mutex);
}

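/* Turn a write lock into a read lock.  The caller must hold lock->mutex
 * from qemu_co_rwlock_wrlock or qemu_co_rwlock_upgrade; it is released here.
 */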
void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
     * qemu_co_rwlock_upgrade.
     */
    assert(lock->reader == 0);
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex.  */
    self->locks_held++;
}

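/* Acquire the lock for exclusive (write) access, waiting until all readers
 * have finished.  The write-side critical section runs with lock->mutex held.
 */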
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, so that lock->reader remains zero.
     * There is no need to update self->locks_held.
     */
}

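/* Turn a read lock into a write lock: drop the caller's read count, then
 * wait until the remaining readers have finished.
 */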
void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->reader > 0);
    lock->reader--;
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
     * not account for the lock twice in self->locks_held.
     */
    self->locks_held--;
}
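
/* Illustrative usage sketch for the CoRwlock API: a reader that occasionally
 * needs write access can use the upgrade/downgrade pair above.  The Cache
 * type and the cache_has()/cache_fill()/cache_read() helpers are hypothetical.
 *
 *     static void coroutine_fn lookup_co(Cache *c, int key)
 *     {
 *         qemu_co_rwlock_rdlock(&c->lock);
 *         if (!cache_has(c, key)) {
 *             qemu_co_rwlock_upgrade(&c->lock);
 *             // the read claim is given up while upgrading, so another
 *             // writer may have filled the entry already: recheck.
 *             if (!cache_has(c, key)) {
 *                 cache_fill(c, key);
 *             }
 *             qemu_co_rwlock_downgrade(&c->lock);
 *         }
 *         cache_read(c, key);
 *         qemu_co_rwlock_unlock(&c->lock);
 *     }
 */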