linux/fs/io-wq.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Basic worker thread pool for io_uring
   4 *
   5 * Copyright (C) 2019 Jens Axboe
   6 *
   7 */
   8#include <linux/kernel.h>
   9#include <linux/init.h>
  10#include <linux/errno.h>
  11#include <linux/sched/signal.h>
  12#include <linux/mm.h>
  13#include <linux/mmu_context.h>
  14#include <linux/sched/mm.h>
  15#include <linux/percpu.h>
  16#include <linux/slab.h>
  17#include <linux/kthread.h>
  18#include <linux/rculist_nulls.h>
  19#include <linux/fs_struct.h>
  20#include <linux/task_work.h>
  21
  22#include "io-wq.h"
  23
  24#define WORKER_IDLE_TIMEOUT     (5 * HZ)
  25
  26enum {
  27        IO_WORKER_F_UP          = 1,    /* up and active */
  28        IO_WORKER_F_RUNNING     = 2,    /* account as running */
  29        IO_WORKER_F_FREE        = 4,    /* worker on free list */
  30        IO_WORKER_F_EXITING     = 8,    /* worker exiting */
  31        IO_WORKER_F_FIXED       = 16,   /* static idle worker */
  32        IO_WORKER_F_BOUND       = 32,   /* is doing bounded work */
  33};
  34
  35enum {
  36        IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
  37        IO_WQ_BIT_CANCEL        = 1,    /* cancel work on list */
  38        IO_WQ_BIT_ERROR         = 2,    /* error on setup */
  39};
  40
  41enum {
  42        IO_WQE_FLAG_STALLED     = 1,    /* stalled on hash */
  43};
  44
  45/*
  46 * One for each thread in a wqe pool
  47 */
  48struct io_worker {
  49        refcount_t ref;
  50        unsigned flags;
  51        struct hlist_nulls_node nulls_node;
  52        struct list_head all_list;
  53        struct task_struct *task;
  54        struct io_wqe *wqe;
  55
  56        struct io_wq_work *cur_work;
  57        spinlock_t lock;
  58
  59        struct rcu_head rcu;
  60        struct mm_struct *mm;
  61        const struct cred *cur_creds;
  62        const struct cred *saved_creds;
  63        struct files_struct *restore_files;
  64        struct fs_struct *restore_fs;
  65};
  66
  67#if BITS_PER_LONG == 64
  68#define IO_WQ_HASH_ORDER        6
  69#else
  70#define IO_WQ_HASH_ORDER        5
  71#endif
  72
  73#define IO_WQ_NR_HASH_BUCKETS   (1u << IO_WQ_HASH_ORDER)
  74
  75struct io_wqe_acct {
  76        unsigned nr_workers;
  77        unsigned max_workers;
  78        atomic_t nr_running;
  79};
  80
  81enum {
  82        IO_WQ_ACCT_BOUND,
  83        IO_WQ_ACCT_UNBOUND,
  84};
  85
  86/*
  87 * Per-node worker thread pool
  88 */
  89struct io_wqe {
  90        struct {
  91                spinlock_t lock;
  92                struct io_wq_work_list work_list;
  93                unsigned long hash_map;
  94                unsigned flags;
  95        } ____cacheline_aligned_in_smp;
  96
  97        int node;
  98        struct io_wqe_acct acct[2];
  99
 100        struct hlist_nulls_head free_list;
 101        struct list_head all_list;
 102
 103        struct io_wq *wq;
 104        struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
 105};
 106
 107/*
 108 * Per io_wq state
  109 */
 110struct io_wq {
 111        struct io_wqe **wqes;
 112        unsigned long state;
 113
 114        free_work_fn *free_work;
 115
 116        struct task_struct *manager;
 117        struct user_struct *user;
 118        refcount_t refs;
 119        struct completion done;
 120
 121        refcount_t use_refs;
 122};
 123
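/*
 * Workers are reference counted. io_worker_get() pins a worker while we
 * poke at it (free list activation, cancellation walks); dropping the
 * last reference wakes the worker task so io_worker_exit() can finish.
 */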
 124static bool io_worker_get(struct io_worker *worker)
 125{
 126        return refcount_inc_not_zero(&worker->ref);
 127}
 128
 129static void io_worker_release(struct io_worker *worker)
 130{
 131        if (refcount_dec_and_test(&worker->ref))
 132                wake_up_process(worker->task);
 133}
 134
 135/*
 136 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 137 * the lock in that case. Some callers need to restart handling if this
 138 * happens, so we can't just re-acquire the lock on behalf of the caller.
 139 */
 140static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
 141{
 142        bool dropped_lock = false;
 143
 144        if (worker->saved_creds) {
 145                revert_creds(worker->saved_creds);
 146                worker->cur_creds = worker->saved_creds = NULL;
 147        }
 148
 149        if (current->files != worker->restore_files) {
 150                __acquire(&wqe->lock);
 151                spin_unlock_irq(&wqe->lock);
 152                dropped_lock = true;
 153
 154                task_lock(current);
 155                current->files = worker->restore_files;
 156                task_unlock(current);
 157        }
 158
 159        if (current->fs != worker->restore_fs)
 160                current->fs = worker->restore_fs;
 161
 162        /*
 163         * If we have an active mm, we need to drop the wq lock before unusing
 164         * it. If we do, return true and let the caller retry the idle loop.
 165         */
 166        if (worker->mm) {
 167                if (!dropped_lock) {
 168                        __acquire(&wqe->lock);
 169                        spin_unlock_irq(&wqe->lock);
 170                        dropped_lock = true;
 171                }
 172                __set_current_state(TASK_RUNNING);
 173                set_fs(KERNEL_DS);
 174                unuse_mm(worker->mm);
 175                mmput(worker->mm);
 176                worker->mm = NULL;
 177        }
 178
 179        return dropped_lock;
 180}
 181
 182static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
 183                                                   struct io_wq_work *work)
 184{
 185        if (work->flags & IO_WQ_WORK_UNBOUND)
 186                return &wqe->acct[IO_WQ_ACCT_UNBOUND];
 187
 188        return &wqe->acct[IO_WQ_ACCT_BOUND];
 189}
 190
 191static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
 192                                                  struct io_worker *worker)
 193{
 194        if (worker->flags & IO_WORKER_F_BOUND)
 195                return &wqe->acct[IO_WQ_ACCT_BOUND];
 196
 197        return &wqe->acct[IO_WQ_ACCT_UNBOUND];
 198}
 199
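/*
 * Tear down an exiting worker: wait for any transient references to drop,
 * undo the running/unbound accounting, unlink it from the free and
 * all-worker lists and, if this node just lost its last worker, drop a
 * wq reference, completing wq->done when the last node does so.
 */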
 200static void io_worker_exit(struct io_worker *worker)
 201{
 202        struct io_wqe *wqe = worker->wqe;
 203        struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
 204        unsigned nr_workers;
 205
 206        /*
 207         * If we're not at zero, someone else is holding a brief reference
 208         * to the worker. Wait for that to go away.
 209         */
 210        set_current_state(TASK_INTERRUPTIBLE);
 211        if (!refcount_dec_and_test(&worker->ref))
 212                schedule();
 213        __set_current_state(TASK_RUNNING);
 214
 215        preempt_disable();
 216        current->flags &= ~PF_IO_WORKER;
 217        if (worker->flags & IO_WORKER_F_RUNNING)
 218                atomic_dec(&acct->nr_running);
 219        if (!(worker->flags & IO_WORKER_F_BOUND))
 220                atomic_dec(&wqe->wq->user->processes);
 221        worker->flags = 0;
 222        preempt_enable();
 223
 224        spin_lock_irq(&wqe->lock);
 225        hlist_nulls_del_rcu(&worker->nulls_node);
 226        list_del_rcu(&worker->all_list);
 227        if (__io_worker_unuse(wqe, worker)) {
 228                __release(&wqe->lock);
 229                spin_lock_irq(&wqe->lock);
 230        }
 231        acct->nr_workers--;
 232        nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
 233                        wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
 234        spin_unlock_irq(&wqe->lock);
 235
 236        /* all workers gone, wq exit can proceed */
 237        if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
 238                complete(&wqe->wq->done);
 239
 240        kfree_rcu(worker, rcu);
 241}
 242
 243static inline bool io_wqe_run_queue(struct io_wqe *wqe)
 244        __must_hold(wqe->lock)
 245{
 246        if (!wq_list_empty(&wqe->work_list) &&
 247            !(wqe->flags & IO_WQE_FLAG_STALLED))
 248                return true;
 249        return false;
 250}
 251
 252/*
 253 * Check head of free list for an available worker. If one isn't available,
 254 * caller must wake up the wq manager to create one.
 255 */
 256static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 257        __must_hold(RCU)
 258{
 259        struct hlist_nulls_node *n;
 260        struct io_worker *worker;
 261
 262        n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
 263        if (is_a_nulls(n))
 264                return false;
 265
 266        worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
 267        if (io_worker_get(worker)) {
 268                wake_up_process(worker->task);
 269                io_worker_release(worker);
 270                return true;
 271        }
 272
 273        return false;
 274}
 275
 276/*
 277 * We need a worker. If we find a free one, we're good. If not, and we're
 278 * below the max number of workers, wake up the manager to create one.
 279 */
 280static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 281{
 282        bool ret;
 283
 284        /*
 285         * Most likely an attempt to queue unbounded work on an io_wq that
 286         * wasn't setup with any unbounded workers.
 287         */
 288        WARN_ON_ONCE(!acct->max_workers);
 289
 290        rcu_read_lock();
 291        ret = io_wqe_activate_free_worker(wqe);
 292        rcu_read_unlock();
 293
 294        if (!ret && acct->nr_workers < acct->max_workers)
 295                wake_up_process(wqe->wq->manager);
 296}
 297
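/*
 * nr_running tracks how many workers of an accounting class are currently
 * runnable. When it drops to zero while work is still queued, wake a free
 * worker (or the manager) so the queue keeps making progress.
 */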
 298static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
 299{
 300        struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
 301
 302        atomic_inc(&acct->nr_running);
 303}
 304
 305static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
 306        __must_hold(wqe->lock)
 307{
 308        struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
 309
 310        if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
 311                io_wqe_wake_worker(wqe, acct);
 312}
 313
 314static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
 315{
 316        allow_kernel_signal(SIGINT);
 317
 318        current->flags |= PF_IO_WORKER;
 319
 320        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
 321        worker->restore_files = current->files;
 322        worker->restore_fs = current->fs;
 323        io_wqe_inc_running(wqe, worker);
 324}
 325
  326/*
  327 * Worker will start processing some work. Take it off the free list,
  328 * if it's currently on there.
  329 */
 330static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 331                             struct io_wq_work *work)
 332        __must_hold(wqe->lock)
 333{
 334        bool worker_bound, work_bound;
 335
 336        if (worker->flags & IO_WORKER_F_FREE) {
 337                worker->flags &= ~IO_WORKER_F_FREE;
 338                hlist_nulls_del_init_rcu(&worker->nulls_node);
 339        }
 340
 341        /*
 342         * If worker is moving from bound to unbound (or vice versa), then
 343         * ensure we update the running accounting.
 344         */
 345        worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
 346        work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
 347        if (worker_bound != work_bound) {
 348                io_wqe_dec_running(wqe, worker);
 349                if (work_bound) {
 350                        worker->flags |= IO_WORKER_F_BOUND;
 351                        wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
 352                        wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
 353                        atomic_dec(&wqe->wq->user->processes);
 354                } else {
 355                        worker->flags &= ~IO_WORKER_F_BOUND;
 356                        wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
 357                        wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
 358                        atomic_inc(&wqe->wq->user->processes);
 359                }
 360                io_wqe_inc_running(wqe, worker);
  361        }
 362}
 363
 364/*
 365 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 366 * have one attached. Dropping the mm may potentially sleep, so we drop
 367 * the lock in that case and return success. Since the caller has to
 368 * retry the loop in that case (we changed task state), we don't regrab
 369 * the lock if we return success.
 370 */
 371static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
 372        __must_hold(wqe->lock)
 373{
 374        if (!(worker->flags & IO_WORKER_F_FREE)) {
 375                worker->flags |= IO_WORKER_F_FREE;
 376                hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
 377        }
 378
 379        return __io_worker_unuse(wqe, worker);
 380}
 381
 382static inline unsigned int io_get_work_hash(struct io_wq_work *work)
 383{
 384        return work->flags >> IO_WQ_HASH_SHIFT;
 385}
 386
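/*
 * Grab the next runnable work item. Unhashed work can run any time.
 * Hashed work is serialized per hash value: if nothing with this hash is
 * currently running, claim the hash bit and splice out the whole
 * [work, tail] run for that bucket; otherwise skip it and keep scanning.
 */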
 387static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 388        __must_hold(wqe->lock)
 389{
 390        struct io_wq_work_node *node, *prev;
 391        struct io_wq_work *work, *tail;
 392        unsigned int hash;
 393
 394        wq_list_for_each(node, prev, &wqe->work_list) {
 395                work = container_of(node, struct io_wq_work, list);
 396
 397                /* not hashed, can run anytime */
 398                if (!io_wq_is_hashed(work)) {
 399                        wq_list_del(&wqe->work_list, node, prev);
 400                        return work;
 401                }
 402
 403                /* hashed, can run if not already running */
 404                hash = io_get_work_hash(work);
 405                if (!(wqe->hash_map & BIT(hash))) {
 406                        wqe->hash_map |= BIT(hash);
 407                        /* all items with this hash lie in [work, tail] */
 408                        tail = wqe->hash_tail[hash];
 409                        wqe->hash_tail[hash] = NULL;
 410                        wq_list_cut(&wqe->work_list, &tail->list, prev);
 411                        return work;
 412                }
 413        }
 414
 415        return NULL;
 416}
 417
 418static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
 419{
 420        if (worker->mm) {
 421                unuse_mm(worker->mm);
 422                mmput(worker->mm);
 423                worker->mm = NULL;
 424        }
 425        if (!work->mm) {
 426                set_fs(KERNEL_DS);
 427                return;
 428        }
 429        if (mmget_not_zero(work->mm)) {
 430                use_mm(work->mm);
 431                if (!worker->mm)
 432                        set_fs(USER_DS);
 433                worker->mm = work->mm;
 434                /* hang on to this mm */
 435                work->mm = NULL;
 436                return;
 437        }
 438
 439        /* failed grabbing mm, ensure work gets cancelled */
 440        work->flags |= IO_WQ_WORK_CANCEL;
 441}
 442
 443static void io_wq_switch_creds(struct io_worker *worker,
 444                               struct io_wq_work *work)
 445{
 446        const struct cred *old_creds = override_creds(work->creds);
 447
 448        worker->cur_creds = work->creds;
 449        if (worker->saved_creds)
 450                put_cred(old_creds); /* creds set by previous switch */
 451        else
 452                worker->saved_creds = old_creds;
 453}
 454
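/*
 * Make the worker take on the identity the work was queued with: files,
 * fs, mm and credentials, so the request runs as if it were issued from
 * the originating task's context.
 */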
 455static void io_impersonate_work(struct io_worker *worker,
 456                                struct io_wq_work *work)
 457{
 458        if (work->files && current->files != work->files) {
 459                task_lock(current);
 460                current->files = work->files;
 461                task_unlock(current);
 462        }
 463        if (work->fs && current->fs != work->fs)
 464                current->fs = work->fs;
 465        if (work->mm != worker->mm)
 466                io_wq_switch_mm(worker, work);
 467        if (worker->cur_creds != work->creds)
 468                io_wq_switch_creds(worker, work);
 469}
 470
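/*
 * Publish (or clear) the work item this worker is running. ->cur_work is
 * read under worker->lock by the cancellation path, so update it under
 * the same lock.
 */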
 471static void io_assign_current_work(struct io_worker *worker,
 472                                   struct io_wq_work *work)
 473{
 474        if (work) {
 475                /* flush pending signals before assigning new work */
 476                if (signal_pending(current))
 477                        flush_signals(current);
 478                cond_resched();
 479        }
 480
 481        spin_lock_irq(&worker->lock);
 482        worker->cur_work = work;
 483        spin_unlock_irq(&worker->lock);
 484}
 485
 486static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 487
 488static void io_worker_handle_work(struct io_worker *worker)
 489        __releases(wqe->lock)
 490{
 491        struct io_wqe *wqe = worker->wqe;
 492        struct io_wq *wq = wqe->wq;
 493
 494        do {
 495                struct io_wq_work *work;
 496                unsigned int hash;
 497get_next:
 498                /*
 499                 * If we got some work, mark us as busy. If we didn't, but
 500                 * the list isn't empty, it means we stalled on hashed work.
 501                 * Mark us stalled so we don't keep looking for work when we
  502                 * can't make progress; any work completion or insertion will
 503                 * clear the stalled flag.
 504                 */
 505                work = io_get_next_work(wqe);
 506                if (work)
 507                        __io_worker_busy(wqe, worker, work);
 508                else if (!wq_list_empty(&wqe->work_list))
 509                        wqe->flags |= IO_WQE_FLAG_STALLED;
 510
 511                spin_unlock_irq(&wqe->lock);
 512                if (!work)
 513                        break;
 514                io_assign_current_work(worker, work);
 515
 516                /* handle a whole dependent link */
 517                do {
 518                        struct io_wq_work *old_work, *next_hashed, *linked;
 519
 520                        next_hashed = wq_next_work(work);
 521                        io_impersonate_work(worker, work);
 522                        /*
 523                         * OK to set IO_WQ_WORK_CANCEL even for uncancellable
 524                         * work, the worker function will do the right thing.
 525                         */
 526                        if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
 527                                work->flags |= IO_WQ_WORK_CANCEL;
 528
 529                        hash = io_get_work_hash(work);
 530                        linked = old_work = work;
 531                        linked->func(&linked);
 532                        linked = (old_work == linked) ? NULL : linked;
 533
 534                        work = next_hashed;
 535                        if (!work && linked && !io_wq_is_hashed(linked)) {
 536                                work = linked;
 537                                linked = NULL;
 538                        }
 539                        io_assign_current_work(worker, work);
 540                        wq->free_work(old_work);
 541
 542                        if (linked)
 543                                io_wqe_enqueue(wqe, linked);
 544
 545                        if (hash != -1U && !next_hashed) {
 546                                spin_lock_irq(&wqe->lock);
 547                                wqe->hash_map &= ~BIT_ULL(hash);
 548                                wqe->flags &= ~IO_WQE_FLAG_STALLED;
 549                                /* dependent work is not hashed */
 550                                hash = -1U;
 551                                /* skip unnecessary unlock-lock wqe->lock */
 552                                if (!work)
 553                                        goto get_next;
 554                                spin_unlock_irq(&wqe->lock);
 555                        }
 556                } while (work);
 557
 558                spin_lock_irq(&wqe->lock);
 559        } while (1);
 560}
 561
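/*
 * Main worker loop: run queued work while there is any, otherwise sit on
 * the free list and sleep. Idle workers exit after WORKER_IDLE_TIMEOUT,
 * except for the fixed worker, which sticks around for the lifetime of
 * the wq.
 */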
 562static int io_wqe_worker(void *data)
 563{
 564        struct io_worker *worker = data;
 565        struct io_wqe *wqe = worker->wqe;
 566        struct io_wq *wq = wqe->wq;
 567
 568        io_worker_start(wqe, worker);
 569
 570        while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 571                set_current_state(TASK_INTERRUPTIBLE);
 572loop:
 573                spin_lock_irq(&wqe->lock);
 574                if (io_wqe_run_queue(wqe)) {
 575                        __set_current_state(TASK_RUNNING);
 576                        io_worker_handle_work(worker);
 577                        goto loop;
 578                }
 579                /* drops the lock on success, retry */
 580                if (__io_worker_idle(wqe, worker)) {
 581                        __release(&wqe->lock);
 582                        goto loop;
 583                }
 584                spin_unlock_irq(&wqe->lock);
 585                if (signal_pending(current))
 586                        flush_signals(current);
 587                if (schedule_timeout(WORKER_IDLE_TIMEOUT))
 588                        continue;
 589                /* timed out, exit unless we're the fixed worker */
 590                if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
 591                    !(worker->flags & IO_WORKER_F_FIXED))
 592                        break;
 593        }
 594
 595        if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 596                spin_lock_irq(&wqe->lock);
 597                if (!wq_list_empty(&wqe->work_list))
 598                        io_worker_handle_work(worker);
 599                else
 600                        spin_unlock_irq(&wqe->lock);
 601        }
 602
 603        io_worker_exit(worker);
 604        return 0;
 605}
 606
 607/*
 608 * Called when a worker is scheduled in. Mark us as currently running.
 609 */
 610void io_wq_worker_running(struct task_struct *tsk)
 611{
 612        struct io_worker *worker = kthread_data(tsk);
 613        struct io_wqe *wqe = worker->wqe;
 614
 615        if (!(worker->flags & IO_WORKER_F_UP))
 616                return;
 617        if (worker->flags & IO_WORKER_F_RUNNING)
 618                return;
 619        worker->flags |= IO_WORKER_F_RUNNING;
 620        io_wqe_inc_running(wqe, worker);
 621}
 622
 623/*
  624 * Called when a worker is going to sleep. If there are no workers currently
 625 * running and we have work pending, wake up a free one or have the manager
 626 * set one up.
 627 */
 628void io_wq_worker_sleeping(struct task_struct *tsk)
 629{
 630        struct io_worker *worker = kthread_data(tsk);
 631        struct io_wqe *wqe = worker->wqe;
 632
 633        if (!(worker->flags & IO_WORKER_F_UP))
 634                return;
 635        if (!(worker->flags & IO_WORKER_F_RUNNING))
 636                return;
 637
 638        worker->flags &= ~IO_WORKER_F_RUNNING;
 639
 640        spin_lock_irq(&wqe->lock);
 641        io_wqe_dec_running(wqe, worker);
 642        spin_unlock_irq(&wqe->lock);
 643}
 644
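/*
 * Spawn a new worker thread for the given node and accounting class. The
 * first bound worker created on a node becomes the fixed worker that
 * never times out.
 */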
 645static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 646{
  647        struct io_wqe_acct *acct = &wqe->acct[index];
 648        struct io_worker *worker;
 649
 650        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
 651        if (!worker)
 652                return false;
 653
 654        refcount_set(&worker->ref, 1);
 655        worker->nulls_node.pprev = NULL;
 656        worker->wqe = wqe;
 657        spin_lock_init(&worker->lock);
 658
 659        worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
 660                                "io_wqe_worker-%d/%d", index, wqe->node);
 661        if (IS_ERR(worker->task)) {
 662                kfree(worker);
 663                return false;
 664        }
 665
 666        spin_lock_irq(&wqe->lock);
 667        hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
 668        list_add_tail_rcu(&worker->all_list, &wqe->all_list);
 669        worker->flags |= IO_WORKER_F_FREE;
 670        if (index == IO_WQ_ACCT_BOUND)
 671                worker->flags |= IO_WORKER_F_BOUND;
 672        if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
 673                worker->flags |= IO_WORKER_F_FIXED;
 674        acct->nr_workers++;
 675        spin_unlock_irq(&wqe->lock);
 676
 677        if (index == IO_WQ_ACCT_UNBOUND)
 678                atomic_inc(&wq->user->processes);
 679
 680        wake_up_process(worker->task);
 681        return true;
 682}
 683
 684static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
 685        __must_hold(wqe->lock)
 686{
 687        struct io_wqe_acct *acct = &wqe->acct[index];
 688
 689        /* if we have available workers or no work, no need */
 690        if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
 691                return false;
 692        return acct->nr_workers < acct->max_workers;
 693}
 694
 695/*
 696 * Manager thread. Tasked with creating new workers, if we need them.
 697 */
 698static int io_wq_manager(void *data)
 699{
 700        struct io_wq *wq = data;
 701        int workers_to_create = num_possible_nodes();
 702        int node;
 703
 704        /* create fixed workers */
 705        refcount_set(&wq->refs, workers_to_create);
 706        for_each_node(node) {
 707                if (!node_online(node))
 708                        continue;
 709                if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
 710                        goto err;
 711                workers_to_create--;
 712        }
 713
 714        while (workers_to_create--)
 715                refcount_dec(&wq->refs);
 716
 717        complete(&wq->done);
 718
 719        while (!kthread_should_stop()) {
 720                if (current->task_works)
 721                        task_work_run();
 722
 723                for_each_node(node) {
 724                        struct io_wqe *wqe = wq->wqes[node];
 725                        bool fork_worker[2] = { false, false };
 726
 727                        if (!node_online(node))
 728                                continue;
 729
 730                        spin_lock_irq(&wqe->lock);
 731                        if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
 732                                fork_worker[IO_WQ_ACCT_BOUND] = true;
 733                        if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
 734                                fork_worker[IO_WQ_ACCT_UNBOUND] = true;
 735                        spin_unlock_irq(&wqe->lock);
 736                        if (fork_worker[IO_WQ_ACCT_BOUND])
 737                                create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
 738                        if (fork_worker[IO_WQ_ACCT_UNBOUND])
 739                                create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
 740                }
 741                set_current_state(TASK_INTERRUPTIBLE);
 742                schedule_timeout(HZ);
 743        }
 744
 745        if (current->task_works)
 746                task_work_run();
 747
 748        return 0;
 749err:
 750        set_bit(IO_WQ_BIT_ERROR, &wq->state);
 751        set_bit(IO_WQ_BIT_EXIT, &wq->state);
 752        if (refcount_sub_and_test(workers_to_create, &wq->refs))
 753                complete(&wq->done);
 754        return 0;
 755}
 756
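/*
 * Unbounded work is limited by the user's process limit. Accept the work
 * if a worker is already running or free to take it; otherwise refuse it
 * once the limit has been reached, unless the task has CAP_SYS_RESOURCE
 * or CAP_SYS_ADMIN. Bounded work is always accepted.
 */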
 757static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
 758                            struct io_wq_work *work)
 759{
 760        bool free_worker;
 761
 762        if (!(work->flags & IO_WQ_WORK_UNBOUND))
 763                return true;
 764        if (atomic_read(&acct->nr_running))
 765                return true;
 766
 767        rcu_read_lock();
 768        free_worker = !hlist_nulls_empty(&wqe->free_list);
 769        rcu_read_unlock();
 770        if (free_worker)
 771                return true;
 772
 773        if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
 774            !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
 775                return false;
 776
 777        return true;
 778}
 779
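/*
 * Run a work item, and anything linked off it, with IO_WQ_WORK_CANCEL
 * set, so the handler cancels the request instead of executing it.
 */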
 780static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 781{
 782        struct io_wq *wq = wqe->wq;
 783
 784        do {
 785                struct io_wq_work *old_work = work;
 786
 787                work->flags |= IO_WQ_WORK_CANCEL;
 788                work->func(&work);
 789                work = (work == old_work) ? NULL : work;
 790                wq->free_work(old_work);
 791        } while (work);
 792}
 793
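/*
 * Add work to the queue. Hashed work for the same bucket is chained after
 * the current tail for that hash so it stays contiguous, letting
 * io_get_next_work() splice the whole run out in one go.
 */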
 794static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 795{
 796        unsigned int hash;
 797        struct io_wq_work *tail;
 798
 799        if (!io_wq_is_hashed(work)) {
 800append:
 801                wq_list_add_tail(&work->list, &wqe->work_list);
 802                return;
 803        }
 804
 805        hash = io_get_work_hash(work);
 806        tail = wqe->hash_tail[hash];
 807        wqe->hash_tail[hash] = work;
 808        if (!tail)
 809                goto append;
 810
 811        wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
 812}
 813
 814static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 815{
 816        struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 817        int work_flags;
 818        unsigned long flags;
 819
 820        /*
  821         * Do an early check to see if we need a new unbound worker, and if we do,
 822         * if we're allowed to do so. This isn't 100% accurate as there's a
 823         * gap between this check and incrementing the value, but that's OK.
 824         * It's close enough to not be an issue, fork() has the same delay.
 825         */
 826        if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
 827                io_run_cancel(work, wqe);
 828                return;
 829        }
 830
 831        work_flags = work->flags;
 832        spin_lock_irqsave(&wqe->lock, flags);
 833        io_wqe_insert_work(wqe, work);
 834        wqe->flags &= ~IO_WQE_FLAG_STALLED;
 835        spin_unlock_irqrestore(&wqe->lock, flags);
 836
 837        if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
 838            !atomic_read(&acct->nr_running))
 839                io_wqe_wake_worker(wqe, acct);
 840}
 841
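/*
 * Queue work on this wq, on the io_wqe for the local NUMA node.
 */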
 842void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
 843{
 844        struct io_wqe *wqe = wq->wqes[numa_node_id()];
 845
 846        io_wqe_enqueue(wqe, work);
 847}
 848
 849/*
 850 * Work items that hash to the same value will not be done in parallel.
 851 * Used to limit concurrent writes, generally hashed by inode.
 852 */
 853void io_wq_hash_work(struct io_wq_work *work, void *val)
 854{
 855        unsigned int bit;
 856
 857        bit = hash_ptr(val, IO_WQ_HASH_ORDER);
 858        work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
 859}
 860
 861static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
 862{
 863        send_sig(SIGINT, worker->task, 1);
 864        return false;
 865}
 866
 867/*
  868 * Iterate the passed-in list and call the given function for each
  869 * worker that isn't exiting.
 870 */
 871static bool io_wq_for_each_worker(struct io_wqe *wqe,
 872                                  bool (*func)(struct io_worker *, void *),
 873                                  void *data)
 874{
 875        struct io_worker *worker;
 876        bool ret = false;
 877
 878        list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
 879                if (io_worker_get(worker)) {
 880                        /* no task if node is/was offline */
 881                        if (worker->task)
 882                                ret = func(worker, data);
 883                        io_worker_release(worker);
 884                        if (ret)
 885                                break;
 886                }
 887        }
 888
 889        return ret;
 890}
 891
 892void io_wq_cancel_all(struct io_wq *wq)
 893{
 894        int node;
 895
 896        set_bit(IO_WQ_BIT_CANCEL, &wq->state);
 897
 898        rcu_read_lock();
 899        for_each_node(node) {
 900                struct io_wqe *wqe = wq->wqes[node];
 901
 902                io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
 903        }
 904        rcu_read_unlock();
 905}
 906
 907struct io_cb_cancel_data {
 908        work_cancel_fn *fn;
 909        void *data;
 910};
 911
 912static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 913{
 914        struct io_cb_cancel_data *match = data;
 915        unsigned long flags;
 916        bool ret = false;
 917
 918        /*
  919         * Hold the lock to avoid ->cur_work going out of scope, as the
  920         * caller may dereference the passed-in work.
 921         */
 922        spin_lock_irqsave(&worker->lock, flags);
 923        if (worker->cur_work &&
 924            !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
 925            match->fn(worker->cur_work, match->data)) {
 926                send_sig(SIGINT, worker->task, 1);
 927                ret = true;
 928        }
 929        spin_unlock_irqrestore(&worker->lock, flags);
 930
 931        return ret;
 932}
 933
 934static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
 935                                            struct io_cb_cancel_data *match)
 936{
 937        struct io_wq_work_node *node, *prev;
 938        struct io_wq_work *work;
 939        unsigned long flags;
 940        bool found = false;
 941
 942        /*
  943         * First check the pending list; if we're lucky we can just remove it
  944         * from there. CANCEL_OK means that the work is returned as-new,
  945         * and no completion will be posted for it.
 946         */
 947        spin_lock_irqsave(&wqe->lock, flags);
 948        wq_list_for_each(node, prev, &wqe->work_list) {
 949                work = container_of(node, struct io_wq_work, list);
 950
 951                if (match->fn(work, match->data)) {
 952                        wq_list_del(&wqe->work_list, node, prev);
 953                        found = true;
 954                        break;
 955                }
 956        }
 957        spin_unlock_irqrestore(&wqe->lock, flags);
 958
 959        if (found) {
 960                io_run_cancel(work, wqe);
 961                return IO_WQ_CANCEL_OK;
 962        }
 963
 964        /*
 965         * Now check if a free (going busy) or busy worker has the work
 966         * currently running. If we find it there, we'll return CANCEL_RUNNING
  967         * as an indication that we attempted to signal cancellation. The
 968         * completion will run normally in this case.
 969         */
 970        rcu_read_lock();
 971        found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
 972        rcu_read_unlock();
 973        return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
 974}
 975
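/*
 * Cancel work matching the given callback. On each node the pending list
 * is tried first (IO_WQ_CANCEL_OK); failing that, any worker currently
 * running matching work is signalled (IO_WQ_CANCEL_RUNNING).
 */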
 976enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 977                                  void *data)
 978{
 979        struct io_cb_cancel_data match = {
 980                .fn     = cancel,
 981                .data   = data,
 982        };
 983        enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
 984        int node;
 985
 986        for_each_node(node) {
 987                struct io_wqe *wqe = wq->wqes[node];
 988
 989                ret = io_wqe_cancel_work(wqe, &match);
 990                if (ret != IO_WQ_CANCEL_NOTFOUND)
 991                        break;
 992        }
 993
 994        return ret;
 995}
 996
 997static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
 998{
 999        return work == data;
1000}
1001
1002enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
1003{
1004        return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork);
1005}
1006
1007static bool io_wq_pid_match(struct io_wq_work *work, void *data)
1008{
1009        pid_t pid = (pid_t) (unsigned long) data;
1010
1011        return work->task_pid == pid;
1012}
1013
1014enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
1015{
1016        void *data = (void *) (unsigned long) pid;
1017
1018        return io_wq_cancel_cb(wq, io_wq_pid_match, data);
1019}
1020
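/*
 * Allocate one io_wqe per NUMA node and start the manager thread, which
 * creates workers on demand. A minimal caller sketch follows; ctx_user,
 * my_free_work and req are illustrative names, not part of this file:
 *
 *	struct io_wq_data data = {
 *		.user		= ctx_user,
 *		.free_work	= my_free_work,
 *	};
 *	struct io_wq *wq = io_wq_create(bounded_concurrency, &data);
 *
 *	if (IS_ERR(wq))
 *		return PTR_ERR(wq);
 *	io_wq_enqueue(wq, &req->work);	(req->work.func must be set)
 */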
1021struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1022{
1023        int ret = -ENOMEM, node;
1024        struct io_wq *wq;
1025
1026        if (WARN_ON_ONCE(!data->free_work))
1027                return ERR_PTR(-EINVAL);
1028
1029        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1030        if (!wq)
1031                return ERR_PTR(-ENOMEM);
1032
1033        wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
1034        if (!wq->wqes) {
1035                kfree(wq);
1036                return ERR_PTR(-ENOMEM);
1037        }
1038
1039        wq->free_work = data->free_work;
1040
1041        /* caller must already hold a reference to this */
1042        wq->user = data->user;
1043
1044        for_each_node(node) {
1045                struct io_wqe *wqe;
1046                int alloc_node = node;
1047
1048                if (!node_online(alloc_node))
1049                        alloc_node = NUMA_NO_NODE;
1050                wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
1051                if (!wqe)
1052                        goto err;
1053                wq->wqes[node] = wqe;
1054                wqe->node = alloc_node;
1055                wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1056                atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
1057                if (wq->user) {
1058                        wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1059                                        task_rlimit(current, RLIMIT_NPROC);
1060                }
1061                atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
1062                wqe->wq = wq;
1063                spin_lock_init(&wqe->lock);
1064                INIT_WQ_LIST(&wqe->work_list);
1065                INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1066                INIT_LIST_HEAD(&wqe->all_list);
1067        }
1068
1069        init_completion(&wq->done);
1070
1071        wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
1072        if (!IS_ERR(wq->manager)) {
1073                wake_up_process(wq->manager);
1074                wait_for_completion(&wq->done);
1075                if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
1076                        ret = -ENOMEM;
1077                        goto err;
1078                }
1079                refcount_set(&wq->use_refs, 1);
1080                reinit_completion(&wq->done);
1081                return wq;
1082        }
1083
1084        ret = PTR_ERR(wq->manager);
1085        complete(&wq->done);
1086err:
1087        for_each_node(node)
1088                kfree(wq->wqes[node]);
1089        kfree(wq->wqes);
1090        kfree(wq);
1091        return ERR_PTR(ret);
1092}
1093
1094bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
1095{
1096        if (data->free_work != wq->free_work)
1097                return false;
1098
1099        return refcount_inc_not_zero(&wq->use_refs);
1100}
1101
1102static bool io_wq_worker_wake(struct io_worker *worker, void *data)
1103{
1104        wake_up_process(worker->task);
1105        return false;
1106}
1107
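/*
 * Tear the wq down: flag exit, stop the manager, wake every worker so it
 * notices IO_WQ_BIT_EXIT, wait for the last worker to complete wq->done,
 * then free the per-node structures.
 */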
1108static void __io_wq_destroy(struct io_wq *wq)
1109{
1110        int node;
1111
1112        set_bit(IO_WQ_BIT_EXIT, &wq->state);
1113        if (wq->manager)
1114                kthread_stop(wq->manager);
1115
1116        rcu_read_lock();
1117        for_each_node(node)
1118                io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
1119        rcu_read_unlock();
1120
1121        wait_for_completion(&wq->done);
1122
1123        for_each_node(node)
1124                kfree(wq->wqes[node]);
1125        kfree(wq->wqes);
1126        kfree(wq);
1127}
1128
1129void io_wq_destroy(struct io_wq *wq)
1130{
1131        if (refcount_dec_and_test(&wq->use_refs))
1132                __io_wq_destroy(wq);
1133}
1134
1135struct task_struct *io_wq_get_task(struct io_wq *wq)
1136{
1137        return wq->manager;
1138}
1139