linux/fs/btrfs/async-thread.c
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
        /* pool we belong to */
        struct btrfs_workers *workers;

        /* list of struct btrfs_work that are waiting for service */
        struct list_head pending;
        struct list_head prio_pending;

        /* list of worker threads from struct btrfs_workers */
        struct list_head worker_list;

        /* kthread */
        struct task_struct *task;

        /* number of things on the pending list */
        atomic_t num_pending;

        /* reference counter for this struct */
        atomic_t refs;

        unsigned long sequence;

        /* protects the pending list. */
        spinlock_t lock;

        /* set to non-zero when this thread is already awake and kicking */
        int working;

        /* are we currently idle */
        int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
        struct btrfs_work work;
        struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
        struct worker_start *start;
        start = container_of(work, struct worker_start, work);
        __btrfs_start_workers(start->queue);
        kfree(start);
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
        if (!worker->idle && atomic_read(&worker->num_pending) <
            worker->workers->idle_thresh / 2) {
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 1;

                /* the list may be empty if the worker is just starting */
                if (!list_empty(&worker->worker_list)) {
                        list_move(&worker->worker_list,
                                 &worker->workers->idle_list);
                }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
        if (worker->idle && atomic_read(&worker->num_pending) >=
            worker->workers->idle_thresh) {
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 0;

                if (!list_empty(&worker->worker_list)) {
                        list_move_tail(&worker->worker_list,
                                      &worker->workers->worker_list);
                }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
}

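/*
 * if the pool flagged that it wants another thread, allocate a worker_start
 * and hand it to the helper queue so the new kthread gets created outside
 * of this worker's context.
 */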
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
        struct btrfs_workers *workers = worker->workers;
        struct worker_start *start;
        unsigned long flags;

        rmb();
        if (!workers->atomic_start_pending)
                return;

        start = kzalloc(sizeof(*start), GFP_NOFS);
        if (!start)
                return;

        start->work.func = start_new_worker_func;
        start->queue = workers;

        spin_lock_irqsave(&workers->lock, flags);
        if (!workers->atomic_start_pending)
                goto out;

        workers->atomic_start_pending = 0;
        if (workers->num_workers + workers->num_workers_starting >=
            workers->max_workers)
                goto out;

        workers->num_workers_starting += 1;
        spin_unlock_irqrestore(&workers->lock, flags);
        btrfs_queue_worker(workers->atomic_worker_start, &start->work);
        return;

out:
        kfree(start);
        spin_unlock_irqrestore(&workers->lock, flags);
}

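/*
 * for ordered queues, run the ordered_func and ordered_free hooks for each
 * completed work item at the head of the order lists.  Items stay on the
 * list until their ordered_func has run, so completions happen in
 * submission order.
 */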
static noinline void run_ordered_completions(struct btrfs_workers *workers,
                                            struct btrfs_work *work)
{
        if (!workers->ordered)
                return;

        set_bit(WORK_DONE_BIT, &work->flags);

        spin_lock(&workers->order_lock);

        while (1) {
                if (!list_empty(&workers->prio_order_list)) {
                        work = list_entry(workers->prio_order_list.next,
                                          struct btrfs_work, order_list);
                } else if (!list_empty(&workers->order_list)) {
                        work = list_entry(workers->order_list.next,
                                          struct btrfs_work, order_list);
                } else {
                        break;
                }
                if (!test_bit(WORK_DONE_BIT, &work->flags))
                        break;

                /* we are going to call the ordered done function, but
                 * we leave the work item on the list as a barrier so
                 * that later work items that are done don't have their
                 * functions called before this one returns
                 */
                if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
                        break;

                spin_unlock(&workers->order_lock);

                work->ordered_func(work);

                /* now take the lock again and drop our item from the list */
                spin_lock(&workers->order_lock);
                list_del(&work->order_list);
                spin_unlock(&workers->order_lock);

                /*
                 * we don't want to call the ordered free functions
                 * with the lock held though
                 */
                work->ordered_free(work);
                spin_lock(&workers->order_lock);
        }

        spin_unlock(&workers->order_lock);
}

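/* drop a reference on the worker and free it once the last ref is gone */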
static void put_worker(struct btrfs_worker_thread *worker)
{
        if (atomic_dec_and_test(&worker->refs))
                kfree(worker);
}

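/*
 * an idle worker with nothing queued can take itself out of the pool.
 * Returns 1 if the worker was removed and its reference dropped.
 */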
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
        int freeit = 0;

        spin_lock_irq(&worker->lock);
        spin_lock(&worker->workers->lock);
        if (worker->workers->num_workers > 1 &&
            worker->idle &&
            !worker->working &&
            !list_empty(&worker->worker_list) &&
            list_empty(&worker->prio_pending) &&
            list_empty(&worker->pending) &&
            atomic_read(&worker->num_pending) == 0) {
                freeit = 1;
                list_del_init(&worker->worker_list);
                worker->workers->num_workers--;
        }
        spin_unlock(&worker->workers->lock);
        spin_unlock_irq(&worker->lock);

        if (freeit)
                put_worker(worker);
        return freeit;
}

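/*
 * grab the next work item for a worker, preferring the high priority list.
 * The pending lists are spliced onto private list heads in batches so the
 * worker lock isn't taken for every single item.
 */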
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
                                        struct list_head *prio_head,
                                        struct list_head *head)
{
        struct btrfs_work *work = NULL;
        struct list_head *cur = NULL;

        if (!list_empty(prio_head))
                cur = prio_head->next;

        smp_mb();
        if (!list_empty(&worker->prio_pending))
                goto refill;

        if (!list_empty(head))
                cur = head->next;

        if (cur)
                goto out;

refill:
        spin_lock_irq(&worker->lock);
        list_splice_tail_init(&worker->prio_pending, prio_head);
        list_splice_tail_init(&worker->pending, head);

        if (!list_empty(prio_head))
                cur = prio_head->next;
        else if (!list_empty(head))
                cur = head->next;
        spin_unlock_irq(&worker->lock);

        if (!cur)
                goto out_fail;

out:
        work = list_entry(cur, struct btrfs_work, list);

out_fail:
        return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
        struct btrfs_worker_thread *worker = arg;
        struct list_head head;
        struct list_head prio_head;
        struct btrfs_work *work;

        INIT_LIST_HEAD(&head);
        INIT_LIST_HEAD(&prio_head);

        do {
again:
                while (1) {


                        work = get_next_work(worker, &prio_head, &head);
                        if (!work)
                                break;

                        list_del(&work->list);
                        clear_bit(WORK_QUEUED_BIT, &work->flags);

                        work->worker = worker;

                        work->func(work);

                        atomic_dec(&worker->num_pending);
                        /*
                         * unless this is an ordered work queue,
                         * 'work' was probably freed by func above.
                         */
                        run_ordered_completions(worker->workers, work);

                        check_pending_worker_creates(worker);
                        cond_resched();
                }

                spin_lock_irq(&worker->lock);
                check_idle_worker(worker);

                if (freezing(current)) {
                        worker->working = 0;
                        spin_unlock_irq(&worker->lock);
                        try_to_freeze();
                } else {
                        spin_unlock_irq(&worker->lock);
                        if (!kthread_should_stop()) {
                                cpu_relax();
                                /*
                                 * we've dropped the lock, did someone else
                                 * jump in?
                                 */
                                smp_mb();
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending))
                                        continue;

                                /*
                                 * this short schedule allows more work to
                                 * come in without the queue functions
                                 * needing to go through wake_up_process()
                                 *
                                 * worker->working is still 1, so nobody
                                 * is going to try and wake us up
                                 */
                                schedule_timeout(1);
                                smp_mb();
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending))
                                        continue;

                                if (kthread_should_stop())
                                        break;

                                /* still no more work? sleep for real */
                                spin_lock_irq(&worker->lock);
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending)) {
                                        spin_unlock_irq(&worker->lock);
                                        set_current_state(TASK_RUNNING);
                                        goto again;
                                }

                                /*
                                 * this makes sure we get a wakeup when someone
                                 * adds something new to the queue
                                 */
                                worker->working = 0;
                                spin_unlock_irq(&worker->lock);

                                if (!kthread_should_stop()) {
                                        schedule_timeout(HZ * 120);
                                        if (!worker->working &&
                                            try_worker_shutdown(worker)) {
                                                return 0;
                                        }
                                }
                        }
                        __set_current_state(TASK_RUNNING);
                }
        } while (!kthread_should_stop());
        return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
void btrfs_stop_workers(struct btrfs_workers *workers)
{
        struct list_head *cur;
        struct btrfs_worker_thread *worker;
        int can_stop;

        spin_lock_irq(&workers->lock);
        list_splice_init(&workers->idle_list, &workers->worker_list);
        while (!list_empty(&workers->worker_list)) {
                cur = workers->worker_list.next;
                worker = list_entry(cur, struct btrfs_worker_thread,
                                    worker_list);

                atomic_inc(&worker->refs);
                workers->num_workers -= 1;
                if (!list_empty(&worker->worker_list)) {
                        list_del_init(&worker->worker_list);
                        put_worker(worker);
                        can_stop = 1;
                } else
                        can_stop = 0;
                spin_unlock_irq(&workers->lock);
                if (can_stop)
                        kthread_stop(worker->task);
                spin_lock_irq(&workers->lock);
                put_worker(worker);
        }
        spin_unlock_irq(&workers->lock);
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
                        struct btrfs_workers *async_helper)
{
        workers->num_workers = 0;
        workers->num_workers_starting = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        INIT_LIST_HEAD(&workers->idle_list);
        INIT_LIST_HEAD(&workers->order_list);
        INIT_LIST_HEAD(&workers->prio_order_list);
        spin_lock_init(&workers->lock);
        spin_lock_init(&workers->order_lock);
        workers->max_workers = max;
        workers->idle_thresh = 32;
        workers->name = name;
        workers->ordered = 0;
        workers->atomic_start_pending = 0;
        workers->atomic_worker_start = async_helper;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        int ret = 0;

        worker = kzalloc(sizeof(*worker), GFP_NOFS);
        if (!worker) {
                ret = -ENOMEM;
                goto fail;
        }

        INIT_LIST_HEAD(&worker->pending);
        INIT_LIST_HEAD(&worker->prio_pending);
        INIT_LIST_HEAD(&worker->worker_list);
        spin_lock_init(&worker->lock);

        atomic_set(&worker->num_pending, 0);
        atomic_set(&worker->refs, 1);
        worker->workers = workers;
        worker->task = kthread_run(worker_loop, worker,
                                   "btrfs-%s-%d", workers->name,
                                   workers->num_workers + 1);
        if (IS_ERR(worker->task)) {
                ret = PTR_ERR(worker->task);
                kfree(worker);
                goto fail;
        }
        spin_lock_irq(&workers->lock);
        list_add_tail(&worker->worker_list, &workers->idle_list);
        worker->idle = 1;
        workers->num_workers++;
        workers->num_workers_starting--;
        WARN_ON(workers->num_workers_starting < 0);
        spin_unlock_irq(&workers->lock);

        return 0;
fail:
        spin_lock_irq(&workers->lock);
        workers->num_workers_starting--;
        spin_unlock_irq(&workers->lock);
        return ret;
}

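/*
 * public entry point for starting one more worker.  The thread that is about
 * to start is accounted for before __btrfs_start_workers is called.
 */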
int btrfs_start_workers(struct btrfs_workers *workers)
{
        spin_lock_irq(&workers->lock);
        workers->num_workers_starting++;
        spin_unlock_irq(&workers->lock);
        return __btrfs_start_workers(workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        struct list_head *next;
        int enforce_min;

        enforce_min = (workers->num_workers + workers->num_workers_starting) <
                workers->max_workers;

        /*
         * if we find an idle thread, don't move it to the end of the
         * idle list.  This improves the chance that the next submission
         * will reuse the same thread, and maybe catch it while it is still
         * working
         */
        if (!list_empty(&workers->idle_list)) {
                next = workers->idle_list.next;
                worker = list_entry(next, struct btrfs_worker_thread,
                                    worker_list);
                return worker;
        }
        if (enforce_min || list_empty(&workers->worker_list))
                return NULL;

        /*
         * if we pick a busy task, move the task to the end of the list.
         * hopefully this will keep things somewhat evenly balanced.
         * Do the move in batches based on the sequence number.  This groups
         * requests submitted at roughly the same time onto the same worker.
         */
        next = workers->worker_list.next;
        worker = list_entry(next, struct btrfs_worker_thread, worker_list);
        worker->sequence++;

        if (worker->sequence % workers->idle_thresh == 0)
                list_move_tail(next, &workers->worker_list);
        return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;
        struct list_head *fallback;
        int ret;

        spin_lock_irqsave(&workers->lock, flags);
again:
        worker = next_worker(workers);

        if (!worker) {
                if (workers->num_workers + workers->num_workers_starting >=
                    workers->max_workers) {
                        goto fallback;
                } else if (workers->atomic_worker_start) {
                        workers->atomic_start_pending = 1;
                        goto fallback;
                } else {
                        workers->num_workers_starting++;
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
                        ret = __btrfs_start_workers(workers);
                        spin_lock_irqsave(&workers->lock, flags);
                        if (ret)
                                goto fallback;
                        goto again;
                }
        }
        goto found;

fallback:
        fallback = NULL;
        /*
         * we have failed to find any workers, just
         * return the first one we can find.
         */
        if (!list_empty(&workers->worker_list))
                fallback = workers->worker_list.next;
        if (!list_empty(&workers->idle_list))
                fallback = workers->idle_list.next;
        BUG_ON(!fallback);
        worker = list_entry(fallback,
                  struct btrfs_worker_thread, worker_list);
found:
        /*
         * this makes sure the worker doesn't exit before it is placed
         * onto a busy/idle list
         */
        atomic_inc(&worker->num_pending);
        spin_unlock_irqrestore(&workers->lock, flags);
        return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
void btrfs_requeue_work(struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker = work->worker;
        unsigned long flags;
        int wake = 0;

        if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                return;

        spin_lock_irqsave(&worker->lock, flags);
        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
                list_add_tail(&work->list, &worker->prio_pending);
        else
                list_add_tail(&work->list, &worker->pending);
        atomic_inc(&worker->num_pending);

        /* by definition we're busy, take ourselves off the idle
         * list
         */
        if (worker->idle) {
                spin_lock(&worker->workers->lock);
                worker->idle = 0;
                list_move_tail(&worker->worker_list,
                              &worker->workers->worker_list);
                spin_unlock(&worker->workers->lock);
        }
        if (!worker->working) {
                wake = 1;
                worker->working = 1;
        }

        if (wake)
                wake_up_process(worker->task);
        spin_unlock_irqrestore(&worker->lock, flags);
}

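/* mark a work item as high priority so it lands on the prio lists */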
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
        set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;
        int wake = 0;

        /* don't requeue something already on a list */
        if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                return;

        worker = find_worker(workers);
        if (workers->ordered) {
                /*
                 * you're not allowed to do ordered queues from an
                 * interrupt handler
                 */
                spin_lock(&workers->order_lock);
                if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
                        list_add_tail(&work->order_list,
                                      &workers->prio_order_list);
                } else {
                        list_add_tail(&work->order_list, &workers->order_list);
                }
                spin_unlock(&workers->order_lock);
        } else {
                INIT_LIST_HEAD(&work->order_list);
        }

        spin_lock_irqsave(&worker->lock, flags);

        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
                list_add_tail(&work->list, &worker->prio_pending);
        else
                list_add_tail(&work->list, &worker->pending);
        check_busy_worker(worker);

        /*
         * avoid calling into wake_up_process if this thread has already
         * been kicked
         */
        if (!worker->working)
                wake = 1;
        worker->working = 1;

        if (wake)
                wake_up_process(worker->task);
        spin_unlock_irqrestore(&worker->lock, flags);
}