/* rwsem.c: R/W semaphores: contention handling functions
 *
 * Written by David Howells (dhowells@redhat.com).
 * Derived from arch/i386/kernel/semaphore.c
 *
 * Writer lock-stealing by Alex Shi <alex.shi@intel.com>
 * and Michel Lespinasse <walken@google.com>
 *
 * Optimistic spinning by Tim Chen <tim.c.chen@intel.com>
 * and Davidlohr Bueso <davidlohr@hp.com>. Based on mutexes.
 */
#include <linux/rwsem.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/export.h>
#include <linux/sched/rt.h>
#include <linux/osq_lock.h>

#include "../kernel/rwsem.h"

/*
 * Guide to the rw_semaphore's count field for common values.
 * (32-bit case illustrated, similar for 64-bit)
 *
 * 0x0000000X	(1) X readers active or attempting lock, no writer waiting
 *		    X = #active_readers + #readers attempting to lock
 *		    (X*ACTIVE_BIAS)
 *
 * 0x00000000	rwsem is unlocked, and no one is waiting for the lock or
 *		attempting to read lock or write lock.
 *
 * 0xffff000X	(1) X readers active or attempting lock, with waiters for lock
 *		    X = #active readers + # readers attempting lock
 *		    (X*ACTIVE_BIAS + WAITING_BIAS)
 *		(2) 1 writer attempting lock, no waiters for lock
 *		    X-1 = #active readers + #readers attempting lock
 *		    ((X-1)*ACTIVE_BIAS + ACTIVE_WRITE_BIAS)
 *		(3) 1 writer active, no waiters for lock
 *		    X-1 = #active readers + #readers attempting lock
 *		    ((X-1)*ACTIVE_BIAS + ACTIVE_WRITE_BIAS)
 *
 * 0xffff0001	(1) 1 reader active or attempting lock, waiters for lock
 *		    (WAITING_BIAS + ACTIVE_BIAS)
 *		(2) 1 writer active or attempting lock, no waiters for lock
 *		    (ACTIVE_WRITE_BIAS)
 *
 * 0xffff0000	(1) There are writers or readers queued but none active
 *		    or in the process of attempting lock.
 *		    (WAITING_BIAS)
 *		Note: writer can attempt to steal lock for this count by adding
 *		ACTIVE_WRITE_BIAS in cmpxchg and checking the old count
 *
 * 0xfffe0001	(1) 1 writer active, or attempting lock. Waiters on queue.
 *		    (ACTIVE_WRITE_BIAS + WAITING_BIAS)
 *
 * Note: Readers attempt to lock by adding ACTIVE_BIAS in down_read and checking
 *	 the count becomes more than 0 for successful lock acquisition,
 *	 i.e. the case where there are only readers or nobody has lock.
 *	 (1st and 2nd case above).
 *
 * Writers attempt to lock by adding ACTIVE_WRITE_BIAS in down_write and
 * checking the count becomes ACTIVE_WRITE_BIAS for successful lock
 * acquisition (i.e. nobody else has lock or attempts lock). If
 * unsuccessful, in rwsem_down_write_failed, we'll check to see if there
 * are only waiters but none active (5th case above), and attempt to
 * steal the lock.
 *
 */
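
/*
 * Illustrative sketch only (not part of this file's slowpath code): roughly
 * how the fast paths described in the note above use these bias values. The
 * real fast paths live in the architecture / asm-generic rwsem headers; the
 * helper names below are hypothetical and exist purely to make the count
 * arithmetic concrete. Both return true when the lock was taken on the fast
 * path; the real code falls into the rwsem_down_*_failed() slowpaths below
 * instead of backing the bias out again.
 */
static inline bool rwsem_example_down_read_fastpath(struct rw_semaphore *sem)
{
	/*
	 * Add ACTIVE_READ_BIAS; a positive result means only readers (or
	 * nobody) held the lock - the 0x0000000X / 0x00000000 cases above.
	 */
	return atomic_long_add_return(RWSEM_ACTIVE_READ_BIAS, &sem->count) > 0;
}

static inline bool rwsem_example_down_write_fastpath(struct rw_semaphore *sem)
{
	/*
	 * Add ACTIVE_WRITE_BIAS; the lock is ours only if the count went from
	 * RWSEM_UNLOCKED_VALUE straight to ACTIVE_WRITE_BIAS, i.e. nobody
	 * else held or was attempting the lock.
	 */
	return atomic_long_add_return(RWSEM_ACTIVE_WRITE_BIAS, &sem->count) ==
	       RWSEM_ACTIVE_WRITE_BIAS;
}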

/*
 * Initialize an rwsem:
 */
void __init_rwsem(struct rw_semaphore *sem, const char *name,
		  struct lock_class_key *key)
{
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	/*
	 * Make sure we are not reinitializing a held semaphore:
	 */
	debug_check_no_locks_freed((void *)sem, sizeof(*sem));
	lockdep_init_map(&sem->dep_map, name, key, 0);
#endif
	atomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE);
	raw_spin_lock_init(&sem->wait_lock);
	INIT_SLIST_HEAD(&sem->wait_list);
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
	sem->owner = NULL;
	osq_lock_init(&sem->osq);
#endif
}

EXPORT_SYMBOL(__init_rwsem);
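
/*
 * Usage sketch (illustrative only, not part of this file): callers normally
 * initialize an rwsem via the init_rwsem() wrapper rather than calling
 * __init_rwsem() directly, then use down_read()/up_read() and
 * down_write()/up_write(). The structure and function below are hypothetical.
 */
struct rwsem_example_dev {
	struct rw_semaphore	sem;
	int			value;
};

static inline void rwsem_example_dev_setup(struct rwsem_example_dev *dev)
{
	init_rwsem(&dev->sem);		/* expands to __init_rwsem() above */

	down_write(&dev->sem);		/* exclusive access for the update */
	dev->value = 1;
	up_write(&dev->sem);

	down_read(&dev->sem);		/* shared access for the read */
	(void)dev->value;
	up_read(&dev->sem);
}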

enum rwsem_waiter_type {
	RWSEM_WAITING_FOR_WRITE,
	RWSEM_WAITING_FOR_READ
};

struct rwsem_waiter {
	struct list_head list;
	struct task_struct *task;
	enum rwsem_waiter_type type;
};

enum rwsem_wake_type {
	RWSEM_WAKE_ANY,		/* Wake whatever's at head of wait list */
	RWSEM_WAKE_READERS,	/* Wake readers only */
	RWSEM_WAKE_READ_OWNED	/* Waker thread holds the read lock */
};

/*
 * handle the lock release when there are processes blocked on it that can now run
 * - if we come here from up_xxxx(), then:
 *   - the 'active part' of count (&0x0000ffff) reached 0 (but may have changed)
 *   - the 'waiting part' of count (&0xffff0000) is -ve (and will still be so)
 * - there must be someone on the queue
 * - the wait_lock must be held by the caller
 * - tasks are marked for wakeup, the caller must later invoke wake_up_q()
 *   to actually wakeup the blocked task(s) and drop the reference count,
 *   preferably when the wait_lock is released
 * - woken process blocks are discarded from the list after having task zeroed
 * - writers are only marked woken if downgrading is false
 */
static void __rwsem_mark_wake(struct rw_semaphore *sem,
			      enum rwsem_wake_type wake_type,
			      struct wake_q_head *wake_q)
{
	struct rwsem_waiter *waiter, *waiter2;
	long oldcount, woken, adjustment = 0;

	/*
	 * Take a peek at the queue head waiter such that we can determine
	 * the wakeup(s) to perform.
	 */
	waiter = list_first_entry(&sem->wait_list, struct rwsem_waiter, list);

	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
		if (wake_type == RWSEM_WAKE_ANY) {
			/*
			 * Mark writer at the front of the queue for wakeup.
			 * Until the task is actually awoken later by the
			 * caller, other writers are able to steal it.
			 * Readers, on the other hand, will block as they
			 * will notice the queued writer.
			 */
			wake_q_add(wake_q, waiter->task);
		}
		return;
	}

	/*
	 * Writers might steal the lock before we grant it to the next reader.
	 * We prefer to do the first reader grant before counting readers
	 * so we can bail out early if a writer stole the lock.
	 */
	if (wake_type != RWSEM_WAKE_READ_OWNED) {
		adjustment = RWSEM_ACTIVE_READ_BIAS;
 try_reader_grant:
		oldcount = atomic_long_add_return(adjustment, &sem->count) - adjustment;
		if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
			/*
			 * If the count is still less than RWSEM_WAITING_BIAS
			 * after removing the adjustment, it is assumed that
			 * a writer has stolen the lock. We have to undo our
			 * reader grant.
			 */
			if (atomic_long_add_return(-adjustment, &sem->count) <
			    RWSEM_WAITING_BIAS)
				return;

			/* Last active locker left. Retry waking readers. */
			goto try_reader_grant;
		}
		/*
		 * It is not really necessary to set it to reader-owned here,
		 * but it gives the spinners an early indication that the
		 * readers now have the lock.
		 */
		__rwsem_set_reader_owned(sem, waiter->task);
	}

	/*
	 * Grant an infinite number of read locks to the readers at the front
	 * of the queue. We know that woken will be at least 1 as we accounted
	 * for above. Note we increment the 'active part' of the count by the
	 * number of readers before waking any processes up.
	 *
	 * We have to do wakeup in 2 passes to prevent the possibility that
	 * the reader count may be decremented before it is incremented. This
	 * is because the to-be-woken waiter may not have slept yet, so it may
	 * see waiter->task cleared, finish its critical section and do an
	 * unlock before the reader count increment.
	 *
	 * 1) Count the read-waiters in the slist, and fully increment the
	 *    reader count in rwsem.
	 * 2) For the given count number of waiters in the slist, clear
	 *    waiter->task and put them into wake_q to be woken up later.
	 */
	woken = 1;
	list_for_each_entry(waiter2, &waiter->list, list) {
		if (waiter2->type == RWSEM_WAITING_FOR_WRITE)
			break;

		woken++;
	}

	adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
	if (waiter2 == waiter) {
		/* hit end of list above */
		adjustment -= RWSEM_WAITING_BIAS;
	}

	if (adjustment)
		atomic_long_add(adjustment, &sem->count);

	while (woken--) {
		struct task_struct *tsk;

		waiter = list_entry(sem->wait_list.next, typeof(*waiter), list);
		tsk = waiter->task;

		get_task_struct(tsk);
		slist_del(&waiter->list, &sem->wait_list);
		/*
		 * Ensure calling get_task_struct() before setting the reader
		 * waiter to nil such that rwsem_down_read_failed() cannot
		 * race with do_exit() by always holding a reference count
		 * to the task to wakeup.
		 */
		smp_store_release(&waiter->task, NULL);
		/*
		 * Ensure issuing the wakeup (either by us or someone else)
		 * after setting the reader waiter to nil.
		 */
		wake_q_add(wake_q, tsk);
		/* wake_q_add() already takes a reference on the task */
		put_task_struct(tsk);
	}
}

/*
 * Wait for the read lock to be granted
 */
__visible
struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
	long count, adjustment = -RWSEM_ACTIVE_READ_BIAS;
	struct rwsem_waiter waiter;
	struct task_struct *tsk = current;
	WAKE_Q(wake_q);

	/* set up my own style of waitqueue */
	waiter.task = tsk;
	waiter.type = RWSEM_WAITING_FOR_READ;

	raw_spin_lock_irq(&sem->wait_lock);
	if (slist_empty(&sem->wait_list)) {
		/*
		 * In case the wait queue is empty and the lock isn't owned
		 * by a writer, this reader can exit the slowpath and return
		 * immediately as its RWSEM_ACTIVE_READ_BIAS has already
		 * been set in the count.
		 */
		if (atomic_long_read(&sem->count) >= 0) {
			raw_spin_unlock_irq(&sem->wait_lock);
			return sem;
		}
		adjustment += RWSEM_WAITING_BIAS;
	}
	slist_add_tail(&waiter.list, &sem->wait_list);

	/* we're now waiting on the lock, but no longer actively locking */
	count = atomic_long_add_return(adjustment, &sem->count);

	/*
	 * If there are no active locks, wake the front queued process(es).
	 *
	 * If there are no writers and we are first in the queue,
	 * wake our own waiter to join the existing active readers!
	 */
	if (count == RWSEM_WAITING_BIAS ||
	    (count > RWSEM_WAITING_BIAS &&
	     adjustment != -RWSEM_ACTIVE_READ_BIAS))
		__rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

	raw_spin_unlock_irq(&sem->wait_lock);
	wake_up_q(&wake_q);

	/* wait to be given the lock */
	while (true) {
		set_task_state(tsk, TASK_UNINTERRUPTIBLE);
		if (!waiter.task)
			break;
		schedule();
	}

	__set_task_state(tsk, TASK_RUNNING);
	return sem;
}
EXPORT_SYMBOL(rwsem_down_read_failed);

/*
 * This function must be called with the sem->wait_lock held to prevent
 * race conditions between checking the rwsem wait list and setting the
 * sem->count accordingly.
 */
static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem)
{
	/*
	 * Avoid trying to acquire write lock if count isn't RWSEM_WAITING_BIAS.
	 */
	if (count != RWSEM_WAITING_BIAS)
		return false;

	/*
	 * Acquire the lock by trying to set it to ACTIVE_WRITE_BIAS. If there
	 * are other tasks on the wait list, we need to add on WAITING_BIAS.
	 */
	count = slist_is_singular(&sem->wait_list) ?
			RWSEM_ACTIVE_WRITE_BIAS :
			RWSEM_ACTIVE_WRITE_BIAS + RWSEM_WAITING_BIAS;

	if (atomic_long_cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count)
							== RWSEM_WAITING_BIAS) {
		rwsem_set_owner(sem);
		return true;
	}

	return false;
}

#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
/*
 * Try to acquire write lock before the writer has been put on wait queue.
 */
static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
{
	long old, count = atomic_long_read(&sem->count);

	while (true) {
		if (!(count == 0 || count == RWSEM_WAITING_BIAS))
			return false;

		old = atomic_long_cmpxchg(&sem->count, count,
					  count + RWSEM_ACTIVE_WRITE_BIAS);
		if (old == count) {
			rwsem_set_owner(sem);
			return true;
		}

		count = old;
	}
}

static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem)
{
	struct task_struct *owner;
	bool ret = true;

	if (need_resched())
		return false;

	rcu_read_lock();
	owner = READ_ONCE(sem->owner);
	if (!owner || !is_rwsem_owner_spinnable(owner)) {
		ret = !owner;	/* !owner is spinnable */
		goto done;
	}

	/*
	 * Don't spin if the owner field hasn't been properly initialized yet.
	 */
	if (unlikely(owner == (void *)&sem->wait_list)) {
		ret = false;
		goto done;
	}

	ret = owner->on_cpu;
done:
	rcu_read_unlock();
	return ret;
}

/*
 * Return true only if we can still spin on the owner field of the rwsem.
 */
static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)
{
	struct task_struct *owner = READ_ONCE(sem->owner);

	if (!is_rwsem_owner_spinnable(owner))
		return false;

	rcu_read_lock();
	while (owner && (READ_ONCE(sem->owner) == owner)) {
		/*
		 * Ensure we emit the owner->on_cpu dereference _after_
		 * checking sem->owner still matches owner. If that fails,
		 * owner might point to free()d memory; if it still matches,
		 * the rcu_read_lock() ensures the memory stays valid.
		 */
		barrier();

		/* abort spinning when need_resched or owner is not running */
		if (!owner->on_cpu || need_resched()) {
			rcu_read_unlock();
			return false;
		}

		arch_mutex_cpu_relax();
	}
	rcu_read_unlock();

	/*
	 * If there is a new owner or the owner is not set, we continue
	 * spinning.
	 */
	return is_rwsem_owner_spinnable(READ_ONCE(sem->owner));
}

static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
{
	bool taken = false;

	preempt_disable();

	/* sem->wait_lock should not be held when doing optimistic spinning */
	if (!rwsem_can_spin_on_owner(sem))
		goto done;

	if (!osq_lock(&sem->osq))
		goto done;

	/*
	 * Optimistically spin on the owner field and attempt to acquire the
	 * lock whenever the owner changes. Spinning will be stopped when:
	 *  1) the owning writer isn't running; or
	 *  2) readers own the lock as we can't determine if they are
	 *     actively running or not.
	 */
	while (rwsem_spin_on_owner(sem)) {
		/*
		 * Try to acquire the lock
		 */
		if (rwsem_try_write_lock_unqueued(sem)) {
			taken = true;
			break;
		}

		/*
		 * When there's no owner, we might have preempted the owner
		 * between its acquiring the lock and setting the owner field.
		 * If we're an RT task, that will live-lock because we won't
		 * let the owner complete.
		 */
		if (!sem->owner && (need_resched() || rt_task(current)))
			break;

		/*
		 * The cpu_relax() call is a compiler barrier which forces
		 * everything in this loop to be re-loaded. We don't need
		 * memory barriers as we'll eventually observe the right
		 * values at the cost of a few extra spins.
		 */
		arch_mutex_cpu_relax();
	}
	osq_unlock(&sem->osq);
done:
	preempt_enable();
	return taken;
}

/*
 * Return true if the rwsem has active spinner
 */
static inline bool rwsem_has_spinner(struct rw_semaphore *sem)
{
	return osq_is_locked(&sem->osq);
}

#else
static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
{
	return false;
}

static inline bool rwsem_has_spinner(struct rw_semaphore *sem)
{
	return false;
}
#endif

/*
 * Wait until we successfully acquire the write lock
 */
__visible
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
{
	long count;
	bool waiting = true; /* any queued threads before us */
	struct rwsem_waiter waiter;

	/* undo write bias from down_write operation, stop active locking */
	count = atomic_long_sub_return(RWSEM_ACTIVE_WRITE_BIAS, &sem->count);

	/* do optimistic spinning and steal lock if possible */
	if (rwsem_optimistic_spin(sem))
		return sem;

	/*
	 * Optimistic spinning failed, proceed to the slowpath
	 * and block until we can acquire the sem.
	 */
	waiter.task = current;
	waiter.type = RWSEM_WAITING_FOR_WRITE;

	raw_spin_lock_irq(&sem->wait_lock);

	/* account for this before adding a new element to the list */
	if (slist_empty(&sem->wait_list))
		waiting = false;

	slist_add_tail(&waiter.list, &sem->wait_list);

	/* we're now waiting on the lock, but no longer actively locking */
	if (waiting) {
		count = atomic_long_read(&sem->count);

		/*
		 * If there were already threads queued before us and there are
		 * no active writers, the lock must be read owned; so we try to
		 * wake any read locks that were queued ahead of us.
		 */
		if (count > RWSEM_WAITING_BIAS) {
			WAKE_Q(wake_q);

			__rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q);
			/*
			 * The wakeup is normally called _after_ the wait_lock
			 * is released, but given that we are proactively waking
			 * readers we can deal with the wake_q overhead as it is
			 * similar to releasing and taking the wait_lock again
			 * for attempting rwsem_try_write_lock().
			 */
			wake_up_q(&wake_q);
		}

	} else
		count = atomic_long_add_return(RWSEM_WAITING_BIAS, &sem->count);

	/* wait until we successfully acquire the lock */
	set_current_state(TASK_UNINTERRUPTIBLE);
	while (true) {
		if (rwsem_try_write_lock(count, sem))
			break;
		raw_spin_unlock_irq(&sem->wait_lock);

		/* Block until there are no active lockers. */
		do {
			schedule();
			set_current_state(TASK_UNINTERRUPTIBLE);
		} while ((count = atomic_long_read(&sem->count)) & RWSEM_ACTIVE_MASK);

		raw_spin_lock_irq(&sem->wait_lock);
	}
	__set_current_state(TASK_RUNNING);

	slist_del(&waiter.list, &sem->wait_list);
	raw_spin_unlock_irq(&sem->wait_lock);

	return sem;
}
EXPORT_SYMBOL(rwsem_down_write_failed);
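
/*
 * Illustrative sketch only (an assumption about the arch/asm-generic fast
 * path, not code from this tree): down_write() adds ACTIVE_WRITE_BIAS up
 * front and only calls rwsem_down_write_failed() when the result shows the
 * lock was not free, which is why the slowpath above starts by subtracting
 * that bias again. Memory-ordering variants of the atomics are elided here.
 */
static inline void rwsem_example_down_write(struct rw_semaphore *sem)
{
	long tmp;

	tmp = atomic_long_add_return(RWSEM_ACTIVE_WRITE_BIAS, &sem->count);
	if (unlikely(tmp != RWSEM_ACTIVE_WRITE_BIAS))
		rwsem_down_write_failed(sem);	/* contended: enter the slowpath above */
}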

/*
 * handle waking up a waiter on the semaphore
 * - up_read/up_write has decremented the active part of count if we come here
 */
__visible
struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
{
	unsigned long flags;
	WAKE_Q(wake_q);

	/*
	 * If a spinner is present, it is not necessary to do the wakeup.
	 * Try to do wakeup only if the trylock succeeds to minimize
	 * spinlock contention which may introduce too much delay in the
	 * unlock operation.
	 *
	 *    spinning writer			up_write/up_read caller
	 *    ---------------			-----------------------
	 * [S]   osq_unlock()			[L]   osq
	 *	 MB				      RMB
	 * [RmW] rwsem_try_write_lock()		[RmW] spin_trylock(wait_lock)
	 *
	 * Here, it is important to make sure that there won't be a missed
	 * wakeup while the rwsem is free and the only spinning writer goes
	 * to sleep without taking the rwsem. Even when the spinning writer
	 * is just going to break out of the waiting loop, it will still do
	 * a trylock in rwsem_down_write_failed() before sleeping. IOW, if
	 * rwsem_has_spinner() is true, it will guarantee at least one
	 * trylock attempt on the rwsem later on.
	 */
	if (rwsem_has_spinner(sem)) {
		/*
		 * The smp_rmb() here is to make sure that the spinner
		 * state is consulted before reading the wait_lock.
		 */
		smp_rmb();
		if (!raw_spin_trylock_irqsave(&sem->wait_lock, flags))
			return sem;
		goto locked;
	}
	raw_spin_lock_irqsave(&sem->wait_lock, flags);
locked:

	/* do nothing if list empty */
	if (!slist_empty(&sem->wait_list))
		__rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

	raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
	wake_up_q(&wake_q);

	return sem;
}
EXPORT_SYMBOL(rwsem_wake);
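
/*
 * Illustrative sketch only (an assumption about the arch/asm-generic fast
 * path, not code from this tree): up_write() drops ACTIVE_WRITE_BIAS and
 * calls rwsem_wake() above when the resulting count is negative, i.e. the
 * waiting part is non-zero and there are queued tasks to wake. up_read()
 * behaves analogously, dropping ACTIVE_READ_BIAS and checking that the
 * active part of the count has reached zero. Memory-ordering variants of
 * the atomics are elided here.
 */
static inline void rwsem_example_up_write(struct rw_semaphore *sem)
{
	if (unlikely(atomic_long_sub_return(RWSEM_ACTIVE_WRITE_BIAS,
					    &sem->count) < 0))
		rwsem_wake(sem);	/* waiters are queued: wake them */
}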

/*
 * downgrade a write lock into a read lock
 * - caller incremented waiting part of count and discovered it still negative
 * - just wake up any readers at the front of the queue
 */
__visible
struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem)
{
	unsigned long flags;
	WAKE_Q(wake_q);

	raw_spin_lock_irqsave(&sem->wait_lock, flags);

	/* do nothing if list empty */
	if (!slist_empty(&sem->wait_list))
		__rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q);

	raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
	wake_up_q(&wake_q);

	return sem;
}
EXPORT_SYMBOL(rwsem_downgrade_wake);
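
/*
 * Illustrative sketch only (an assumption about the arch/asm-generic fast
 * path, not code from this tree): downgrade_write() converts the writer's
 * ACTIVE_WRITE_BIAS into a single ACTIVE_READ_BIAS, which amounts to adding
 * -RWSEM_WAITING_BIAS ("incrementing the waiting part of count"). If the
 * count is still negative afterwards there are queued waiters, so the
 * readers at the front are woken via rwsem_downgrade_wake() above.
 * Memory-ordering variants of the atomics are elided here.
 */
static inline void rwsem_example_downgrade_write(struct rw_semaphore *sem)
{
	long tmp;

	/* ACTIVE_WRITE_BIAS - ACTIVE_READ_BIAS == RWSEM_WAITING_BIAS */
	tmp = atomic_long_add_return(-RWSEM_WAITING_BIAS, &sem->count);
	if (tmp < 0)
		rwsem_downgrade_wake(sem);	/* waiters queued: wake readers */
}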