linux/fs/io-wq.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT     (5 * HZ)

enum {
        IO_WORKER_F_UP          = 1,    /* up and active */
        IO_WORKER_F_RUNNING     = 2,    /* account as running */
        IO_WORKER_F_FREE        = 4,    /* worker on free list */
        IO_WORKER_F_BOUND       = 8,    /* is doing bounded work */
};

enum {
        IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
};

enum {
        IO_ACCT_STALLED_BIT     = 0,    /* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
        refcount_t ref;
        unsigned flags;
        struct hlist_nulls_node nulls_node;
        struct list_head all_list;
        struct task_struct *task;
        struct io_wqe *wqe;

        struct io_wq_work *cur_work;
        spinlock_t lock;

        struct completion ref_done;

        unsigned long create_state;
        struct callback_head create_work;
        int create_index;

        union {
                struct rcu_head rcu;
                struct work_struct work;
        };
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER        6
#else
#define IO_WQ_HASH_ORDER        5
#endif

#define IO_WQ_NR_HASH_BUCKETS   (1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
        unsigned nr_workers;
        unsigned max_workers;
        int index;
        atomic_t nr_running;
        struct io_wq_work_list work_list;
        unsigned long flags;
};

enum {
        IO_WQ_ACCT_BOUND,
        IO_WQ_ACCT_UNBOUND,
        IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
        raw_spinlock_t lock;
        struct io_wqe_acct acct[2];

        int node;

        struct hlist_nulls_head free_list;
        struct list_head all_list;

        struct wait_queue_entry wait;

        struct io_wq *wq;
        struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

        cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
        unsigned long state;

        free_work_fn *free_work;
        io_wq_work_fn *do_work;

        struct io_wq_hash *hash;

        atomic_t worker_refs;
        struct completion worker_done;

        struct hlist_node cpuhp_node;

        struct task_struct *task;

        struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
        work_cancel_fn *fn;
        void *data;
        int nr_running;
        int nr_pending;
        bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
                                        struct io_wqe_acct *acct,
                                        struct io_cb_cancel_data *match);

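/*
 * Worker reference helpers: io_worker_get() takes a reference unless the
 * worker is already going away; io_worker_release() drops it and completes
 * ->ref_done on the final put so io_worker_exit() can finish tearing down.
 */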
static bool io_worker_get(struct io_worker *worker)
{
        return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
        if (refcount_dec_and_test(&worker->ref))
                complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
        return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
                                                   struct io_wq_work *work)
{
        return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
        return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
        if (atomic_dec_and_test(&wq->worker_refs))
                complete(&wq->worker_done);
}

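/*
 * Final teardown of a worker: wait for the last reference to drop, unlink
 * the worker from the free and all-worker lists, fix up accounting and
 * exit the task.
 */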
static void io_worker_exit(struct io_worker *worker)
{
        struct io_wqe *wqe = worker->wqe;

        if (refcount_dec_and_test(&worker->ref))
                complete(&worker->ref_done);
        wait_for_completion(&worker->ref_done);

        raw_spin_lock(&wqe->lock);
        if (worker->flags & IO_WORKER_F_FREE)
                hlist_nulls_del_rcu(&worker->nulls_node);
        list_del_rcu(&worker->all_list);
        preempt_disable();
        io_wqe_dec_running(worker);
        worker->flags = 0;
        current->flags &= ~PF_IO_WORKER;
        preempt_enable();
        raw_spin_unlock(&wqe->lock);

        kfree_rcu(worker, rcu);
        io_worker_ref_put(wqe->wq);
        do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
        if (!wq_list_empty(&acct->work_list) &&
            !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
                return true;
        return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
                                        struct io_wqe_acct *acct)
        __must_hold(RCU)
{
        struct hlist_nulls_node *n;
        struct io_worker *worker;

        /*
         * Iterate free_list and see if we can find an idle worker to
         * activate. If a given worker is on the free_list but in the process
         * of exiting, keep trying.
         */
        hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
                if (!io_worker_get(worker))
                        continue;
                if (io_wqe_get_acct(worker) != acct) {
                        io_worker_release(worker);
                        continue;
                }
                if (wake_up_process(worker->task)) {
                        io_worker_release(worker);
                        return true;
                }
                io_worker_release(worker);
        }

        return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
        /*
         * Most likely an attempt to queue unbounded work on an io_wq that
         * wasn't setup with any unbounded workers.
         */
        if (unlikely(!acct->max_workers))
                pr_warn_once("io-wq is not configured for unbound workers");

        raw_spin_lock(&wqe->lock);
        if (acct->nr_workers >= acct->max_workers) {
                raw_spin_unlock(&wqe->lock);
                return true;
        }
        acct->nr_workers++;
        raw_spin_unlock(&wqe->lock);
        atomic_inc(&acct->nr_running);
        atomic_inc(&wqe->wq->worker_refs);
        return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);

        atomic_inc(&acct->nr_running);
}

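/*
 * task_work callback, run by the io_uring owner task, that actually creates
 * a new worker if the accounting limits still allow it.
 */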
static void create_worker_cb(struct callback_head *cb)
{
        struct io_worker *worker;
        struct io_wq *wq;
        struct io_wqe *wqe;
        struct io_wqe_acct *acct;
        bool do_create = false;

        worker = container_of(cb, struct io_worker, create_work);
        wqe = worker->wqe;
        wq = wqe->wq;
        acct = &wqe->acct[worker->create_index];
        raw_spin_lock(&wqe->lock);
        if (acct->nr_workers < acct->max_workers) {
                acct->nr_workers++;
                do_create = true;
        }
        raw_spin_unlock(&wqe->lock);
        if (do_create) {
                create_io_worker(wq, wqe, worker->create_index);
        } else {
                atomic_dec(&acct->nr_running);
                io_worker_ref_put(wq);
        }
        clear_bit_unlock(0, &worker->create_state);
        io_worker_release(worker);
}

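/*
 * Arrange for worker creation to happen from task_work on the io_uring
 * task. Returns false, and undoes the caller's accounting references, if
 * the wq is exiting, a creation request is already pending for this worker,
 * or the task_work cannot be queued.
 */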
static bool io_queue_worker_create(struct io_worker *worker,
                                   struct io_wqe_acct *acct,
                                   task_work_func_t func)
{
        struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;

        /* raced with exit, just ignore create call */
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
                goto fail;
        if (!io_worker_get(worker))
                goto fail;
        /*
         * create_state manages ownership of create_work/index. We should
         * only need one entry per worker, as the worker going to sleep
         * will trigger the condition, and waking will clear it once it
         * runs the task_work.
         */
        if (test_bit(0, &worker->create_state) ||
            test_and_set_bit_lock(0, &worker->create_state))
                goto fail_release;

        init_task_work(&worker->create_work, func);
        worker->create_index = acct->index;
        if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
                return true;
        clear_bit_unlock(0, &worker->create_state);
fail_release:
        io_worker_release(worker);
fail:
        atomic_dec(&acct->nr_running);
        io_worker_ref_put(wq);
        return false;
}

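/*
 * A worker stops running (e.g. it is about to sleep). If it was the last
 * running worker for this accounting class and work is still queued, kick
 * off creation of a replacement worker.
 */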
static void io_wqe_dec_running(struct io_worker *worker)
        __must_hold(wqe->lock)
{
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);
        struct io_wqe *wqe = worker->wqe;

        if (!(worker->flags & IO_WORKER_F_UP))
                return;

        if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
                atomic_inc(&acct->nr_running);
                atomic_inc(&wqe->wq->worker_refs);
                io_queue_worker_create(worker, acct, create_worker_cb);
        }
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
                             struct io_wq_work *work)
        __must_hold(wqe->lock)
{
        if (worker->flags & IO_WORKER_F_FREE) {
                worker->flags &= ~IO_WORKER_F_FREE;
                hlist_nulls_del_init_rcu(&worker->nulls_node);
        }
}

/*
 * No work, worker going to sleep. Move it back onto the free list if it
 * isn't already there, so it can be found and woken up when new work is
 * queued.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
        __must_hold(wqe->lock)
{
        if (!(worker->flags & IO_WORKER_F_FREE)) {
                worker->flags |= IO_WORKER_F_FREE;
                hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
        }
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
        return work->flags >> IO_WQ_HASH_SHIFT;
}

static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
        struct io_wq *wq = wqe->wq;

        spin_lock_irq(&wq->hash->wait.lock);
        if (list_empty(&wqe->wait.entry)) {
                __add_wait_queue(&wq->hash->wait, &wqe->wait);
                if (!test_bit(hash, &wq->hash->map)) {
                        __set_current_state(TASK_RUNNING);
                        list_del_init(&wqe->wait.entry);
                }
        }
        spin_unlock_irq(&wq->hash->wait.lock);
}

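/*
 * Pick the next work item for this worker. Unhashed work can run right
 * away; hashed work only runs if no other worker currently owns the same
 * hash bucket. If everything left is hashed and busy, mark the acct as
 * stalled and wait for a hash bucket to be released.
 */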
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
                                           struct io_worker *worker)
        __must_hold(wqe->lock)
{
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work, *tail;
        unsigned int stall_hash = -1U;
        struct io_wqe *wqe = worker->wqe;

        wq_list_for_each(node, prev, &acct->work_list) {
                unsigned int hash;

                work = container_of(node, struct io_wq_work, list);

                /* not hashed, can run anytime */
                if (!io_wq_is_hashed(work)) {
                        wq_list_del(&acct->work_list, node, prev);
                        return work;
                }

                hash = io_get_work_hash(work);
                /* all items with this hash lie in [work, tail] */
                tail = wqe->hash_tail[hash];

                /* hashed, can run if not already running */
                if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
                        wqe->hash_tail[hash] = NULL;
                        wq_list_cut(&acct->work_list, &tail->list, prev);
                        return work;
                }
                if (stall_hash == -1U)
                        stall_hash = hash;
                /* fast forward to a next hash, for-each will fix up @prev */
                node = &tail->list;
        }

        if (stall_hash != -1U) {
                /*
                 * Set this before dropping the lock to avoid racing with new
                 * work being added and clearing the stalled bit.
                 */
                set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
                raw_spin_unlock(&wqe->lock);
                io_wait_on_hash(wqe, stall_hash);
                raw_spin_lock(&wqe->lock);
        }

        return NULL;
}

static bool io_flush_signals(void)
{
        if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
                __set_current_state(TASK_RUNNING);
                tracehook_notify_signal();
                return true;
        }
        return false;
}

static void io_assign_current_work(struct io_worker *worker,
                                   struct io_wq_work *work)
{
        if (work) {
                io_flush_signals();
                cond_resched();
        }

        spin_lock(&worker->lock);
        worker->cur_work = work;
        spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
        __releases(wqe->lock)
{
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);
        struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;
        bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

        do {
                struct io_wq_work *work;
get_next:
                /*
                 * If we got some work, mark us as busy. If we didn't, but
                 * the list isn't empty, it means we stalled on hashed work.
                 * Mark us stalled so we don't keep looking for work when we
                 * can't make progress; any work completion or insertion will
                 * clear the stalled flag.
                 */
                work = io_get_next_work(acct, worker);
                if (work)
                        __io_worker_busy(wqe, worker, work);

                raw_spin_unlock(&wqe->lock);
                if (!work)
                        break;
                io_assign_current_work(worker, work);
                __set_current_state(TASK_RUNNING);

                /* handle a whole dependent link */
                do {
                        struct io_wq_work *next_hashed, *linked;
                        unsigned int hash = io_get_work_hash(work);

                        next_hashed = wq_next_work(work);

                        if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
                                work->flags |= IO_WQ_WORK_CANCEL;
                        wq->do_work(work);
                        io_assign_current_work(worker, NULL);

                        linked = wq->free_work(work);
                        work = next_hashed;
                        if (!work && linked && !io_wq_is_hashed(linked)) {
                                work = linked;
                                linked = NULL;
                        }
                        io_assign_current_work(worker, work);
                        if (linked)
                                io_wqe_enqueue(wqe, linked);

                        if (hash != -1U && !next_hashed) {
                                clear_bit(hash, &wq->hash->map);
                                clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
                                if (wq_has_sleeper(&wq->hash->wait))
                                        wake_up(&wq->hash->wait);
                                raw_spin_lock(&wqe->lock);
                                /* skip unnecessary unlock-lock wqe->lock */
                                if (!work)
                                        goto get_next;
                                raw_spin_unlock(&wqe->lock);
                        }
                } while (work);

                raw_spin_lock(&wqe->lock);
        } while (1);
}

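/*
 * Main loop for an io-wq worker thread: run pending work, otherwise sit
 * idle on the free list, and exit after an idle timeout as long as other
 * workers remain for this acct.
 */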
static int io_wqe_worker(void *data)
{
        struct io_worker *worker = data;
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);
        struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;
        bool last_timeout = false;
        char buf[TASK_COMM_LEN];

        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

        snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
        set_task_comm(current, buf);

        while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
                long ret;

                set_current_state(TASK_INTERRUPTIBLE);
loop:
                raw_spin_lock(&wqe->lock);
                if (io_acct_run_queue(acct)) {
                        io_worker_handle_work(worker);
                        goto loop;
                }
                /* timed out, exit unless we're the last worker */
                if (last_timeout && acct->nr_workers > 1) {
                        acct->nr_workers--;
                        raw_spin_unlock(&wqe->lock);
                        __set_current_state(TASK_RUNNING);
                        break;
                }
                last_timeout = false;
                __io_worker_idle(wqe, worker);
                raw_spin_unlock(&wqe->lock);
                if (io_flush_signals())
                        continue;
                ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
                if (signal_pending(current)) {
                        struct ksignal ksig;

                        if (!get_signal(&ksig))
                                continue;
                        break;
                }
                last_timeout = !ret;
        }

        if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
                raw_spin_lock(&wqe->lock);
                io_worker_handle_work(worker);
        }

        io_worker_exit(worker);
        return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
        struct io_worker *worker = tsk->pf_io_worker;

        if (!worker)
                return;
        if (!(worker->flags & IO_WORKER_F_UP))
                return;
        if (worker->flags & IO_WORKER_F_RUNNING)
                return;
        worker->flags |= IO_WORKER_F_RUNNING;
        io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
        struct io_worker *worker = tsk->pf_io_worker;

        if (!worker)
                return;
        if (!(worker->flags & IO_WORKER_F_UP))
                return;
        if (!(worker->flags & IO_WORKER_F_RUNNING))
                return;

        worker->flags &= ~IO_WORKER_F_RUNNING;

        raw_spin_lock(&worker->wqe->lock);
        io_wqe_dec_running(worker);
        raw_spin_unlock(&worker->wqe->lock);
}

static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
                               struct task_struct *tsk)
{
        tsk->pf_io_worker = worker;
        worker->task = tsk;
        set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
        tsk->flags |= PF_NO_SETAFFINITY;

        raw_spin_lock(&wqe->lock);
        hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
        list_add_tail_rcu(&worker->all_list, &wqe->all_list);
        worker->flags |= IO_WORKER_F_FREE;
        raw_spin_unlock(&wqe->lock);
        wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
        return true;
}

static inline bool io_should_retry_thread(long err)
{
        switch (err) {
        case -EAGAIN:
        case -ERESTARTSYS:
        case -ERESTARTNOINTR:
        case -ERESTARTNOHAND:
                return true;
        default:
                return false;
        }
}

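/*
 * Continuation of worker creation, run from task_work. If thread creation
 * fails with a non-transient error and this was the last worker for the
 * acct, cancel whatever work is still pending for it.
 */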
static void create_worker_cont(struct callback_head *cb)
{
        struct io_worker *worker;
        struct task_struct *tsk;
        struct io_wqe *wqe;

        worker = container_of(cb, struct io_worker, create_work);
        clear_bit_unlock(0, &worker->create_state);
        wqe = worker->wqe;
        tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
        if (!IS_ERR(tsk)) {
                io_init_new_worker(wqe, worker, tsk);
                io_worker_release(worker);
                return;
        } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
                struct io_wqe_acct *acct = io_wqe_get_acct(worker);

                atomic_dec(&acct->nr_running);
                raw_spin_lock(&wqe->lock);
                acct->nr_workers--;
                if (!acct->nr_workers) {
                        struct io_cb_cancel_data match = {
                                .fn             = io_wq_work_match_all,
                                .cancel_all     = true,
                        };

                        while (io_acct_cancel_pending_work(wqe, acct, &match))
                                raw_spin_lock(&wqe->lock);
                }
                raw_spin_unlock(&wqe->lock);
                io_worker_ref_put(wqe->wq);
                kfree(worker);
                return;
        }

        /* re-create attempts grab a new worker ref, drop the existing one */
        io_worker_release(worker);
        schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
        struct io_worker *worker = container_of(work, struct io_worker, work);
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);

        if (!io_queue_worker_create(worker, acct, create_worker_cont)) {
                clear_bit_unlock(0, &worker->create_state);
                io_worker_release(worker);
                kfree(worker);
        }
}

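/*
 * Allocate and start a new worker thread. Transient create_io_thread()
 * errors are retried later from workqueue context via io_workqueue_create().
 */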
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
        struct io_wqe_acct *acct = &wqe->acct[index];
        struct io_worker *worker;
        struct task_struct *tsk;

        __set_current_state(TASK_RUNNING);

        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
        if (!worker) {
fail:
                atomic_dec(&acct->nr_running);
                raw_spin_lock(&wqe->lock);
                acct->nr_workers--;
                raw_spin_unlock(&wqe->lock);
                io_worker_ref_put(wq);
                return false;
        }

        refcount_set(&worker->ref, 1);
        worker->wqe = wqe;
        spin_lock_init(&worker->lock);
        init_completion(&worker->ref_done);

        if (index == IO_WQ_ACCT_BOUND)
                worker->flags |= IO_WORKER_F_BOUND;

        tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
        if (!IS_ERR(tsk)) {
                io_init_new_worker(wqe, worker, tsk);
        } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
                kfree(worker);
                goto fail;
        } else {
                INIT_WORK(&worker->work, io_workqueue_create);
                schedule_work(&worker->work);
        }

        return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
                                  bool (*func)(struct io_worker *, void *),
                                  void *data)
{
        struct io_worker *worker;
        bool ret = false;

        list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
                if (io_worker_get(worker)) {
                        /* no task if node is/was offline */
                        if (worker->task)
                                ret = func(worker, data);
                        io_worker_release(worker);
                        if (ret)
                                break;
                }
        }

        return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
        set_notify_signal(worker->task);
        wake_up_process(worker->task);
        return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
        struct io_wq *wq = wqe->wq;

        do {
                work->flags |= IO_WQ_WORK_CANCEL;
                wq->do_work(work);
                work = wq->free_work(work);
        } while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
        struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
        unsigned int hash;
        struct io_wq_work *tail;

        if (!io_wq_is_hashed(work)) {
append:
                wq_list_add_tail(&work->list, &acct->work_list);
                return;
        }

        hash = io_get_work_hash(work);
        tail = wqe->hash_tail[hash];
        wqe->hash_tail[hash] = work;
        if (!tail)
                goto append;

        wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
        return work == data;
}

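/*
 * Queue work on this wqe and make sure a worker will run it, either by
 * waking a free worker or by creating a new one.
 */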
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
        struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
        unsigned work_flags = work->flags;
        bool do_create;

        /*
         * If io-wq is exiting for this task, or if the request has explicitly
         * been marked as one that should not get executed, cancel it here.
         */
        if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
            (work->flags & IO_WQ_WORK_CANCEL)) {
                io_run_cancel(work, wqe);
                return;
        }

        raw_spin_lock(&wqe->lock);
        io_wqe_insert_work(wqe, work);
        clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

        rcu_read_lock();
        do_create = !io_wqe_activate_free_worker(wqe, acct);
        rcu_read_unlock();

        raw_spin_unlock(&wqe->lock);

        if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
            !atomic_read(&acct->nr_running))) {
                bool did_create;

                did_create = io_wqe_create_worker(wqe, acct);
                if (likely(did_create))
                        return;

                raw_spin_lock(&wqe->lock);
                /* fatal condition, failed to create the first worker */
                if (!acct->nr_workers) {
                        struct io_cb_cancel_data match = {
                                .fn             = io_wq_work_match_item,
                                .data           = work,
                                .cancel_all     = false,
                        };

                        if (io_acct_cancel_pending_work(wqe, acct, &match))
                                raw_spin_lock(&wqe->lock);
                }
                raw_spin_unlock(&wqe->lock);
        }
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
        struct io_wqe *wqe = wq->wqes[numa_node_id()];

        io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
        unsigned int bit;

        bit = hash_ptr(val, IO_WQ_HASH_ORDER);
        work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
        struct io_cb_cancel_data *match = data;

        /*
         * Hold the lock to avoid ->cur_work going out of scope, as the
         * caller may dereference the passed in work.
         */
        spin_lock(&worker->lock);
        if (worker->cur_work &&
            match->fn(worker->cur_work, match->data)) {
                set_notify_signal(worker->task);
                match->nr_running++;
        }
        spin_unlock(&worker->lock);

        return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
                                         struct io_wq_work *work,
                                         struct io_wq_work_node *prev)
{
        struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
        unsigned int hash = io_get_work_hash(work);
        struct io_wq_work *prev_work = NULL;

        if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
                if (prev)
                        prev_work = container_of(prev, struct io_wq_work, list);
                if (prev_work && io_get_work_hash(prev_work) == hash)
                        wqe->hash_tail[hash] = prev_work;
                else
                        wqe->hash_tail[hash] = NULL;
        }
        wq_list_del(&acct->work_list, &work->list, prev);
}

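/*
 * Cancel the first pending work item that matches. On a match, wqe->lock
 * is dropped before the cancellation runs and true is returned; the caller
 * must retake the lock to continue. Returns false with the lock still held
 * if nothing matched.
 */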
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
                                        struct io_wqe_acct *acct,
                                        struct io_cb_cancel_data *match)
        __releases(wqe->lock)
{
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;

        wq_list_for_each(node, prev, &acct->work_list) {
                work = container_of(node, struct io_wq_work, list);
                if (!match->fn(work, match->data))
                        continue;
                io_wqe_remove_pending(wqe, work, prev);
                raw_spin_unlock(&wqe->lock);
                io_run_cancel(work, wqe);
                match->nr_pending++;
                /* not safe to continue after unlock */
                return true;
        }

        return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
                                       struct io_cb_cancel_data *match)
{
        int i;
retry:
        raw_spin_lock(&wqe->lock);
        for (i = 0; i < IO_WQ_ACCT_NR; i++) {
                struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

                if (io_acct_cancel_pending_work(wqe, acct, match)) {
                        if (match->cancel_all)
                                goto retry;
                        return;
                }
        }
        raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
                                       struct io_cb_cancel_data *match)
{
        rcu_read_lock();
        io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
        rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
                                  void *data, bool cancel_all)
{
        struct io_cb_cancel_data match = {
                .fn             = cancel,
                .data           = data,
                .cancel_all     = cancel_all,
        };
        int node;

        /*
         * First check the pending list; if we're lucky we can just remove it
         * from there. CANCEL_OK means that the work is returned as-new, and
         * no completion will be posted for it.
         */
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];

                io_wqe_cancel_pending_work(wqe, &match);
                if (match.nr_pending && !match.cancel_all)
                        return IO_WQ_CANCEL_OK;
        }

        /*
         * Now check if a free (going busy) or busy worker has the work
         * currently running. If we find it there, we'll return CANCEL_RUNNING
         * as an indication that we attempted to signal cancellation. The
         * completion will run normally in this case.
         */
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];

                io_wqe_cancel_running_work(wqe, &match);
                if (match.nr_running && !match.cancel_all)
                        return IO_WQ_CANCEL_RUNNING;
        }

        if (match.nr_running)
                return IO_WQ_CANCEL_RUNNING;
        if (match.nr_pending)
                return IO_WQ_CANCEL_OK;
        return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
                            int sync, void *key)
{
        struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
        int i;

        list_del_init(&wait->entry);

        rcu_read_lock();
        for (i = 0; i < IO_WQ_ACCT_NR; i++) {
                struct io_wqe_acct *acct = &wqe->acct[i];

                if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
                        io_wqe_activate_free_worker(wqe, acct);
        }
        rcu_read_unlock();
        return 1;
}

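/*
 * Set up an io_wq with one io_wqe per NUMA node. @bounded caps the number
 * of bounded workers; unbounded workers are capped by RLIMIT_NPROC.
 */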
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
        int ret, node, i;
        struct io_wq *wq;

        if (WARN_ON_ONCE(!data->free_work || !data->do_work))
                return ERR_PTR(-EINVAL);
        if (WARN_ON_ONCE(!bounded))
                return ERR_PTR(-EINVAL);

        wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
        if (!wq)
                return ERR_PTR(-ENOMEM);
        ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
        if (ret)
                goto err_wq;

        refcount_inc(&data->hash->refs);
        wq->hash = data->hash;
        wq->free_work = data->free_work;
        wq->do_work = data->do_work;

        ret = -ENOMEM;
        for_each_node(node) {
                struct io_wqe *wqe;
                int alloc_node = node;

                if (!node_online(alloc_node))
                        alloc_node = NUMA_NO_NODE;
                wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
                if (!wqe)
                        goto err;
                if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
                        goto err;
                cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
                wq->wqes[node] = wqe;
                wqe->node = alloc_node;
                wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
                wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
                                        task_rlimit(current, RLIMIT_NPROC);
                INIT_LIST_HEAD(&wqe->wait.entry);
                wqe->wait.func = io_wqe_hash_wake;
                for (i = 0; i < IO_WQ_ACCT_NR; i++) {
                        struct io_wqe_acct *acct = &wqe->acct[i];

                        acct->index = i;
                        atomic_set(&acct->nr_running, 0);
                        INIT_WQ_LIST(&acct->work_list);
                }
                wqe->wq = wq;
                raw_spin_lock_init(&wqe->lock);
                INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
                INIT_LIST_HEAD(&wqe->all_list);
        }

        wq->task = get_task_struct(data->task);
        atomic_set(&wq->worker_refs, 1);
        init_completion(&wq->worker_done);
        return wq;
err:
        io_wq_put_hash(data->hash);
        cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
        for_each_node(node) {
                if (!wq->wqes[node])
                        continue;
                free_cpumask_var(wq->wqes[node]->cpu_mask);
                kfree(wq->wqes[node]);
        }
err_wq:
        kfree(wq);
        return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
        struct io_worker *worker;

        if (cb->func != create_worker_cb && cb->func != create_worker_cont)
                return false;
        worker = container_of(cb, struct io_worker, create_work);
        return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
        set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

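/*
 * Tear down all workers: cancel any still-pending worker creation
 * task_work, wake every worker so it notices IO_WQ_BIT_EXIT, and wait for
 * the final worker reference to be dropped.
 */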
static void io_wq_exit_workers(struct io_wq *wq)
{
        struct callback_head *cb;
        int node;

        if (!wq->task)
                return;

        while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
                struct io_worker *worker;
                struct io_wqe_acct *acct;

                worker = container_of(cb, struct io_worker, create_work);
                acct = io_wqe_get_acct(worker);
                atomic_dec(&acct->nr_running);
                raw_spin_lock(&worker->wqe->lock);
                acct->nr_workers--;
                raw_spin_unlock(&worker->wqe->lock);
                io_worker_ref_put(wq);
                clear_bit_unlock(0, &worker->create_state);
                io_worker_release(worker);
        }

        rcu_read_lock();
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];

                io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
        }
        rcu_read_unlock();
        io_worker_ref_put(wq);
        wait_for_completion(&wq->worker_done);

        for_each_node(node) {
                spin_lock_irq(&wq->hash->wait.lock);
                list_del_init(&wq->wqes[node]->wait.entry);
                spin_unlock_irq(&wq->hash->wait.lock);
        }
        put_task_struct(wq->task);
        wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
        int node;

        cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
                struct io_cb_cancel_data match = {
                        .fn             = io_wq_work_match_all,
                        .cancel_all     = true,
                };
                io_wqe_cancel_pending_work(wqe, &match);
                free_cpumask_var(wqe->cpu_mask);
                kfree(wqe);
        }
        io_wq_put_hash(wq->hash);
        kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
        WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

        io_wq_exit_workers(wq);
        io_wq_destroy(wq);
}

struct online_data {
        unsigned int cpu;
        bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
        struct online_data *od = data;

        if (od->online)
                cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
        else
                cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
        return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
        struct online_data od = {
                .cpu = cpu,
                .online = online
        };
        int i;

        rcu_read_lock();
        for_each_node(i)
                io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
        rcu_read_unlock();
        return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
        struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

        return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
        struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

        return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
        int i;

        rcu_read_lock();
        for_each_node(i) {
                struct io_wqe *wqe = wq->wqes[i];

                if (mask)
                        cpumask_copy(wqe->cpu_mask, mask);
                else
                        cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
        }
        rcu_read_unlock();
        return 0;
}

/*
 * Set the max number of bounded and unbounded workers. The old values are
 * returned through new_count[]; an entry of 0 leaves that limit unchanged.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
        int i, node, prev = 0;

        BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
        BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
        BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

        for (i = 0; i < 2; i++) {
                if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
                        new_count[i] = task_rlimit(current, RLIMIT_NPROC);
        }

        rcu_read_lock();
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
                struct io_wqe_acct *acct;

                raw_spin_lock(&wqe->lock);
                for (i = 0; i < IO_WQ_ACCT_NR; i++) {
                        acct = &wqe->acct[i];
                        prev = max_t(int, acct->max_workers, prev);
                        if (new_count[i])
                                acct->max_workers = new_count[i];
                        new_count[i] = prev;
                }
                raw_spin_unlock(&wqe->lock);
        }
        rcu_read_unlock();
        return 0;
}

static __init int io_wq_init(void)
{
        int ret;

        ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
                                        io_wq_cpu_online, io_wq_cpu_offline);
        if (ret < 0)
                return ret;
        io_wq_online = ret;
        return 0;
}
subsys_initcall(io_wq_init);