linux/drivers/md/persistent-data/dm-block-manager.c
/*
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This file is released under the GPL.
 */
#include "dm-block-manager.h"
#include "dm-persistent-data-internal.h"
#include "../dm-bufio.h"

#include <linux/crc32c.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/rwsem.h>
#include <linux/device-mapper.h>
#include <linux/stacktrace.h>

#define DM_MSG_PREFIX "block manager"

/*----------------------------------------------------------------*/

/*
 * This is a read/write semaphore with a couple of differences.
 *
 * i) There is a restriction on the number of concurrent read locks that
 * may be held at once.  This is just an implementation detail.
 *
 * ii) Recursive locking attempts are detected and return EINVAL.  A stack
 * trace is also emitted for the previous lock acquisition.
 *
 * iii) Priority is given to write locks.
 */
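
/*
 * Illustrative usage sketch (comment only, not part of the driver):
 * given a struct block_lock bl that has been through bl_init(), a
 * reader pairs the calls like this; bl_down_read() may sleep, or fail
 * with -EINVAL if the current task already holds the lock:
 *
 *	r = bl_down_read(&bl);
 *	if (r)
 *		return r;
 *	... access the protected data ...
 *	bl_up_read(&bl);
 */
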
#define MAX_HOLDERS 4
#define MAX_STACK 10

typedef unsigned long stack_entries[MAX_STACK];

struct block_lock {
        spinlock_t lock;
        __s32 count;
        struct list_head waiters;
        struct task_struct *holders[MAX_HOLDERS];

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        struct stack_trace traces[MAX_HOLDERS];
        stack_entries entries[MAX_HOLDERS];
#endif
};

struct waiter {
        struct list_head list;
        struct task_struct *task;
        int wants_write;
};

static unsigned __find_holder(struct block_lock *lock,
                              struct task_struct *task)
{
        unsigned i;

        for (i = 0; i < MAX_HOLDERS; i++)
                if (lock->holders[i] == task)
                        break;

        BUG_ON(i == MAX_HOLDERS);
        return i;
}

/* call this *after* you increment lock->count */
static void __add_holder(struct block_lock *lock, struct task_struct *task)
{
        unsigned h = __find_holder(lock, NULL);
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        struct stack_trace *t;
#endif

        get_task_struct(task);
        lock->holders[h] = task;

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        t = lock->traces + h;
        t->nr_entries = 0;
        t->max_entries = MAX_STACK;
        t->entries = lock->entries[h];
        t->skip = 2;
        save_stack_trace(t);
#endif
}

/* call this *before* you decrement lock->count */
static void __del_holder(struct block_lock *lock, struct task_struct *task)
{
        unsigned h = __find_holder(lock, task);
        lock->holders[h] = NULL;
        put_task_struct(task);
}

static int __check_holder(struct block_lock *lock)
{
        unsigned i;
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        static struct stack_trace t;
        static stack_entries entries;
#endif

        for (i = 0; i < MAX_HOLDERS; i++) {
                if (lock->holders[i] == current) {
                        DMERR("recursive lock detected in pool metadata");
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
                        DMERR("previously held here:");
                        print_stack_trace(lock->traces + i, 4);

                        DMERR("subsequent acquisition attempted here:");
                        t.nr_entries = 0;
                        t.max_entries = MAX_STACK;
                        t.entries = entries;
                        t.skip = 3;
                        save_stack_trace(&t);
                        print_stack_trace(&t, 4);
#endif
                        return -EINVAL;
                }
        }

        return 0;
}

static void __wait(struct waiter *w)
{
        for (;;) {
                set_task_state(current, TASK_UNINTERRUPTIBLE);

                if (!w->task)
                        break;

                schedule();
        }

        set_task_state(current, TASK_RUNNING);
}

static void __wake_waiter(struct waiter *w)
{
        struct task_struct *task;

        list_del(&w->list);
        task = w->task;
        smp_mb();
        w->task = NULL;
        wake_up_process(task);
}

/*
 * We either wake a few readers or a single writer.
 */
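/*
 * Worked example (illustrative): with count == 0 and a waiter list of
 * reader, reader, writer, reader, the loop below wakes the two leading
 * readers, then stops at the writer because the lock is now read
 * locked; the writer and the trailing reader keep waiting.
 */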
static void __wake_many(struct block_lock *lock)
{
        struct waiter *w, *tmp;

        BUG_ON(lock->count < 0);
        list_for_each_entry_safe(w, tmp, &lock->waiters, list) {
                if (lock->count >= MAX_HOLDERS)
                        return;

                if (w->wants_write) {
                        if (lock->count > 0)
                                return; /* still read locked */

                        lock->count = -1;
                        __add_holder(lock, w->task);
                        __wake_waiter(w);
                        return;
                }

                lock->count++;
                __add_holder(lock, w->task);
                __wake_waiter(w);
        }
}

static void bl_init(struct block_lock *lock)
{
        int i;

        spin_lock_init(&lock->lock);
        lock->count = 0;
        INIT_LIST_HEAD(&lock->waiters);
        for (i = 0; i < MAX_HOLDERS; i++)
                lock->holders[i] = NULL;
}

static int __available_for_read(struct block_lock *lock)
{
        return lock->count >= 0 &&
                lock->count < MAX_HOLDERS &&
                list_empty(&lock->waiters);
}

static int bl_down_read(struct block_lock *lock)
{
        int r;
        struct waiter w;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r) {
                spin_unlock(&lock->lock);
                return r;
        }

        if (__available_for_read(lock)) {
                lock->count++;
                __add_holder(lock, current);
                spin_unlock(&lock->lock);
                return 0;
        }

        get_task_struct(current);

        w.task = current;
        w.wants_write = 0;
        list_add_tail(&w.list, &lock->waiters);
        spin_unlock(&lock->lock);

        __wait(&w);
        put_task_struct(current);
        return 0;
}

static int bl_down_read_nonblock(struct block_lock *lock)
{
        int r;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r)
                goto out;

        if (__available_for_read(lock)) {
                lock->count++;
                __add_holder(lock, current);
                r = 0;
        } else
                r = -EWOULDBLOCK;

out:
        spin_unlock(&lock->lock);
        return r;
}

static void bl_up_read(struct block_lock *lock)
{
        spin_lock(&lock->lock);
        BUG_ON(lock->count <= 0);
        __del_holder(lock, current);
        --lock->count;
        if (!list_empty(&lock->waiters))
                __wake_many(lock);
        spin_unlock(&lock->lock);
}

static int bl_down_write(struct block_lock *lock)
{
        int r;
        struct waiter w;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r) {
                spin_unlock(&lock->lock);
                return r;
        }

        if (lock->count == 0 && list_empty(&lock->waiters)) {
                lock->count = -1;
                __add_holder(lock, current);
                spin_unlock(&lock->lock);
                return 0;
        }

        get_task_struct(current);
        w.task = current;
        w.wants_write = 1;

        /*
         * Writers are given priority.  We know there's only one mutator
         * in the system, so we can ignore the ordering reversal here.
         */
        list_add(&w.list, &lock->waiters);
        spin_unlock(&lock->lock);

        __wait(&w);
        put_task_struct(current);

        return 0;
}

static void bl_up_write(struct block_lock *lock)
{
        spin_lock(&lock->lock);
        __del_holder(lock, current);
        lock->count = 0;
        if (!list_empty(&lock->waiters))
                __wake_many(lock);
        spin_unlock(&lock->lock);
}

static void report_recursive_bug(dm_block_t b, int r)
{
        if (r == -EINVAL)
                DMERR("recursive acquisition of block %llu requested.",
                      (unsigned long long) b);
}

/*----------------------------------------------------------------*/

/*
 * Block manager is currently implemented using dm-bufio.  struct
 * dm_block_manager and struct dm_block map directly onto a couple of
 * structs in the bufio interface.  I want to retain the freedom to move
 * away from bufio in the future.  So these structs are just cast within
 * this .c file, rather than making it through to the public interface.
 */
static struct dm_buffer *to_buffer(struct dm_block *b)
{
        return (struct dm_buffer *) b;
}

dm_block_t dm_block_location(struct dm_block *b)
{
        return dm_bufio_get_block_number(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_location);

void *dm_block_data(struct dm_block *b)
{
        return dm_bufio_get_block_data(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_data);

struct buffer_aux {
        struct dm_block_validator *validator;
        struct block_lock lock;
        int write_locked;
};

static void dm_block_manager_alloc_callback(struct dm_buffer *buf)
{
        struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
        aux->validator = NULL;
        bl_init(&aux->lock);
}

static void dm_block_manager_write_callback(struct dm_buffer *buf)
{
        struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
        if (aux->validator) {
                aux->validator->prepare_for_write(aux->validator, (struct dm_block *) buf,
                         dm_bufio_get_block_size(dm_bufio_get_client(buf)));
        }
}

/*----------------------------------------------------------------
 * Public interface
 *--------------------------------------------------------------*/
struct dm_block_manager {
        struct dm_bufio_client *bufio;
        bool read_only:1;
};
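
/*
 * Sketch of typical client usage (illustrative only; the block size,
 * cache size, maximum held count and block number below are
 * hypothetical values chosen for the example, and a real caller would
 * normally pass a dm_block_validator rather than NULL):
 *
 *	struct dm_block_manager *bm;
 *	struct dm_block *b;
 *	int r;
 *
 *	bm = dm_block_manager_create(bdev, 4096, 16, 1);
 *	if (IS_ERR(bm))
 *		return PTR_ERR(bm);
 *
 *	r = dm_bm_read_lock(bm, 0, NULL, &b);
 *	if (!r) {
 *		... use dm_block_data(b) ...
 *		dm_bm_unlock(b);
 *	}
 *
 *	dm_block_manager_destroy(bm);
 */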

struct dm_block_manager *dm_block_manager_create(struct block_device *bdev,
                                                 unsigned block_size,
                                                 unsigned cache_size,
                                                 unsigned max_held_per_thread)
{
        int r;
        struct dm_block_manager *bm;

        bm = kmalloc(sizeof(*bm), GFP_KERNEL);
        if (!bm) {
                r = -ENOMEM;
                goto bad;
        }

        bm->bufio = dm_bufio_client_create(bdev, block_size, max_held_per_thread,
                                           sizeof(struct buffer_aux),
                                           dm_block_manager_alloc_callback,
                                           dm_block_manager_write_callback);
        if (IS_ERR(bm->bufio)) {
                r = PTR_ERR(bm->bufio);
                kfree(bm);
                goto bad;
        }

        bm->read_only = false;

        return bm;

bad:
        return ERR_PTR(r);
}
EXPORT_SYMBOL_GPL(dm_block_manager_create);

void dm_block_manager_destroy(struct dm_block_manager *bm)
{
        dm_bufio_client_destroy(bm->bufio);
        kfree(bm);
}
EXPORT_SYMBOL_GPL(dm_block_manager_destroy);

unsigned dm_bm_block_size(struct dm_block_manager *bm)
{
        return dm_bufio_get_block_size(bm->bufio);
}
EXPORT_SYMBOL_GPL(dm_bm_block_size);

dm_block_t dm_bm_nr_blocks(struct dm_block_manager *bm)
{
        return dm_bufio_get_device_size(bm->bufio);
}

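/*
 * Each buffer remembers the first validator used on it.  The first
 * access with a non-NULL validator runs its check() callback and, if
 * that passes, stamps the validator into the aux data; any later access
 * must present the same validator or the call fails with -EINVAL.
 */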
static int dm_bm_validate_buffer(struct dm_block_manager *bm,
                                 struct dm_buffer *buf,
                                 struct buffer_aux *aux,
                                 struct dm_block_validator *v)
{
        if (unlikely(!aux->validator)) {
                int r;
                if (!v)
                        return 0;
                r = v->check(v, (struct dm_block *) buf, dm_bufio_get_block_size(bm->bufio));
                if (unlikely(r))
                        return r;
                aux->validator = v;
        } else {
                if (unlikely(aux->validator != v)) {
                        DMERR("validator mismatch (old=%s vs new=%s) for block %llu",
                                aux->validator->name, v ? v->name : "NULL",
                                (unsigned long long)
                                        dm_bufio_get_block_number(buf));
                        return -EINVAL;
                }
        }

        return 0;
}

int dm_bm_read_lock(struct dm_block_manager *bm, dm_block_t b,
                    struct dm_block_validator *v,
                    struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        p = dm_bufio_read(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_read(&aux->lock);
        if (unlikely(r)) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }

        aux->write_locked = 0;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_read(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_read_lock);

int dm_bm_write_lock(struct dm_block_manager *bm,
                     dm_block_t b, struct dm_block_validator *v,
                     struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        if (bm->read_only)
                return -EPERM;

        p = dm_bufio_read(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_write(&aux->lock);
        if (r) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }

        aux->write_locked = 1;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_write(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock);

int dm_bm_read_try_lock(struct dm_block_manager *bm,
                        dm_block_t b, struct dm_block_validator *v,
                        struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        p = dm_bufio_get(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);
        if (unlikely(!p))
                return -EWOULDBLOCK;

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_read_nonblock(&aux->lock);
        if (r < 0) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }
        aux->write_locked = 0;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_read(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}

int dm_bm_write_lock_zero(struct dm_block_manager *bm,
                          dm_block_t b, struct dm_block_validator *v,
                          struct dm_block **result)
{
        int r;
        struct buffer_aux *aux;
        void *p;

        if (bm->read_only)
                return -EPERM;

        p = dm_bufio_new(bm->bufio, b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        memset(p, 0, dm_bm_block_size(bm));

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_write(&aux->lock);
        if (r) {
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        aux->write_locked = 1;
        aux->validator = v;

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock_zero);

int dm_bm_unlock(struct dm_block *b)
{
        struct buffer_aux *aux;
        aux = dm_bufio_get_aux_data(to_buffer(b));

        if (aux->write_locked) {
                dm_bufio_mark_buffer_dirty(to_buffer(b));
                bl_up_write(&aux->lock);
        } else
                bl_up_read(&aux->lock);

        dm_bufio_release(to_buffer(b));

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_unlock);

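/*
 * Added commentary (not in the original source): the double call to
 * dm_bufio_write_dirty_buffers() below appears intentional.  The first
 * call flushes every dirty metadata block while the superblock is still
 * write locked, dm_bm_unlock() then marks the superblock buffer dirty,
 * and the second call writes the superblock itself, so it only reaches
 * the disk after the blocks it references.
 */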
int dm_bm_flush_and_unlock(struct dm_block_manager *bm,
                           struct dm_block *superblock)
{
        int r;

        if (bm->read_only)
                return -EPERM;

        r = dm_bufio_write_dirty_buffers(bm->bufio);
        if (unlikely(r)) {
                dm_bm_unlock(superblock);
                return r;
        }

        dm_bm_unlock(superblock);

        return dm_bufio_write_dirty_buffers(bm->bufio);
}

void dm_bm_set_read_only(struct dm_block_manager *bm)
{
        bm->read_only = true;
}
EXPORT_SYMBOL_GPL(dm_bm_set_read_only);

u32 dm_bm_checksum(const void *data, size_t len, u32 init_xor)
{
        return crc32c(~(u32) 0, data, len) ^ init_xor;
}
EXPORT_SYMBOL_GPL(dm_bm_checksum);
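
/*
 * Illustrative example (not from this file): a block validator would
 * typically checksum everything after the on-disk csum field and salt
 * it with a value specific to that block type.  The disk layout and
 * EXAMPLE_SALT below are hypothetical:
 *
 *	disk->csum = cpu_to_le32(dm_bm_checksum(&disk->flags,
 *						block_size - sizeof(__le32),
 *						EXAMPLE_SALT));
 */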

/*----------------------------------------------------------------*/

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Joe Thornber <dm-devel@redhat.com>");
MODULE_DESCRIPTION("Immutable metadata library for dm");

/*----------------------------------------------------------------*/