qemu/block/quorum.c
<<
>>
Prefs
   1/*
   2 * Quorum Block filter
   3 *
   4 * Copyright (C) 2012-2014 Nodalink, EURL.
   5 *
   6 * Author:
    7 *   Benoît Canet <benoit.canet@irqsave.net>
   8 *
   9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
  10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  13 * See the COPYING file in the top-level directory.
  14 */
  15
  16#include <gnutls/gnutls.h>
  17#include <gnutls/crypto.h>
  18#include "block/block_int.h"
  19#include "qapi/qmp/qjson.h"
  20#include "qapi-event.h"
  21
  22#define HASH_LENGTH 32
  23
  24#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
  25#define QUORUM_OPT_BLKVERIFY      "blkverify"
  26#define QUORUM_OPT_REWRITE        "rewrite-corrupted"
  27
  28/* This union holds a vote hash value */
  29typedef union QuorumVoteValue {
  30    char h[HASH_LENGTH];       /* SHA-256 hash */
  31    int64_t l;                 /* simpler 64 bits hash */
  32} QuorumVoteValue;
  33
  34/* A vote item */
  35typedef struct QuorumVoteItem {
  36    int index;
  37    QLIST_ENTRY(QuorumVoteItem) next;
  38} QuorumVoteItem;
  39
  40/* this structure is a vote version. A version is the set of votes sharing the
  41 * same vote value.
  42 * The set of votes will be tracked with the items field and its cardinality is
  43 * vote_count.
  44 */
  45typedef struct QuorumVoteVersion {
  46    QuorumVoteValue value;
  47    int index;
  48    int vote_count;
  49    QLIST_HEAD(, QuorumVoteItem) items;
  50    QLIST_ENTRY(QuorumVoteVersion) next;
  51} QuorumVoteVersion;
  52
  53/* this structure holds a group of vote versions together */
  54typedef struct QuorumVotes {
  55    QLIST_HEAD(, QuorumVoteVersion) vote_list;
  56    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
  57} QuorumVotes;
  58
  59/* the following structure holds the state of one quorum instance */
  60typedef struct BDRVQuorumState {
  61    BlockDriverState **bs; /* children BlockDriverStates */
  62    int num_children;      /* children count */
  63    int threshold;         /* if less than threshold children reads gave the
  64                            * same result a quorum error occurs.
  65                            */
  66    bool is_blkverify;     /* true if the driver is in blkverify mode
  67                            * Writes are mirrored on two children devices.
  68                            * On reads the two children devices' contents are
  69                            * compared and if a difference is spotted its
  70                            * location is printed and the code aborts.
  71                            * It is useful to debug other block drivers by
  72                            * comparing them with a reference one.
  73                            */
  74    bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted
  75                            * block if Quorum is reached.
  76                            */
  77} BDRVQuorumState;
  78
  79typedef struct QuorumAIOCB QuorumAIOCB;
  80
  81/* Quorum will create one instance of the following structure per operation it
  82 * performs on its children.
  83 * So for each read/write operation coming from the upper layer there will be
  84 * $children_count QuorumChildRequest.
  85 */
  86typedef struct QuorumChildRequest {
  87    BlockDriverAIOCB *aiocb;
  88    QEMUIOVector qiov;
  89    uint8_t *buf;
  90    int ret;
  91    QuorumAIOCB *parent;
  92} QuorumChildRequest;
  93
  94/* Quorum will use the following structure to track progress of each read/write
  95 * operation received by the upper layer.
  96 * This structure hold pointers to the QuorumChildRequest structures instances
  97 * used to do operations on each children and track overall progress.
  98 */
  99struct QuorumAIOCB {
 100    BlockDriverAIOCB common;
 101
 102    /* Request metadata */
 103    uint64_t sector_num;
 104    int nb_sectors;
 105
 106    QEMUIOVector *qiov;         /* calling IOV */
 107
 108    QuorumChildRequest *qcrs;   /* individual child requests */
 109    int count;                  /* number of completed AIOCB */
 110    int success_count;          /* number of successfully completed AIOCB */
 111
 112    int rewrite_count;          /* number of replica to rewrite: count down to
 113                                 * zero once writes are fired
 114                                 */
 115
 116    QuorumVotes votes;
 117
 118    bool is_read;
 119    int vote_ret;
 120};
 121
 122static bool quorum_vote(QuorumAIOCB *acb);
 123
 124static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
 125{
 126    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
 127    BDRVQuorumState *s = acb->common.bs->opaque;
 128    int i;
 129
 130    /* cancel all callbacks */
 131    for (i = 0; i < s->num_children; i++) {
 132        bdrv_aio_cancel(acb->qcrs[i].aiocb);
 133    }
 134
 135    g_free(acb->qcrs);
 136    qemu_aio_release(acb);
 137}
 138
 139static AIOCBInfo quorum_aiocb_info = {
 140    .aiocb_size         = sizeof(QuorumAIOCB),
 141    .cancel             = quorum_aio_cancel,
 142};
 143
 144static void quorum_aio_finalize(QuorumAIOCB *acb)
 145{
 146    BDRVQuorumState *s = acb->common.bs->opaque;
 147    int i, ret = 0;
 148
 149    if (acb->vote_ret) {
 150        ret = acb->vote_ret;
 151    }
 152
 153    acb->common.cb(acb->common.opaque, ret);
 154
 155    if (acb->is_read) {
 156        for (i = 0; i < s->num_children; i++) {
 157            qemu_vfree(acb->qcrs[i].buf);
 158            qemu_iovec_destroy(&acb->qcrs[i].qiov);
 159        }
 160    }
 161
 162    g_free(acb->qcrs);
 163    qemu_aio_release(acb);
 164}
 165
 166static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 167{
 168    return !memcmp(a->h, b->h, HASH_LENGTH);
 169}
 170
 171static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 172{
 173    return a->l == b->l;
 174}
 175
 176static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
 177                                   BlockDriverState *bs,
 178                                   QEMUIOVector *qiov,
 179                                   uint64_t sector_num,
 180                                   int nb_sectors,
 181                                   BlockDriverCompletionFunc *cb,
 182                                   void *opaque)
 183{
 184    QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
 185    int i;
 186
 187    acb->common.bs->opaque = s;
 188    acb->sector_num = sector_num;
 189    acb->nb_sectors = nb_sectors;
 190    acb->qiov = qiov;
 191    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
 192    acb->count = 0;
 193    acb->success_count = 0;
 194    acb->rewrite_count = 0;
 195    acb->votes.compare = quorum_sha256_compare;
 196    QLIST_INIT(&acb->votes.vote_list);
 197    acb->is_read = false;
 198    acb->vote_ret = 0;
 199
 200    for (i = 0; i < s->num_children; i++) {
 201        acb->qcrs[i].buf = NULL;
 202        acb->qcrs[i].ret = 0;
 203        acb->qcrs[i].parent = acb;
 204    }
 205
 206    return acb;
 207}
 208
 209static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
 210{
 211    const char *msg = NULL;
 212    if (ret < 0) {
 213        msg = strerror(-ret);
 214    }
 215    qapi_event_send_quorum_report_bad(!!msg, msg, node_name,
 216                                      acb->sector_num, acb->nb_sectors, &error_abort);
 217}
 218
 219static void quorum_report_failure(QuorumAIOCB *acb)
 220{
 221    const char *reference = acb->common.bs->device_name[0] ?
 222                            acb->common.bs->device_name :
 223                            acb->common.bs->node_name;
 224
 225    qapi_event_send_quorum_failure(reference, acb->sector_num,
 226                                   acb->nb_sectors, &error_abort);
 227}
 228
 229static int quorum_vote_error(QuorumAIOCB *acb);
 230
 231static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
 232{
 233    BDRVQuorumState *s = acb->common.bs->opaque;
 234
 235    if (acb->success_count < s->threshold) {
 236        acb->vote_ret = quorum_vote_error(acb);
 237        quorum_report_failure(acb);
 238        return true;
 239    }
 240
 241    return false;
 242}
 243
 244static void quorum_rewrite_aio_cb(void *opaque, int ret)
 245{
 246    QuorumAIOCB *acb = opaque;
 247
 248    /* one less rewrite to do */
 249    acb->rewrite_count--;
 250
 251    /* wait until all rewrite callbacks have completed */
 252    if (acb->rewrite_count) {
 253        return;
 254    }
 255
 256    quorum_aio_finalize(acb);
 257}
 258
 259static void quorum_aio_cb(void *opaque, int ret)
 260{
 261    QuorumChildRequest *sacb = opaque;
 262    QuorumAIOCB *acb = sacb->parent;
 263    BDRVQuorumState *s = acb->common.bs->opaque;
 264    bool rewrite = false;
 265
 266    sacb->ret = ret;
 267    acb->count++;
 268    if (ret == 0) {
 269        acb->success_count++;
 270    } else {
 271        quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
 272    }
 273    assert(acb->count <= s->num_children);
 274    assert(acb->success_count <= s->num_children);
 275    if (acb->count < s->num_children) {
 276        return;
 277    }
 278
 279    /* Do the vote on read */
 280    if (acb->is_read) {
 281        rewrite = quorum_vote(acb);
 282    } else {
 283        quorum_has_too_much_io_failed(acb);
 284    }
 285
 286    /* if no rewrite is done the code will finish right away */
 287    if (!rewrite) {
 288        quorum_aio_finalize(acb);
 289    }
 290}
 291
 292static void quorum_report_bad_versions(BDRVQuorumState *s,
 293                                       QuorumAIOCB *acb,
 294                                       QuorumVoteValue *value)
 295{
 296    QuorumVoteVersion *version;
 297    QuorumVoteItem *item;
 298
 299    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 300        if (acb->votes.compare(&version->value, value)) {
 301            continue;
 302        }
 303        QLIST_FOREACH(item, &version->items, next) {
 304            quorum_report_bad(acb, s->bs[item->index]->node_name, 0);
 305        }
 306    }
 307}
 308
 309static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
 310                                        QuorumVoteValue *value)
 311{
 312    QuorumVoteVersion *version;
 313    QuorumVoteItem *item;
 314    int count = 0;
 315
 316    /* first count the number of bad versions: done first to avoid concurrency
 317     * issues.
 318     */
 319    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 320        if (acb->votes.compare(&version->value, value)) {
 321            continue;
 322        }
 323        QLIST_FOREACH(item, &version->items, next) {
 324            count++;
 325        }
 326    }
 327
 328    /* quorum_rewrite_aio_cb will count down this to zero */
 329    acb->rewrite_count = count;
 330
 331    /* now fire the correcting rewrites */
 332    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 333        if (acb->votes.compare(&version->value, value)) {
 334            continue;
 335        }
 336        QLIST_FOREACH(item, &version->items, next) {
 337            bdrv_aio_writev(s->bs[item->index], acb->sector_num, acb->qiov,
 338                            acb->nb_sectors, quorum_rewrite_aio_cb, acb);
 339        }
 340    }
 341
 342    /* return true if any rewrite is done else false */
 343    return count;
 344}
 345
 346static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
 347{
 348    int i;
 349    assert(dest->niov == source->niov);
 350    assert(dest->size == source->size);
 351    for (i = 0; i < source->niov; i++) {
 352        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
 353        memcpy(dest->iov[i].iov_base,
 354               source->iov[i].iov_base,
 355               source->iov[i].iov_len);
 356    }
 357}
 358
 359static void quorum_count_vote(QuorumVotes *votes,
 360                              QuorumVoteValue *value,
 361                              int index)
 362{
 363    QuorumVoteVersion *v = NULL, *version = NULL;
 364    QuorumVoteItem *item;
 365
 366    /* look if we have something with this hash */
 367    QLIST_FOREACH(v, &votes->vote_list, next) {
 368        if (votes->compare(&v->value, value)) {
 369            version = v;
 370            break;
 371        }
 372    }
 373
 374    /* It's a version not yet in the list add it */
 375    if (!version) {
 376        version = g_new0(QuorumVoteVersion, 1);
 377        QLIST_INIT(&version->items);
 378        memcpy(&version->value, value, sizeof(version->value));
 379        version->index = index;
 380        version->vote_count = 0;
 381        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
 382    }
 383
 384    version->vote_count++;
 385
 386    item = g_new0(QuorumVoteItem, 1);
 387    item->index = index;
 388    QLIST_INSERT_HEAD(&version->items, item, next);
 389}
 390
 391static void quorum_free_vote_list(QuorumVotes *votes)
 392{
 393    QuorumVoteVersion *version, *next_version;
 394    QuorumVoteItem *item, *next_item;
 395
 396    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
 397        QLIST_REMOVE(version, next);
 398        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
 399            QLIST_REMOVE(item, next);
 400            g_free(item);
 401        }
 402        g_free(version);
 403    }
 404}
 405
 406static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
 407{
 408    int j, ret;
 409    gnutls_hash_hd_t dig;
 410    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
 411
 412    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
 413
 414    if (ret < 0) {
 415        return ret;
 416    }
 417
 418    for (j = 0; j < qiov->niov; j++) {
 419        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
 420        if (ret < 0) {
 421            break;
 422        }
 423    }
 424
 425    gnutls_hash_deinit(dig, (void *) hash);
 426    return ret;
 427}
 428
 429static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
 430{
 431    int max = 0;
 432    QuorumVoteVersion *candidate, *winner = NULL;
 433
 434    QLIST_FOREACH(candidate, &votes->vote_list, next) {
 435        if (candidate->vote_count > max) {
 436            max = candidate->vote_count;
 437            winner = candidate;
 438        }
 439    }
 440
 441    return winner;
 442}
 443
 444/* qemu_iovec_compare is handy for blkverify mode because it returns the first
 445 * differing byte location. Yet it is handcoded to compare vectors one byte
 446 * after another so it does not benefit from the libc SIMD optimizations.
 447 * quorum_iovec_compare is written for speed and should be used in the non
 448 * blkverify mode of quorum.
 449 */
 450static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
 451{
 452    int i;
 453    int result;
 454
 455    assert(a->niov == b->niov);
 456    for (i = 0; i < a->niov; i++) {
 457        assert(a->iov[i].iov_len == b->iov[i].iov_len);
 458        result = memcmp(a->iov[i].iov_base,
 459                        b->iov[i].iov_base,
 460                        a->iov[i].iov_len);
 461        if (result) {
 462            return false;
 463        }
 464    }
 465
 466    return true;
 467}
 468
 469static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
 470                                          const char *fmt, ...)
 471{
 472    va_list ap;
 473
 474    va_start(ap, fmt);
 475    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
 476            acb->sector_num, acb->nb_sectors);
 477    vfprintf(stderr, fmt, ap);
 478    fprintf(stderr, "\n");
 479    va_end(ap);
 480    exit(1);
 481}
 482
 483static bool quorum_compare(QuorumAIOCB *acb,
 484                           QEMUIOVector *a,
 485                           QEMUIOVector *b)
 486{
 487    BDRVQuorumState *s = acb->common.bs->opaque;
 488    ssize_t offset;
 489
 490    /* This driver will replace blkverify in this particular case */
 491    if (s->is_blkverify) {
 492        offset = qemu_iovec_compare(a, b);
 493        if (offset != -1) {
 494            quorum_err(acb, "contents mismatch in sector %" PRId64,
 495                       acb->sector_num +
 496                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
 497        }
 498        return true;
 499    }
 500
 501    return quorum_iovec_compare(a, b);
 502}
 503
 504/* Do a vote to get the error code */
 505static int quorum_vote_error(QuorumAIOCB *acb)
 506{
 507    BDRVQuorumState *s = acb->common.bs->opaque;
 508    QuorumVoteVersion *winner = NULL;
 509    QuorumVotes error_votes;
 510    QuorumVoteValue result_value;
 511    int i, ret = 0;
 512    bool error = false;
 513
 514    QLIST_INIT(&error_votes.vote_list);
 515    error_votes.compare = quorum_64bits_compare;
 516
 517    for (i = 0; i < s->num_children; i++) {
 518        ret = acb->qcrs[i].ret;
 519        if (ret) {
 520            error = true;
 521            result_value.l = ret;
 522            quorum_count_vote(&error_votes, &result_value, i);
 523        }
 524    }
 525
 526    if (error) {
 527        winner = quorum_get_vote_winner(&error_votes);
 528        ret = winner->value.l;
 529    }
 530
 531    quorum_free_vote_list(&error_votes);
 532
 533    return ret;
 534}
 535
 536static bool quorum_vote(QuorumAIOCB *acb)
 537{
 538    bool quorum = true;
 539    bool rewrite = false;
 540    int i, j, ret;
 541    QuorumVoteValue hash;
 542    BDRVQuorumState *s = acb->common.bs->opaque;
 543    QuorumVoteVersion *winner;
 544
 545    if (quorum_has_too_much_io_failed(acb)) {
 546        return false;
 547    }
 548
 549    /* get the index of the first successful read */
 550    for (i = 0; i < s->num_children; i++) {
 551        if (!acb->qcrs[i].ret) {
 552            break;
 553        }
 554    }
 555
 556    assert(i < s->num_children);
 557
 558    /* compare this read with all other successful reads stopping at quorum
 559     * failure
 560     */
 561    for (j = i + 1; j < s->num_children; j++) {
 562        if (acb->qcrs[j].ret) {
 563            continue;
 564        }
 565        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
 566        if (!quorum) {
 567            break;
 568       }
 569    }
 570
 571    /* Every successful read agrees */
 572    if (quorum) {
 573        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
 574        return false;
 575    }
 576
 577    /* compute hashes for each successful read, also store indexes */
 578    for (i = 0; i < s->num_children; i++) {
 579        if (acb->qcrs[i].ret) {
 580            continue;
 581        }
 582        ret = quorum_compute_hash(acb, i, &hash);
 583        /* if ever the hash computation failed */
 584        if (ret < 0) {
 585            acb->vote_ret = ret;
 586            goto free_exit;
 587        }
 588        quorum_count_vote(&acb->votes, &hash, i);
 589    }
 590
 591    /* vote to select the most represented version */
 592    winner = quorum_get_vote_winner(&acb->votes);
 593
 594    /* if the winner count is smaller than threshold the read fails */
 595    if (winner->vote_count < s->threshold) {
 596        quorum_report_failure(acb);
 597        acb->vote_ret = -EIO;
 598        goto free_exit;
 599    }
 600
 601    /* we have a winner: copy it */
 602    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
 603
 604    /* some versions are bad print them */
 605    quorum_report_bad_versions(s, acb, &winner->value);
 606
 607    /* corruption correction is enabled */
 608    if (s->rewrite_corrupted) {
 609        rewrite = quorum_rewrite_bad_versions(s, acb, &winner->value);
 610    }
 611
 612free_exit:
 613    /* free lists */
 614    quorum_free_vote_list(&acb->votes);
 615    return rewrite;
 616}
 617
 618static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
 619                                         int64_t sector_num,
 620                                         QEMUIOVector *qiov,
 621                                         int nb_sectors,
 622                                         BlockDriverCompletionFunc *cb,
 623                                         void *opaque)
 624{
 625    BDRVQuorumState *s = bs->opaque;
 626    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
 627                                      nb_sectors, cb, opaque);
 628    int i;
 629
 630    acb->is_read = true;
 631
 632    for (i = 0; i < s->num_children; i++) {
 633        acb->qcrs[i].buf = qemu_blockalign(s->bs[i], qiov->size);
 634        qemu_iovec_init(&acb->qcrs[i].qiov, qiov->niov);
 635        qemu_iovec_clone(&acb->qcrs[i].qiov, qiov, acb->qcrs[i].buf);
 636    }
 637
 638    for (i = 0; i < s->num_children; i++) {
 639        bdrv_aio_readv(s->bs[i], sector_num, &acb->qcrs[i].qiov, nb_sectors,
 640                       quorum_aio_cb, &acb->qcrs[i]);
 641    }
 642
 643    return &acb->common;
 644}
 645
 646static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
 647                                          int64_t sector_num,
 648                                          QEMUIOVector *qiov,
 649                                          int nb_sectors,
 650                                          BlockDriverCompletionFunc *cb,
 651                                          void *opaque)
 652{
 653    BDRVQuorumState *s = bs->opaque;
 654    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
 655                                      cb, opaque);
 656    int i;
 657
 658    for (i = 0; i < s->num_children; i++) {
 659        acb->qcrs[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
 660                                             nb_sectors, &quorum_aio_cb,
 661                                             &acb->qcrs[i]);
 662    }
 663
 664    return &acb->common;
 665}
 666
 667static int64_t quorum_getlength(BlockDriverState *bs)
 668{
 669    BDRVQuorumState *s = bs->opaque;
 670    int64_t result;
 671    int i;
 672
 673    /* check that all file have the same length */
 674    result = bdrv_getlength(s->bs[0]);
 675    if (result < 0) {
 676        return result;
 677    }
 678    for (i = 1; i < s->num_children; i++) {
 679        int64_t value = bdrv_getlength(s->bs[i]);
 680        if (value < 0) {
 681            return value;
 682        }
 683        if (value != result) {
 684            return -EIO;
 685        }
 686    }
 687
 688    return result;
 689}
 690
 691static void quorum_invalidate_cache(BlockDriverState *bs, Error **errp)
 692{
 693    BDRVQuorumState *s = bs->opaque;
 694    Error *local_err = NULL;
 695    int i;
 696
 697    for (i = 0; i < s->num_children; i++) {
 698        bdrv_invalidate_cache(s->bs[i], &local_err);
 699        if (local_err) {
 700            error_propagate(errp, local_err);
 701            return;
 702        }
 703    }
 704}
 705
 706static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
 707{
 708    BDRVQuorumState *s = bs->opaque;
 709    QuorumVoteVersion *winner = NULL;
 710    QuorumVotes error_votes;
 711    QuorumVoteValue result_value;
 712    int i;
 713    int result = 0;
 714
 715    QLIST_INIT(&error_votes.vote_list);
 716    error_votes.compare = quorum_64bits_compare;
 717
 718    for (i = 0; i < s->num_children; i++) {
 719        result = bdrv_co_flush(s->bs[i]);
 720        result_value.l = result;
 721        quorum_count_vote(&error_votes, &result_value, i);
 722    }
 723
 724    winner = quorum_get_vote_winner(&error_votes);
 725    result = winner->value.l;
 726
 727    quorum_free_vote_list(&error_votes);
 728
 729    return result;
 730}
 731
 732static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
 733                                               BlockDriverState *candidate)
 734{
 735    BDRVQuorumState *s = bs->opaque;
 736    int i;
 737
 738    for (i = 0; i < s->num_children; i++) {
 739        bool perm = bdrv_recurse_is_first_non_filter(s->bs[i],
 740                                                     candidate);
 741        if (perm) {
 742            return true;
 743        }
 744    }
 745
 746    return false;
 747}
 748
 749static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
 750{
 751
 752    if (threshold < 1) {
 753        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
 754                  "vote-threshold", "value >= 1");
 755        return -ERANGE;
 756    }
 757
 758    if (threshold > num_children) {
 759        error_setg(errp, "threshold may not exceed children count");
 760        return -ERANGE;
 761    }
 762
 763    return 0;
 764}
 765
 766static QemuOptsList quorum_runtime_opts = {
 767    .name = "quorum",
 768    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
 769    .desc = {
 770        {
 771            .name = QUORUM_OPT_VOTE_THRESHOLD,
 772            .type = QEMU_OPT_NUMBER,
 773            .help = "The number of vote needed for reaching quorum",
 774        },
 775        {
 776            .name = QUORUM_OPT_BLKVERIFY,
 777            .type = QEMU_OPT_BOOL,
 778            .help = "Trigger block verify mode if set",
 779        },
 780        {
 781            .name = QUORUM_OPT_REWRITE,
 782            .type = QEMU_OPT_BOOL,
 783            .help = "Rewrite corrupted block on read quorum",
 784        },
 785        { /* end of list */ }
 786    },
 787};
 788
 789static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
 790                       Error **errp)
 791{
 792    BDRVQuorumState *s = bs->opaque;
 793    Error *local_err = NULL;
 794    QemuOpts *opts;
 795    bool *opened;
 796    QDict *sub = NULL;
 797    QList *list = NULL;
 798    const QListEntry *lentry;
 799    int i;
 800    int ret = 0;
 801
 802    qdict_flatten(options);
 803    qdict_extract_subqdict(options, &sub, "children.");
 804    qdict_array_split(sub, &list);
 805
 806    if (qdict_size(sub)) {
 807        error_setg(&local_err, "Invalid option children.%s",
 808                   qdict_first(sub)->key);
 809        ret = -EINVAL;
 810        goto exit;
 811    }
 812
 813    /* count how many different children are present */
 814    s->num_children = qlist_size(list);
 815    if (s->num_children < 2) {
 816        error_setg(&local_err,
 817                   "Number of provided children must be greater than 1");
 818        ret = -EINVAL;
 819        goto exit;
 820    }
 821
 822    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
 823    qemu_opts_absorb_qdict(opts, options, &local_err);
 824    if (local_err) {
 825        ret = -EINVAL;
 826        goto exit;
 827    }
 828
 829    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
 830
 831    /* and validate it against s->num_children */
 832    ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
 833    if (ret < 0) {
 834        goto exit;
 835    }
 836
 837    /* is the driver in blkverify mode */
 838    if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
 839        s->num_children == 2 && s->threshold == 2) {
 840        s->is_blkverify = true;
 841    } else if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false)) {
 842        fprintf(stderr, "blkverify mode is set by setting blkverify=on "
 843                "and using two files with vote_threshold=2\n");
 844    }
 845
 846    s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE, false);
 847    if (s->rewrite_corrupted && s->is_blkverify) {
 848        error_setg(&local_err,
 849                   "rewrite-corrupted=on cannot be used with blkverify=on");
 850        ret = -EINVAL;
 851        goto exit;
 852    }
 853
 854    /* allocate the children BlockDriverState array */
 855    s->bs = g_new0(BlockDriverState *, s->num_children);
 856    opened = g_new0(bool, s->num_children);
 857
 858    for (i = 0, lentry = qlist_first(list); lentry;
 859         lentry = qlist_next(lentry), i++) {
 860        QDict *d;
 861        QString *string;
 862
 863        switch (qobject_type(lentry->value))
 864        {
 865            /* List of options */
 866            case QTYPE_QDICT:
 867                d = qobject_to_qdict(lentry->value);
 868                QINCREF(d);
 869                ret = bdrv_open(&s->bs[i], NULL, NULL, d, flags, NULL,
 870                                &local_err);
 871                break;
 872
 873            /* QMP reference */
 874            case QTYPE_QSTRING:
 875                string = qobject_to_qstring(lentry->value);
 876                ret = bdrv_open(&s->bs[i], NULL, qstring_get_str(string), NULL,
 877                                flags, NULL, &local_err);
 878                break;
 879
 880            default:
 881                error_setg(&local_err, "Specification of child block device %i "
 882                           "is invalid", i);
 883                ret = -EINVAL;
 884        }
 885
 886        if (ret < 0) {
 887            goto close_exit;
 888        }
 889        opened[i] = true;
 890    }
 891
 892    g_free(opened);
 893    goto exit;
 894
 895close_exit:
 896    /* cleanup on error */
 897    for (i = 0; i < s->num_children; i++) {
 898        if (!opened[i]) {
 899            continue;
 900        }
 901        bdrv_unref(s->bs[i]);
 902    }
 903    g_free(s->bs);
 904    g_free(opened);
 905exit:
 906    /* propagate error */
 907    if (local_err) {
 908        error_propagate(errp, local_err);
 909    }
 910    QDECREF(list);
 911    QDECREF(sub);
 912    return ret;
 913}
 914
 915static void quorum_close(BlockDriverState *bs)
 916{
 917    BDRVQuorumState *s = bs->opaque;
 918    int i;
 919
 920    for (i = 0; i < s->num_children; i++) {
 921        bdrv_unref(s->bs[i]);
 922    }
 923
 924    g_free(s->bs);
 925}
 926
 927static void quorum_detach_aio_context(BlockDriverState *bs)
 928{
 929    BDRVQuorumState *s = bs->opaque;
 930    int i;
 931
 932    for (i = 0; i < s->num_children; i++) {
 933        bdrv_detach_aio_context(s->bs[i]);
 934    }
 935}
 936
 937static void quorum_attach_aio_context(BlockDriverState *bs,
 938                                      AioContext *new_context)
 939{
 940    BDRVQuorumState *s = bs->opaque;
 941    int i;
 942
 943    for (i = 0; i < s->num_children; i++) {
 944        bdrv_attach_aio_context(s->bs[i], new_context);
 945    }
 946}
 947
 948static BlockDriver bdrv_quorum = {
 949    .format_name                        = "quorum",
 950    .protocol_name                      = "quorum",
 951
 952    .instance_size                      = sizeof(BDRVQuorumState),
 953
 954    .bdrv_file_open                     = quorum_open,
 955    .bdrv_close                         = quorum_close,
 956
 957    .bdrv_co_flush_to_disk              = quorum_co_flush,
 958
 959    .bdrv_getlength                     = quorum_getlength,
 960
 961    .bdrv_aio_readv                     = quorum_aio_readv,
 962    .bdrv_aio_writev                    = quorum_aio_writev,
 963    .bdrv_invalidate_cache              = quorum_invalidate_cache,
 964
 965    .bdrv_detach_aio_context            = quorum_detach_aio_context,
 966    .bdrv_attach_aio_context            = quorum_attach_aio_context,
 967
 968    .is_filter                          = true,
 969    .bdrv_recurse_is_first_non_filter   = quorum_recurse_is_first_non_filter,
 970};
 971
 972static void bdrv_quorum_init(void)
 973{
 974    bdrv_register(&bdrv_quorum);
 975}
 976
 977block_init(bdrv_quorum_init);
 978