linux/drivers/md/dm-kcopyd.c
   1/*
   2 * Copyright (C) 2002 Sistina Software (UK) Limited.
   3 * Copyright (C) 2006 Red Hat GmbH
   4 *
   5 * This file is released under the GPL.
   6 *
   7 * Kcopyd provides a simple interface for copying an area of one
   8 * block-device to one or more other block-devices, with an asynchronous
   9 * completion notification.
  10 */
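/*
 * A minimal, hypothetical sketch of the call sequence for the interface
 * described above.  All "example_*" names are invented for illustration;
 * block-device lookup and error handling are the caller's problem, and
 * num_dests must not exceed DM_KCOPYD_MAX_REGIONS.  The notify function
 * runs asynchronously from the kcopyd workqueue, and
 * dm_kcopyd_client_destroy() waits for outstanding jobs, so the callback
 * has already run by the time it returns.
 *
 *	static void example_copy_done(int read_err, unsigned long write_err,
 *				      void *context)
 *	{
 *		if (read_err || write_err)
 *			pr_err("example: copy %s failed\n", (char *)context);
 *	}
 *
 *	static int example_copy(struct dm_io_region *src,
 *				struct dm_io_region *dests,
 *				unsigned int num_dests)
 *	{
 *		struct dm_kcopyd_client *kc = dm_kcopyd_client_create(NULL);
 *
 *		if (IS_ERR(kc))
 *			return PTR_ERR(kc);
 *
 *		dm_kcopyd_copy(kc, src, num_dests, dests, 0,
 *			       example_copy_done, "of example region");
 *		dm_kcopyd_client_destroy(kc);
 *		return 0;
 *	}
 */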
  11
  12#include <linux/types.h>
  13#include <linux/atomic.h>
  14#include <linux/blkdev.h>
  15#include <linux/fs.h>
  16#include <linux/init.h>
  17#include <linux/list.h>
  18#include <linux/mempool.h>
  19#include <linux/module.h>
  20#include <linux/pagemap.h>
  21#include <linux/slab.h>
  22#include <linux/vmalloc.h>
  23#include <linux/workqueue.h>
  24#include <linux/mutex.h>
  25#include <linux/delay.h>
  26#include <linux/device-mapper.h>
  27#include <linux/dm-kcopyd.h>
  28
  29#include "dm.h"
  30
  31#define SUB_JOB_SIZE    128
  32#define SPLIT_COUNT     8
  33#define MIN_JOBS        8
  34#define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
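/*
 * With 512-byte sectors and 4KiB pages this reserves enough pages for one
 * full sub job per client: SUB_JOB_SIZE << SECTOR_SHIFT = 128 * 512 = 64KiB,
 * so RESERVE_PAGES = DIV_ROUND_UP(65536, 4096) = 16 pages.
 */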
  35
  36/*-----------------------------------------------------------------
  37 * Each kcopyd client has its own little pool of preallocated
  38 * pages for kcopyd io.
  39 *---------------------------------------------------------------*/
  40struct dm_kcopyd_client {
  41        struct page_list *pages;
  42        unsigned nr_reserved_pages;
  43        unsigned nr_free_pages;
  44
  45        struct dm_io_client *io_client;
  46
  47        wait_queue_head_t destroyq;
  48        atomic_t nr_jobs;
  49
  50        mempool_t *job_pool;
  51
  52        struct workqueue_struct *kcopyd_wq;
  53        struct work_struct kcopyd_work;
  54
  55        struct dm_kcopyd_throttle *throttle;
  56
  57/*
  58 * We maintain three lists of jobs:
  59 *
  60 * i)   jobs waiting for pages
  61 * ii)  jobs that have pages, and are waiting for the io to be issued.
  62 * iii) jobs that have completed.
  63 *
  64 * All three of these are protected by job_lock.
  65 */
  66        spinlock_t job_lock;
  67        struct list_head complete_jobs;
  68        struct list_head io_jobs;
  69        struct list_head pages_jobs;
  70};
  71
  72static struct page_list zero_page_list;
  73
  74static DEFINE_SPINLOCK(throttle_spinlock);
  75
  76/*
  77 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
  78 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
  79 * by 2.
  80 */
  81#define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ
  82
  83/*
  84 * Sleep this number of milliseconds.
  85 *
  86 * The value was decided experimentally.
  87 * Smaller values seem to cause an increased copy rate above the limit.
  88 * The reason for this is unknown but possibly due to jiffies rounding errors
  89 * or read/write cache inside the disk.
  90 */
  91#define SLEEP_MSEC                      100
  92
  93/*
   94 * Maximum number of sleep events. Without this limit there is a theoretical
   95 * livelock when many kcopyd clients are doing work simultaneously.
  96 */
  97#define MAX_SLEEPS                      10
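/*
 * A worked example of the accounting below: with throttle set to 50 (%),
 * suppose io_period = 60 and total_period = 100 jiffies when a new job
 * arrives.  Then skew = 60 - 50 * 100 / 100 = 10 > 0, so io_job_start()
 * drops the lock, sleeps SLEEP_MSEC and re-checks.  While sleeping,
 * total_period keeps growing but io_period only grows while io is still in
 * flight, so skew eventually drops to <= 0 (or MAX_SLEEPS is reached) and
 * the job is allowed to start.  A throttle value >= 100 disables the limit
 * entirely.
 */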
  98
  99static void io_job_start(struct dm_kcopyd_throttle *t)
 100{
 101        unsigned throttle, now, difference;
 102        int slept = 0, skew;
 103
 104        if (unlikely(!t))
 105                return;
 106
 107try_again:
 108        spin_lock_irq(&throttle_spinlock);
 109
 110        throttle = ACCESS_ONCE(t->throttle);
 111
 112        if (likely(throttle >= 100))
 113                goto skip_limit;
 114
 115        now = jiffies;
 116        difference = now - t->last_jiffies;
 117        t->last_jiffies = now;
 118        if (t->num_io_jobs)
 119                t->io_period += difference;
 120        t->total_period += difference;
 121
 122        /*
 123         * Maintain sane values if we got a temporary overflow.
 124         */
 125        if (unlikely(t->io_period > t->total_period))
 126                t->io_period = t->total_period;
 127
 128        if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
 129                int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
 130                t->total_period >>= shift;
 131                t->io_period >>= shift;
 132        }
 133
 134        skew = t->io_period - throttle * t->total_period / 100;
 135
 136        if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
 137                slept++;
 138                spin_unlock_irq(&throttle_spinlock);
 139                msleep(SLEEP_MSEC);
 140                goto try_again;
 141        }
 142
 143skip_limit:
 144        t->num_io_jobs++;
 145
 146        spin_unlock_irq(&throttle_spinlock);
 147}
 148
 149static void io_job_finish(struct dm_kcopyd_throttle *t)
 150{
 151        unsigned long flags;
 152
 153        if (unlikely(!t))
 154                return;
 155
 156        spin_lock_irqsave(&throttle_spinlock, flags);
 157
 158        t->num_io_jobs--;
 159
 160        if (likely(ACCESS_ONCE(t->throttle) >= 100))
 161                goto skip_limit;
 162
 163        if (!t->num_io_jobs) {
 164                unsigned now, difference;
 165
 166                now = jiffies;
 167                difference = now - t->last_jiffies;
 168                t->last_jiffies = now;
 169
 170                t->io_period += difference;
 171                t->total_period += difference;
 172
 173                /*
 174                 * Maintain sane values if we got a temporary overflow.
 175                 */
 176                if (unlikely(t->io_period > t->total_period))
 177                        t->io_period = t->total_period;
 178        }
 179
 180skip_limit:
 181        spin_unlock_irqrestore(&throttle_spinlock, flags);
 182}
 183
 184
 185static void wake(struct dm_kcopyd_client *kc)
 186{
 187        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
 188}
 189
 190/*
 191 * Obtain one page for the use of kcopyd.
 192 */
 193static struct page_list *alloc_pl(gfp_t gfp)
 194{
 195        struct page_list *pl;
 196
 197        pl = kmalloc(sizeof(*pl), gfp);
 198        if (!pl)
 199                return NULL;
 200
 201        pl->page = alloc_page(gfp);
 202        if (!pl->page) {
 203                kfree(pl);
 204                return NULL;
 205        }
 206
 207        return pl;
 208}
 209
 210static void free_pl(struct page_list *pl)
 211{
 212        __free_page(pl->page);
 213        kfree(pl);
 214}
 215
 216/*
 217 * Add the provided pages to a client's free page list, releasing
 218 * back to the system any beyond the reserved_pages limit.
 219 */
 220static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
 221{
 222        struct page_list *next;
 223
 224        do {
 225                next = pl->next;
 226
 227                if (kc->nr_free_pages >= kc->nr_reserved_pages)
 228                        free_pl(pl);
 229                else {
 230                        pl->next = kc->pages;
 231                        kc->pages = pl;
 232                        kc->nr_free_pages++;
 233                }
 234
 235                pl = next;
 236        } while (pl);
 237}
 238
 239static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
 240                            unsigned int nr, struct page_list **pages)
 241{
 242        struct page_list *pl;
 243
 244        *pages = NULL;
 245
 246        do {
 247                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
 248                if (unlikely(!pl)) {
 249                        /* Use reserved pages */
 250                        pl = kc->pages;
 251                        if (unlikely(!pl))
 252                                goto out_of_memory;
 253                        kc->pages = pl->next;
 254                        kc->nr_free_pages--;
 255                }
 256                pl->next = *pages;
 257                *pages = pl;
 258        } while (--nr);
 259
 260        return 0;
 261
 262out_of_memory:
 263        if (*pages)
 264                kcopyd_put_pages(kc, *pages);
 265        return -ENOMEM;
 266}
 267
 268/*
 269 * These three functions resize the page pool.
 270 */
 271static void drop_pages(struct page_list *pl)
 272{
 273        struct page_list *next;
 274
 275        while (pl) {
 276                next = pl->next;
 277                free_pl(pl);
 278                pl = next;
 279        }
 280}
 281
 282/*
 283 * Allocate and reserve nr_pages for the use of a specific client.
 284 */
 285static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
 286{
 287        unsigned i;
 288        struct page_list *pl = NULL, *next;
 289
 290        for (i = 0; i < nr_pages; i++) {
 291                next = alloc_pl(GFP_KERNEL);
 292                if (!next) {
 293                        if (pl)
 294                                drop_pages(pl);
 295                        return -ENOMEM;
 296                }
 297                next->next = pl;
 298                pl = next;
 299        }
 300
 301        kc->nr_reserved_pages += nr_pages;
 302        kcopyd_put_pages(kc, pl);
 303
 304        return 0;
 305}
 306
 307static void client_free_pages(struct dm_kcopyd_client *kc)
 308{
 309        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
 310        drop_pages(kc->pages);
 311        kc->pages = NULL;
 312        kc->nr_free_pages = kc->nr_reserved_pages = 0;
 313}
 314
 315/*-----------------------------------------------------------------
  316 * kcopyd_jobs need to be allocated by the *clients* of kcopyd.  For
  317 * this reason we use a mempool to prevent the client from ever
  318 * having to do io (which could cause a deadlock).
 319 *---------------------------------------------------------------*/
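/*
 * Concretely: kcopyd runs on behalf of targets (dm-mirror, dm-snapshot,
 * dm-thin, ...) whose progress memory reclaim may be waiting on, so an
 * ordinary allocation here could itself have to wait for io.  The mempool,
 * together with GFP_NOIO in dm_kcopyd_copy(), guarantees that at least
 * MIN_JOBS preallocated job arrays are always available without doing io.
 */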
 320struct kcopyd_job {
 321        struct dm_kcopyd_client *kc;
 322        struct list_head list;
 323        unsigned long flags;
 324
 325        /*
 326         * Error state of the job.
 327         */
 328        int read_err;
 329        unsigned long write_err;
 330
 331        /*
 332         * Either READ or WRITE
 333         */
 334        int rw;
 335        struct dm_io_region source;
 336
 337        /*
 338         * The destinations for the transfer.
 339         */
 340        unsigned int num_dests;
 341        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
 342
 343        struct page_list *pages;
 344
 345        /*
 346         * Set this to ensure you are notified when the job has
 347         * completed.  'context' is for callback to use.
 348         */
 349        dm_kcopyd_notify_fn fn;
 350        void *context;
 351
 352        /*
 353         * These fields are only used if the job has been split
 354         * into more manageable parts.
 355         */
 356        struct mutex lock;
 357        atomic_t sub_jobs;
 358        sector_t progress;
 359
 360        struct kcopyd_job *master_job;
 361};
 362
 363static struct kmem_cache *_job_cache;
 364
 365int __init dm_kcopyd_init(void)
 366{
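        /*
         * Each cache object is an array of SPLIT_COUNT + 1 jobs: one
         * master job followed by its sub jobs, so dm_kcopyd_copy() can
         * carve sub jobs out of a single mempool allocation (see
         * split_job()).
         */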
 367        _job_cache = kmem_cache_create("kcopyd_job",
 368                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
 369                                __alignof__(struct kcopyd_job), 0, NULL);
 370        if (!_job_cache)
 371                return -ENOMEM;
 372
 373        zero_page_list.next = &zero_page_list;
 374        zero_page_list.page = ZERO_PAGE(0);
 375
 376        return 0;
 377}
 378
 379void dm_kcopyd_exit(void)
 380{
 381        kmem_cache_destroy(_job_cache);
 382        _job_cache = NULL;
 383}
 384
 385/*
  386 * Functions to push and pop a job on a given job list: pop() takes from the
  387 * head, push() appends to the tail and push_head() requeues at the head.
 388 */
 389static struct kcopyd_job *pop(struct list_head *jobs,
 390                              struct dm_kcopyd_client *kc)
 391{
 392        struct kcopyd_job *job = NULL;
 393        unsigned long flags;
 394
 395        spin_lock_irqsave(&kc->job_lock, flags);
 396
 397        if (!list_empty(jobs)) {
 398                job = list_entry(jobs->next, struct kcopyd_job, list);
 399                list_del(&job->list);
 400        }
 401        spin_unlock_irqrestore(&kc->job_lock, flags);
 402
 403        return job;
 404}
 405
 406static void push(struct list_head *jobs, struct kcopyd_job *job)
 407{
 408        unsigned long flags;
 409        struct dm_kcopyd_client *kc = job->kc;
 410
 411        spin_lock_irqsave(&kc->job_lock, flags);
 412        list_add_tail(&job->list, jobs);
 413        spin_unlock_irqrestore(&kc->job_lock, flags);
 414}
 415
 416
 417static void push_head(struct list_head *jobs, struct kcopyd_job *job)
 418{
 419        unsigned long flags;
 420        struct dm_kcopyd_client *kc = job->kc;
 421
 422        spin_lock_irqsave(&kc->job_lock, flags);
 423        list_add(&job->list, jobs);
 424        spin_unlock_irqrestore(&kc->job_lock, flags);
 425}
 426
 427/*
 428 * These three functions process 1 item from the corresponding
 429 * job list.
 430 *
 431 * They return:
 432 * < 0: error
 433 *   0: success
 434 * > 0: can't process yet.
 435 */
 436static int run_complete_job(struct kcopyd_job *job)
 437{
 438        void *context = job->context;
 439        int read_err = job->read_err;
 440        unsigned long write_err = job->write_err;
 441        dm_kcopyd_notify_fn fn = job->fn;
 442        struct dm_kcopyd_client *kc = job->kc;
 443
 444        if (job->pages && job->pages != &zero_page_list)
 445                kcopyd_put_pages(kc, job->pages);
 446        /*
 447         * If this is the master job, the sub jobs have already
 448         * completed so we can free everything.
 449         */
 450        if (job->master_job == job)
 451                mempool_free(job, kc->job_pool);
 452        fn(read_err, write_err, context);
 453
 454        if (atomic_dec_and_test(&kc->nr_jobs))
 455                wake_up(&kc->destroyq);
 456
 457        return 0;
 458}
 459
 460static void complete_io(unsigned long error, void *context)
 461{
 462        struct kcopyd_job *job = (struct kcopyd_job *) context;
 463        struct dm_kcopyd_client *kc = job->kc;
 464
 465        io_job_finish(kc->throttle);
 466
 467        if (error) {
 468                if (job->rw & WRITE)
 469                        job->write_err |= error;
 470                else
 471                        job->read_err = 1;
 472
 473                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 474                        push(&kc->complete_jobs, job);
 475                        wake(kc);
 476                        return;
 477                }
 478        }
 479
 480        if (job->rw & WRITE)
 481                push(&kc->complete_jobs, job);
 482
 483        else {
 484                job->rw = WRITE;
 485                push(&kc->io_jobs, job);
 486        }
 487
 488        wake(kc);
 489}
 490
 491/*
  492 * Request io for a particular job: a read of the source region or
  493 * writes to all of its destination regions.
 494 */
 495static int run_io_job(struct kcopyd_job *job)
 496{
 497        int r;
 498        struct dm_io_request io_req = {
 499                .bi_rw = job->rw,
 500                .mem.type = DM_IO_PAGE_LIST,
 501                .mem.ptr.pl = job->pages,
 502                .mem.offset = 0,
 503                .notify.fn = complete_io,
 504                .notify.context = job,
 505                .client = job->kc->io_client,
 506        };
 507
 508        io_job_start(job->kc->throttle);
 509
 510        if (job->rw == READ)
 511                r = dm_io(&io_req, 1, &job->source, NULL);
 512        else
 513                r = dm_io(&io_req, job->num_dests, job->dests, NULL);
 514
 515        return r;
 516}
 517
 518static int run_pages_job(struct kcopyd_job *job)
 519{
 520        int r;
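        /*
         * PAGE_SIZE >> 9 is the number of 512-byte sectors per page (8
         * with 4KiB pages), so nr_pages is the first destination's sector
         * count rounded up to whole pages.
         */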
 521        unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
 522
 523        r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
 524        if (!r) {
 525                /* this job is ready for io */
 526                push(&job->kc->io_jobs, job);
 527                return 0;
 528        }
 529
 530        if (r == -ENOMEM)
 531                /* can't complete now */
 532                return 1;
 533
 534        return r;
 535}
 536
 537/*
 538 * Run through a list for as long as possible.  Returns the count
 539 * of successful jobs.
 540 */
 541static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
 542                        int (*fn) (struct kcopyd_job *))
 543{
 544        struct kcopyd_job *job;
 545        int r, count = 0;
 546
 547        while ((job = pop(jobs, kc))) {
 548
 549                r = fn(job);
 550
 551                if (r < 0) {
 552                        /* error this rogue job */
 553                        if (job->rw & WRITE)
 554                                job->write_err = (unsigned long) -1L;
 555                        else
 556                                job->read_err = 1;
 557                        push(&kc->complete_jobs, job);
 558                        break;
 559                }
 560
 561                if (r > 0) {
 562                        /*
 563                         * We couldn't service this job ATM, so
 564                         * push this job back onto the list.
 565                         */
 566                        push_head(jobs, job);
 567                        break;
 568                }
 569
 570                count++;
 571        }
 572
 573        return count;
 574}
 575
 576/*
 577 * kcopyd does this every time it's woken up.
 578 */
 579static void do_work(struct work_struct *work)
 580{
 581        struct dm_kcopyd_client *kc = container_of(work,
 582                                        struct dm_kcopyd_client, kcopyd_work);
 583        struct blk_plug plug;
 584
 585        /*
  586         * The order in which these are called is *very* important.
  587         * Completed jobs can free pages for pages jobs.
  588         * Pages jobs, when successful, move onto the io jobs
  589         * list.  Io jobs call wake when they complete and it all
 590         * starts again.
 591         */
 592        blk_start_plug(&plug);
 593        process_jobs(&kc->complete_jobs, kc, run_complete_job);
 594        process_jobs(&kc->pages_jobs, kc, run_pages_job);
 595        process_jobs(&kc->io_jobs, kc, run_io_job);
 596        blk_finish_plug(&plug);
 597}
 598
 599/*
 600 * If we are copying a small region we just dispatch a single job
 601 * to do the copy, otherwise the io has to be split up into many
 602 * jobs.
 603 */
 604static void dispatch_job(struct kcopyd_job *job)
 605{
 606        struct dm_kcopyd_client *kc = job->kc;
 607        atomic_inc(&kc->nr_jobs);
 608        if (unlikely(!job->source.count))
 609                push(&kc->complete_jobs, job);
 610        else if (job->pages == &zero_page_list)
 611                push(&kc->io_jobs, job);
 612        else
 613                push(&kc->pages_jobs, job);
 614        wake(kc);
 615}
 616
 617static void segment_complete(int read_err, unsigned long write_err,
 618                             void *context)
 619{
 620        /* FIXME: tidy this function */
 621        sector_t progress = 0;
 622        sector_t count = 0;
 623        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
 624        struct kcopyd_job *job = sub_job->master_job;
 625        struct dm_kcopyd_client *kc = job->kc;
 626
 627        mutex_lock(&job->lock);
 628
 629        /* update the error */
 630        if (read_err)
 631                job->read_err = 1;
 632
 633        if (write_err)
 634                job->write_err |= write_err;
 635
 636        /*
 637         * Only dispatch more work if there hasn't been an error.
 638         */
 639        if ((!job->read_err && !job->write_err) ||
 640            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 641                /* get the next chunk of work */
 642                progress = job->progress;
 643                count = job->source.count - progress;
 644                if (count) {
 645                        if (count > SUB_JOB_SIZE)
 646                                count = SUB_JOB_SIZE;
 647
 648                        job->progress += count;
 649                }
 650        }
 651        mutex_unlock(&job->lock);
 652
 653        if (count) {
 654                int i;
 655
 656                *sub_job = *job;
 657                sub_job->source.sector += progress;
 658                sub_job->source.count = count;
 659
 660                for (i = 0; i < job->num_dests; i++) {
 661                        sub_job->dests[i].sector += progress;
 662                        sub_job->dests[i].count = count;
 663                }
 664
 665                sub_job->fn = segment_complete;
 666                sub_job->context = sub_job;
 667                dispatch_job(sub_job);
 668
 669        } else if (atomic_dec_and_test(&job->sub_jobs)) {
 670
 671                /*
 672                 * Queue the completion callback to the kcopyd thread.
 673                 *
 674                 * Some callers assume that all the completions are called
 675                 * from a single thread and don't race with each other.
 676                 *
 677                 * We must not call the callback directly here because this
 678                 * code may not be executing in the thread.
 679                 */
 680                push(&kc->complete_jobs, job);
 681                wake(kc);
 682        }
 683}
 684
 685/*
 686 * Create some sub jobs to share the work between them.
 687 */
 688static void split_job(struct kcopyd_job *master_job)
 689{
 690        int i;
 691
 692        atomic_inc(&master_job->kc->nr_jobs);
 693
 694        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
 695        for (i = 0; i < SPLIT_COUNT; i++) {
 696                master_job[i + 1].master_job = master_job;
 697                segment_complete(0, 0u, &master_job[i + 1]);
 698        }
 699}
 700
 701int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
 702                   unsigned int num_dests, struct dm_io_region *dests,
 703                   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
 704{
 705        struct kcopyd_job *job;
 706        int i;
 707
 708        /*
 709         * Allocate an array of jobs consisting of one master job
 710         * followed by SPLIT_COUNT sub jobs.
 711         */
 712        job = mempool_alloc(kc->job_pool, GFP_NOIO);
 713
 714        /*
 715         * set up for the read.
 716         */
 717        job->kc = kc;
 718        job->flags = flags;
 719        job->read_err = 0;
 720        job->write_err = 0;
 721
 722        job->num_dests = num_dests;
 723        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
 724
 725        if (from) {
 726                job->source = *from;
 727                job->pages = NULL;
 728                job->rw = READ;
 729        } else {
 730                memset(&job->source, 0, sizeof job->source);
 731                job->source.count = job->dests[0].count;
 732                job->pages = &zero_page_list;
 733
 734                /*
 735                 * Use WRITE SAME to optimize zeroing if all dests support it.
 736                 */
 737                job->rw = WRITE | REQ_WRITE_SAME;
 738                for (i = 0; i < job->num_dests; i++)
 739                        if (!bdev_write_same(job->dests[i].bdev)) {
 740                                job->rw = WRITE;
 741                                break;
 742                        }
 743        }
 744
 745        job->fn = fn;
 746        job->context = context;
 747        job->master_job = job;
 748
 749        if (job->source.count <= SUB_JOB_SIZE)
 750                dispatch_job(job);
 751        else {
 752                mutex_init(&job->lock);
 753                job->progress = 0;
 754                split_job(job);
 755        }
 756
 757        return 0;
 758}
 759EXPORT_SYMBOL(dm_kcopyd_copy);
 760
 761int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
 762                   unsigned num_dests, struct dm_io_region *dests,
 763                   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
 764{
 765        return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
 766}
 767EXPORT_SYMBOL(dm_kcopyd_zero);
 768
 769void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
 770                                 dm_kcopyd_notify_fn fn, void *context)
 771{
 772        struct kcopyd_job *job;
 773
 774        job = mempool_alloc(kc->job_pool, GFP_NOIO);
 775
 776        memset(job, 0, sizeof(struct kcopyd_job));
 777        job->kc = kc;
 778        job->fn = fn;
 779        job->context = context;
 780        job->master_job = job;
 781
 782        atomic_inc(&kc->nr_jobs);
 783
 784        return job;
 785}
 786EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
 787
 788void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
 789{
 790        struct kcopyd_job *job = j;
 791        struct dm_kcopyd_client *kc = job->kc;
 792
 793        job->read_err = read_err;
 794        job->write_err = write_err;
 795
 796        push(&kc->complete_jobs, job);
 797        wake(kc);
 798}
 799EXPORT_SYMBOL(dm_kcopyd_do_callback);
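/*
 * A hypothetical sketch of how the two helpers above fit together (the
 * "example_*" names are invented): the callback job is prepared up front
 * in a context that may sleep, and is completed later, e.g. from a bio
 * end_io hook; the notify function then runs from the kcopyd workqueue,
 * serialized with ordinary copy completions.
 *
 *	struct example_req {
 *		int error;
 *	};
 *
 *	static void example_done(int read_err, unsigned long write_err,
 *				 void *context)
 *	{
 *		struct example_req *req = context;
 *
 *		req->error = (read_err || write_err) ? -EIO : 0;
 *	}
 *
 *	static void *example_prepare(struct dm_kcopyd_client *kc,
 *				     struct example_req *req)
 *	{
 *		return dm_kcopyd_prepare_callback(kc, example_done, req);
 *	}
 *
 *	static void example_finish(void *cb, int error)
 *	{
 *		dm_kcopyd_do_callback(cb, 0, error ? 1 : 0);
 *	}
 */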
 800
 801/*
  802 * Cancels a kcopyd job, e.g. someone might be deactivating a
 803 * mirror.
 804 */
 805#if 0
 806int kcopyd_cancel(struct kcopyd_job *job, int block)
 807{
 808        /* FIXME: finish */
 809        return -1;
 810}
 811#endif  /*  0  */
 812
 813/*-----------------------------------------------------------------
 814 * Client setup
 815 *---------------------------------------------------------------*/
 816struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
 817{
 818        int r = -ENOMEM;
 819        struct dm_kcopyd_client *kc;
 820
 821        kc = kmalloc(sizeof(*kc), GFP_KERNEL);
 822        if (!kc)
 823                return ERR_PTR(-ENOMEM);
 824
 825        spin_lock_init(&kc->job_lock);
 826        INIT_LIST_HEAD(&kc->complete_jobs);
 827        INIT_LIST_HEAD(&kc->io_jobs);
 828        INIT_LIST_HEAD(&kc->pages_jobs);
 829        kc->throttle = throttle;
 830
 831        kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
 832        if (!kc->job_pool)
 833                goto bad_slab;
 834
 835        INIT_WORK(&kc->kcopyd_work, do_work);
 836        kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
 837        if (!kc->kcopyd_wq)
 838                goto bad_workqueue;
 839
 840        kc->pages = NULL;
 841        kc->nr_reserved_pages = kc->nr_free_pages = 0;
 842        r = client_reserve_pages(kc, RESERVE_PAGES);
 843        if (r)
 844                goto bad_client_pages;
 845
 846        kc->io_client = dm_io_client_create();
 847        if (IS_ERR(kc->io_client)) {
 848                r = PTR_ERR(kc->io_client);
 849                goto bad_io_client;
 850        }
 851
 852        init_waitqueue_head(&kc->destroyq);
 853        atomic_set(&kc->nr_jobs, 0);
 854
 855        return kc;
 856
 857bad_io_client:
 858        client_free_pages(kc);
 859bad_client_pages:
 860        destroy_workqueue(kc->kcopyd_wq);
 861bad_workqueue:
 862        mempool_destroy(kc->job_pool);
 863bad_slab:
 864        kfree(kc);
 865
 866        return ERR_PTR(r);
 867}
 868EXPORT_SYMBOL(dm_kcopyd_client_create);
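/*
 * To throttle a client, pass a dm_kcopyd_throttle whose ->throttle field is
 * the percentage of time copy io may be active; 100 leaves the client
 * unthrottled.  A hypothetical sketch (in-tree users such as dm-raid1
 * instead wire ->throttle up to a module parameter):
 *
 *	static struct dm_kcopyd_throttle example_throttle = {
 *		.throttle = 25,
 *	};
 *
 *	kc = dm_kcopyd_client_create(&example_throttle);
 */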
 869
 870void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
 871{
 872        /* Wait for completion of all jobs submitted by this client. */
 873        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
 874
 875        BUG_ON(!list_empty(&kc->complete_jobs));
 876        BUG_ON(!list_empty(&kc->io_jobs));
 877        BUG_ON(!list_empty(&kc->pages_jobs));
 878        destroy_workqueue(kc->kcopyd_wq);
 879        dm_io_client_destroy(kc->io_client);
 880        client_free_pages(kc);
 881        mempool_destroy(kc->job_pool);
 882        kfree(kc);
 883}
 884EXPORT_SYMBOL(dm_kcopyd_client_destroy);
 885