linux/drivers/md/dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
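
/*
 * A minimal usage sketch (illustrative only: error handling is
 * elided, and my_copy_done, src_bdev, dst_bdev and my_context are
 * placeholders for the caller's own code):
 *
 *      static void my_copy_done(int read_err, unsigned long write_err,
 *                               void *context)
 *      {
 *              ...zero read_err and write_err means the copy worked...
 *      }
 *
 *      struct dm_kcopyd_client *kc;
 *      struct dm_io_region from, to;
 *
 *      dm_kcopyd_client_create(64, &kc);
 *      from.bdev = src_bdev; from.sector = 0; from.count = 1024;
 *      to = from; to.bdev = dst_bdev;
 *      dm_kcopyd_copy(kc, &from, 1, &to, 0, my_copy_done, my_context);
 *      ...
 *      dm_kcopyd_client_destroy(kc);
 */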

#include <linux/types.h>
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
        spinlock_t lock;
        struct page_list *pages;
        unsigned int nr_pages;
        unsigned int nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;
        atomic_t nr_jobs;

        mempool_t *job_pool;

        struct workqueue_struct *kcopyd_wq;
        struct work_struct kcopyd_work;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
        spinlock_t job_lock;
        struct list_head complete_jobs;
        struct list_head io_jobs;
        struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

static struct page_list *alloc_pl(void)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), GFP_KERNEL);
        if (!pl)
                return NULL;

        pl->page = alloc_page(GFP_KERNEL);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        spin_lock(&kc->lock);
        if (kc->nr_free_pages < nr) {
                spin_unlock(&kc->lock);
                return -ENOMEM;
        }

        kc->nr_free_pages -= nr;
        for (*pages = pl = kc->pages; --nr; pl = pl->next)
                ;

        kc->pages = pl->next;
        pl->next = NULL;

        spin_unlock(&kc->lock);

        return 0;
}

static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *cursor;

        spin_lock(&kc->lock);
        for (cursor = pl; cursor->next; cursor = cursor->next)
                kc->nr_free_pages++;

        kc->nr_free_pages++;
        cursor->next = kc->pages;
        kc->pages = pl;
        spin_unlock(&kc->lock);
}

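/*
 * Note on pool discipline: the pool is a singly linked list.
 * kcopyd_get_pages() hands out the first 'nr' entries, e.g. taking 2
 * pages from a pool A->B->C->D returns the chain A->B and leaves
 * C->D.  kcopyd_put_pages() walks to the tail of the returned chain
 * and prepends it, so the pool is recycled in LIFO order.
 */
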
/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
{
        unsigned int i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr; i++) {
                next = alloc_pl();
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kcopyd_put_pages(kc, pl);
        kc->nr_pages += nr;
        return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct dm_kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned long write_err;

        /*
         * Either READ or WRITE
         */
        int rw;
        struct dm_io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

        sector_t offset;
        unsigned int nr_pages;
        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed.  'context' is for the callback to use.
         */
        dm_kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;
};

/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
        _job_cache = KMEM_CACHE(kcopyd_job, 0);
        if (!_job_cache)
                return -ENOMEM;

        return 0;
}

void dm_kcopyd_exit(void)
{
        kmem_cache_destroy(_job_cache);
        _job_cache = NULL;
}

/*
 * Functions to push and pop a job onto a given job list.  pop()
 * takes a job from the head of the list; push() appends to the
 * tail; push_head() prepends to the head.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
                              struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&kc->job_lock, flags);

        if (!list_empty(jobs)) {
                job = list_entry(jobs->next, struct kcopyd_job, list);
                list_del(&job->list);
        }
        spin_unlock_irqrestore(&kc->job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned long write_err = job->write_err;
        dm_kcopyd_notify_fn fn = job->fn;
        struct dm_kcopyd_client *kc = job->kc;

        if (job->pages)
                kcopyd_put_pages(kc, job->pages);
        mempool_free(job, kc->job_pool);
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        return 0;
}

static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        if (error) {
                if (job->rw == WRITE)
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&kc->complete_jobs, job);
                        wake(kc);
                        return;
                }
        }

        if (job->rw == WRITE)
                push(&kc->complete_jobs, job);
        else {
                job->rw = WRITE;
                push(&kc->io_jobs, job);
        }

        wake(kc);
}

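/*
 * A job's path through the three lists: dispatch_job() queues it on
 * pages_jobs; run_pages_job() allocates pages and moves it to
 * io_jobs, where it is issued as a READ; complete_io() then flips
 * job->rw to WRITE and requeues it on io_jobs for the write-out;
 * when the writes finish the job lands on complete_jobs and
 * run_complete_job() invokes the client's notify fn.
 */
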
/*
 * Issue the io for a job: the read from the source, or the writes
 * to all destinations, via dm-io.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_rw = job->rw | (1 << BIO_RW_SYNCIO) | (1 << BIO_RW_UNPLUG),
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = job->offset,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
        int r;

        job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
                                  PAGE_SIZE >> 9);
        r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&job->kc->io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}

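/*
 * Worked example for the page calculation above: sector counts are in
 * 512-byte units, so with 4KiB pages PAGE_SIZE >> 9 == 8 sectors per
 * page, and a job copying 1000 sectors at offset 0 needs
 * dm_div_up(1000, 8) == 125 pages.
 */
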
/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
                        int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs, kc))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (job->rw == WRITE)
                                job->write_err = (unsigned long) -1L;
                        else
                                job->read_err = 1;
                        push(&kc->complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push_head(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
        struct dm_kcopyd_client *kc = container_of(work,
                                        struct dm_kcopyd_client, kcopyd_work);

        /*
         * The order that these are called is *very* important.
         * Complete jobs can free some pages for pages jobs.
         * Pages jobs, when successful, will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        process_jobs(&kc->complete_jobs, kc, run_complete_job);
        process_jobs(&kc->pages_jobs, kc, run_pages_job);
        process_jobs(&kc->io_jobs, kc, run_io_job);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        struct dm_kcopyd_client *kc = job->kc;
        atomic_inc(&kc->nr_jobs);
        push(&kc->pages_jobs, job);
        wake(kc);
}

#define SUB_JOB_SIZE 128
static void segment_complete(int read_err, unsigned long write_err,
                             void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;
                struct kcopyd_job *sub_job = mempool_alloc(kc->job_pool,
                                                           GFP_NOIO);

                *sub_job = *job;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * Queue the completion callback to the kcopyd thread.
                 *
                 * Some callers assume that all the completions are called
                 * from a single thread and don't race with each other.
                 *
                 * We must not call the callback directly here because this
                 * code may not be executing in the thread.
                 */
                push(&kc->complete_jobs, job);
                wake(kc);
        }
}

/*
 * Create some little jobs that will do the move between the
 * source and the destinations.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
        int i;

        atomic_inc(&job->kc->nr_jobs);

        atomic_set(&job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++)
                segment_complete(0, 0u, job);
}
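
/*
 * Worked example of the split: split_job() seeds SPLIT_COUNT (8)
 * sub-jobs, and each call to segment_complete() claims the next
 * SUB_JOB_SIZE (128) sectors of the source.  A 4096-sector copy is
 * thus carved into 32 chunks of 128 sectors, with at most 8 sub-jobs
 * in flight at any time.
 */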

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
                   unsigned int num_dests, struct dm_io_region *dests,
                   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        /*
         * Allocate a new job.
         */
        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;
        job->rw = READ;

        job->source = *from;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        job->offset = 0;
        job->nr_pages = 0;
        job->pages = NULL;

        job->fn = fn;
        job->context = context;

        if (job->source.count < SUB_JOB_SIZE)
                dispatch_job(job);
        else {
                mutex_init(&job->lock);
                job->progress = 0;
                split_job(job);
        }

        return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
int dm_kcopyd_client_create(unsigned int nr_pages,
                            struct dm_kcopyd_client **result)
{
        int r = -ENOMEM;
        struct dm_kcopyd_client *kc;

        kc = kmalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc)
                return -ENOMEM;

        spin_lock_init(&kc->lock);
        spin_lock_init(&kc->job_lock);
        INIT_LIST_HEAD(&kc->complete_jobs);
        INIT_LIST_HEAD(&kc->io_jobs);
        INIT_LIST_HEAD(&kc->pages_jobs);

        kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
        if (!kc->job_pool)
                goto bad_slab;

        INIT_WORK(&kc->kcopyd_work, do_work);
        kc->kcopyd_wq = create_singlethread_workqueue("kcopyd");
        if (!kc->kcopyd_wq)
                goto bad_workqueue;

        kc->pages = NULL;
        kc->nr_pages = kc->nr_free_pages = 0;
        r = client_alloc_pages(kc, nr_pages);
        if (r)
                goto bad_client_pages;

        kc->io_client = dm_io_client_create(nr_pages);
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                goto bad_io_client;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        *result = kc;
        return 0;

bad_io_client:
        client_free_pages(kc);
bad_client_pages:
        destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
        mempool_destroy(kc->job_pool);
bad_slab:
        kfree(kc);

        return r;
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        BUG_ON(!list_empty(&kc->complete_jobs));
        BUG_ON(!list_empty(&kc->io_jobs));
        BUG_ON(!list_empty(&kc->pages_jobs));
        destroy_workqueue(kc->kcopyd_wq);
        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        mempool_destroy(kc->job_pool);
        kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);