linux/kernel/locking/osq_lock.c
<<
>>
Prefs
   1#include <linux/percpu.h>
   2#include <linux/sched.h>
   3#include <linux/osq_lock.h>
   4
   5/*
   6 * An MCS like lock especially tailored for optimistic spinning for sleeping
   7 * lock implementations (mutex, rwsem, etc).
   8 *
   9 * Using a single mcs node per CPU is safe because sleeping locks should not be
  10 * called from interrupt context and we have preemption disabled while
  11 * spinning. osq_lock()/osq_unlock() use this CPU's node via this_cpu_ptr();
  12 * decode_cpu() turns an encoded tail value into a node via per_cpu_ptr(). */
  13static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
  14
  15/*
  16 * We use the value 0 to represent "no CPU", thus the encoded value
  17 * will be the CPU number incremented by 1.
  18 */
  19static inline int encode_cpu(int cpu_nr)
  20{
  21        return cpu_nr + 1;
  22}
  23
  24static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)
  25{
  26        int cpu_nr = encoded_cpu_val - 1;
  27
  28        return per_cpu_ptr(&osq_node, cpu_nr);
  29}
  30
  31/*
  32 * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
  33 * Can return NULL in case we were the last queued and we updated @lock instead.
  34 */
  35static inline struct optimistic_spin_node *
  36osq_wait_next(struct optimistic_spin_queue *lock,
  37              struct optimistic_spin_node *node,
  38              struct optimistic_spin_node *prev)
  39{
  40        struct optimistic_spin_node *next = NULL;
  41        int curr = encode_cpu(smp_processor_id());
  42        int old;
  43
  44        /*
  45         * If there is a prev node in queue, then the 'old' value will be
  46         * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
  47         * we're currently last in queue, then the queue will then become empty.
  48         */
  49        old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
  50
  51        for (;;) {
  52                if (atomic_read(&lock->tail) == curr &&
  53                    atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
  54                        /*
  55                         * We were the last queued, we moved @lock back. @prev
  56                         * will now observe @lock and will complete its
  57                         * unlock()/unqueue().
  58                         */
  59                        break;
  60                }
  61
  62                /*
  63                 * We must xchg() the @node->next value, because if we were to
  64                 * leave it in, a concurrent unlock()/unqueue() from
  65                 * @node->next might complete Step-A and think its @prev is
  66                 * still valid.
  67                 *
  68                 * If the concurrent unlock()/unqueue() wins the race, we'll
  69                 * wait for either @lock to point to us, through its Step-B, or
  70                 * wait for a new @node->next from its Step-C.
  71                 */
  72                if (node->next) {
  73                        next = xchg(&node->next, NULL);
  74                        if (next)
  75                                break;
  76                }
  77
  78                cpu_relax_lowlatency();
  79        }
  80
  81        return next;
  82}
  83
  84bool osq_lock(struct optimistic_spin_queue *lock)
  85{
  86        struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
  87        struct optimistic_spin_node *prev, *next;
  88        int curr = encode_cpu(smp_processor_id());
  89        int old;
  90
  91        node->locked = 0;
  92        node->next = NULL;
  93        node->cpu = curr;
  94
  95        /*
  96         * We need both ACQUIRE (pairs with corresponding RELEASE in
  97         * unlock() uncontended, or fastpath) and RELEASE (to publish
  98         * the node fields we just initialised) semantics when updating
  99         * the lock tail.
 100         */
 101        old = atomic_xchg(&lock->tail, curr);
 102        if (old == OSQ_UNLOCKED_VAL)
 103                return true;
 104
 105        prev = decode_cpu(old);
 106        node->prev = prev;
 107        WRITE_ONCE(prev->next, node);
 108
 109        /*
 110         * Normally @prev is untouchable after the above store; because at that
 111         * moment unlock can proceed and wipe the node element from stack.
 112         *
 113         * However, since our nodes are static per-cpu storage, we're
 114         * guaranteed their existence -- this allows us to apply
 115         * cmpxchg in an attempt to undo our queueing.
 116         */
 117
 118        while (!READ_ONCE(node->locked)) {
 119                /*
 120                 * If we need to reschedule bail... so we can block.
 121                 */
 122                if (need_resched())
 123                        goto unqueue;
 124
 125                cpu_relax_lowlatency();
 126        }
 127        return true;
 128
 129unqueue:
 130        /*
 131         * Step - A  -- stabilize @prev
 132         *
 133         * Undo our @prev->next assignment; this will make @prev's
 134         * unlock()/unqueue() wait for a next pointer since @lock points to us
 135         * (or later).
 136         */
 137
 138        for (;;) {
 139                if (prev->next == node &&
 140                    cmpxchg(&prev->next, node, NULL) == node)
 141                        break;
 142
 143                /*
 144                 * We can only fail the cmpxchg() racing against an unlock(),
 145                 * in which case we should observe @node->locked becomming
 146                 * true.
 147                 */
 148                if (smp_load_acquire(&node->locked))
 149                        return true;
 150
 151                cpu_relax_lowlatency();
 152
 153                /*
 154                 * Or we race against a concurrent unqueue()'s step-B, in which
 155                 * case its step-C will write us a new @node->prev pointer.
 156                 */
 157                prev = READ_ONCE(node->prev);
 158        }
 159
 160        /*
 161         * Step - B -- stabilize @next
 162         *
 163         * Similar to unlock(), wait for @node->next or move @lock from @node
 164         * back to @prev.
 165         */
 166
 167        next = osq_wait_next(lock, node, prev);
 168        if (!next)
 169                return false;
 170
 171        /*
 172         * Step - C -- unlink
 173         *
 174         * @prev is stable because its still waiting for a new @prev->next
 175         * pointer, @next is stable because our @node->next pointer is NULL and
 176         * it will wait in Step-A.
 177         */
 178
 179        WRITE_ONCE(next->prev, prev);
 180        WRITE_ONCE(prev->next, next);
 181
 182        return false;
 183}
 184
 185void osq_unlock(struct optimistic_spin_queue *lock)
 186{
 187        struct optimistic_spin_node *node, *next;
 188        int curr = encode_cpu(smp_processor_id());
 189
 190        /*
 191         * Fast path for the uncontended case.
 192         */
 193        if (likely(atomic_cmpxchg_release(&lock->tail, curr,
 194                                          OSQ_UNLOCKED_VAL) == curr))
 195                return;
 196
 197        /*
 198         * Second most likely case.
 199         */
 200        node = this_cpu_ptr(&osq_node);
 201        next = xchg(&node->next, NULL);
 202        if (next) {
 203                WRITE_ONCE(next->locked, 1);
 204                return;
 205        }
 206
 207        next = osq_wait_next(lock, node, NULL);
 208        if (next)
 209                WRITE_ONCE(next->locked, 1);
 210}
 211