linux/kernel/rcu/srcu.c
<<
>>
Prefs
   1/*
   2 * Sleepable Read-Copy Update mechanism for mutual exclusion.
   3 *
   4 * This program is free software; you can redistribute it and/or modify
   5 * it under the terms of the GNU General Public License as published by
   6 * the Free Software Foundation; either version 2 of the License, or
   7 * (at your option) any later version.
   8 *
   9 * This program is distributed in the hope that it will be useful,
  10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12 * GNU General Public License for more details.
  13 *
  14 * You should have received a copy of the GNU General Public License
  15 * along with this program; if not, you can access it online at
  16 * http://www.gnu.org/licenses/gpl-2.0.html.
  17 *
  18 * Copyright (C) IBM Corporation, 2006
  19 * Copyright (C) Fujitsu, 2012
  20 *
  21 * Author: Paul McKenney <paulmck@us.ibm.com>
  22 *         Lai Jiangshan <laijs@cn.fujitsu.com>
  23 *
  24 * For detailed explanation of Read-Copy Update mechanism see -
  25 *              Documentation/RCU/ *.txt
  26 *
  27 */
  28
  29#include <linux/export.h>
  30#include <linux/mutex.h>
  31#include <linux/percpu.h>
  32#include <linux/preempt.h>
  33#include <linux/rcupdate.h>
  34#include <linux/sched.h>
  35#include <linux/smp.h>
  36#include <linux/delay.h>
  37#include <linux/srcu.h>
  38
  39#include "rcu.h"
  40
  41/*
  42 * Initialize an rcu_batch structure to empty.
  43 */
  44static inline void rcu_batch_init(struct rcu_batch *b)
  45{
  46        b->head = NULL;
  47        b->tail = &b->head;
  48}
  49
  50/*
  51 * Enqueue a callback onto the tail of the specified rcu_batch structure.
  52 */
  53static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
  54{
  55        *b->tail = head;
  56        b->tail = &head->next;
  57}
  58
  59/*
  60 * Is the specified rcu_batch structure empty?
  61 */
  62static inline bool rcu_batch_empty(struct rcu_batch *b)
  63{
  64        return b->tail == &b->head;
  65}
  66
  67/*
  68 * Remove the callback at the head of the specified rcu_batch structure
  69 * and return a pointer to it, or return NULL if the structure is empty.
  70 */
  71static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
  72{
  73        struct rcu_head *head;
  74
  75        if (rcu_batch_empty(b))
  76                return NULL;
  77
  78        head = b->head;
  79        b->head = head->next;
  80        if (b->tail == &head->next)
  81                rcu_batch_init(b);
  82
  83        return head;
  84}
  85
  86/*
  87 * Move all callbacks from the rcu_batch structure specified by "from" to
  88 * the structure specified by "to".
  89 */
  90static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
  91{
  92        if (!rcu_batch_empty(from)) {
  93                *to->tail = from->head;
  94                to->tail = from->tail;
  95                rcu_batch_init(from);
  96        }
  97}
  98
  99static int init_srcu_struct_fields(struct srcu_struct *sp)
 100{
 101        sp->completed = 0;
 102        spin_lock_init(&sp->queue_lock);
 103        sp->running = false;
 104        rcu_batch_init(&sp->batch_queue);
 105        rcu_batch_init(&sp->batch_check0);
 106        rcu_batch_init(&sp->batch_check1);
 107        rcu_batch_init(&sp->batch_done);
 108        INIT_DELAYED_WORK(&sp->work, process_srcu);
 109        sp->per_cpu_ref = alloc_percpu(struct srcu_struct_array);
 110        return sp->per_cpu_ref ? 0 : -ENOMEM;
 111}
 112
 113#ifdef CONFIG_DEBUG_LOCK_ALLOC
 114
 115int __init_srcu_struct(struct srcu_struct *sp, const char *name,
 116                       struct lock_class_key *key)
 117{
 118        /* Don't re-initialize a lock while it is held. */
 119        debug_check_no_locks_freed((void *)sp, sizeof(*sp));
 120        lockdep_init_map(&sp->dep_map, name, key, 0);
 121        return init_srcu_struct_fields(sp);
 122}
 123EXPORT_SYMBOL_GPL(__init_srcu_struct);
 124
 125#else /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
 126
 127/**
 128 * init_srcu_struct - initialize a sleep-RCU structure
 129 * @sp: structure to initialize.
 130 *
 131 * Must invoke this on a given srcu_struct before passing that srcu_struct
 132 * to any other function.  Each srcu_struct represents a separate domain
 133 * of SRCU protection.
 134 */
 135int init_srcu_struct(struct srcu_struct *sp)
 136{
 137        return init_srcu_struct_fields(sp);
 138}
 139EXPORT_SYMBOL_GPL(init_srcu_struct);
 140
 141#endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */
 142
 143/*
 144 * Returns approximate total of the readers' ->seq[] values for the
 145 * rank of per-CPU counters specified by idx.
 146 */
 147static unsigned long srcu_readers_seq_idx(struct srcu_struct *sp, int idx)
 148{
 149        int cpu;
 150        unsigned long sum = 0;
 151        unsigned long t;
 152
 153        for_each_possible_cpu(cpu) {
 154                t = READ_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->seq[idx]);
 155                sum += t;
 156        }
 157        return sum;
 158}
 159
 160/*
 161 * Returns approximate number of readers active on the specified rank
 162 * of the per-CPU ->c[] counters.
 163 */
 164static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx)
 165{
 166        int cpu;
 167        unsigned long sum = 0;
 168        unsigned long t;
 169
 170        for_each_possible_cpu(cpu) {
 171                t = READ_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]);
 172                sum += t;
 173        }
 174        return sum;
 175}
 176
 177/*
 178 * Return true if the number of pre-existing readers is determined to
 179 * be stably zero.  An example unstable zero can occur if the call
 180 * to srcu_readers_active_idx() misses an __srcu_read_lock() increment,
 181 * but due to task migration, sees the corresponding __srcu_read_unlock()
 182 * decrement.  This can happen because srcu_readers_active_idx() takes
 183 * time to sum the array, and might in fact be interrupted or preempted
 184 * partway through the summation.
 185 */
 186static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx)
 187{
 188        unsigned long seq;
 189
 190        seq = srcu_readers_seq_idx(sp, idx);
 191
 192        /*
 193         * The following smp_mb() A pairs with the smp_mb() B located in
 194         * __srcu_read_lock().  This pairing ensures that if an
 195         * __srcu_read_lock() increments its counter after the summation
 196         * in srcu_readers_active_idx(), then the corresponding SRCU read-side
 197         * critical section will see any changes made prior to the start
 198         * of the current SRCU grace period.
 199         *
 200         * Also, if the above call to srcu_readers_seq_idx() saw the
 201         * increment of ->seq[], then the call to srcu_readers_active_idx()
 202         * must see the increment of ->c[].
 203         */
 204        smp_mb(); /* A */
 205
 206        /*
 207         * Note that srcu_readers_active_idx() can incorrectly return
 208         * zero even though there is a pre-existing reader throughout.
 209         * To see this, suppose that task A is in a very long SRCU
 210         * read-side critical section that started on CPU 0, and that
 211         * no other reader exists, so that the sum of the counters
 212         * is equal to one.  Then suppose that task B starts executing
 213         * srcu_readers_active_idx(), summing up to CPU 1, and then that
 214         * task C starts reading on CPU 0, so that its increment is not
 215         * summed, but finishes reading on CPU 2, so that its decrement
 216         * -is- summed.  Then when task B completes its sum, it will
 217         * incorrectly get zero, despite the fact that task A has been
 218         * in its SRCU read-side critical section the whole time.
 219         *
 220         * We therefore do a validation step should srcu_readers_active_idx()
 221         * return zero.
 222         */
 223        if (srcu_readers_active_idx(sp, idx) != 0)
 224                return false;
 225
 226        /*
 227         * The remainder of this function is the validation step.
 228         * The following smp_mb() D pairs with the smp_mb() C in
 229         * __srcu_read_unlock().  If the __srcu_read_unlock() was seen
 230         * by srcu_readers_active_idx() above, then any destructive
 231         * operation performed after the grace period will happen after
 232         * the corresponding SRCU read-side critical section.
 233         *
 234         * Note that there can be at most NR_CPUS worth of readers using
 235         * the old index, which is not enough to overflow even a 32-bit
 236         * integer.  (Yes, this does mean that systems having more than
 237         * a billion or so CPUs need to be 64-bit systems.)  Therefore,
 238         * the sum of the ->seq[] counters cannot possibly overflow.
 239         * Therefore, the only way that the return values of the two
 240         * calls to srcu_readers_seq_idx() can be equal is if there were
 241         * no increments of the corresponding rank of ->seq[] counts
 242         * in the interim.  But the missed-increment scenario laid out
 243         * above includes an increment of the ->seq[] counter by
 244         * the corresponding __srcu_read_lock().  Therefore, if this
 245         * scenario occurs, the return values from the two calls to
 246         * srcu_readers_seq_idx() will differ, and thus the validation
 247         * step below suffices.
 248         */
 249        smp_mb(); /* D */
 250
 251        return srcu_readers_seq_idx(sp, idx) == seq;
 252}
 253
 254/**
 255 * srcu_readers_active - returns true if there are readers. and false
 256 *                       otherwise
 257 * @sp: which srcu_struct to count active readers (holding srcu_read_lock).
 258 *
 259 * Note that this is not an atomic primitive, and can therefore suffer
 260 * severe errors when invoked on an active srcu_struct.  That said, it
 261 * can be useful as an error check at cleanup time.
 262 */
 263static bool srcu_readers_active(struct srcu_struct *sp)
 264{
 265        int cpu;
 266        unsigned long sum = 0;
 267
 268        for_each_possible_cpu(cpu) {
 269                sum += READ_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[0]);
 270                sum += READ_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[1]);
 271        }
 272        return sum;
 273}
 274
 275/**
 276 * cleanup_srcu_struct - deconstruct a sleep-RCU structure
 277 * @sp: structure to clean up.
 278 *
 279 * Must invoke this after you are finished using a given srcu_struct that
 280 * was initialized via init_srcu_struct(), else you leak memory.
 281 */
 282void cleanup_srcu_struct(struct srcu_struct *sp)
 283{
 284        if (WARN_ON(srcu_readers_active(sp)))
 285                return; /* Leakage unless caller handles error. */
 286        free_percpu(sp->per_cpu_ref);
 287        sp->per_cpu_ref = NULL;
 288}
 289EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
 290
 291/*
 292 * Counts the new reader in the appropriate per-CPU element of the
 293 * srcu_struct.  Must be called from process context.
 294 * Returns an index that must be passed to the matching srcu_read_unlock().
 295 */
 296int __srcu_read_lock(struct srcu_struct *sp)
 297{
 298        int idx;
 299
 300        idx = READ_ONCE(sp->completed) & 0x1;
 301        __this_cpu_inc(sp->per_cpu_ref->c[idx]);
 302        smp_mb(); /* B */  /* Avoid leaking the critical section. */
 303        __this_cpu_inc(sp->per_cpu_ref->seq[idx]);
 304        return idx;
 305}
 306EXPORT_SYMBOL_GPL(__srcu_read_lock);
 307
 308/*
 309 * Removes the count for the old reader from the appropriate per-CPU
 310 * element of the srcu_struct.  Note that this may well be a different
 311 * CPU than that which was incremented by the corresponding srcu_read_lock().
 312 * Must be called from process context.
 313 */
 314void __srcu_read_unlock(struct srcu_struct *sp, int idx)
 315{
 316        smp_mb(); /* C */  /* Avoid leaking the critical section. */
 317        this_cpu_dec(sp->per_cpu_ref->c[idx]);
 318}
 319EXPORT_SYMBOL_GPL(__srcu_read_unlock);
 320
 321/*
 322 * We use an adaptive strategy for synchronize_srcu() and especially for
 323 * synchronize_srcu_expedited().  We spin for a fixed time period
 324 * (defined below) to allow SRCU readers to exit their read-side critical
 325 * sections.  If there are still some readers after 10 microseconds,
 326 * we repeatedly block for 1-millisecond time periods.  This approach
 327 * has done well in testing, so there is no need for a config parameter.
 328 */
 329#define SRCU_RETRY_CHECK_DELAY          5
 330#define SYNCHRONIZE_SRCU_TRYCOUNT       2
 331#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT   12
 332
 333/*
 334 * @@@ Wait until all pre-existing readers complete.  Such readers
 335 * will have used the index specified by "idx".
 336 * the caller should ensures the ->completed is not changed while checking
 337 * and idx = (->completed & 1) ^ 1
 338 */
 339static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
 340{
 341        for (;;) {
 342                if (srcu_readers_active_idx_check(sp, idx))
 343                        return true;
 344                if (--trycount <= 0)
 345                        return false;
 346                udelay(SRCU_RETRY_CHECK_DELAY);
 347        }
 348}
 349
 350/*
 351 * Increment the ->completed counter so that future SRCU readers will
 352 * use the other rank of the ->c[] and ->seq[] arrays.  This allows
 353 * us to wait for pre-existing readers in a starvation-free manner.
 354 */
 355static void srcu_flip(struct srcu_struct *sp)
 356{
 357        sp->completed++;
 358}
 359
 360/*
 361 * Enqueue an SRCU callback on the specified srcu_struct structure,
 362 * initiating grace-period processing if it is not already running.
 363 *
 364 * Note that all CPUs must agree that the grace period extended beyond
 365 * all pre-existing SRCU read-side critical section.  On systems with
 366 * more than one CPU, this means that when "func()" is invoked, each CPU
 367 * is guaranteed to have executed a full memory barrier since the end of
 368 * its last corresponding SRCU read-side critical section whose beginning
 369 * preceded the call to call_rcu().  It also means that each CPU executing
 370 * an SRCU read-side critical section that continues beyond the start of
 371 * "func()" must have executed a memory barrier after the call_rcu()
 372 * but before the beginning of that SRCU read-side critical section.
 373 * Note that these guarantees include CPUs that are offline, idle, or
 374 * executing in user mode, as well as CPUs that are executing in the kernel.
 375 *
 376 * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
 377 * resulting SRCU callback function "func()", then both CPU A and CPU
 378 * B are guaranteed to execute a full memory barrier during the time
 379 * interval between the call to call_rcu() and the invocation of "func()".
 380 * This guarantee applies even if CPU A and CPU B are the same CPU (but
 381 * again only if the system has more than one CPU).
 382 *
 383 * Of course, these guarantees apply only for invocations of call_srcu(),
 384 * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
 385 * srcu_struct structure.
 386 */
 387void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
 388               rcu_callback_t func)
 389{
 390        unsigned long flags;
 391
 392        head->next = NULL;
 393        head->func = func;
 394        spin_lock_irqsave(&sp->queue_lock, flags);
 395        rcu_batch_queue(&sp->batch_queue, head);
 396        if (!sp->running) {
 397                sp->running = true;
 398                queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
 399        }
 400        spin_unlock_irqrestore(&sp->queue_lock, flags);
 401}
 402EXPORT_SYMBOL_GPL(call_srcu);
 403
 404static void srcu_advance_batches(struct srcu_struct *sp, int trycount);
 405static void srcu_reschedule(struct srcu_struct *sp);
 406
 407/*
 408 * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
 409 */
 410static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
 411{
 412        struct rcu_synchronize rcu;
 413        struct rcu_head *head = &rcu.head;
 414        bool done = false;
 415
 416        RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
 417                         lock_is_held(&rcu_bh_lock_map) ||
 418                         lock_is_held(&rcu_lock_map) ||
 419                         lock_is_held(&rcu_sched_lock_map),
 420                         "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section");
 421
 422        might_sleep();
 423        init_completion(&rcu.completion);
 424
 425        head->next = NULL;
 426        head->func = wakeme_after_rcu;
 427        spin_lock_irq(&sp->queue_lock);
 428        if (!sp->running) {
 429                /* steal the processing owner */
 430                sp->running = true;
 431                rcu_batch_queue(&sp->batch_check0, head);
 432                spin_unlock_irq(&sp->queue_lock);
 433
 434                srcu_advance_batches(sp, trycount);
 435                if (!rcu_batch_empty(&sp->batch_done)) {
 436                        BUG_ON(sp->batch_done.head != head);
 437                        rcu_batch_dequeue(&sp->batch_done);
 438                        done = true;
 439                }
 440                /* give the processing owner to work_struct */
 441                srcu_reschedule(sp);
 442        } else {
 443                rcu_batch_queue(&sp->batch_queue, head);
 444                spin_unlock_irq(&sp->queue_lock);
 445        }
 446
 447        if (!done)
 448                wait_for_completion(&rcu.completion);
 449}
 450
 451/**
 452 * synchronize_srcu - wait for prior SRCU read-side critical-section completion
 453 * @sp: srcu_struct with which to synchronize.
 454 *
 455 * Wait for the count to drain to zero of both indexes. To avoid the
 456 * possible starvation of synchronize_srcu(), it waits for the count of
 457 * the index=((->completed & 1) ^ 1) to drain to zero at first,
 458 * and then flip the completed and wait for the count of the other index.
 459 *
 460 * Can block; must be called from process context.
 461 *
 462 * Note that it is illegal to call synchronize_srcu() from the corresponding
 463 * SRCU read-side critical section; doing so will result in deadlock.
 464 * However, it is perfectly legal to call synchronize_srcu() on one
 465 * srcu_struct from some other srcu_struct's read-side critical section,
 466 * as long as the resulting graph of srcu_structs is acyclic.
 467 *
 468 * There are memory-ordering constraints implied by synchronize_srcu().
 469 * On systems with more than one CPU, when synchronize_srcu() returns,
 470 * each CPU is guaranteed to have executed a full memory barrier since
 471 * the end of its last corresponding SRCU-sched read-side critical section
 472 * whose beginning preceded the call to synchronize_srcu().  In addition,
 473 * each CPU having an SRCU read-side critical section that extends beyond
 474 * the return from synchronize_srcu() is guaranteed to have executed a
 475 * full memory barrier after the beginning of synchronize_srcu() and before
 476 * the beginning of that SRCU read-side critical section.  Note that these
 477 * guarantees include CPUs that are offline, idle, or executing in user mode,
 478 * as well as CPUs that are executing in the kernel.
 479 *
 480 * Furthermore, if CPU A invoked synchronize_srcu(), which returned
 481 * to its caller on CPU B, then both CPU A and CPU B are guaranteed
 482 * to have executed a full memory barrier during the execution of
 483 * synchronize_srcu().  This guarantee applies even if CPU A and CPU B
 484 * are the same CPU, but again only if the system has more than one CPU.
 485 *
 486 * Of course, these memory-ordering guarantees apply only when
 487 * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are
 488 * passed the same srcu_struct structure.
 489 */
 490void synchronize_srcu(struct srcu_struct *sp)
 491{
 492        __synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal())
 493                           ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT
 494                           : SYNCHRONIZE_SRCU_TRYCOUNT);
 495}
 496EXPORT_SYMBOL_GPL(synchronize_srcu);
 497
 498/**
 499 * synchronize_srcu_expedited - Brute-force SRCU grace period
 500 * @sp: srcu_struct with which to synchronize.
 501 *
 502 * Wait for an SRCU grace period to elapse, but be more aggressive about
 503 * spinning rather than blocking when waiting.
 504 *
 505 * Note that synchronize_srcu_expedited() has the same deadlock and
 506 * memory-ordering properties as does synchronize_srcu().
 507 */
 508void synchronize_srcu_expedited(struct srcu_struct *sp)
 509{
 510        __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT);
 511}
 512EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 513
 514/**
 515 * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
 516 * @sp: srcu_struct on which to wait for in-flight callbacks.
 517 */
 518void srcu_barrier(struct srcu_struct *sp)
 519{
 520        synchronize_srcu(sp);
 521}
 522EXPORT_SYMBOL_GPL(srcu_barrier);
 523
 524/**
 525 * srcu_batches_completed - return batches completed.
 526 * @sp: srcu_struct on which to report batch completion.
 527 *
 528 * Report the number of batches, correlated with, but not necessarily
 529 * precisely the same as, the number of grace periods that have elapsed.
 530 */
 531unsigned long srcu_batches_completed(struct srcu_struct *sp)
 532{
 533        return sp->completed;
 534}
 535EXPORT_SYMBOL_GPL(srcu_batches_completed);
 536
 537#define SRCU_CALLBACK_BATCH     10
 538#define SRCU_INTERVAL           1
 539
 540/*
 541 * Move any new SRCU callbacks to the first stage of the SRCU grace
 542 * period pipeline.
 543 */
 544static void srcu_collect_new(struct srcu_struct *sp)
 545{
 546        if (!rcu_batch_empty(&sp->batch_queue)) {
 547                spin_lock_irq(&sp->queue_lock);
 548                rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
 549                spin_unlock_irq(&sp->queue_lock);
 550        }
 551}
 552
 553/*
 554 * Core SRCU state machine.  Advance callbacks from ->batch_check0 to
 555 * ->batch_check1 and then to ->batch_done as readers drain.
 556 */
 557static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
 558{
 559        int idx = 1 ^ (sp->completed & 1);
 560
 561        /*
 562         * Because readers might be delayed for an extended period after
 563         * fetching ->completed for their index, at any point in time there
 564         * might well be readers using both idx=0 and idx=1.  We therefore
 565         * need to wait for readers to clear from both index values before
 566         * invoking a callback.
 567         */
 568
 569        if (rcu_batch_empty(&sp->batch_check0) &&
 570            rcu_batch_empty(&sp->batch_check1))
 571                return; /* no callbacks need to be advanced */
 572
 573        if (!try_check_zero(sp, idx, trycount))
 574                return; /* failed to advance, will try after SRCU_INTERVAL */
 575
 576        /*
 577         * The callbacks in ->batch_check1 have already done with their
 578         * first zero check and flip back when they were enqueued on
 579         * ->batch_check0 in a previous invocation of srcu_advance_batches().
 580         * (Presumably try_check_zero() returned false during that
 581         * invocation, leaving the callbacks stranded on ->batch_check1.)
 582         * They are therefore ready to invoke, so move them to ->batch_done.
 583         */
 584        rcu_batch_move(&sp->batch_done, &sp->batch_check1);
 585
 586        if (rcu_batch_empty(&sp->batch_check0))
 587                return; /* no callbacks need to be advanced */
 588        srcu_flip(sp);
 589
 590        /*
 591         * The callbacks in ->batch_check0 just finished their
 592         * first check zero and flip, so move them to ->batch_check1
 593         * for future checking on the other idx.
 594         */
 595        rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
 596
 597        /*
 598         * SRCU read-side critical sections are normally short, so check
 599         * at least twice in quick succession after a flip.
 600         */
 601        trycount = trycount < 2 ? 2 : trycount;
 602        if (!try_check_zero(sp, idx^1, trycount))
 603                return; /* failed to advance, will try after SRCU_INTERVAL */
 604
 605        /*
 606         * The callbacks in ->batch_check1 have now waited for all
 607         * pre-existing readers using both idx values.  They are therefore
 608         * ready to invoke, so move them to ->batch_done.
 609         */
 610        rcu_batch_move(&sp->batch_done, &sp->batch_check1);
 611}
 612
 613/*
 614 * Invoke a limited number of SRCU callbacks that have passed through
 615 * their grace period.  If there are more to do, SRCU will reschedule
 616 * the workqueue.
 617 */
 618static void srcu_invoke_callbacks(struct srcu_struct *sp)
 619{
 620        int i;
 621        struct rcu_head *head;
 622
 623        for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
 624                head = rcu_batch_dequeue(&sp->batch_done);
 625                if (!head)
 626                        break;
 627                local_bh_disable();
 628                head->func(head);
 629                local_bh_enable();
 630        }
 631}
 632
 633/*
 634 * Finished one round of SRCU grace period.  Start another if there are
 635 * more SRCU callbacks queued, otherwise put SRCU into not-running state.
 636 */
 637static void srcu_reschedule(struct srcu_struct *sp)
 638{
 639        bool pending = true;
 640
 641        if (rcu_batch_empty(&sp->batch_done) &&
 642            rcu_batch_empty(&sp->batch_check1) &&
 643            rcu_batch_empty(&sp->batch_check0) &&
 644            rcu_batch_empty(&sp->batch_queue)) {
 645                spin_lock_irq(&sp->queue_lock);
 646                if (rcu_batch_empty(&sp->batch_done) &&
 647                    rcu_batch_empty(&sp->batch_check1) &&
 648                    rcu_batch_empty(&sp->batch_check0) &&
 649                    rcu_batch_empty(&sp->batch_queue)) {
 650                        sp->running = false;
 651                        pending = false;
 652                }
 653                spin_unlock_irq(&sp->queue_lock);
 654        }
 655
 656        if (pending)
 657                queue_delayed_work(system_power_efficient_wq,
 658                                   &sp->work, SRCU_INTERVAL);
 659}
 660
 661/*
 662 * This is the work-queue function that handles SRCU grace periods.
 663 */
 664void process_srcu(struct work_struct *work)
 665{
 666        struct srcu_struct *sp;
 667
 668        sp = container_of(work, struct srcu_struct, work.work);
 669
 670        srcu_collect_new(sp);
 671        srcu_advance_batches(sp, 1);
 672        srcu_invoke_callbacks(sp);
 673        srcu_reschedule(sp);
 674}
 675EXPORT_SYMBOL_GPL(process_srcu);
 676