linux/kernel/padata.c
/*
 * padata.c - generic interface to process data streams in parallel
 *
 * See Documentation/padata.txt for API documentation.
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <linux/export.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/padata.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/sysfs.h>
#include <linux/rcupdate.h>

#define MAX_OBJ_NUM 1000

static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
	int cpu, target_cpu;

	target_cpu = cpumask_first(pd->cpumask.pcpu);
	for (cpu = 0; cpu < cpu_index; cpu++)
		target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu);

	return target_cpu;
}

static int padata_cpu_hash(struct parallel_data *pd)
{
	int cpu_index;

	/*
	 * Hash the sequence numbers to the cpus by taking
	 * seq_nr mod. number of cpus in use.
	 */

	spin_lock(&pd->seq_lock);
	cpu_index = pd->seq_nr % cpumask_weight(pd->cpumask.pcpu);
	pd->seq_nr++;
	spin_unlock(&pd->seq_lock);

	return padata_index_to_cpu(pd, cpu_index);
}
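
/*
 * Worked example (illustrative, not part of the original file): with a
 * parallel cpumask containing cpus {1, 4, 7}, cpumask_weight() is 3, so
 * successive sequence numbers hash round robin to the cpu indices
 * 0 % 3 = 0, 1 % 3 = 1, 2 % 3 = 2, 3 % 3 = 0 and so on, and
 * padata_index_to_cpu() walks the mask to map index 0 -> cpu 1,
 * index 1 -> cpu 4 and index 2 -> cpu 7.
 */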

static void padata_parallel_worker(struct work_struct *parallel_work)
{
	struct padata_parallel_queue *pqueue;
	struct parallel_data *pd;
	struct padata_instance *pinst;
	LIST_HEAD(local_list);

	local_bh_disable();
	pqueue = container_of(parallel_work,
			      struct padata_parallel_queue, work);
	pd = pqueue->pd;
	pinst = pd->pinst;

	spin_lock(&pqueue->parallel.lock);
	list_replace_init(&pqueue->parallel.list, &local_list);
	spin_unlock(&pqueue->parallel.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->parallel(padata);
	}

	local_bh_enable();
}

/**
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the serial cpumask of padata (i.e. cpumask.cbcpu).
 *
 * The parallelization callback function will run with BHs off.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 */
int padata_do_parallel(struct padata_instance *pinst,
		       struct padata_priv *padata, int cb_cpu)
{
	int target_cpu, err;
	struct padata_parallel_queue *queue;
	struct parallel_data *pd;

	rcu_read_lock_bh();

	pd = rcu_dereference(pinst->pd);

	err = -EINVAL;
	if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
		goto out;

	if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu))
		goto out;

	err = -EBUSY;
	if ((pinst->flags & PADATA_RESET))
		goto out;

	if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
		goto out;

	err = 0;
	atomic_inc(&pd->refcnt);
	padata->pd = pd;
	padata->cb_cpu = cb_cpu;

	target_cpu = padata_cpu_hash(pd);
	queue = per_cpu_ptr(pd->pqueue, target_cpu);

	spin_lock(&queue->parallel.lock);
	list_add_tail(&padata->list, &queue->parallel.list);
	spin_unlock(&queue->parallel.lock);

	queue_work_on(target_cpu, pinst->wq, &queue->work);

out:
	rcu_read_unlock_bh();

	return err;
}
EXPORT_SYMBOL(padata_do_parallel);
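
/*
 * Usage sketch (illustrative only, not part of this file; the names
 * my_request, my_parallel, my_serial and my_submit are hypothetical).
 * A user embeds struct padata_priv in its own request structure, sets
 * the two callbacks and submits the object:
 *
 *	struct my_request {
 *		struct padata_priv padata;	// must be embedded
 *		// ... user data ...
 *	};
 *
 *	static void my_parallel(struct padata_priv *padata);
 *	static void my_serial(struct padata_priv *padata);
 *
 *	static int my_submit(struct padata_instance *pinst,
 *			     struct my_request *req, int cb_cpu)
 *	{
 *		req->padata.parallel = my_parallel;
 *		req->padata.serial = my_serial;
 *		return padata_do_parallel(pinst, &req->padata, cb_cpu);
 *	}
 *
 * A return value of -EBUSY means the instance is resetting or full and
 * the caller should retry later; -EINVAL means the instance is stopped,
 * invalid, or cb_cpu is not in the serial cpumask.
 */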

/*
 * padata_get_next - Get the next object that needs serialization.
 *
 * Return values are:
 *
 * A pointer to the control struct of the next object that needs
 * serialization, if present in one of the percpu reorder queues.
 *
 * NULL, if all percpu reorder queues are empty.
 *
 * -EINPROGRESS, if the next object that needs serialization will
 *  be parallel processed by another cpu and is not yet present in
 *  the cpu's reorder queue.
 *
 * -ENODATA, if this cpu has to do the parallel processing for
 *  the next object.
 */
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
	int cpu, num_cpus;
	unsigned int next_nr, next_index;
	struct padata_parallel_queue *next_queue;
	struct padata_priv *padata;
	struct padata_list *reorder;

	num_cpus = cpumask_weight(pd->cpumask.pcpu);

	/*
	 * Calculate the percpu reorder queue and the sequence
	 * number of the next object.
	 */
	next_nr = pd->processed;
	next_index = next_nr % num_cpus;
	cpu = padata_index_to_cpu(pd, next_index);
	next_queue = per_cpu_ptr(pd->pqueue, cpu);

	padata = NULL;

	reorder = &next_queue->reorder;

	/*
	 * Take the reorder lock before testing the list, so we do not
	 * race with padata_do_serial() adding an object concurrently.
	 */
	spin_lock(&reorder->lock);
	if (!list_empty(&reorder->list)) {
		padata = list_entry(reorder->list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);
		atomic_dec(&pd->reorder_objects);

		pd->processed++;

		spin_unlock(&reorder->lock);
		goto out;
	}
	spin_unlock(&reorder->lock);

	if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) {
		padata = ERR_PTR(-ENODATA);
		goto out;
	}

	padata = ERR_PTR(-EINPROGRESS);
out:
	return padata;
}

static void padata_reorder(struct parallel_data *pd)
{
	int cb_cpu;
	struct padata_priv *padata;
	struct padata_serial_queue *squeue;
	struct padata_instance *pinst = pd->pinst;

	/*
	 * We need to ensure that only one cpu can work on dequeueing of
	 * the reorder queue at a time. Calculating in which percpu reorder
	 * queue the next object will arrive takes some time. A spinlock
	 * would be highly contended. Also it is not clear in which order
	 * the objects arrive at the reorder queues. So a cpu could wait to
	 * get the lock just to notice that there is nothing to do at the
	 * moment. Therefore we use a trylock and let the holder of the lock
	 * take care of all the objects enqueued during the hold time of
	 * the lock.
	 */
	if (!spin_trylock_bh(&pd->lock))
		return;

	while (1) {
		padata = padata_get_next(pd);

		/*
		 * All reorder queues are empty, or the next object that needs
		 * serialization is parallel processed by another cpu and is
		 * still on its way to the cpu's reorder queue, nothing to
		 * do for now.
		 */
		if (!padata || PTR_ERR(padata) == -EINPROGRESS)
			break;

		/*
		 * This cpu has to do the parallel processing of the next
		 * object. It's waiting in the cpu's parallelization queue,
		 * so exit immediately.
		 */
		if (PTR_ERR(padata) == -ENODATA) {
			del_timer(&pd->timer);
			spin_unlock_bh(&pd->lock);
			return;
		}

		cb_cpu = padata->cb_cpu;
		squeue = per_cpu_ptr(pd->squeue, cb_cpu);

		spin_lock(&squeue->serial.lock);
		list_add_tail(&padata->list, &squeue->serial.list);
		spin_unlock(&squeue->serial.lock);

		queue_work_on(cb_cpu, pinst->wq, &squeue->work);
	}

	spin_unlock_bh(&pd->lock);

	/*
	 * The next object that needs serialization might have arrived at
	 * the reorder queues in the meantime, we will be called again
	 * from the timer function if no one else cares for it.
	 */
	if (atomic_read(&pd->reorder_objects)
			&& !(pinst->flags & PADATA_RESET))
		mod_timer(&pd->timer, jiffies + HZ);
	else
		del_timer(&pd->timer);
}

static void padata_reorder_timer(unsigned long arg)
{
	struct parallel_data *pd = (struct parallel_data *)arg;

	padata_reorder(pd);
}

static void padata_serial_worker(struct work_struct *serial_work)
{
	struct padata_serial_queue *squeue;
	struct parallel_data *pd;
	LIST_HEAD(local_list);

	local_bh_disable();
	squeue = container_of(serial_work, struct padata_serial_queue, work);
	pd = squeue->pd;

	spin_lock(&squeue->serial.lock);
	list_replace_init(&squeue->serial.list, &local_list);
	spin_unlock(&squeue->serial.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->serial(padata);
		atomic_dec(&pd->refcnt);
	}
	local_bh_enable();
}

/**
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
void padata_do_serial(struct padata_priv *padata)
{
	int cpu;
	struct padata_parallel_queue *pqueue;
	struct parallel_data *pd;

	pd = padata->pd;

	cpu = get_cpu();
	pqueue = per_cpu_ptr(pd->pqueue, cpu);

	spin_lock(&pqueue->reorder.lock);
	atomic_inc(&pd->reorder_objects);
	list_add_tail(&padata->list, &pqueue->reorder.list);
	spin_unlock(&pqueue->reorder.lock);

	put_cpu();

	padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);
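
/*
 * Continuing the sketch above (the hypothetical my_parallel/my_serial
 * pair): the parallel callback does the cpu-intensive work and must hand
 * the object back with padata_do_serial(); the serial callback then runs
 * on cb_cpu in the original submission order:
 *
 *	static void my_parallel(struct padata_priv *padata)
 *	{
 *		struct my_request *req =
 *			container_of(padata, struct my_request, padata);
 *
 *		// heavy work on req; runs with BHs off on a pcpu cpu
 *		padata_do_serial(padata);
 *	}
 *
 *	static void my_serial(struct padata_priv *padata)
 *	{
 *		struct my_request *req =
 *			container_of(padata, struct my_request, padata);
 *
 *		// completion; runs in submission order on cb_cpu
 *	}
 */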

static int padata_setup_cpumasks(struct parallel_data *pd,
				 const struct cpumask *pcpumask,
				 const struct cpumask *cbcpumask)
{
	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
		return -ENOMEM;

	cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask);
	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
		/* The cbcpu allocation failed, undo the pcpu allocation. */
		free_cpumask_var(pd->cpumask.pcpu);
		return -ENOMEM;
	}

	cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask);
	return 0;
}

static void __padata_list_init(struct padata_list *pd_list)
{
	INIT_LIST_HEAD(&pd_list->list);
	spin_lock_init(&pd_list->lock);
}

/* Initialize all percpu queues used by serial workers */
static void padata_init_squeues(struct parallel_data *pd)
{
	int cpu;
	struct padata_serial_queue *squeue;

	for_each_cpu(cpu, pd->cpumask.cbcpu) {
		squeue = per_cpu_ptr(pd->squeue, cpu);
		squeue->pd = pd;
		__padata_list_init(&squeue->serial);
		INIT_WORK(&squeue->work, padata_serial_worker);
	}
}

/* Initialize all percpu queues used by parallel workers */
static void padata_init_pqueues(struct parallel_data *pd)
{
	int cpu_index, cpu;
	struct padata_parallel_queue *pqueue;

	cpu_index = 0;
	for_each_cpu(cpu, pd->cpumask.pcpu) {
		pqueue = per_cpu_ptr(pd->pqueue, cpu);
		pqueue->pd = pd;
		pqueue->cpu_index = cpu_index;
		cpu_index++;

		__padata_list_init(&pqueue->reorder);
		__padata_list_init(&pqueue->parallel);
		INIT_WORK(&pqueue->work, padata_parallel_worker);
		atomic_set(&pqueue->num_obj, 0);
	}
}

/* Allocate and initialize the internal cpumask dependent resources. */
static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
					     const struct cpumask *pcpumask,
					     const struct cpumask *cbcpumask)
{
	struct parallel_data *pd;

	pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
	if (!pd)
		goto err;

	pd->pqueue = alloc_percpu(struct padata_parallel_queue);
	if (!pd->pqueue)
		goto err_free_pd;

	pd->squeue = alloc_percpu(struct padata_serial_queue);
	if (!pd->squeue)
		goto err_free_pqueue;

	if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
		goto err_free_squeue;

	padata_init_pqueues(pd);
	padata_init_squeues(pd);
	setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
	pd->seq_nr = 0;
	atomic_set(&pd->reorder_objects, 0);
	atomic_set(&pd->refcnt, 0);
	pd->pinst = pinst;
	spin_lock_init(&pd->lock);
	/* seq_lock protects seq_nr, see padata_cpu_hash(). */
	spin_lock_init(&pd->seq_lock);

	return pd;

err_free_squeue:
	free_percpu(pd->squeue);
err_free_pqueue:
	free_percpu(pd->pqueue);
err_free_pd:
	kfree(pd);
err:
	return NULL;
}

static void padata_free_pd(struct parallel_data *pd)
{
	free_cpumask_var(pd->cpumask.pcpu);
	free_cpumask_var(pd->cpumask.cbcpu);
	free_percpu(pd->pqueue);
	free_percpu(pd->squeue);
	kfree(pd);
}

/* Flush all objects out of the padata queues. */
static void padata_flush_queues(struct parallel_data *pd)
{
	int cpu;
	struct padata_parallel_queue *pqueue;
	struct padata_serial_queue *squeue;

	for_each_cpu(cpu, pd->cpumask.pcpu) {
		pqueue = per_cpu_ptr(pd->pqueue, cpu);
		flush_work(&pqueue->work);
	}

	del_timer_sync(&pd->timer);

	if (atomic_read(&pd->reorder_objects))
		padata_reorder(pd);

	for_each_cpu(cpu, pd->cpumask.cbcpu) {
		squeue = per_cpu_ptr(pd->squeue, cpu);
		flush_work(&squeue->work);
	}

	BUG_ON(atomic_read(&pd->refcnt) != 0);
}

static void __padata_start(struct padata_instance *pinst)
{
	pinst->flags |= PADATA_INIT;
}

static void __padata_stop(struct padata_instance *pinst)
{
	if (!(pinst->flags & PADATA_INIT))
		return;

	pinst->flags &= ~PADATA_INIT;

	synchronize_rcu();

	get_online_cpus();
	padata_flush_queues(pinst->pd);
	put_online_cpus();
}

/* Replace the internal control structure with a new one. */
static void padata_replace(struct padata_instance *pinst,
			   struct parallel_data *pd_new)
{
	struct parallel_data *pd_old = pinst->pd;
	int notification_mask = 0;

	pinst->flags |= PADATA_RESET;

	rcu_assign_pointer(pinst->pd, pd_new);

	synchronize_rcu();

	if (!cpumask_equal(pd_old->cpumask.pcpu, pd_new->cpumask.pcpu))
		notification_mask |= PADATA_CPU_PARALLEL;
	if (!cpumask_equal(pd_old->cpumask.cbcpu, pd_new->cpumask.cbcpu))
		notification_mask |= PADATA_CPU_SERIAL;

	padata_flush_queues(pd_old);
	padata_free_pd(pd_old);

	if (notification_mask)
		blocking_notifier_call_chain(&pinst->cpumask_change_notifier,
					     notification_mask,
					     &pd_new->cpumask);

	pinst->flags &= ~PADATA_RESET;
}

/**
 * padata_register_cpumask_notifier - Registers a notifier that will be called
 *                             if either pcpu or cbcpu or both cpumasks change.
 *
 * @pinst: A pointer to the padata instance
 * @nblock: A pointer to the notifier block
 */
int padata_register_cpumask_notifier(struct padata_instance *pinst,
				     struct notifier_block *nblock)
{
	return blocking_notifier_chain_register(&pinst->cpumask_change_notifier,
						nblock);
}
EXPORT_SYMBOL(padata_register_cpumask_notifier);
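
/*
 * Sketch of a notifier (illustrative; my_cpumask_notify and my_nb are
 * hypothetical names). The chain is invoked from padata_replace() with a
 * PADATA_CPU_PARALLEL and/or PADATA_CPU_SERIAL mask and a pointer to the
 * new struct padata_cpumask:
 *
 *	static int my_cpumask_notify(struct notifier_block *nb,
 *				     unsigned long mask, void *data)
 *	{
 *		struct padata_cpumask *new_masks = data;
 *
 *		if (mask & PADATA_CPU_PARALLEL)
 *			;	// react to new_masks->pcpu
 *		if (mask & PADATA_CPU_SERIAL)
 *			;	// react to new_masks->cbcpu
 *		return 0;
 *	}
 *
 *	static struct notifier_block my_nb = {
 *		.notifier_call = my_cpumask_notify,
 *	};
 *
 *	padata_register_cpumask_notifier(pinst, &my_nb);
 */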

/**
 * padata_unregister_cpumask_notifier - Unregisters a cpumask notifier
 *        registered earlier using padata_register_cpumask_notifier
 *
 * @pinst: A pointer to the padata instance.
 * @nblock: A pointer to the notifier block.
 */
int padata_unregister_cpumask_notifier(struct padata_instance *pinst,
				       struct notifier_block *nblock)
{
	return blocking_notifier_chain_unregister(
		&pinst->cpumask_change_notifier,
		nblock);
}
EXPORT_SYMBOL(padata_unregister_cpumask_notifier);

/* If cpumask contains no active cpu, we mark the instance as invalid. */
static bool padata_validate_cpumask(struct padata_instance *pinst,
				    const struct cpumask *cpumask)
{
	if (!cpumask_intersects(cpumask, cpu_online_mask)) {
		pinst->flags |= PADATA_INVALID;
		return false;
	}

	pinst->flags &= ~PADATA_INVALID;
	return true;
}

static int __padata_set_cpumasks(struct padata_instance *pinst,
				 cpumask_var_t pcpumask,
				 cpumask_var_t cbcpumask)
{
	int valid;
	struct parallel_data *pd;

	valid = padata_validate_cpumask(pinst, pcpumask);
	if (!valid) {
		__padata_stop(pinst);
		goto out_replace;
	}

	valid = padata_validate_cpumask(pinst, cbcpumask);
	if (!valid)
		__padata_stop(pinst);

out_replace:
	pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
	if (!pd)
		return -ENOMEM;

	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);

	padata_replace(pinst, pd);

	if (valid)
		__padata_start(pinst);

	return 0;
}

/**
 * padata_set_cpumasks - Set both parallel and serial cpumasks. The first
 *                       one is used by parallel workers and the second one
 *                       by the workers doing serialization.
 *
 * @pinst: padata instance
 * @pcpumask: the cpumask to use for parallel workers
 * @cbcpumask: the cpumask to use for serial workers
 */
int padata_set_cpumasks(struct padata_instance *pinst, cpumask_var_t pcpumask,
			cpumask_var_t cbcpumask)
{
	int err;

	mutex_lock(&pinst->lock);
	get_online_cpus();

	err = __padata_set_cpumasks(pinst, pcpumask, cbcpumask);

	put_online_cpus();
	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_set_cpumasks);

/**
 * padata_set_cpumask - Set the cpumask specified by @cpumask_type to the
 *                      value of @cpumask.
 *
 * @pinst: padata instance
 * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL corresponding
 *                to the serial and parallel cpumasks respectively.
 * @cpumask: the cpumask to use
 */
int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
		       cpumask_var_t cpumask)
{
	struct cpumask *serial_mask, *parallel_mask;
	int err = -EINVAL;

	mutex_lock(&pinst->lock);
	get_online_cpus();

	switch (cpumask_type) {
	case PADATA_CPU_PARALLEL:
		serial_mask = pinst->cpumask.cbcpu;
		parallel_mask = cpumask;
		break;
	case PADATA_CPU_SERIAL:
		parallel_mask = pinst->cpumask.pcpu;
		serial_mask = cpumask;
		break;
	default:
		goto out;
	}

	err = __padata_set_cpumasks(pinst, parallel_mask, serial_mask);

out:
	put_online_cpus();
	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_set_cpumask);
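
/*
 * Sketch (illustrative): restricting the parallel workers to cpus 0-3,
 * assuming at least one of them is online:
 *
 *	cpumask_var_t mask;
 *	int cpu, err;
 *
 *	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
 *		return -ENOMEM;
 *	cpumask_clear(mask);
 *	for (cpu = 0; cpu < 4; cpu++)
 *		cpumask_set_cpu(cpu, mask);
 *	err = padata_set_cpumask(pinst, PADATA_CPU_PARALLEL, mask);
 *	free_cpumask_var(mask);
 */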

static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
				     pinst->cpumask.cbcpu);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);

		if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
		    padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
			__padata_start(pinst);
	}

	return 0;
}

/**
 * padata_add_cpu - add a cpu to one or both (parallel and serial)
 *                  padata cpumasks.
 *
 * @pinst: padata instance
 * @cpu: cpu to add
 * @mask: bitmask of flags specifying to which cpumask @cpu should be added.
 *        The @mask may be any combination of the following flags:
 *          PADATA_CPU_SERIAL   - serial cpumask
 *          PADATA_CPU_PARALLEL - parallel cpumask
 */
int padata_add_cpu(struct padata_instance *pinst, int cpu, int mask)
{
	int err;

	if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
		return -EINVAL;

	mutex_lock(&pinst->lock);

	get_online_cpus();
	if (mask & PADATA_CPU_SERIAL)
		cpumask_set_cpu(cpu, pinst->cpumask.cbcpu);
	if (mask & PADATA_CPU_PARALLEL)
		cpumask_set_cpu(cpu, pinst->cpumask.pcpu);

	err = __padata_add_cpu(pinst, cpu);
	put_online_cpus();

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_add_cpu);

static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd = NULL;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {

		if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
		    !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
			__padata_stop(pinst);

		pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
				     pinst->cpumask.cbcpu);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);

		cpumask_clear_cpu(cpu, pd->cpumask.cbcpu);
		cpumask_clear_cpu(cpu, pd->cpumask.pcpu);
	}

	return 0;
}

/**
 * padata_remove_cpu - remove a cpu from one or both (serial and parallel)
 *                     padata cpumasks.
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 * @mask: bitmask specifying from which cpumask @cpu should be removed.
 *        The @mask may be any combination of the following flags:
 *          PADATA_CPU_SERIAL   - serial cpumask
 *          PADATA_CPU_PARALLEL - parallel cpumask
 */
int padata_remove_cpu(struct padata_instance *pinst, int cpu, int mask)
{
	int err;

	if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
		return -EINVAL;

	mutex_lock(&pinst->lock);

	get_online_cpus();
	if (mask & PADATA_CPU_SERIAL)
		cpumask_clear_cpu(cpu, pinst->cpumask.cbcpu);
	if (mask & PADATA_CPU_PARALLEL)
		cpumask_clear_cpu(cpu, pinst->cpumask.pcpu);

	err = __padata_remove_cpu(pinst, cpu);
	put_online_cpus();

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_remove_cpu);

/**
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 */
int padata_start(struct padata_instance *pinst)
{
	int err = 0;

	mutex_lock(&pinst->lock);

	if (pinst->flags & PADATA_INVALID)
		err = -EINVAL;

	__padata_start(pinst);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_start);

/**
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 */
void padata_stop(struct padata_instance *pinst)
{
	mutex_lock(&pinst->lock);
	__padata_stop(pinst);
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);

#ifdef CONFIG_HOTPLUG_CPU

static inline int pinst_has_cpu(struct padata_instance *pinst, int cpu)
{
	return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
		cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
}

static int padata_cpu_callback(struct notifier_block *nfb,
			       unsigned long action, void *hcpu)
{
	int err;
	struct padata_instance *pinst;
	int cpu = (unsigned long)hcpu;

	pinst = container_of(nfb, struct padata_instance, cpu_notifier);

	switch (action) {
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		if (!pinst_has_cpu(pinst, cpu))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return notifier_from_errno(err);
		break;

	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		if (!pinst_has_cpu(pinst, cpu))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return notifier_from_errno(err);
		break;

	case CPU_UP_CANCELED:
	case CPU_UP_CANCELED_FROZEN:
		if (!pinst_has_cpu(pinst, cpu))
			break;
		mutex_lock(&pinst->lock);
		__padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		/* Don't fall through and re-add the cpu below. */
		break;

	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
		if (!pinst_has_cpu(pinst, cpu))
			break;
		mutex_lock(&pinst->lock);
		__padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
	}

	return NOTIFY_OK;
}
#endif

static void __padata_free(struct padata_instance *pinst)
{
#ifdef CONFIG_HOTPLUG_CPU
	unregister_hotcpu_notifier(&pinst->cpu_notifier);
#endif

	padata_stop(pinst);
	padata_free_pd(pinst->pd);
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
	kfree(pinst);
}

#define kobj2pinst(_kobj)					\
	container_of(_kobj, struct padata_instance, kobj)
#define attr2pentry(_attr)					\
	container_of(_attr, struct padata_sysfs_entry, attr)

static void padata_sysfs_release(struct kobject *kobj)
{
	struct padata_instance *pinst = kobj2pinst(kobj);
	__padata_free(pinst);
}

struct padata_sysfs_entry {
	struct attribute attr;
	ssize_t (*show)(struct padata_instance *, struct attribute *, char *);
	ssize_t (*store)(struct padata_instance *, struct attribute *,
			 const char *, size_t);
};

static ssize_t show_cpumask(struct padata_instance *pinst,
			    struct attribute *attr, char *buf)
{
	struct cpumask *cpumask;
	ssize_t len;

	mutex_lock(&pinst->lock);
	if (!strcmp(attr->name, "serial_cpumask"))
		cpumask = pinst->cpumask.cbcpu;
	else
		cpumask = pinst->cpumask.pcpu;

	len = bitmap_scnprintf(buf, PAGE_SIZE, cpumask_bits(cpumask),
			       nr_cpu_ids);
	if (PAGE_SIZE - len < 2)
		len = -EINVAL;
	else
		len += sprintf(buf + len, "\n");

	mutex_unlock(&pinst->lock);
	return len;
}

static ssize_t store_cpumask(struct padata_instance *pinst,
			     struct attribute *attr,
			     const char *buf, size_t count)
{
	cpumask_var_t new_cpumask;
	ssize_t ret;
	int mask_type;

	if (!alloc_cpumask_var(&new_cpumask, GFP_KERNEL))
		return -ENOMEM;

	ret = bitmap_parse(buf, count, cpumask_bits(new_cpumask),
			   nr_cpumask_bits);
	if (ret < 0)
		goto out;

	mask_type = !strcmp(attr->name, "serial_cpumask") ?
		PADATA_CPU_SERIAL : PADATA_CPU_PARALLEL;
	ret = padata_set_cpumask(pinst, mask_type, new_cpumask);
	if (!ret)
		ret = count;

out:
	free_cpumask_var(new_cpumask);
	return ret;
}

#define PADATA_ATTR_RW(_name, _show_name, _store_name)		\
	static struct padata_sysfs_entry _name##_attr =		\
		__ATTR(_name, 0644, _show_name, _store_name)
#define PADATA_ATTR_RO(_name, _show_name)		\
	static struct padata_sysfs_entry _name##_attr = \
		__ATTR(_name, 0400, _show_name, NULL)

PADATA_ATTR_RW(serial_cpumask, show_cpumask, store_cpumask);
PADATA_ATTR_RW(parallel_cpumask, show_cpumask, store_cpumask);

/*
 * Padata sysfs provides the following objects:
 * serial_cpumask   [RW] - cpumask for serial workers
 * parallel_cpumask [RW] - cpumask for parallel workers
 */
static struct attribute *padata_default_attrs[] = {
	&serial_cpumask_attr.attr,
	&parallel_cpumask_attr.attr,
	NULL,
};

static ssize_t padata_sysfs_show(struct kobject *kobj,
				 struct attribute *attr, char *buf)
{
	struct padata_instance *pinst;
	struct padata_sysfs_entry *pentry;
	ssize_t ret = -EIO;

	pinst = kobj2pinst(kobj);
	pentry = attr2pentry(attr);
	if (pentry->show)
		ret = pentry->show(pinst, attr, buf);

	return ret;
}

static ssize_t padata_sysfs_store(struct kobject *kobj, struct attribute *attr,
				  const char *buf, size_t count)
{
	struct padata_instance *pinst;
	struct padata_sysfs_entry *pentry;
	ssize_t ret = -EIO;

	pinst = kobj2pinst(kobj);
	pentry = attr2pentry(attr);
	if (pentry->store)
		ret = pentry->store(pinst, attr, buf, count);

	return ret;
}

static const struct sysfs_ops padata_sysfs_ops = {
	.show = padata_sysfs_show,
	.store = padata_sysfs_store,
};

static struct kobj_type padata_attr_type = {
	.sysfs_ops = &padata_sysfs_ops,
	.default_attrs = padata_default_attrs,
	.release = padata_sysfs_release,
};

/**
 * padata_alloc_possible - Allocate and initialize padata instance.
 *                         Use the cpu_possible_mask for serial and
 *                         parallel workers.
 *
 * @wq: workqueue to use for the allocated padata instance
 */
struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq)
{
	return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
}
EXPORT_SYMBOL(padata_alloc_possible);

/**
 * padata_alloc - allocate and initialize a padata instance and specify
 *                cpumasks for serial and parallel workers.
 *
 * @wq: workqueue to use for the allocated padata instance
 * @pcpumask: cpumask that will be used for padata parallelization
 * @cbcpumask: cpumask that will be used for padata serialization
 */
struct padata_instance *padata_alloc(struct workqueue_struct *wq,
				     const struct cpumask *pcpumask,
				     const struct cpumask *cbcpumask)
{
	struct padata_instance *pinst;
	struct parallel_data *pd = NULL;

	pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
	if (!pinst)
		goto err;

	get_online_cpus();
	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
		goto err_free_inst;
	if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
		free_cpumask_var(pinst->cpumask.pcpu);
		goto err_free_inst;
	}
	if (!padata_validate_cpumask(pinst, pcpumask) ||
	    !padata_validate_cpumask(pinst, cbcpumask))
		goto err_free_masks;

	pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
	if (!pd)
		goto err_free_masks;

	rcu_assign_pointer(pinst->pd, pd);

	pinst->wq = wq;

	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);

	pinst->flags = 0;

#ifdef CONFIG_HOTPLUG_CPU
	pinst->cpu_notifier.notifier_call = padata_cpu_callback;
	pinst->cpu_notifier.priority = 0;
	register_hotcpu_notifier(&pinst->cpu_notifier);
#endif

	put_online_cpus();

	BLOCKING_INIT_NOTIFIER_HEAD(&pinst->cpumask_change_notifier);
	kobject_init(&pinst->kobj, &padata_attr_type);
	mutex_init(&pinst->lock);

	return pinst;

err_free_masks:
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
err_free_inst:
	kfree(pinst);
	put_online_cpus();
err:
	return NULL;
}
EXPORT_SYMBOL(padata_alloc);

/**
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 */
void padata_free(struct padata_instance *pinst)
{
	kobject_put(&pinst->kobj);
}
EXPORT_SYMBOL(padata_free);
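
/*
 * End-to-end lifecycle sketch (illustrative; the workqueue name
 * "my_padata" is hypothetical):
 *
 *	struct workqueue_struct *wq;
 *	struct padata_instance *pinst;
 *
 *	wq = alloc_workqueue("my_padata", WQ_CPU_INTENSIVE, 1);
 *	if (!wq)
 *		return -ENOMEM;
 *	pinst = padata_alloc_possible(wq);
 *	if (!pinst) {
 *		destroy_workqueue(wq);
 *		return -ENOMEM;
 *	}
 *	padata_start(pinst);
 *
 *	// ... submit objects with padata_do_parallel() ...
 *
 *	padata_stop(pinst);
 *	padata_free(pinst);
 *	destroy_workqueue(wq);
 */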