qemu/block/quorum.c
<<
>>
Prefs
   1/*
   2 * Quorum Block filter
   3 *
   4 * Copyright (C) 2012-2014 Nodalink, EURL.
   5 *
   6 * Author:
    7 *   Benoît Canet <benoit.canet@irqsave.net>
   8 *
   9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
  10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  13 * See the COPYING file in the top-level directory.
  14 */
  15
  16#include <gnutls/gnutls.h>
  17#include <gnutls/crypto.h>
  18#include "block/block_int.h"
  19#include "qapi/qmp/qbool.h"
  20#include "qapi/qmp/qdict.h"
  21#include "qapi/qmp/qint.h"
  22#include "qapi/qmp/qjson.h"
  23#include "qapi/qmp/qlist.h"
  24#include "qapi/qmp/qstring.h"
  25#include "qapi-event.h"
  26
  27#define HASH_LENGTH 32
  28
  29#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
  30#define QUORUM_OPT_BLKVERIFY      "blkverify"
  31#define QUORUM_OPT_REWRITE        "rewrite-corrupted"
  32#define QUORUM_OPT_READ_PATTERN   "read-pattern"
  33
  34/* This union holds a vote hash value */
  35typedef union QuorumVoteValue {
  36    char h[HASH_LENGTH];       /* SHA-256 hash */
  37    int64_t l;                 /* simpler 64 bits hash */
  38} QuorumVoteValue;
  39
  40/* A vote item */
  41typedef struct QuorumVoteItem {
  42    int index;
  43    QLIST_ENTRY(QuorumVoteItem) next;
  44} QuorumVoteItem;
  45
  46/* this structure is a vote version. A version is the set of votes sharing the
  47 * same vote value.
  48 * The set of votes will be tracked with the items field and its cardinality is
  49 * vote_count.
  50 */
  51typedef struct QuorumVoteVersion {
  52    QuorumVoteValue value;
  53    int index;
  54    int vote_count;
  55    QLIST_HEAD(, QuorumVoteItem) items;
  56    QLIST_ENTRY(QuorumVoteVersion) next;
  57} QuorumVoteVersion;
  58
  59/* this structure holds a group of vote versions together */
  60typedef struct QuorumVotes {
  61    QLIST_HEAD(, QuorumVoteVersion) vote_list;
  62    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
  63} QuorumVotes;
  64
  65/* the following structure holds the state of one quorum instance */
  66typedef struct BDRVQuorumState {
  67    BlockDriverState **bs; /* children BlockDriverStates */
  68    int num_children;      /* children count */
  69    int threshold;         /* if less than threshold children reads gave the
  70                            * same result a quorum error occurs.
  71                            */
  72    bool is_blkverify;     /* true if the driver is in blkverify mode
  73                            * Writes are mirrored on two children devices.
  74                            * On reads the two children devices' contents are
  75                            * compared and if a difference is spotted its
  76                            * location is printed and the code aborts.
  77                            * It is useful to debug other block drivers by
  78                            * comparing them with a reference one.
  79                            */
  80    bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted
  81                            * block if Quorum is reached.
  82                            */
  83
  84    QuorumReadPattern read_pattern;
  85} BDRVQuorumState;
  86
  87typedef struct QuorumAIOCB QuorumAIOCB;
  88
  89/* Quorum will create one instance of the following structure per operation it
  90 * performs on its children.
  91 * So for each read/write operation coming from the upper layer there will be
  92 * $children_count QuorumChildRequest.
  93 */
  94typedef struct QuorumChildRequest {
  95    BlockAIOCB *aiocb;
  96    QEMUIOVector qiov;
  97    uint8_t *buf;
  98    int ret;
  99    QuorumAIOCB *parent;
 100} QuorumChildRequest;
 101
 102/* Quorum will use the following structure to track progress of each read/write
 103 * operation received by the upper layer.
 104 * This structure hold pointers to the QuorumChildRequest structures instances
 105 * used to do operations on each children and track overall progress.
 106 */
 107struct QuorumAIOCB {
 108    BlockAIOCB common;
 109
 110    /* Request metadata */
 111    uint64_t sector_num;
 112    int nb_sectors;
 113
 114    QEMUIOVector *qiov;         /* calling IOV */
 115
 116    QuorumChildRequest *qcrs;   /* individual child requests */
 117    int count;                  /* number of completed AIOCB */
 118    int success_count;          /* number of successfully completed AIOCB */
 119
 120    int rewrite_count;          /* number of replica to rewrite: count down to
 121                                 * zero once writes are fired
 122                                 */
 123
 124    QuorumVotes votes;
 125
 126    bool is_read;
 127    int vote_ret;
 128    int child_iter;             /* which child to read in fifo pattern */
 129};
 130
 131static bool quorum_vote(QuorumAIOCB *acb);
 132
 133static void quorum_aio_cancel(BlockAIOCB *blockacb)
 134{
 135    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
 136    BDRVQuorumState *s = acb->common.bs->opaque;
 137    int i;
 138
 139    /* cancel all callbacks */
 140    for (i = 0; i < s->num_children; i++) {
 141        if (acb->qcrs[i].aiocb) {
 142            bdrv_aio_cancel_async(acb->qcrs[i].aiocb);
 143        }
 144    }
 145}
 146
 147static AIOCBInfo quorum_aiocb_info = {
 148    .aiocb_size         = sizeof(QuorumAIOCB),
 149    .cancel_async       = quorum_aio_cancel,
 150};
 151
 152static void quorum_aio_finalize(QuorumAIOCB *acb)
 153{
 154    int i, ret = 0;
 155
 156    if (acb->vote_ret) {
 157        ret = acb->vote_ret;
 158    }
 159
 160    acb->common.cb(acb->common.opaque, ret);
 161
 162    if (acb->is_read) {
 163        /* on the quorum case acb->child_iter == s->num_children - 1 */
 164        for (i = 0; i <= acb->child_iter; i++) {
 165            qemu_vfree(acb->qcrs[i].buf);
 166            qemu_iovec_destroy(&acb->qcrs[i].qiov);
 167        }
 168    }
 169
 170    g_free(acb->qcrs);
 171    qemu_aio_unref(acb);
 172}
 173
 174static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 175{
 176    return !memcmp(a->h, b->h, HASH_LENGTH);
 177}
 178
 179static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 180{
 181    return a->l == b->l;
 182}
 183
 184static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
 185                                   BlockDriverState *bs,
 186                                   QEMUIOVector *qiov,
 187                                   uint64_t sector_num,
 188                                   int nb_sectors,
 189                                   BlockCompletionFunc *cb,
 190                                   void *opaque)
 191{
 192    QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
 193    int i;
 194
 195    acb->common.bs->opaque = s;
 196    acb->sector_num = sector_num;
 197    acb->nb_sectors = nb_sectors;
 198    acb->qiov = qiov;
 199    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
 200    acb->count = 0;
 201    acb->success_count = 0;
 202    acb->rewrite_count = 0;
 203    acb->votes.compare = quorum_sha256_compare;
 204    QLIST_INIT(&acb->votes.vote_list);
 205    acb->is_read = false;
 206    acb->vote_ret = 0;
 207
 208    for (i = 0; i < s->num_children; i++) {
 209        acb->qcrs[i].buf = NULL;
 210        acb->qcrs[i].ret = 0;
 211        acb->qcrs[i].parent = acb;
 212    }
 213
 214    return acb;
 215}
 216
 217static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
 218{
 219    const char *msg = NULL;
 220    if (ret < 0) {
 221        msg = strerror(-ret);
 222    }
 223    qapi_event_send_quorum_report_bad(!!msg, msg, node_name,
 224                                      acb->sector_num, acb->nb_sectors, &error_abort);
 225}
 226
 227static void quorum_report_failure(QuorumAIOCB *acb)
 228{
 229    const char *reference = bdrv_get_device_name(acb->common.bs)[0] ?
 230                            bdrv_get_device_name(acb->common.bs) :
 231                            acb->common.bs->node_name;
 232
 233    qapi_event_send_quorum_failure(reference, acb->sector_num,
 234                                   acb->nb_sectors, &error_abort);
 235}
 236
 237static int quorum_vote_error(QuorumAIOCB *acb);
 238
 239static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
 240{
 241    BDRVQuorumState *s = acb->common.bs->opaque;
 242
 243    if (acb->success_count < s->threshold) {
 244        acb->vote_ret = quorum_vote_error(acb);
 245        quorum_report_failure(acb);
 246        return true;
 247    }
 248
 249    return false;
 250}
 251
 252static void quorum_rewrite_aio_cb(void *opaque, int ret)
 253{
 254    QuorumAIOCB *acb = opaque;
 255
 256    /* one less rewrite to do */
 257    acb->rewrite_count--;
 258
 259    /* wait until all rewrite callbacks have completed */
 260    if (acb->rewrite_count) {
 261        return;
 262    }
 263
 264    quorum_aio_finalize(acb);
 265}
 266
 267static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb);
 268
 269static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
 270{
 271    int i;
 272    assert(dest->niov == source->niov);
 273    assert(dest->size == source->size);
 274    for (i = 0; i < source->niov; i++) {
 275        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
 276        memcpy(dest->iov[i].iov_base,
 277               source->iov[i].iov_base,
 278               source->iov[i].iov_len);
 279    }
 280}
 281
 282static void quorum_aio_cb(void *opaque, int ret)
 283{
 284    QuorumChildRequest *sacb = opaque;
 285    QuorumAIOCB *acb = sacb->parent;
 286    BDRVQuorumState *s = acb->common.bs->opaque;
 287    bool rewrite = false;
 288
 289    if (acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO) {
 290        /* We try to read next child in FIFO order if we fail to read */
 291        if (ret < 0 && ++acb->child_iter < s->num_children) {
 292            read_fifo_child(acb);
 293            return;
 294        }
 295
 296        if (ret == 0) {
 297            quorum_copy_qiov(acb->qiov, &acb->qcrs[acb->child_iter].qiov);
 298        }
 299        acb->vote_ret = ret;
 300        quorum_aio_finalize(acb);
 301        return;
 302    }
 303
 304    sacb->ret = ret;
 305    acb->count++;
 306    if (ret == 0) {
 307        acb->success_count++;
 308    } else {
 309        quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
 310    }
 311    assert(acb->count <= s->num_children);
 312    assert(acb->success_count <= s->num_children);
 313    if (acb->count < s->num_children) {
 314        return;
 315    }
 316
 317    /* Do the vote on read */
 318    if (acb->is_read) {
 319        rewrite = quorum_vote(acb);
 320    } else {
 321        quorum_has_too_much_io_failed(acb);
 322    }
 323
 324    /* if no rewrite is done the code will finish right away */
 325    if (!rewrite) {
 326        quorum_aio_finalize(acb);
 327    }
 328}
 329
 330static void quorum_report_bad_versions(BDRVQuorumState *s,
 331                                       QuorumAIOCB *acb,
 332                                       QuorumVoteValue *value)
 333{
 334    QuorumVoteVersion *version;
 335    QuorumVoteItem *item;
 336
 337    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 338        if (acb->votes.compare(&version->value, value)) {
 339            continue;
 340        }
 341        QLIST_FOREACH(item, &version->items, next) {
 342            quorum_report_bad(acb, s->bs[item->index]->node_name, 0);
 343        }
 344    }
 345}
 346
 347static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
 348                                        QuorumVoteValue *value)
 349{
 350    QuorumVoteVersion *version;
 351    QuorumVoteItem *item;
 352    int count = 0;
 353
 354    /* first count the number of bad versions: done first to avoid concurrency
 355     * issues.
 356     */
 357    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 358        if (acb->votes.compare(&version->value, value)) {
 359            continue;
 360        }
 361        QLIST_FOREACH(item, &version->items, next) {
 362            count++;
 363        }
 364    }
 365
 366    /* quorum_rewrite_aio_cb will count down this to zero */
 367    acb->rewrite_count = count;
 368
 369    /* now fire the correcting rewrites */
 370    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 371        if (acb->votes.compare(&version->value, value)) {
 372            continue;
 373        }
 374        QLIST_FOREACH(item, &version->items, next) {
 375            bdrv_aio_writev(s->bs[item->index], acb->sector_num, acb->qiov,
 376                            acb->nb_sectors, quorum_rewrite_aio_cb, acb);
 377        }
 378    }
 379
 380    /* return true if any rewrite is done else false */
 381    return count;
 382}
 383
 384static void quorum_count_vote(QuorumVotes *votes,
 385                              QuorumVoteValue *value,
 386                              int index)
 387{
 388    QuorumVoteVersion *v = NULL, *version = NULL;
 389    QuorumVoteItem *item;
 390
 391    /* look if we have something with this hash */
 392    QLIST_FOREACH(v, &votes->vote_list, next) {
 393        if (votes->compare(&v->value, value)) {
 394            version = v;
 395            break;
 396        }
 397    }
 398
 399    /* It's a version not yet in the list add it */
 400    if (!version) {
 401        version = g_new0(QuorumVoteVersion, 1);
 402        QLIST_INIT(&version->items);
 403        memcpy(&version->value, value, sizeof(version->value));
 404        version->index = index;
 405        version->vote_count = 0;
 406        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
 407    }
 408
 409    version->vote_count++;
 410
 411    item = g_new0(QuorumVoteItem, 1);
 412    item->index = index;
 413    QLIST_INSERT_HEAD(&version->items, item, next);
 414}
 415
 416static void quorum_free_vote_list(QuorumVotes *votes)
 417{
 418    QuorumVoteVersion *version, *next_version;
 419    QuorumVoteItem *item, *next_item;
 420
 421    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
 422        QLIST_REMOVE(version, next);
 423        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
 424            QLIST_REMOVE(item, next);
 425            g_free(item);
 426        }
 427        g_free(version);
 428    }
 429}
 430
 431static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
 432{
 433    int j, ret;
 434    gnutls_hash_hd_t dig;
 435    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
 436
 437    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
 438
 439    if (ret < 0) {
 440        return ret;
 441    }
 442
 443    for (j = 0; j < qiov->niov; j++) {
 444        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
 445        if (ret < 0) {
 446            break;
 447        }
 448    }
 449
 450    gnutls_hash_deinit(dig, (void *) hash);
 451    return ret;
 452}
 453
 454static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
 455{
 456    int max = 0;
 457    QuorumVoteVersion *candidate, *winner = NULL;
 458
 459    QLIST_FOREACH(candidate, &votes->vote_list, next) {
 460        if (candidate->vote_count > max) {
 461            max = candidate->vote_count;
 462            winner = candidate;
 463        }
 464    }
 465
 466    return winner;
 467}
 468
 469/* qemu_iovec_compare is handy for blkverify mode because it returns the first
 470 * differing byte location. Yet it is handcoded to compare vectors one byte
 471 * after another so it does not benefit from the libc SIMD optimizations.
 472 * quorum_iovec_compare is written for speed and should be used in the non
 473 * blkverify mode of quorum.
 474 */
 475static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
 476{
 477    int i;
 478    int result;
 479
 480    assert(a->niov == b->niov);
 481    for (i = 0; i < a->niov; i++) {
 482        assert(a->iov[i].iov_len == b->iov[i].iov_len);
 483        result = memcmp(a->iov[i].iov_base,
 484                        b->iov[i].iov_base,
 485                        a->iov[i].iov_len);
 486        if (result) {
 487            return false;
 488        }
 489    }
 490
 491    return true;
 492}
 493
 494static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
 495                                          const char *fmt, ...)
 496{
 497    va_list ap;
 498
 499    va_start(ap, fmt);
 500    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
 501            acb->sector_num, acb->nb_sectors);
 502    vfprintf(stderr, fmt, ap);
 503    fprintf(stderr, "\n");
 504    va_end(ap);
 505    exit(1);
 506}
 507
 508static bool quorum_compare(QuorumAIOCB *acb,
 509                           QEMUIOVector *a,
 510                           QEMUIOVector *b)
 511{
 512    BDRVQuorumState *s = acb->common.bs->opaque;
 513    ssize_t offset;
 514
 515    /* This driver will replace blkverify in this particular case */
 516    if (s->is_blkverify) {
 517        offset = qemu_iovec_compare(a, b);
 518        if (offset != -1) {
 519            quorum_err(acb, "contents mismatch in sector %" PRId64,
 520                       acb->sector_num +
 521                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
 522        }
 523        return true;
 524    }
 525
 526    return quorum_iovec_compare(a, b);
 527}
 528
 529/* Do a vote to get the error code */
 530static int quorum_vote_error(QuorumAIOCB *acb)
 531{
 532    BDRVQuorumState *s = acb->common.bs->opaque;
 533    QuorumVoteVersion *winner = NULL;
 534    QuorumVotes error_votes;
 535    QuorumVoteValue result_value;
 536    int i, ret = 0;
 537    bool error = false;
 538
 539    QLIST_INIT(&error_votes.vote_list);
 540    error_votes.compare = quorum_64bits_compare;
 541
 542    for (i = 0; i < s->num_children; i++) {
 543        ret = acb->qcrs[i].ret;
 544        if (ret) {
 545            error = true;
 546            result_value.l = ret;
 547            quorum_count_vote(&error_votes, &result_value, i);
 548        }
 549    }
 550
 551    if (error) {
 552        winner = quorum_get_vote_winner(&error_votes);
 553        ret = winner->value.l;
 554    }
 555
 556    quorum_free_vote_list(&error_votes);
 557
 558    return ret;
 559}
 560
 561static bool quorum_vote(QuorumAIOCB *acb)
 562{
 563    bool quorum = true;
 564    bool rewrite = false;
 565    int i, j, ret;
 566    QuorumVoteValue hash;
 567    BDRVQuorumState *s = acb->common.bs->opaque;
 568    QuorumVoteVersion *winner;
 569
 570    if (quorum_has_too_much_io_failed(acb)) {
 571        return false;
 572    }
 573
 574    /* get the index of the first successful read */
 575    for (i = 0; i < s->num_children; i++) {
 576        if (!acb->qcrs[i].ret) {
 577            break;
 578        }
 579    }
 580
 581    assert(i < s->num_children);
 582
 583    /* compare this read with all other successful reads stopping at quorum
 584     * failure
 585     */
 586    for (j = i + 1; j < s->num_children; j++) {
 587        if (acb->qcrs[j].ret) {
 588            continue;
 589        }
 590        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
 591        if (!quorum) {
 592            break;
 593       }
 594    }
 595
 596    /* Every successful read agrees */
 597    if (quorum) {
 598        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
 599        return false;
 600    }
 601
 602    /* compute hashes for each successful read, also store indexes */
 603    for (i = 0; i < s->num_children; i++) {
 604        if (acb->qcrs[i].ret) {
 605            continue;
 606        }
 607        ret = quorum_compute_hash(acb, i, &hash);
 608        /* if ever the hash computation failed */
 609        if (ret < 0) {
 610            acb->vote_ret = ret;
 611            goto free_exit;
 612        }
 613        quorum_count_vote(&acb->votes, &hash, i);
 614    }
 615
 616    /* vote to select the most represented version */
 617    winner = quorum_get_vote_winner(&acb->votes);
 618
 619    /* if the winner count is smaller than threshold the read fails */
 620    if (winner->vote_count < s->threshold) {
 621        quorum_report_failure(acb);
 622        acb->vote_ret = -EIO;
 623        goto free_exit;
 624    }
 625
 626    /* we have a winner: copy it */
 627    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
 628
 629    /* some versions are bad print them */
 630    quorum_report_bad_versions(s, acb, &winner->value);
 631
 632    /* corruption correction is enabled */
 633    if (s->rewrite_corrupted) {
 634        rewrite = quorum_rewrite_bad_versions(s, acb, &winner->value);
 635    }
 636
 637free_exit:
 638    /* free lists */
 639    quorum_free_vote_list(&acb->votes);
 640    return rewrite;
 641}
 642
 643static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
 644{
 645    BDRVQuorumState *s = acb->common.bs->opaque;
 646    int i;
 647
 648    for (i = 0; i < s->num_children; i++) {
 649        acb->qcrs[i].buf = qemu_blockalign(s->bs[i], acb->qiov->size);
 650        qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
 651        qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf);
 652    }
 653
 654    for (i = 0; i < s->num_children; i++) {
 655        bdrv_aio_readv(s->bs[i], acb->sector_num, &acb->qcrs[i].qiov,
 656                       acb->nb_sectors, quorum_aio_cb, &acb->qcrs[i]);
 657    }
 658
 659    return &acb->common;
 660}
 661
 662static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
 663{
 664    BDRVQuorumState *s = acb->common.bs->opaque;
 665
 666    acb->qcrs[acb->child_iter].buf = qemu_blockalign(s->bs[acb->child_iter],
 667                                                     acb->qiov->size);
 668    qemu_iovec_init(&acb->qcrs[acb->child_iter].qiov, acb->qiov->niov);
 669    qemu_iovec_clone(&acb->qcrs[acb->child_iter].qiov, acb->qiov,
 670                     acb->qcrs[acb->child_iter].buf);
 671    bdrv_aio_readv(s->bs[acb->child_iter], acb->sector_num,
 672                   &acb->qcrs[acb->child_iter].qiov, acb->nb_sectors,
 673                   quorum_aio_cb, &acb->qcrs[acb->child_iter]);
 674
 675    return &acb->common;
 676}
 677
 678static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs,
 679                                    int64_t sector_num,
 680                                    QEMUIOVector *qiov,
 681                                    int nb_sectors,
 682                                    BlockCompletionFunc *cb,
 683                                    void *opaque)
 684{
 685    BDRVQuorumState *s = bs->opaque;
 686    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
 687                                      nb_sectors, cb, opaque);
 688    acb->is_read = true;
 689
 690    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 691        acb->child_iter = s->num_children - 1;
 692        return read_quorum_children(acb);
 693    }
 694
 695    acb->child_iter = 0;
 696    return read_fifo_child(acb);
 697}
 698
 699static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
 700                                     int64_t sector_num,
 701                                     QEMUIOVector *qiov,
 702                                     int nb_sectors,
 703                                     BlockCompletionFunc *cb,
 704                                     void *opaque)
 705{
 706    BDRVQuorumState *s = bs->opaque;
 707    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
 708                                      cb, opaque);
 709    int i;
 710
 711    for (i = 0; i < s->num_children; i++) {
 712        acb->qcrs[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
 713                                             nb_sectors, &quorum_aio_cb,
 714                                             &acb->qcrs[i]);
 715    }
 716
 717    return &acb->common;
 718}
 719
 720static int64_t quorum_getlength(BlockDriverState *bs)
 721{
 722    BDRVQuorumState *s = bs->opaque;
 723    int64_t result;
 724    int i;
 725
 726    /* check that all file have the same length */
 727    result = bdrv_getlength(s->bs[0]);
 728    if (result < 0) {
 729        return result;
 730    }
 731    for (i = 1; i < s->num_children; i++) {
 732        int64_t value = bdrv_getlength(s->bs[i]);
 733        if (value < 0) {
 734            return value;
 735        }
 736        if (value != result) {
 737            return -EIO;
 738        }
 739    }
 740
 741    return result;
 742}
 743
 744static void quorum_invalidate_cache(BlockDriverState *bs, Error **errp)
 745{
 746    BDRVQuorumState *s = bs->opaque;
 747    Error *local_err = NULL;
 748    int i;
 749
 750    for (i = 0; i < s->num_children; i++) {
 751        bdrv_invalidate_cache(s->bs[i], &local_err);
 752        if (local_err) {
 753            error_propagate(errp, local_err);
 754            return;
 755        }
 756    }
 757}
 758
 759static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
 760{
 761    BDRVQuorumState *s = bs->opaque;
 762    QuorumVoteVersion *winner = NULL;
 763    QuorumVotes error_votes;
 764    QuorumVoteValue result_value;
 765    int i;
 766    int result = 0;
 767
 768    QLIST_INIT(&error_votes.vote_list);
 769    error_votes.compare = quorum_64bits_compare;
 770
 771    for (i = 0; i < s->num_children; i++) {
 772        result = bdrv_co_flush(s->bs[i]);
 773        result_value.l = result;
 774        quorum_count_vote(&error_votes, &result_value, i);
 775    }
 776
 777    winner = quorum_get_vote_winner(&error_votes);
 778    result = winner->value.l;
 779
 780    quorum_free_vote_list(&error_votes);
 781
 782    return result;
 783}
 784
 785static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
 786                                               BlockDriverState *candidate)
 787{
 788    BDRVQuorumState *s = bs->opaque;
 789    int i;
 790
 791    for (i = 0; i < s->num_children; i++) {
 792        bool perm = bdrv_recurse_is_first_non_filter(s->bs[i],
 793                                                     candidate);
 794        if (perm) {
 795            return true;
 796        }
 797    }
 798
 799    return false;
 800}
 801
 802static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
 803{
 804
 805    if (threshold < 1) {
 806        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
 807                  "vote-threshold", "value >= 1");
 808        return -ERANGE;
 809    }
 810
 811    if (threshold > num_children) {
 812        error_setg(errp, "threshold may not exceed children count");
 813        return -ERANGE;
 814    }
 815
 816    return 0;
 817}
 818
 819static QemuOptsList quorum_runtime_opts = {
 820    .name = "quorum",
 821    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
 822    .desc = {
 823        {
 824            .name = QUORUM_OPT_VOTE_THRESHOLD,
 825            .type = QEMU_OPT_NUMBER,
 826            .help = "The number of vote needed for reaching quorum",
 827        },
 828        {
 829            .name = QUORUM_OPT_BLKVERIFY,
 830            .type = QEMU_OPT_BOOL,
 831            .help = "Trigger block verify mode if set",
 832        },
 833        {
 834            .name = QUORUM_OPT_REWRITE,
 835            .type = QEMU_OPT_BOOL,
 836            .help = "Rewrite corrupted block on read quorum",
 837        },
 838        {
 839            .name = QUORUM_OPT_READ_PATTERN,
 840            .type = QEMU_OPT_STRING,
 841            .help = "Allowed pattern: quorum, fifo. Quorum is default",
 842        },
 843        { /* end of list */ }
 844    },
 845};
 846
 847static int parse_read_pattern(const char *opt)
 848{
 849    int i;
 850
 851    if (!opt) {
 852        /* Set quorum as default */
 853        return QUORUM_READ_PATTERN_QUORUM;
 854    }
 855
 856    for (i = 0; i < QUORUM_READ_PATTERN_MAX; i++) {
 857        if (!strcmp(opt, QuorumReadPattern_lookup[i])) {
 858            return i;
 859        }
 860    }
 861
 862    return -EINVAL;
 863}
 864
 865static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
 866                       Error **errp)
 867{
 868    BDRVQuorumState *s = bs->opaque;
 869    Error *local_err = NULL;
 870    QemuOpts *opts = NULL;
 871    bool *opened;
 872    QDict *sub = NULL;
 873    QList *list = NULL;
 874    const QListEntry *lentry;
 875    int i;
 876    int ret = 0;
 877
 878    qdict_flatten(options);
 879    qdict_extract_subqdict(options, &sub, "children.");
 880    qdict_array_split(sub, &list);
 881
 882    if (qdict_size(sub)) {
 883        error_setg(&local_err, "Invalid option children.%s",
 884                   qdict_first(sub)->key);
 885        ret = -EINVAL;
 886        goto exit;
 887    }
 888
 889    /* count how many different children are present */
 890    s->num_children = qlist_size(list);
 891    if (s->num_children < 2) {
 892        error_setg(&local_err,
 893                   "Number of provided children must be greater than 1");
 894        ret = -EINVAL;
 895        goto exit;
 896    }
 897
 898    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
 899    qemu_opts_absorb_qdict(opts, options, &local_err);
 900    if (local_err) {
 901        ret = -EINVAL;
 902        goto exit;
 903    }
 904
 905    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
 906    ret = parse_read_pattern(qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN));
 907    if (ret < 0) {
 908        error_setg(&local_err, "Please set read-pattern as fifo or quorum");
 909        goto exit;
 910    }
 911    s->read_pattern = ret;
 912
 913    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 914        /* and validate it against s->num_children */
 915        ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
 916        if (ret < 0) {
 917            goto exit;
 918        }
 919
 920        /* is the driver in blkverify mode */
 921        if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
 922            s->num_children == 2 && s->threshold == 2) {
 923            s->is_blkverify = true;
 924        } else if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false)) {
 925            fprintf(stderr, "blkverify mode is set by setting blkverify=on "
 926                    "and using two files with vote_threshold=2\n");
 927        }
 928
 929        s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
 930                                                 false);
 931        if (s->rewrite_corrupted && s->is_blkverify) {
 932            error_setg(&local_err,
 933                       "rewrite-corrupted=on cannot be used with blkverify=on");
 934            ret = -EINVAL;
 935            goto exit;
 936        }
 937    }
 938
 939    /* allocate the children BlockDriverState array */
 940    s->bs = g_new0(BlockDriverState *, s->num_children);
 941    opened = g_new0(bool, s->num_children);
 942
 943    for (i = 0, lentry = qlist_first(list); lentry;
 944         lentry = qlist_next(lentry), i++) {
 945        QDict *d;
 946        QString *string;
 947
 948        switch (qobject_type(lentry->value))
 949        {
 950            /* List of options */
 951            case QTYPE_QDICT:
 952                d = qobject_to_qdict(lentry->value);
 953                QINCREF(d);
 954                ret = bdrv_open(&s->bs[i], NULL, NULL, d, flags, NULL,
 955                                &local_err);
 956                break;
 957
 958            /* QMP reference */
 959            case QTYPE_QSTRING:
 960                string = qobject_to_qstring(lentry->value);
 961                ret = bdrv_open(&s->bs[i], NULL, qstring_get_str(string), NULL,
 962                                flags, NULL, &local_err);
 963                break;
 964
 965            default:
 966                error_setg(&local_err, "Specification of child block device %i "
 967                           "is invalid", i);
 968                ret = -EINVAL;
 969        }
 970
 971        if (ret < 0) {
 972            goto close_exit;
 973        }
 974        opened[i] = true;
 975    }
 976
 977    g_free(opened);
 978    goto exit;
 979
 980close_exit:
 981    /* cleanup on error */
 982    for (i = 0; i < s->num_children; i++) {
 983        if (!opened[i]) {
 984            continue;
 985        }
 986        bdrv_unref(s->bs[i]);
 987    }
 988    g_free(s->bs);
 989    g_free(opened);
 990exit:
 991    qemu_opts_del(opts);
 992    /* propagate error */
 993    if (local_err) {
 994        error_propagate(errp, local_err);
 995    }
 996    QDECREF(list);
 997    QDECREF(sub);
 998    return ret;
 999}
