qemu/block/quorum.c
<<
>>
Prefs
   1/*
   2 * Quorum Block filter
   3 *
   4 * Copyright (C) 2012-2014 Nodalink, EURL.
   5 *
   6 * Author:
    7 *   Benoît Canet <benoit.canet@irqsave.net>
   8 *
   9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
  10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  13 * See the COPYING file in the top-level directory.
  14 */
  15
  16#include "qemu/osdep.h"
  17#include "qemu/cutils.h"
  18#include "block/block_int.h"
  19#include "qapi/qmp/qbool.h"
  20#include "qapi/qmp/qdict.h"
  21#include "qapi/qmp/qerror.h"
  22#include "qapi/qmp/qjson.h"
  23#include "qapi/qmp/qlist.h"
  24#include "qapi/qmp/qstring.h"
  25#include "qapi-event.h"
  26#include "crypto/hash.h"
  27
  28#define HASH_LENGTH 32
  29
  30#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
  31#define QUORUM_OPT_BLKVERIFY      "blkverify"
  32#define QUORUM_OPT_REWRITE        "rewrite-corrupted"
  33#define QUORUM_OPT_READ_PATTERN   "read-pattern"
  34
  35/* This union holds a vote hash value */
  36typedef union QuorumVoteValue {
  37    uint8_t h[HASH_LENGTH];    /* SHA-256 hash */
  38    int64_t l;                 /* simpler 64 bits hash */
  39} QuorumVoteValue;
  40
  41/* A vote item */
  42typedef struct QuorumVoteItem {
  43    int index;
  44    QLIST_ENTRY(QuorumVoteItem) next;
  45} QuorumVoteItem;
  46
  47/* this structure is a vote version. A version is the set of votes sharing the
  48 * same vote value.
  49 * The set of votes will be tracked with the items field and its cardinality is
  50 * vote_count.
  51 */
  52typedef struct QuorumVoteVersion {
  53    QuorumVoteValue value;
  54    int index;
  55    int vote_count;
  56    QLIST_HEAD(, QuorumVoteItem) items;
  57    QLIST_ENTRY(QuorumVoteVersion) next;
  58} QuorumVoteVersion;
  59
  60/* this structure holds a group of vote versions together */
  61typedef struct QuorumVotes {
  62    QLIST_HEAD(, QuorumVoteVersion) vote_list;
  63    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
  64} QuorumVotes;
  65
  66/* the following structure holds the state of one quorum instance */
  67typedef struct BDRVQuorumState {
  68    BdrvChild **children;  /* children BlockDriverStates */
  69    int num_children;      /* children count */
  70    unsigned next_child_index;  /* the index of the next child that should
  71                                 * be added
  72                                 */
  73    int threshold;         /* if less than threshold children reads gave the
  74                            * same result a quorum error occurs.
  75                            */
  76    bool is_blkverify;     /* true if the driver is in blkverify mode
  77                            * Writes are mirrored on two children devices.
  78                            * On reads the two children devices' contents are
  79                            * compared and if a difference is spotted its
  80                            * location is printed and the code aborts.
  81                            * It is useful to debug other block drivers by
  82                            * comparing them with a reference one.
  83                            */
  84    bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted
  85                            * block if Quorum is reached.
  86                            */
  87
  88    QuorumReadPattern read_pattern;
  89} BDRVQuorumState;
  90
  91typedef struct QuorumAIOCB QuorumAIOCB;
  92
  93/* Quorum will create one instance of the following structure per operation it
  94 * performs on its children.
  95 * So for each read/write operation coming from the upper layer there will be
  96 * $children_count QuorumChildRequest.
  97 */
  98typedef struct QuorumChildRequest {
  99    BlockDriverState *bs;
 100    QEMUIOVector qiov;
 101    uint8_t *buf;
 102    int ret;
 103    QuorumAIOCB *parent;
 104} QuorumChildRequest;
 105
 106/* Quorum will use the following structure to track progress of each read/write
 107 * operation received by the upper layer.
 108 * This structure hold pointers to the QuorumChildRequest structures instances
 109 * used to do operations on each children and track overall progress.
 110 */
 111struct QuorumAIOCB {
 112    BlockDriverState *bs;
 113    Coroutine *co;
 114
 115    /* Request metadata */
 116    uint64_t offset;
 117    uint64_t bytes;
 118
 119    QEMUIOVector *qiov;         /* calling IOV */
 120
 121    QuorumChildRequest *qcrs;   /* individual child requests */
 122    int count;                  /* number of completed AIOCB */
 123    int success_count;          /* number of successfully completed AIOCB */
 124
 125    int rewrite_count;          /* number of replica to rewrite: count down to
 126                                 * zero once writes are fired
 127                                 */
 128
 129    QuorumVotes votes;
 130
 131    bool is_read;
 132    int vote_ret;
 133    int children_read;          /* how many children have been read from */
 134};
 135
 136typedef struct QuorumCo {
 137    QuorumAIOCB *acb;
 138    int idx;
 139} QuorumCo;
 140
 141static void quorum_aio_finalize(QuorumAIOCB *acb)
 142{
 143    g_free(acb->qcrs);
 144    g_free(acb);
 145}
 146
 147static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 148{
 149    return !memcmp(a->h, b->h, HASH_LENGTH);
 150}
 151
 152static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 153{
 154    return a->l == b->l;
 155}
 156
 157static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
 158                                   QEMUIOVector *qiov,
 159                                   uint64_t offset,
 160                                   uint64_t bytes)
 161{
 162    BDRVQuorumState *s = bs->opaque;
 163    QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
 164    int i;
 165
 166    *acb = (QuorumAIOCB) {
 167        .co                 = qemu_coroutine_self(),
 168        .bs                 = bs,
 169        .offset             = offset,
 170        .bytes              = bytes,
 171        .qiov               = qiov,
 172        .votes.compare      = quorum_sha256_compare,
 173        .votes.vote_list    = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
 174    };
 175
 176    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
 177    for (i = 0; i < s->num_children; i++) {
 178        acb->qcrs[i].buf = NULL;
 179        acb->qcrs[i].ret = 0;
 180        acb->qcrs[i].parent = acb;
 181    }
 182
 183    return acb;
 184}
 185
 186static void quorum_report_bad(QuorumOpType type, uint64_t offset,
 187                              uint64_t bytes, char *node_name, int ret)
 188{
 189    const char *msg = NULL;
 190    int64_t start_sector = offset / BDRV_SECTOR_SIZE;
 191    int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
 192
 193    if (ret < 0) {
 194        msg = strerror(-ret);
 195    }
 196
 197    qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
 198                                      end_sector - start_sector, &error_abort);
 199}
 200
 201static void quorum_report_failure(QuorumAIOCB *acb)
 202{
 203    const char *reference = bdrv_get_device_or_node_name(acb->bs);
 204    int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
 205    int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
 206                                      BDRV_SECTOR_SIZE);
 207
 208    qapi_event_send_quorum_failure(reference, start_sector,
 209                                   end_sector - start_sector, &error_abort);
 210}
 211
 212static int quorum_vote_error(QuorumAIOCB *acb);
 213
 214static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
 215{
 216    BDRVQuorumState *s = acb->bs->opaque;
 217
 218    if (acb->success_count < s->threshold) {
 219        acb->vote_ret = quorum_vote_error(acb);
 220        quorum_report_failure(acb);
 221        return true;
 222    }
 223
 224    return false;
 225}
 226
 227static int read_fifo_child(QuorumAIOCB *acb);
 228
 229static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
 230{
 231    int i;
 232    assert(dest->niov == source->niov);
 233    assert(dest->size == source->size);
 234    for (i = 0; i < source->niov; i++) {
 235        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
 236        memcpy(dest->iov[i].iov_base,
 237               source->iov[i].iov_base,
 238               source->iov[i].iov_len);
 239    }
 240}
 241
 242static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
 243{
 244    QuorumAIOCB *acb = sacb->parent;
 245    QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
 246    quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
 247}
 248
 249static void quorum_report_bad_versions(BDRVQuorumState *s,
 250                                       QuorumAIOCB *acb,
 251                                       QuorumVoteValue *value)
 252{
 253    QuorumVoteVersion *version;
 254    QuorumVoteItem *item;
 255
 256    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 257        if (acb->votes.compare(&version->value, value)) {
 258            continue;
 259        }
 260        QLIST_FOREACH(item, &version->items, next) {
 261            quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
 262                              s->children[item->index]->bs->node_name, 0);
 263        }
 264    }
 265}
 266
 267static void quorum_rewrite_entry(void *opaque)
 268{
 269    QuorumCo *co = opaque;
 270    QuorumAIOCB *acb = co->acb;
 271    BDRVQuorumState *s = acb->bs->opaque;
 272
 273    /* Ignore any errors, it's just a correction attempt for already
 274     * corrupted data. */
 275    bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
 276                    acb->qiov, 0);
 277
 278    /* Wake up the caller after the last rewrite */
 279    acb->rewrite_count--;
 280    if (!acb->rewrite_count) {
 281        qemu_coroutine_enter_if_inactive(acb->co);
 282    }
 283}
 284
 285static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
 286                                        QuorumVoteValue *value)
 287{
 288    QuorumVoteVersion *version;
 289    QuorumVoteItem *item;
 290    int count = 0;
 291
 292    /* first count the number of bad versions: done first to avoid concurrency
 293     * issues.
 294     */
 295    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 296        if (acb->votes.compare(&version->value, value)) {
 297            continue;
 298        }
 299        QLIST_FOREACH(item, &version->items, next) {
 300            count++;
 301        }
 302    }
 303
 304    /* quorum_rewrite_entry will count down this to zero */
 305    acb->rewrite_count = count;
 306
 307    /* now fire the correcting rewrites */
 308    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 309        if (acb->votes.compare(&version->value, value)) {
 310            continue;
 311        }
 312        QLIST_FOREACH(item, &version->items, next) {
 313            Coroutine *co;
 314            QuorumCo data = {
 315                .acb = acb,
 316                .idx = item->index,
 317            };
 318
 319            co = qemu_coroutine_create(quorum_rewrite_entry, &data);
 320            qemu_coroutine_enter(co);
 321        }
 322    }
 323
 324    /* return true if any rewrite is done else false */
 325    return count;
 326}
 327
 328static void quorum_count_vote(QuorumVotes *votes,
 329                              QuorumVoteValue *value,
 330                              int index)
 331{
 332    QuorumVoteVersion *v = NULL, *version = NULL;
 333    QuorumVoteItem *item;
 334
 335    /* look if we have something with this hash */
 336    QLIST_FOREACH(v, &votes->vote_list, next) {
 337        if (votes->compare(&v->value, value)) {
 338            version = v;
 339            break;
 340        }
 341    }
 342
 343    /* It's a version not yet in the list add it */
 344    if (!version) {
 345        version = g_new0(QuorumVoteVersion, 1);
 346        QLIST_INIT(&version->items);
 347        memcpy(&version->value, value, sizeof(version->value));
 348        version->index = index;
 349        version->vote_count = 0;
 350        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
 351    }
 352
 353    version->vote_count++;
 354
 355    item = g_new0(QuorumVoteItem, 1);
 356    item->index = index;
 357    QLIST_INSERT_HEAD(&version->items, item, next);
 358}
 359
 360static void quorum_free_vote_list(QuorumVotes *votes)
 361{
 362    QuorumVoteVersion *version, *next_version;
 363    QuorumVoteItem *item, *next_item;
 364
 365    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
 366        QLIST_REMOVE(version, next);
 367        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
 368            QLIST_REMOVE(item, next);
 369            g_free(item);
 370        }
 371        g_free(version);
 372    }
 373}
 374
 375static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
 376{
 377    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
 378    size_t len = sizeof(hash->h);
 379    uint8_t *data = hash->h;
 380
 381    /* XXX - would be nice if we could pass in the Error **
 382     * and propagate that back, but this quorum code is
 383     * restricted to just errno values currently */
 384    if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256,
 385                            qiov->iov, qiov->niov,
 386                            &data, &len,
 387                            NULL) < 0) {
 388        return -EINVAL;
 389    }
 390
 391    return 0;
 392}
 393
 394static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
 395{
 396    int max = 0;
 397    QuorumVoteVersion *candidate, *winner = NULL;
 398
 399    QLIST_FOREACH(candidate, &votes->vote_list, next) {
 400        if (candidate->vote_count > max) {
 401            max = candidate->vote_count;
 402            winner = candidate;
 403        }
 404    }
 405
 406    return winner;
 407}
 408
 409/* qemu_iovec_compare is handy for blkverify mode because it returns the first
 410 * differing byte location. Yet it is handcoded to compare vectors one byte
 411 * after another so it does not benefit from the libc SIMD optimizations.
 412 * quorum_iovec_compare is written for speed and should be used in the non
 413 * blkverify mode of quorum.
 414 */
 415static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
 416{
 417    int i;
 418    int result;
 419
 420    assert(a->niov == b->niov);
 421    for (i = 0; i < a->niov; i++) {
 422        assert(a->iov[i].iov_len == b->iov[i].iov_len);
 423        result = memcmp(a->iov[i].iov_base,
 424                        b->iov[i].iov_base,
 425                        a->iov[i].iov_len);
 426        if (result) {
 427            return false;
 428        }
 429    }
 430
 431    return true;
 432}
 433
 434static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
 435                                          const char *fmt, ...)
 436{
 437    va_list ap;
 438
 439    va_start(ap, fmt);
 440    fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 " ",
 441            acb->offset, acb->bytes);
 442    vfprintf(stderr, fmt, ap);
 443    fprintf(stderr, "\n");
 444    va_end(ap);
 445    exit(1);
 446}
 447
 448static bool quorum_compare(QuorumAIOCB *acb,
 449                           QEMUIOVector *a,
 450                           QEMUIOVector *b)
 451{
 452    BDRVQuorumState *s = acb->bs->opaque;
 453    ssize_t offset;
 454
 455    /* This driver will replace blkverify in this particular case */
 456    if (s->is_blkverify) {
 457        offset = qemu_iovec_compare(a, b);
 458        if (offset != -1) {
 459            quorum_err(acb, "contents mismatch at offset %" PRIu64,
 460                       acb->offset + offset);
 461        }
 462        return true;
 463    }
 464
 465    return quorum_iovec_compare(a, b);
 466}
 467
 468/* Do a vote to get the error code */
 469static int quorum_vote_error(QuorumAIOCB *acb)
 470{
 471    BDRVQuorumState *s = acb->bs->opaque;
 472    QuorumVoteVersion *winner = NULL;
 473    QuorumVotes error_votes;
 474    QuorumVoteValue result_value;
 475    int i, ret = 0;
 476    bool error = false;
 477
 478    QLIST_INIT(&error_votes.vote_list);
 479    error_votes.compare = quorum_64bits_compare;
 480
 481    for (i = 0; i < s->num_children; i++) {
 482        ret = acb->qcrs[i].ret;
 483        if (ret) {
 484            error = true;
 485            result_value.l = ret;
 486            quorum_count_vote(&error_votes, &result_value, i);
 487        }
 488    }
 489
 490    if (error) {
 491        winner = quorum_get_vote_winner(&error_votes);
 492        ret = winner->value.l;
 493    }
 494
 495    quorum_free_vote_list(&error_votes);
 496
 497    return ret;
 498}
 499
 500static void quorum_vote(QuorumAIOCB *acb)
 501{
 502    bool quorum = true;
 503    int i, j, ret;
 504    QuorumVoteValue hash;
 505    BDRVQuorumState *s = acb->bs->opaque;
 506    QuorumVoteVersion *winner;
 507
 508    if (quorum_has_too_much_io_failed(acb)) {
 509        return;
 510    }
 511
 512    /* get the index of the first successful read */
 513    for (i = 0; i < s->num_children; i++) {
 514        if (!acb->qcrs[i].ret) {
 515            break;
 516        }
 517    }
 518
 519    assert(i < s->num_children);
 520
 521    /* compare this read with all other successful reads stopping at quorum
 522     * failure
 523     */
 524    for (j = i + 1; j < s->num_children; j++) {
 525        if (acb->qcrs[j].ret) {
 526            continue;
 527        }
 528        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
 529        if (!quorum) {
 530            break;
 531       }
 532    }
 533
 534    /* Every successful read agrees */
 535    if (quorum) {
 536        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
 537        return;
 538    }
 539
 540    /* compute hashes for each successful read, also store indexes */
 541    for (i = 0; i < s->num_children; i++) {
 542        if (acb->qcrs[i].ret) {
 543            continue;
 544        }
 545        ret = quorum_compute_hash(acb, i, &hash);
 546        /* if ever the hash computation failed */
 547        if (ret < 0) {
 548            acb->vote_ret = ret;
 549            goto free_exit;
 550        }
 551        quorum_count_vote(&acb->votes, &hash, i);
 552    }
 553
 554    /* vote to select the most represented version */
 555    winner = quorum_get_vote_winner(&acb->votes);
 556
 557    /* if the winner count is smaller than threshold the read fails */
 558    if (winner->vote_count < s->threshold) {
 559        quorum_report_failure(acb);
 560        acb->vote_ret = -EIO;
 561        goto free_exit;
 562    }
 563
 564    /* we have a winner: copy it */
 565    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
 566
 567    /* some versions are bad print them */
 568    quorum_report_bad_versions(s, acb, &winner->value);
 569
 570    /* corruption correction is enabled */
 571    if (s->rewrite_corrupted) {
 572        quorum_rewrite_bad_versions(acb, &winner->value);
 573    }
 574
 575free_exit:
 576    /* free lists */
 577    quorum_free_vote_list(&acb->votes);
 578}
 579
 580static void read_quorum_children_entry(void *opaque)
 581{
 582    QuorumCo *co = opaque;
 583    QuorumAIOCB *acb = co->acb;
 584    BDRVQuorumState *s = acb->bs->opaque;
 585    int i = co->idx;
 586    QuorumChildRequest *sacb = &acb->qcrs[i];
 587
 588    sacb->bs = s->children[i]->bs;
 589    sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
 590                               &acb->qcrs[i].qiov, 0);
 591
 592    if (sacb->ret == 0) {
 593        acb->success_count++;
 594    } else {
 595        quorum_report_bad_acb(sacb, sacb->ret);
 596    }
 597
 598    acb->count++;
 599    assert(acb->count <= s->num_children);
 600    assert(acb->success_count <= s->num_children);
 601
 602    /* Wake up the caller after the last read */
 603    if (acb->count == s->num_children) {
 604        qemu_coroutine_enter_if_inactive(acb->co);
 605    }
 606}
 607
 608static int read_quorum_children(QuorumAIOCB *acb)
 609{
 610    BDRVQuorumState *s = acb->bs->opaque;
 611    int i, ret;
 612
 613    acb->children_read = s->num_children;
 614    for (i = 0; i < s->num_children; i++) {
 615        acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size);
 616        qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
 617        qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf);
 618    }
 619
 620    for (i = 0; i < s->num_children; i++) {
 621        Coroutine *co;
 622        QuorumCo data = {
 623            .acb = acb,
 624            .idx = i,
 625        };
 626
 627        co = qemu_coroutine_create(read_quorum_children_entry, &data);
 628        qemu_coroutine_enter(co);
 629    }
 630
 631    while (acb->count < s->num_children) {
 632        qemu_coroutine_yield();
 633    }
 634
 635    /* Do the vote on read */
 636    quorum_vote(acb);
 637    for (i = 0; i < s->num_children; i++) {
 638        qemu_vfree(acb->qcrs[i].buf);
 639        qemu_iovec_destroy(&acb->qcrs[i].qiov);
 640    }
 641
 642    while (acb->rewrite_count) {
 643        qemu_coroutine_yield();
 644    }
 645
 646    ret = acb->vote_ret;
 647
 648    return ret;
 649}
 650
 651static int read_fifo_child(QuorumAIOCB *acb)
 652{
 653    BDRVQuorumState *s = acb->bs->opaque;
 654    int n, ret;
 655
 656    /* We try to read the next child in FIFO order if we failed to read */
 657    do {
 658        n = acb->children_read++;
 659        acb->qcrs[n].bs = s->children[n]->bs;
 660        ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
 661                             acb->qiov, 0);
 662        if (ret < 0) {
 663            quorum_report_bad_acb(&acb->qcrs[n], ret);
 664        }
 665    } while (ret < 0 && acb->children_read < s->num_children);
 666
 667    /* FIXME: rewrite failed children if acb->children_read > 1? */
 668
 669    return ret;
 670}
 671
 672static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
 673                            uint64_t bytes, QEMUIOVector *qiov, int flags)
 674{
 675    BDRVQuorumState *s = bs->opaque;
 676    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
 677    int ret;
 678
 679    acb->is_read = true;
 680    acb->children_read = 0;
 681
 682    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 683        ret = read_quorum_children(acb);
 684    } else {
 685        ret = read_fifo_child(acb);
 686    }
 687    quorum_aio_finalize(acb);
 688
 689    return ret;
 690}
 691
 692static void write_quorum_entry(void *opaque)
 693{
 694    QuorumCo *co = opaque;
 695    QuorumAIOCB *acb = co->acb;
 696    BDRVQuorumState *s = acb->bs->opaque;
 697    int i = co->idx;
 698    QuorumChildRequest *sacb = &acb->qcrs[i];
 699
 700    sacb->bs = s->children[i]->bs;
 701    sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
 702                                acb->qiov, 0);
 703    if (sacb->ret == 0) {
 704        acb->success_count++;
 705    } else {
 706        quorum_report_bad_acb(sacb, sacb->ret);
 707    }
 708    acb->count++;
 709    assert(acb->count <= s->num_children);
 710    assert(acb->success_count <= s->num_children);
 711
 712    /* Wake up the caller after the last write */
 713    if (acb->count == s->num_children) {
 714        qemu_coroutine_enter_if_inactive(acb->co);
 715    }
 716}
 717
 718static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
 719                             uint64_t bytes, QEMUIOVector *qiov, int flags)
 720{
 721    BDRVQuorumState *s = bs->opaque;
 722    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
 723    int i, ret;
 724
 725    for (i = 0; i < s->num_children; i++) {
 726        Coroutine *co;
 727        QuorumCo data = {
 728            .acb = acb,
 729            .idx = i,
 730        };
 731
 732        co = qemu_coroutine_create(write_quorum_entry, &data);
 733        qemu_coroutine_enter(co);
 734    }
 735
 736    while (acb->count < s->num_children) {
 737        qemu_coroutine_yield();
 738    }
 739
 740    quorum_has_too_much_io_failed(acb);
 741
 742    ret = acb->vote_ret;
 743    quorum_aio_finalize(acb);
 744
 745    return ret;
 746}
 747
 748static int64_t quorum_getlength(BlockDriverState *bs)
 749{
 750    BDRVQuorumState *s = bs->opaque;
 751    int64_t result;
 752    int i;
 753
 754    /* check that all file have the same length */
 755    result = bdrv_getlength(s->children[0]->bs);
 756    if (result < 0) {
 757        return result;
 758    }
 759    for (i = 1; i < s->num_children; i++) {
 760        int64_t value = bdrv_getlength(s->children[i]->bs);
 761        if (value < 0) {
 762            return value;
 763        }
 764        if (value != result) {
 765            return -EIO;
 766        }
 767    }
 768
 769    return result;
 770}
 771
 772static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
 773{
 774    BDRVQuorumState *s = bs->opaque;
 775    QuorumVoteVersion *winner = NULL;
 776    QuorumVotes error_votes;
 777    QuorumVoteValue result_value;
 778    int i;
 779    int result = 0;
 780    int success_count = 0;
 781
 782    QLIST_INIT(&error_votes.vote_list);
 783    error_votes.compare = quorum_64bits_compare;
 784
 785    for (i = 0; i < s->num_children; i++) {
 786        result = bdrv_co_flush(s->children[i]->bs);
 787        if (result) {
 788            quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0,
 789                              s->children[i]->bs->node_name, result);
 790            result_value.l = result;
 791            quorum_count_vote(&error_votes, &result_value, i);
 792        } else {
 793            success_count++;
 794        }
 795    }
 796
 797    if (success_count >= s->threshold) {
 798        result = 0;
 799    } else {
 800        winner = quorum_get_vote_winner(&error_votes);
 801        result = winner->value.l;
 802    }
 803    quorum_free_vote_list(&error_votes);
 804
 805    return result;
 806}
 807
 808static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
 809                                               BlockDriverState *candidate)
 810{
 811    BDRVQuorumState *s = bs->opaque;
 812    int i;
 813
 814    for (i = 0; i < s->num_children; i++) {
 815        bool perm = bdrv_recurse_is_first_non_filter(s->children[i]->bs,
 816                                                     candidate);
 817        if (perm) {
 818            return true;
 819        }
 820    }
 821
 822    return false;
 823}
 824
 825static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
 826{
 827
 828    if (threshold < 1) {
 829        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
 830                   "vote-threshold", "value >= 1");
 831        return -ERANGE;
 832    }
 833
 834    if (threshold > num_children) {
 835        error_setg(errp, "threshold may not exceed children count");
 836        return -ERANGE;
 837    }
 838
 839    return 0;
 840}
 841
 842static QemuOptsList quorum_runtime_opts = {
 843    .name = "quorum",
 844    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
 845    .desc = {
 846        {
 847            .name = QUORUM_OPT_VOTE_THRESHOLD,
 848            .type = QEMU_OPT_NUMBER,
 849            .help = "The number of vote needed for reaching quorum",
 850        },
 851        {
 852            .name = QUORUM_OPT_BLKVERIFY,
 853            .type = QEMU_OPT_BOOL,
 854            .help = "Trigger block verify mode if set",
 855        },
 856        {
 857            .name = QUORUM_OPT_REWRITE,
 858            .type = QEMU_OPT_BOOL,
 859            .help = "Rewrite corrupted block on read quorum",
 860        },
 861        {
 862            .name = QUORUM_OPT_READ_PATTERN,
 863            .type = QEMU_OPT_STRING,
 864            .help = "Allowed pattern: quorum, fifo. Quorum is default",
 865        },
 866        { /* end of list */ }
 867    },
 868};
 869
 870static int parse_read_pattern(const char *opt)
 871{
 872    int i;
 873
 874    if (!opt) {
 875        /* Set quorum as default */
 876        return QUORUM_READ_PATTERN_QUORUM;
 877    }
 878
 879    for (i = 0; i < QUORUM_READ_PATTERN__MAX; i++) {
 880        if (!strcmp(opt, QuorumReadPattern_lookup[i])) {
 881            return i;
 882        }
 883    }
 884
 885    return -EINVAL;
 886}
 887
 888static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
 889                       Error **errp)
 890{
 891    BDRVQuorumState *s = bs->opaque;
 892    Error *local_err = NULL;
 893    QemuOpts *opts = NULL;
 894    bool *opened;
 895    int i;
 896    int ret = 0;
 897
 898    qdict_flatten(options);
 899
 900    /* count how many different children are present */
 901    s->num_children = qdict_array_entries(options, "children.");
 902    if (s->num_children < 0) {
 903        error_setg(&local_err, "Option children is not a valid array");
 904        ret = -EINVAL;
 905        goto exit;
 906    }
 907    if (s->num_children < 1) {
 908        error_setg(&local_err,
 909                   "Number of provided children must be 1 or more");
 910        ret = -EINVAL;
 911        goto exit;
 912    }
 913
 914    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
 915    qemu_opts_absorb_qdict(opts, options, &local_err);
 916    if (local_err) {
 917        ret = -EINVAL;
 918        goto exit;
 919    }
 920
 921    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
 922    /* and validate it against s->num_children */
 923    ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
 924    if (ret < 0) {
 925        goto exit;
 926    }
 927
 928    ret = parse_read_pattern(qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN));
 929    if (ret < 0) {
 930        error_setg(&local_err, "Please set read-pattern as fifo or quorum");
 931        goto exit;
 932    }
 933    s->read_pattern = ret;
 934
 935    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 936        /* is the driver in blkverify mode */
 937        if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
 938            s->num_children == 2 && s->threshold == 2) {
 939            s->is_blkverify = true;
 940        } else if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false)) {
 941            fprintf(stderr, "blkverify mode is set by setting blkverify=on "
 942                    "and using two files with vote_threshold=2\n");
 943        }
 944
 945        s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
 946                                                 false);
 947        if (s->rewrite_corrupted && s->is_blkverify) {
 948            error_setg(&local_err,
 949                       "rewrite-corrupted=on cannot be used with blkverify=on");
 950            ret = -EINVAL;
 951            goto exit;
 952        }
 953    }
 954
 955    /* allocate the children array */
 956    s->children = g_new0(BdrvChild *, s->num_children);
 957    opened = g_new0(bool, s->num_children);
 958
 959    for (i = 0; i < s->num_children; i++) {
 960        char indexstr[32];
 961        ret = snprintf(indexstr, 32, "children.%d", i);
 962        assert(ret < 32);
 963
 964        s->children[i] = bdrv_open_child(NULL, options, indexstr, bs,
 965                                         &child_format, false, &local_err);
 966        if (local_err) {
 967            ret = -EINVAL;
 968            goto close_exit;
 969        }
 970
 971        opened[i] = true;
 972    }
 973    s->next_child_index = s->num_children;
 974
 975    g_free(opened);
 976    goto exit;
 977
 978close_exit:
 979    /* cleanup on error */
 980    for (i = 0; i < s->num_children; i++) {
 981        if (!opened[i]) {
 982            continue;
 983        }
 984        bdrv_unref_child(bs, s->children[i]);
 985    }
 986    g_free(s->children);
 987    g_free(opened);
 988exit:
 989    qemu_opts_del(opts);
 990    /* propagate error */
 991    error_propagate(errp, local_err);
 992    return ret;
 993}
 994
 995static void quorum_close(BlockDriverState *bs)
 996{
 997    BDRVQuorumState *s = bs->opaque;
 998    int i;
 999
1000    for (i = 0; i < s->num_children; i++) {
1001        bdrv_unref_child(bs, s->children[i]);
1002    }
1003
1004    g_free(s->children);
1005}
1006
1007static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
1008                             Error **errp)
1009{
1010    BDRVQuorumState *s = bs->opaque;
1011    BdrvChild *child;
1012    char indexstr[32];
1013    int ret;
1014
1015    assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
1016    if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
1017        s->next_child_index == UINT_MAX) {
1018        error_setg(errp, "Too many children");
1019        return;
1020    }
1021
1022    ret = snprintf(indexstr, 32, "children.%u", s->next_child_index);
1023    if (ret < 0 || ret >= 32) {
1024        error_setg(errp, "cannot generate child name");
1025        return;
1026    }
1027    s->next_child_index++;
1028
1029    bdrv_drained_begin(bs);
1030
1031    /* We can safely add the child now */
1032    bdrv_ref(child_bs);
1033
1034    child = bdrv_attach_child(bs, child_bs, indexstr, &child_format, errp);
1035    if (child == NULL) {
1036        s->next_child_index--;
1037        bdrv_unref(child_bs);
1038        goto out;
1039    }
1040    s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
1041    s->children[s->num_children++] = child;
1042
1043out:
1044    bdrv_drained_end(bs);
1045}
1046
1047static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
1048                             Error **errp)
1049{
1050    BDRVQuorumState *s = bs->opaque;
1051    int i;
1052
1053    for (i = 0; i < s->num_children; i++) {
1054        if (s->children[i] == child) {
1055            break;
1056        }
1057    }
1058
1059    /* we have checked it in bdrv_del_child() */
1060    assert(i < s->num_children);
1061
1062    if (s->num_children <= s->threshold) {
1063        error_setg(errp,
1064            "The number of children cannot be lower than the vote threshold %d",
1065            s->threshold);
1066        return;
1067    }
1068
1069    bdrv_drained_begin(bs);
1070
1071    /* We can safely remove this child now */
1072    memmove(&s->children[i], &s->children[i + 1],
1073            (s->num_children - i - 1) * sizeof(BdrvChild *));
1074    s->children = g_renew(BdrvChild *, s->children, --s->num_children);
1075    bdrv_unref_child(bs, child);
1076
1077    bdrv_drained_end(bs);
1078}
1079
1080static void quorum_refresh_filename(BlockDriverState *bs, QDict *options)
1081{
1082    BDRVQuorumState *s = bs->opaque;
1083    QDict *opts;
1084    QList *children;
1085    int i;
1086
1087    for (i = 0; i < s->num_children; i++) {
1088        bdrv_refresh_filename(s->children[i]->bs);
1089        if (!s->children[i]->bs->full_open_options) {
1090            return;
1091        }
1092    }
1093
1094    children = qlist_new();
1095    for (i = 0; i < s->num_children; i++) {
1096        QINCREF(s->children[i]->bs->full_open_options);
1097        qlist_append(children, s->children[i]->bs->full_open_options);
1098    }
1099
1100    opts = qdict_new();
1101    qdict_put_str(opts, "driver", "quorum");
1102    qdict_put_int(opts, QUORUM_OPT_VOTE_THRESHOLD, s->threshold);
1103    qdict_put_bool(opts, QUORUM_OPT_BLKVERIFY, s->is_blkverify);
1104    qdict_put_bool(opts, QUORUM_OPT_REWRITE, s->rewrite_corrupted);
1105    qdict_put(opts, "children", children);
1106
1107    bs->full_open_options = opts;
1108}
1109
1110static BlockDriver bdrv_quorum = {
1111    .format_name                        = "quorum",
1112    .protocol_name                      = "quorum",
1113
1114    .instance_size                      = sizeof(BDRVQuorumState),
1115
1116    .bdrv_file_open                     = quorum_open,
1117    .bdrv_close                         = quorum_close,
1118    .bdrv_refresh_filename              = quorum_refresh_filename,
1119
1120    .bdrv_co_flush_to_disk              = quorum_co_flush,
1121
1122    .bdrv_getlength                     = quorum_getlength,
1123
1124    .bdrv_co_preadv                     = quorum_co_preadv,
1125    .bdrv_co_pwritev                    = quorum_co_pwritev,
1126
1127    .bdrv_add_child                     = quorum_add_child,
1128    .bdrv_del_child                     = quorum_del_child,
1129
1130    .bdrv_child_perm                    = bdrv_filter_default_perms,
1131
1132    .is_filter                          = true,
1133    .bdrv_recurse_is_first_non_filter   = quorum_recurse_is_first_non_filter,
1134};
1135
1136static void bdrv_quorum_init(void)
1137{
1138    if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) {
1139        /* SHA256 hash support is required for quorum device */
1140        return;
1141    }
1142    bdrv_register(&bdrv_quorum);
1143}
1144
1145block_init(bdrv_quorum_init);
1146