qemu/block/replication.c
<<
>>
Prefs
   1/*
   2 * Replication Block filter
   3 *
   4 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   5 * Copyright (c) 2016 Intel Corporation
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 *
   8 * Author:
   9 *   Wen Congyang <wency@cn.fujitsu.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  12 * See the COPYING file in the top-level directory.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu/module.h"
  17#include "qemu/option.h"
  18#include "block/nbd.h"
  19#include "block/blockjob.h"
  20#include "block/block_int.h"
  21#include "block/block_backup.h"
  22#include "sysemu/block-backend.h"
  23#include "qapi/error.h"
  24#include "qapi/qmp/qdict.h"
  25#include "block/replication.h"
  26
/*
 * Lifecycle stage of a replication filter node; stored in
 * BDRVReplicationState.stage and consulted by the I/O paths and by the
 * start/checkpoint/stop callbacks.
 */
typedef enum {
    BLOCK_REPLICATION_NONE,             /* block replication is not started */
    BLOCK_REPLICATION_RUNNING,          /* block replication is running */
    BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
    BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
    BLOCK_REPLICATION_DONE,             /* block replication is done */
} ReplicationStage;
  34
/* Per-node driver state of the replication filter (bs->opaque). */
typedef struct BDRVReplicationState {
    /* Primary or secondary; parsed from the "mode" option at open time */
    ReplicationMode mode;
    /* Current lifecycle stage, see ReplicationStage */
    ReplicationStage stage;
    /* Active commit job started by replication_stop() on failover */
    BlockJob *commit_job;
    /* Children attached by replication_start() in secondary mode */
    BdrvChild *hidden_disk;
    BdrvChild *secondary_disk;
    /* Internal backup job created by replication_start() (secondary mode) */
    BlockJob *backup_job;
    /* Copy of the "top-id" option; only set in secondary mode */
    char *top_id;
    /* Handle returned by replication_new() */
    ReplicationState *rs;
    /* Op blocker placed on the top node while the backup job exists */
    Error *blocker;
    /* Read-only state recorded by reopen_backing_file(writable = true) */
    bool orig_hidden_read_only;
    bool orig_secondary_read_only;
    /* Non-zero once an I/O or job error was recorded; see replication_get_error() */
    int error;
} BDRVReplicationState;
  49
/*
 * Forward declarations of the ReplicationOps callbacks; they are referenced
 * by replication_ops and replication_close() before their definitions.
 */
static void replication_start(ReplicationState *rs, ReplicationMode mode,
                              Error **errp);
static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
static void replication_get_error(ReplicationState *rs, Error **errp);
static void replication_stop(ReplicationState *rs, bool failover,
                             Error **errp);
  56
/* Names of the runtime options understood by replication_open() */
#define REPLICATION_MODE        "mode"
#define REPLICATION_TOP_ID      "top-id"
/* Option list used to absorb the driver's options from the open QDict */
static QemuOptsList replication_runtime_opts = {
    .name = "replication",
    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
    .desc = {
        {
            /* "primary" or "secondary"; mandatory */
            .name = REPLICATION_MODE,
            .type = QEMU_OPT_STRING,
        },
        {
            /* required in secondary mode, rejected in primary mode */
            .name = REPLICATION_TOP_ID,
            .type = QEMU_OPT_STRING,
        },
        { /* end of list */ }
    },
};
  74
