linux/drivers/md/dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
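
/*
 * A minimal sketch of typical client usage (illustrative only; the
 * callback, region and context names below are hypothetical and not
 * part of this file):
 *
 *      static void my_copy_done(int read_err, unsigned long write_err,
 *                               void *context)
 *      {
 *              // both error arguments are zero if the copy succeeded
 *      }
 *
 *      kc = dm_kcopyd_client_create(NULL);     // or a throttle pointer
 *      if (IS_ERR(kc))
 *              return PTR_ERR(kc);
 *
 *      // src and dst are struct dm_io_region { bdev, sector, count }
 *      dm_kcopyd_copy(kc, &src, 1, &dst, 0, my_copy_done, my_context);
 *      ...
 *      dm_kcopyd_client_destroy(kc);   // waits for outstanding jobs
 */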

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SUB_JOB_SIZE    128
#define SPLIT_COUNT     8
#define MIN_JOBS        8
#define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
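
/*
 * For reference (nothing here is mandated by the code, it is just the
 * arithmetic implied by the values above): SUB_JOB_SIZE is in 512-byte
 * sectors, so each sub-job moves at most 128 << 9 = 64 KiB, and with
 * 4 KiB pages RESERVE_PAGES works out to DIV_ROUND_UP(65536, 4096) = 16
 * preallocated pages per client.
 */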

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
        struct page_list *pages;
        unsigned nr_reserved_pages;
        unsigned nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;

        mempool_t job_pool;

        struct workqueue_struct *kcopyd_wq;
        struct work_struct kcopyd_work;

        struct dm_kcopyd_throttle *throttle;

        atomic_t nr_jobs;

/*
 * We maintain four lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that don't need to do any IO and just run a callback
 * iv)  jobs that have completed.
 *
 * All four of these are protected by job_lock.
 */
        spinlock_t job_lock;
        struct list_head callback_jobs;
        struct list_head complete_jobs;
        struct list_head io_jobs;
        struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC                      100

/*
 * Maximum number of sleep events. Without this limit there is a
 * theoretical livelock when multiple kcopyd clients do work simultaneously.
 */
#define MAX_SLEEPS                      10
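
/*
 * How the throttle behaves in concrete terms (a descriptive note, not
 * additional mechanism): with t->throttle set below 100, io_job_start()
 * keeps io_period, the time with copy I/O in flight, at or below
 * throttle percent of total_period.  For example, with throttle = 25 and
 * total_period = 1000 jiffies, a new job sleeps in SLEEP_MSEC steps while
 * io_period exceeds 250 jiffies (skew > 0), up to MAX_SLEEPS times.
 *
 * Targets usually expose the throttle as a module parameter via the
 * DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM() helper in
 * <linux/dm-kcopyd.h> and pass the resulting structure to
 * dm_kcopyd_client_create().
 */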

static void io_job_start(struct dm_kcopyd_throttle *t)
{
        unsigned throttle, now, difference;
        int slept = 0, skew;

        if (unlikely(!t))
                return;

try_again:
        spin_lock_irq(&throttle_spinlock);

        throttle = READ_ONCE(t->throttle);

        if (likely(throttle >= 100))
                goto skip_limit;

        now = jiffies;
        difference = now - t->last_jiffies;
        t->last_jiffies = now;
        if (t->num_io_jobs)
                t->io_period += difference;
        t->total_period += difference;

        /*
         * Maintain sane values if we got a temporary overflow.
         */
        if (unlikely(t->io_period > t->total_period))
                t->io_period = t->total_period;

        if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
                int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
                t->total_period >>= shift;
                t->io_period >>= shift;
        }

        skew = t->io_period - throttle * t->total_period / 100;

        if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
                slept++;
                spin_unlock_irq(&throttle_spinlock);
                msleep(SLEEP_MSEC);
                goto try_again;
        }

skip_limit:
        t->num_io_jobs++;

        spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
        unsigned long flags;

        if (unlikely(!t))
                return;

        spin_lock_irqsave(&throttle_spinlock, flags);

        t->num_io_jobs--;

        if (likely(READ_ONCE(t->throttle) >= 100))
                goto skip_limit;

        if (!t->num_io_jobs) {
                unsigned now, difference;

                now = jiffies;
                difference = now - t->last_jiffies;
                t->last_jiffies = now;

                t->io_period += difference;
                t->total_period += difference;

                /*
                 * Maintain sane values if we got a temporary overflow.
                 */
                if (unlikely(t->io_period > t->total_period))
                        t->io_period = t->total_period;
        }

skip_limit:
        spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), gfp);
        if (!pl)
                return NULL;

        pl->page = alloc_page(gfp);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *next;

        do {
                next = pl->next;

                if (kc->nr_free_pages >= kc->nr_reserved_pages)
                        free_pl(pl);
                else {
                        pl->next = kc->pages;
                        kc->pages = pl;
                        kc->nr_free_pages++;
                }

                pl = next;
        } while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        *pages = NULL;

        do {
                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
                if (unlikely(!pl)) {
                        /* Use reserved pages */
                        pl = kc->pages;
                        if (unlikely(!pl))
                                goto out_of_memory;
                        kc->pages = pl->next;
                        kc->nr_free_pages--;
                }
                pl->next = *pages;
                *pages = pl;
        } while (--nr);

        return 0;

out_of_memory:
        if (*pages)
                kcopyd_put_pages(kc, *pages);
        return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
        unsigned i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr_pages; i++) {
                next = alloc_pl(GFP_KERNEL);
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kc->nr_reserved_pages += nr_pages;
        kcopyd_put_pages(kc, pl);

        return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct dm_kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned long write_err;

        /*
         * Either READ or WRITE
         */
        int rw;
        struct dm_io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed.  'context' is for callback to use.
         */
        dm_kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;
        sector_t write_offset;

        struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
        _job_cache = kmem_cache_create("kcopyd_job",
                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
                                __alignof__(struct kcopyd_job), 0, NULL);
        if (!_job_cache)
                return -ENOMEM;

        zero_page_list.next = &zero_page_list;
        zero_page_list.page = ZERO_PAGE(0);

        return 0;
}

void dm_kcopyd_exit(void)
{
        kmem_cache_destroy(_job_cache);
        _job_cache = NULL;
}

/*
 * Functions to push and pop a job on a given job list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
                                     struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job;

        /*
         * For I/O jobs, pop any read, any write without sequential write
         * constraint and sequential writes that are at the right position.
         */
        list_for_each_entry(job, jobs, list) {
                if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
                        list_del(&job->list);
                        return job;
                }

                if (job->write_offset == job->master_job->write_offset) {
                        job->master_job->write_offset += job->source.count;
                        list_del(&job->list);
                        return job;
                }
        }

        return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
                              struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&kc->job_lock, flags);

        if (!list_empty(jobs)) {
                if (jobs == &kc->io_jobs)
                        job = pop_io_job(jobs, kc);
                else {
                        job = list_entry(jobs->next, struct kcopyd_job, list);
                        list_del(&job->list);
                }
        }
        spin_unlock_irqrestore(&kc->job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned long write_err = job->write_err;
        dm_kcopyd_notify_fn fn = job->fn;
        struct dm_kcopyd_client *kc = job->kc;

        if (job->pages && job->pages != &zero_page_list)
                kcopyd_put_pages(kc, job->pages);
        /*
         * If this is the master job, the sub jobs have already
         * completed so we can free everything.
         */
        if (job->master_job == job) {
                mutex_destroy(&job->lock);
                mempool_free(job, &kc->job_pool);
        }
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        cond_resched();

        return 0;
}

static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        io_job_finish(kc->throttle);

        if (error) {
                if (op_is_write(job->rw))
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&kc->complete_jobs, job);
                        wake(kc);
                        return;
                }
        }

        if (op_is_write(job->rw))
                push(&kc->complete_jobs, job);

        else {
                job->rw = WRITE;
                push(&kc->io_jobs, job);
        }

        wake(kc);
}

/*
 * Issue the io for a particular job, using the pages already attached
 * to it.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_op = job->rw,
                .bi_op_flags = 0,
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = 0,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        /*
         * If we need to write sequentially and some reads or writes failed,
         * no point in continuing.
         */
        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
            job->master_job->write_err)
                return -EIO;

        io_job_start(job->kc->throttle);

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
        int r;
        unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

        r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&job->kc->io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
                        int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs, kc))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (op_is_write(job->rw))
                                job->write_err = (unsigned long) -1L;
                        else
                                job->read_err = 1;
                        push(&kc->complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push_head(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
        struct dm_kcopyd_client *kc = container_of(work,
                                        struct dm_kcopyd_client, kcopyd_work);
        struct blk_plug plug;
        unsigned long flags;

        /*
         * The order that these are called is *very* important.
         * complete jobs can free some pages for pages jobs.
         * Pages jobs when successful will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        spin_lock_irqsave(&kc->job_lock, flags);
        list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);

        blk_start_plug(&plug);
        process_jobs(&kc->complete_jobs, kc, run_complete_job);
        process_jobs(&kc->pages_jobs, kc, run_pages_job);
        process_jobs(&kc->io_jobs, kc, run_io_job);
        blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        struct dm_kcopyd_client *kc = job->kc;
        atomic_inc(&kc->nr_jobs);
        if (unlikely(!job->source.count))
                push(&kc->callback_jobs, job);
        else if (job->pages == &zero_page_list)
                push(&kc->io_jobs, job);
        else
                push(&kc->pages_jobs, job);
        wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
                             void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
        struct kcopyd_job *job = sub_job->master_job;
        struct dm_kcopyd_client *kc = job->kc;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;

                *sub_job = *job;
                sub_job->write_offset = progress;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = sub_job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * Queue the completion callback to the kcopyd thread.
                 *
                 * Some callers assume that all the completions are called
                 * from a single thread and don't race with each other.
                 *
                 * We must not call the callback directly here because this
                 * code may not be executing in the thread.
                 */
                push(&kc->complete_jobs, job);
                wake(kc);
        }
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
        int i;

        atomic_inc(&master_job->kc->nr_jobs);

        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++) {
                master_job[i + 1].master_job = master_job;
                segment_complete(0, 0u, &master_job[i + 1]);
        }
}
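
/*
 * For illustration (the numbers follow from the defines above, nothing
 * extra is implied): a 1 GiB copy is 2097152 sectors, far above
 * SUB_JOB_SIZE, so dm_kcopyd_copy() calls split_job().  The master job
 * then has SPLIT_COUNT = 8 sub jobs in flight, each one repeatedly
 * claiming the next SUB_JOB_SIZE = 128-sector (64 KiB) chunk in
 * segment_complete() until job->progress reaches source.count; the last
 * sub job to finish queues the master's completion callback.
 */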

void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
                    unsigned int num_dests, struct dm_io_region *dests,
                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;
        int i;

        /*
         * Allocate an array of jobs consisting of one master job
         * followed by SPLIT_COUNT sub jobs.
         */
        job = mempool_alloc(&kc->job_pool, GFP_NOIO);
        mutex_init(&job->lock);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        /*
         * If one of the destinations is a host-managed zoned block device,
         * we need to write sequentially. If one of the destinations is a
         * host-aware device, then leave it to the caller to choose what to do.
         */
        if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
                for (i = 0; i < job->num_dests; i++) {
                        if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
                                set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
                                break;
                        }
                }
        }

        /*
         * If we need to write sequentially, errors cannot be ignored.
         */
        if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
                clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

        if (from) {
                job->source = *from;
                job->pages = NULL;
                job->rw = READ;
        } else {
                memset(&job->source, 0, sizeof job->source);
                job->source.count = job->dests[0].count;
                job->pages = &zero_page_list;

                /*
                 * Use WRITE ZEROES to optimize zeroing if all dests support it.
                 */
                job->rw = REQ_OP_WRITE_ZEROES;
                for (i = 0; i < job->num_dests; i++)
                        if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
                                job->rw = WRITE;
                                break;
                        }
        }

        job->fn = fn;
        job->context = context;
        job->master_job = job;
        job->write_offset = 0;

        if (job->source.count <= SUB_JOB_SIZE)
                dispatch_job(job);
        else {
                job->progress = 0;
                split_job(job);
        }
}
EXPORT_SYMBOL(dm_kcopyd_copy);

void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
                    unsigned num_dests, struct dm_io_region *dests,
                    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
        dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
                                 dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        job = mempool_alloc(&kc->job_pool, GFP_NOIO);

        memset(job, 0, sizeof(struct kcopyd_job));
        job->kc = kc;
        job->fn = fn;
        job->context = context;
        job->master_job = job;

        atomic_inc(&kc->nr_jobs);

        return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
        struct kcopyd_job *job = j;
        struct dm_kcopyd_client *kc = job->kc;

        job->read_err = read_err;
        job->write_err = write_err;

        push(&kc->callback_jobs, job);
        wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
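
/*
 * A minimal sketch of the two-phase callback interface above (job,
 * my_notify_fn and my_context are hypothetical names):
 *
 *      // in a context that may sleep (GFP_NOIO allocation)
 *      job = dm_kcopyd_prepare_callback(kc, my_notify_fn, my_context);
 *      ...
 *      // later, hand the result to the kcopyd worker, which calls
 *      // my_notify_fn(read_err, write_err, my_context) from its thread
 *      dm_kcopyd_do_callback(job, read_err, write_err);
 */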

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
        int r;
        struct dm_kcopyd_client *kc;

        kc = kzalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&kc->job_lock);
        INIT_LIST_HEAD(&kc->callback_jobs);
        INIT_LIST_HEAD(&kc->complete_jobs);
        INIT_LIST_HEAD(&kc->io_jobs);
        INIT_LIST_HEAD(&kc->pages_jobs);
        kc->throttle = throttle;

        r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
        if (r)
                goto bad_slab;

        INIT_WORK(&kc->kcopyd_work, do_work);
        kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
        if (!kc->kcopyd_wq) {
                r = -ENOMEM;
                goto bad_workqueue;
        }

        kc->pages = NULL;
        kc->nr_reserved_pages = kc->nr_free_pages = 0;
        r = client_reserve_pages(kc, RESERVE_PAGES);
        if (r)
                goto bad_client_pages;

        kc->io_client = dm_io_client_create();
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                goto bad_io_client;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        return kc;

bad_io_client:
        client_free_pages(kc);
bad_client_pages:
        destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
        mempool_exit(&kc->job_pool);
bad_slab:
        kfree(kc);

        return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        BUG_ON(!list_empty(&kc->callback_jobs));
        BUG_ON(!list_empty(&kc->complete_jobs));
        BUG_ON(!list_empty(&kc->io_jobs));
        BUG_ON(!list_empty(&kc->pages_jobs));
        destroy_workqueue(kc->kcopyd_wq);
        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        mempool_exit(&kc->job_pool);
        kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
