linux/drivers/md/dm-kcopyd.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2002 Sistina Software (UK) Limited.
   3 * Copyright (C) 2006 Red Hat GmbH
   4 *
   5 * This file is released under the GPL.
   6 *
   7 * Kcopyd provides a simple interface for copying an area of one
   8 * block-device to one or more other block-devices, with an asynchronous
   9 * completion notification.
  10 */
  11
  12#include <linux/types.h>
  13#include <linux/atomic.h>
  14#include <linux/blkdev.h>
  15#include <linux/fs.h>
  16#include <linux/init.h>
  17#include <linux/list.h>
  18#include <linux/mempool.h>
  19#include <linux/module.h>
  20#include <linux/pagemap.h>
  21#include <linux/slab.h>
  22#include <linux/vmalloc.h>
  23#include <linux/workqueue.h>
  24#include <linux/mutex.h>
  25#include <linux/delay.h>
  26#include <linux/device-mapper.h>
  27#include <linux/dm-kcopyd.h>
  28
  29#include "dm-core.h"
  30
  31#define SPLIT_COUNT     8
  32#define MIN_JOBS        8
  33
  34#define DEFAULT_SUB_JOB_SIZE_KB 512
  35#define MAX_SUB_JOB_SIZE_KB     1024
  36
  37static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB;
  38
  39module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR);
  40MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients");
  41
  42static unsigned dm_get_kcopyd_subjob_size(void)
  43{
  44        unsigned sub_job_size_kb;
  45
  46        sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb,
  47                                                DEFAULT_SUB_JOB_SIZE_KB,
  48                                                MAX_SUB_JOB_SIZE_KB);
  49
  50        return sub_job_size_kb << 1;
  51}
  52
  53/*-----------------------------------------------------------------
  54 * Each kcopyd client has its own little pool of preallocated
  55 * pages for kcopyd io.
  56 *---------------------------------------------------------------*/
  57struct dm_kcopyd_client {
  58        struct page_list *pages;
  59        unsigned nr_reserved_pages;
  60        unsigned nr_free_pages;
  61        unsigned sub_job_size;
  62
  63        struct dm_io_client *io_client;
  64
  65        wait_queue_head_t destroyq;
  66
  67        mempool_t job_pool;
  68
  69        struct workqueue_struct *kcopyd_wq;
  70        struct work_struct kcopyd_work;
  71
  72        struct dm_kcopyd_throttle *throttle;
  73
  74        atomic_t nr_jobs;
  75
  76/*
  77 * We maintain four lists of jobs:
  78 *
  79 * i)   jobs waiting for pages
  80 * ii)  jobs that have pages, and are waiting for the io to be issued.
  81 * iii) jobs that don't need to do any IO and just run a callback
  82 * iv) jobs that have completed.
  83 *
  84 * All four of these are protected by job_lock.
  85 */
  86        spinlock_t job_lock;
  87        struct list_head callback_jobs;
  88        struct list_head complete_jobs;
  89        struct list_head io_jobs;
  90        struct list_head pages_jobs;
  91};
  92
  93static struct page_list zero_page_list;
  94
  95static DEFINE_SPINLOCK(throttle_spinlock);
  96
  97/*
  98 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
  99 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 100 * by 2.
 101 */
 102#define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ
 103
 104/*
 105 * Sleep this number of milliseconds.
 106 *
 107 * The value was decided experimentally.
 108 * Smaller values seem to cause an increased copy rate above the limit.
 109 * The reason for this is unknown but possibly due to jiffies rounding errors
 110 * or read/write cache inside the disk.
 111 */
 112#define SLEEP_MSEC                      100
 113
 114/*
 115 * Maximum number of sleep events. There is a theoretical livelock if more
 116 * kcopyd clients do work simultaneously which this limit avoids.
 117 */
 118#define MAX_SLEEPS                      10
 119
 120static void io_job_start(struct dm_kcopyd_throttle *t)
 121{
 122        unsigned throttle, now, difference;
 123        int slept = 0, skew;
 124
 125        if (unlikely(!t))
 126                return;
 127
 128try_again:
 129        spin_lock_irq(&throttle_spinlock);
 130
 131        throttle = READ_ONCE(t->throttle);
 132
 133        if (likely(throttle >= 100))
 134                goto skip_limit;
 135
 136        now = jiffies;
 137        difference = now - t->last_jiffies;
 138        t->last_jiffies = now;
 139        if (t->num_io_jobs)
 140                t->io_period += difference;
 141        t->total_period += difference;
 142
 143        /*
 144         * Maintain sane values if we got a temporary overflow.
 145         */
 146        if (unlikely(t->io_period > t->total_period))
 147                t->io_period = t->total_period;
 148
 149        if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
 150                int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
 151                t->total_period >>= shift;
 152                t->io_period >>= shift;
 153        }
 154
 155        skew = t->io_period - throttle * t->total_period / 100;
 156
 157        if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
 158                slept++;
 159                spin_unlock_irq(&throttle_spinlock);
 160                msleep(SLEEP_MSEC);
 161                goto try_again;
 162        }
 163
 164skip_limit:
 165        t->num_io_jobs++;
 166
 167        spin_unlock_irq(&throttle_spinlock);
 168}
 169
 170static void io_job_finish(struct dm_kcopyd_throttle *t)
 171{
 172        unsigned long flags;
 173
 174        if (unlikely(!t))
 175                return;
 176
 177        spin_lock_irqsave(&throttle_spinlock, flags);
 178
 179        t->num_io_jobs--;
 180
 181        if (likely(READ_ONCE(t->throttle) >= 100))
 182                goto skip_limit;
 183
 184        if (!t->num_io_jobs) {
 185                unsigned now, difference;
 186
 187                now = jiffies;
 188                difference = now - t->last_jiffies;
 189                t->last_jiffies = now;
 190
 191                t->io_period += difference;
 192                t->total_period += difference;
 193
 194                /*
 195                 * Maintain sane values if we got a temporary overflow.
 196                 */
 197                if (unlikely(t->io_period > t->total_period))
 198                        t->io_period = t->total_period;
 199        }
 200
 201skip_limit:
 202        spin_unlock_irqrestore(&throttle_spinlock, flags);
 203}
 204
 205
 206static void wake(struct dm_kcopyd_client *kc)
 207{
 208        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
 209}
 210
 211/*
 212 * Obtain one page for the use of kcopyd.
 213 */
 214static struct page_list *alloc_pl(gfp_t gfp)
 215{
 216        struct page_list *pl;
 217
 218        pl = kmalloc(sizeof(*pl), gfp);
 219        if (!pl)
 220                return NULL;
 221
 222        pl->page = alloc_page(gfp);
 223        if (!pl->page) {
 224                kfree(pl);
 225                return NULL;
 226        }
 227
 228        return pl;
 229}
 230
 231static void free_pl(struct page_list *pl)
 232{
 233        __free_page(pl->page);
 234        kfree(pl);
 235}
 236
 237/*
 238 * Add the provided pages to a client's free page list, releasing
 239 * back to the system any beyond the reserved_pages limit.
 240 */
 241static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
 242{
 243        struct page_list *next;
 244
 245        do {
 246                next = pl->next;
 247
 248                if (kc->nr_free_pages >= kc->nr_reserved_pages)
 249                        free_pl(pl);
 250                else {
 251                        pl->next = kc->pages;
 252                        kc->pages = pl;
 253                        kc->nr_free_pages++;
 254                }
 255
 256                pl = next;
 257        } while (pl);
 258}
 259
 260static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
 261                            unsigned int nr, struct page_list **pages)
 262{
 263        struct page_list *pl;
 264
 265        *pages = NULL;
 266
 267        do {
 268                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
 269                if (unlikely(!pl)) {
 270                        /* Use reserved pages */
 271                        pl = kc->pages;
 272                        if (unlikely(!pl))
 273                                goto out_of_memory;
 274                        kc->pages = pl->next;
 275                        kc->nr_free_pages--;
 276                }
 277                pl->next = *pages;
 278                *pages = pl;
 279        } while (--nr);
 280
 281        return 0;
 282
 283out_of_memory:
 284        if (*pages)
 285                kcopyd_put_pages(kc, *pages);
 286        return -ENOMEM;
 287}
 288
 289/*
 290 * These three functions resize the page pool.
 291 */
 292static void drop_pages(struct page_list *pl)
 293{
 294        struct page_list *next;
 295
 296        while (pl) {
 297                next = pl->next;
 298                free_pl(pl);
 299                pl = next;
 300        }
 301}
 302
 303/*
 304 * Allocate and reserve nr_pages for the use of a specific client.
 305 */
 306static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
 307{
 308        unsigned i;
 309        struct page_list *pl = NULL, *next;
 310
 311        for (i = 0; i < nr_pages; i++) {
 312                next = alloc_pl(GFP_KERNEL);
 313                if (!next) {
 314                        if (pl)
 315                                drop_pages(pl);
 316                        return -ENOMEM;
 317                }
 318                next->next = pl;
 319                pl = next;
 320        }
 321
 322        kc->nr_reserved_pages += nr_pages;
 323        kcopyd_put_pages(kc, pl);
 324
 325        return 0;
 326}
 327
 328static void client_free_pages(struct dm_kcopyd_client *kc)
 329{
 330        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
 331        drop_pages(kc->pages);
 332        kc->pages = NULL;
 333        kc->nr_free_pages = kc->nr_reserved_pages = 0;
 334}
 335
 336/*-----------------------------------------------------------------
 337 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 338 * for this reason we use a mempool to prevent the client from
 339 * ever having to do io (which could cause a deadlock).
 340 *---------------------------------------------------------------*/
 341struct kcopyd_job {
 342        struct dm_kcopyd_client *kc;
 343        struct list_head list;
 344        unsigned long flags;
 345
 346        /*
 347         * Error state of the job.
 348         */
 349        int read_err;
 350        unsigned long write_err;
 351
 352        /*
 353         * Either READ or WRITE
 354         */
 355        int rw;
 356        struct dm_io_region source;
 357
 358        /*
 359         * The destinations for the transfer.
 360         */
 361        unsigned int num_dests;
 362        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
 363
 364        struct page_list *pages;
 365
 366        /*
 367         * Set this to ensure you are notified when the job has
 368         * completed.  'context' is for callback to use.
 369         */
 370        dm_kcopyd_notify_fn fn;
 371        void *context;
 372
 373        /*
 374         * These fields are only used if the job has been split
 375         * into more manageable parts.
 376         */
 377        struct mutex lock;
 378        atomic_t sub_jobs;
 379        sector_t progress;
 380        sector_t write_offset;
 381
 382        struct kcopyd_job *master_job;
 383};
 384
 385static struct kmem_cache *_job_cache;
 386
 387int __init dm_kcopyd_init(void)
 388{
 389        _job_cache = kmem_cache_create("kcopyd_job",
 390                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
 391                                __alignof__(struct kcopyd_job), 0, NULL);
 392        if (!_job_cache)
 393                return -ENOMEM;
 394
 395        zero_page_list.next = &zero_page_list;
 396        zero_page_list.page = ZERO_PAGE(0);
 397
 398        return 0;
 399}
 400
 401void dm_kcopyd_exit(void)
 402{
 403        kmem_cache_destroy(_job_cache);
 404        _job_cache = NULL;
 405}
 406
 407/*
 408 * Functions to push and pop a job onto the head of a given job
 409 * list.
 410 */
 411static struct kcopyd_job *pop_io_job(struct list_head *jobs,
 412                                     struct dm_kcopyd_client *kc)
 413{
 414        struct kcopyd_job *job;
 415
 416        /*
 417         * For I/O jobs, pop any read, any write without sequential write
 418         * constraint and sequential writes that are at the right position.
 419         */
 420        list_for_each_entry(job, jobs, list) {
 421                if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
 422                        list_del(&job->list);
 423                        return job;
 424                }
 425
 426                if (job->write_offset == job->master_job->write_offset) {
 427                        job->master_job->write_offset += job->source.count;
 428                        list_del(&job->list);
 429                        return job;
 430                }
 431        }
 432
 433        return NULL;
 434}
 435
 436static struct kcopyd_job *pop(struct list_head *jobs,
 437                              struct dm_kcopyd_client *kc)
 438{
 439        struct kcopyd_job *job = NULL;
 440        unsigned long flags;
 441
 442        spin_lock_irqsave(&kc->job_lock, flags);
 443
 444        if (!list_empty(jobs)) {
 445                if (jobs == &kc->io_jobs)
 446                        job = pop_io_job(jobs, kc);
 447                else {
 448                        job = list_entry(jobs->next, struct kcopyd_job, list);
 449                        list_del(&job->list);
 450                }
 451        }
 452        spin_unlock_irqrestore(&kc->job_lock, flags);
 453
 454        return job;
 455}
 456
 457static void push(struct list_head *jobs, struct kcopyd_job *job)
 458{
 459        unsigned long flags;
 460        struct dm_kcopyd_client *kc = job->kc;
 461
 462        spin_lock_irqsave(&kc->job_lock, flags);
 463        list_add_tail(&job->list, jobs);
 464        spin_unlock_irqrestore(&kc->job_lock, flags);
 465}
 466
 467
 468static void push_head(struct list_head *jobs, struct kcopyd_job *job)
 469{
 470        unsigned long flags;
 471        struct dm_kcopyd_client *kc = job->kc;
 472
 473        spin_lock_irqsave(&kc->job_lock, flags);
 474        list_add(&job->list, jobs);
 475        spin_unlock_irqrestore(&kc->job_lock, flags);
 476}
 477
 478/*
 479 * These three functions process 1 item from the corresponding
 480 * job list.
 481 *
 482 * They return:
 483 * < 0: error
 484 *   0: success
 485 * > 0: can't process yet.
 486 */
 487static int run_complete_job(struct kcopyd_job *job)
 488{
 489        void *context = job->context;
 490        int read_err = job->read_err;
 491        unsigned long write_err = job->write_err;
 492        dm_kcopyd_notify_fn fn = job->fn;
 493        struct dm_kcopyd_client *kc = job->kc;
 494
 495        if (job->pages && job->pages != &zero_page_list)
 496                kcopyd_put_pages(kc, job->pages);
 497        /*
 498         * If this is the master job, the sub jobs have already
 499         * completed so we can free everything.
 500         */
 501        if (job->master_job == job) {
 502                mutex_destroy(&job->lock);
 503                mempool_free(job, &kc->job_pool);
 504        }
 505        fn(read_err, write_err, context);
 506
 507        if (atomic_dec_and_test(&kc->nr_jobs))
 508                wake_up(&kc->destroyq);
 509
 510        cond_resched();
 511
 512        return 0;
 513}
 514
 515static void complete_io(unsigned long error, void *context)
 516{
 517        struct kcopyd_job *job = (struct kcopyd_job *) context;
 518        struct dm_kcopyd_client *kc = job->kc;
 519
 520        io_job_finish(kc->throttle);
 521
 522        if (error) {
 523                if (op_is_write(job->rw))
 524                        job->write_err |= error;
 525                else
 526                        job->read_err = 1;
 527
 528                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 529                        push(&kc->complete_jobs, job);
 530                        wake(kc);
 531                        return;
 532                }
 533        }
 534
 535        if (op_is_write(job->rw))
 536                push(&kc->complete_jobs, job);
 537
 538        else {
 539                job->rw = WRITE;
 540                push(&kc->io_jobs, job);
 541        }
 542
 543        wake(kc);
 544}
 545
 546/*
 547 * Request io on as many buffer heads as we can currently get for
 548 * a particular job.
 549 */
 550static int run_io_job(struct kcopyd_job *job)
 551{
 552        int r;
 553        struct dm_io_request io_req = {
 554                .bi_op = job->rw,
 555                .bi_op_flags = 0,
 556                .mem.type = DM_IO_PAGE_LIST,
 557                .mem.ptr.pl = job->pages,
 558                .mem.offset = 0,
 559                .notify.fn = complete_io,
 560                .notify.context = job,
 561                .client = job->kc->io_client,
 562        };
 563
 564        /*
 565         * If we need to write sequentially and some reads or writes failed,
 566         * no point in continuing.
 567         */
 568        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
 569            job->master_job->write_err) {
 570                job->write_err = job->master_job->write_err;
 571                return -EIO;
 572        }
 573
 574        io_job_start(job->kc->throttle);
 575
 576        if (job->rw == READ)
 577                r = dm_io(&io_req, 1, &job->source, NULL);
 578        else
 579                r = dm_io(&io_req, job->num_dests, job->dests, NULL);
 580
 581        return r;
 582}
 583
 584static int run_pages_job(struct kcopyd_job *job)
 585{
 586        int r;
 587        unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
 588
 589        r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
 590        if (!r) {
 591                /* this job is ready for io */
 592                push(&job->kc->io_jobs, job);
 593                return 0;
 594        }
 595
 596        if (r == -ENOMEM)
 597                /* can't complete now */
 598                return 1;
 599
 600        return r;
 601}
 602
 603/*
 604 * Run through a list for as long as possible.  Returns the count
 605 * of successful jobs.
 606 */
 607static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
 608                        int (*fn) (struct kcopyd_job *))
 609{
 610        struct kcopyd_job *job;
 611        int r, count = 0;
 612
 613        while ((job = pop(jobs, kc))) {
 614
 615                r = fn(job);
 616
 617                if (r < 0) {
 618                        /* error this rogue job */
 619                        if (op_is_write(job->rw))
 620                                job->write_err = (unsigned long) -1L;
 621                        else
 622                                job->read_err = 1;
 623                        push(&kc->complete_jobs, job);
 624                        wake(kc);
 625                        break;
 626                }
 627
 628                if (r > 0) {
 629                        /*
 630                         * We couldn't service this job ATM, so
 631                         * push this job back onto the list.
 632                         */
 633                        push_head(jobs, job);
 634                        break;
 635                }
 636
 637                count++;
 638        }
 639
 640        return count;
 641}
 642
 643/*
 644 * kcopyd does this every time it's woken up.
 645 */
 646static void do_work(struct work_struct *work)
 647{
 648        struct dm_kcopyd_client *kc = container_of(work,
 649                                        struct dm_kcopyd_client, kcopyd_work);
 650        struct blk_plug plug;
 651        unsigned long flags;
 652
 653        /*
 654         * The order that these are called is *very* important.
 655         * complete jobs can free some pages for pages jobs.
 656         * Pages jobs when successful will jump onto the io jobs
 657         * list.  io jobs call wake when they complete and it all
 658         * starts again.
 659         */
 660        spin_lock_irqsave(&kc->job_lock, flags);
 661        list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
 662        spin_unlock_irqrestore(&kc->job_lock, flags);
 663
 664        blk_start_plug(&plug);
 665        process_jobs(&kc->complete_jobs, kc, run_complete_job);
 666        process_jobs(&kc->pages_jobs, kc, run_pages_job);
 667        process_jobs(&kc->io_jobs, kc, run_io_job);
 668        blk_finish_plug(&plug);
 669}
 670
 671/*
 672 * If we are copying a small region we just dispatch a single job
 673 * to do the copy, otherwise the io has to be split up into many
 674 * jobs.
 675 */
 676static void dispatch_job(struct kcopyd_job *job)
 677{
 678        struct dm_kcopyd_client *kc = job->kc;
 679        atomic_inc(&kc->nr_jobs);
 680        if (unlikely(!job->source.count))
 681                push(&kc->callback_jobs, job);
 682        else if (job->pages == &zero_page_list)
 683                push(&kc->io_jobs, job);
 684        else
 685                push(&kc->pages_jobs, job);
 686        wake(kc);
 687}
 688
 689static void segment_complete(int read_err, unsigned long write_err,
 690                             void *context)
 691{
 692        /* FIXME: tidy this function */
 693        sector_t progress = 0;
 694        sector_t count = 0;
 695        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
 696        struct kcopyd_job *job = sub_job->master_job;
 697        struct dm_kcopyd_client *kc = job->kc;
 698
 699        mutex_lock(&job->lock);
 700
 701        /* update the error */
 702        if (read_err)
 703                job->read_err = 1;
 704
 705        if (write_err)
 706                job->write_err |= write_err;
 707
 708        /*
 709         * Only dispatch more work if there hasn't been an error.
 710         */
 711        if ((!job->read_err && !job->write_err) ||
 712            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 713                /* get the next chunk of work */
 714                progress = job->progress;
 715                count = job->source.count - progress;
 716                if (count) {
 717                        if (count > kc->sub_job_size)
 718                                count = kc->sub_job_size;
 719
 720                        job->progress += count;
 721                }
 722        }
 723        mutex_unlock(&job->lock);
 724
 725        if (count) {
 726                int i;
 727
 728                *sub_job = *job;
 729                sub_job->write_offset = progress;
 730                sub_job->source.sector += progress;
 731                sub_job->source.count = count;
 732
 733                for (i = 0; i < job->num_dests; i++) {
 734                        sub_job->dests[i].sector += progress;
 735                        sub_job->dests[i].count = count;
 736                }
 737
 738                sub_job->fn = segment_complete;
 739                sub_job->context = sub_job;
 740                dispatch_job(sub_job);
 741
 742        } else if (atomic_dec_and_test(&job->sub_jobs)) {
 743
 744                /*
 745                 * Queue the completion callback to the kcopyd thread.
 746                 *
 747                 * Some callers assume that all the completions are called
 748                 * from a single thread and don't race with each other.
 749                 *
 750                 * We must not call the callback directly here because this
 751                 * code may not be executing in the thread.
 752                 */
 753                push(&kc->complete_jobs, job);
 754                wake(kc);
 755        }
 756}
 757
 758/*
 759 * Create some sub jobs to share the work between them.
 760 */
 761static void split_job(struct kcopyd_job *master_job)
 762{
 763        int i;
 764
 765        atomic_inc(&master_job->kc->nr_jobs);
 766
 767        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
 768        for (i = 0; i < SPLIT_COUNT; i++) {
 769                master_job[i + 1].master_job = master_job;
 770                segment_complete(0, 0u, &master_job[i + 1]);
 771        }
 772}
 773
 774void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
 775                    unsigned int num_dests, struct dm_io_region *dests,
 776                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
 777{
 778        struct kcopyd_job *job;
 779        int i;
 780
 781        /*
 782         * Allocate an array of jobs consisting of one master job
 783         * followed by SPLIT_COUNT sub jobs.
 784         */
 785        job = mempool_alloc(&kc->job_pool, GFP_NOIO);
 786        mutex_init(&job->lock);
 787
 788        /*
 789         * set up for the read.
 790         */
 791        job->kc = kc;
 792        job->flags = flags;
 793        job->read_err = 0;
 794        job->write_err = 0;
 795
 796        job->num_dests = num_dests;
 797        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
 798
 799        /*
 800         * If one of the destination is a host-managed zoned block device,
 801         * we need to write sequentially. If one of the destination is a
 802         * host-aware device, then leave it to the caller to choose what to do.
 803         */
 804        if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
 805                for (i = 0; i < job->num_dests; i++) {
 806                        if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
 807                                set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
 808                                break;
 809                        }
 810                }
 811        }
 812
 813        /*
 814         * If we need to write sequentially, errors cannot be ignored.
 815         */
 816        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
 817            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
 818                clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);
 819
 820        if (from) {
 821                job->source = *from;
 822                job->pages = NULL;
 823                job->rw = READ;
 824        } else {
 825                memset(&job->source, 0, sizeof job->source);
 826                job->source.count = job->dests[0].count;
 827                job->pages = &zero_page_list;
 828
 829                /*
 830                 * Use WRITE ZEROES to optimize zeroing if all dests support it.
 831                 */
 832                job->rw = REQ_OP_WRITE_ZEROES;
 833                for (i = 0; i < job->num_dests; i++)
 834                        if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
 835                                job->rw = WRITE;
 836                                break;
 837                        }
 838        }
 839
 840        job->fn = fn;
 841        job->context = context;
 842        job->master_job = job;
 843        job->write_offset = 0;
 844
 845        if (job->source.count <= kc->sub_job_size)
 846                dispatch_job(job);
 847        else {
 848                job->progress = 0;
 849                split_job(job);
 850        }
 851}
 852EXPORT_SYMBOL(dm_kcopyd_copy);
 853
 854void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
 855                    unsigned num_dests, struct dm_io_region *dests,
 856                    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
 857{
 858        dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
 859}
 860EXPORT_SYMBOL(dm_kcopyd_zero);
 861
 862void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
 863                                 dm_kcopyd_notify_fn fn, void *context)
 864{
 865        struct kcopyd_job *job;
 866
 867        job = mempool_alloc(&kc->job_pool, GFP_NOIO);
 868
 869        memset(job, 0, sizeof(struct kcopyd_job));
 870        job->kc = kc;
 871        job->fn = fn;
 872        job->context = context;
 873        job->master_job = job;
 874
 875        atomic_inc(&kc->nr_jobs);
 876
 877        return job;
 878}
 879EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
 880
 881void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
 882{
 883        struct kcopyd_job *job = j;
 884        struct dm_kcopyd_client *kc = job->kc;
 885
 886        job->read_err = read_err;
 887        job->write_err = write_err;
 888
 889        push(&kc->callback_jobs, job);
 890        wake(kc);
 891}
 892EXPORT_SYMBOL(dm_kcopyd_do_callback);
 893
 894/*
 895 * Cancels a kcopyd job, eg. someone might be deactivating a
 896 * mirror.
 897 */
 898#if 0
 899int kcopyd_cancel(struct kcopyd_job *job, int block)
 900{
 901        /* FIXME: finish */
 902        return -1;
 903}
 904#endif  /*  0  */
 905
 906/*-----------------------------------------------------------------
 907 * Client setup
 908 *---------------------------------------------------------------*/
 909struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
 910{
 911        int r;
 912        unsigned reserve_pages;
 913        struct dm_kcopyd_client *kc;
 914
 915        kc = kzalloc(sizeof(*kc), GFP_KERNEL);
 916        if (!kc)
 917                return ERR_PTR(-ENOMEM);
 918
 919        spin_lock_init(&kc->job_lock);
 920        INIT_LIST_HEAD(&kc->callback_jobs);
 921        INIT_LIST_HEAD(&kc->complete_jobs);
 922        INIT_LIST_HEAD(&kc->io_jobs);
 923        INIT_LIST_HEAD(&kc->pages_jobs);
 924        kc->throttle = throttle;
 925
 926        r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
 927        if (r)
 928                goto bad_slab;
 929
 930        INIT_WORK(&kc->kcopyd_work, do_work);
 931        kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
 932        if (!kc->kcopyd_wq) {
 933                r = -ENOMEM;
 934                goto bad_workqueue;
 935        }
 936
 937        kc->sub_job_size = dm_get_kcopyd_subjob_size();
 938        reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE);
 939
 940        kc->pages = NULL;
 941        kc->nr_reserved_pages = kc->nr_free_pages = 0;
 942        r = client_reserve_pages(kc, reserve_pages);
 943        if (r)
 944                goto bad_client_pages;
 945
 946        kc->io_client = dm_io_client_create();
 947        if (IS_ERR(kc->io_client)) {
 948                r = PTR_ERR(kc->io_client);
 949                goto bad_io_client;
 950        }
 951
 952        init_waitqueue_head(&kc->destroyq);
 953        atomic_set(&kc->nr_jobs, 0);
 954
 955        return kc;
 956
 957bad_io_client:
 958        client_free_pages(kc);
 959bad_client_pages:
 960        destroy_workqueue(kc->kcopyd_wq);
 961bad_workqueue:
 962        mempool_exit(&kc->job_pool);
 963bad_slab:
 964        kfree(kc);
 965
 966        return ERR_PTR(r);
 967}
 968EXPORT_SYMBOL(dm_kcopyd_client_create);
 969
 970void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
 971{
 972        /* Wait for completion of all jobs submitted by this client. */
 973        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
 974
 975        BUG_ON(!list_empty(&kc->callback_jobs));
 976        BUG_ON(!list_empty(&kc->complete_jobs));
 977        BUG_ON(!list_empty(&kc->io_jobs));
 978        BUG_ON(!list_empty(&kc->pages_jobs));
 979        destroy_workqueue(kc->kcopyd_wq);
 980        dm_io_client_destroy(kc->io_client);
 981        client_free_pages(kc);
 982        mempool_exit(&kc->job_pool);
 983        kfree(kc);
 984}
 985EXPORT_SYMBOL(dm_kcopyd_client_destroy);
 986