qemu/block/replication.c
<<
>>
Prefs
   1/*
   2 * Replication Block filter
   3 *
   4 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   5 * Copyright (c) 2016 Intel Corporation
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 *
   8 * Author:
   9 *   Wen Congyang <wency@cn.fujitsu.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  12 * See the COPYING file in the top-level directory.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu/module.h"
  17#include "qemu/option.h"
  18#include "block/nbd.h"
  19#include "block/blockjob.h"
  20#include "block/block_int.h"
  21#include "block/block_backup.h"
  22#include "sysemu/block-backend.h"
  23#include "qapi/error.h"
  24#include "qapi/qmp/qdict.h"
  25#include "block/replication.h"
  26
  27typedef enum {
  28    BLOCK_REPLICATION_NONE,             /* block replication is not started */
  29    BLOCK_REPLICATION_RUNNING,          /* block replication is running */
  30    BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
  31    BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
  32    BLOCK_REPLICATION_DONE,             /* block replication is done */
  33} ReplicationStage;
  34
  35typedef struct BDRVReplicationState {
  36    ReplicationMode mode;
  37    ReplicationStage stage;
  38    BlockJob *commit_job;
  39    BdrvChild *hidden_disk;
  40    BdrvChild *secondary_disk;
  41    BlockJob *backup_job;
  42    char *top_id;
  43    ReplicationState *rs;
  44    Error *blocker;
  45    bool orig_hidden_read_only;
  46    bool orig_secondary_read_only;
  47    int error;
  48} BDRVReplicationState;
  49
  50static void replication_start(ReplicationState *rs, ReplicationMode mode,
  51                              Error **errp);
  52static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
  53static void replication_get_error(ReplicationState *rs, Error **errp);
  54static void replication_stop(ReplicationState *rs, bool failover,
  55                             Error **errp);
  56
  57#define REPLICATION_MODE        "mode"
  58#define REPLICATION_TOP_ID      "top-id"
  59static QemuOptsList replication_runtime_opts = {
  60    .name = "replication",
  61    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
  62    .desc = {
  63        {
  64            .name = REPLICATION_MODE,
  65            .type = QEMU_OPT_STRING,
  66        },
  67        {
  68            .name = REPLICATION_TOP_ID,
  69            .type = QEMU_OPT_STRING,
  70        },
  71        { /* end of list */ }
  72    },
  73};
  74
  75static ReplicationOps replication_ops = {
  76    .start = replication_start,
  77    .checkpoint = replication_do_checkpoint,
  78    .get_error = replication_get_error,
  79    .stop = replication_stop,
  80};
  81
  82static int replication_open(BlockDriverState *bs, QDict *options,
  83                            int flags, Error **errp)
  84{
  85    int ret;
  86    BDRVReplicationState *s = bs->opaque;
  87    QemuOpts *opts = NULL;
  88    const char *mode;
  89    const char *top_id;
  90
  91    ret = bdrv_open_file_child(NULL, options, "file", bs, errp);
  92    if (ret < 0) {
  93        return ret;
  94    }
  95
  96    ret = -EINVAL;
  97    opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
  98    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
  99        goto fail;
 100    }
 101
 102    mode = qemu_opt_get(opts, REPLICATION_MODE);
 103    if (!mode) {
 104        error_setg(errp, "Missing the option mode");
 105        goto fail;
 106    }
 107
 108    if (!strcmp(mode, "primary")) {
 109        s->mode = REPLICATION_MODE_PRIMARY;
 110        top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
 111        if (top_id) {
 112            error_setg(errp,
 113                       "The primary side does not support option top-id");
 114            goto fail;
 115        }
 116    } else if (!strcmp(mode, "secondary")) {
 117        s->mode = REPLICATION_MODE_SECONDARY;
 118        top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
 119        s->top_id = g_strdup(top_id);
 120        if (!s->top_id) {
 121            error_setg(errp, "Missing the option top-id");
 122            goto fail;
 123        }
 124    } else {
 125        error_setg(errp,
 126                   "The option mode's value should be primary or secondary");
 127        goto fail;
 128    }
 129
 130    s->rs = replication_new(bs, &replication_ops);
 131
 132    ret = 0;
 133
 134fail:
 135    qemu_opts_del(opts);
 136    return ret;
 137}
 138
 139static void replication_close(BlockDriverState *bs)
 140{
 141    BDRVReplicationState *s = bs->opaque;
 142    Job *commit_job;
 143    GLOBAL_STATE_CODE();
 144
 145    if (s->stage == BLOCK_REPLICATION_RUNNING) {
 146        replication_stop(s->rs, false, NULL);
 147    }
 148    if (s->stage == BLOCK_REPLICATION_FAILOVER) {
 149        commit_job = &s->commit_job->job;
 150        assert(commit_job->aio_context == qemu_get_current_aio_context());
 151        job_cancel_sync(commit_job, false);
 152    }
 153
 154    if (s->mode == REPLICATION_MODE_SECONDARY) {
 155        g_free(s->top_id);
 156    }
 157
 158    replication_remove(s->rs);
 159}
 160
 161static void replication_child_perm(BlockDriverState *bs, BdrvChild *c,
 162                                   BdrvChildRole role,
 163                                   BlockReopenQueue *reopen_queue,
 164                                   uint64_t perm, uint64_t shared,
 165                                   uint64_t *nperm, uint64_t *nshared)
 166{
 167    if (role & BDRV_CHILD_PRIMARY) {
 168        *nperm = BLK_PERM_CONSISTENT_READ;
 169    } else {
 170        *nperm = 0;
 171    }
 172
 173    if ((bs->open_flags & (BDRV_O_INACTIVE | BDRV_O_RDWR)) == BDRV_O_RDWR) {
 174        *nperm |= BLK_PERM_WRITE;
 175    }
 176    *nshared = BLK_PERM_CONSISTENT_READ
 177               | BLK_PERM_WRITE
 178               | BLK_PERM_WRITE_UNCHANGED;
 179    return;
 180}
 181
 182static int64_t coroutine_fn GRAPH_RDLOCK
 183replication_co_getlength(BlockDriverState *bs)
 184{
 185    return bdrv_co_getlength(bs->file->bs);
 186}
 187
 188static int replication_get_io_status(BDRVReplicationState *s)
 189{
 190    switch (s->stage) {
 191    case BLOCK_REPLICATION_NONE:
 192        return -EIO;
 193    case BLOCK_REPLICATION_RUNNING:
 194        return 0;
 195    case BLOCK_REPLICATION_FAILOVER:
 196        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
 197    case BLOCK_REPLICATION_FAILOVER_FAILED:
 198        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
 199    case BLOCK_REPLICATION_DONE:
 200        /*
 201         * active commit job completes, and active disk and secondary_disk
 202         * is swapped, so we can operate bs->file directly
 203         */
 204        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
 205    default:
 206        abort();
 207    }
 208}
 209
 210static int replication_return_value(BDRVReplicationState *s, int ret)
 211{
 212    if (s->mode == REPLICATION_MODE_SECONDARY) {
 213        return ret;
 214    }
 215
 216    if (ret < 0) {
 217        s->error = ret;
 218        ret = 0;
 219    }
 220
 221    return ret;
 222}
 223
 224static int coroutine_fn GRAPH_RDLOCK
 225replication_co_readv(BlockDriverState *bs, int64_t sector_num,
 226                     int remaining_sectors, QEMUIOVector *qiov)
 227{
 228    BDRVReplicationState *s = bs->opaque;
 229    int ret;
 230
 231    if (s->mode == REPLICATION_MODE_PRIMARY) {
 232        /* We only use it to forward primary write requests */
 233        return -EIO;
 234    }
 235
 236    ret = replication_get_io_status(s);
 237    if (ret < 0) {
 238        return ret;
 239    }
 240
 241    ret = bdrv_co_preadv(bs->file, sector_num * BDRV_SECTOR_SIZE,
 242                         remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
 243
 244    return replication_return_value(s, ret);
 245}
 246
 247static int coroutine_fn GRAPH_RDLOCK
 248replication_co_writev(BlockDriverState *bs, int64_t sector_num,
 249                      int remaining_sectors, QEMUIOVector *qiov, int flags)
 250{
 251    BDRVReplicationState *s = bs->opaque;
 252    QEMUIOVector hd_qiov;
 253    uint64_t bytes_done = 0;
 254    BdrvChild *top = bs->file;
 255    BdrvChild *base = s->secondary_disk;
 256    BdrvChild *target;
 257    int ret;
 258    int64_t n;
 259
 260    ret = replication_get_io_status(s);
 261    if (ret < 0) {
 262        goto out;
 263    }
 264
 265    if (ret == 0) {
 266        ret = bdrv_co_pwritev(top, sector_num * BDRV_SECTOR_SIZE,
 267                              remaining_sectors * BDRV_SECTOR_SIZE, qiov, 0);
 268        return replication_return_value(s, ret);
 269    }
 270
 271    /*
 272     * Failover failed, only write to active disk if the sectors
 273     * have already been allocated in active disk/hidden disk.
 274     */
 275    qemu_iovec_init(&hd_qiov, qiov->niov);
 276    while (remaining_sectors > 0) {
 277        int64_t count;
 278
 279        ret = bdrv_is_allocated_above(top->bs, base->bs, false,
 280                                      sector_num * BDRV_SECTOR_SIZE,
 281                                      remaining_sectors * BDRV_SECTOR_SIZE,
 282                                      &count);
 283        if (ret < 0) {
 284            goto out1;
 285        }
 286
 287        assert(QEMU_IS_ALIGNED(count, BDRV_SECTOR_SIZE));
 288        n = count >> BDRV_SECTOR_BITS;
 289        qemu_iovec_reset(&hd_qiov);
 290        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, count);
 291
 292        target = ret ? top : base;
 293        ret = bdrv_co_pwritev(target, sector_num * BDRV_SECTOR_SIZE,
 294                              n * BDRV_SECTOR_SIZE, &hd_qiov, 0);
 295        if (ret < 0) {
 296            goto out1;
 297        }
 298
 299        remaining_sectors -= n;
 300        sector_num += n;
 301        bytes_done += count;
 302    }
 303
 304out1:
 305    qemu_iovec_destroy(&hd_qiov);
 306out:
 307    return ret;
 308}
 309
 310static void secondary_do_checkpoint(BlockDriverState *bs, Error **errp)
 311{
 312    BDRVReplicationState *s = bs->opaque;
 313    BdrvChild *active_disk = bs->file;
 314    Error *local_err = NULL;
 315    int ret;
 316
 317    if (!s->backup_job) {
 318        error_setg(errp, "Backup job was cancelled unexpectedly");
 319        return;
 320    }
 321
 322    backup_do_checkpoint(s->backup_job, &local_err);
 323    if (local_err) {
 324        error_propagate(errp, local_err);
 325        return;
 326    }
 327
 328    if (!active_disk->bs->drv) {
 329        error_setg(errp, "Active disk %s is ejected",
 330                   active_disk->bs->node_name);
 331        return;
 332    }
 333
 334    ret = bdrv_make_empty(active_disk, errp);
 335    if (ret < 0) {
 336        return;
 337    }
 338
 339    if (!s->hidden_disk->bs->drv) {
 340        error_setg(errp, "Hidden disk %s is ejected",
 341                   s->hidden_disk->bs->node_name);
 342        return;
 343    }
 344
 345    ret = bdrv_make_empty(s->hidden_disk, errp);
 346    if (ret < 0) {
 347        return;
 348    }
 349}
 350
 351/* This function is supposed to be called twice:
 352 * first with writable = true, then with writable = false.
 353 * The first call puts s->hidden_disk and s->secondary_disk in
 354 * r/w mode, and the second puts them back in their original state.
 355 */
 356static void reopen_backing_file(BlockDriverState *bs, bool writable,
 357                                Error **errp)
 358{
 359    BDRVReplicationState *s = bs->opaque;
 360    BdrvChild *hidden_disk, *secondary_disk;
 361    BlockReopenQueue *reopen_queue = NULL;
 362
 363    /*
 364     * s->hidden_disk and s->secondary_disk may not be set yet, as they will
 365     * only be set after the children are writable.
 366     */
 367    hidden_disk = bs->file->bs->backing;
 368    secondary_disk = hidden_disk->bs->backing;
 369
 370    if (writable) {
 371        s->orig_hidden_read_only = bdrv_is_read_only(hidden_disk->bs);
 372        s->orig_secondary_read_only = bdrv_is_read_only(secondary_disk->bs);
 373    }
 374
 375    if (s->orig_hidden_read_only) {
 376        QDict *opts = qdict_new();
 377        qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
 378        reopen_queue = bdrv_reopen_queue(reopen_queue, hidden_disk->bs,
 379                                         opts, true);
 380    }
 381
 382    if (s->orig_secondary_read_only) {
 383        QDict *opts = qdict_new();
 384        qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
 385        reopen_queue = bdrv_reopen_queue(reopen_queue, secondary_disk->bs,
 386                                         opts, true);
 387    }
 388
 389    if (reopen_queue) {
 390        AioContext *ctx = bdrv_get_aio_context(bs);
 391        if (ctx != qemu_get_aio_context()) {
 392            aio_context_release(ctx);
 393        }
 394        bdrv_reopen_multiple(reopen_queue, errp);
 395        if (ctx != qemu_get_aio_context()) {
 396            aio_context_acquire(ctx);
 397        }
 398    }
 399}
 400
 401static void backup_job_cleanup(BlockDriverState *bs)
 402{
 403    BDRVReplicationState *s = bs->opaque;
 404    BlockDriverState *top_bs;
 405
 406    s->backup_job = NULL;
 407
 408    top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
 409    if (!top_bs) {
 410        return;
 411    }
 412    bdrv_op_unblock_all(top_bs, s->blocker);
 413    error_free(s->blocker);
 414    reopen_backing_file(bs, false, NULL);
 415}
 416
 417static void backup_job_completed(void *opaque, int ret)
 418{
 419    BlockDriverState *bs = opaque;
 420    BDRVReplicationState *s = bs->opaque;
 421
 422    if (s->stage != BLOCK_REPLICATION_FAILOVER) {
 423        /* The backup job is cancelled unexpectedly */
 424        s->error = -EIO;
 425    }
 426
 427    backup_job_cleanup(bs);
 428}
 429
 430static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
 431{
 432    BdrvChild *child;
 433
 434    /* The bs itself is the top_bs */
 435    if (top_bs == bs) {
 436        return true;
 437    }
 438
 439    /* Iterate over top_bs's children */
 440    QLIST_FOREACH(child, &top_bs->children, next) {
 441        if (child->bs == bs || check_top_bs(child->bs, bs)) {
 442            return true;
 443        }
 444    }
 445
 446    return false;
 447}
 448
 449static void replication_start(ReplicationState *rs, ReplicationMode mode,
 450                              Error **errp)
 451{
 452    BlockDriverState *bs = rs->opaque;
 453    BDRVReplicationState *s;
 454    BlockDriverState *top_bs;
 455    BdrvChild *active_disk, *hidden_disk, *secondary_disk;
 456    int64_t active_length, hidden_length, disk_length;
 457    AioContext *aio_context;
 458    Error *local_err = NULL;
 459    BackupPerf perf = { .use_copy_range = true, .max_workers = 1 };
 460
 461    aio_context = bdrv_get_aio_context(bs);
 462    aio_context_acquire(aio_context);
 463    s = bs->opaque;
 464
 465    if (s->stage == BLOCK_REPLICATION_DONE ||
 466        s->stage == BLOCK_REPLICATION_FAILOVER) {
 467        /*
 468         * This case happens when a secondary is promoted to primary.
 469         * Ignore the request because the secondary side of replication
 470         * doesn't have to do anything anymore.
 471         */
 472        aio_context_release(aio_context);
 473        return;
 474    }
 475
 476    if (s->stage != BLOCK_REPLICATION_NONE) {
 477        error_setg(errp, "Block replication is running or done");
 478        aio_context_release(aio_context);
 479        return;
 480    }
 481
 482    if (s->mode != mode) {
 483        error_setg(errp, "The parameter mode's value is invalid, needs %d,"
 484                   " but got %d", s->mode, mode);
 485        aio_context_release(aio_context);
 486        return;
 487    }
 488
 489    switch (s->mode) {
 490    case REPLICATION_MODE_PRIMARY:
 491        break;
 492    case REPLICATION_MODE_SECONDARY:
 493        active_disk = bs->file;
 494        if (!active_disk || !active_disk->bs || !active_disk->bs->backing) {
 495            error_setg(errp, "Active disk doesn't have backing file");
 496            aio_context_release(aio_context);
 497            return;
 498        }
 499
 500        hidden_disk = active_disk->bs->backing;
 501        if (!hidden_disk->bs || !hidden_disk->bs->backing) {
 502            error_setg(errp, "Hidden disk doesn't have backing file");
 503            aio_context_release(aio_context);
 504            return;
 505        }
 506
 507        secondary_disk = hidden_disk->bs->backing;
 508        if (!secondary_disk->bs || !bdrv_has_blk(secondary_disk->bs)) {
 509            error_setg(errp, "The secondary disk doesn't have block backend");
 510            aio_context_release(aio_context);
 511            return;
 512        }
 513
 514        /* verify the length */
 515        active_length = bdrv_getlength(active_disk->bs);
 516        hidden_length = bdrv_getlength(hidden_disk->bs);
 517        disk_length = bdrv_getlength(secondary_disk->bs);
 518        if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
 519            active_length != hidden_length || hidden_length != disk_length) {
 520            error_setg(errp, "Active disk, hidden disk, secondary disk's length"
 521                       " are not the same");
 522            aio_context_release(aio_context);
 523            return;
 524        }
 525
 526        /* Must be true, or the bdrv_getlength() calls would have failed */
 527        assert(active_disk->bs->drv && hidden_disk->bs->drv);
 528
 529        if (!active_disk->bs->drv->bdrv_make_empty ||
 530            !hidden_disk->bs->drv->bdrv_make_empty) {
 531            error_setg(errp,
 532                       "Active disk or hidden disk doesn't support make_empty");
 533            aio_context_release(aio_context);
 534            return;
 535        }
 536
 537        /* reopen the backing file in r/w mode */
 538        reopen_backing_file(bs, true, &local_err);
 539        if (local_err) {
 540            error_propagate(errp, local_err);
 541            aio_context_release(aio_context);
 542            return;
 543        }
 544
 545        bdrv_ref(hidden_disk->bs);
 546        s->hidden_disk = bdrv_attach_child(bs, hidden_disk->bs, "hidden disk",
 547                                           &child_of_bds, BDRV_CHILD_DATA,
 548                                           &local_err);
 549        if (local_err) {
 550            error_propagate(errp, local_err);
 551            aio_context_release(aio_context);
 552            return;
 553        }
 554
 555        bdrv_ref(secondary_disk->bs);
 556        s->secondary_disk = bdrv_attach_child(bs, secondary_disk->bs,
 557                                              "secondary disk", &child_of_bds,
 558                                              BDRV_CHILD_DATA, &local_err);
 559        if (local_err) {
 560            error_propagate(errp, local_err);
 561            aio_context_release(aio_context);
 562            return;
 563        }
 564
 565        /* start backup job now */
 566        error_setg(&s->blocker,
 567                   "Block device is in use by internal backup job");
 568
 569        top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
 570        if (!top_bs || !bdrv_is_root_node(top_bs) ||
 571            !check_top_bs(top_bs, bs)) {
 572            error_setg(errp, "No top_bs or it is invalid");
 573            reopen_backing_file(bs, false, NULL);
 574            aio_context_release(aio_context);
 575            return;
 576        }
 577        bdrv_op_block_all(top_bs, s->blocker);
 578        bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
 579
 580        s->backup_job = backup_job_create(
 581                                NULL, s->secondary_disk->bs, s->hidden_disk->bs,
 582                                0, MIRROR_SYNC_MODE_NONE, NULL, 0, false, NULL,
 583                                &perf,
 584                                BLOCKDEV_ON_ERROR_REPORT,
 585                                BLOCKDEV_ON_ERROR_REPORT, JOB_INTERNAL,
 586                                backup_job_completed, bs, NULL, &local_err);
 587        if (local_err) {
 588            error_propagate(errp, local_err);
 589            backup_job_cleanup(bs);
 590            aio_context_release(aio_context);
 591            return;
 592        }
 593        job_start(&s->backup_job->job);
 594        break;
 595    default:
 596        aio_context_release(aio_context);
 597        abort();
 598    }
 599
 600    s->stage = BLOCK_REPLICATION_RUNNING;
 601
 602    if (s->mode == REPLICATION_MODE_SECONDARY) {
 603        secondary_do_checkpoint(bs, errp);
 604    }
 605
 606    s->error = 0;
 607    aio_context_release(aio_context);
 608}
 609
 610static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
 611{
 612    BlockDriverState *bs = rs->opaque;
 613    BDRVReplicationState *s;
 614    AioContext *aio_context;
 615
 616    aio_context = bdrv_get_aio_context(bs);
 617    aio_context_acquire(aio_context);
 618    s = bs->opaque;
 619
 620    if (s->stage == BLOCK_REPLICATION_DONE ||
 621        s->stage == BLOCK_REPLICATION_FAILOVER) {
 622        /*
 623         * This case happens when a secondary was promoted to primary.
 624         * Ignore the request because the secondary side of replication
 625         * doesn't have to do anything anymore.
 626         */
 627        aio_context_release(aio_context);
 628        return;
 629    }
 630
 631    if (s->mode == REPLICATION_MODE_SECONDARY) {
 632        secondary_do_checkpoint(bs, errp);
 633    }
 634    aio_context_release(aio_context);
 635}
 636
 637static void replication_get_error(ReplicationState *rs, Error **errp)
 638{
 639    BlockDriverState *bs = rs->opaque;
 640    BDRVReplicationState *s;
 641    AioContext *aio_context;
 642
 643    aio_context = bdrv_get_aio_context(bs);
 644    aio_context_acquire(aio_context);
 645    s = bs->opaque;
 646
 647    if (s->stage == BLOCK_REPLICATION_NONE) {
 648        error_setg(errp, "Block replication is not running");
 649        aio_context_release(aio_context);
 650        return;
 651    }
 652
 653    if (s->error) {
 654        error_setg(errp, "I/O error occurred");
 655        aio_context_release(aio_context);
 656        return;
 657    }
 658    aio_context_release(aio_context);
 659}
 660
 661static void replication_done(void *opaque, int ret)
 662{
 663    BlockDriverState *bs = opaque;
 664    BDRVReplicationState *s = bs->opaque;
 665
 666    if (ret == 0) {
 667        s->stage = BLOCK_REPLICATION_DONE;
 668
 669        bdrv_unref_child(bs, s->secondary_disk);
 670        s->secondary_disk = NULL;
 671        bdrv_unref_child(bs, s->hidden_disk);
 672        s->hidden_disk = NULL;
 673        s->error = 0;
 674    } else {
 675        s->stage = BLOCK_REPLICATION_FAILOVER_FAILED;
 676        s->error = -EIO;
 677    }
 678}
 679
 680static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
 681{
 682    BlockDriverState *bs = rs->opaque;
 683    BDRVReplicationState *s;
 684    AioContext *aio_context;
 685
 686    aio_context = bdrv_get_aio_context(bs);
 687    aio_context_acquire(aio_context);
 688    s = bs->opaque;
 689
 690    if (s->stage == BLOCK_REPLICATION_DONE ||
 691        s->stage == BLOCK_REPLICATION_FAILOVER) {
 692        /*
 693         * This case happens when a secondary was promoted to primary.
 694         * Ignore the request because the secondary side of replication
 695         * doesn't have to do anything anymore.
 696         */
 697        aio_context_release(aio_context);
 698        return;
 699    }
 700
 701    if (s->stage != BLOCK_REPLICATION_RUNNING) {
 702        error_setg(errp, "Block replication is not running");
 703        aio_context_release(aio_context);
 704        return;
 705    }
 706
 707    switch (s->mode) {
 708    case REPLICATION_MODE_PRIMARY:
 709        s->stage = BLOCK_REPLICATION_DONE;
 710        s->error = 0;
 711        break;
 712    case REPLICATION_MODE_SECONDARY:
 713        /*
 714         * This BDS will be closed, and the job should be completed
 715         * before the BDS is closed, because we will access hidden
 716         * disk, secondary disk in backup_job_completed().
 717         */
 718        if (s->backup_job) {
 719            aio_context_release(aio_context);
 720            job_cancel_sync(&s->backup_job->job, true);
 721            aio_context_acquire(aio_context);
 722        }
 723
 724        if (!failover) {
 725            secondary_do_checkpoint(bs, errp);
 726            s->stage = BLOCK_REPLICATION_DONE;
 727            aio_context_release(aio_context);
 728            return;
 729        }
 730
 731        s->stage = BLOCK_REPLICATION_FAILOVER;
 732        s->commit_job = commit_active_start(
 733                            NULL, bs->file->bs, s->secondary_disk->bs,
 734                            JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
 735                            NULL, replication_done, bs, true, errp);
 736        break;
 737    default:
 738        aio_context_release(aio_context);
 739        abort();
 740    }
 741    aio_context_release(aio_context);
 742}
 743
 744static const char *const replication_strong_runtime_opts[] = {
 745    REPLICATION_MODE,
 746    REPLICATION_TOP_ID,
 747
 748    NULL
 749};
 750
 751static BlockDriver bdrv_replication = {
 752    .format_name                = "replication",
 753    .instance_size              = sizeof(BDRVReplicationState),
 754
 755    .bdrv_open                  = replication_open,
 756    .bdrv_close                 = replication_close,
 757    .bdrv_child_perm            = replication_child_perm,
 758
 759    .bdrv_co_getlength          = replication_co_getlength,
 760    .bdrv_co_readv              = replication_co_readv,
 761    .bdrv_co_writev             = replication_co_writev,
 762
 763    .is_filter                  = true,
 764
 765    .strong_runtime_opts        = replication_strong_runtime_opts,
 766};
 767
 768static void bdrv_replication_init(void)
 769{
 770    bdrv_register(&bdrv_replication);
 771}
 772
 773block_init(bdrv_replication_init);
 774