linux/fs/fscache/operation.c
<<
>>
Prefs
   1/* FS-Cache worker operation management routines
   2 *
   3 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 *
  11 * See Documentation/filesystems/caching/operations.txt
  12 */
  13
  14#define FSCACHE_DEBUG_LEVEL OPERATION
  15#include <linux/module.h>
  16#include <linux/seq_file.h>
  17#include <linux/slab.h>
  18#include "internal.h"
  19
  20atomic_t fscache_op_debug_id;
  21EXPORT_SYMBOL(fscache_op_debug_id);
  22
  23static void fscache_operation_dummy_cancel(struct fscache_operation *op)
  24{
  25}
  26
  27/**
  28 * fscache_operation_init - Do basic initialisation of an operation
  29 * @op: The operation to initialise
  30 * @release: The release function to assign
  31 *
  32 * Do basic initialisation of an operation.  The caller must still set flags,
  33 * object and processor if needed.
  34 */
  35void fscache_operation_init(struct fscache_operation *op,
  36                            fscache_operation_processor_t processor,
  37                            fscache_operation_cancel_t cancel,
  38                            fscache_operation_release_t release)
  39{
  40        INIT_WORK(&op->work, fscache_op_work_func);
  41        atomic_set(&op->usage, 1);
  42        op->state = FSCACHE_OP_ST_INITIALISED;
  43        op->debug_id = atomic_inc_return(&fscache_op_debug_id);
  44        op->processor = processor;
  45        op->cancel = cancel ?: fscache_operation_dummy_cancel;
  46        op->release = release;
  47        INIT_LIST_HEAD(&op->pend_link);
  48        fscache_stat(&fscache_n_op_initialised);
  49}
  50EXPORT_SYMBOL(fscache_operation_init);
  51
  52/**
  53 * fscache_enqueue_operation - Enqueue an operation for processing
  54 * @op: The operation to enqueue
  55 *
  56 * Enqueue an operation for processing by the FS-Cache thread pool.
  57 *
  58 * This will get its own ref on the object.
  59 */
  60void fscache_enqueue_operation(struct fscache_operation *op)
  61{
  62        _enter("{OBJ%x OP%x,%u}",
  63               op->object->debug_id, op->debug_id, atomic_read(&op->usage));
  64
  65        ASSERT(list_empty(&op->pend_link));
  66        ASSERT(op->processor != NULL);
  67        ASSERT(fscache_object_is_available(op->object));
  68        ASSERTCMP(atomic_read(&op->usage), >, 0);
  69        ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);
  70
  71        fscache_stat(&fscache_n_op_enqueue);
  72        switch (op->flags & FSCACHE_OP_TYPE) {
  73        case FSCACHE_OP_ASYNC:
  74                _debug("queue async");
  75                atomic_inc(&op->usage);
  76                if (!queue_work(fscache_op_wq, &op->work))
  77                        fscache_put_operation(op);
  78                break;
  79        case FSCACHE_OP_MYTHREAD:
  80                _debug("queue for caller's attention");
  81                break;
  82        default:
  83                pr_err("Unexpected op type %lx", op->flags);
  84                BUG();
  85                break;
  86        }
  87}
  88EXPORT_SYMBOL(fscache_enqueue_operation);
  89
  90/*
  91 * start an op running
  92 */
  93static void fscache_run_op(struct fscache_object *object,
  94                           struct fscache_operation *op)
  95{
  96        ASSERTCMP(op->state, ==, FSCACHE_OP_ST_PENDING);
  97
  98        op->state = FSCACHE_OP_ST_IN_PROGRESS;
  99        object->n_in_progress++;
 100        if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
 101                wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
 102        if (op->processor)
 103                fscache_enqueue_operation(op);
 104        fscache_stat(&fscache_n_op_run);
 105}
 106
 107/*
 108 * report an unexpected submission
 109 */
 110static void fscache_report_unexpected_submission(struct fscache_object *object,
 111                                                 struct fscache_operation *op,
 112                                                 const struct fscache_state *ostate)
 113{
 114        static bool once_only;
 115        struct fscache_operation *p;
 116        unsigned n;
 117
 118        if (once_only)
 119                return;
 120        once_only = true;
 121
 122        kdebug("unexpected submission OP%x [OBJ%x %s]",
 123               op->debug_id, object->debug_id, object->state->name);
 124        kdebug("objstate=%s [%s]", object->state->name, ostate->name);
 125        kdebug("objflags=%lx", object->flags);
 126        kdebug("objevent=%lx [%lx]", object->events, object->event_mask);
 127        kdebug("ops=%u inp=%u exc=%u",
 128               object->n_ops, object->n_in_progress, object->n_exclusive);
 129
 130        if (!list_empty(&object->pending_ops)) {
 131                n = 0;
 132                list_for_each_entry(p, &object->pending_ops, pend_link) {
 133                        ASSERTCMP(p->object, ==, object);
 134                        kdebug("%p %p", op->processor, op->release);
 135                        n++;
 136                }
 137
 138                kdebug("n=%u", n);
 139        }
 140
 141        dump_stack();
 142}
 143
 144/*
 145 * submit an exclusive operation for an object
 146 * - other ops are excluded from running simultaneously with this one
 147 * - this gets any extra refs it needs on an op
 148 */
 149int fscache_submit_exclusive_op(struct fscache_object *object,
 150                                struct fscache_operation *op)
 151{
 152        const struct fscache_state *ostate;
 153        unsigned long flags;
 154        int ret;
 155
 156        _enter("{OBJ%x OP%x},", object->debug_id, op->debug_id);
 157
 158        ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
 159        ASSERTCMP(atomic_read(&op->usage), >, 0);
 160
 161        spin_lock(&object->lock);
 162        ASSERTCMP(object->n_ops, >=, object->n_in_progress);
 163        ASSERTCMP(object->n_ops, >=, object->n_exclusive);
 164        ASSERT(list_empty(&op->pend_link));
 165
 166        ostate = object->state;
 167        smp_rmb();
 168
 169        op->state = FSCACHE_OP_ST_PENDING;
 170        flags = READ_ONCE(object->flags);
 171        if (unlikely(!(flags & BIT(FSCACHE_OBJECT_IS_LIVE)))) {
 172                fscache_stat(&fscache_n_op_rejected);
 173                op->cancel(op);
 174                op->state = FSCACHE_OP_ST_CANCELLED;
 175                ret = -ENOBUFS;
 176        } else if (unlikely(fscache_cache_is_broken(object))) {
 177                op->cancel(op);
 178                op->state = FSCACHE_OP_ST_CANCELLED;
 179                ret = -EIO;
 180        } else if (flags & BIT(FSCACHE_OBJECT_IS_AVAILABLE)) {
 181                op->object = object;
 182                object->n_ops++;
 183                object->n_exclusive++;  /* reads and writes must wait */
 184
 185                if (object->n_in_progress > 0) {
 186                        atomic_inc(&op->usage);
 187                        list_add_tail(&op->pend_link, &object->pending_ops);
 188                        fscache_stat(&fscache_n_op_pend);
 189                } else if (!list_empty(&object->pending_ops)) {
 190                        atomic_inc(&op->usage);
 191                        list_add_tail(&op->pend_link, &object->pending_ops);
 192                        fscache_stat(&fscache_n_op_pend);
 193                        fscache_start_operations(object);
 194                } else {
 195                        ASSERTCMP(object->n_in_progress, ==, 0);
 196                        fscache_run_op(object, op);
 197                }
 198
 199                /* need to issue a new write op after this */
 200                clear_bit(FSCACHE_OBJECT_PENDING_WRITE, &object->flags);
 201                ret = 0;
 202        } else if (flags & BIT(FSCACHE_OBJECT_IS_LOOKED_UP)) {
 203                op->object = object;
 204                object->n_ops++;
 205                object->n_exclusive++;  /* reads and writes must wait */
 206                atomic_inc(&op->usage);
 207                list_add_tail(&op->pend_link, &object->pending_ops);
 208                fscache_stat(&fscache_n_op_pend);
 209                ret = 0;
 210        } else if (flags & BIT(FSCACHE_OBJECT_KILLED_BY_CACHE)) {
 211                op->cancel(op);
 212                op->state = FSCACHE_OP_ST_CANCELLED;
 213                ret = -ENOBUFS;
 214        } else {
 215                fscache_report_unexpected_submission(object, op, ostate);
 216                op->cancel(op);
 217                op->state = FSCACHE_OP_ST_CANCELLED;
 218                ret = -ENOBUFS;
 219        }
 220
 221        spin_unlock(&object->lock);
 222        return ret;
 223}
 224
 225/*
 226 * submit an operation for an object
 227 * - objects may be submitted only in the following states:
 228 *   - during object creation (write ops may be submitted)
 229 *   - whilst the object is active
 230 *   - after an I/O error incurred in one of the two above states (op rejected)
 231 * - this gets any extra refs it needs on an op
 232 */
 233int fscache_submit_op(struct fscache_object *object,
 234                      struct fscache_operation *op)
 235{
 236        const struct fscache_state *ostate;
 237        unsigned long flags;
 238        int ret;
 239
 240        _enter("{OBJ%x OP%x},{%u}",
 241               object->debug_id, op->debug_id, atomic_read(&op->usage));
 242
 243        ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
 244        ASSERTCMP(atomic_read(&op->usage), >, 0);
 245
 246        spin_lock(&object->lock);
 247        ASSERTCMP(object->n_ops, >=, object->n_in_progress);
 248        ASSERTCMP(object->n_ops, >=, object->n_exclusive);
 249        ASSERT(list_empty(&op->pend_link));
 250
 251        ostate = object->state;
 252        smp_rmb();
 253
 254        op->state = FSCACHE_OP_ST_PENDING;
 255        flags = READ_ONCE(object->flags);
 256        if (unlikely(!(flags & BIT(FSCACHE_OBJECT_IS_LIVE)))) {
 257                fscache_stat(&fscache_n_op_rejected);
 258                op->cancel(op);
 259                op->state = FSCACHE_OP_ST_CANCELLED;
 260                ret = -ENOBUFS;
 261        } else if (unlikely(fscache_cache_is_broken(object))) {
 262                op->cancel(op);
 263                op->state = FSCACHE_OP_ST_CANCELLED;
 264                ret = -EIO;
 265        } else if (flags & BIT(FSCACHE_OBJECT_IS_AVAILABLE)) {
 266                op->object = object;
 267                object->n_ops++;
 268
 269                if (object->n_exclusive > 0) {
 270                        atomic_inc(&op->usage);
 271                        list_add_tail(&op->pend_link, &object->pending_ops);
 272                        fscache_stat(&fscache_n_op_pend);
 273                } else if (!list_empty(&object->pending_ops)) {
 274                        atomic_inc(&op->usage);
 275                        list_add_tail(&op->pend_link, &object->pending_ops);
 276                        fscache_stat(&fscache_n_op_pend);
 277                        fscache_start_operations(object);
 278                } else {
 279                        ASSERTCMP(object->n_exclusive, ==, 0);
 280                        fscache_run_op(object, op);
 281                }
 282                ret = 0;
 283        } else if (flags & BIT(FSCACHE_OBJECT_IS_LOOKED_UP)) {
 284                op->object = object;
 285                object->n_ops++;
 286                atomic_inc(&op->usage);
 287                list_add_tail(&op->pend_link, &object->pending_ops);
 288                fscache_stat(&fscache_n_op_pend);
 289                ret = 0;
 290        } else if (flags & BIT(FSCACHE_OBJECT_KILLED_BY_CACHE)) {
 291                op->cancel(op);
 292                op->state = FSCACHE_OP_ST_CANCELLED;
 293                ret = -ENOBUFS;
 294        } else {
 295                fscache_report_unexpected_submission(object, op, ostate);
 296                ASSERT(!fscache_object_is_active(object));
 297                op->cancel(op);
 298                op->state = FSCACHE_OP_ST_CANCELLED;
 299                ret = -ENOBUFS;
 300        }
 301
 302        spin_unlock(&object->lock);
 303        return ret;
 304}
 305
 306/*
 307 * queue an object for withdrawal on error, aborting all following asynchronous
 308 * operations
 309 */
 310void fscache_abort_object(struct fscache_object *object)
 311{
 312        _enter("{OBJ%x}", object->debug_id);
 313
 314        fscache_raise_event(object, FSCACHE_OBJECT_EV_ERROR);
 315}
 316
 317/*
 318 * Jump start the operation processing on an object.  The caller must hold
 319 * object->lock.
 320 */
 321void fscache_start_operations(struct fscache_object *object)
 322{
 323        struct fscache_operation *op;
 324        bool stop = false;
 325
 326        while (!list_empty(&object->pending_ops) && !stop) {
 327                op = list_entry(object->pending_ops.next,
 328                                struct fscache_operation, pend_link);
 329
 330                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
 331                        if (object->n_in_progress > 0)
 332                                break;
 333                        stop = true;
 334                }
 335                list_del_init(&op->pend_link);
 336                fscache_run_op(object, op);
 337
 338                /* the pending queue was holding a ref on the object */
 339                fscache_put_operation(op);
 340        }
 341
 342        ASSERTCMP(object->n_in_progress, <=, object->n_ops);
 343
 344        _debug("woke %d ops on OBJ%x",
 345               object->n_in_progress, object->debug_id);
 346}
 347
 348/*
 349 * cancel an operation that's pending on an object
 350 */
 351int fscache_cancel_op(struct fscache_operation *op,
 352                      bool cancel_in_progress_op)
 353{
 354        struct fscache_object *object = op->object;
 355        bool put = false;
 356        int ret;
 357
 358        _enter("OBJ%x OP%x}", op->object->debug_id, op->debug_id);
 359
 360        ASSERTCMP(op->state, >=, FSCACHE_OP_ST_PENDING);
 361        ASSERTCMP(op->state, !=, FSCACHE_OP_ST_CANCELLED);
 362        ASSERTCMP(atomic_read(&op->usage), >, 0);
 363
 364        spin_lock(&object->lock);
 365
 366        ret = -EBUSY;
 367        if (op->state == FSCACHE_OP_ST_PENDING) {
 368                ASSERT(!list_empty(&op->pend_link));
 369                list_del_init(&op->pend_link);
 370                put = true;
 371
 372                fscache_stat(&fscache_n_op_cancelled);
 373                op->cancel(op);
 374                op->state = FSCACHE_OP_ST_CANCELLED;
 375                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
 376                        object->n_exclusive--;
 377                if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
 378                        wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
 379                ret = 0;
 380        } else if (op->state == FSCACHE_OP_ST_IN_PROGRESS && cancel_in_progress_op) {
 381                ASSERTCMP(object->n_in_progress, >, 0);
 382                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
 383                        object->n_exclusive--;
 384                object->n_in_progress--;
 385                if (object->n_in_progress == 0)
 386                        fscache_start_operations(object);
 387
 388                fscache_stat(&fscache_n_op_cancelled);
 389                op->cancel(op);
 390                op->state = FSCACHE_OP_ST_CANCELLED;
 391                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
 392                        object->n_exclusive--;
 393                if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
 394                        wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
 395                ret = 0;
 396        }
 397
 398        if (put)
 399                fscache_put_operation(op);
 400        spin_unlock(&object->lock);
 401        _leave(" = %d", ret);
 402        return ret;
 403}
 404
 405/*
 406 * Cancel all pending operations on an object
 407 */
 408void fscache_cancel_all_ops(struct fscache_object *object)
 409{
 410        struct fscache_operation *op;
 411
 412        _enter("OBJ%x", object->debug_id);
 413
 414        spin_lock(&object->lock);
 415
 416        while (!list_empty(&object->pending_ops)) {
 417                op = list_entry(object->pending_ops.next,
 418                                struct fscache_operation, pend_link);
 419                fscache_stat(&fscache_n_op_cancelled);
 420                list_del_init(&op->pend_link);
 421
 422                ASSERTCMP(op->state, ==, FSCACHE_OP_ST_PENDING);
 423                op->cancel(op);
 424                op->state = FSCACHE_OP_ST_CANCELLED;
 425
 426                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
 427                        object->n_exclusive--;
 428                if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
 429                        wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
 430                fscache_put_operation(op);
 431                cond_resched_lock(&object->lock);
 432        }
 433
 434        spin_unlock(&object->lock);
 435        _leave("");
 436}
 437
 438/*
 439 * Record the completion or cancellation of an in-progress operation.
 440 */
 441void fscache_op_complete(struct fscache_operation *op, bool cancelled)
 442{
 443        struct fscache_object *object = op->object;
 444
 445        _enter("OBJ%x", object->debug_id);
 446
 447        ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);
 448        ASSERTCMP(object->n_in_progress, >, 0);
 449        ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
 450                    object->n_exclusive, >, 0);
 451        ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
 452                    object->n_in_progress, ==, 1);
 453
 454        spin_lock(&object->lock);
 455
 456        if (!cancelled) {
 457                op->state = FSCACHE_OP_ST_COMPLETE;
 458        } else {
 459                op->cancel(op);
 460                op->state = FSCACHE_OP_ST_CANCELLED;
 461        }
 462
 463        if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
 464                object->n_exclusive--;
 465        object->n_in_progress--;
 466        if (object->n_in_progress == 0)
 467                fscache_start_operations(object);
 468
 469        spin_unlock(&object->lock);
 470        _leave("");
 471}
 472EXPORT_SYMBOL(fscache_op_complete);
 473
 474/*
 475 * release an operation
 476 * - queues pending ops if this is the last in-progress op
 477 */
 478void fscache_put_operation(struct fscache_operation *op)
 479{
 480        struct fscache_object *object;
 481        struct fscache_cache *cache;
 482
 483        _enter("{OBJ%x OP%x,%d}",
 484               op->object->debug_id, op->debug_id, atomic_read(&op->usage));
 485
 486        ASSERTCMP(atomic_read(&op->usage), >, 0);
 487
 488        if (!atomic_dec_and_test(&op->usage))
 489                return;
 490
 491        _debug("PUT OP");
 492        ASSERTIFCMP(op->state != FSCACHE_OP_ST_INITIALISED &&
 493                    op->state != FSCACHE_OP_ST_COMPLETE,
 494                    op->state, ==, FSCACHE_OP_ST_CANCELLED);
 495
 496        fscache_stat(&fscache_n_op_release);
 497
 498        if (op->release) {
 499                op->release(op);
 500                op->release = NULL;
 501        }
 502        op->state = FSCACHE_OP_ST_DEAD;
 503
 504        object = op->object;
 505        if (likely(object)) {
 506                if (test_bit(FSCACHE_OP_DEC_READ_CNT, &op->flags))
 507                        atomic_dec(&object->n_reads);
 508                if (test_bit(FSCACHE_OP_UNUSE_COOKIE, &op->flags))
 509                        fscache_unuse_cookie(object);
 510
 511                /* now... we may get called with the object spinlock held, so we
 512                 * complete the cleanup here only if we can immediately acquire the
 513                 * lock, and defer it otherwise */
 514                if (!spin_trylock(&object->lock)) {
 515                        _debug("defer put");
 516                        fscache_stat(&fscache_n_op_deferred_release);
 517
 518                        cache = object->cache;
 519                        spin_lock(&cache->op_gc_list_lock);
 520                        list_add_tail(&op->pend_link, &cache->op_gc_list);
 521                        spin_unlock(&cache->op_gc_list_lock);
 522                        schedule_work(&cache->op_gc);
 523                        _leave(" [defer]");
 524                        return;
 525                }
 526
 527                ASSERTCMP(object->n_ops, >, 0);
 528                object->n_ops--;
 529                if (object->n_ops == 0)
 530                        fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);
 531
 532                spin_unlock(&object->lock);
 533        }
 534
 535        kfree(op);
 536        _leave(" [done]");
 537}
 538EXPORT_SYMBOL(fscache_put_operation);
 539
 540/*
 541 * garbage collect operations that have had their release deferred
 542 */
 543void fscache_operation_gc(struct work_struct *work)
 544{
 545        struct fscache_operation *op;
 546        struct fscache_object *object;
 547        struct fscache_cache *cache =
 548                container_of(work, struct fscache_cache, op_gc);
 549        int count = 0;
 550
 551        _enter("");
 552
 553        do {
 554                spin_lock(&cache->op_gc_list_lock);
 555                if (list_empty(&cache->op_gc_list)) {
 556                        spin_unlock(&cache->op_gc_list_lock);
 557                        break;
 558                }
 559
 560                op = list_entry(cache->op_gc_list.next,
 561                                struct fscache_operation, pend_link);
 562                list_del(&op->pend_link);
 563                spin_unlock(&cache->op_gc_list_lock);
 564
 565                object = op->object;
 566                spin_lock(&object->lock);
 567
 568                _debug("GC DEFERRED REL OBJ%x OP%x",
 569                       object->debug_id, op->debug_id);
 570                fscache_stat(&fscache_n_op_gc);
 571
 572                ASSERTCMP(atomic_read(&op->usage), ==, 0);
 573                ASSERTCMP(op->state, ==, FSCACHE_OP_ST_DEAD);
 574
 575                ASSERTCMP(object->n_ops, >, 0);
 576                object->n_ops--;
 577                if (object->n_ops == 0)
 578                        fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);
 579
 580                spin_unlock(&object->lock);
 581                kfree(op);
 582
 583        } while (count++ < 20);
 584
 585        if (!list_empty(&cache->op_gc_list))
 586                schedule_work(&cache->op_gc);
 587
 588        _leave("");
 589}
 590
 591/*
 592 * execute an operation using fs_op_wq to provide processing context -
 593 * the caller holds a ref to this object, so we don't need to hold one
 594 */
 595void fscache_op_work_func(struct work_struct *work)
 596{
 597        struct fscache_operation *op =
 598                container_of(work, struct fscache_operation, work);
 599        unsigned long start;
 600
 601        _enter("{OBJ%x OP%x,%d}",
 602               op->object->debug_id, op->debug_id, atomic_read(&op->usage));
 603
 604        ASSERT(op->processor != NULL);
 605        start = jiffies;
 606        op->processor(op);
 607        fscache_hist(fscache_ops_histogram, start);
 608        fscache_put_operation(op);
 609
 610        _leave("");
 611}
 612