linux/kernel/futex/requeue.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2
   3#include <linux/sched/signal.h>
   4
   5#include "futex.h"
   6#include "../locking/rtmutex_common.h"
   7
   8/*
   9 * On PREEMPT_RT, the hash bucket lock is a 'sleeping' spinlock with an
  10 * underlying rtmutex. The task which is about to be requeued could have
  11 * just woken up (timeout, signal). After the wake up the task has to
  12 * acquire hash bucket lock, which is held by the requeue code.  As a task
  13 * can only be blocked on _ONE_ rtmutex at a time, the proxy lock blocking
  14 * and the hash bucket lock blocking would collide and corrupt state.
  15 *
  16 * On !PREEMPT_RT this is not a problem and everything could be serialized
  17 * on hash bucket lock, but aside from having the benefit of common code,
  18 * this avoids doing the requeue when the task is already on the way out
  19 * and avoids taking the hash bucket lock of the original uaddr1 when the
  20 * requeue has been completed.
  21 *
  22 * The following state transitions are valid:
  23 *
  24 * On the waiter side:
  25 *   Q_REQUEUE_PI_NONE          -> Q_REQUEUE_PI_IGNORE
  26 *   Q_REQUEUE_PI_IN_PROGRESS   -> Q_REQUEUE_PI_WAIT
  27 *
  28 * On the requeue side:
  29 *   Q_REQUEUE_PI_NONE          -> Q_REQUEUE_PI_IN_PROGRESS
  30 *   Q_REQUEUE_PI_IN_PROGRESS   -> Q_REQUEUE_PI_DONE/LOCKED
  31 *   Q_REQUEUE_PI_IN_PROGRESS   -> Q_REQUEUE_PI_NONE (requeue failed)
  32 *   Q_REQUEUE_PI_WAIT          -> Q_REQUEUE_PI_DONE/LOCKED
  33 *   Q_REQUEUE_PI_WAIT          -> Q_REQUEUE_PI_IGNORE (requeue failed)
  34 *
  35 * The requeue side ignores a waiter with state Q_REQUEUE_PI_IGNORE as this
  36 * signals that the waiter is already on the way out. It also means that
  37 * the waiter is still on the 'wait' futex, i.e. uaddr1.
  38 *
  39 * The waiter side signals early wakeup to the requeue side either through
  40 * setting state to Q_REQUEUE_PI_IGNORE or to Q_REQUEUE_PI_WAIT depending
  41 * on the current state. In case of Q_REQUEUE_PI_IGNORE it can immediately
  42 * proceed to take the hash bucket lock of uaddr1. If it set state to WAIT,
  43 * which means the wakeup is interleaving with a requeue in progress it has
  44 * to wait for the requeue side to change the state. Either to DONE/LOCKED
  45 * or to IGNORE. DONE/LOCKED means the waiter q is now on the uaddr2 futex
  46 * and either blocked (DONE) or has acquired it (LOCKED). IGNORE is set by
  47 * the requeue side when the requeue attempt failed via deadlock detection
  48 * and therefore the waiter q is still on the uaddr1 futex.
  49 */
  50enum {
  51        Q_REQUEUE_PI_NONE               =  0,
  52        Q_REQUEUE_PI_IGNORE,
  53        Q_REQUEUE_PI_IN_PROGRESS,
  54        Q_REQUEUE_PI_WAIT,
  55        Q_REQUEUE_PI_DONE,
  56        Q_REQUEUE_PI_LOCKED,
  57};
  58
  59const struct futex_q futex_q_init = {
  60        /* list gets initialized in futex_queue()*/
  61        .key            = FUTEX_KEY_INIT,
  62        .bitset         = FUTEX_BITSET_MATCH_ANY,
  63        .requeue_state  = ATOMIC_INIT(Q_REQUEUE_PI_NONE),
  64};
  65
  66/**
  67 * requeue_futex() - Requeue a futex_q from one hb to another
  68 * @q:          the futex_q to requeue
  69 * @hb1:        the source hash_bucket
  70 * @hb2:        the target hash_bucket
  71 * @key2:       the new key for the requeued futex_q
  72 */
  73static inline
  74void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1,
  75                   struct futex_hash_bucket *hb2, union futex_key *key2)
  76{
  77
  78        /*
  79         * If key1 and key2 hash to the same bucket, no need to
  80         * requeue.
  81         */
  82        if (likely(&hb1->chain != &hb2->chain)) {
  83                plist_del(&q->list, &hb1->chain);
  84                futex_hb_waiters_dec(hb1);
  85                futex_hb_waiters_inc(hb2);
  86                plist_add(&q->list, &hb2->chain);
  87                q->lock_ptr = &hb2->lock;
  88        }
  89        q->key = *key2;
  90}
  91
  92static inline bool futex_requeue_pi_prepare(struct futex_q *q,
  93                                            struct futex_pi_state *pi_state)
  94{
  95        int old, new;
  96
  97        /*
  98         * Set state to Q_REQUEUE_PI_IN_PROGRESS unless an early wakeup has
  99         * already set Q_REQUEUE_PI_IGNORE to signal that requeue should
 100         * ignore the waiter.
 101         */
 102        old = atomic_read_acquire(&q->requeue_state);
 103        do {
 104                if (old == Q_REQUEUE_PI_IGNORE)
 105                        return false;
 106
 107                /*
 108                 * futex_proxy_trylock_atomic() might have set it to
 109                 * IN_PROGRESS and a interleaved early wake to WAIT.
 110                 *
 111                 * It was considered to have an extra state for that
 112                 * trylock, but that would just add more conditionals
 113                 * all over the place for a dubious value.
 114                 */
 115                if (old != Q_REQUEUE_PI_NONE)
 116                        break;
 117
 118                new = Q_REQUEUE_PI_IN_PROGRESS;
 119        } while (!atomic_try_cmpxchg(&q->requeue_state, &old, new));
 120
 121        q->pi_state = pi_state;
 122        return true;
 123}
 124
 125static inline void futex_requeue_pi_complete(struct futex_q *q, int locked)
 126{
 127        int old, new;
 128
 129        old = atomic_read_acquire(&q->requeue_state);
 130        do {
 131                if (old == Q_REQUEUE_PI_IGNORE)
 132                        return;
 133
 134                if (locked >= 0) {
 135                        /* Requeue succeeded. Set DONE or LOCKED */
 136                        WARN_ON_ONCE(old != Q_REQUEUE_PI_IN_PROGRESS &&
 137                                     old != Q_REQUEUE_PI_WAIT);
 138                        new = Q_REQUEUE_PI_DONE + locked;
 139                } else if (old == Q_REQUEUE_PI_IN_PROGRESS) {
 140                        /* Deadlock, no early wakeup interleave */
 141                        new = Q_REQUEUE_PI_NONE;
 142                } else {
 143                        /* Deadlock, early wakeup interleave. */
 144                        WARN_ON_ONCE(old != Q_REQUEUE_PI_WAIT);
 145                        new = Q_REQUEUE_PI_IGNORE;
 146                }
 147        } while (!atomic_try_cmpxchg(&q->requeue_state, &old, new));
 148
 149#ifdef CONFIG_PREEMPT_RT
 150        /* If the waiter interleaved with the requeue let it know */
 151        if (unlikely(old == Q_REQUEUE_PI_WAIT))
 152                rcuwait_wake_up(&q->requeue_wait);
 153#endif
 154}
 155
 156static inline int futex_requeue_pi_wakeup_sync(struct futex_q *q)
 157{
 158        int old, new;
 159
 160        old = atomic_read_acquire(&q->requeue_state);
 161        do {
 162                /* Is requeue done already? */
 163                if (old >= Q_REQUEUE_PI_DONE)
 164                        return old;
 165
 166                /*
 167                 * If not done, then tell the requeue code to either ignore
 168                 * the waiter or to wake it up once the requeue is done.
 169                 */
 170                new = Q_REQUEUE_PI_WAIT;
 171                if (old == Q_REQUEUE_PI_NONE)
 172                        new = Q_REQUEUE_PI_IGNORE;
 173        } while (!atomic_try_cmpxchg(&q->requeue_state, &old, new));
 174
 175        /* If the requeue was in progress, wait for it to complete */
 176        if (old == Q_REQUEUE_PI_IN_PROGRESS) {
 177#ifdef CONFIG_PREEMPT_RT
 178                rcuwait_wait_event(&q->requeue_wait,
 179                                   atomic_read(&q->requeue_state) != Q_REQUEUE_PI_WAIT,
 180                                   TASK_UNINTERRUPTIBLE);
 181#else
 182                (void)atomic_cond_read_relaxed(&q->requeue_state, VAL != Q_REQUEUE_PI_WAIT);
 183#endif
 184        }
 185
 186        /*
 187         * Requeue is now either prohibited or complete. Reread state
 188         * because during the wait above it might have changed. Nothing
 189         * will modify q->requeue_state after this point.
 190         */
 191        return atomic_read(&q->requeue_state);
 192}
 193
 194/**
 195 * requeue_pi_wake_futex() - Wake a task that acquired the lock during requeue
 196 * @q:          the futex_q
 197 * @key:        the key of the requeue target futex
 198 * @hb:         the hash_bucket of the requeue target futex
 199 *
 200 * During futex_requeue, with requeue_pi=1, it is possible to acquire the
 201 * target futex if it is uncontended or via a lock steal.
 202 *
 203 * 1) Set @q::key to the requeue target futex key so the waiter can detect
 204 *    the wakeup on the right futex.
 205 *
 206 * 2) Dequeue @q from the hash bucket.
 207 *
 208 * 3) Set @q::rt_waiter to NULL so the woken up task can detect atomic lock
 209 *    acquisition.
 210 *
 211 * 4) Set the q->lock_ptr to the requeue target hb->lock for the case that
 212 *    the waiter has to fixup the pi state.
 213 *
 214 * 5) Complete the requeue state so the waiter can make progress. After
 215 *    this point the waiter task can return from the syscall immediately in
 216 *    case that the pi state does not have to be fixed up.
 217 *
 218 * 6) Wake the waiter task.
 219 *
 220 * Must be called with both q->lock_ptr and hb->lock held.
 221 */
 222static inline
 223void requeue_pi_wake_futex(struct futex_q *q, union futex_key *key,
 224                           struct futex_hash_bucket *hb)
 225{
            /* Step 1: the waiter identifies the wakeup by the target key. */
 226        q->key = *key;
 227
            /* Step 2: take @q off the hash bucket it is queued on. */
 228        __futex_unqueue(q);
 229
            /*
             * Step 3: a waiter handled here is expected to have rt_waiter
             * set; warn if not. Clearing it signals atomic acquisition.
             */
 230        WARN_ON(!q->rt_waiter);
 231        q->rt_waiter = NULL;
 232
            /* Step 4: a later pi state fixup has to use the target hb lock. */
 233        q->lock_ptr = &hb->lock;
 234
            /*
             * Steps 5 and 6: passing 1 makes futex_requeue_pi_complete() set
             * Q_REQUEUE_PI_LOCKED (Q_REQUEUE_PI_DONE + 1), then wake the task.
             */
 235        /* Signal locked state to the waiter */
 236        futex_requeue_pi_complete(q, 1);
 237        wake_up_state(q->task, TASK_NORMAL);
 238}
 239
 240/**
 241 * futex_proxy_trylock_atomic() - Attempt an atomic lock for the top waiter
 242 * @pifutex:            the user address of the to futex
 243 * @hb1:                the from futex hash bucket, must be locked by the caller
 244 * @hb2:                the to futex hash bucket, must be locked by the caller
 245 * @key1:               the from futex key
 246 * @key2:               the to futex key
 247 * @ps:                 address to store the pi_state pointer
 248 * @exiting:            Pointer to store the task pointer of the owner task
 249 *                      which is in the middle of exiting
 250 * @set_waiters:        force setting the FUTEX_WAITERS bit (1) or not (0)
 251 *
 252 * Try and get the lock on behalf of the top waiter if we can do it atomically.
 253 * Wake the top waiter if we succeed.  If the caller specified set_waiters,
 254 * then direct futex_lock_pi_atomic() to force setting the FUTEX_WAITERS bit.
 255 * hb1 and hb2 must be held by the caller.
 256 *
 257 * @exiting is only set when the return value is -EBUSY. If so, this holds
 258 * a refcount on the exiting task on return and the caller needs to drop it
 259 * after waiting for the exit to complete.
 260 *
 261 * Return:
 262 *  -  0 - failed to acquire the lock atomically;
 263 *  - >0 - acquired the lock, return value is vpid of the top_waiter
 264 *  - <0 - error
 265 */
 266static int
 267futex_proxy_trylock_atomic(u32 __user *pifutex, struct futex_hash_bucket *hb1,
 268                           struct futex_hash_bucket *hb2, union futex_key *key1,
 269                           union futex_key *key2, struct futex_pi_state **ps,
 270                           struct task_struct **exiting, int set_waiters)
 271{
 272        struct futex_q *top_waiter = NULL;
 273        u32 curval;
 274        int ret;
 275
 276        if (futex_get_value_locked(&curval, pifutex))
 277                return -EFAULT;
 278
 279        if (unlikely(should_fail_futex(true)))
 280                return -EFAULT;
 281
 282        /*
 283         * Find the top_waiter and determine if there are additional waiters.
 284         * If the caller intends to requeue more than 1 waiter to pifutex,
 285         * force futex_lock_pi_atomic() to set the FUTEX_WAITERS bit now,
 286         * as we have means to handle the possible fault.  If not, don't set
 287         * the bit unnecessarily as it will force the subsequent unlock to enter
 288         * the kernel.
 289         */
 290        top_waiter = futex_top_waiter(hb1, key1);
 291
 292        /* There are no waiters, nothing for us to do. */
 293        if (!top_waiter)
 294                return 0;
 295
 296        /*
 297         * Ensure that this is a waiter sitting in futex_wait_requeue_pi()
 298         * and waiting on the 'waitqueue' futex which is always !PI.
 299         */
 300        if (!top_waiter->rt_waiter || top_waiter->pi_state)
 301                return -EINVAL;
 302
 303        /* Ensure we requeue to the expected futex. */
 304        if (!futex_match(top_waiter->requeue_pi_key, key2))
 305                return -EINVAL;
 306
 307        /* Ensure that this does not race against an early wakeup */
 308        if (!futex_requeue_pi_prepare(top_waiter, NULL))
 309                return -EAGAIN;
 310
 311        /*
 312         * Try to take the lock for top_waiter and set the FUTEX_WAITERS bit
 313         * in the contended case or if @set_waiters is true.
 314         *
 315         * In the contended case PI state is attached to the lock owner. If
 316         * the user space lock can be acquired then PI state is attached to
 317         * the new owner (@top_waiter->task) when @set_waiters is true.
 318         */
 319        ret = futex_lock_pi_atomic(pifutex, hb2, key2, ps, top_waiter->task,
 320                                   exiting, set_waiters);
 321        if (ret == 1) {
 322                /*
 323                 * Lock was acquired in user space and PI state was
 324                 * attached to @top_waiter->task. That means state is fully
 325                 * consistent and the waiter can return to user space
 326                 * immediately after the wakeup.
 327                 */
 328                requeue_pi_wake_futex(top_waiter, key2, hb2);
 329        } else if (ret < 0) {
 330                /* Rewind top_waiter::requeue_state */
 331                futex_requeue_pi_complete(top_waiter, ret);
 332        } else {
 333                /*
 334                 * futex_lock_pi_atomic() did not acquire the user space
 335                 * futex, but managed to establish the proxy lock and pi
 336                 * state. top_waiter::requeue_state cannot be fixed up here
 337                 * because the waiter is not enqueued on the rtmutex
 338                 * yet. This is handled at the callsite depending on the
 339                 * result of rt_mutex_start_proxy_lock() which is
 340                 * guaranteed to be reached with this function returning 0.
 341                 */
 342        }
 343        return ret;
 344}
 345
 346/**
 347 * futex_requeue() - Requeue waiters from uaddr1 to uaddr2
 348 * @uaddr1:     source futex user address
 349 * @flags:      futex flags (FLAGS_SHARED, etc.)
 350 * @uaddr2:     target futex user address
 351 * @nr_wake:    number of waiters to wake (must be 1 for requeue_pi)
 352 * @nr_requeue: number of waiters to requeue (0-INT_MAX)
 353 * @cmpval:     pointer to the expected value of @uaddr1, or %NULL to skip
     *              the comparison
 354 * @requeue_pi: if we are attempting to requeue from a non-pi futex to a
 355 *              pi futex (pi to pi requeue is not supported)
 356 *
 357 * Requeue waiters on uaddr1 to uaddr2. In the requeue_pi case, try to acquire
 358 * uaddr2 atomically on behalf of the top waiter.
 359 *
 360 * Return:
 361 *  - >=0 - on success, the number of tasks requeued or woken;
 362 *  -  <0 - on error
     *
     * Errors generated directly by this function:
     *  - -EINVAL: negative @nr_wake or @nr_requeue; for requeue_pi also
     *    identical source and target futex, @nr_wake != 1, or a waiter
     *    which does not match the requested operation or target futex
     *  - -ENOSYS: requeue_pi requested while CONFIG_FUTEX_PI is disabled
     *  - -ENOMEM: refill_pi_state_cache() failed
     *  - -EAGAIN: the value read from @uaddr1 differs from *@cmpval
     *
     * Errors from get_futex_key(), get_user(), fault_in_user_writeable()
     * and rt_mutex_start_proxy_lock() are returned as is. The same holds
     * for futex_proxy_trylock_atomic(), except for -EFAULT, -EBUSY and
     * -EAGAIN, which lead to a retry.
 363 */
 364int futex_requeue(u32 __user *uaddr1, unsigned int flags, u32 __user *uaddr2,
 365                  int nr_wake, int nr_requeue, u32 *cmpval, int requeue_pi)
 366{
 367        union futex_key key1 = FUTEX_KEY_INIT, key2 = FUTEX_KEY_INIT;
 368        int task_count = 0, ret;
 369        struct futex_pi_state *pi_state = NULL;
 370        struct futex_hash_bucket *hb1, *hb2;
 371        struct futex_q *this, *next;
 372        DEFINE_WAKE_Q(wake_q);
 373
            /* Negative counts are never valid. */
 374        if (nr_wake < 0 || nr_requeue < 0)
 375                return -EINVAL;
 376
 377        /*
 378         * When PI not supported: return -ENOSYS if requeue_pi is true,
 379         * consequently the compiler knows requeue_pi is always false past
 380         * this point which will optimize away all the conditional code
 381         * further down.
 382         */
 383        if (!IS_ENABLED(CONFIG_FUTEX_PI) && requeue_pi)
 384                return -ENOSYS;
 385
 386        if (requeue_pi) {
 387                /*
 388                 * Requeue PI only works on two distinct uaddrs. This
 389                 * check is only valid for private futexes. See below.
 390                 */
 391                if (uaddr1 == uaddr2)
 392                        return -EINVAL;
 393
 394                /*
 395                 * futex_requeue() allows the caller to define the number
 396                 * of waiters to wake up via the @nr_wake argument. With
 397                 * REQUEUE_PI, waking up more than one waiter is creating
 398                 * more problems than it solves. Waking up a waiter makes
 399                 * only sense if the PI futex @uaddr2 is uncontended as
 400                 * this allows the requeue code to acquire the futex
 401                 * @uaddr2 before waking the waiter. The waiter can then
 402                 * return to user space without further action. A secondary
 403                 * wakeup would just make the futex_wait_requeue_pi()
 404                 * handling more complex, because that code would have to
 405                 * look up pi_state and do more or less all the handling
 406                 * which the requeue code has to do for the to be requeued
 407                 * waiters. So restrict the number of waiters to wake to
 408                 * one, and only wake it up when the PI futex is
 409                 * uncontended. Otherwise requeue it and let the unlock of
 410                 * the PI futex handle the wakeup.
 411                 *
 412                 * All REQUEUE_PI users, e.g. pthread_cond_signal() and
 413                 * pthread_cond_broadcast() must use nr_wake=1.
 414                 */
 415                if (nr_wake != 1)
 416                        return -EINVAL;
 417
 418                /*
 419                 * requeue_pi requires a pi_state, try to allocate it now
 420                 * without any locks in case it fails.
 421                 */
 422                if (refill_pi_state_cache())
 423                        return -ENOMEM;
 424        }
 425
    /*
     * Restart point for the paths below which need both futex keys and hash
     * buckets to be looked up again.
     */
 426retry:
 427        ret = get_futex_key(uaddr1, flags & FLAGS_SHARED, &key1, FUTEX_READ);
 428        if (unlikely(ret != 0))
 429                return ret;
 430        ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2,
 431                            requeue_pi ? FUTEX_WRITE : FUTEX_READ);
 432        if (unlikely(ret != 0))
 433                return ret;
 434
 435        /*
 436         * The check above which compares uaddrs is not sufficient for
 437         * shared futexes. We need to compare the keys:
 438         */
 439        if (requeue_pi && futex_match(&key1, &key2))
 440                return -EINVAL;
 441
 442        hb1 = futex_hash(&key1);
 443        hb2 = futex_hash(&key2);
 444
    /* Restart point which reuses the keys and hash buckets computed above. */
 445retry_private:
            /*
             * Every path that leaves the locked region below pairs this
             * increment with a futex_hb_waiters_dec(hb2).
             */
 446        futex_hb_waiters_inc(hb2);
 447        double_lock_hb(hb1, hb2);
 448
 449        if (likely(cmpval != NULL)) {
 450                u32 curval;
 451
 452                ret = futex_get_value_locked(&curval, uaddr1);
 453
 454                if (unlikely(ret)) {
                            /*
                             * Reading the value with the locks held failed.
                             * Drop the locks, read it via get_user() and
                             * start over. Private futexes restart with the
                             * same keys, shared ones redo the key lookup.
                             */
 455                        double_unlock_hb(hb1, hb2);
 456                        futex_hb_waiters_dec(hb2);
 457
 458                        ret = get_user(curval, uaddr1);
 459                        if (ret)
 460                                return ret;
 461
 462                        if (!(flags & FLAGS_SHARED))
 463                                goto retry_private;
 464
 465                        goto retry;
 466                }
 467                if (curval != *cmpval) {
 468                        ret = -EAGAIN;
 469                        goto out_unlock;
 470                }
 471        }
 472
 473        if (requeue_pi) {
 474                struct task_struct *exiting = NULL;
 475
 476                /*
 477                 * Attempt to acquire uaddr2 and wake the top waiter. If we
 478                 * intend to requeue waiters, force setting the FUTEX_WAITERS
 479                 * bit.  We force this here where we are able to easily handle
 480                 * faults rather in the requeue loop below.
 481                 *
 482                 * Updates topwaiter::requeue_state if a top waiter exists.
 483                 */
 484                ret = futex_proxy_trylock_atomic(uaddr2, hb1, hb2, &key1,
 485                                                 &key2, &pi_state,
 486                                                 &exiting, nr_requeue);
 487
 488                /*
 489                 * At this point the top_waiter has either taken uaddr2 or
 490                 * is waiting on it. In both cases pi_state has been
 491                 * established and an initial refcount on it. In case of an
 492                 * error there's nothing.
 493                 *
 494                 * The top waiter's requeue_state is up to date:
 495                 *
 496                 *  - If the lock was acquired atomically (ret == 1), then
 497                 *    the state is Q_REQUEUE_PI_LOCKED.
 498                 *
 499                 *    The top waiter has been dequeued and woken up and can
 500                 *    return to user space immediately. The kernel/user
 501                 *    space state is consistent. In case that there must be
 502                 *    more waiters requeued the WAITERS bit in the user
 503                 *    space futex is set so the top waiter task has to go
 504                 *    into the syscall slowpath to unlock the futex. This
 505                 *    will block until this requeue operation has been
 506                 *    completed and the hash bucket locks have been
 507                 *    dropped.
 508                 *
 509                 *  - If the trylock failed with an error (ret < 0) then
 510                 *    the state is either Q_REQUEUE_PI_NONE, i.e. "nothing
 511                 *    happened", or Q_REQUEUE_PI_IGNORE when there was an
 512                 *    interleaved early wakeup.
 513                 *
 514                 *  - If the trylock did not succeed (ret == 0) then the
 515                 *    state is either Q_REQUEUE_PI_IN_PROGRESS or
 516                 *    Q_REQUEUE_PI_WAIT if an early wakeup interleaved.
 517                 *    This will be cleaned up in the loop below, which
 518                 *    cannot fail because futex_proxy_trylock_atomic() did
 519                 *    the same sanity checks for requeue_pi as the loop
 520                 *    below does.
 521                 */
 522                switch (ret) {
 523                case 0:
 524                        /* We hold a reference on the pi state. */
 525                        break;
 526
 527                case 1:
 528                        /*
 529                         * futex_proxy_trylock_atomic() acquired the user space
 530                         * futex. Adjust task_count.
 531                         */
 532                        task_count++;
 533                        ret = 0;
 534                        break;
 535
 536                /*
 537                 * If the above failed, then pi_state is NULL and
 538                 * waiter::requeue_state is correct.
 539                 */
 540                case -EFAULT:
 541                        double_unlock_hb(hb1, hb2);
 542                        futex_hb_waiters_dec(hb2);
 543                        ret = fault_in_user_writeable(uaddr2);
 544                        if (!ret)
 545                                goto retry;
 546                        return ret;
 547                case -EBUSY:
 548                case -EAGAIN:
 549                        /*
 550                         * Two reasons for this:
 551                         * - EBUSY: Owner is exiting and we just wait for the
 552                         *   exit to complete.
 553                         * - EAGAIN: The user space value changed.
 554                         */
 555                        double_unlock_hb(hb1, hb2);
 556                        futex_hb_waiters_dec(hb2);
 557                        /*
 558                         * Handle the case where the owner is in the middle of
 559                         * exiting. Wait for the exit to complete otherwise
 560                         * this task might loop forever, aka. live lock.
 561                         */
 562                        wait_for_owner_exiting(ret, exiting);
 563                        cond_resched();
 564                        goto retry;
 565                default:
                            /* Any other error is returned via out_unlock. */
 566                        goto out_unlock;
 567                }
 568        }
 569
 570        plist_for_each_entry_safe(this, next, &hb1->chain, list) {
                    /*
                     * Stop once nr_wake + nr_requeue waiters have been
                     * woken or requeued.
                     */
 571                if (task_count - nr_wake >= nr_requeue)
 572                        break;
 573
                    /* Skip entries in this bucket which belong to other futexes. */
 574                if (!futex_match(&this->key, &key1))
 575                        continue;
 576
 577                /*
 578                 * FUTEX_WAIT_REQUEUE_PI and FUTEX_CMP_REQUEUE_PI should always
 579                 * be paired with each other and no other futex ops.
 580                 *
 581                 * We should never be requeueing a futex_q with a pi_state,
 582                 * which is awaiting a futex_unlock_pi().
 583                 */
 584                if ((requeue_pi && !this->rt_waiter) ||
 585                    (!requeue_pi && this->rt_waiter) ||
 586                    this->pi_state) {
 587                        ret = -EINVAL;
 588                        break;
 589                }
 590
 591                /* Plain futexes just wake or requeue and are done */
 592                if (!requeue_pi) {
                            /*
                             * The first nr_wake matching waiters are marked
                             * for wakeup, all further ones are requeued.
                             */
 593                        if (++task_count <= nr_wake)
 594                                futex_wake_mark(&wake_q, this);
 595                        else
 596                                requeue_futex(this, hb1, hb2, &key2);
 597                        continue;
 598                }
 599
 600                /* Ensure we requeue to the expected futex for requeue_pi. */
 601                if (!futex_match(this->requeue_pi_key, &key2)) {
 602                        ret = -EINVAL;
 603                        break;
 604                }
 605
 606                /*
 607                 * Requeue nr_requeue waiters and possibly one more in the case
 608                 * of requeue_pi if we couldn't acquire the lock atomically.
 609                 *
 610                 * Prepare the waiter to take the rt_mutex. Take a refcount
 611                 * on the pi_state and store the pointer in the futex_q
 612                 * object of the waiter.
 613                 */
 614                get_pi_state(pi_state);
 615
 616                /* Don't requeue when the waiter is already on the way out. */
 617                if (!futex_requeue_pi_prepare(this, pi_state)) {
 618                        /*
 619                         * Early woken waiter signaled that it is on the
 620                         * way out. Drop the pi_state reference and try the
 621                         * next waiter. @this->pi_state is still NULL.
 622                         */
 623                        put_pi_state(pi_state);
 624                        continue;
 625                }
 626
 627                ret = rt_mutex_start_proxy_lock(&pi_state->pi_mutex,
 628                                                this->rt_waiter,
 629                                                this->task);
 630
 631                if (ret == 1) {
 632                        /*
 633                         * We got the lock. We do neither drop the refcount
 634                         * on pi_state nor clear this->pi_state because the
 635                         * waiter needs the pi_state for cleaning up the
 636                         * user space value. It will drop the refcount
 637                         * after doing so. this::requeue_state is updated
 638                         * in the wakeup as well.
 639                         */
 640                        requeue_pi_wake_futex(this, &key2, hb2);
 641                        task_count++;
 642                } else if (!ret) {
 643                        /* Waiter is queued, move it to hb2 */
 644                        requeue_futex(this, hb1, hb2, &key2);
 645                        futex_requeue_pi_complete(this, 0);
 646                        task_count++;
 647                } else {
 648                        /*
 649                         * rt_mutex_start_proxy_lock() detected a potential
 650                         * deadlock when we tried to queue that waiter.
 651                         * Drop the pi_state reference which we took above
 652                         * and remove the pointer to the state from the
 653                         * waiters futex_q object.
 654                         */
 655                        this->pi_state = NULL;
 656                        put_pi_state(pi_state);
 657                        futex_requeue_pi_complete(this, ret);
 658                        /*
 659                         * We stop queueing more waiters and let user space
 660                         * deal with the mess.
 661                         */
 662                        break;
 663                }
 664        }
 665
 666        /*
 667         * We took an extra initial reference to the pi_state in
 668         * futex_proxy_trylock_atomic(). We need to drop it here again.
             *
             * NOTE(review): pi_state is still NULL here when !requeue_pi or
             * when no top waiter was found; this relies on put_pi_state()
             * accepting a NULL pointer - confirm against its definition.
 669         */
 670        put_pi_state(pi_state);
 671
 672out_unlock:
 673        double_unlock_hb(hb1, hb2);
 674        wake_up_q(&wake_q);
 675        futex_hb_waiters_dec(hb2);
            /* A non-zero ret takes precedence over the number of handled tasks. */
 676        return ret ? ret : task_count;
 677}
 678
 679/**
 680 * handle_early_requeue_pi_wakeup() - Handle early wakeup on the initial futex
 681 * @hb:         the hash_bucket futex_q was original enqueued on
 682 * @q:          the futex_q woken while waiting to be requeued
 683 * @timeout:    the timeout associated with the wait (NULL if none)
 684 *
 685 * Determine the cause for the early wakeup.
 686 *
 687 * Return:
 688 *  -EWOULDBLOCK or -ETIMEDOUT or -ERESTARTNOINTR
 689 */
 690static inline
 691int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb,
 692                                   struct futex_q *q,
 693                                   struct hrtimer_sleeper *timeout)
 694{
 695        int ret;
 696
 697        /*
 698         * With the hb lock held, we avoid races while we process the wakeup.
 699         * We only need to hold hb (and not hb2) to ensure atomicity as the
 700         * wakeup code can't change q.key from uaddr to uaddr2 if we hold hb.
 701         * It can't be requeued from uaddr2 to something else since we don't
 702         * support a PI aware source futex for requeue.
 703         */
 704        WARN_ON_ONCE(&hb->lock != q->lock_ptr);
 705
 706        /*
 707         * We were woken prior to requeue by a timeout or a signal.
 708         * Unqueue the futex_q and determine which it was.
 709         */
 710        plist_del(&q->list, &hb->chain);
 711        futex_hb_waiters_dec(hb);
 712
 713        /* Handle spurious wakeups gracefully */
 714        ret = -EWOULDBLOCK;
 715        if (timeout && !timeout->task)
 716                ret = -ETIMEDOUT;
 717        else if (signal_pending(current))
 718                ret = -ERESTARTNOINTR;
 719        return ret;
 720}
 721
 722/**
 723 * futex_wait_requeue_pi() - Wait on uaddr and take uaddr2
 724 * @uaddr:      the futex we initially wait on (non-pi)
 725 * @flags:      futex flags (FLAGS_SHARED, FLAGS_CLOCKRT, etc.), they must be
 726 *              the same type, no requeueing from private to shared, etc.
 727 * @val:        the expected value of uaddr
 728 * @abs_time:   absolute timeout
 729 * @bitset:     32 bit wakeup bitset set by userspace, defaults to all
 730 * @uaddr2:     the pi futex we will take prior to returning to user-space
 731 *
 732 * The caller will wait on uaddr and will be requeued by futex_requeue() to
 733 * uaddr2 which must be PI aware and unique from uaddr.  Normal wakeup will wake
 734 * on uaddr2 and complete the acquisition of the rt_mutex prior to returning to
 735 * userspace.  This ensures the rt_mutex maintains an owner when it has waiters;
 736 * without one, the pi logic would not know which task to boost/deboost, if
 737 * there was a need to.
 738 *
 739 * We call schedule in futex_wait_queue() when we enqueue and return there
 740 * via the following--
 741 * 1) wakeup on uaddr2 after an atomic lock acquisition by futex_requeue()
 742 * 2) wakeup on uaddr2 after a requeue
 743 * 3) signal
 744 * 4) timeout
 745 *
 746 * If 3, cleanup and return -ERESTARTNOINTR.
 747 *
 748 * If 2, we may then block on trying to take the rt_mutex and return via:
 749 * 5) successful lock
 750 * 6) signal
 751 * 7) timeout
 752 * 8) other lock acquisition failure
 753 *
 754 * If 6, return -EWOULDBLOCK (restarting the syscall would do the same).
 755 *
 756 * If 4 or 7, we cleanup and return with -ETIMEDOUT.
 757 *
 758 * Return:
 759 *  -  0 - On success;
 760 *  - <0 - On error
 761 */
 762int futex_wait_requeue_pi(u32 __user *uaddr, unsigned int flags,
 763                          u32 val, ktime_t *abs_time, u32 bitset,
 764                          u32 __user *uaddr2)
 765{
 766        struct hrtimer_sleeper timeout, *to;
 767        struct rt_mutex_waiter rt_waiter;
 768        struct futex_hash_bucket *hb;
 769        union futex_key key2 = FUTEX_KEY_INIT;
 770        struct futex_q q = futex_q_init;
 771        struct rt_mutex_base *pi_mutex;
 772        int res, ret;
 773
 774        if (!IS_ENABLED(CONFIG_FUTEX_PI))
 775                return -ENOSYS;
 776
 777        if (uaddr == uaddr2)
 778                return -EINVAL;
 779
 780        if (!bitset)
 781                return -EINVAL;
 782
 783        to = futex_setup_timer(abs_time, &timeout, flags,
 784                               current->timer_slack_ns);
 785
 786        /*
 787         * The waiter is allocated on our stack, manipulated by the requeue
 788         * code while we sleep on uaddr.
 789         */
 790        rt_mutex_init_waiter(&rt_waiter);
 791
 792        ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, FUTEX_WRITE);
 793        if (unlikely(ret != 0))
 794                goto out;
 795
 796        q.bitset = bitset;
 797        q.rt_waiter = &rt_waiter;
 798        q.requeue_pi_key = &key2;
 799
 800        /*
 801         * Prepare to wait on uaddr. On success, it holds hb->lock and q
 802         * is initialized.
 803         */
 804        ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
 805        if (ret)
 806                goto out;
 807
 808        /*
 809         * The check above which compares uaddrs is not sufficient for
 810         * shared futexes. We need to compare the keys:
 811         */
 812        if (futex_match(&q.key, &key2)) {
 813                futex_q_unlock(hb);
 814                ret = -EINVAL;
 815                goto out;
 816        }
 817
 818        /* Queue the futex_q, drop the hb lock, wait for wakeup. */
 819        futex_wait_queue(hb, &q, to);
 820
 821        switch (futex_requeue_pi_wakeup_sync(&q)) {
 822        case Q_REQUEUE_PI_IGNORE:
 823                /* The waiter is still on uaddr1 */
 824                spin_lock(&hb->lock);
 825                ret = handle_early_requeue_pi_wakeup(hb, &q, to);
 826                spin_unlock(&hb->lock);
 827                break;
 828
 829        case Q_REQUEUE_PI_LOCKED:
 830                /* The requeue acquired the lock */
 831                if (q.pi_state && (q.pi_state->owner != current)) {
 832                        spin_lock(q.lock_ptr);
 833                        ret = fixup_pi_owner(uaddr2, &q, true);
 834                        /*
 835                         * Drop the reference to the pi state which the
 836                         * requeue_pi() code acquired for us.
 837                         */
 838                        put_pi_state(q.pi_state);
 839                        spin_unlock(q.lock_ptr);
 840                        /*
 841                         * Adjust the return value. It's either -EFAULT or
 842                         * success (1) but the caller expects 0 for success.
 843                         */
 844                        ret = ret < 0 ? ret : 0;
 845                }
 846                break;
 847
 848        case Q_REQUEUE_PI_DONE:
 849                /* Requeue completed. Current is 'pi_blocked_on' the rtmutex */
 850                pi_mutex = &q.pi_state->pi_mutex;
 851                ret = rt_mutex_wait_proxy_lock(pi_mutex, to, &rt_waiter);
 852
 853                /* Current is not longer pi_blocked_on */
 854                spin_lock(q.lock_ptr);
 855                if (ret && !rt_mutex_cleanup_proxy_lock(pi_mutex, &rt_waiter))
 856                        ret = 0;
 857
 858                debug_rt_mutex_free_waiter(&rt_waiter);
 859                /*
 860                 * Fixup the pi_state owner and possibly acquire the lock if we
 861                 * haven't already.
 862                 */
 863                res = fixup_pi_owner(uaddr2, &q, !ret);
 864                /*
 865                 * If fixup_pi_owner() returned an error, propagate that.  If it
 866                 * acquired the lock, clear -ETIMEDOUT or -EINTR.
 867                 */
 868                if (res)
 869                        ret = (res < 0) ? res : 0;
 870
 871                futex_unqueue_pi(&q);
 872                spin_unlock(q.lock_ptr);
 873
 874                if (ret == -EINTR) {
 875                        /*
 876                         * We've already been requeued, but cannot restart
 877                         * by calling futex_lock_pi() directly. We could
 878                         * restart this syscall, but it would detect that
 879                         * the user space "val" changed and return
 880                         * -EWOULDBLOCK.  Save the overhead of the restart
 881                         * and return -EWOULDBLOCK directly.
 882                         */
 883                        ret = -EWOULDBLOCK;
 884                }
 885                break;
 886        default:
 887                BUG();
 888        }
 889
 890out:
 891        if (to) {
 892                hrtimer_cancel(&to->timer);
 893                destroy_hrtimer_on_stack(&to->timer);
 894        }
 895        return ret;
 896}
 897
 898