qemu/block/block-copy.c
/*
 * block_copy API
 *
 * Copyright (C) 2013 Proxmox Server Solutions
 * Copyright (c) 2019 Virtuozzo International GmbH.
 *
 * Authors:
 *  Dietmar Maurer (dietmar@proxmox.com)
 *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "block/block-copy.h"
#include "block/reqlist.h"
#include "sysemu/block-backend.h"
#include "qemu/units.h"
#include "qemu/coroutine.h"
#include "block/aio_task.h"
#include "qemu/error-report.h"
#include "qemu/memalign.h"

#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
#define BLOCK_COPY_MAX_MEM (128 * MiB)
#define BLOCK_COPY_MAX_WORKERS 64
#define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */
#define BLOCK_COPY_CLUSTER_SIZE_DEFAULT (1 << 16)

typedef enum {
    COPY_READ_WRITE_CLUSTER,
    COPY_READ_WRITE,
    COPY_WRITE_ZEROES,
    COPY_RANGE_SMALL,
    COPY_RANGE_FULL
} BlockCopyMethod;

static coroutine_fn int block_copy_task_entry(AioTask *task);

typedef struct BlockCopyCallState {
    /* Fields initialized in block_copy_async() and never changed. */
    BlockCopyState *s;
    int64_t offset;
    int64_t bytes;
    int max_workers;
    int64_t max_chunk;
    bool ignore_ratelimit;
    BlockCopyAsyncCallbackFunc cb;
    void *cb_opaque;
    /* Coroutine where async block-copy is running */
    Coroutine *co;

    /* Fields whose state changes throughout the execution */
    bool finished; /* atomic */
    QemuCoSleep sleep; /* TODO: protect API with a lock */
    bool cancelled; /* atomic */
    /* To reference all call states from BlockCopyState */
    QLIST_ENTRY(BlockCopyCallState) list;

    /*
     * Fields that report information about return values and errors.
     * Protected by lock in BlockCopyState.
     */
    bool error_is_read;
    /*
     * @ret is set concurrently by tasks under mutex. Only set once by the
     * first failed task (and untouched if no task failed).
     * After finishing (call_state->finished is true), it is not modified
     * anymore and may be safely read without the mutex.
     */
    int ret;
} BlockCopyCallState;

typedef struct BlockCopyTask {
    AioTask task;

    /*
     * Fields initialized in block_copy_task_create()
     * and never changed.
     */
    BlockCopyState *s;
    BlockCopyCallState *call_state;
    /*
     * @method can also be set again in the while loop of
     * block_copy_dirty_clusters(), but it is never accessed concurrently
     * because the only other function that reads it is
     * block_copy_task_entry() and it is invoked afterwards in the same
     * iteration.
     */
    BlockCopyMethod method;

    /*
     * Generally, req is protected by the lock in BlockCopyState. Still,
     * req.offset is only set on task creation, so it may be read
     * concurrently after creation. req.bytes is changed at most once, and
     * only needs protection against a parallel read while @bytes is updated
     * in block_copy_task_shrink().
     */
    BlockReq req;
} BlockCopyTask;

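/* Return the end offset (exclusive) of the area covered by @task. */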
static int64_t task_end(BlockCopyTask *task)
{
    return task->req.offset + task->req.bytes;
}

typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by the block-copy user, who is responsible for appropriate
     * permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;

    /*
     * Fields initialized in block_copy_state_new()
     * and never changed.
     */
    int64_t cluster_size;
    int64_t max_transfer;
    uint64_t len;
    BdrvRequestFlags write_flags;

    /*
     * Fields whose state changes throughout the execution.
     * Protected by lock.
     */
    CoMutex lock;
    int64_t in_flight_bytes;
    BlockCopyMethod method;
    BlockReqList reqs;
    QLIST_HEAD(, BlockCopyCallState) calls;
    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap.  During this process, the bitmap
     * is thus not fully initialized: It may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source’s allocation status,
     * skip unallocated regions, clear them in the copy_bitmap, and invoke
     * block_copy_reset_unallocated() every time it does.
     */
    bool skip_unallocated; /* atomic */
    /* State fields that use a thread-safe API */
    BdrvDirtyBitmap *copy_bitmap;
    ProgressMeter *progress;
    SharedResource *mem;
    RateLimit rate_limit;
} BlockCopyState;

/* Called with lock held */
static int64_t block_copy_chunk_size(BlockCopyState *s)
{
    switch (s->method) {
    case COPY_READ_WRITE_CLUSTER:
        return s->cluster_size;
    case COPY_READ_WRITE:
    case COPY_RANGE_SMALL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
                   s->max_transfer);
    case COPY_RANGE_FULL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
                   s->max_transfer);
    default:
        /* Cannot have COPY_WRITE_ZEROES here.  */
        abort();
    }
}

/*
 * Search for the first dirty area in the offset/bytes range and create a
 * task at the beginning of it.
 */
static coroutine_fn BlockCopyTask *
block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
                       int64_t offset, int64_t bytes)
{
    BlockCopyTask *task;
    int64_t max_chunk;

    QEMU_LOCK_GUARD(&s->lock);
    max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
    if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
                                           offset, offset + bytes,
                                           max_chunk, &offset, &bytes))
    {
        return NULL;
    }

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);

    /* region is dirty, so no existing tasks are possible in it */
    assert(!reqlist_find_conflict(&s->reqs, offset, bytes));

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    s->in_flight_bytes += bytes;

    task = g_new(BlockCopyTask, 1);
    *task = (BlockCopyTask) {
        .task.func = block_copy_task_entry,
        .s = s,
        .call_state = call_state,
        .method = s->method,
    };
    reqlist_init_req(&s->reqs, &task->req, offset, bytes);

    return task;
}

/*
 * block_copy_task_shrink
 *
 * Drop the tail of the task to be handled later. Set the dirty bits back
 * and wake up all tasks waiting for us (some of them may not intersect with
 * the shrunk task).
 */
static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
                                                int64_t new_bytes)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    if (new_bytes == task->req.bytes) {
        return;
    }

    assert(new_bytes > 0 && new_bytes < task->req.bytes);

    task->s->in_flight_bytes -= task->req.bytes - new_bytes;
    bdrv_set_dirty_bitmap(task->s->copy_bitmap,
                          task->req.offset + new_bytes,
                          task->req.bytes - new_bytes);

    reqlist_shrink_req(&task->req, new_bytes);
}

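/*
 * Finish a task: adjust in_flight_bytes, re-dirty the area on failure,
 * update the progress meter and drop the request from the request list.
 */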
static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    task->s->in_flight_bytes -= task->req.bytes;
    if (ret < 0) {
        bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->req.offset,
                              task->req.bytes);
    }
    if (task->s->progress) {
        progress_set_remaining(task->s->progress,
                               bdrv_get_dirty_count(task->s->copy_bitmap) +
                               task->s->in_flight_bytes);
    }
    reqlist_remove_req(&task->req);
}

void block_copy_state_free(BlockCopyState *s)
{
    if (!s) {
        return;
    }

    ratelimit_destroy(&s->rate_limit);
    bdrv_release_dirty_bitmap(s->copy_bitmap);
    shres_destroy(s->mem);
    g_free(s);
}

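/*
 * Largest request size that both source and target can handle: the minimum
 * of the two nodes' non-zero max_transfer limits, capped at INT_MAX.
 */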
static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
{
    return MIN_NON_ZERO(INT_MAX,
                        MIN_NON_ZERO(source->bs->bl.max_transfer,
                                     target->bs->bl.max_transfer));
}

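/*
 * Select the initial copy method and the write flags, based on whether
 * copy_range and/or compressed writes are requested.
 */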
void block_copy_set_copy_opts(BlockCopyState *s, bool use_copy_range,
                              bool compress)
{
    /* Keep BDRV_REQ_SERIALISING set (or not set) in block_copy_state_new() */
    s->write_flags = (s->write_flags & BDRV_REQ_SERIALISING) |
        (compress ? BDRV_REQ_WRITE_COMPRESSED : 0);

    if (s->max_transfer < s->cluster_size) {
        /*
         * copy_range does not respect max_transfer. We don't want to bother
         * with requests smaller than the block-copy cluster size, so fall
         * back to buffered copying (read and write respect max_transfer on
         * their own).
         */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else if (compress) {
        /* Compression supports only cluster-size writes and no copy-range. */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else {
        /*
         * If copy range is enabled, start with COPY_RANGE_SMALL until the
         * first successful copy_range (see block_copy_do_copy).
         */
        s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
    }
}

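/*
 * Choose the block-copy cluster size from the target's cluster size. If it
 * cannot be determined, fall back to BLOCK_COPY_CLUSTER_SIZE_DEFAULT (64 KiB)
 * or fail, depending on whether the target has a backing file.
 */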
static int64_t block_copy_calculate_cluster_size(BlockDriverState *target,
                                                 Error **errp)
{
    int ret;
    BlockDriverInfo bdi;
    bool target_does_cow = bdrv_backing_chain_next(target);

    /*
     * If there is no backing file on the target, we cannot rely on COW if our
     * backup cluster size is smaller than the target cluster size. Even for
     * targets with a backing file, try to avoid COW if possible.
     */
    ret = bdrv_get_info(target, &bdi);
    if (ret == -ENOTSUP && !target_does_cow) {
        /* Cluster size is not defined */
        warn_report("The target block device doesn't provide "
                    "information about the block size and it doesn't have a "
                    "backing file. The default block size of %u bytes is "
                    "used. If the actual block size of the target exceeds "
                    "this default, the backup may be unusable",
                    BLOCK_COPY_CLUSTER_SIZE_DEFAULT);
        return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    } else if (ret < 0 && !target_does_cow) {
        error_setg_errno(errp, -ret,
            "Couldn't determine the cluster size of the target image, "
            "which has no backing file");
        error_append_hint(errp,
            "Aborting, since this may create an unusable destination image\n");
        return ret;
    } else if (ret < 0 && target_does_cow) {
        /* Not fatal; just trudge on ahead. */
        return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    }

    return MAX(BLOCK_COPY_CLUSTER_SIZE_DEFAULT, bdi.cluster_size);
}

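/*
 * Create a new BlockCopyState. The copy_bitmap is created at the computed
 * cluster size and is initialized either from @bitmap or as fully dirty.
 * Returns NULL and sets @errp on failure.
 */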
BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
                                     const BdrvDirtyBitmap *bitmap,
                                     Error **errp)
{
    ERRP_GUARD();
    BlockCopyState *s;
    int64_t cluster_size;
    BdrvDirtyBitmap *copy_bitmap;
    bool is_fleecing;

    cluster_size = block_copy_calculate_cluster_size(target->bs, errp);
    if (cluster_size < 0) {
        return NULL;
    }

    copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
                                           errp);
    if (!copy_bitmap) {
        return NULL;
    }
    bdrv_disable_dirty_bitmap(copy_bitmap);
    if (bitmap) {
        if (!bdrv_merge_dirty_bitmap(copy_bitmap, bitmap, NULL, errp)) {
            error_prepend(errp, "Failed to merge bitmap '%s' to internal "
                          "copy-bitmap: ", bdrv_dirty_bitmap_name(bitmap));
            bdrv_release_dirty_bitmap(copy_bitmap);
            return NULL;
        }
    } else {
        bdrv_set_dirty_bitmap(copy_bitmap, 0,
                              bdrv_dirty_bitmap_size(copy_bitmap));
    }

    /*
     * If the source is in the backing chain of the target, assume that the
     * target is going to be used for "image fleecing", i.e. it should
     * represent a kind of snapshot of the source at the backup-start point
     * in time, and the target is going to be read by somebody (for example,
     * used as an NBD export) during the backup job.
     *
     * In this case, we need to add the BDRV_REQ_SERIALISING write flag to
     * avoid intersection of backup writes and third-party reads from the
     * target; otherwise, when reading from the target, we might occasionally
     * read data that has already been updated by the guest.
     *
     * For more information see commit f8d59dfb40bb and test
     * tests/qemu-iotests/222
     */
    is_fleecing = bdrv_chain_contains(target->bs, source->bs);

    s = g_new(BlockCopyState, 1);
    *s = (BlockCopyState) {
        .source = source,
        .target = target,
        .copy_bitmap = copy_bitmap,
        .cluster_size = cluster_size,
        .len = bdrv_dirty_bitmap_size(copy_bitmap),
        .write_flags = (is_fleecing ? BDRV_REQ_SERIALISING : 0),
        .mem = shres_create(BLOCK_COPY_MAX_MEM),
        .max_transfer = QEMU_ALIGN_DOWN(
                                    block_copy_max_transfer(source, target),
                                    cluster_size),
    };

    block_copy_set_copy_opts(s, false, false);

    ratelimit_init(&s->rate_limit);
    qemu_co_mutex_init(&s->lock);
    QLIST_INIT(&s->reqs);
    QLIST_INIT(&s->calls);

    return s;
}

/* Only set before running the job, no need for locking. */
void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
{
    s->progress = pm;
}

/*
 * Takes ownership of @task
 *
 * If @pool is NULL, run the task directly; otherwise schedule it into the
 * pool.
 *
 * Returns: task.func return code if pool is NULL
 *          otherwise -ECANCELED if pool status is bad
 *          otherwise 0 (successfully scheduled)
 */
static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
                                            BlockCopyTask *task)
{
    if (!pool) {
        int ret = task->task.func(&task->task);

        g_free(task);
        return ret;
    }

    aio_task_pool_wait_slot(pool);
    if (aio_task_pool_status(pool) < 0) {
        co_put_to_shres(task->s->mem, task->req.bytes);
        block_copy_task_end(task, -ECANCELED);
        g_free(task);
        return -ECANCELED;
    }

    aio_task_pool_start_task(pool, &task->task);

    return 0;
}

/*
 * block_copy_do_copy
 *
 * Copy a cluster-aligned chunk. The requested region is allowed to exceed
 * s->len only to cover the last cluster when s->len is not aligned to
 * clusters.
 *
 * No synchronization here: neither bitmap nor intersecting-request handling,
 * only the copy itself.
 *
 * @method is an in-out argument, so that copy_range can be either extended to
 * a full-size buffer or disabled if the copy_range attempt fails.  The output
 * value of @method should be used for subsequent tasks.
 * Returns 0 on success.
 */
static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
                                           int64_t offset, int64_t bytes,
                                           BlockCopyMethod *method,
                                           bool *error_is_read)
{
    int ret;
    int64_t nbytes = MIN(offset + bytes, s->len) - offset;
    void *bounce_buffer = NULL;

    assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    assert(offset < s->len);
    assert(offset + bytes <= s->len ||
           offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
    assert(nbytes < INT_MAX);

    switch (*method) {
    case COPY_WRITE_ZEROES:
        ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
                                    ~BDRV_REQ_WRITE_COMPRESSED);
        if (ret < 0) {
            trace_block_copy_write_zeroes_fail(s, offset, ret);
            *error_is_read = false;
        }
        return ret;

    case COPY_RANGE_SMALL:
    case COPY_RANGE_FULL:
        ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
                                 0, s->write_flags);
        if (ret >= 0) {
            /* Successful copy-range, increase chunk size.  */
            *method = COPY_RANGE_FULL;
            return 0;
        }

        trace_block_copy_copy_range_fail(s, offset, ret);
        *method = COPY_READ_WRITE;
        /* Fall through to read+write with allocated buffer */

    case COPY_READ_WRITE_CLUSTER:
    case COPY_READ_WRITE:
        /*
         * In case the copy_range request above failed, we may proceed with
         * a buffered request larger than BLOCK_COPY_MAX_BUFFER. Still,
         * further requests will be properly limited, so don't worry too
         * much. Moreover, the most likely case (copy_range is unsupported
         * for the configuration, so the very first copy_range request fails)
         * is handled by switching to the large chunk size only after the
         * first successful copy_range.
         */

        bounce_buffer = qemu_blockalign(s->source->bs, nbytes);

        ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
        if (ret < 0) {
            trace_block_copy_read_fail(s, offset, ret);
            *error_is_read = true;
            goto out;
        }

        ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
                             s->write_flags);
        if (ret < 0) {
            trace_block_copy_write_fail(s, offset, ret);
            *error_is_read = false;
            goto out;
        }

    out:
        qemu_vfree(bounce_buffer);
        break;

    default:
        abort();
    }

    return ret;
}

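/*
 * AioTask entry point: copy one task's region, propagate the possibly
 * updated copy method back to the shared state, record the first error in
 * the call state, and finish the task.
 */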
static coroutine_fn int block_copy_task_entry(AioTask *task)
{
    BlockCopyTask *t = container_of(task, BlockCopyTask, task);
    BlockCopyState *s = t->s;
    bool error_is_read = false;
    BlockCopyMethod method = t->method;
    int ret;

    ret = block_copy_do_copy(s, t->req.offset, t->req.bytes, &method,
                             &error_is_read);

    WITH_QEMU_LOCK_GUARD(&s->lock) {
        if (s->method == t->method) {
            s->method = method;
        }

        if (ret < 0) {
            if (!t->call_state->ret) {
                t->call_state->ret = ret;
                t->call_state->error_is_read = error_is_read;
            }
        } else if (s->progress) {
            progress_work_done(s->progress, t->req.bytes);
        }
    }
    co_put_to_shres(s->mem, t->req.bytes);
    block_copy_task_end(t, ret);

    return ret;
}

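/*
 * Query the block status of the source at @offset and clamp the result to
 * cluster granularity. On error, or if the returned chunk is smaller than a
 * cluster, report a single allocated data cluster instead.
 */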
static int block_copy_block_status(BlockCopyState *s, int64_t offset,
                                   int64_t bytes, int64_t *pnum)
{
    int64_t num;
    BlockDriverState *base;
    int ret;

    if (qatomic_read(&s->skip_unallocated)) {
        base = bdrv_backing_chain_next(s->source->bs);
    } else {
        base = NULL;
    }

    ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
                                  NULL, NULL);
    if (ret < 0 || num < s->cluster_size) {
        /*
         * On error, or if we failed to obtain a large enough chunk, just
         * fall back to copying one cluster.
         */
        num = s->cluster_size;
        ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
    } else if (offset + num == s->len) {
        num = QEMU_ALIGN_UP(num, s->cluster_size);
    } else {
        num = QEMU_ALIGN_DOWN(num, s->cluster_size);
    }

    *pnum = num;
    return ret;
}

/*
 * Check if the cluster starting at @offset is allocated or not.
 * Return via @pnum the number of contiguous clusters sharing this allocation.
 */
static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
                                           int64_t *pnum)
{
    BlockDriverState *bs = s->source->bs;
    int64_t count, total_count = 0;
    int64_t bytes = s->len - offset;
    int ret;

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));

    while (true) {
        ret = bdrv_is_allocated(bs, offset, bytes, &count);
        if (ret < 0) {
            return ret;
        }

        total_count += count;

        if (ret || count == 0) {
            /*
             * ret: partial segment(s) are considered allocated.
             * otherwise: unallocated tail is treated as an entire segment.
             */
            *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
            return ret;
        }

        /* Unallocated segment(s) with uncertain following segment(s) */
        if (total_count >= s->cluster_size) {
            *pnum = total_count / s->cluster_size;
            return 0;
        }

        offset += count;
        bytes -= count;
    }
}

void block_copy_reset(BlockCopyState *s, int64_t offset, int64_t bytes)
{
    QEMU_LOCK_GUARD(&s->lock);

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    if (s->progress) {
        progress_set_remaining(s->progress,
                               bdrv_get_dirty_count(s->copy_bitmap) +
                               s->in_flight_bytes);
    }
}

/*
 * Reset bits in copy_bitmap starting at offset if they represent unallocated
 * data in the image. May reset subsequent contiguous bits.
 * @return 0 when the cluster at @offset was unallocated,
 *         1 otherwise, and -ret on error.
 */
int64_t block_copy_reset_unallocated(BlockCopyState *s,
                                     int64_t offset, int64_t *count)
{
    int ret;
    int64_t clusters, bytes;

    ret = block_copy_is_cluster_allocated(s, offset, &clusters);
    if (ret < 0) {
        return ret;
    }

    bytes = clusters * s->cluster_size;

    if (!ret) {
        block_copy_reset(s, offset, bytes);
    }

    *count = bytes;
    return ret;
}

/*
 * block_copy_dirty_clusters
 *
 * Copy dirty clusters in the @offset/@bytes range.
 * Returns 1 if dirty clusters were found and successfully copied, 0 if no
 * dirty clusters were found, and -errno on failure.
 */
static int coroutine_fn
block_copy_dirty_clusters(BlockCopyCallState *call_state)
{
    BlockCopyState *s = call_state->s;
    int64_t offset = call_state->offset;
    int64_t bytes = call_state->bytes;

    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;

    /*
     * The block_copy() user is responsible for keeping source and target in
     * the same aio context.
     */
    assert(bdrv_get_aio_context(s->source->bs) ==
           bdrv_get_aio_context(s->target->bs));

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

    while (bytes && aio_task_pool_status(aio) == 0 &&
           !qatomic_read(&call_state->cancelled)) {
        BlockCopyTask *task;
        int64_t status_bytes;

        task = block_copy_task_create(s, call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
            break;
        }
        if (task->req.offset > offset) {
            trace_block_copy_skip_range(s, offset, task->req.offset - offset);
        }

        found_dirty = true;

        ret = block_copy_block_status(s, task->req.offset, task->req.bytes,
                                      &status_bytes);
        assert(ret >= 0); /* never fail */
        if (status_bytes < task->req.bytes) {
            block_copy_task_shrink(task, status_bytes);
        }
        if (qatomic_read(&s->skip_unallocated) &&
            !(ret & BDRV_BLOCK_ALLOCATED)) {
            block_copy_task_end(task, 0);
            trace_block_copy_skip_range(s, task->req.offset, task->req.bytes);
            offset = task_end(task);
            bytes = end - offset;
            g_free(task);
            continue;
        }
        if (ret & BDRV_BLOCK_ZERO) {
            task->method = COPY_WRITE_ZEROES;
        }

        if (!call_state->ignore_ratelimit) {
            uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
            if (ns > 0) {
                block_copy_task_end(task, -EAGAIN);
                g_free(task);
                qemu_co_sleep_ns_wakeable(&call_state->sleep,
                                          QEMU_CLOCK_REALTIME, ns);
                continue;
            }
        }

        ratelimit_calculate_delay(&s->rate_limit, task->req.bytes);

        trace_block_copy_process(s, task->req.offset);

        co_get_from_shres(s->mem, task->req.bytes);

        offset = task_end(task);
        bytes = end - offset;

        if (!aio && bytes) {
            aio = aio_task_pool_new(call_state->max_workers);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it fails, it means some task has already
         * failed for a real reason; let's return the first failure.
         * Still, assert that we don't overwrite a failure with success.
         *
         * Note: ret may be positive here because of the block-status result.
         */
        assert(ret >= 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }

    return ret < 0 ? ret : found_dirty;
}

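/*
 * Wake up a block-copy call that is currently sleeping for rate limiting, so
 * that it re-evaluates its situation (e.g. a changed speed or a cancellation)
 * immediately.
 */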
void block_copy_kick(BlockCopyCallState *call_state)
{
    qemu_co_sleep_wake(&call_state->sleep);
}

/*
 * block_copy_common
 *
 * Copy the requested region according to the dirty bitmap.
 * Collaborate with parallel block_copy requests: if they succeed, it will
 * help us. If they fail, we will retry the not-yet-copied regions. So, if we
 * return an error, it means that some I/O operation failed in the context of
 * _this_ block_copy call, not some parallel operation.
 */
static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
{
    int ret;
    BlockCopyState *s = call_state->s;

    qemu_co_mutex_lock(&s->lock);
    QLIST_INSERT_HEAD(&s->calls, call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    do {
        ret = block_copy_dirty_clusters(call_state);

        if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
            WITH_QEMU_LOCK_GUARD(&s->lock) {
                /*
                 * Check that there is no task we still need to
                 * wait for to complete.
                 */
                ret = reqlist_wait_one(&s->reqs, call_state->offset,
                                       call_state->bytes, &s->lock);
                if (ret == 0) {
                    /*
                     * No pending tasks, but check again the bitmap in this
                     * same critical section, since a task might have failed
                     * between this and the critical section in
                     * block_copy_dirty_clusters().
                     *
                     * A reqlist_wait_one return value of 0 also means that it
                     * didn't release the lock. So, we are still in the same
                     * critical section, not interrupted by any concurrent
                     * access to state.
                     */
                    ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
                                                       call_state->offset,
                                                       call_state->bytes) >= 0;
                }
            }
        }

        /*
         * We retry in two cases:
         * 1. Some progress was made.
         *    Something was copied, which means that there were yield points
         *    and some new dirty bits may have appeared (due to failed parallel
         *    block-copy requests).
         * 2. We have waited for some intersecting block-copy request.
         *    It may have failed and produced new dirty bits.
         */
    } while (ret > 0 && !qatomic_read(&call_state->cancelled));

    qatomic_store_release(&call_state->finished, true);

    if (call_state->cb) {
        call_state->cb(call_state->cb_opaque);
    }

    qemu_co_mutex_lock(&s->lock);
    QLIST_REMOVE(call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    return ret;
}

static void coroutine_fn block_copy_async_co_entry(void *opaque)
{
    block_copy_common(opaque);
}

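/*
 * Synchronous block-copy: run block_copy_common() in a coroutine, bounded by
 * @timeout_ns. On timeout, the call is cancelled, -ETIMEDOUT is returned and
 * the call state is freed by the still-running coroutine.
 */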
int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
                            bool ignore_ratelimit, uint64_t timeout_ns,
                            BlockCopyAsyncCallbackFunc cb,
                            void *cb_opaque)
{
    int ret;
    BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);

    *call_state = (BlockCopyCallState) {
        .s = s,
        .offset = start,
        .bytes = bytes,
        .ignore_ratelimit = ignore_ratelimit,
        .max_workers = BLOCK_COPY_MAX_WORKERS,
        .cb = cb,
        .cb_opaque = cb_opaque,
    };

    ret = qemu_co_timeout(block_copy_async_co_entry, call_state, timeout_ns,
                          g_free);
    if (ret < 0) {
        assert(ret == -ETIMEDOUT);
        block_copy_call_cancel(call_state);
        /* call_state will be freed by the running coroutine. */
        return ret;
    }

    ret = call_state->ret;
    g_free(call_state);

    return ret;
}

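/*
 * Start an asynchronous block-copy in a new coroutine and return its call
 * state. The caller tracks completion via @cb or the block_copy_call_*()
 * helpers and eventually releases the state with block_copy_call_free().
 */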
BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                     int64_t offset, int64_t bytes,
                                     int max_workers, int64_t max_chunk,
                                     BlockCopyAsyncCallbackFunc cb,
                                     void *cb_opaque)
{
    BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);

    *call_state = (BlockCopyCallState) {
        .s = s,
        .offset = offset,
        .bytes = bytes,
        .max_workers = max_workers,
        .max_chunk = max_chunk,
        .cb = cb,
        .cb_opaque = cb_opaque,

        .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
    };

    qemu_coroutine_enter(call_state->co);

    return call_state;
}

void block_copy_call_free(BlockCopyCallState *call_state)
{
    if (!call_state) {
        return;
    }

    assert(qatomic_read(&call_state->finished));
    g_free(call_state);
}

bool block_copy_call_finished(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->finished);
}

bool block_copy_call_succeeded(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret == 0;
}

bool block_copy_call_failed(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret < 0;
}

bool block_copy_call_cancelled(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->cancelled);
}

int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
{
    assert(qatomic_load_acquire(&call_state->finished));
    if (error_is_read) {
        *error_is_read = call_state->error_is_read;
    }
    return call_state->ret;
}

/*
 * Note that cancelling and finishing are racy.
 * A user can cancel a block-copy that is already finished.
 */
void block_copy_call_cancel(BlockCopyCallState *call_state)
{
    qatomic_set(&call_state->cancelled, true);
    block_copy_kick(call_state);
}

BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
{
    return s->copy_bitmap;
}

int64_t block_copy_cluster_size(BlockCopyState *s)
{
    return s->cluster_size;
}

void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
{
    qatomic_set(&s->skip_unallocated, skip);
}

void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
{
    ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);

    /*
     * Note: it would be good to kick all call states from here, but that
     * should only be done from a coroutine, so as not to crash if the
     * s->calls list changes while entering one call. So for now, the only
     * user of this function kicks its single call_state by hand.
     */
}