linux/drivers/md/dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
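
/*
 * Illustrative use of the client API (a sketch, not part of this file):
 * a user creates a client, submits copies, and tears the client down once
 * all jobs have completed.  The callback runs asynchronously from the
 * kcopyd workqueue.
 *
 *      static void my_copy_done(int read_err, unsigned long write_err,
 *                               void *context)
 *      {
 *              // read_err is nonzero if the source read failed;
 *              // write_err carries one error bit per destination region.
 *      }
 *
 *      struct dm_kcopyd_client *kc = dm_kcopyd_client_create(NULL);
 *      struct dm_io_region from = { .bdev = src, .sector = 0, .count = 1024 };
 *      struct dm_io_region to = { .bdev = dst, .sector = 0, .count = 1024 };
 *
 *      dm_kcopyd_copy(kc, &from, 1, &to, 0, my_copy_done, my_context);
 *      ...
 *      dm_kcopyd_client_destroy(kc);
 *
 * Here "src", "dst" and "my_context" are placeholders supplied by the
 * caller, and error checking of dm_kcopyd_client_create() is omitted.
 */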

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SUB_JOB_SIZE    128
#define SPLIT_COUNT     8
#define MIN_JOBS        8
#define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
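
/*
 * Worked out (assuming the common 4 KiB PAGE_SIZE and 512-byte sectors):
 * a sub job covers SUB_JOB_SIZE = 128 sectors = 64 KiB, so RESERVE_PAGES
 * evaluates to 64 KiB / 4 KiB = 16 pages.  This is the per-client reserve
 * that lets one sub job make progress even when alloc_page() fails.
 */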

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
        struct page_list *pages;
        unsigned nr_reserved_pages;
        unsigned nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;
        atomic_t nr_jobs;

        mempool_t *job_pool;

        struct workqueue_struct *kcopyd_wq;
        struct work_struct kcopyd_work;

        struct dm_kcopyd_throttle *throttle;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
        spinlock_t job_lock;
        struct list_head complete_jobs;
        struct list_head io_jobs;
        struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC                      100

/*
 * Maximum number of sleep events. There is a theoretical livelock if
 * multiple kcopyd clients do work simultaneously; this limit avoids it.
 */
#define MAX_SLEEPS                      10

static void io_job_start(struct dm_kcopyd_throttle *t)
{
        unsigned throttle, now, difference;
        int slept = 0, skew;

        if (unlikely(!t))
                return;

try_again:
        spin_lock_irq(&throttle_spinlock);

        throttle = READ_ONCE(t->throttle);

        if (likely(throttle >= 100))
                goto skip_limit;

        now = jiffies;
        difference = now - t->last_jiffies;
        t->last_jiffies = now;
        if (t->num_io_jobs)
                t->io_period += difference;
        t->total_period += difference;

        /*
         * Maintain sane values if we got a temporary overflow.
         */
        if (unlikely(t->io_period > t->total_period))
                t->io_period = t->total_period;

        if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
                int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
                t->total_period >>= shift;
                t->io_period >>= shift;
        }

        skew = t->io_period - throttle * t->total_period / 100;

        if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
                slept++;
                spin_unlock_irq(&throttle_spinlock);
                msleep(SLEEP_MSEC);
                goto try_again;
        }

skip_limit:
        t->num_io_jobs++;

        spin_unlock_irq(&throttle_spinlock);
}
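
/*
 * A worked example of the throttle check above (numbers are only
 * illustrative): with t->throttle set to 50 and, say, io_period = 60 and
 * total_period = 100 jiffies, skew = 60 - 50 * 100 / 100 = 10 > 0, so the
 * submitter sleeps SLEEP_MSEC and re-evaluates, up to MAX_SLEEPS times.
 * Once io_period no longer exceeds the requested percentage of
 * total_period, the job is allowed to start.
 */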

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
        unsigned long flags;

        if (unlikely(!t))
                return;

        spin_lock_irqsave(&throttle_spinlock, flags);

        t->num_io_jobs--;

        if (likely(READ_ONCE(t->throttle) >= 100))
                goto skip_limit;

        if (!t->num_io_jobs) {
                unsigned now, difference;

                now = jiffies;
                difference = now - t->last_jiffies;
                t->last_jiffies = now;

                t->io_period += difference;
                t->total_period += difference;

                /*
                 * Maintain sane values if we got a temporary overflow.
                 */
                if (unlikely(t->io_period > t->total_period))
                        t->io_period = t->total_period;
        }

skip_limit:
        spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), gfp);
        if (!pl)
                return NULL;

        pl->page = alloc_page(gfp);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *next;

        do {
                next = pl->next;

                if (kc->nr_free_pages >= kc->nr_reserved_pages)
                        free_pl(pl);
                else {
                        pl->next = kc->pages;
                        kc->pages = pl;
                        kc->nr_free_pages++;
                }

                pl = next;
        } while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        *pages = NULL;

        do {
                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
                if (unlikely(!pl)) {
                        /* Use reserved pages */
                        pl = kc->pages;
                        if (unlikely(!pl))
                                goto out_of_memory;
                        kc->pages = pl->next;
                        kc->nr_free_pages--;
                }
                pl->next = *pages;
                *pages = pl;
        } while (--nr);

        return 0;

out_of_memory:
        if (*pages)
                kcopyd_put_pages(kc, *pages);
        return -ENOMEM;
}
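
/*
 * Note on the allocation strategy above: kcopyd_get_pages() first tries an
 * opportunistic alloc_pl() that wakes kswapd but neither enters direct
 * reclaim nor retries, and only then dips into the client's reserved
 * pages.  This keeps the copy path from stalling in the allocator while
 * still guaranteeing forward progress for at least one sub job, which
 * matters because kcopyd runs from a WQ_MEM_RECLAIM workqueue.
 */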

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
        unsigned i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr_pages; i++) {
                next = alloc_pl(GFP_KERNEL);
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kc->nr_reserved_pages += nr_pages;
        kcopyd_put_pages(kc, pl);

        return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd jobs need to be allocated by the *clients* of kcopyd; for
 * this reason we use a mempool to prevent the client from ever
 * having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct dm_kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned long write_err;

        /*
         * Either READ or WRITE
         */
        int rw;
        struct dm_io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed.  'context' is for callback to use.
         */
        dm_kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;
        sector_t write_offset;

        struct kcopyd_job *master_job;
};
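
/*
 * Layout note: each mempool allocation is really an array of
 * 1 + SPLIT_COUNT of these structures (see the slab size chosen in
 * dm_kcopyd_init() below).  Index 0 is the master job; split_job() hands
 * out master_job[1] .. master_job[SPLIT_COUNT] as sub jobs, each of which
 * points back at the master through ->master_job.
 */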

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
        _job_cache = kmem_cache_create("kcopyd_job",
                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
                                __alignof__(struct kcopyd_job), 0, NULL);
        if (!_job_cache)
                return -ENOMEM;

        zero_page_list.next = &zero_page_list;
        zero_page_list.page = ZERO_PAGE(0);

        return 0;
}

void dm_kcopyd_exit(void)
{
        kmem_cache_destroy(_job_cache);
        _job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
                                     struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job;

        /*
         * For I/O jobs, pop any read, any write without sequential write
         * constraint and sequential writes that are at the right position.
         */
        list_for_each_entry(job, jobs, list) {
                if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
                        list_del(&job->list);
                        return job;
                }

                if (job->write_offset == job->master_job->write_offset) {
                        job->master_job->write_offset += job->source.count;
                        list_del(&job->list);
                        return job;
                }
        }

        return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
                              struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&kc->job_lock, flags);

        if (!list_empty(jobs)) {
                if (jobs == &kc->io_jobs)
                        job = pop_io_job(jobs, kc);
                else {
                        job = list_entry(jobs->next, struct kcopyd_job, list);
                        list_del(&job->list);
                }
        }
        spin_unlock_irqrestore(&kc->job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned long write_err = job->write_err;
        dm_kcopyd_notify_fn fn = job->fn;
        struct dm_kcopyd_client *kc = job->kc;

        if (job->pages && job->pages != &zero_page_list)
                kcopyd_put_pages(kc, job->pages);
        /*
         * If this is the master job, the sub jobs have already
         * completed so we can free everything.
         */
        if (job->master_job == job)
                mempool_free(job, kc->job_pool);
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        return 0;
}

static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        io_job_finish(kc->throttle);

        if (error) {
                if (op_is_write(job->rw))
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&kc->complete_jobs, job);
                        wake(kc);
                        return;
                }
        }

        if (op_is_write(job->rw))
                push(&kc->complete_jobs, job);

        else {
                job->rw = WRITE;
                push(&kc->io_jobs, job);
        }

        wake(kc);
}

/*
 * Issue the io for a particular job, using dm-io on its page list.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_op = job->rw,
                .bi_op_flags = 0,
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = 0,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        /*
         * If we need to write sequentially and some reads or writes failed,
         * no point in continuing.
         */
        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
            job->master_job->write_err)
                return -EIO;

        io_job_start(job->kc->throttle);

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
        int r;
        unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

        r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&job->kc->io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}
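
/*
 * The nr_pages computation above converts the first destination's size in
 * 512-byte sectors into whole pages.  For example (assuming a 4 KiB
 * PAGE_SIZE), a full SUB_JOB_SIZE destination of 128 sectors needs
 * dm_div_up(128, 8) = 16 pages, which is exactly RESERVE_PAGES.
 */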

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
                        int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs, kc))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (op_is_write(job->rw))
                                job->write_err = (unsigned long) -1L;
                        else
                                job->read_err = 1;
                        push(&kc->complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push_head(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
        struct dm_kcopyd_client *kc = container_of(work,
                                        struct dm_kcopyd_client, kcopyd_work);
        struct blk_plug plug;

        /*
         * The order that these are called is *very* important.
         * complete jobs can free some pages for pages jobs.
         * Pages jobs when successful will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        blk_start_plug(&plug);
        process_jobs(&kc->complete_jobs, kc, run_complete_job);
        process_jobs(&kc->pages_jobs, kc, run_pages_job);
        process_jobs(&kc->io_jobs, kc, run_io_job);
        blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        struct dm_kcopyd_client *kc = job->kc;
        atomic_inc(&kc->nr_jobs);
        if (unlikely(!job->source.count))
                push(&kc->complete_jobs, job);
        else if (job->pages == &zero_page_list)
                push(&kc->io_jobs, job);
        else
                push(&kc->pages_jobs, job);
        wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
                             void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
        struct kcopyd_job *job = sub_job->master_job;
        struct dm_kcopyd_client *kc = job->kc;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;

                *sub_job = *job;
                sub_job->write_offset = progress;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = sub_job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * Queue the completion callback to the kcopyd thread.
                 *
                 * Some callers assume that all the completions are called
                 * from a single thread and don't race with each other.
                 *
                 * We must not call the callback directly here because this
                 * code may not be executing in the thread.
                 */
                push(&kc->complete_jobs, job);
                wake(kc);
        }
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
        int i;

        atomic_inc(&master_job->kc->nr_jobs);

        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++) {
                master_job[i + 1].master_job = master_job;
                segment_complete(0, 0u, &master_job[i + 1]);
        }
}
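
/*
 * To make the splitting concrete (numbers are only illustrative): copying
 * a 2048-sector (1 MiB) region seeds SPLIT_COUNT = 8 sub jobs; each one
 * claims the next SUB_JOB_SIZE = 128-sector chunk in segment_complete()
 * and re-dispatches itself until job->progress reaches source.count, so
 * the region is covered in 16 chunks by 8 concurrently recycling sub
 * jobs.  The master job is queued for completion only when the last sub
 * job drops sub_jobs to zero.
 */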

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
                   unsigned int num_dests, struct dm_io_region *dests,
                   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;
        int i;

        /*
         * Allocate an array of jobs consisting of one master job
         * followed by SPLIT_COUNT sub jobs.
         */
        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        /*
         * If one of the destinations is a host-managed zoned block device,
         * we need to write sequentially. If one of the destinations is a
         * host-aware device, then leave it to the caller to choose what to do.
         */
        if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
                for (i = 0; i < job->num_dests; i++) {
                        if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
                                set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
                                break;
                        }
                }
        }

        /*
         * If we need to write sequentially, errors cannot be ignored.
         */
        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
                clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

        if (from) {
                job->source = *from;
                job->pages = NULL;
                job->rw = READ;
        } else {
                memset(&job->source, 0, sizeof job->source);
                job->source.count = job->dests[0].count;
                job->pages = &zero_page_list;

                /*
                 * Use WRITE ZEROES to optimize zeroing if all dests support it.
                 */
                job->rw = REQ_OP_WRITE_ZEROES;
                for (i = 0; i < job->num_dests; i++)
                        if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
                                job->rw = WRITE;
                                break;
                        }
        }

        job->fn = fn;
        job->context = context;
        job->master_job = job;
        job->write_offset = 0;

        if (job->source.count <= SUB_JOB_SIZE)
                dispatch_job(job);
        else {
                mutex_init(&job->lock);
                job->progress = 0;
                split_job(job);
        }

        return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
                   unsigned num_dests, struct dm_io_region *dests,
                   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
        return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
                                 dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        memset(job, 0, sizeof(struct kcopyd_job));
        job->kc = kc;
        job->fn = fn;
        job->context = context;
        job->master_job = job;

        atomic_inc(&kc->nr_jobs);

        return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
        struct kcopyd_job *job = j;
        struct dm_kcopyd_client *kc = job->kc;

        job->read_err = read_err;
        job->write_err = write_err;

        push(&kc->complete_jobs, job);
        wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
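
/*
 * Illustrative pairing of the two helpers above (a sketch, not part of
 * this file): a target that wants a completion delivered from the kcopyd
 * thread, even though no copy is needed, can do
 *
 *      void *j = dm_kcopyd_prepare_callback(kc, my_done, my_context);
 *      ...
 *      dm_kcopyd_do_callback(j, 0, 0);
 *
 * "my_done" and "my_context" are placeholders; the callback then runs
 * from the client's workqueue with the given read/write error values.
 */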

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
        int r = -ENOMEM;
        struct dm_kcopyd_client *kc;

        kc = kmalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&kc->job_lock);
        INIT_LIST_HEAD(&kc->complete_jobs);
        INIT_LIST_HEAD(&kc->io_jobs);
        INIT_LIST_HEAD(&kc->pages_jobs);
        kc->throttle = throttle;

        kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
        if (!kc->job_pool)
                goto bad_slab;

        INIT_WORK(&kc->kcopyd_work, do_work);
        kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
        if (!kc->kcopyd_wq)
                goto bad_workqueue;

        kc->pages = NULL;
        kc->nr_reserved_pages = kc->nr_free_pages = 0;
        r = client_reserve_pages(kc, RESERVE_PAGES);
        if (r)
                goto bad_client_pages;

        kc->io_client = dm_io_client_create();
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                goto bad_io_client;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        return kc;

bad_io_client:
        client_free_pages(kc);
bad_client_pages:
        destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
        mempool_destroy(kc->job_pool);
bad_slab:
        kfree(kc);

        return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);
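
/*
 * On throttling: callers that want a tunable limit typically pass a
 * throttle declared with DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM()
 * from <linux/dm-kcopyd.h>, which exposes the percentage as a module
 * parameter; passing NULL disables throttling for the client.  (See the
 * io_job_start()/io_job_finish() accounting above.)
 */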

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        BUG_ON(!list_empty(&kc->complete_jobs));
        BUG_ON(!list_empty(&kc->io_jobs));
        BUG_ON(!list_empty(&kc->pages_jobs));
        destroy_workqueue(kc->kcopyd_wq);
        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        mempool_destroy(kc->job_pool);
        kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