/* Callback table handed to replication_new() in replication_open() */
static ReplicationOps replication_ops = {
    .start = replication_start,
    .checkpoint = replication_do_checkpoint,
    .get_error = replication_get_error,
    .stop = replication_stop,
};
  81
  82static int replication_open(BlockDriverState *bs, QDict *options,
  83                            int flags, Error **errp)
  84{
  85    int ret;
  86    BDRVReplicationState *s = bs->opaque;
  87    QemuOpts *opts = NULL;
  88    const char *mode;
  89    const char *top_id;
  90
  91    ret = bdrv_open_file_child(NULL, options, "file", bs, errp);
  92    if (ret < 0) {
  93        return ret;
  94    }
  95
  96    ret = -EINVAL;
  97    opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
  98    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
  99        goto fail;
 100    }
 101
 102    mode = qemu_opt_get(opts, REPLICATION_MODE);
 103    if (!mode) {
 104        error_setg(errp, "Missing the option mode");
 105        goto fail;
 106    }
 107
 108    if (!strcmp(mode, "primary")) {
 109        s->mode = REPLICATION_MODE_PRIMARY;
 110        top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
 111        if (top_id) {
 112            error_setg(errp,
 113                       "The primary side does not support option top-id");
 114            goto fail;
 115        }
 116    } else if (!strcmp(mode, "secondary")) {
 117        s->mode = REPLICATION_MODE_SECONDARY;
 118        top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
 119        s->top_id = g_strdup(top_id);
 120        if (!s->top_id) {
 121            error_setg(errp, "Missing the option top-id");
 122            goto fail;
 123        }
 124    } else {
 125        error_setg(errp,
 126                   "The option mode's value should be primary or secondary");
 127        goto fail;
 128    }
 129
 130    s->rs = replication_new(bs, &replication_ops);
 131
 132    ret = 0;
 133
 134fail:
 135    qemu_opts_del(opts);
 136    return ret;
 137}
 138
 139static void replication_close(BlockDriverState *bs)
 140{
 141    BDRVReplicationState *s = bs->opaque;
 142    Job *commit_job;
 143    GLOBAL_STATE_CODE();
 144
 145    if (s->stage == BLOCK_REPLICATION_RUNNING) {
 146        replication_stop(s->rs, false, NULL);
 147    }
 148    if (s->stage == BLOCK_REPLICATION_FAILOVER) {
 149        commit_job = &s->commit_job->job;
 150        assert(commit_job->aio_context == qemu_get_current_aio_context());
 151        job_cancel_sync(commit_job, false);
 152    }
 153
 154    if (s->mode == REPLICATION_MODE_SECONDARY) {
 155        g_free(s->top_id);
 156    }
 157
 158    replication_remove(s->rs);
 159}
 160
 161static void replication_child_perm(BlockDriverState *bs, BdrvChild *c,
 162                                   BdrvChildRole role,
 163                                   BlockReopenQueue *reopen_queue,
 164                                   uint64_t perm, uint64_t shared,
 165                                   uint64_t *nperm, uint64_t *nshared)
 166{
 167    if (role & BDRV_CHILD_PRIMARY) {
 168        *nperm = BLK_PERM_CONSISTENT_READ;
 169    } else {
 170        *nperm = 0;
 171    }
 172
 173    if ((bs->open_flags & (BDRV_O_INACTIVE | BDRV_O_RDWR)) == BDRV_O_RDWR) {
 174        *nperm |= BLK_PERM_WRITE;
 175    }
 176    *nshared = BLK_PERM_CONSISTENT_READ
 177               | BLK_PERM_WRITE
 178               | BLK_PERM_WRITE_UNCHANGED;
 179    return;
 180}
 181
 182static int64_t replication_getlength(BlockDriverState *bs)
 183{
 184    return bdrv_getlength(bs->file->bs);
 185}
 186
 187static int replication_get_io_status(BDRVReplicationState *s)
 188{
 189    switch (s->stage) {
 190    case BLOCK_REPLICATION_NONE:
 191        return -EIO;
 192    case BLOCK_REPLICATION_RUNNING:
 193        return 0;
 194    case BLOCK_REPLICATION_FAILOVER:
 195        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
 196    case BLOCK_REPLICATION_FAILOVER_FAILED:
 197        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
 198    case BLOCK_REPLICATION_DONE:
 199        /*
 200         * active commit job completes, and active disk and secondary_disk
 201         * is swapped, so we can operate bs->file directly
 202         */
 203        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
 204    default:
 205        abort();
 206    }
 207}
 208
 209static int replication_return_value(BDRVReplicationState *s, int ret)
 210{
 211    if (s->mode == REPLICATION_MODE_SECONDARY) {
 212        return ret;
 213    }
 214
 215    if (ret < 0) {
 216        s->error = ret;
 217        ret = 0;
 218    }
 219
 220    return ret;
 221}
 222
 223static coroutine_fn int replication_co_readv(BlockDriverState *bs,
 224                                             int64_t sector_num,
 225                                             int remaining_sectors,
 226                                             QEMUIOVector *qiov)
 227{
 228    BDRVReplicationState *s = bs->opaque;
 229    int ret;
 230
 231    if (s->mode == REPLICATION_MODE_PRIMARY) {
 232        /* We only use it to forward primary write requests */
 233        return -EIO;
 234    }
 235
 236    ret = replication_get_io_status(s);
 237    if (ret < 0) {
 238        return ret;
 239    }
 240
 241    ret = bdrv_co_preadv(bs->file, sector_num * BDRV_SECTOR_SIZE,
 242                         remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
 243
 244    return replication_return_value(s, ret);
 245}
 246
 247static coroutine_fn int replication_co_writev(BlockDriverState *bs,
 248                                              int64_t sector_num,
 249                                              int remaining_sectors,
 250                                              QEMUIOVector *qiov,
 251                                              int flags)
 252{
 253    BDRVReplicationState *s = bs->opaque;
 254    QEMUIOVector hd_qiov;
 255    uint64_t bytes_done = 0;
 256    BdrvChild *top = bs->file;
 257    BdrvChild *base = s->secondary_disk;
 258    BdrvChild *target;
 259    int ret;
 260    int64_t n;
 261
 262    ret = replication_get_io_status(s);
 263    if (ret < 0) {
 264        goto out;
 265    }
 266
 267    if (ret == 0) {
 268        ret = bdrv_co_pwritev(top, sector_num * BDRV_SECTOR_SIZE,
 269                              remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
 270        return replication_return_value(s, ret);
 271    }
 272
 273    /*
 274     * Failover failed, only write to active disk if the sectors
 275     * have already been allocated in active disk/hidden disk.
 276     */
 277    qemu_iovec_init(&hd_qiov, qiov->niov);
 278    while (remaining_sectors > 0) {
 279        int64_t count;
 280
 281        ret = bdrv_is_allocated_above(top->bs, base->bs, false,
 282                                      sector_num * BDRV_SECTOR_SIZE,
 283                                      remaining_sectors * BDRV_SECTOR_SIZE,
 284                                      &count);
 285        if (ret < 0) {
 286            goto out1;
 287        }
 288
 289        assert(QEMU_IS_ALIGNED(count, BDRV_SECTOR_SIZE));
 290        n = count >> BDRV_SECTOR_BITS;
 291        qemu_iovec_reset(&hd_qiov);
 292        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, count);
 293
 294        target = ret ? top : base;
 295        ret = bdrv_co_pwritev(target, sector_num * BDRV_SECTOR_SIZE,
 296                              n * BDRV_SECTOR_SIZE, &hd_qiov, 0);
 297        if (ret < 0) {
 298            goto out1;
 299        }
 300
 301        remaining_sectors -= n;
 302        sector_num += n;
 303        bytes_done += count;
 304    }
 305
 306out1:
 307    qemu_iovec_destroy(&hd_qiov);
 308out:
 309    return ret;
 310}
 311
 312static void secondary_do_checkpoint(BlockDriverState *bs, Error **errp)
 313{
 314    BDRVReplicationState *s = bs->opaque;
 315    BdrvChild *active_disk = bs->file;
 316    Error *local_err = NULL;
 317    int ret;
 318
 319    if (!s->backup_job) {
 320        error_setg(errp, "Backup job was cancelled unexpectedly");
 321        return;
 322    }
 323
 324    backup_do_checkpoint(s->backup_job, &local_err);
 325    if (local_err) {
 326        error_propagate(errp, local_err);
 327        return;
 328    }
 329
 330    if (!active_disk->bs->drv) {
 331        error_setg(errp, "Active disk %s is ejected",
 332                   active_disk->bs->node_name);
 333        return;
 334    }
 335
 336    ret = bdrv_make_empty(active_disk, errp);
 337    if (ret < 0) {
 338        return;
 339    }
 340
 341    if (!s->hidden_disk->bs->drv) {
 342        error_setg(errp, "Hidden disk %s is ejected",
 343                   s->hidden_disk->bs->node_name);
 344        return;
 345    }
 346
 347    ret = bdrv_make_empty(s->hidden_disk, errp);
 348    if (ret < 0) {
 349        return;
 350    }
 351}
 352
 353/* This function is supposed to be called twice:
 354 * first with writable = true, then with writable = false.
 355 * The first call puts s->hidden_disk and s->secondary_disk in
 356 * r/w mode, and the second puts them back in their original state.
 357 */
 358static void reopen_backing_file(BlockDriverState *bs, bool writable,
 359                                Error **errp)
 360{
 361    BDRVReplicationState *s = bs->opaque;
 362    BdrvChild *hidden_disk, *secondary_disk;
 363    BlockReopenQueue *reopen_queue = NULL;
 364
 365    /*
 366     * s->hidden_disk and s->secondary_disk may not be set yet, as they will
 367     * only be set after the children are writable.
 368     */
 369    hidden_disk = bs->file->bs->backing;
 370    secondary_disk = hidden_disk->bs->backing;
 371
 372    if (writable) {
 373        s->orig_hidden_read_only = bdrv_is_read_only(hidden_disk->bs);
 374        s->orig_secondary_read_only = bdrv_is_read_only(secondary_disk->bs);
 375    }
 376
 377    bdrv_subtree_drained_begin(hidden_disk->bs);
 378    bdrv_subtree_drained_begin(secondary_disk->bs);
 379
 380    if (s->orig_hidden_read_only) {
 381        QDict *opts = qdict_new();
 382        qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
 383        reopen_queue = bdrv_reopen_queue(reopen_queue, hidden_disk->bs,
 384                                         opts, true);
 385    }
 386
 387    if (s->orig_secondary_read_only) {
 388        QDict *opts = qdict_new();
 389        qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
 390        reopen_queue = bdrv_reopen_queue(reopen_queue, secondary_disk->bs,
 391                                         opts, true);
 392    }
 393
 394    if (reopen_queue) {
 395        AioContext *ctx = bdrv_get_aio_context(bs);
 396        if (ctx != qemu_get_aio_context()) {
 397            aio_context_release(ctx);
 398        }
 399        bdrv_reopen_multiple(reopen_queue, errp);
 400        if (ctx != qemu_get_aio_context()) {
 401            aio_context_acquire(ctx);
 402        }
 403    }
 404
 405    bdrv_subtree_drained_end(hidden_disk->bs);
 406    bdrv_subtree_drained_end(secondary_disk->bs);
 407}
 408
 409static void backup_job_cleanup(BlockDriverState *bs)
 410{
 411    BDRVReplicationState *s = bs->opaque;
 412    BlockDriverState *top_bs;
 413
 414    s->backup_job = NULL;
 415
 416    top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
 417    if (!top_bs) {
 418        return;
 419    }
 420    bdrv_op_unblock_all(top_bs, s->blocker);
 421    error_free(s->blocker);
 422    reopen_backing_file(bs, false, NULL);
 423}
 424
 425static void backup_job_completed(void *opaque, int ret)
 426{
 427    BlockDriverState *bs = opaque;
 428    BDRVReplicationState *s = bs->opaque;
 429
 430    if (s->stage != BLOCK_REPLICATION_FAILOVER) {
 431        /* The backup job is cancelled unexpectedly */
 432        s->error = -EIO;
 433    }
 434
 435    backup_job_cleanup(bs);
 436}
 437
 438static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
 439{
 440    BdrvChild *child;
 441
 442    /* The bs itself is the top_bs */
 443    if (top_bs == bs) {
 444        return true;
 445    }
 446
 447    /* Iterate over top_bs's children */
 448    QLIST_FOREACH(child, &top_bs->children, next) {
 449        if (child->bs == bs || check_top_bs(child->bs, bs)) {
 450            return true;
 451        }
 452    }
 453
 454    return false;
 455}
 456
 457static void replication_start(ReplicationState *rs, ReplicationMode mode,
 458                              Error **errp)
 459{
 460    BlockDriverState *bs = rs->opaque;
 461    BDRVReplicationState *s;
 462    BlockDriverState *top_bs;
 463    BdrvChild *active_disk, *hidden_disk, *secondary_disk;
 464    int64_t active_length, hidden_length, disk_length;
 465    AioContext *aio_context;
 466    Error *local_err = NULL;
 467    BackupPerf perf = { .use_copy_range = true, .max_workers = 1 };
 468
 469    aio_context = bdrv_get_aio_context(bs);
 470    aio_context_acquire(aio_context);
 471    s = bs->opaque;
 472
 473    if (s->stage == BLOCK_REPLICATION_DONE ||
 474        s->stage == BLOCK_REPLICATION_FAILOVER) {
 475        /*
 476         * This case happens when a secondary is promoted to primary.
 477         * Ignore the request because the secondary side of replication
 478         * doesn't have to do anything anymore.
 479         */
 480        aio_context_release(aio_context);
 481        return;
 482    }
 483
 484    if (s->stage != BLOCK_REPLICATION_NONE) {
 485        error_setg(errp, "Block replication is running or done");
 486        aio_context_release(aio_context);
 487        return;
 488    }
 489
 490    if (s->mode != mode) {
 491        error_setg(errp, "The parameter mode's value is invalid, needs %d,"
 492                   " but got %d", s->mode, mode);
 493        aio_context_release(aio_context);
 494        return;
 495    }
 496
 497    switch (s->mode) {
 498    case REPLICATION_MODE_PRIMARY:
 499        break;
 500    case REPLICATION_MODE_SECONDARY:
 501        active_disk = bs->file;
 502        if (!active_disk || !active_disk->bs || !active_disk->bs->backing) {
 503            error_setg(errp, "Active disk doesn't have backing file");
 504            aio_context_release(aio_context);
 505            return;
 506        }
 507
 508        hidden_disk = active_disk->bs->backing;
 509        if (!hidden_disk->bs || !hidden_disk->bs->backing) {
 510            error_setg(errp, "Hidden disk doesn't have backing file");
 511            aio_context_release(aio_context);
 512            return;
 513        }
 514
 515        secondary_disk = hidden_disk->bs->backing;
 516        if (!secondary_disk->bs || !bdrv_has_blk(secondary_disk->bs)) {
 517            error_setg(errp, "The secondary disk doesn't have block backend");
 518            aio_context_release(aio_context);
 519            return;
 520        }
 521
 522        /* verify the length */
 523        active_length = bdrv_getlength(active_disk->bs);
 524        hidden_length = bdrv_getlength(hidden_disk->bs);
 525        disk_length = bdrv_getlength(secondary_disk->bs);
 526        if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
 527            active_length != hidden_length || hidden_length != disk_length) {
 528            error_setg(errp, "Active disk, hidden disk, secondary disk's length"
 529                       " are not the same");
 530            aio_context_release(aio_context);
 531            return;
 532        }
 533
 534        /* Must be true, or the bdrv_getlength() calls would have failed */
 535        assert(active_disk->bs->drv && hidden_disk->bs->drv);
 536
 537        if (!active_disk->bs->drv->bdrv_make_empty ||
 538            !hidden_disk->bs->drv->bdrv_make_empty) {
 539            error_setg(errp,
 540                       "Active disk or hidden disk doesn't support make_empty");
 541            aio_context_release(aio_context);
 542            return;
 543        }
 544
 545        /* reopen the backing file in r/w mode */
 546        reopen_backing_file(bs, true, &local_err);
 547        if (local_err) {
 548            error_propagate(errp, local_err);
 549            aio_context_release(aio_context);
 550            return;
 551        }
 552
 553        bdrv_ref(hidden_disk->bs);
 554        s->hidden_disk = bdrv_attach_child(bs, hidden_disk->bs, "hidden disk",
 555                                           &child_of_bds, BDRV_CHILD_DATA,
 556                                           &local_err);
 557        if (local_err) {
 558            error_propagate(errp, local_err);
 559            aio_context_release(aio_context);
 560            return;
 561        }
 562
 563        bdrv_ref(secondary_disk->bs);
 564        s->secondary_disk = bdrv_attach_child(bs, secondary_disk->bs,
 565                                              "secondary disk", &child_of_bds,
 566                                              BDRV_CHILD_DATA, &local_err);
 567        if (local_err) {
 568            error_propagate(errp, local_err);
 569            aio_context_release(aio_context);
 570            return;
 571        }
 572
 573        /* start backup job now */
 574        error_setg(&s->blocker,
 575                   "Block device is in use by internal backup job");
 576
 577        top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
 578        if (!top_bs || !bdrv_is_root_node(top_bs) ||
 579            !check_top_bs(top_bs, bs)) {
 580            error_setg(errp, "No top_bs or it is invalid");
 581            reopen_backing_file(bs, false, NULL);
 582            aio_context_release(aio_context);
 583            return;
 584        }
 585        bdrv_op_block_all(top_bs, s->blocker);
 586        bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
 587
 588        s->backup_job = backup_job_create(
 589                                NULL, s->secondary_disk->bs, s->hidden_disk->bs,
 590                                0, MIRROR_SYNC_MODE_NONE, NULL, 0, false, NULL,
 591                                &perf,
 592                                BLOCKDEV_ON_ERROR_REPORT,
 593                                BLOCKDEV_ON_ERROR_REPORT, JOB_INTERNAL,
 594                                backup_job_completed, bs, NULL, &local_err);
 595        if (local_err) {
 596            error_propagate(errp, local_err);
 597            backup_job_cleanup(bs);
 598            aio_context_release(aio_context);
 599            return;
 600        }
 601        job_start(&s->backup_job->job);
 602        break;
 603    default:
 604        aio_context_release(aio_context);
 605        abort();
 606    }
 607
 608    s->stage = BLOCK_REPLICATION_RUNNING;
 609
 610    if (s->mode == REPLICATION_MODE_SECONDARY) {
 611        secondary_do_checkpoint(bs, errp);
 612    }
 613
 614    s->error = 0;
 615    aio_context_release(aio_context);
 616}
 617
 618static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
 619{
 620    BlockDriverState *bs = rs->opaque;
 621    BDRVReplicationState *s;
 622    AioContext *aio_context;
 623
 624    aio_context = bdrv_get_aio_context(bs);
 625    aio_context_acquire(aio_context);
 626    s = bs->opaque;
 627
 628    if (s->stage == BLOCK_REPLICATION_DONE ||
 629        s->stage == BLOCK_REPLICATION_FAILOVER) {
 630        /*
 631         * This case happens when a secondary was promoted to primary.
 632         * Ignore the request because the secondary side of replication
 633         * doesn't have to do anything anymore.
 634         */
 635        aio_context_release(aio_context);
 636        return;
 637    }
 638
 639    if (s->mode == REPLICATION_MODE_SECONDARY) {
 640        secondary_do_checkpoint(bs, errp);
 641    }
 642    aio_context_release(aio_context);
 643}
 644
 645static void replication_get_error(ReplicationState *rs, Error **errp)
 646{
 647    BlockDriverState *bs = rs->opaque;
 648    BDRVReplicationState *s;
 649    AioContext *aio_context;
 650
 651    aio_context = bdrv_get_aio_context(bs);
 652    aio_context_acquire(aio_context);
 653    s = bs->opaque;
 654
 655    if (s->stage == BLOCK_REPLICATION_NONE) {
 656        error_setg(errp, "Block replication is not running");
 657        aio_context_release(aio_context);
 658        return;
 659    }
 660
 661    if (s->error) {
 662        error_setg(errp, "I/O error occurred");
 663        aio_context_release(aio_context);
 664        return;
 665    }
 666    aio_context_release(aio_context);
 667}
 668
 669static void replication_done(void *opaque, int ret)
 670{
 671    BlockDriverState *bs = opaque;
 672    BDRVReplicationState *s = bs->opaque;
 673
 674    if (ret == 0) {
 675        s->stage = BLOCK_REPLICATION_DONE;
 676
 677        bdrv_unref_child(bs, s->secondary_disk);
 678        s->secondary_disk = NULL;
 679        bdrv_unref_child(bs, s->hidden_disk);
 680        s->hidden_disk = NULL;
 681        s->error = 0;
 682    } else {
 683        s->stage = BLOCK_REPLICATION_FAILOVER_FAILED;
 684        s->error = -EIO;
 685    }
 686}
 687
 688static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
 689{
 690    BlockDriverState *bs = rs->opaque;
 691    BDRVReplicationState *s;
 692    AioContext *aio_context;
 693
 694    aio_context = bdrv_get_aio_context(bs);
 695    aio_context_acquire(aio_context);
 696    s = bs->opaque;
 697
 698    if (s->stage == BLOCK_REPLICATION_DONE ||
 699        s->stage == BLOCK_REPLICATION_FAILOVER) {
 700        /*
 701         * This case happens when a secondary was promoted to primary.
 702         * Ignore the request because the secondary side of replication
 703         * doesn't have to do anything anymore.
 704         */
 705        aio_context_release(aio_context);
 706        return;
 707    }
 708
 709    if (s->stage != BLOCK_REPLICATION_RUNNING) {
 710        error_setg(errp, "Block replication is not running");
 711        aio_context_release(aio_context);
 712        return;
 713    }
 714
 715    switch (s->mode) {
 716    case REPLICATION_MODE_PRIMARY:
 717        s->stage = BLOCK_REPLICATION_DONE;
 718        s->error = 0;
 719        break;
 720    case REPLICATION_MODE_SECONDARY:
 721        /*
 722         * This BDS will be closed, and the job should be completed
 723         * before the BDS is closed, because we will access hidden
 724         * disk, secondary disk in backup_job_completed().
 725         */
 726        if (s->backup_job) {
 727            aio_context_release(aio_context);
 728            job_cancel_sync(&s->backup_job->job, true);
 729            aio_context_acquire(aio_context);
 730        }
 731
 732        if (!failover) {
 733            secondary_do_checkpoint(bs, errp);
 734            s->stage = BLOCK_REPLICATION_DONE;
 735            aio_context_release(aio_context);
 736            return;
 737        }
 738
 739        s->stage = BLOCK_REPLICATION_FAILOVER;
 740        s->commit_job = commit_active_start(
 741                            NULL, bs->file->bs, s->secondary_disk->bs,
 742                            JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
 743                            NULL, replication_done, bs, true, errp);
 744        break;
 745    default:
 746        aio_context_release(aio_context);
 747        abort();
 748    }
 749    aio_context_release(aio_context);
 750}
 751
/*
 * NULL-terminated list of option names registered as the driver's
 * .strong_runtime_opts: both options parsed by replication_open().
 */
static const char *const replication_strong_runtime_opts[] = {
    REPLICATION_MODE,
    REPLICATION_TOP_ID,

    NULL
};
 758
/* Block driver definition for the "replication" filter */
static BlockDriver bdrv_replication = {
    .format_name                = "replication",
    .instance_size              = sizeof(BDRVReplicationState),

    /* Node lifecycle and permission handling */
    .bdrv_open                  = replication_open,
    .bdrv_close                 = replication_close,
    .bdrv_child_perm            = replication_child_perm,

    /* Sector-based I/O entry points and length query */
    .bdrv_getlength             = replication_getlength,
    .bdrv_co_readv              = replication_co_readv,
    .bdrv_co_writev             = replication_co_writev,

    .is_filter                  = true,

    .has_variable_length        = true,
    .strong_runtime_opts        = replication_strong_runtime_opts,
};
 776
/* Register the replication driver with the block layer. */
static void bdrv_replication_init(void)
{
    bdrv_register(&bdrv_replication);
}

/* Hook the registration into the block module initialisation. */
block_init(bdrv_replication_init);
 783