linux/fs/btrfs/async-thread.c
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
        /* pool we belong to */
        struct btrfs_workers *workers;

        /* list of struct btrfs_work that are waiting for service */
        struct list_head pending;
        struct list_head prio_pending;

        /* list of worker threads from struct btrfs_workers */
        struct list_head worker_list;

        /* kthread */
        struct task_struct *task;

        /* number of things on the pending list */
        atomic_t num_pending;

        /* reference counter for this struct */
        atomic_t refs;

        unsigned long sequence;

        /* protects the pending list. */
        spinlock_t lock;

        /* set to non-zero when this thread is already awake and kicking */
        int working;

        /* are we currently idle */
        int idle;
};

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
        struct btrfs_work work;
        struct btrfs_workers *queue;
};

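/*
 * run from the async helper queue: actually create the new worker
 * for the pool recorded in the worker_start, then free the request.
 */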
static void start_new_worker_func(struct btrfs_work *work)
{
        struct worker_start *start;
        start = container_of(work, struct worker_start, work);
        btrfs_start_workers(start->queue, 1);
        kfree(start);
}

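/*
 * queue a request on the async helper so that the new thread is
 * created outside the context of the worker asking for it.
 */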
static int start_new_worker(struct btrfs_workers *queue)
{
        struct worker_start *start;
        int ret;

        start = kzalloc(sizeof(*start), GFP_NOFS);
        if (!start)
                return -ENOMEM;

        start->work.func = start_new_worker_func;
        start->queue = queue;
        ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
        if (ret)
                kfree(start);
        return ret;
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
        if (!worker->idle && atomic_read(&worker->num_pending) <
            worker->workers->idle_thresh / 2) {
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 1;

                /* the list may be empty if the worker is just starting */
                if (!list_empty(&worker->worker_list)) {
                        list_move(&worker->worker_list,
                                  &worker->workers->idle_list);
                }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
        if (worker->idle && atomic_read(&worker->num_pending) >=
            worker->workers->idle_thresh) {
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 0;

                if (!list_empty(&worker->worker_list)) {
                        list_move_tail(&worker->worker_list,
                                       &worker->workers->worker_list);
                }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
}

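/*
 * called by worker threads between work items.  If a queue function
 * flagged atomic_start_pending, kick off creation of another worker
 * via the async helper, as long as we are still under max_workers.
 */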
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
        struct btrfs_workers *workers = worker->workers;
        unsigned long flags;

        rmb();
        if (!workers->atomic_start_pending)
                return;

        spin_lock_irqsave(&workers->lock, flags);
        if (!workers->atomic_start_pending)
                goto out;

        workers->atomic_start_pending = 0;
        if (workers->num_workers + workers->num_workers_starting >=
            workers->max_workers)
                goto out;

        workers->num_workers_starting += 1;
        spin_unlock_irqrestore(&workers->lock, flags);
        start_new_worker(workers);
        return;

out:
        spin_unlock_irqrestore(&workers->lock, flags);
}

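/*
 * for ordered queues, run the ordered_func and ordered_free callbacks
 * for every item at the head of the order lists that has completed.
 * Items stay on the list until their ordered functions have run, so a
 * later completion can never pass an earlier one.
 */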
static noinline int run_ordered_completions(struct btrfs_workers *workers,
                                            struct btrfs_work *work)
{
        if (!workers->ordered)
                return 0;

        set_bit(WORK_DONE_BIT, &work->flags);

        spin_lock(&workers->order_lock);

        while (1) {
                if (!list_empty(&workers->prio_order_list)) {
                        work = list_entry(workers->prio_order_list.next,
                                          struct btrfs_work, order_list);
                } else if (!list_empty(&workers->order_list)) {
                        work = list_entry(workers->order_list.next,
                                          struct btrfs_work, order_list);
                } else {
                        break;
                }
                if (!test_bit(WORK_DONE_BIT, &work->flags))
                        break;

                /* we are going to call the ordered done function, but
                 * we leave the work item on the list as a barrier so
                 * that later work items that are done don't have their
                 * functions called before this one returns
                 */
                if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
                        break;

                spin_unlock(&workers->order_lock);

                work->ordered_func(work);

                /* now take the lock again and call the freeing code */
                spin_lock(&workers->order_lock);
                list_del(&work->order_list);
                work->ordered_free(work);
        }

        spin_unlock(&workers->order_lock);
        return 0;
}

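/* drop a reference on the worker, freeing it when the last ref is gone */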
static void put_worker(struct btrfs_worker_thread *worker)
{
        if (atomic_dec_and_test(&worker->refs))
                kfree(worker);
}

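/*
 * called after a worker has been idle for a while.  If the pool still
 * has more than one thread and this worker has nothing queued, take it
 * off the worker list and drop its reference.  Returns 1 when the
 * worker was removed and the thread should exit.
 */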
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
        int freeit = 0;

        spin_lock_irq(&worker->lock);
        spin_lock(&worker->workers->lock);
        if (worker->workers->num_workers > 1 &&
            worker->idle &&
            !worker->working &&
            !list_empty(&worker->worker_list) &&
            list_empty(&worker->prio_pending) &&
            list_empty(&worker->pending) &&
            atomic_read(&worker->num_pending) == 0) {
                freeit = 1;
                list_del_init(&worker->worker_list);
                worker->workers->num_workers--;
        }
        spin_unlock(&worker->workers->lock);
        spin_unlock_irq(&worker->lock);

        if (freeit)
                put_worker(worker);
        return freeit;
}

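/*
 * grab the next work item for a worker, always preferring the high
 * priority list.  The local prio_head/head lists are refilled from the
 * worker's pending lists (under worker->lock) whenever they run dry.
 */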
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
                                        struct list_head *prio_head,
                                        struct list_head *head)
{
        struct btrfs_work *work = NULL;
        struct list_head *cur = NULL;

        if (!list_empty(prio_head))
                cur = prio_head->next;

        smp_mb();
        if (!list_empty(&worker->prio_pending))
                goto refill;

        if (!list_empty(head))
                cur = head->next;

        if (cur)
                goto out;

refill:
        spin_lock_irq(&worker->lock);
        list_splice_tail_init(&worker->prio_pending, prio_head);
        list_splice_tail_init(&worker->pending, head);

        if (!list_empty(prio_head))
                cur = prio_head->next;
        else if (!list_empty(head))
                cur = head->next;
        spin_unlock_irq(&worker->lock);

        if (!cur)
                goto out_fail;

out:
        work = list_entry(cur, struct btrfs_work, list);

out_fail:
        return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
        struct btrfs_worker_thread *worker = arg;
        struct list_head head;
        struct list_head prio_head;
        struct btrfs_work *work;

        INIT_LIST_HEAD(&head);
        INIT_LIST_HEAD(&prio_head);

        do {
again:
                while (1) {
                        work = get_next_work(worker, &prio_head, &head);
                        if (!work)
                                break;

                        list_del(&work->list);
                        clear_bit(WORK_QUEUED_BIT, &work->flags);

                        work->worker = worker;

                        work->func(work);

                        atomic_dec(&worker->num_pending);
                        /*
                         * unless this is an ordered work queue,
                         * 'work' was probably freed by func above.
                         */
                        run_ordered_completions(worker->workers, work);

                        check_pending_worker_creates(worker);
                }

                spin_lock_irq(&worker->lock);
                check_idle_worker(worker);

                if (freezing(current)) {
                        worker->working = 0;
                        spin_unlock_irq(&worker->lock);
                        refrigerator();
                } else {
                        spin_unlock_irq(&worker->lock);
                        if (!kthread_should_stop()) {
                                cpu_relax();
                                /*
                                 * we've dropped the lock, did someone else
                                 * jump in?
                                 */
                                smp_mb();
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending))
                                        continue;

                                /*
                                 * this short schedule allows more work to
                                 * come in without the queue functions
                                 * needing to go through wake_up_process()
                                 *
                                 * worker->working is still 1, so nobody
                                 * is going to try and wake us up
                                 */
                                schedule_timeout(1);
                                smp_mb();
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending))
                                        continue;

                                if (kthread_should_stop())
                                        break;

                                /* still no more work? sleep for real */
                                spin_lock_irq(&worker->lock);
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending)) {
                                        spin_unlock_irq(&worker->lock);
                                        goto again;
                                }

                                /*
                                 * this makes sure we get a wakeup when someone
                                 * adds something new to the queue
                                 */
                                worker->working = 0;
                                spin_unlock_irq(&worker->lock);

                                if (!kthread_should_stop()) {
                                        schedule_timeout(HZ * 120);
                                        if (!worker->working &&
                                            try_worker_shutdown(worker)) {
                                                return 0;
                                        }
                                }
                        }
                        __set_current_state(TASK_RUNNING);
                }
        } while (!kthread_should_stop());
        return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
        struct list_head *cur;
        struct btrfs_worker_thread *worker;
        int can_stop;

        spin_lock_irq(&workers->lock);
        list_splice_init(&workers->idle_list, &workers->worker_list);
        while (!list_empty(&workers->worker_list)) {
                cur = workers->worker_list.next;
                worker = list_entry(cur, struct btrfs_worker_thread,
                                    worker_list);

                atomic_inc(&worker->refs);
                workers->num_workers -= 1;
                if (!list_empty(&worker->worker_list)) {
                        list_del_init(&worker->worker_list);
                        put_worker(worker);
                        can_stop = 1;
                } else
                        can_stop = 0;
                spin_unlock_irq(&workers->lock);
                if (can_stop)
                        kthread_stop(worker->task);
                spin_lock_irq(&workers->lock);
                put_worker(worker);
        }
        spin_unlock_irq(&workers->lock);
        return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
                        struct btrfs_workers *async_helper)
{
        workers->num_workers = 0;
        workers->num_workers_starting = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        INIT_LIST_HEAD(&workers->idle_list);
        INIT_LIST_HEAD(&workers->order_list);
        INIT_LIST_HEAD(&workers->prio_order_list);
        spin_lock_init(&workers->lock);
        spin_lock_init(&workers->order_lock);
        workers->max_workers = max;
        workers->idle_thresh = 32;
        workers->name = name;
        workers->ordered = 0;
        workers->atomic_start_pending = 0;
        workers->atomic_worker_start = async_helper;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers,
                                 int num_workers)
{
        struct btrfs_worker_thread *worker;
        int ret = 0;
        int i;

        for (i = 0; i < num_workers; i++) {
                worker = kzalloc(sizeof(*worker), GFP_NOFS);
                if (!worker) {
                        ret = -ENOMEM;
                        goto fail;
                }

                INIT_LIST_HEAD(&worker->pending);
                INIT_LIST_HEAD(&worker->prio_pending);
                INIT_LIST_HEAD(&worker->worker_list);
                spin_lock_init(&worker->lock);

                atomic_set(&worker->num_pending, 0);
                atomic_set(&worker->refs, 1);
                worker->workers = workers;
                worker->task = kthread_run(worker_loop, worker,
                                           "btrfs-%s-%d", workers->name,
                                           workers->num_workers + i);
                if (IS_ERR(worker->task)) {
                        ret = PTR_ERR(worker->task);
                        kfree(worker);
                        goto fail;
                }
                spin_lock_irq(&workers->lock);
                list_add_tail(&worker->worker_list, &workers->idle_list);
                worker->idle = 1;
                workers->num_workers++;
                workers->num_workers_starting--;
                WARN_ON(workers->num_workers_starting < 0);
                spin_unlock_irq(&workers->lock);
        }
        return 0;
fail:
        btrfs_stop_workers(workers);
        return ret;
}

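/*
 * account for the threads we are about to create before dropping the
 * lock, so concurrent callers see them as already starting.
 */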
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
        spin_lock_irq(&workers->lock);
        workers->num_workers_starting += num_workers;
        spin_unlock_irq(&workers->lock);
        return __btrfs_start_workers(workers, num_workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        struct list_head *next;
        int enforce_min;

        enforce_min = (workers->num_workers + workers->num_workers_starting) <
                workers->max_workers;

        /*
         * if we find an idle thread, don't move it to the end of the
         * idle list.  This improves the chance that the next submission
         * will reuse the same thread, and maybe catch it while it is still
         * working
         */
        if (!list_empty(&workers->idle_list)) {
                next = workers->idle_list.next;
                worker = list_entry(next, struct btrfs_worker_thread,
                                    worker_list);
                return worker;
        }
        if (enforce_min || list_empty(&workers->worker_list))
                return NULL;

        /*
         * if we pick a busy task, move the task to the end of the list.
         * hopefully this will keep things somewhat evenly balanced.
         * Do the move in batches based on the sequence number.  This groups
         * requests submitted at roughly the same time onto the same worker.
         */
        next = workers->worker_list.next;
        worker = list_entry(next, struct btrfs_worker_thread, worker_list);
        worker->sequence++;

        if (worker->sequence % workers->idle_thresh == 0)
                list_move_tail(next, &workers->worker_list);
        return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;
        struct list_head *fallback;

again:
        spin_lock_irqsave(&workers->lock, flags);
        worker = next_worker(workers);

        if (!worker) {
                if (workers->num_workers + workers->num_workers_starting >=
                    workers->max_workers) {
                        goto fallback;
                } else if (workers->atomic_worker_start) {
                        workers->atomic_start_pending = 1;
                        goto fallback;
                } else {
                        workers->num_workers_starting++;
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
                        __btrfs_start_workers(workers, 1);
                        goto again;
                }
        }
        goto found;

fallback:
        fallback = NULL;
        /*
         * we have failed to find any workers, just
         * return the first one we can find.
         */
        if (!list_empty(&workers->worker_list))
                fallback = workers->worker_list.next;
        if (!list_empty(&workers->idle_list))
                fallback = workers->idle_list.next;
        BUG_ON(!fallback);
        worker = list_entry(fallback,
                  struct btrfs_worker_thread, worker_list);
found:
        /*
         * this makes sure the worker doesn't exit before it is placed
         * onto a busy/idle list
         */
        atomic_inc(&worker->num_pending);
        spin_unlock_irqrestore(&workers->lock, flags);
        return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker = work->worker;
        unsigned long flags;
        int wake = 0;

        if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;

        spin_lock_irqsave(&worker->lock, flags);
        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
                list_add_tail(&work->list, &worker->prio_pending);
        else
                list_add_tail(&work->list, &worker->pending);
        atomic_inc(&worker->num_pending);

        /* by definition we're busy, take ourselves off the idle
         * list
         */
        if (worker->idle) {
                spin_lock(&worker->workers->lock);
                worker->idle = 0;
                list_move_tail(&worker->worker_list,
                               &worker->workers->worker_list);
                spin_unlock(&worker->workers->lock);
        }
        if (!worker->working) {
                wake = 1;
                worker->working = 1;
        }

        if (wake)
                wake_up_process(worker->task);
        spin_unlock_irqrestore(&worker->lock, flags);
out:
        return 0;
}

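/* flag a work item so it is queued onto the high priority lists */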
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
        set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;
        int wake = 0;

        /* don't requeue something already on a list */
        if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;

        worker = find_worker(workers);
        if (workers->ordered) {
                /*
                 * you're not allowed to do ordered queues from an
                 * interrupt handler
                 */
                spin_lock(&workers->order_lock);
                if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
                        list_add_tail(&work->order_list,
                                      &workers->prio_order_list);
                } else {
                        list_add_tail(&work->order_list, &workers->order_list);
                }
                spin_unlock(&workers->order_lock);
        } else {
                INIT_LIST_HEAD(&work->order_list);
        }

        spin_lock_irqsave(&worker->lock, flags);

        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
                list_add_tail(&work->list, &worker->prio_pending);
        else
                list_add_tail(&work->list, &worker->pending);
        check_busy_worker(worker);

        /*
         * avoid calling into wake_up_process if this thread has already
         * been kicked
         */
        if (!worker->working)
                wake = 1;
        worker->working = 1;

        if (wake)
                wake_up_process(worker->task);
        spin_unlock_irqrestore(&worker->lock, flags);

out:
        return 0;
}