linux/drivers/md/persistent-data/dm-block-manager.c
/*
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This file is released under the GPL.
 */
#include "dm-block-manager.h"
#include "dm-persistent-data-internal.h"

#include <linux/dm-bufio.h>
#include <linux/crc32c.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/rwsem.h>
#include <linux/device-mapper.h>
#include <linux/stacktrace.h>
#include <linux/sched/task.h>

#define DM_MSG_PREFIX "block manager"

/*----------------------------------------------------------------*/

#ifdef CONFIG_DM_DEBUG_BLOCK_MANAGER_LOCKING

/*
 * This is a read/write semaphore with a couple of differences.
 *
 * i) There is a restriction on the number of concurrent read locks that
 * may be held at once.  This is just an implementation detail.
 *
 * ii) Recursive locking attempts are detected and return EINVAL.  If
 * CONFIG_DM_DEBUG_BLOCK_STACK_TRACING is enabled, a stack trace of the
 * previous lock acquisition is also emitted.
 *
 * iii) Priority is given to write locks.
 */
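
/*
 * lock->count encodes the lock state: 0 means unlocked, a positive
 * value is the number of current read holders (bounded by MAX_HOLDERS),
 * and -1 means a single writer holds the lock.
 */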
#define MAX_HOLDERS 4
#define MAX_STACK 10

struct stack_store {
	unsigned int	nr_entries;
	unsigned long	entries[MAX_STACK];
};

struct block_lock {
	spinlock_t lock;
	__s32 count;
	struct list_head waiters;
	struct task_struct *holders[MAX_HOLDERS];

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	struct stack_store traces[MAX_HOLDERS];
#endif
};

struct waiter {
	struct list_head list;
	struct task_struct *task;
	int wants_write;
};

static unsigned __find_holder(struct block_lock *lock,
			      struct task_struct *task)
{
	unsigned i;

	for (i = 0; i < MAX_HOLDERS; i++)
		if (lock->holders[i] == task)
			break;

	BUG_ON(i == MAX_HOLDERS);
	return i;
}

/* call this *after* you increment lock->count */
static void __add_holder(struct block_lock *lock, struct task_struct *task)
{
	unsigned h = __find_holder(lock, NULL);
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	struct stack_store *t;
#endif

	get_task_struct(task);
	lock->holders[h] = task;

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	t = lock->traces + h;
	t->nr_entries = stack_trace_save(t->entries, MAX_STACK, 2);
#endif
}

/* call this *before* you decrement lock->count */
static void __del_holder(struct block_lock *lock, struct task_struct *task)
{
	unsigned h = __find_holder(lock, task);
	lock->holders[h] = NULL;
	put_task_struct(task);
}

static int __check_holder(struct block_lock *lock)
{
	unsigned i;

	for (i = 0; i < MAX_HOLDERS; i++) {
		if (lock->holders[i] == current) {
			DMERR("recursive lock detected in metadata");
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
			DMERR("previously held here:");
			stack_trace_print(lock->traces[i].entries,
					  lock->traces[i].nr_entries, 4);

			DMERR("subsequent acquisition attempted here:");
			dump_stack();
#endif
			return -EINVAL;
		}
	}

	return 0;
}

static void __wait(struct waiter *w)
{
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		if (!w->task)
			break;

		schedule();
	}

	set_current_state(TASK_RUNNING);
}

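/*
 * __wake_waiter() and __wait() hand off through w->task: the waker reads
 * the task pointer, then clears w->task and wakes the task.  The smp_mb()
 * orders that read before the clearing store, because once w->task is
 * NULL the waiter may return from __wait() and its on-stack struct waiter
 * can disappear.
 */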
static void __wake_waiter(struct waiter *w)
{
	struct task_struct *task;

	list_del(&w->list);
	task = w->task;
	smp_mb();
	w->task = NULL;
	wake_up_process(task);
}

/*
 * We either wake a few readers or a single writer.
 */
static void __wake_many(struct block_lock *lock)
{
	struct waiter *w, *tmp;

	BUG_ON(lock->count < 0);
	list_for_each_entry_safe(w, tmp, &lock->waiters, list) {
		if (lock->count >= MAX_HOLDERS)
			return;

		if (w->wants_write) {
			if (lock->count > 0)
				return; /* still read locked */

			lock->count = -1;
			__add_holder(lock, w->task);
			__wake_waiter(w);
			return;
		}

		lock->count++;
		__add_holder(lock, w->task);
		__wake_waiter(w);
	}
}

static void bl_init(struct block_lock *lock)
{
	int i;

	spin_lock_init(&lock->lock);
	lock->count = 0;
	INIT_LIST_HEAD(&lock->waiters);
	for (i = 0; i < MAX_HOLDERS; i++)
		lock->holders[i] = NULL;
}

static int __available_for_read(struct block_lock *lock)
{
	return lock->count >= 0 &&
		lock->count < MAX_HOLDERS &&
		list_empty(&lock->waiters);
}

static int bl_down_read(struct block_lock *lock)
{
	int r;
	struct waiter w;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r) {
		spin_unlock(&lock->lock);
		return r;
	}

	if (__available_for_read(lock)) {
		lock->count++;
		__add_holder(lock, current);
		spin_unlock(&lock->lock);
		return 0;
	}

	get_task_struct(current);

	w.task = current;
	w.wants_write = 0;
	list_add_tail(&w.list, &lock->waiters);
	spin_unlock(&lock->lock);

	__wait(&w);
	put_task_struct(current);
	return 0;
}

static int bl_down_read_nonblock(struct block_lock *lock)
{
	int r;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r)
		goto out;

	if (__available_for_read(lock)) {
		lock->count++;
		__add_holder(lock, current);
		r = 0;
	} else
		r = -EWOULDBLOCK;

out:
	spin_unlock(&lock->lock);
	return r;
}

static void bl_up_read(struct block_lock *lock)
{
	spin_lock(&lock->lock);
	BUG_ON(lock->count <= 0);
	__del_holder(lock, current);
	--lock->count;
	if (!list_empty(&lock->waiters))
		__wake_many(lock);
	spin_unlock(&lock->lock);
}

static int bl_down_write(struct block_lock *lock)
{
	int r;
	struct waiter w;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r) {
		spin_unlock(&lock->lock);
		return r;
	}

	if (lock->count == 0 && list_empty(&lock->waiters)) {
		lock->count = -1;
		__add_holder(lock, current);
		spin_unlock(&lock->lock);
		return 0;
	}

	get_task_struct(current);
	w.task = current;
	w.wants_write = 1;

	/*
	 * Writers are given priority.  We know there's only one mutator
	 * in the system, so we can ignore the ordering reversal.
	 */
	list_add(&w.list, &lock->waiters);
	spin_unlock(&lock->lock);

	__wait(&w);
	put_task_struct(current);

	return 0;
}

static void bl_up_write(struct block_lock *lock)
{
	spin_lock(&lock->lock);
	__del_holder(lock, current);
	lock->count = 0;
	if (!list_empty(&lock->waiters))
		__wake_many(lock);
	spin_unlock(&lock->lock);
}

static void report_recursive_bug(dm_block_t b, int r)
{
	if (r == -EINVAL)
		DMERR("recursive acquisition of block %llu requested.",
		      (unsigned long long) b);
}

#else  /* !CONFIG_DM_DEBUG_BLOCK_MANAGER_LOCKING */

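/*
 * With lock debugging disabled, the helpers below compile away to no-ops
 * (or a constant 0 on the acquisition paths), so the checking costs
 * nothing in production builds.
 */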
#define bl_init(x) do { } while (0)
#define bl_down_read(x) 0
#define bl_down_read_nonblock(x) 0
#define bl_up_read(x) do { } while (0)
#define bl_down_write(x) 0
#define bl_up_write(x) do { } while (0)
#define report_recursive_bug(x, y) do { } while (0)

#endif /* CONFIG_DM_DEBUG_BLOCK_MANAGER_LOCKING */

/*----------------------------------------------------------------*/

/*
 * Block manager is currently implemented using dm-bufio.  struct
 * dm_block_manager and struct dm_block map directly onto a couple of
 * structs in the bufio interface.  I want to retain the freedom to move
 * away from bufio in the future.  So these structs are just cast within
 * this .c file, rather than making it through to the public interface.
 */
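/*
 * struct dm_block is deliberately left opaque to callers; it only ever
 * exists as a handle that is cast to the underlying struct dm_buffer.
 */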
static struct dm_buffer *to_buffer(struct dm_block *b)
{
	return (struct dm_buffer *) b;
}

dm_block_t dm_block_location(struct dm_block *b)
{
	return dm_bufio_get_block_number(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_location);

void *dm_block_data(struct dm_block *b)
{
	return dm_bufio_get_block_data(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_data);

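/*
 * Per-buffer auxiliary data, allocated by dm-bufio alongside each buffer
 * (see the aux size passed to dm_bufio_client_create() below).  It records
 * which validator has vetted the block and whether the current holder
 * took a write lock.
 */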
struct buffer_aux {
	struct dm_block_validator *validator;
	int write_locked;

#ifdef CONFIG_DM_DEBUG_BLOCK_MANAGER_LOCKING
	struct block_lock lock;
#endif
};

static void dm_block_manager_alloc_callback(struct dm_buffer *buf)
{
	struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
	aux->validator = NULL;
	bl_init(&aux->lock);
}

static void dm_block_manager_write_callback(struct dm_buffer *buf)
{
	struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
	if (aux->validator) {
		aux->validator->prepare_for_write(aux->validator, (struct dm_block *) buf,
			 dm_bufio_get_block_size(dm_bufio_get_client(buf)));
	}
}

/*----------------------------------------------------------------
 * Public interface
 *--------------------------------------------------------------*/
struct dm_block_manager {
	struct dm_bufio_client *bufio;
	bool read_only:1;
};

struct dm_block_manager *dm_block_manager_create(struct block_device *bdev,
						 unsigned block_size,
						 unsigned max_held_per_thread)
{
	int r;
	struct dm_block_manager *bm;

	bm = kmalloc(sizeof(*bm), GFP_KERNEL);
	if (!bm) {
		r = -ENOMEM;
		goto bad;
	}

	bm->bufio = dm_bufio_client_create(bdev, block_size, max_held_per_thread,
					   sizeof(struct buffer_aux),
					   dm_block_manager_alloc_callback,
					   dm_block_manager_write_callback);
	if (IS_ERR(bm->bufio)) {
		r = PTR_ERR(bm->bufio);
		kfree(bm);
		goto bad;
	}

	bm->read_only = false;

	return bm;

bad:
	return ERR_PTR(r);
}
EXPORT_SYMBOL_GPL(dm_block_manager_create);

void dm_block_manager_destroy(struct dm_block_manager *bm)
{
	dm_bufio_client_destroy(bm->bufio);
	kfree(bm);
}
EXPORT_SYMBOL_GPL(dm_block_manager_destroy);

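/*
 * Typical setup (a sketch only; the 4KiB block size and the limit of one
 * held block per thread are illustrative values, not requirements):
 *
 *	struct dm_block_manager *bm;
 *
 *	bm = dm_block_manager_create(bdev, 4096, 1);
 *	if (IS_ERR(bm))
 *		return PTR_ERR(bm);
 *	...
 *	dm_block_manager_destroy(bm);
 */
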
unsigned dm_bm_block_size(struct dm_block_manager *bm)
{
	return dm_bufio_get_block_size(bm->bufio);
}
EXPORT_SYMBOL_GPL(dm_bm_block_size);

dm_block_t dm_bm_nr_blocks(struct dm_block_manager *bm)
{
	return dm_bufio_get_device_size(bm->bufio);
}

static int dm_bm_validate_buffer(struct dm_block_manager *bm,
				 struct dm_buffer *buf,
				 struct buffer_aux *aux,
				 struct dm_block_validator *v)
{
	if (unlikely(!aux->validator)) {
		int r;
		if (!v)
			return 0;
		r = v->check(v, (struct dm_block *) buf, dm_bufio_get_block_size(bm->bufio));
		if (unlikely(r)) {
			DMERR_LIMIT("%s validator check failed for block %llu", v->name,
				    (unsigned long long) dm_bufio_get_block_number(buf));
			return r;
		}
		aux->validator = v;
	} else {
		if (unlikely(aux->validator != v)) {
			DMERR_LIMIT("validator mismatch (old=%s vs new=%s) for block %llu",
				    aux->validator->name, v ? v->name : "NULL",
				    (unsigned long long) dm_bufio_get_block_number(buf));
			return -EINVAL;
		}
	}

	return 0;
}

int dm_bm_read_lock(struct dm_block_manager *bm, dm_block_t b,
		    struct dm_block_validator *v,
		    struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_read(bm->bufio, b, (struct dm_buffer **) result);
	if (IS_ERR(p))
		return PTR_ERR(p);

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_read(&aux->lock);
	if (unlikely(r)) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}

	aux->write_locked = 0;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_read(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_read_lock);

int dm_bm_write_lock(struct dm_block_manager *bm,
		     dm_block_t b, struct dm_block_validator *v,
		     struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	if (bm->read_only)
		return -EPERM;

	p = dm_bufio_read(bm->bufio, b, (struct dm_buffer **) result);
	if (IS_ERR(p))
		return PTR_ERR(p);

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_write(&aux->lock);
	if (r) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}

	aux->write_locked = 1;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_write(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock);

int dm_bm_read_try_lock(struct dm_block_manager *bm,
			dm_block_t b, struct dm_block_validator *v,
			struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_get(bm->bufio, b, (struct dm_buffer **) result);
	if (IS_ERR(p))
		return PTR_ERR(p);
	if (unlikely(!p))
		return -EWOULDBLOCK;

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_read_nonblock(&aux->lock);
	if (r < 0) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}
	aux->write_locked = 0;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_read(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}

int dm_bm_write_lock_zero(struct dm_block_manager *bm,
			  dm_block_t b, struct dm_block_validator *v,
			  struct dm_block **result)
{
	int r;
	struct buffer_aux *aux;
	void *p;

	if (bm->read_only)
		return -EPERM;

	p = dm_bufio_new(bm->bufio, b, (struct dm_buffer **) result);
	if (IS_ERR(p))
		return PTR_ERR(p);

	memset(p, 0, dm_bm_block_size(bm));

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_write(&aux->lock);
	if (r) {
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	aux->write_locked = 1;
	aux->validator = v;

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock_zero);

void dm_bm_unlock(struct dm_block *b)
{
	struct buffer_aux *aux;
	aux = dm_bufio_get_aux_data(to_buffer(b));

	if (aux->write_locked) {
		dm_bufio_mark_buffer_dirty(to_buffer(b));
		bl_up_write(&aux->lock);
	} else
		bl_up_read(&aux->lock);

	dm_bufio_release(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_bm_unlock);

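/*
 * Typical read access (a sketch only; "b" and "v" stand for whatever
 * block number and validator the caller cares about):
 *
 *	struct dm_block *blk;
 *	int r;
 *
 *	r = dm_bm_read_lock(bm, b, v, &blk);
 *	if (r)
 *		return r;
 *	... read via dm_block_data(blk) ...
 *	dm_bm_unlock(blk);
 *
 * dm_bm_write_lock() follows the same pattern; unlocking a write-locked
 * block marks it dirty, and dm_bm_flush() writes the dirty blocks out.
 */
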
int dm_bm_flush(struct dm_block_manager *bm)
{
	if (bm->read_only)
		return -EPERM;

	return dm_bufio_write_dirty_buffers(bm->bufio);
}
EXPORT_SYMBOL_GPL(dm_bm_flush);

void dm_bm_prefetch(struct dm_block_manager *bm, dm_block_t b)
{
	dm_bufio_prefetch(bm->bufio, b, 1);
}

bool dm_bm_is_read_only(struct dm_block_manager *bm)
{
	return bm->read_only;
}
EXPORT_SYMBOL_GPL(dm_bm_is_read_only);

void dm_bm_set_read_only(struct dm_block_manager *bm)
{
	bm->read_only = true;
}
EXPORT_SYMBOL_GPL(dm_bm_set_read_only);

void dm_bm_set_read_write(struct dm_block_manager *bm)
{
	bm->read_only = false;
}
EXPORT_SYMBOL_GPL(dm_bm_set_read_write);

u32 dm_bm_checksum(const void *data, size_t len, u32 init_xor)
{
	return crc32c(~(u32) 0, data, len) ^ init_xor;
}
EXPORT_SYMBOL_GPL(dm_bm_checksum);

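/*
 * Callers typically XOR in a per-metadata-type salt via init_xor so that,
 * for example, one kind of metadata block cannot be mistaken for another.
 * A sketch of validator-style use (SALT is an illustrative constant, not
 * one defined here; the leading __le32 is assumed to hold the on-disk
 * checksum and is skipped):
 *
 *	__le32 csum = cpu_to_le32(dm_bm_checksum(data + sizeof(__le32),
 *						 block_size - sizeof(__le32),
 *						 SALT));
 */
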
/*----------------------------------------------------------------*/

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Joe Thornber <dm-devel@redhat.com>");
MODULE_DESCRIPTION("Immutable metadata library for dm");

/*----------------------------------------------------------------*/