linux/kernel/padata.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * padata.c - generic interface to process data streams in parallel
   4 *
   5 * See Documentation/core-api/padata.rst for more information.
   6 *
   7 * Copyright (C) 2008, 2009 secunet Security Networks AG
   8 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
   9 *
  10 * Copyright (c) 2020 Oracle and/or its affiliates.
  11 * Author: Daniel Jordan <daniel.m.jordan@oracle.com>
  12 */
  13
  14#include <linux/completion.h>
  15#include <linux/export.h>
  16#include <linux/cpumask.h>
  17#include <linux/err.h>
  18#include <linux/cpu.h>
  19#include <linux/padata.h>
  20#include <linux/mutex.h>
  21#include <linux/sched.h>
  22#include <linux/slab.h>
  23#include <linux/sysfs.h>
  24#include <linux/rcupdate.h>
  25
  26#define PADATA_WORK_ONSTACK     1       /* Work's memory is on stack */
  27
  28struct padata_work {
  29        struct work_struct      pw_work;
  30        struct list_head        pw_list;  /* padata_free_works linkage */
  31        void                    *pw_data;
  32};
  33
  34static DEFINE_SPINLOCK(padata_works_lock);
  35static struct padata_work *padata_works;
  36static LIST_HEAD(padata_free_works);
  37
  38struct padata_mt_job_state {
  39        spinlock_t              lock;
  40        struct completion       completion;
  41        struct padata_mt_job    *job;
  42        int                     nworks;
  43        int                     nworks_fini;
  44        unsigned long           chunk_size;
  45};
  46
  47static void padata_free_pd(struct parallel_data *pd);
  48static void __init padata_mt_helper(struct work_struct *work);
  49
  50static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
  51{
  52        int cpu, target_cpu;
  53
  54        target_cpu = cpumask_first(pd->cpumask.pcpu);
  55        for (cpu = 0; cpu < cpu_index; cpu++)
  56                target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu);
  57
  58        return target_cpu;
  59}
  60
  61static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)
  62{
  63        /*
  64         * Hash the sequence numbers to the cpus by taking
  65         * seq_nr modulo the number of cpus in use.
  66         */
  67        int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
  68
  69        return padata_index_to_cpu(pd, cpu_index);
  70}
  71
  72static struct padata_work *padata_work_alloc(void)
  73{
  74        struct padata_work *pw;
  75
  76        lockdep_assert_held(&padata_works_lock);
  77
  78        if (list_empty(&padata_free_works))
  79                return NULL;    /* No more work items allowed to be queued. */
  80
  81        pw = list_first_entry(&padata_free_works, struct padata_work, pw_list);
  82        list_del(&pw->pw_list);
  83        return pw;
  84}
  85
  86static void padata_work_init(struct padata_work *pw, work_func_t work_fn,
  87                             void *data, int flags)
  88{
  89        if (flags & PADATA_WORK_ONSTACK)
  90                INIT_WORK_ONSTACK(&pw->pw_work, work_fn);
  91        else
  92                INIT_WORK(&pw->pw_work, work_fn);
  93        pw->pw_data = data;
  94}
  95
  96static int __init padata_work_alloc_mt(int nworks, void *data,
  97                                       struct list_head *head)
  98{
  99        int i;
 100
 101        spin_lock(&padata_works_lock);
 102        /* Start at 1 because the current task participates in the job. */
 103        for (i = 1; i < nworks; ++i) {
 104                struct padata_work *pw = padata_work_alloc();
 105
 106                if (!pw)
 107                        break;
 108                padata_work_init(pw, padata_mt_helper, data, 0);
 109                list_add(&pw->pw_list, head);
 110        }
 111        spin_unlock(&padata_works_lock);
 112
 113        return i;
 114}
 115
 116static void padata_work_free(struct padata_work *pw)
 117{
 118        lockdep_assert_held(&padata_works_lock);
 119        list_add(&pw->pw_list, &padata_free_works);
 120}
 121
 122static void __init padata_works_free(struct list_head *works)
 123{
 124        struct padata_work *cur, *next;
 125
 126        if (list_empty(works))
 127                return;
 128
 129        spin_lock(&padata_works_lock);
 130        list_for_each_entry_safe(cur, next, works, pw_list) {
 131                list_del(&cur->pw_list);
 132                padata_work_free(cur);
 133        }
 134        spin_unlock(&padata_works_lock);
 135}
 136
 137static void padata_parallel_worker(struct work_struct *parallel_work)
 138{
 139        struct padata_work *pw = container_of(parallel_work, struct padata_work,
 140                                              pw_work);
 141        struct padata_priv *padata = pw->pw_data;
 142
 143        local_bh_disable();
 144        padata->parallel(padata);
 145        spin_lock(&padata_works_lock);
 146        padata_work_free(pw);
 147        spin_unlock(&padata_works_lock);
 148        local_bh_enable();
 149}
 150
 151/**
 152 * padata_do_parallel - padata parallelization function
 153 *
 154 * @ps: padata shell
 155 * @padata: object to be parallelized
 156 * @cb_cpu: pointer to the CPU that the serialization callback function should
 157 *          run on.  If it's not in the instance's serial cpumask
 158 *          (i.e. cpumask.cbcpu), this function selects a fallback CPU and if
 159 *          none found, returns -EINVAL.
 160 *
 161 * The parallelization callback function will run with BHs off.
 162 * Note: Every object which is parallelized by padata_do_parallel
 163 * must be seen by padata_do_serial.
 164 *
 165 * Return: 0 on success or else negative error code.
 166 */
 167int padata_do_parallel(struct padata_shell *ps,
 168                       struct padata_priv *padata, int *cb_cpu)
 169{
 170        struct padata_instance *pinst = ps->pinst;
 171        int i, cpu, cpu_index, err;
 172        struct parallel_data *pd;
 173        struct padata_work *pw;
 174
 175        rcu_read_lock_bh();
 176
 177        pd = rcu_dereference_bh(ps->pd);
 178
 179        err = -EINVAL;
 180        if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
 181                goto out;
 182
 183        if (!cpumask_test_cpu(*cb_cpu, pd->cpumask.cbcpu)) {
 184                if (!cpumask_weight(pd->cpumask.cbcpu))
 185                        goto out;
 186
 187                /* Select an alternate fallback CPU and notify the caller. */
 188                cpu_index = *cb_cpu % cpumask_weight(pd->cpumask.cbcpu);
 189
 190                cpu = cpumask_first(pd->cpumask.cbcpu);
 191                for (i = 0; i < cpu_index; i++)
 192                        cpu = cpumask_next(cpu, pd->cpumask.cbcpu);
 193
 194                *cb_cpu = cpu;
 195        }
 196
 197        err = -EBUSY;
 198        if ((pinst->flags & PADATA_RESET))
 199                goto out;
 200
 201        refcount_inc(&pd->refcnt);
 202        padata->pd = pd;
 203        padata->cb_cpu = *cb_cpu;
 204
 205        spin_lock(&padata_works_lock);
 206        padata->seq_nr = ++pd->seq_nr;
 207        pw = padata_work_alloc();
 208        spin_unlock(&padata_works_lock);
 209
 210        rcu_read_unlock_bh();
 211
 212        if (pw) {
 213                padata_work_init(pw, padata_parallel_worker, padata, 0);
 214                queue_work(pinst->parallel_wq, &pw->pw_work);
 215        } else {
 216                /* Maximum works limit exceeded, run in the current task. */
 217                padata->parallel(padata);
 218        }
 219
 220        return 0;
 221out:
 222        rcu_read_unlock_bh();
 223
 224        return err;
 225}
 226EXPORT_SYMBOL(padata_do_parallel);
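
/*
 * Usage sketch (editorial addition, not part of the original file): one way a
 * caller might drive the parallel/serial pair above.  The request structure
 * and the helper names (my_*, do_expensive_work) are hypothetical; only the
 * padata_* calls and the padata_priv fields are real API.
 *
 *	struct my_request {
 *		struct padata_priv padata;	// embedded control structure
 *		int result;
 *	};
 *
 *	static void my_parallel(struct padata_priv *padata)
 *	{
 *		struct my_request *req = container_of(padata, struct my_request,
 *						      padata);
 *
 *		req->result = do_expensive_work(req);	// runs with BHs off
 *		padata_do_serial(padata);	// mandatory for every parallel object
 *	}
 *
 *	static void my_serial(struct padata_priv *padata)
 *	{
 *		// runs on padata->cb_cpu, in submission order
 *	}
 *
 * Submission, given a shell "ps" (see padata_alloc_shell() below):
 *
 *	req->padata.parallel = my_parallel;
 *	req->padata.serial = my_serial;
 *	err = padata_do_parallel(ps, &req->padata, &cb_cpu);
 *	// -EBUSY: instance is being reconfigured; -EINVAL: no usable callback CPU
 */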
 227
 228/*
 229 * padata_find_next - Find the next object that needs serialization.
 230 *
 231 * Return:
 232 * * A pointer to the control struct of the next object that needs
 233 *   serialization, if present in one of the percpu reorder queues.
 234 * * NULL, if the next object that needs serialization will
 235 *   be parallel processed by another cpu and is not yet present in
 236 *   the cpu's reorder queue.
 237 */
 238static struct padata_priv *padata_find_next(struct parallel_data *pd,
 239                                            bool remove_object)
 240{
 241        struct padata_priv *padata;
 242        struct padata_list *reorder;
 243        int cpu = pd->cpu;
 244
 245        reorder = per_cpu_ptr(pd->reorder_list, cpu);
 246
 247        spin_lock(&reorder->lock);
 248        if (list_empty(&reorder->list)) {
 249                spin_unlock(&reorder->lock);
 250                return NULL;
 251        }
 252
 253        padata = list_entry(reorder->list.next, struct padata_priv, list);
 254
 255        /*
 256         * Checks the rare case where two or more parallel jobs have hashed to
 257         * the same CPU and one of the later ones finishes first.
 258         */
 259        if (padata->seq_nr != pd->processed) {
 260                spin_unlock(&reorder->lock);
 261                return NULL;
 262        }
 263
 264        if (remove_object) {
 265                list_del_init(&padata->list);
 266                ++pd->processed;
 267                pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1, false);
 268        }
 269
 270        spin_unlock(&reorder->lock);
 271        return padata;
 272}
 273
 274static void padata_reorder(struct parallel_data *pd)
 275{
 276        struct padata_instance *pinst = pd->ps->pinst;
 277        int cb_cpu;
 278        struct padata_priv *padata;
 279        struct padata_serial_queue *squeue;
 280        struct padata_list *reorder;
 281
 282        /*
 283         * We need to ensure that only one cpu can work on dequeueing the
 284         * reorder queue at a time. Calculating in which percpu reorder queue
 285         * the next object will arrive takes some time. A spinlock would be
 286         * highly contended. Also it is not clear in which order the objects
 287         * arrive at the reorder queues. So a cpu could wait to get the lock
 288         * just to notice that there is nothing to do at the moment. Therefore
 289         * we use a trylock and let the holder of the lock care for all the
 290         * objects enqueued during the hold time of the lock.
 291         */
 292        if (!spin_trylock_bh(&pd->lock))
 293                return;
 294
 295        while (1) {
 296                padata = padata_find_next(pd, true);
 297
 298                /*
 299                 * If the next object that needs serialization is parallel
 300                 * processed by another cpu and is still on its way to the
 301                 * cpu's reorder queue, nothing to do for now.
 302                 */
 303                if (!padata)
 304                        break;
 305
 306                cb_cpu = padata->cb_cpu;
 307                squeue = per_cpu_ptr(pd->squeue, cb_cpu);
 308
 309                spin_lock(&squeue->serial.lock);
 310                list_add_tail(&padata->list, &squeue->serial.list);
 311                spin_unlock(&squeue->serial.lock);
 312
 313                queue_work_on(cb_cpu, pinst->serial_wq, &squeue->work);
 314        }
 315
 316        spin_unlock_bh(&pd->lock);
 317
 318        /*
 319         * The next object that needs serialization might have arrived at
 320         * the reorder queues in the meantime.
 321         *
 322         * Ensure reorder queue is read after pd->lock is dropped so we see
 323         * new objects from another task in padata_do_serial.  Pairs with
 324         * smp_mb in padata_do_serial.
 325         */
 326        smp_mb();
 327
 328        reorder = per_cpu_ptr(pd->reorder_list, pd->cpu);
 329        if (!list_empty(&reorder->list) && padata_find_next(pd, false))
 330                queue_work(pinst->serial_wq, &pd->reorder_work);
 331}
 332
 333static void invoke_padata_reorder(struct work_struct *work)
 334{
 335        struct parallel_data *pd;
 336
 337        local_bh_disable();
 338        pd = container_of(work, struct parallel_data, reorder_work);
 339        padata_reorder(pd);
 340        local_bh_enable();
 341}
 342
 343static void padata_serial_worker(struct work_struct *serial_work)
 344{
 345        struct padata_serial_queue *squeue;
 346        struct parallel_data *pd;
 347        LIST_HEAD(local_list);
 348        int cnt;
 349
 350        local_bh_disable();
 351        squeue = container_of(serial_work, struct padata_serial_queue, work);
 352        pd = squeue->pd;
 353
 354        spin_lock(&squeue->serial.lock);
 355        list_replace_init(&squeue->serial.list, &local_list);
 356        spin_unlock(&squeue->serial.lock);
 357
 358        cnt = 0;
 359
 360        while (!list_empty(&local_list)) {
 361                struct padata_priv *padata;
 362
 363                padata = list_entry(local_list.next,
 364                                    struct padata_priv, list);
 365
 366                list_del_init(&padata->list);
 367
 368                padata->serial(padata);
 369                cnt++;
 370        }
 371        local_bh_enable();
 372
 373        if (refcount_sub_and_test(cnt, &pd->refcnt))
 374                padata_free_pd(pd);
 375}
 376
 377/**
 378 * padata_do_serial - padata serialization function
 379 *
 380 * @padata: object to be serialized.
 381 *
 382 * padata_do_serial must be called for every parallelized object.
 383 * The serialization callback function will run with BHs off.
 384 */
 385void padata_do_serial(struct padata_priv *padata)
 386{
 387        struct parallel_data *pd = padata->pd;
 388        int hashed_cpu = padata_cpu_hash(pd, padata->seq_nr);
 389        struct padata_list *reorder = per_cpu_ptr(pd->reorder_list, hashed_cpu);
 390        struct padata_priv *cur;
 391
 392        spin_lock(&reorder->lock);
 393        /* Sort in ascending order of sequence number. */
 394        list_for_each_entry_reverse(cur, &reorder->list, list)
 395                if (cur->seq_nr < padata->seq_nr)
 396                        break;
 397        list_add(&padata->list, &cur->list);
 398        spin_unlock(&reorder->lock);
 399
 400        /*
 401         * Ensure the addition to the reorder list is ordered correctly
 402         * with the trylock of pd->lock in padata_reorder.  Pairs with smp_mb
 403         * in padata_reorder.
 404         */
 405        smp_mb();
 406
 407        padata_reorder(pd);
 408}
 409EXPORT_SYMBOL(padata_do_serial);
 410
 411static int padata_setup_cpumasks(struct padata_instance *pinst)
 412{
 413        struct workqueue_attrs *attrs;
 414        int err;
 415
 416        attrs = alloc_workqueue_attrs();
 417        if (!attrs)
 418                return -ENOMEM;
 419
 420        /* Restrict parallel_wq workers to pinst->cpumask.pcpu. */
 421        cpumask_copy(attrs->cpumask, pinst->cpumask.pcpu);
 422        err = apply_workqueue_attrs(pinst->parallel_wq, attrs);
 423        free_workqueue_attrs(attrs);
 424
 425        return err;
 426}
 427
 428static void __init padata_mt_helper(struct work_struct *w)
 429{
 430        struct padata_work *pw = container_of(w, struct padata_work, pw_work);
 431        struct padata_mt_job_state *ps = pw->pw_data;
 432        struct padata_mt_job *job = ps->job;
 433        bool done;
 434
 435        spin_lock(&ps->lock);
 436
 437        while (job->size > 0) {
 438                unsigned long start, size, end;
 439
 440                start = job->start;
 441                /* So end is chunk size aligned if enough work remains. */
 442                size = roundup(start + 1, ps->chunk_size) - start;
 443                size = min(size, job->size);
 444                end = start + size;
 445
 446                job->start = end;
 447                job->size -= size;
 448
 449                spin_unlock(&ps->lock);
 450                job->thread_fn(start, end, job->fn_arg);
 451                spin_lock(&ps->lock);
 452        }
 453
 454        ++ps->nworks_fini;
 455        done = (ps->nworks_fini == ps->nworks);
 456        spin_unlock(&ps->lock);
 457
 458        if (done)
 459                complete(&ps->completion);
 460}
 461
 462/**
 463 * padata_do_multithreaded - run a multithreaded job
 464 * @job: Description of the job.
 465 *
 466 * See the definition of struct padata_mt_job for more details.
 467 */
 468void __init padata_do_multithreaded(struct padata_mt_job *job)
 469{
 470        /* In case threads finish at different times. */
 471        static const unsigned long load_balance_factor = 4;
 472        struct padata_work my_work, *pw;
 473        struct padata_mt_job_state ps;
 474        LIST_HEAD(works);
 475        int nworks;
 476
 477        if (job->size == 0)
 478                return;
 479
 480        /* Ensure at least one thread when size < min_chunk. */
 481        nworks = max(job->size / job->min_chunk, 1ul);
 482        nworks = min(nworks, job->max_threads);
 483
 484        if (nworks == 1) {
 485                /* Single thread, no coordination needed, cut to the chase. */
 486                job->thread_fn(job->start, job->start + job->size, job->fn_arg);
 487                return;
 488        }
 489
 490        spin_lock_init(&ps.lock);
 491        init_completion(&ps.completion);
 492        ps.job         = job;
 493        ps.nworks      = padata_work_alloc_mt(nworks, &ps, &works);
 494        ps.nworks_fini = 0;
 495
 496        /*
 497         * Chunk size is the amount of work a helper does per call to the
 498         * thread function.  Load balance large jobs between threads by
 499         * increasing the number of chunks, guarantee at least the minimum
 500         * chunk size from the caller, and honor the caller's alignment.
 501         */
 502        ps.chunk_size = job->size / (ps.nworks * load_balance_factor);
 503        ps.chunk_size = max(ps.chunk_size, job->min_chunk);
 504        ps.chunk_size = roundup(ps.chunk_size, job->align);
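        /*
         * Editorial example: with job->size of 1 GiB and ps.nworks == 8, each
         * helper grabs 1 GiB / (8 * 4) == 32 MiB per call before the min_chunk
         * and alignment adjustments above.
         */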
 505
 506        list_for_each_entry(pw, &works, pw_list)
 507                queue_work(system_unbound_wq, &pw->pw_work);
 508
 509        /* Use the current thread, which saves starting a workqueue worker. */
 510        padata_work_init(&my_work, padata_mt_helper, &ps, PADATA_WORK_ONSTACK);
 511        padata_mt_helper(&my_work.pw_work);
 512
 513        /* Wait for all the helpers to finish. */
 514        wait_for_completion(&ps.completion);
 515
 516        destroy_work_on_stack(&my_work.pw_work);
 517        padata_works_free(&works);
 518}
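
/*
 * Usage sketch (editorial addition): how a boot-time caller might describe a
 * job for padata_do_multithreaded().  my_thread_fn(), my_arg and the numbers
 * are hypothetical; the struct padata_mt_job fields are real.
 *
 *	static void __init my_thread_fn(unsigned long start, unsigned long end,
 *					void *arg)
 *	{
 *		// process items in [start, end)
 *	}
 *
 *	struct padata_mt_job job = {
 *		.thread_fn   = my_thread_fn,
 *		.fn_arg      = my_arg,
 *		.start       = 0,
 *		.size        = nr_items,
 *		.align       = 1,
 *		.min_chunk   = 1024,
 *		.max_threads = num_online_cpus(),
 *	};
 *
 *	padata_do_multithreaded(&job);	// returns once every chunk is processed
 */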
 519
 520static void __padata_list_init(struct padata_list *pd_list)
 521{
 522        INIT_LIST_HEAD(&pd_list->list);
 523        spin_lock_init(&pd_list->lock);
 524}
 525
 526/* Initialize all percpu queues used by serial workers */
 527static void padata_init_squeues(struct parallel_data *pd)
 528{
 529        int cpu;
 530        struct padata_serial_queue *squeue;
 531
 532        for_each_cpu(cpu, pd->cpumask.cbcpu) {
 533                squeue = per_cpu_ptr(pd->squeue, cpu);
 534                squeue->pd = pd;
 535                __padata_list_init(&squeue->serial);
 536                INIT_WORK(&squeue->work, padata_serial_worker);
 537        }
 538}
 539
 540/* Initialize per-CPU reorder lists */
 541static void padata_init_reorder_list(struct parallel_data *pd)
 542{
 543        int cpu;
 544        struct padata_list *list;
 545
 546        for_each_cpu(cpu, pd->cpumask.pcpu) {
 547                list = per_cpu_ptr(pd->reorder_list, cpu);
 548                __padata_list_init(list);
 549        }
 550}
 551
 552/* Allocate and initialize the internal cpumask-dependent resources. */
 553static struct parallel_data *padata_alloc_pd(struct padata_shell *ps)
 554{
 555        struct padata_instance *pinst = ps->pinst;
 556        struct parallel_data *pd;
 557
 558        pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
 559        if (!pd)
 560                goto err;
 561
 562        pd->reorder_list = alloc_percpu(struct padata_list);
 563        if (!pd->reorder_list)
 564                goto err_free_pd;
 565
 566        pd->squeue = alloc_percpu(struct padata_serial_queue);
 567        if (!pd->squeue)
 568                goto err_free_reorder_list;
 569
 570        pd->ps = ps;
 571
 572        if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
 573                goto err_free_squeue;
 574        if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL))
 575                goto err_free_pcpu;
 576
 577        cpumask_and(pd->cpumask.pcpu, pinst->cpumask.pcpu, cpu_online_mask);
 578        cpumask_and(pd->cpumask.cbcpu, pinst->cpumask.cbcpu, cpu_online_mask);
 579
 580        padata_init_reorder_list(pd);
 581        padata_init_squeues(pd);
 582        pd->seq_nr = -1;
 583        refcount_set(&pd->refcnt, 1);
 584        spin_lock_init(&pd->lock);
 585        pd->cpu = cpumask_first(pd->cpumask.pcpu);
 586        INIT_WORK(&pd->reorder_work, invoke_padata_reorder);
 587
 588        return pd;
 589
 590err_free_pcpu:
 591        free_cpumask_var(pd->cpumask.pcpu);
 592err_free_squeue:
 593        free_percpu(pd->squeue);
 594err_free_reorder_list:
 595        free_percpu(pd->reorder_list);
 596err_free_pd:
 597        kfree(pd);
 598err:
 599        return NULL;
 600}
 601
 602static void padata_free_pd(struct parallel_data *pd)
 603{
 604        free_cpumask_var(pd->cpumask.pcpu);
 605        free_cpumask_var(pd->cpumask.cbcpu);
 606        free_percpu(pd->reorder_list);
 607        free_percpu(pd->squeue);
 608        kfree(pd);
 609}
 610
 611static void __padata_start(struct padata_instance *pinst)
 612{
 613        pinst->flags |= PADATA_INIT;
 614}
 615
 616static void __padata_stop(struct padata_instance *pinst)
 617{
 618        if (!(pinst->flags & PADATA_INIT))
 619                return;
 620
 621        pinst->flags &= ~PADATA_INIT;
 622
 623        synchronize_rcu();
 624}
 625
 626/* Replace the internal control structure with a new one. */
 627static int padata_replace_one(struct padata_shell *ps)
 628{
 629        struct parallel_data *pd_new;
 630
 631        pd_new = padata_alloc_pd(ps);
 632        if (!pd_new)
 633                return -ENOMEM;
 634
 635        ps->opd = rcu_dereference_protected(ps->pd, 1);
 636        rcu_assign_pointer(ps->pd, pd_new);
 637
 638        return 0;
 639}
 640
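/*
 * Switch every shell of the instance over to a freshly allocated
 * parallel_data.  New submissions see -EBUSY (PADATA_RESET) while this runs;
 * after an RCU grace period the initial reference on each old parallel_data
 * is dropped, and it is freed either here or, if jobs are still in flight,
 * from padata_serial_worker().
 */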
 641static int padata_replace(struct padata_instance *pinst)
 642{
 643        struct padata_shell *ps;
 644        int err = 0;
 645
 646        pinst->flags |= PADATA_RESET;
 647
 648        list_for_each_entry(ps, &pinst->pslist, list) {
 649                err = padata_replace_one(ps);
 650                if (err)
 651                        break;
 652        }
 653
 654        synchronize_rcu();
 655
 656        list_for_each_entry_continue_reverse(ps, &pinst->pslist, list)
 657                if (refcount_dec_and_test(&ps->opd->refcnt))
 658                        padata_free_pd(ps->opd);
 659
 660        pinst->flags &= ~PADATA_RESET;
 661
 662        return err;
 663}
 664
 665/* If cpumask contains no active cpu, we mark the instance as invalid. */
 666static bool padata_validate_cpumask(struct padata_instance *pinst,
 667                                    const struct cpumask *cpumask)
 668{
 669        if (!cpumask_intersects(cpumask, cpu_online_mask)) {
 670                pinst->flags |= PADATA_INVALID;
 671                return false;
 672        }
 673
 674        pinst->flags &= ~PADATA_INVALID;
 675        return true;
 676}
 677
 678static int __padata_set_cpumasks(struct padata_instance *pinst,
 679                                 cpumask_var_t pcpumask,
 680                                 cpumask_var_t cbcpumask)
 681{
 682        int valid;
 683        int err;
 684
 685        valid = padata_validate_cpumask(pinst, pcpumask);
 686        if (!valid) {
 687                __padata_stop(pinst);
 688                goto out_replace;
 689        }
 690
 691        valid = padata_validate_cpumask(pinst, cbcpumask);
 692        if (!valid)
 693                __padata_stop(pinst);
 694
 695out_replace:
 696        cpumask_copy(pinst->cpumask.pcpu, pcpumask);
 697        cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
 698
 699        err = padata_setup_cpumasks(pinst) ?: padata_replace(pinst);
 700
 701        if (valid)
 702                __padata_start(pinst);
 703
 704        return err;
 705}
 706
 707/**
 708 * padata_set_cpumask - Set the cpumask identified by @cpumask_type to the
 709 *                      value of @cpumask.
 710 * @pinst: padata instance
 711 * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL, selecting the
 712 *                serial or parallel cpumask respectively.
 713 * @cpumask: the cpumask to use
 714 *
 715 * Return: 0 on success or negative error code
 716 */
 717int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
 718                       cpumask_var_t cpumask)
 719{
 720        struct cpumask *serial_mask, *parallel_mask;
 721        int err = -EINVAL;
 722
 723        cpus_read_lock();
 724        mutex_lock(&pinst->lock);
 725
 726        switch (cpumask_type) {
 727        case PADATA_CPU_PARALLEL:
 728                serial_mask = pinst->cpumask.cbcpu;
 729                parallel_mask = cpumask;
 730                break;
 731        case PADATA_CPU_SERIAL:
 732                parallel_mask = pinst->cpumask.pcpu;
 733                serial_mask = cpumask;
 734                break;
 735        default:
 736                goto out;
 737        }
 738
 739        err = __padata_set_cpumasks(pinst, parallel_mask, serial_mask);
 740
 741out:
 742        mutex_unlock(&pinst->lock);
 743        cpus_read_unlock();
 744
 745        return err;
 746}
 747EXPORT_SYMBOL(padata_set_cpumask);
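
/*
 * Usage sketch (editorial addition): restricting the serial callbacks of an
 * existing instance "pinst" to CPUs 0-3.  Error handling is abbreviated.
 *
 *	cpumask_var_t mask;
 *	int err;
 *
 *	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
 *		return -ENOMEM;
 *	cpumask_clear(mask);
 *	cpumask_set_cpu(0, mask);
 *	cpumask_set_cpu(1, mask);
 *	cpumask_set_cpu(2, mask);
 *	cpumask_set_cpu(3, mask);
 *	err = padata_set_cpumask(pinst, PADATA_CPU_SERIAL, mask);
 *	free_cpumask_var(mask);
 *
 * The same masks are also exposed through the instance's serial_cpumask and
 * parallel_cpumask sysfs attributes defined further down in this file.
 */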
 748
 749#ifdef CONFIG_HOTPLUG_CPU
 750
 751static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
 752{
 753        int err = 0;
 754
 755        if (cpumask_test_cpu(cpu, cpu_online_mask)) {
 756                err = padata_replace(pinst);
 757
 758                if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
 759                    padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
 760                        __padata_start(pinst);
 761        }
 762
 763        return err;
 764}
 765
 766static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
 767{
 768        int err = 0;
 769
 770        if (!cpumask_test_cpu(cpu, cpu_online_mask)) {
 771                if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
 772                    !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
 773                        __padata_stop(pinst);
 774
 775                err = padata_replace(pinst);
 776        }
 777
 778        return err;
 779}
 780
 781static inline int pinst_has_cpu(struct padata_instance *pinst, int cpu)
 782{
 783        return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
 784                cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
 785}
 786
 787static int padata_cpu_online(unsigned int cpu, struct hlist_node *node)
 788{
 789        struct padata_instance *pinst;
 790        int ret;
 791
 792        pinst = hlist_entry_safe(node, struct padata_instance, cpu_online_node);
 793        if (!pinst_has_cpu(pinst, cpu))
 794                return 0;
 795
 796        mutex_lock(&pinst->lock);
 797        ret = __padata_add_cpu(pinst, cpu);
 798        mutex_unlock(&pinst->lock);
 799        return ret;
 800}
 801
 802static int padata_cpu_dead(unsigned int cpu, struct hlist_node *node)
 803{
 804        struct padata_instance *pinst;
 805        int ret;
 806
 807        pinst = hlist_entry_safe(node, struct padata_instance, cpu_dead_node);
 808        if (!pinst_has_cpu(pinst, cpu))
 809                return 0;
 810
 811        mutex_lock(&pinst->lock);
 812        ret = __padata_remove_cpu(pinst, cpu);
 813        mutex_unlock(&pinst->lock);
 814        return ret;
 815}
 816
 817static enum cpuhp_state hp_online;
 818#endif
 819
 820static void __padata_free(struct padata_instance *pinst)
 821{
 822#ifdef CONFIG_HOTPLUG_CPU
 823        cpuhp_state_remove_instance_nocalls(CPUHP_PADATA_DEAD,
 824                                            &pinst->cpu_dead_node);
 825        cpuhp_state_remove_instance_nocalls(hp_online, &pinst->cpu_online_node);
 826#endif
 827
 828        WARN_ON(!list_empty(&pinst->pslist));
 829
 830        free_cpumask_var(pinst->cpumask.pcpu);
 831        free_cpumask_var(pinst->cpumask.cbcpu);
 832        destroy_workqueue(pinst->serial_wq);
 833        destroy_workqueue(pinst->parallel_wq);
 834        kfree(pinst);
 835}
 836
 837#define kobj2pinst(_kobj)                                       \
 838        container_of(_kobj, struct padata_instance, kobj)
 839#define attr2pentry(_attr)                                      \
 840        container_of(_attr, struct padata_sysfs_entry, attr)
 841
 842static void padata_sysfs_release(struct kobject *kobj)
 843{
 844        struct padata_instance *pinst = kobj2pinst(kobj);
 845        __padata_free(pinst);
 846}
 847
 848struct padata_sysfs_entry {
 849        struct attribute attr;
 850        ssize_t (*show)(struct padata_instance *, struct attribute *, char *);
 851        ssize_t (*store)(struct padata_instance *, struct attribute *,
 852                         const char *, size_t);
 853};
 854
 855static ssize_t show_cpumask(struct padata_instance *pinst,
 856                            struct attribute *attr,  char *buf)
 857{
 858        struct cpumask *cpumask;
 859        ssize_t len;
 860
 861        mutex_lock(&pinst->lock);
 862        if (!strcmp(attr->name, "serial_cpumask"))
 863                cpumask = pinst->cpumask.cbcpu;
 864        else
 865                cpumask = pinst->cpumask.pcpu;
 866
 867        len = snprintf(buf, PAGE_SIZE, "%*pb\n",
 868                       nr_cpu_ids, cpumask_bits(cpumask));
 869        mutex_unlock(&pinst->lock);
 870        return len < PAGE_SIZE ? len : -EINVAL;
 871}
 872
 873static ssize_t store_cpumask(struct padata_instance *pinst,
 874                             struct attribute *attr,
 875                             const char *buf, size_t count)
 876{
 877        cpumask_var_t new_cpumask;
 878        ssize_t ret;
 879        int mask_type;
 880
 881        if (!alloc_cpumask_var(&new_cpumask, GFP_KERNEL))
 882                return -ENOMEM;
 883
 884        ret = bitmap_parse(buf, count, cpumask_bits(new_cpumask),
 885                           nr_cpumask_bits);
 886        if (ret < 0)
 887                goto out;
 888
 889        mask_type = !strcmp(attr->name, "serial_cpumask") ?
 890                PADATA_CPU_SERIAL : PADATA_CPU_PARALLEL;
 891        ret = padata_set_cpumask(pinst, mask_type, new_cpumask);
 892        if (!ret)
 893                ret = count;
 894
 895out:
 896        free_cpumask_var(new_cpumask);
 897        return ret;
 898}
 899
 900#define PADATA_ATTR_RW(_name, _show_name, _store_name)          \
 901        static struct padata_sysfs_entry _name##_attr =         \
 902                __ATTR(_name, 0644, _show_name, _store_name)
 903#define PADATA_ATTR_RO(_name, _show_name)               \
 904        static struct padata_sysfs_entry _name##_attr = \
 905                __ATTR(_name, 0400, _show_name, NULL)
 906
 907PADATA_ATTR_RW(serial_cpumask, show_cpumask, store_cpumask);
 908PADATA_ATTR_RW(parallel_cpumask, show_cpumask, store_cpumask);
 909
 910/*
 911 * Padata sysfs provides the following objects:
 912 * serial_cpumask   [RW] - cpumask for serial workers
 913 * parallel_cpumask [RW] - cpumask for parallel workers
 914 */
 915static struct attribute *padata_default_attrs[] = {
 916        &serial_cpumask_attr.attr,
 917        &parallel_cpumask_attr.attr,
 918        NULL,
 919};
 920ATTRIBUTE_GROUPS(padata_default);
 921
 922static ssize_t padata_sysfs_show(struct kobject *kobj,
 923                                 struct attribute *attr, char *buf)
 924{
 925        struct padata_instance *pinst;
 926        struct padata_sysfs_entry *pentry;
 927        ssize_t ret = -EIO;
 928
 929        pinst = kobj2pinst(kobj);
 930        pentry = attr2pentry(attr);
 931        if (pentry->show)
 932                ret = pentry->show(pinst, attr, buf);
 933
 934        return ret;
 935}
 936
 937static ssize_t padata_sysfs_store(struct kobject *kobj, struct attribute *attr,
 938                                  const char *buf, size_t count)
 939{
 940        struct padata_instance *pinst;
 941        struct padata_sysfs_entry *pentry;
 942        ssize_t ret = -EIO;
 943
 944        pinst = kobj2pinst(kobj);
 945        pentry = attr2pentry(attr);
 946        if (pentry->store)
 947                ret = pentry->store(pinst, attr, buf, count);
 948
 949        return ret;
 950}
 951
 952static const struct sysfs_ops padata_sysfs_ops = {
 953        .show = padata_sysfs_show,
 954        .store = padata_sysfs_store,
 955};
 956
 957static struct kobj_type padata_attr_type = {
 958        .sysfs_ops = &padata_sysfs_ops,
 959        .default_groups = padata_default_groups,
 960        .release = padata_sysfs_release,
 961};
 962
 963/**
 964 * padata_alloc - allocate and initialize a padata instance
 965 * @name: used to identify the instance
 966 *
 967 * Return: new instance on success, NULL on error
 968 */
 969struct padata_instance *padata_alloc(const char *name)
 970{
 971        struct padata_instance *pinst;
 972
 973        pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
 974        if (!pinst)
 975                goto err;
 976
 977        pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0,
 978                                             name);
 979        if (!pinst->parallel_wq)
 980                goto err_free_inst;
 981
 982        cpus_read_lock();
 983
 984        pinst->serial_wq = alloc_workqueue("%s_serial", WQ_MEM_RECLAIM |
 985                                           WQ_CPU_INTENSIVE, 1, name);
 986        if (!pinst->serial_wq)
 987                goto err_put_cpus;
 988
 989        if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
 990                goto err_free_serial_wq;
 991        if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
 992                free_cpumask_var(pinst->cpumask.pcpu);
 993                goto err_free_serial_wq;
 994        }
 995
 996        INIT_LIST_HEAD(&pinst->pslist);
 997
 998        cpumask_copy(pinst->cpumask.pcpu, cpu_possible_mask);
 999        cpumask_copy(pinst->cpumask.cbcpu, cpu_possible_mask);
1000
1001        if (padata_setup_cpumasks(pinst))
1002                goto err_free_masks;
1003
1004        __padata_start(pinst);
1005
1006        kobject_init(&pinst->kobj, &padata_attr_type);
1007        mutex_init(&pinst->lock);
1008
1009#ifdef CONFIG_HOTPLUG_CPU
1010        cpuhp_state_add_instance_nocalls_cpuslocked(hp_online,
1011                                                    &pinst->cpu_online_node);
1012        cpuhp_state_add_instance_nocalls_cpuslocked(CPUHP_PADATA_DEAD,
1013                                                    &pinst->cpu_dead_node);
1014#endif
1015
1016        cpus_read_unlock();
1017
1018        return pinst;
1019
1020err_free_masks:
1021        free_cpumask_var(pinst->cpumask.pcpu);
1022        free_cpumask_var(pinst->cpumask.cbcpu);
1023err_free_serial_wq:
1024        destroy_workqueue(pinst->serial_wq);
1025err_put_cpus:
1026        cpus_read_unlock();
1027        destroy_workqueue(pinst->parallel_wq);
1028err_free_inst:
1029        kfree(pinst);
1030err:
1031        return NULL;
1032}
1033EXPORT_SYMBOL(padata_alloc);
1034
1035/**
1036 * padata_free - free a padata instance
1037 *
1038 * @pinst: padata instance to free
1039 */
1040void padata_free(struct padata_instance *pinst)
1041{
1042        kobject_put(&pinst->kobj);
1043}
1044EXPORT_SYMBOL(padata_free);
1045
1046/**
1047 * padata_alloc_shell - Allocate and initialize padata shell.
1048 *
1049 * @pinst: Parent padata_instance object.
1050 *
1051 * Return: new shell on success, NULL on error
1052 */
1053struct padata_shell *padata_alloc_shell(struct padata_instance *pinst)
1054{
1055        struct parallel_data *pd;
1056        struct padata_shell *ps;
1057
1058        ps = kzalloc(sizeof(*ps), GFP_KERNEL);
1059        if (!ps)
1060                goto out;
1061
1062        ps->pinst = pinst;
1063
1064        cpus_read_lock();
1065        pd = padata_alloc_pd(ps);
1066        cpus_read_unlock();
1067
1068        if (!pd)
1069                goto out_free_ps;
1070
1071        mutex_lock(&pinst->lock);
1072        RCU_INIT_POINTER(ps->pd, pd);
1073        list_add(&ps->list, &pinst->pslist);
1074        mutex_unlock(&pinst->lock);
1075
1076        return ps;
1077
1078out_free_ps:
1079        kfree(ps);
1080out:
1081        return NULL;
1082}
1083EXPORT_SYMBOL(padata_alloc_shell);
1084
1085/**
1086 * padata_free_shell - free a padata shell
1087 *
1088 * @ps: padata shell to free
1089 */
1090void padata_free_shell(struct padata_shell *ps)
1091{
1092        if (!ps)
1093                return;
1094
1095        mutex_lock(&ps->pinst->lock);
1096        list_del(&ps->list);
1097        padata_free_pd(rcu_dereference_protected(ps->pd, 1));
1098        mutex_unlock(&ps->pinst->lock);
1099
1100        kfree(ps);
1101}
1102EXPORT_SYMBOL(padata_free_shell);
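
/*
 * Lifecycle sketch (editorial addition): how the allocation and teardown
 * entry points above fit together for a hypothetical user "my_user".
 *
 *	struct padata_instance *pinst;
 *	struct padata_shell *ps;
 *
 *	pinst = padata_alloc("my_user");
 *	if (!pinst)
 *		return -ENOMEM;
 *	ps = padata_alloc_shell(pinst);
 *	if (!ps) {
 *		padata_free(pinst);
 *		return -ENOMEM;
 *	}
 *
 *	// submit work with padata_do_parallel(ps, ...), then tear down:
 *
 *	padata_free_shell(ps);
 *	padata_free(pinst);
 */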
1103
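/*
 * Boot-time setup: register the CPU hotplug callbacks and build the pool of
 * preallocated padata_work items (one per possible CPU) that backs both
 * padata_do_parallel() and padata_do_multithreaded().
 */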
1104void __init padata_init(void)
1105{
1106        unsigned int i, possible_cpus;
1107#ifdef CONFIG_HOTPLUG_CPU
1108        int ret;
1109
1110        ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "padata:online",
1111                                      padata_cpu_online, NULL);
1112        if (ret < 0)
1113                goto err;
1114        hp_online = ret;
1115
1116        ret = cpuhp_setup_state_multi(CPUHP_PADATA_DEAD, "padata:dead",
1117                                      NULL, padata_cpu_dead);
1118        if (ret < 0)
1119                goto remove_online_state;
1120#endif
1121
1122        possible_cpus = num_possible_cpus();
1123        padata_works = kmalloc_array(possible_cpus, sizeof(struct padata_work),
1124                                     GFP_KERNEL);
1125        if (!padata_works)
1126                goto remove_dead_state;
1127
1128        for (i = 0; i < possible_cpus; ++i)
1129                list_add(&padata_works[i].pw_list, &padata_free_works);
1130
1131        return;
1132
1133remove_dead_state:
1134#ifdef CONFIG_HOTPLUG_CPU
1135        cpuhp_remove_multi_state(CPUHP_PADATA_DEAD);
1136remove_online_state:
1137        cpuhp_remove_multi_state(hp_online);
1138err:
1139#endif
1140        pr_warn("padata: initialization failed\n");
1141}
1142