1000
1001static void quorum_close(BlockDriverState *bs)
1002{
1003    BDRVQuorumState *s = bs->opaque;
1004    int i;
1005
1006    for (i = 0; i < s->num_children; i++) {
1007        bdrv_unref(s->bs[i]);
1008    }
1009
1010    g_free(s->bs);
1011}
1012
1013static void quorum_detach_aio_context(BlockDriverState *bs)
1014{
1015    BDRVQuorumState *s = bs->opaque;
1016    int i;
1017
1018    for (i = 0; i < s->num_children; i++) {
1019        bdrv_detach_aio_context(s->bs[i]);
1020    }
1021}
1022
1023static void quorum_attach_aio_context(BlockDriverState *bs,
1024                                      AioContext *new_context)
1025{
1026    BDRVQuorumState *s = bs->opaque;
1027    int i;
1028
1029    for (i = 0; i < s->num_children; i++) {
1030        bdrv_attach_aio_context(s->bs[i], new_context);
1031    }
1032}
1033
1034static void quorum_refresh_filename(BlockDriverState *bs)
1035{
1036    BDRVQuorumState *s = bs->opaque;
1037    QDict *opts;
1038    QList *children;
1039    int i;
1040
1041    for (i = 0; i < s->num_children; i++) {
1042        bdrv_refresh_filename(s->bs[i]);
1043        if (!s->bs[i]->full_open_options) {
1044            return;
1045        }
1046    }
1047
1048    children = qlist_new();
1049    for (i = 0; i < s->num_children; i++) {
1050        QINCREF(s->bs[i]->full_open_options);
1051        qlist_append_obj(children, QOBJECT(s->bs[i]->full_open_options));
1052    }
1053
1054    opts = qdict_new();
1055    qdict_put_obj(opts, "driver", QOBJECT(qstring_from_str("quorum")));
1056    qdict_put_obj(opts, QUORUM_OPT_VOTE_THRESHOLD,
1057                  QOBJECT(qint_from_int(s->threshold)));
1058    qdict_put_obj(opts, QUORUM_OPT_BLKVERIFY,
1059                  QOBJECT(qbool_from_int(s->is_blkverify)));
1060    qdict_put_obj(opts, QUORUM_OPT_REWRITE,
1061                  QOBJECT(qbool_from_int(s->rewrite_corrupted)));
1062    qdict_put_obj(opts, "children", QOBJECT(children));
1063
1064    bs->full_open_options = opts;
1065}
1066
1067static BlockDriver bdrv_quorum = {
1068    .format_name                        = "quorum",
1069    .protocol_name                      = "quorum",
1070
1071    .instance_size                      = sizeof(BDRVQuorumState),
1072
1073    .bdrv_file_open                     = quorum_open,
1074    .bdrv_close                         = quorum_close,
1075    .bdrv_refresh_filename              = quorum_refresh_filename,
1076
1077    .bdrv_co_flush_to_disk              = quorum_co_flush,
1078
1079    .bdrv_getlength                     = quorum_getlength,
1080
1081    .bdrv_aio_readv                     = quorum_aio_readv,
1082    .bdrv_aio_writev                    = quorum_aio_writev,
1083    .bdrv_invalidate_cache              = quorum_invalidate_cache,
1084
1085    .bdrv_detach_aio_context            = quorum_detach_aio_context,
1086    .bdrv_attach_aio_context            = quorum_attach_aio_context,
1087
1088    .is_filter                          = true,
1089    .bdrv_recurse_is_first_non_filter   = quorum_recurse_is_first_non_filter,
1090};
1091
1092static void bdrv_quorum_init(void)
1093{
1094    bdrv_register(&bdrv_quorum);
1095}
1096
1097block_init(bdrv_quorum_init);
1098