/* qemu/block/replication.c */
   1/*
   2 * Replication Block filter
   3 *
   4 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   5 * Copyright (c) 2016 Intel Corporation
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 *
   8 * Author:
   9 *   Wen Congyang <wency@cn.fujitsu.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  12 * See the COPYING file in the top-level directory.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu-common.h"
  17#include "block/nbd.h"
  18#include "block/blockjob.h"
  19#include "block/block_int.h"
  20#include "block/block_backup.h"
  21#include "sysemu/block-backend.h"
  22#include "qapi/error.h"
  23#include "replication.h"
  24
  25typedef struct BDRVReplicationState {
  26    ReplicationMode mode;
  27    int replication_state;
  28    BdrvChild *active_disk;
  29    BdrvChild *hidden_disk;
  30    BdrvChild *secondary_disk;
  31    char *top_id;
  32    ReplicationState *rs;
  33    Error *blocker;
  34    int orig_hidden_flags;
  35    int orig_secondary_flags;
  36    int error;
  37} BDRVReplicationState;
  38
  39enum {
  40    BLOCK_REPLICATION_NONE,             /* block replication is not started */
  41    BLOCK_REPLICATION_RUNNING,          /* block replication is running */
  42    BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
  43    BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
  44    BLOCK_REPLICATION_DONE,             /* block replication is done */
  45};
  46
  47static void replication_start(ReplicationState *rs, ReplicationMode mode,
  48                              Error **errp);
  49static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
  50static void replication_get_error(ReplicationState *rs, Error **errp);
  51static void replication_stop(ReplicationState *rs, bool failover,
  52                             Error **errp);
  53
  54#define REPLICATION_MODE        "mode"
  55#define REPLICATION_TOP_ID      "top-id"
  56static QemuOptsList replication_runtime_opts = {
  57    .name = "replication",
  58    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
  59    .desc = {
  60        {
  61            .name = REPLICATION_MODE,
  62            .type = QEMU_OPT_STRING,
  63        },
  64        {
  65            .name = REPLICATION_TOP_ID,
  66            .type = QEMU_OPT_STRING,
  67        },
  68        { /* end of list */ }
  69    },
  70};
  71
  72static ReplicationOps replication_ops = {
  73    .start = replication_start,
  74    .checkpoint = replication_do_checkpoint,
  75    .get_error = replication_get_error,
  76    .stop = replication_stop,
  77};
  78
  79static int replication_open(BlockDriverState *bs, QDict *options,
  80                            int flags, Error **errp)
  81{
  82    int ret;
  83    BDRVReplicationState *s = bs->opaque;
  84    Error *local_err = NULL;
  85    QemuOpts *opts = NULL;
  86    const char *mode;
  87    const char *top_id;
  88
  89    ret = -EINVAL;
  90    opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
  91    qemu_opts_absorb_qdict(opts, options, &local_err);
  92    if (local_err) {
  93        goto fail;
  94    }
  95
  96    mode = qemu_opt_get(opts, REPLICATION_MODE);
  97    if (!mode) {
  98        error_setg(&local_err, "Missing the option mode");
  99        goto fail;
 100    }
 101
 102    if (!strcmp(mode, "primary")) {
 103        s->mode = REPLICATION_MODE_PRIMARY;
 104        top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
 105        if (top_id) {
 106            error_setg(&local_err, "The primary side does not support option top-id");
 107            goto fail;
 108        }
 109    } else if (!strcmp(mode, "secondary")) {
 110        s->mode = REPLICATION_MODE_SECONDARY;
 111        top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
 112        s->top_id = g_strdup(top_id);
 113        if (!s->top_id) {
 114            error_setg(&local_err, "Missing the option top-id");
 115            goto fail;
 116        }
 117    } else {
 118        error_setg(&local_err,
 119                   "The option mode's value should be primary or secondary");
 120        goto fail;
 121    }
 122
 123    s->rs = replication_new(bs, &replication_ops);
 124
 125    ret = 0;
 126
 127fail:
 128    qemu_opts_del(opts);
 129    error_propagate(errp, local_err);
 130
 131    return ret;
 132}
 133
 134static void replication_close(BlockDriverState *bs)
 135{
 136    BDRVReplicationState *s = bs->opaque;
 137
 138    if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
 139        replication_stop(s->rs, false, NULL);
 140    }
 141    if (s->replication_state == BLOCK_REPLICATION_FAILOVER) {
 142        block_job_cancel_sync(s->active_disk->bs->job);
 143    }
 144
 145    if (s->mode == REPLICATION_MODE_SECONDARY) {
 146        g_free(s->top_id);
 147    }
 148
 149    replication_remove(s->rs);
 150}
 151
 152static int64_t replication_getlength(BlockDriverState *bs)
 153{
 154    return bdrv_getlength(bs->file->bs);
 155}
 156
 157static int replication_get_io_status(BDRVReplicationState *s)
 158{
 159    switch (s->replication_state) {
 160    case BLOCK_REPLICATION_NONE:
 161        return -EIO;
 162    case BLOCK_REPLICATION_RUNNING:
 163        return 0;
 164    case BLOCK_REPLICATION_FAILOVER:
 165        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
 166    case BLOCK_REPLICATION_FAILOVER_FAILED:
 167        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
 168    case BLOCK_REPLICATION_DONE:
 169        /*
 170         * active commit job completes, and active disk and secondary_disk
 171         * is swapped, so we can operate bs->file directly
 172         */
 173        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
 174    default:
 175        abort();
 176    }
 177}
 178
 179static int replication_return_value(BDRVReplicationState *s, int ret)
 180{
 181    if (s->mode == REPLICATION_MODE_SECONDARY) {
 182        return ret;
 183    }
 184
 185    if (ret < 0) {
 186        s->error = ret;
 187        ret = 0;
 188    }
 189
 190    return ret;
 191}
 192
 193static coroutine_fn int replication_co_readv(BlockDriverState *bs,
 194                                             int64_t sector_num,
 195                                             int remaining_sectors,
 196                                             QEMUIOVector *qiov)
 197{
 198    BDRVReplicationState *s = bs->opaque;
 199    BdrvChild *child = s->secondary_disk;
 200    BlockJob *job = NULL;
 201    CowRequest req;
 202    int ret;
 203
 204    if (s->mode == REPLICATION_MODE_PRIMARY) {
 205        /* We only use it to forward primary write requests */
 206        return -EIO;
 207    }
 208
 209    ret = replication_get_io_status(s);
 210    if (ret < 0) {
 211        return ret;
 212    }
 213
 214    if (child && child->bs) {
 215        job = child->bs->job;
 216    }
 217
 218    if (job) {
 219        backup_wait_for_overlapping_requests(child->bs->job, sector_num,
 220                                             remaining_sectors);
 221        backup_cow_request_begin(&req, child->bs->job, sector_num,
 222                                 remaining_sectors);
 223        ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors,
 224                            qiov);
 225        backup_cow_request_end(&req);
 226        goto out;
 227    }
 228
 229    ret = bdrv_co_readv(bs->file, sector_num, remaining_sectors, qiov);
 230out:
 231    return replication_return_value(s, ret);
 232}
 233
 234static coroutine_fn int replication_co_writev(BlockDriverState *bs,
 235                                              int64_t sector_num,
 236                                              int remaining_sectors,
 237                                              QEMUIOVector *qiov)
 238{
 239    BDRVReplicationState *s = bs->opaque;
 240    QEMUIOVector hd_qiov;
 241    uint64_t bytes_done = 0;
 242    BdrvChild *top = bs->file;
 243    BdrvChild *base = s->secondary_disk;
 244    BdrvChild *target;
 245    int ret, n;
 246
 247    ret = replication_get_io_status(s);
 248    if (ret < 0) {
 249        goto out;
 250    }
 251
 252    if (ret == 0) {
 253        ret = bdrv_co_writev(top, sector_num,
 254                             remaining_sectors, qiov);
 255        return replication_return_value(s, ret);
 256    }
 257
 258    /*
 259     * Failover failed, only write to active disk if the sectors
 260     * have already been allocated in active disk/hidden disk.
 261     */
 262    qemu_iovec_init(&hd_qiov, qiov->niov);
 263    while (remaining_sectors > 0) {
 264        ret = bdrv_is_allocated_above(top->bs, base->bs, sector_num,
 265                                      remaining_sectors, &n);
 266        if (ret < 0) {
 267            goto out1;
 268        }
 269
 270        qemu_iovec_reset(&hd_qiov);
 271        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
 272
 273        target = ret ? top : base;
 274        ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
 275        if (ret < 0) {
 276            goto out1;
 277        }
 278
 279        remaining_sectors -= n;
 280        sector_num += n;
 281        bytes_done += n * BDRV_SECTOR_SIZE;
 282    }
 283
 284out1:
 285    qemu_iovec_destroy(&hd_qiov);
 286out:
 287    return ret;
 288}
 289
 290static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
 291                                                    BlockDriverState *candidate)
 292{
 293    return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
 294}
 295
 296static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
 297{
 298    Error *local_err = NULL;
 299    int ret;
 300
 301    if (!s->secondary_disk->bs->job) {
 302        error_setg(errp, "Backup job was cancelled unexpectedly");
 303        return;
 304    }
 305
 306    backup_do_checkpoint(s->secondary_disk->bs->job, &local_err);
 307    if (local_err) {
 308        error_propagate(errp, local_err);
 309        return;
 310    }
 311
 312    ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
 313    if (ret < 0) {
 314        error_setg(errp, "Cannot make active disk empty");
 315        return;
 316    }
 317
 318    ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
 319    if (ret < 0) {
 320        error_setg(errp, "Cannot make hidden disk empty");
 321        return;
 322    }
 323}
 324
 325static void reopen_backing_file(BlockDriverState *bs, bool writable,
 326                                Error **errp)
 327{
 328    BDRVReplicationState *s = bs->opaque;
 329    BlockReopenQueue *reopen_queue = NULL;
 330    int orig_hidden_flags, orig_secondary_flags;
 331    int new_hidden_flags, new_secondary_flags;
 332    Error *local_err = NULL;
 333
 334    if (writable) {
 335        orig_hidden_flags = s->orig_hidden_flags =
 336                                bdrv_get_flags(s->hidden_disk->bs);
 337        new_hidden_flags = (orig_hidden_flags | BDRV_O_RDWR) &
 338                                                    ~BDRV_O_INACTIVE;
 339        orig_secondary_flags = s->orig_secondary_flags =
 340                                bdrv_get_flags(s->secondary_disk->bs);
 341        new_secondary_flags = (orig_secondary_flags | BDRV_O_RDWR) &
 342                                                     ~BDRV_O_INACTIVE;
 343    } else {
 344        orig_hidden_flags = (s->orig_hidden_flags | BDRV_O_RDWR) &
 345                                                    ~BDRV_O_INACTIVE;
 346        new_hidden_flags = s->orig_hidden_flags;
 347        orig_secondary_flags = (s->orig_secondary_flags | BDRV_O_RDWR) &
 348                                                    ~BDRV_O_INACTIVE;
 349        new_secondary_flags = s->orig_secondary_flags;
 350    }
 351
 352    if (orig_hidden_flags != new_hidden_flags) {
 353        reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs, NULL,
 354                                         new_hidden_flags);
 355    }
 356
 357    if (!(orig_secondary_flags & BDRV_O_RDWR)) {
 358        reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
 359                                         NULL, new_secondary_flags);
 360    }
 361
 362    if (reopen_queue) {
 363        bdrv_reopen_multiple(bdrv_get_aio_context(bs),
 364                             reopen_queue, &local_err);
 365        error_propagate(errp, local_err);
 366    }
 367}
 368
 369static void backup_job_cleanup(BlockDriverState *bs)
 370{
 371    BDRVReplicationState *s = bs->opaque;
 372    BlockDriverState *top_bs;
 373
 374    top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
 375    if (!top_bs) {
 376        return;
 377    }
 378    bdrv_op_unblock_all(top_bs, s->blocker);
 379    error_free(s->blocker);
 380    reopen_backing_file(bs, false, NULL);
 381}
 382
 383static void backup_job_completed(void *opaque, int ret)
 384{
 385    BlockDriverState *bs = opaque;
 386    BDRVReplicationState *s = bs->opaque;
 387
 388    if (s->replication_state != BLOCK_REPLICATION_FAILOVER) {
 389        /* The backup job is cancelled unexpectedly */
 390        s->error = -EIO;
 391    }
 392
 393    backup_job_cleanup(bs);
 394}
 395
 396static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
 397{
 398    BdrvChild *child;
 399
 400    /* The bs itself is the top_bs */
 401    if (top_bs == bs) {
 402        return true;
 403    }
 404
 405    /* Iterate over top_bs's children */
 406    QLIST_FOREACH(child, &top_bs->children, next) {
 407        if (child->bs == bs || check_top_bs(child->bs, bs)) {
 408            return true;
 409        }
 410    }
 411
 412    return false;
 413}
 414
 415static void replication_start(ReplicationState *rs, ReplicationMode mode,
 416                              Error **errp)
 417{
 418    BlockDriverState *bs = rs->opaque;
 419    BDRVReplicationState *s;
 420    BlockDriverState *top_bs;
 421    int64_t active_length, hidden_length, disk_length;
 422    AioContext *aio_context;
 423    Error *local_err = NULL;
 424    BlockJob *job;
 425
 426    aio_context = bdrv_get_aio_context(bs);
 427    aio_context_acquire(aio_context);
 428    s = bs->opaque;
 429
 430    if (s->replication_state != BLOCK_REPLICATION_NONE) {
 431        error_setg(errp, "Block replication is running or done");
 432        aio_context_release(aio_context);
 433        return;
 434    }
 435
 436    if (s->mode != mode) {
 437        error_setg(errp, "The parameter mode's value is invalid, needs %d,"
 438                   " but got %d", s->mode, mode);
 439        aio_context_release(aio_context);
 440        return;
 441    }
 442
 443    switch (s->mode) {
 444    case REPLICATION_MODE_PRIMARY:
 445        break;
 446    case REPLICATION_MODE_SECONDARY:
 447        s->active_disk = bs->file;
 448        if (!s->active_disk || !s->active_disk->bs ||
 449                                    !s->active_disk->bs->backing) {
 450            error_setg(errp, "Active disk doesn't have backing file");
 451            aio_context_release(aio_context);
 452            return;
 453        }
 454
 455        s->hidden_disk = s->active_disk->bs->backing;
 456        if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
 457            error_setg(errp, "Hidden disk doesn't have backing file");
 458            aio_context_release(aio_context);
 459            return;
 460        }
 461
 462        s->secondary_disk = s->hidden_disk->bs->backing;
 463        if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
 464            error_setg(errp, "The secondary disk doesn't have block backend");
 465            aio_context_release(aio_context);
 466            return;
 467        }
 468
 469        /* verify the length */
 470        active_length = bdrv_getlength(s->active_disk->bs);
 471        hidden_length = bdrv_getlength(s->hidden_disk->bs);
 472        disk_length = bdrv_getlength(s->secondary_disk->bs);
 473        if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
 474            active_length != hidden_length || hidden_length != disk_length) {
 475            error_setg(errp, "Active disk, hidden disk, secondary disk's length"
 476                       " are not the same");
 477            aio_context_release(aio_context);
 478            return;
 479        }
 480
 481        if (!s->active_disk->bs->drv->bdrv_make_empty ||
 482            !s->hidden_disk->bs->drv->bdrv_make_empty) {
 483            error_setg(errp,
 484                       "Active disk or hidden disk doesn't support make_empty");
 485            aio_context_release(aio_context);
 486            return;
 487        }
 488
 489        /* reopen the backing file in r/w mode */
 490        reopen_backing_file(bs, true, &local_err);
 491        if (local_err) {
 492            error_propagate(errp, local_err);
 493            aio_context_release(aio_context);
 494            return;
 495        }
 496
 497        /* start backup job now */
 498        error_setg(&s->blocker,
 499                   "Block device is in use by internal backup job");
 500
 501        top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
 502        if (!top_bs || !bdrv_is_root_node(top_bs) ||
 503            !check_top_bs(top_bs, bs)) {
 504            error_setg(errp, "No top_bs or it is invalid");
 505            reopen_backing_file(bs, false, NULL);
 506            aio_context_release(aio_context);
 507            return;
 508        }
 509        bdrv_op_block_all(top_bs, s->blocker);
 510        bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
 511
 512        job = backup_job_create(NULL, s->secondary_disk->bs, s->hidden_disk->bs,
 513                                0, MIRROR_SYNC_MODE_NONE, NULL, false,
 514                                BLOCKDEV_ON_ERROR_REPORT,
 515                                BLOCKDEV_ON_ERROR_REPORT, BLOCK_JOB_INTERNAL,
 516                                backup_job_completed, bs, NULL, &local_err);
 517        if (local_err) {
 518            error_propagate(errp, local_err);
 519            backup_job_cleanup(bs);
 520            aio_context_release(aio_context);
 521            return;
 522        }
 523        block_job_start(job);
 524        break;
 525    default:
 526        aio_context_release(aio_context);
 527        abort();
 528    }
 529
 530    s->replication_state = BLOCK_REPLICATION_RUNNING;
 531
 532    if (s->mode == REPLICATION_MODE_SECONDARY) {
 533        secondary_do_checkpoint(s, errp);
 534    }
 535
 536    s->error = 0;
 537    aio_context_release(aio_context);
 538}
 539
 540static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
 541{
 542    BlockDriverState *bs = rs->opaque;
 543    BDRVReplicationState *s;
 544    AioContext *aio_context;
 545
 546    aio_context = bdrv_get_aio_context(bs);
 547    aio_context_acquire(aio_context);
 548    s = bs->opaque;
 549
 550    if (s->mode == REPLICATION_MODE_SECONDARY) {
 551        secondary_do_checkpoint(s, errp);
 552    }
 553    aio_context_release(aio_context);
 554}
 555
 556static void replication_get_error(ReplicationState *rs, Error **errp)
 557{
 558    BlockDriverState *bs = rs->opaque;
 559    BDRVReplicationState *s;
 560    AioContext *aio_context;
 561
 562    aio_context = bdrv_get_aio_context(bs);
 563    aio_context_acquire(aio_context);
 564    s = bs->opaque;
 565
 566    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
 567        error_setg(errp, "Block replication is not running");
 568        aio_context_release(aio_context);
 569        return;
 570    }
 571
 572    if (s->error) {
 573        error_setg(errp, "I/O error occurred");
 574        aio_context_release(aio_context);
 575        return;
 576    }
 577    aio_context_release(aio_context);
 578}
 579
 580static void replication_done(void *opaque, int ret)
 581{
 582    BlockDriverState *bs = opaque;
 583    BDRVReplicationState *s = bs->opaque;
 584
 585    if (ret == 0) {
 586        s->replication_state = BLOCK_REPLICATION_DONE;
 587
 588        /* refresh top bs's filename */
 589        bdrv_refresh_filename(bs);
 590        s->active_disk = NULL;
 591        s->secondary_disk = NULL;
 592        s->hidden_disk = NULL;
 593        s->error = 0;
 594    } else {
 595        s->replication_state = BLOCK_REPLICATION_FAILOVER_FAILED;
 596        s->error = -EIO;
 597    }
 598}
 599
 600static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
 601{
 602    BlockDriverState *bs = rs->opaque;
 603    BDRVReplicationState *s;
 604    AioContext *aio_context;
 605
 606    aio_context = bdrv_get_aio_context(bs);
 607    aio_context_acquire(aio_context);
 608    s = bs->opaque;
 609
 610    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
 611        error_setg(errp, "Block replication is not running");
 612        aio_context_release(aio_context);
 613        return;
 614    }
 615
 616    switch (s->mode) {
 617    case REPLICATION_MODE_PRIMARY:
 618        s->replication_state = BLOCK_REPLICATION_DONE;
 619        s->error = 0;
 620        break;
 621    case REPLICATION_MODE_SECONDARY:
 622        /*
 623         * This BDS will be closed, and the job should be completed
 624         * before the BDS is closed, because we will access hidden
 625         * disk, secondary disk in backup_job_completed().
 626         */
 627        if (s->secondary_disk->bs->job) {
 628            block_job_cancel_sync(s->secondary_disk->bs->job);
 629        }
 630
 631        if (!failover) {
 632            secondary_do_checkpoint(s, errp);
 633            s->replication_state = BLOCK_REPLICATION_DONE;
 634            aio_context_release(aio_context);
 635            return;
 636        }
 637
 638        s->replication_state = BLOCK_REPLICATION_FAILOVER;
 639        commit_active_start(NULL, s->active_disk->bs, s->secondary_disk->bs,
 640                            BLOCK_JOB_INTERNAL, 0, BLOCKDEV_ON_ERROR_REPORT,
 641                            replication_done, bs, errp, true);
 642        break;
 643    default:
 644        aio_context_release(aio_context);
 645        abort();
 646    }
 647    aio_context_release(aio_context);
 648}
 649
 650BlockDriver bdrv_replication = {
 651    .format_name                = "replication",
 652    .protocol_name              = "replication",
 653    .instance_size              = sizeof(BDRVReplicationState),
 654
 655    .bdrv_open                  = replication_open,
 656    .bdrv_close                 = replication_close,
 657
 658    .bdrv_getlength             = replication_getlength,
 659    .bdrv_co_readv              = replication_co_readv,
 660    .bdrv_co_writev             = replication_co_writev,
 661
 662    .is_filter                  = true,
 663    .bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,
 664
 665    .has_variable_length        = true,
 666};
 667
 668static void bdrv_replication_init(void)
 669{
 670    bdrv_register(&bdrv_replication);
 671}
 672
 673block_init(bdrv_replication_init);
 674