qemu/block/mirror.c
/*
 * Image mirroring
 *
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Paolo Bonzini  <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "trace.h"
#include "block/blockjob.h"
#include "block/block_int.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qmp/qerror.h"
#include "qemu/ratelimit.h"
#include "qemu/bitmap.h"
#include "qemu/error-report.h"

#define SLICE_TIME    100000000ULL /* ns */
#define MAX_IN_FLIGHT 16
#define DEFAULT_MIRROR_BUF_SIZE   (10 << 20)

/* The mirroring buffer is a list of granularity-sized chunks.
 * Free chunks are organized in a list.
 */
typedef struct MirrorBuffer {
    QSIMPLEQ_ENTRY(MirrorBuffer) next;
} MirrorBuffer;

typedef struct MirrorBlockJob {
    BlockJob common;
    RateLimit limit;
    BlockDriverState *target;
    BlockDriverState *base;
    /* The name of the graph node to replace */
    char *replaces;
    /* The BDS to replace */
    BlockDriverState *to_replace;
    /* Used to block operations on the drive-mirror-replace target */
    Error *replace_blocker;
    bool is_none_mode;
    BlockdevOnError on_source_error, on_target_error;
    bool synced;
    bool should_complete;
    int64_t granularity;
    size_t buf_size;
    int64_t bdev_length;
    unsigned long *cow_bitmap;
    BdrvDirtyBitmap *dirty_bitmap;
    HBitmapIter hbi;
    uint8_t *buf;
    QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
    int buf_free_count;

    unsigned long *in_flight_bitmap;
    int in_flight;
    int sectors_in_flight;
    int ret;
    bool unmap;
    bool waiting_for_io;
    int target_cluster_sectors;
    int max_iov;
} MirrorBlockJob;

typedef struct MirrorOp {
    MirrorBlockJob *s;
    QEMUIOVector qiov;
    int64_t sector_num;
    int nb_sectors;
} MirrorOp;

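/* Translate an I/O error into the action configured for the source (read)
 * side or the target (write) side; any error takes the job out of the
 * synced state. */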
static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
                                            int error)
{
    s->synced = false;
    if (read) {
        return block_job_error_action(&s->common, s->common.bs,
                                      s->on_source_error, true, error);
    } else {
        return block_job_error_action(&s->common, s->target,
                                      s->on_target_error, false, error);
    }
}

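/* Common completion path for one copy operation: return the buffer chunks to
 * the free list, clear the in-flight bitmap, account for progress on success
 * and wake up the job coroutine if it is waiting for an operation. */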
static void mirror_iteration_done(MirrorOp *op, int ret)
{
    MirrorBlockJob *s = op->s;
    struct iovec *iov;
    int64_t chunk_num;
    int i, nb_chunks, sectors_per_chunk;

    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors, ret);

    s->in_flight--;
    s->sectors_in_flight -= op->nb_sectors;
    iov = op->qiov.iov;
    for (i = 0; i < op->qiov.niov; i++) {
        MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
        QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
        s->buf_free_count++;
    }

    sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
    chunk_num = op->sector_num / sectors_per_chunk;
    nb_chunks = DIV_ROUND_UP(op->nb_sectors, sectors_per_chunk);
    bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
    if (ret >= 0) {
        if (s->cow_bitmap) {
            bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
        }
        s->common.offset += (uint64_t)op->nb_sectors * BDRV_SECTOR_SIZE;
    }

    qemu_iovec_destroy(&op->qiov);
    g_free(op);

    if (s->waiting_for_io) {
        qemu_coroutine_enter(s->common.co, NULL);
    }
}

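/* AIO callback for writes, zero-writes and discards on the target; on error
 * the range is marked dirty again so that it will be retried later. */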
static void mirror_write_complete(void *opaque, int ret)
{
    MirrorOp *op = opaque;
    MirrorBlockJob *s = op->s;
    if (ret < 0) {
        BlockErrorAction action;

        bdrv_set_dirty_bitmap(s->dirty_bitmap, op->sector_num, op->nb_sectors);
        action = mirror_error_action(s, false, -ret);
        if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }
    }
    mirror_iteration_done(op, ret);
}

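/* AIO callback for reads from the source: on success, chain the matching
 * write to the target; on error, re-dirty the range and finish the op. */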
static void mirror_read_complete(void *opaque, int ret)
{
    MirrorOp *op = opaque;
    MirrorBlockJob *s = op->s;
    if (ret < 0) {
        BlockErrorAction action;

        bdrv_set_dirty_bitmap(s->dirty_bitmap, op->sector_num, op->nb_sectors);
        action = mirror_error_action(s, true, -ret);
        if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }

        mirror_iteration_done(op, ret);
        return;
    }
    bdrv_aio_writev(s->target, op->sector_num, &op->qiov, op->nb_sectors,
                    mirror_write_complete, op);
}

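/* Clip *nb_sectors so the request does not run past the end of the device. */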
static inline void mirror_clip_sectors(MirrorBlockJob *s,
                                       int64_t sector_num,
                                       int *nb_sectors)
{
    *nb_sectors = MIN(*nb_sectors,
                      s->bdev_length / BDRV_SECTOR_SIZE - sector_num);
}

/* Round sector_num and/or nb_sectors to the target cluster if COW is needed,
 * and return the offset of the adjusted tail sector against the original. */
static int mirror_cow_align(MirrorBlockJob *s,
                            int64_t *sector_num,
                            int *nb_sectors)
{
    bool need_cow;
    int ret = 0;
    int chunk_sectors = s->granularity >> BDRV_SECTOR_BITS;
    int64_t align_sector_num = *sector_num;
    int align_nb_sectors = *nb_sectors;
    int max_sectors = chunk_sectors * s->max_iov;

    need_cow = !test_bit(*sector_num / chunk_sectors, s->cow_bitmap);
    need_cow |= !test_bit((*sector_num + *nb_sectors - 1) / chunk_sectors,
                          s->cow_bitmap);
    if (need_cow) {
        bdrv_round_to_clusters(s->target, *sector_num, *nb_sectors,
                               &align_sector_num, &align_nb_sectors);
    }

    if (align_nb_sectors > max_sectors) {
        align_nb_sectors = max_sectors;
        if (need_cow) {
            align_nb_sectors = QEMU_ALIGN_DOWN(align_nb_sectors,
                                               s->target_cluster_sectors);
        }
    }
    /* Clipping may result in align_nb_sectors unaligned to chunk boundary, but
     * that doesn't matter because it's already the end of source image. */
    mirror_clip_sectors(s, align_sector_num, &align_nb_sectors);

    ret = align_sector_num + align_nb_sectors - (*sector_num + *nb_sectors);
    *sector_num = align_sector_num;
    *nb_sectors = align_nb_sectors;
    assert(ret >= 0);
    return ret;
}

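/* Yield until some in-flight operation completes; the AIO completion
 * callbacks re-enter the job coroutine while waiting_for_io is set. */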
static inline void mirror_wait_for_io(MirrorBlockJob *s)
{
    assert(!s->waiting_for_io);
    s->waiting_for_io = true;
    qemu_coroutine_yield();
    s->waiting_for_io = false;
}

/* Submit async read while handling COW.
 * Returns: nb_sectors if no alignment is necessary, or
 *          (new_end - sector_num) if tail is rounded up or down due to
 *          alignment or buffer limit.
 */
static int mirror_do_read(MirrorBlockJob *s, int64_t sector_num,
                          int nb_sectors)
{
    BlockDriverState *source = s->common.bs;
    int sectors_per_chunk, nb_chunks;
    int ret = nb_sectors;
    MirrorOp *op;

    sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;

    /* We can only handle as much as buf_size at a time. */
    nb_sectors = MIN(s->buf_size >> BDRV_SECTOR_BITS, nb_sectors);
    assert(nb_sectors);

    if (s->cow_bitmap) {
        ret += mirror_cow_align(s, &sector_num, &nb_sectors);
    }
    assert(nb_sectors << BDRV_SECTOR_BITS <= s->buf_size);
    /* The sector range must meet granularity because:
     * 1) Caller passes in aligned values;
     * 2) mirror_cow_align is used only when target cluster is larger. */
    assert(!(sector_num % sectors_per_chunk));
    nb_chunks = DIV_ROUND_UP(nb_sectors, sectors_per_chunk);

    while (s->buf_free_count < nb_chunks) {
        trace_mirror_yield_in_flight(s, sector_num, s->in_flight);
        mirror_wait_for_io(s);
    }

    /* Allocate a MirrorOp that is used as an AIO callback.  */
    op = g_new(MirrorOp, 1);
    op->s = s;
    op->sector_num = sector_num;
    op->nb_sectors = nb_sectors;

    /* Now make a QEMUIOVector taking enough granularity-sized chunks
     * from s->buf_free.
     */
    qemu_iovec_init(&op->qiov, nb_chunks);
    while (nb_chunks-- > 0) {
        MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
        size_t remaining = nb_sectors * BDRV_SECTOR_SIZE - op->qiov.size;

        QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
        s->buf_free_count--;
        qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
    }

    /* Copy the dirty cluster.  */
    s->in_flight++;
    s->sectors_in_flight += nb_sectors;
    trace_mirror_one_iteration(s, sector_num, nb_sectors);

    bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
                   mirror_read_complete, op);
    return ret;
}

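/* Submit an asynchronous zero-write or discard to the target for a range
 * that is known to contain no data on the source. */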
static void mirror_do_zero_or_discard(MirrorBlockJob *s,
                                      int64_t sector_num,
                                      int nb_sectors,
                                      bool is_discard)
{
    MirrorOp *op;

    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
     * so the freeing in mirror_iteration_done is a nop. */
    op = g_new0(MirrorOp, 1);
    op->s = s;
    op->sector_num = sector_num;
    op->nb_sectors = nb_sectors;

    s->in_flight++;
    s->sectors_in_flight += nb_sectors;
    if (is_discard) {
        bdrv_aio_discard(s->target, sector_num, op->nb_sectors,
                         mirror_write_complete, op);
    } else {
        bdrv_aio_write_zeroes(s->target, sector_num, op->nb_sectors,
                              s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
                              mirror_write_complete, op);
    }
}

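/* Perform one pass of the copy loop: take the next run of dirty chunks from
 * the dirty bitmap, choose copy/zero/discard per extent based on the block
 * status, submit the I/O and return the rate-limit delay in nanoseconds. */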
static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
{
    BlockDriverState *source = s->common.bs;
    int64_t sector_num, first_chunk;
    uint64_t delay_ns = 0;
    /* At least the first dirty chunk is mirrored in one iteration. */
    int nb_chunks = 1;
    int64_t end = s->bdev_length / BDRV_SECTOR_SIZE;
    int sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;

    sector_num = hbitmap_iter_next(&s->hbi);
    if (sector_num < 0) {
        bdrv_dirty_iter_init(s->dirty_bitmap, &s->hbi);
        sector_num = hbitmap_iter_next(&s->hbi);
        trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
        assert(sector_num >= 0);
    }

    first_chunk = sector_num / sectors_per_chunk;
    while (test_bit(first_chunk, s->in_flight_bitmap)) {
        trace_mirror_yield_in_flight(s, first_chunk, s->in_flight);
        mirror_wait_for_io(s);
    }

    /* Find the number of consecutive dirty chunks following the first dirty
     * one, and wait for in flight requests in them. */
    while (nb_chunks * sectors_per_chunk < (s->buf_size >> BDRV_SECTOR_BITS)) {
        int64_t hbitmap_next;
        int64_t next_sector = sector_num + nb_chunks * sectors_per_chunk;
        int64_t next_chunk = next_sector / sectors_per_chunk;
        if (next_sector >= end ||
            !bdrv_get_dirty(source, s->dirty_bitmap, next_sector)) {
            break;
        }
        if (test_bit(next_chunk, s->in_flight_bitmap)) {
            break;
        }

        hbitmap_next = hbitmap_iter_next(&s->hbi);
        if (hbitmap_next > next_sector || hbitmap_next < 0) {
            /* The bitmap iterator's cache is stale, refresh it */
            bdrv_set_dirty_iter(&s->hbi, next_sector);
            hbitmap_next = hbitmap_iter_next(&s->hbi);
        }
        assert(hbitmap_next == next_sector);
        nb_chunks++;
    }

    /* Clear dirty bits before querying the block status, because
     * calling bdrv_get_block_status_above could yield - if some blocks are
     * marked dirty in this window, we need to know.
     */
    bdrv_reset_dirty_bitmap(s->dirty_bitmap, sector_num,
                            nb_chunks * sectors_per_chunk);
    bitmap_set(s->in_flight_bitmap, sector_num / sectors_per_chunk, nb_chunks);
    while (nb_chunks > 0 && sector_num < end) {
        int ret;
        int io_sectors;
        BlockDriverState *file;
        enum MirrorMethod {
            MIRROR_METHOD_COPY,
            MIRROR_METHOD_ZERO,
            MIRROR_METHOD_DISCARD
        } mirror_method = MIRROR_METHOD_COPY;

        assert(!(sector_num % sectors_per_chunk));
        ret = bdrv_get_block_status_above(source, NULL, sector_num,
                                          nb_chunks * sectors_per_chunk,
                                          &io_sectors, &file);
        if (ret < 0) {
            io_sectors = nb_chunks * sectors_per_chunk;
        }

        io_sectors -= io_sectors % sectors_per_chunk;
        if (io_sectors < sectors_per_chunk) {
            io_sectors = sectors_per_chunk;
        } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {
            int64_t target_sector_num;
            int target_nb_sectors;
            bdrv_round_to_clusters(s->target, sector_num, io_sectors,
                                   &target_sector_num, &target_nb_sectors);
            if (target_sector_num == sector_num &&
                target_nb_sectors == io_sectors) {
                mirror_method = ret & BDRV_BLOCK_ZERO ?
                                    MIRROR_METHOD_ZERO :
                                    MIRROR_METHOD_DISCARD;
            }
        }

        mirror_clip_sectors(s, sector_num, &io_sectors);
        switch (mirror_method) {
        case MIRROR_METHOD_COPY:
            io_sectors = mirror_do_read(s, sector_num, io_sectors);
            break;
        case MIRROR_METHOD_ZERO:
            mirror_do_zero_or_discard(s, sector_num, io_sectors, false);
            break;
        case MIRROR_METHOD_DISCARD:
            mirror_do_zero_or_discard(s, sector_num, io_sectors, true);
            break;
        default:
            abort();
        }
        assert(io_sectors);
        sector_num += io_sectors;
        nb_chunks -= DIV_ROUND_UP(io_sectors, sectors_per_chunk);
        delay_ns += ratelimit_calculate_delay(&s->limit, io_sectors);
    }
    return delay_ns;
}

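/* Slice s->buf into granularity-sized chunks and put them all on the
 * free list. */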
static void mirror_free_init(MirrorBlockJob *s)
{
    int granularity = s->granularity;
    size_t buf_size = s->buf_size;
    uint8_t *buf = s->buf;

    assert(s->buf_free_count == 0);
    QSIMPLEQ_INIT(&s->buf_free);
    while (buf_size != 0) {
        MirrorBuffer *cur = (MirrorBuffer *)buf;
        QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
        s->buf_free_count++;
        buf_size -= granularity;
        buf += granularity;
    }
}

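/* Wait until all in-flight operations have completed. */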
static void mirror_drain(MirrorBlockJob *s)
{
    while (s->in_flight > 0) {
        mirror_wait_for_io(s);
    }
}

typedef struct {
    int ret;
} MirrorExitData;

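/* Completion callback, deferred to the main loop: on success pivot to the
 * target (or to the node named by 'replaces'), then drop all references and
 * blockers taken by the job. */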
static void mirror_exit(BlockJob *job, void *opaque)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
    MirrorExitData *data = opaque;
    AioContext *replace_aio_context = NULL;
    BlockDriverState *src = s->common.bs;

    /* Make sure that the source BDS doesn't go away before we have called
     * block_job_completed(). */
    bdrv_ref(src);

    if (s->to_replace) {
        replace_aio_context = bdrv_get_aio_context(s->to_replace);
        aio_context_acquire(replace_aio_context);
    }

    if (s->should_complete && data->ret == 0) {
        BlockDriverState *to_replace = s->common.bs;
        if (s->to_replace) {
            to_replace = s->to_replace;
        }

        /* This was checked in mirror_start_job(), but meanwhile one of the
         * nodes could have been newly attached to a BlockBackend. */
        if (to_replace->blk && s->target->blk) {
            error_report("block job: Can't create node with two BlockBackends");
            data->ret = -EINVAL;
            goto out;
        }

        if (bdrv_get_flags(s->target) != bdrv_get_flags(to_replace)) {
            bdrv_reopen(s->target, bdrv_get_flags(to_replace), NULL);
        }
        bdrv_replace_in_backing_chain(to_replace, s->target);
    }

out:
    if (s->to_replace) {
        bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
        error_free(s->replace_blocker);
        bdrv_unref(s->to_replace);
    }
    if (replace_aio_context) {
        aio_context_release(replace_aio_context);
    }
    g_free(s->replaces);
    bdrv_op_unblock_all(s->target, s->common.blocker);
    bdrv_unref(s->target);
    block_job_completed(&s->common, data->ret);
    g_free(data);
    bdrv_drained_end(src);
    if (qemu_get_aio_context() == bdrv_get_aio_context(src)) {
        aio_enable_external(iohandler_get_aio_context());
    }
    bdrv_unref(src);
}

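/* The mirror job coroutine: populate the dirty bitmap according to the sync
 * mode, then copy dirty data until source and target converge and the job is
 * completed or cancelled. */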
static void coroutine_fn mirror_run(void *opaque)
{
    MirrorBlockJob *s = opaque;
    MirrorExitData *data;
    BlockDriverState *bs = s->common.bs;
    int64_t sector_num, end, length;
    uint64_t last_pause_ns;
    BlockDriverInfo bdi;
    char backing_filename[2]; /* we only need 2 characters because we are only
                                 checking for an empty string */
    int ret = 0;
    int n;
    int target_cluster_size = BDRV_SECTOR_SIZE;

    if (block_job_is_cancelled(&s->common)) {
        goto immediate_exit;
    }

    s->bdev_length = bdrv_getlength(bs);
    if (s->bdev_length < 0) {
        ret = s->bdev_length;
        goto immediate_exit;
    } else if (s->bdev_length == 0) {
        /* Report BLOCK_JOB_READY and wait for completion. */
        block_job_event_ready(&s->common);
        s->synced = true;
        while (!block_job_is_cancelled(&s->common) && !s->should_complete) {
            block_job_yield(&s->common);
        }
        s->common.cancelled = false;
        goto immediate_exit;
    }

    length = DIV_ROUND_UP(s->bdev_length, s->granularity);
    s->in_flight_bitmap = bitmap_new(length);

    /* If we have no backing file yet in the destination, we cannot let
     * the destination do COW.  Instead, we copy sectors around the
     * dirty data if needed.  We need a bitmap to do that.
     */
    bdrv_get_backing_filename(s->target, backing_filename,
                              sizeof(backing_filename));
    if (!bdrv_get_info(s->target, &bdi) && bdi.cluster_size) {
        target_cluster_size = bdi.cluster_size;
    }
    if (backing_filename[0] && !s->target->backing
        && s->granularity < target_cluster_size) {
        s->buf_size = MAX(s->buf_size, target_cluster_size);
        s->cow_bitmap = bitmap_new(length);
    }
    s->target_cluster_sectors = target_cluster_size >> BDRV_SECTOR_BITS;
    s->max_iov = MIN(s->common.bs->bl.max_iov, s->target->bl.max_iov);

    end = s->bdev_length / BDRV_SECTOR_SIZE;
    s->buf = qemu_try_blockalign(bs, s->buf_size);
    if (s->buf == NULL) {
        ret = -ENOMEM;
        goto immediate_exit;
    }

    mirror_free_init(s);

    last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    if (!s->is_none_mode) {
        /* First part, loop on the sectors and initialize the dirty bitmap.  */
        BlockDriverState *base = s->base;
        bool mark_all_dirty = s->base == NULL && !bdrv_has_zero_init(s->target);

        for (sector_num = 0; sector_num < end; ) {
            /* Just to make sure we are not exceeding int limit. */
            int nb_sectors = MIN(INT_MAX >> BDRV_SECTOR_BITS,
                                 end - sector_num);
            int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);

            if (now - last_pause_ns > SLICE_TIME) {
                last_pause_ns = now;
                block_job_sleep_ns(&s->common, QEMU_CLOCK_REALTIME, 0);
            }

            if (block_job_is_cancelled(&s->common)) {
                goto immediate_exit;
            }

            ret = bdrv_is_allocated_above(bs, base, sector_num, nb_sectors, &n);

            if (ret < 0) {
                goto immediate_exit;
            }

            assert(n > 0);
            if (ret == 1 || mark_all_dirty) {
                bdrv_set_dirty_bitmap(s->dirty_bitmap, sector_num, n);
            }
            sector_num += n;
        }
    }

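    /* Second part: copy the dirty data, then keep the target in sync until
     * the job is completed or cancelled. */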
    bdrv_dirty_iter_init(s->dirty_bitmap, &s->hbi);
    for (;;) {
        uint64_t delay_ns = 0;
        int64_t cnt;
        bool should_complete;

        if (s->ret < 0) {
            ret = s->ret;
            goto immediate_exit;
        }

        cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        /* s->common.offset contains the number of bytes already processed so
         * far, cnt is the number of dirty sectors remaining and
         * s->sectors_in_flight is the number of sectors currently being
         * processed; together those are the current total operation length */
        s->common.len = s->common.offset +
                        (cnt + s->sectors_in_flight) * BDRV_SECTOR_SIZE;

        /* Note that even when no rate limit is applied we need to yield
         * periodically with no pending I/O so that bdrv_drain_all() returns.
         * We do so every SLICE_TIME nanoseconds, or when there is an error,
         * or when the source is clean, whichever comes first.
         */
        if (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - last_pause_ns < SLICE_TIME &&
            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
            if (s->in_flight == MAX_IN_FLIGHT || s->buf_free_count == 0 ||
                (cnt == 0 && s->in_flight > 0)) {
                trace_mirror_yield(s, s->in_flight, s->buf_free_count, cnt);
                mirror_wait_for_io(s);
                continue;
            } else if (cnt != 0) {
                delay_ns = mirror_iteration(s);
            }
        }

        should_complete = false;
        if (s->in_flight == 0 && cnt == 0) {
            trace_mirror_before_flush(s);
            ret = bdrv_flush(s->target);
            if (ret < 0) {
                if (mirror_error_action(s, false, -ret) ==
                    BLOCK_ERROR_ACTION_REPORT) {
                    goto immediate_exit;
                }
            } else {
                /* We're out of the streaming phase.  From now on, if the job
                 * is cancelled we will actually complete all pending I/O and
                 * report completion.  This way, block-job-cancel will leave
                 * the target in a consistent state.
                 */
                if (!s->synced) {
                    block_job_event_ready(&s->common);
                    s->synced = true;
                }

                should_complete = s->should_complete ||
                    block_job_is_cancelled(&s->common);
                cnt = bdrv_get_dirty_count(s->dirty_bitmap);
            }
        }

        if (cnt == 0 && should_complete) {
            /* The dirty bitmap is not updated while operations are pending.
             * If we're about to exit, wait for pending operations before
             * calling bdrv_get_dirty_count(bs), or we may exit while the
             * source has dirty data to copy!
             *
             * Note that I/O can be submitted by the guest while
             * mirror_populate runs.
             */
            trace_mirror_before_drain(s, cnt);
            bdrv_co_drain(bs);
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        }

        ret = 0;
        trace_mirror_before_sleep(s, cnt, s->synced, delay_ns);
        if (!s->synced) {
            block_job_sleep_ns(&s->common, QEMU_CLOCK_REALTIME, delay_ns);
            if (block_job_is_cancelled(&s->common)) {
                break;
            }
        } else if (!should_complete) {
            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
            block_job_sleep_ns(&s->common, QEMU_CLOCK_REALTIME, delay_ns);
        } else if (cnt == 0) {
            /* The two disks are in sync.  Exit and report successful
             * completion.
             */
            assert(QLIST_EMPTY(&bs->tracked_requests));
            s->common.cancelled = false;
            break;
        }
        last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

immediate_exit:
    if (s->in_flight > 0) {
        /* We get here only if something went wrong.  Either the job failed,
         * or it was cancelled prematurely so that we do not guarantee that
         * the target is a copy of the source.
         */
        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
        mirror_drain(s);
    }

    assert(s->in_flight == 0);
    qemu_vfree(s->buf);
    g_free(s->cow_bitmap);
    g_free(s->in_flight_bitmap);
    bdrv_release_dirty_bitmap(bs, s->dirty_bitmap);
    if (s->target->blk) {
        blk_iostatus_disable(s->target->blk);
    }

    data = g_malloc(sizeof(*data));
    data->ret = ret;
    /* Before we switch to target in mirror_exit, make sure data doesn't
     * change. */
    bdrv_drained_begin(s->common.bs);
    if (qemu_get_aio_context() == bdrv_get_aio_context(bs)) {
        /* FIXME: virtio host notifiers run on iohandler_ctx, therefore the
         * above bdrv_drained_begin isn't enough to quiesce it. This is ugly;
         * we need a block layer API change to achieve this. */
        aio_disable_external(iohandler_get_aio_context());
    }
    block_job_defer_to_main_loop(&s->common, mirror_exit, data);
}

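/* Update the job's rate limit; @speed is expressed in bytes per second. */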
static void mirror_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    if (speed < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }
    ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE, SLICE_TIME);
}

static void mirror_iostatus_reset(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    if (s->target->blk) {
        blk_iostatus_reset(s->target->blk);
    }
}

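/* Handler for block-job-complete: open the target's backing file if needed,
 * block operations on the node that will be replaced, and signal the job
 * coroutine that it should finish. */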
static void mirror_complete(BlockJob *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
    Error *local_err = NULL;
    int ret;

    ret = bdrv_open_backing_file(s->target, NULL, "backing", &local_err);
    if (ret < 0) {
        error_propagate(errp, local_err);
        return;
    }
    if (!s->synced) {
        error_setg(errp, QERR_BLOCK_JOB_NOT_READY, job->id);
        return;
    }

    /* check the target bs is not blocked and block all operations on it */
    if (s->replaces) {
        AioContext *replace_aio_context;

        s->to_replace = bdrv_find_node(s->replaces);
        if (!s->to_replace) {
            error_setg(errp, "Node name '%s' not found", s->replaces);
            return;
        }

        replace_aio_context = bdrv_get_aio_context(s->to_replace);
        aio_context_acquire(replace_aio_context);

        error_setg(&s->replace_blocker,
                   "block device is in use by block-job-complete");
        bdrv_op_block_all(s->to_replace, s->replace_blocker);
        bdrv_ref(s->to_replace);

        aio_context_release(replace_aio_context);
    }

    s->should_complete = true;
    block_job_enter(&s->common);
}

static const BlockJobDriver mirror_job_driver = {
    .instance_size  = sizeof(MirrorBlockJob),
    .job_type       = BLOCK_JOB_TYPE_MIRROR,
    .set_speed      = mirror_set_speed,
    .iostatus_reset = mirror_iostatus_reset,
    .complete       = mirror_complete,
};

static const BlockJobDriver commit_active_job_driver = {
    .instance_size  = sizeof(MirrorBlockJob),
    .job_type       = BLOCK_JOB_TYPE_COMMIT,
    .set_speed      = mirror_set_speed,
    .iostatus_reset = mirror_iostatus_reset,
    .complete       = mirror_complete,
};

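/* Common setup for mirror and active commit: validate the parameters, create
 * the block job, allocate the dirty bitmap and start the mirror_run
 * coroutine. */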
static void mirror_start_job(BlockDriverState *bs, BlockDriverState *target,
                             const char *replaces,
                             int64_t speed, uint32_t granularity,
                             int64_t buf_size,
                             BlockdevOnError on_source_error,
                             BlockdevOnError on_target_error,
                             bool unmap,
                             BlockCompletionFunc *cb,
                             void *opaque, Error **errp,
                             const BlockJobDriver *driver,
                             bool is_none_mode, BlockDriverState *base)
{
    MirrorBlockJob *s;
    BlockDriverState *replaced_bs;

    if (granularity == 0) {
        granularity = bdrv_get_default_bitmap_granularity(target);
    }

    assert((granularity & (granularity - 1)) == 0);

    if ((on_source_error == BLOCKDEV_ON_ERROR_STOP ||
         on_source_error == BLOCKDEV_ON_ERROR_ENOSPC) &&
        (!bs->blk || !blk_iostatus_is_enabled(bs->blk))) {
        error_setg(errp, QERR_INVALID_PARAMETER, "on-source-error");
        return;
    }

    if (buf_size < 0) {
        error_setg(errp, "Invalid parameter 'buf-size'");
        return;
    }

    if (buf_size == 0) {
        buf_size = DEFAULT_MIRROR_BUF_SIZE;
    }

    /* We can't support this case as long as the block layer can't handle
     * multiple BlockBackends per BlockDriverState. */
    if (replaces) {
        replaced_bs = bdrv_lookup_bs(replaces, replaces, errp);
        if (replaced_bs == NULL) {
            return;
        }
    } else {
        replaced_bs = bs;
    }
    if (replaced_bs->blk && target->blk) {
        error_setg(errp, "Can't create node with two BlockBackends");
        return;
    }

    s = block_job_create(driver, bs, speed, cb, opaque, errp);
    if (!s) {
        return;
    }

    s->replaces = g_strdup(replaces);
    s->on_source_error = on_source_error;
    s->on_target_error = on_target_error;
    s->target = target;
    s->is_none_mode = is_none_mode;
    s->base = base;
    s->granularity = granularity;
    s->buf_size = ROUND_UP(buf_size, granularity);
    s->unmap = unmap;

    s->dirty_bitmap = bdrv_create_dirty_bitmap(bs, granularity, NULL, errp);
    if (!s->dirty_bitmap) {
        g_free(s->replaces);
        block_job_unref(&s->common);
        return;
    }

    bdrv_op_block_all(s->target, s->common.blocker);

    if (s->target->blk) {
        blk_set_on_error(s->target->blk, on_target_error, on_target_error);
        blk_iostatus_enable(s->target->blk);
    }
    s->common.co = qemu_coroutine_create(mirror_run);
    trace_mirror_start(bs, s, s->common.co, opaque);
    qemu_coroutine_enter(s->common.co, s);
}

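/* Entry point for the mirror block job.  Sync mode 'none' copies only writes
 * arriving after the job starts, 'top' limits the initial copy to data above
 * the backing file, and 'incremental' is rejected. */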
void mirror_start(BlockDriverState *bs, BlockDriverState *target,
                  const char *replaces,
                  int64_t speed, uint32_t granularity, int64_t buf_size,
                  MirrorSyncMode mode, BlockdevOnError on_source_error,
                  BlockdevOnError on_target_error,
                  bool unmap,
                  BlockCompletionFunc *cb,
                  void *opaque, Error **errp)
{
    bool is_none_mode;
    BlockDriverState *base;

    if (mode == MIRROR_SYNC_MODE_INCREMENTAL) {
        error_setg(errp, "Sync mode 'incremental' not supported");
        return;
    }
    is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
    base = mode == MIRROR_SYNC_MODE_TOP ? backing_bs(bs) : NULL;
    mirror_start_job(bs, target, replaces,
                     speed, granularity, buf_size,
                     on_source_error, on_target_error, unmap, cb, opaque, errp,
                     &mirror_job_driver, is_none_mode, base);
}

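/* Entry point for active block commit: reopen the base image with the top
 * image's flags, grow it if the top image is larger, then run the mirror
 * machinery with the base as the target of the copy. */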
void commit_active_start(BlockDriverState *bs, BlockDriverState *base,
                         int64_t speed,
                         BlockdevOnError on_error,
                         BlockCompletionFunc *cb,
                         void *opaque, Error **errp)
{
    int64_t length, base_length;
    int orig_base_flags;
    int ret;
    Error *local_err = NULL;

    orig_base_flags = bdrv_get_flags(base);

    if (bdrv_reopen(base, bs->open_flags, errp)) {
        return;
    }

    length = bdrv_getlength(bs);
    if (length < 0) {
        error_setg_errno(errp, -length,
                         "Unable to determine length of %s", bs->filename);
        goto error_restore_flags;
    }

    base_length = bdrv_getlength(base);
    if (base_length < 0) {
        error_setg_errno(errp, -base_length,
                         "Unable to determine length of %s", base->filename);
        goto error_restore_flags;
    }

    if (length > base_length) {
        ret = bdrv_truncate(base, length);
        if (ret < 0) {
            error_setg_errno(errp, -ret,
                             "Top image %s is larger than base image %s, and "
                             "resize of base image failed",
                             bs->filename, base->filename);
            goto error_restore_flags;
        }
    }

    bdrv_ref(base);
    mirror_start_job(bs, base, NULL, speed, 0, 0,
                     on_error, on_error, false, cb, opaque, &local_err,
                     &commit_active_job_driver, false, base);
    if (local_err) {
        error_propagate(errp, local_err);
        goto error_restore_flags;
    }

    return;

error_restore_flags:
    /* ignore error and errp for bdrv_reopen, because we want to propagate
     * the original error */
    bdrv_reopen(base, orig_base_flags, NULL);
    return;
}