linux/drivers/md/persistent-data/dm-block-manager.c
/*
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This file is released under the GPL.
 */
#include "dm-block-manager.h"
#include "dm-persistent-data-internal.h"
#include "../dm-bufio.h"

#include <linux/crc32c.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/rwsem.h>
#include <linux/device-mapper.h>
#include <linux/stacktrace.h>

#define DM_MSG_PREFIX "block manager"

/*----------------------------------------------------------------*/

/*
 * This is a read/write semaphore with a couple of differences.
 *
 * i) There is a restriction on the number of concurrent read locks that
 * may be held at once.  This is just an implementation detail.
 *
 * ii) Recursive locking attempts are detected and return -EINVAL.  If
 * CONFIG_DM_DEBUG_BLOCK_STACK_TRACING is enabled, a stack trace is also
 * emitted for the previous lock acquisition.
 *
 * iii) Priority is given to write locks.
 *
 * (An illustrative usage sketch follows report_recursive_bug() below.)
 */
#define MAX_HOLDERS 4
#define MAX_STACK 10

typedef unsigned long stack_entries[MAX_STACK];

struct block_lock {
        spinlock_t lock;
        __s32 count;
        struct list_head waiters;
        struct task_struct *holders[MAX_HOLDERS];

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        struct stack_trace traces[MAX_HOLDERS];
        stack_entries entries[MAX_HOLDERS];
#endif
};

struct waiter {
        struct list_head list;
        struct task_struct *task;
        int wants_write;
};

static unsigned __find_holder(struct block_lock *lock,
                              struct task_struct *task)
{
        unsigned i;

        for (i = 0; i < MAX_HOLDERS; i++)
                if (lock->holders[i] == task)
                        break;

        BUG_ON(i == MAX_HOLDERS);
        return i;
}

/* call this *after* you increment lock->count */
static void __add_holder(struct block_lock *lock, struct task_struct *task)
{
        unsigned h = __find_holder(lock, NULL);
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        struct stack_trace *t;
#endif

        get_task_struct(task);
        lock->holders[h] = task;

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        t = lock->traces + h;
        t->nr_entries = 0;
        t->max_entries = MAX_STACK;
        t->entries = lock->entries[h];
        t->skip = 2;
        save_stack_trace(t);
#endif
}

/* call this *before* you decrement lock->count */
static void __del_holder(struct block_lock *lock, struct task_struct *task)
{
        unsigned h = __find_holder(lock, task);
        lock->holders[h] = NULL;
        put_task_struct(task);
}

static int __check_holder(struct block_lock *lock)
{
        unsigned i;
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        static struct stack_trace t;
        static stack_entries entries;
#endif

        for (i = 0; i < MAX_HOLDERS; i++) {
                if (lock->holders[i] == current) {
                        DMERR("recursive lock detected in metadata");
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
                        DMERR("previously held here:");
                        print_stack_trace(lock->traces + i, 4);

                        DMERR("subsequent acquisition attempted here:");
                        t.nr_entries = 0;
                        t.max_entries = MAX_STACK;
                        t.entries = entries;
                        t.skip = 3;
                        save_stack_trace(&t);
                        print_stack_trace(&t, 4);
#endif
                        return -EINVAL;
                }
        }

        return 0;
}

/*
 * Sleep until __wake_waiter() hands us the lock and clears w->task.
 */
static void __wait(struct waiter *w)
{
        for (;;) {
                set_task_state(current, TASK_UNINTERRUPTIBLE);

                if (!w->task)
                        break;

                schedule();
        }

        set_task_state(current, TASK_RUNNING);
}

static void __wake_waiter(struct waiter *w)
{
        struct task_struct *task;

        list_del(&w->list);
        task = w->task;
        /*
         * Make sure we're finished with *w before clearing w->task;
         * once the waiter sees NULL it may return from __wait() and
         * reuse the stack its struct waiter lives on.
         */
        smp_mb();
        w->task = NULL;
        wake_up_process(task);
}

/*
 * We either wake a few readers or a single writer.
 */
static void __wake_many(struct block_lock *lock)
{
        struct waiter *w, *tmp;

        BUG_ON(lock->count < 0);
        list_for_each_entry_safe(w, tmp, &lock->waiters, list) {
                if (lock->count >= MAX_HOLDERS)
                        return;

                if (w->wants_write) {
                        if (lock->count > 0)
                                return; /* still read locked */

                        lock->count = -1;
                        __add_holder(lock, w->task);
                        __wake_waiter(w);
                        return;
                }

                lock->count++;
                __add_holder(lock, w->task);
                __wake_waiter(w);
        }
}

static void bl_init(struct block_lock *lock)
{
        int i;

        spin_lock_init(&lock->lock);
        lock->count = 0;
        INIT_LIST_HEAD(&lock->waiters);
        for (i = 0; i < MAX_HOLDERS; i++)
                lock->holders[i] = NULL;
}

static int __available_for_read(struct block_lock *lock)
{
        return lock->count >= 0 &&
                lock->count < MAX_HOLDERS &&
                list_empty(&lock->waiters);
}

static int bl_down_read(struct block_lock *lock)
{
        int r;
        struct waiter w;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r) {
                spin_unlock(&lock->lock);
                return r;
        }

        if (__available_for_read(lock)) {
                lock->count++;
                __add_holder(lock, current);
                spin_unlock(&lock->lock);
                return 0;
        }

        get_task_struct(current);

        w.task = current;
        w.wants_write = 0;
        list_add_tail(&w.list, &lock->waiters);
        spin_unlock(&lock->lock);

        __wait(&w);
        put_task_struct(current);
        return 0;
}

static int bl_down_read_nonblock(struct block_lock *lock)
{
        int r;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r)
                goto out;

        if (__available_for_read(lock)) {
                lock->count++;
                __add_holder(lock, current);
                r = 0;
        } else
                r = -EWOULDBLOCK;

out:
        spin_unlock(&lock->lock);
        return r;
}

static void bl_up_read(struct block_lock *lock)
{
        spin_lock(&lock->lock);
        BUG_ON(lock->count <= 0);
        __del_holder(lock, current);
        --lock->count;
        if (!list_empty(&lock->waiters))
                __wake_many(lock);
        spin_unlock(&lock->lock);
}

static int bl_down_write(struct block_lock *lock)
{
        int r;
        struct waiter w;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r) {
                spin_unlock(&lock->lock);
                return r;
        }

        if (lock->count == 0 && list_empty(&lock->waiters)) {
                lock->count = -1;
                __add_holder(lock, current);
                spin_unlock(&lock->lock);
                return 0;
        }

        get_task_struct(current);
        w.task = current;
        w.wants_write = 1;
        /*
         * Writers are given priority.  We know there's only one mutator
         * in the system, so we can ignore the ordering reversal caused
         * by adding ourselves to the head of the wait list.
         */
        list_add(&w.list, &lock->waiters);
        spin_unlock(&lock->lock);

        __wait(&w);
        put_task_struct(current);

        return 0;
}

static void bl_up_write(struct block_lock *lock)
{
        spin_lock(&lock->lock);
        __del_holder(lock, current);
        lock->count = 0;
        if (!list_empty(&lock->waiters))
                __wake_many(lock);
        spin_unlock(&lock->lock);
}

static void report_recursive_bug(dm_block_t b, int r)
{
        if (r == -EINVAL)
                DMERR("recursive acquisition of block %llu requested.",
                      (unsigned long long) b);
}

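/*
 * Illustrative sketch (not built): how the block_lock primitives above
 * are intended to be used.  Readers pair bl_down_read()/bl_up_read(),
 * writers pair bl_down_write()/bl_up_write(), and a second acquisition
 * by the same task fails with -EINVAL (reported via
 * report_recursive_bug()) rather than deadlocking.  The function name
 * below is hypothetical and exists only for this sketch.
 */
#if 0
static int example_lock_discipline(struct block_lock *lock, dm_block_t b)
{
        int r;

        r = bl_down_read(lock);         /* shared; up to MAX_HOLDERS readers */
        if (r) {
                report_recursive_bug(b, r);
                return r;
        }

        /* ... inspect the block's data here ... */

        bl_up_read(lock);               /* may wake a waiting writer */

        r = bl_down_write(lock);        /* exclusive; writers get priority */
        if (r) {
                report_recursive_bug(b, r);
                return r;
        }

        /* ... modify the block's data here ... */

        bl_up_write(lock);
        return 0;
}
#endif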
/*----------------------------------------------------------------*/

/*
 * The block manager is currently implemented using dm-bufio.  struct
 * dm_block_manager and struct dm_block map directly onto a couple of
 * structs in the bufio interface.  I want to retain the freedom to move
 * away from bufio in the future, so these casts are confined to this .c
 * file rather than being exposed through the public interface.
 */
static struct dm_buffer *to_buffer(struct dm_block *b)
{
        return (struct dm_buffer *) b;
}

dm_block_t dm_block_location(struct dm_block *b)
{
        return dm_bufio_get_block_number(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_location);

void *dm_block_data(struct dm_block *b)
{
        return dm_bufio_get_block_data(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_data);

struct buffer_aux {
        struct dm_block_validator *validator;
        struct block_lock lock;
        int write_locked;
};

static void dm_block_manager_alloc_callback(struct dm_buffer *buf)
{
        struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
        aux->validator = NULL;
        bl_init(&aux->lock);
}

static void dm_block_manager_write_callback(struct dm_buffer *buf)
{
        struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
        if (aux->validator) {
                aux->validator->prepare_for_write(aux->validator, (struct dm_block *) buf,
                         dm_bufio_get_block_size(dm_bufio_get_client(buf)));
        }
}

/*----------------------------------------------------------------
 * Public interface
 *--------------------------------------------------------------*/
struct dm_block_manager {
        struct dm_bufio_client *bufio;
        bool read_only:1;
};

struct dm_block_manager *dm_block_manager_create(struct block_device *bdev,
                                                 unsigned block_size,
                                                 unsigned cache_size,
                                                 unsigned max_held_per_thread)
{
        int r;
        struct dm_block_manager *bm;

        bm = kmalloc(sizeof(*bm), GFP_KERNEL);
        if (!bm) {
                r = -ENOMEM;
                goto bad;
        }

        bm->bufio = dm_bufio_client_create(bdev, block_size, max_held_per_thread,
                                           sizeof(struct buffer_aux),
                                           dm_block_manager_alloc_callback,
                                           dm_block_manager_write_callback);
        if (IS_ERR(bm->bufio)) {
                r = PTR_ERR(bm->bufio);
                kfree(bm);
                goto bad;
        }

        bm->read_only = false;

        return bm;

bad:
        return ERR_PTR(r);
}
EXPORT_SYMBOL_GPL(dm_block_manager_create);

void dm_block_manager_destroy(struct dm_block_manager *bm)
{
        dm_bufio_client_destroy(bm->bufio);
        kfree(bm);
}
EXPORT_SYMBOL_GPL(dm_block_manager_destroy);

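/*
 * Illustrative sketch (not built): typical creation and teardown of a
 * block manager, as a metadata client such as dm-thin might do it.  The
 * block size, cache size, lock depth and the metadata_dev argument are
 * hypothetical values chosen for this sketch only.
 */
#if 0
static int example_bm_lifecycle(struct block_device *metadata_dev)
{
        struct dm_block_manager *bm;

        /* 4096-byte blocks; cache size and per-thread lock depth are made up. */
        bm = dm_block_manager_create(metadata_dev, 4096, 1024, 5);
        if (IS_ERR(bm))
                return PTR_ERR(bm);

        DMINFO("managing %llu blocks of %u bytes",
               (unsigned long long) dm_bm_nr_blocks(bm),
               dm_bm_block_size(bm));

        dm_block_manager_destroy(bm);
        return 0;
}
#endif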
unsigned dm_bm_block_size(struct dm_block_manager *bm)
{
        return dm_bufio_get_block_size(bm->bufio);
}
EXPORT_SYMBOL_GPL(dm_bm_block_size);

dm_block_t dm_bm_nr_blocks(struct dm_block_manager *bm)
{
        return dm_bufio_get_device_size(bm->bufio);
}

static int dm_bm_validate_buffer(struct dm_block_manager *bm,
                                 struct dm_buffer *buf,
                                 struct buffer_aux *aux,
                                 struct dm_block_validator *v)
{
        if (unlikely(!aux->validator)) {
                int r;
                if (!v)
                        return 0;
                r = v->check(v, (struct dm_block *) buf, dm_bufio_get_block_size(bm->bufio));
                if (unlikely(r)) {
                        DMERR_LIMIT("%s validator check failed for block %llu", v->name,
                                    (unsigned long long) dm_bufio_get_block_number(buf));
                        return r;
                }
                aux->validator = v;
        } else {
                if (unlikely(aux->validator != v)) {
                        DMERR_LIMIT("validator mismatch (old=%s vs new=%s) for block %llu",
                                    aux->validator->name, v ? v->name : "NULL",
                                    (unsigned long long) dm_bufio_get_block_number(buf));
                        return -EINVAL;
                }
        }

        return 0;
}

int dm_bm_read_lock(struct dm_block_manager *bm, dm_block_t b,
                    struct dm_block_validator *v,
                    struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        p = dm_bufio_read(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_read(&aux->lock);
        if (unlikely(r)) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }

        aux->write_locked = 0;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_read(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_read_lock);

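/*
 * Illustrative sketch (not built): inspecting a block under a read lock.
 * example_inspect_block() is a hypothetical caller; the validator "v" is
 * whatever validator the caller uses for this block format.
 */
#if 0
static int example_inspect_block(struct dm_block_manager *bm, dm_block_t b,
                                 struct dm_block_validator *v)
{
        struct dm_block *blk;
        __le64 first;
        int r;

        r = dm_bm_read_lock(bm, b, v, &blk);
        if (r)
                return r;       /* e.g. -EINVAL on a recursive acquisition */

        /* Shared, read-only access to dm_bm_block_size(bm) bytes. */
        memcpy(&first, dm_block_data(blk), sizeof(first));
        DMINFO("block %llu begins with %llx",
               (unsigned long long) dm_block_location(blk),
               (unsigned long long) le64_to_cpu(first));

        return dm_bm_unlock(blk);
}
#endif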
int dm_bm_write_lock(struct dm_block_manager *bm,
                     dm_block_t b, struct dm_block_validator *v,
                     struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        if (bm->read_only)
                return -EPERM;

        p = dm_bufio_read(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_write(&aux->lock);
        if (r) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }

        aux->write_locked = 1;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_write(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock);

int dm_bm_read_try_lock(struct dm_block_manager *bm,
                        dm_block_t b, struct dm_block_validator *v,
                        struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        p = dm_bufio_get(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);
        if (unlikely(!p))
                return -EWOULDBLOCK;

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_read_nonblock(&aux->lock);
        if (r < 0) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }
        aux->write_locked = 0;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_read(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}

int dm_bm_write_lock_zero(struct dm_block_manager *bm,
                          dm_block_t b, struct dm_block_validator *v,
                          struct dm_block **result)
{
        int r;
        struct buffer_aux *aux;
        void *p;

        if (bm->read_only)
                return -EPERM;

        p = dm_bufio_new(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        memset(p, 0, dm_bm_block_size(bm));

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_write(&aux->lock);
        if (r) {
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        aux->write_locked = 1;
        aux->validator = v;

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock_zero);

int dm_bm_unlock(struct dm_block *b)
{
        struct buffer_aux *aux;
        aux = dm_bufio_get_aux_data(to_buffer(b));

        if (aux->write_locked) {
                dm_bufio_mark_buffer_dirty(to_buffer(b));
                bl_up_write(&aux->lock);
        } else
                bl_up_read(&aux->lock);

        dm_bufio_release(to_buffer(b));

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_unlock);

int dm_bm_flush_and_unlock(struct dm_block_manager *bm,
                           struct dm_block *superblock)
{
        int r;

        if (bm->read_only)
                return -EPERM;

        r = dm_bufio_write_dirty_buffers(bm->bufio);
        if (unlikely(r)) {
                dm_bm_unlock(superblock);
                return r;
        }

        dm_bm_unlock(superblock);

        return dm_bufio_write_dirty_buffers(bm->bufio);
}
EXPORT_SYMBOL_GPL(dm_bm_flush_and_unlock);

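/*
 * Illustrative sketch (not built): a commit sequence.  Changes are staged
 * under write locks, the superblock is write locked last, and
 * dm_bm_flush_and_unlock() persists every dirty buffer both before and
 * after releasing it, so the superblock reaches the disk last.
 * SUPERBLOCK_LOCATION, sb_validator and the function name are
 * hypothetical and exist only for this sketch.
 */
#if 0
static int example_commit(struct dm_block_manager *bm,
                          struct dm_block_validator *sb_validator)
{
        struct dm_block *sblock;
        int r;

        r = dm_bm_write_lock(bm, SUPERBLOCK_LOCATION, sb_validator, &sblock);
        if (r)
                return r;       /* -EPERM if the manager is read-only */

        /* ... update the superblock via dm_block_data(sblock) ... */

        /*
         * Flushes dirty buffers, unlocks (and thereby dirties) the
         * superblock, then flushes again.
         */
        return dm_bm_flush_and_unlock(bm, sblock);
}
#endif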
void dm_bm_prefetch(struct dm_block_manager *bm, dm_block_t b)
{
        dm_bufio_prefetch(bm->bufio, b, 1);
}

void dm_bm_set_read_only(struct dm_block_manager *bm)
{
        bm->read_only = true;
}
EXPORT_SYMBOL_GPL(dm_bm_set_read_only);

void dm_bm_set_read_write(struct dm_block_manager *bm)
{
        bm->read_only = false;
}
EXPORT_SYMBOL_GPL(dm_bm_set_read_write);

u32 dm_bm_checksum(const void *data, size_t len, u32 init_xor)
{
        return crc32c(~(u32) 0, data, len) ^ init_xor;
}
EXPORT_SYMBOL_GPL(dm_bm_checksum);

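/*
 * Illustrative sketch (not built): how a block validator might use
 * dm_bm_checksum().  The on-disk layout (struct example_disk_header),
 * the EXAMPLE_CSUM_XOR salt, the callbacks and their error codes are
 * hypothetical; the general pattern of checksumming everything after the
 * csum field and verifying the recorded block number is the one the
 * persistent-data clients follow.
 */
#if 0
#define EXAMPLE_CSUM_XOR 0xdeadbeef

struct example_disk_header {
        __le32 csum;    /* checksum of everything after this field */
        __le32 flags;
        __le64 blocknr;
} __packed;

static void example_prepare_for_write(struct dm_block_validator *v,
                                      struct dm_block *b, size_t block_size)
{
        struct example_disk_header *h = dm_block_data(b);

        h->blocknr = cpu_to_le64(dm_block_location(b));
        h->csum = cpu_to_le32(dm_bm_checksum(&h->flags,
                                             block_size - sizeof(__le32),
                                             EXAMPLE_CSUM_XOR));
}

static int example_check(struct dm_block_validator *v,
                         struct dm_block *b, size_t block_size)
{
        struct example_disk_header *h = dm_block_data(b);
        __le32 csum = cpu_to_le32(dm_bm_checksum(&h->flags,
                                                 block_size - sizeof(__le32),
                                                 EXAMPLE_CSUM_XOR));

        if (csum != h->csum)
                return -EILSEQ;

        if (le64_to_cpu(h->blocknr) != dm_block_location(b))
                return -ENOTBLK;

        return 0;
}

static struct dm_block_validator example_validator = {
        .name = "example",
        .prepare_for_write = example_prepare_for_write,
        .check = example_check,
};
#endif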
/*----------------------------------------------------------------*/

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Joe Thornber <dm-devel@redhat.com>");
MODULE_DESCRIPTION("Immutable metadata library for dm");

/*----------------------------------------------------------------*/