qemu/block/quorum.c
<<
>>
Prefs
   1/*
   2 * Quorum Block filter
   3 *
   4 * Copyright (C) 2012-2014 Nodalink, EURL.
   5 *
   6 * Author:
   7 *   BenoƮt Canet <benoit.canet@irqsave.net>
   8 *
   9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
  10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  13 * See the COPYING file in the top-level directory.
  14 */
  15
  16#include "qemu/osdep.h"
  17#include "qemu/cutils.h"
  18#include "qemu/module.h"
  19#include "qemu/option.h"
  20#include "qemu/memalign.h"
  21#include "block/block_int.h"
  22#include "block/coroutines.h"
  23#include "block/qdict.h"
  24#include "qapi/error.h"
  25#include "qapi/qapi-events-block.h"
  26#include "qapi/qmp/qdict.h"
  27#include "qapi/qmp/qerror.h"
  28#include "qapi/qmp/qlist.h"
  29#include "qapi/qmp/qstring.h"
  30#include "crypto/hash.h"
  31
  32#define HASH_LENGTH 32
  33
  34#define INDEXSTR_LEN 32
  35
  36#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
  37#define QUORUM_OPT_BLKVERIFY      "blkverify"
  38#define QUORUM_OPT_REWRITE        "rewrite-corrupted"
  39#define QUORUM_OPT_READ_PATTERN   "read-pattern"
  40
  41/* This union holds a vote hash value */
  42typedef union QuorumVoteValue {
  43    uint8_t h[HASH_LENGTH];    /* SHA-256 hash */
  44    int64_t l;                 /* simpler 64 bits hash */
  45} QuorumVoteValue;
  46
  47/* A vote item */
  48typedef struct QuorumVoteItem {
  49    int index;
  50    QLIST_ENTRY(QuorumVoteItem) next;
  51} QuorumVoteItem;
  52
  53/* this structure is a vote version. A version is the set of votes sharing the
  54 * same vote value.
  55 * The set of votes will be tracked with the items field and its cardinality is
  56 * vote_count.
  57 */
  58typedef struct QuorumVoteVersion {
  59    QuorumVoteValue value;
  60    int index;
  61    int vote_count;
  62    QLIST_HEAD(, QuorumVoteItem) items;
  63    QLIST_ENTRY(QuorumVoteVersion) next;
  64} QuorumVoteVersion;
  65
  66/* this structure holds a group of vote versions together */
  67typedef struct QuorumVotes {
  68    QLIST_HEAD(, QuorumVoteVersion) vote_list;
  69    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
  70} QuorumVotes;
  71
  72/* the following structure holds the state of one quorum instance */
  73typedef struct BDRVQuorumState {
  74    BdrvChild **children;  /* children BlockDriverStates */
  75    int num_children;      /* children count */
  76    unsigned next_child_index;  /* the index of the next child that should
  77                                 * be added
  78                                 */
  79    int threshold;         /* if less than threshold children reads gave the
  80                            * same result a quorum error occurs.
  81                            */
  82    bool is_blkverify;     /* true if the driver is in blkverify mode
  83                            * Writes are mirrored on two children devices.
  84                            * On reads the two children devices' contents are
  85                            * compared and if a difference is spotted its
  86                            * location is printed and the code aborts.
  87                            * It is useful to debug other block drivers by
  88                            * comparing them with a reference one.
  89                            */
  90    bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted
  91                            * block if Quorum is reached.
  92                            */
  93
  94    QuorumReadPattern read_pattern;
  95} BDRVQuorumState;
  96
  97typedef struct QuorumAIOCB QuorumAIOCB;
  98
  99/* Quorum will create one instance of the following structure per operation it
 100 * performs on its children.
 101 * So for each read/write operation coming from the upper layer there will be
 102 * $children_count QuorumChildRequest.
 103 */
 104typedef struct QuorumChildRequest {
 105    BlockDriverState *bs;
 106    QEMUIOVector qiov;
 107    uint8_t *buf;
 108    int ret;
 109    QuorumAIOCB *parent;
 110} QuorumChildRequest;
 111
 112/* Quorum will use the following structure to track progress of each read/write
 113 * operation received by the upper layer.
 114 * This structure hold pointers to the QuorumChildRequest structures instances
 115 * used to do operations on each children and track overall progress.
 116 */
 117struct QuorumAIOCB {
 118    BlockDriverState *bs;
 119    Coroutine *co;
 120
 121    /* Request metadata */
 122    uint64_t offset;
 123    uint64_t bytes;
 124    int flags;
 125
 126    QEMUIOVector *qiov;         /* calling IOV */
 127
 128    QuorumChildRequest *qcrs;   /* individual child requests */
 129    int count;                  /* number of completed AIOCB */
 130    int success_count;          /* number of successfully completed AIOCB */
 131
 132    int rewrite_count;          /* number of replica to rewrite: count down to
 133                                 * zero once writes are fired
 134                                 */
 135
 136    QuorumVotes votes;
 137
 138    bool is_read;
 139    int vote_ret;
 140    int children_read;          /* how many children have been read from */
 141};
 142
 143typedef struct QuorumCo {
 144    QuorumAIOCB *acb;
 145    int idx;
 146} QuorumCo;
 147
 148static void quorum_aio_finalize(QuorumAIOCB *acb)
 149{
 150    g_free(acb->qcrs);
 151    g_free(acb);
 152}
 153
 154static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 155{
 156    return !memcmp(a->h, b->h, HASH_LENGTH);
 157}
 158
 159static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 160{
 161    return a->l == b->l;
 162}
 163
 164static QuorumAIOCB *coroutine_fn quorum_aio_get(BlockDriverState *bs,
 165                                                QEMUIOVector *qiov,
 166                                                uint64_t offset, uint64_t bytes,
 167                                                int flags)
 168{
 169    BDRVQuorumState *s = bs->opaque;
 170    QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
 171    int i;
 172
 173    *acb = (QuorumAIOCB) {
 174        .co                 = qemu_coroutine_self(),
 175        .bs                 = bs,
 176        .offset             = offset,
 177        .bytes              = bytes,
 178        .flags              = flags,
 179        .qiov               = qiov,
 180        .votes.compare      = quorum_sha256_compare,
 181        .votes.vote_list    = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
 182    };
 183
 184    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
 185    for (i = 0; i < s->num_children; i++) {
 186        acb->qcrs[i].buf = NULL;
 187        acb->qcrs[i].ret = 0;
 188        acb->qcrs[i].parent = acb;
 189    }
 190
 191    return acb;
 192}
 193
 194static void quorum_report_bad(QuorumOpType type, uint64_t offset,
 195                              uint64_t bytes, char *node_name, int ret)
 196{
 197    const char *msg = NULL;
 198    int64_t start_sector = offset / BDRV_SECTOR_SIZE;
 199    int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
 200
 201    if (ret < 0) {
 202        msg = strerror(-ret);
 203    }
 204
 205    qapi_event_send_quorum_report_bad(type, msg, node_name, start_sector,
 206                                      end_sector - start_sector);
 207}
 208
 209static void quorum_report_failure(QuorumAIOCB *acb)
 210{
 211    const char *reference = bdrv_get_device_or_node_name(acb->bs);
 212    int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
 213    int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
 214                                      BDRV_SECTOR_SIZE);
 215
 216    qapi_event_send_quorum_failure(reference, start_sector,
 217                                   end_sector - start_sector);
 218}
 219
 220static int quorum_vote_error(QuorumAIOCB *acb);
 221
 222static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
 223{
 224    BDRVQuorumState *s = acb->bs->opaque;
 225
 226    if (acb->success_count < s->threshold) {
 227        acb->vote_ret = quorum_vote_error(acb);
 228        quorum_report_failure(acb);
 229        return true;
 230    }
 231
 232    return false;
 233}
 234
 235static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
 236{
 237    int i;
 238    assert(dest->niov == source->niov);
 239    assert(dest->size == source->size);
 240    for (i = 0; i < source->niov; i++) {
 241        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
 242        memcpy(dest->iov[i].iov_base,
 243               source->iov[i].iov_base,
 244               source->iov[i].iov_len);
 245    }
 246}
 247
 248static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
 249{
 250    QuorumAIOCB *acb = sacb->parent;
 251    QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
 252    quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
 253}
 254
 255static void quorum_report_bad_versions(BDRVQuorumState *s,
 256                                       QuorumAIOCB *acb,
 257                                       QuorumVoteValue *value)
 258{
 259    QuorumVoteVersion *version;
 260    QuorumVoteItem *item;
 261
 262    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 263        if (acb->votes.compare(&version->value, value)) {
 264            continue;
 265        }
 266        QLIST_FOREACH(item, &version->items, next) {
 267            quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
 268                              s->children[item->index]->bs->node_name, 0);
 269        }
 270    }
 271}
 272
 273/*
 274 * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
 275 * the graph lock and keeps it until this coroutine has terminated.
 276 */
 277static void coroutine_fn GRAPH_RDLOCK quorum_rewrite_entry(void *opaque)
 278{
 279    QuorumCo *co = opaque;
 280    QuorumAIOCB *acb = co->acb;
 281    BDRVQuorumState *s = acb->bs->opaque;
 282
 283    /* Ignore any errors, it's just a correction attempt for already
 284     * corrupted data.
 285     * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the
 286     * area with different data from the other children. */
 287    bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
 288                    acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED);
 289
 290    /* Wake up the caller after the last rewrite */
 291    acb->rewrite_count--;
 292    if (!acb->rewrite_count) {
 293        qemu_coroutine_enter_if_inactive(acb->co);
 294    }
 295}
 296
 297static bool coroutine_fn GRAPH_RDLOCK
 298quorum_rewrite_bad_versions(QuorumAIOCB *acb, QuorumVoteValue *value)
 299{
 300    QuorumVoteVersion *version;
 301    QuorumVoteItem *item;
 302    int count = 0;
 303
 304    /* first count the number of bad versions: done first to avoid concurrency
 305     * issues.
 306     */
 307    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 308        if (acb->votes.compare(&version->value, value)) {
 309            continue;
 310        }
 311        QLIST_FOREACH(item, &version->items, next) {
 312            count++;
 313        }
 314    }
 315
 316    /* quorum_rewrite_entry will count down this to zero */
 317    acb->rewrite_count = count;
 318
 319    /* now fire the correcting rewrites */
 320    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 321        if (acb->votes.compare(&version->value, value)) {
 322            continue;
 323        }
 324        QLIST_FOREACH(item, &version->items, next) {
 325            Coroutine *co;
 326            QuorumCo data = {
 327                .acb = acb,
 328                .idx = item->index,
 329            };
 330
 331            co = qemu_coroutine_create(quorum_rewrite_entry, &data);
 332            qemu_coroutine_enter(co);
 333        }
 334    }
 335
 336    /* return true if any rewrite is done else false */
 337    return count;
 338}
 339
 340static void quorum_count_vote(QuorumVotes *votes,
 341                              QuorumVoteValue *value,
 342                              int index)
 343{
 344    QuorumVoteVersion *v = NULL, *version = NULL;
 345    QuorumVoteItem *item;
 346
 347    /* look if we have something with this hash */
 348    QLIST_FOREACH(v, &votes->vote_list, next) {
 349        if (votes->compare(&v->value, value)) {
 350            version = v;
 351            break;
 352        }
 353    }
 354
 355    /* It's a version not yet in the list add it */
 356    if (!version) {
 357        version = g_new0(QuorumVoteVersion, 1);
 358        QLIST_INIT(&version->items);
 359        memcpy(&version->value, value, sizeof(version->value));
 360        version->index = index;
 361        version->vote_count = 0;
 362        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
 363    }
 364
 365    version->vote_count++;
 366
 367    item = g_new0(QuorumVoteItem, 1);
 368    item->index = index;
 369    QLIST_INSERT_HEAD(&version->items, item, next);
 370}
 371
 372static void quorum_free_vote_list(QuorumVotes *votes)
 373{
 374    QuorumVoteVersion *version, *next_version;
 375    QuorumVoteItem *item, *next_item;
 376
 377    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
 378        QLIST_REMOVE(version, next);
 379        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
 380            QLIST_REMOVE(item, next);
 381            g_free(item);
 382        }
 383        g_free(version);
 384    }
 385}
 386
 387static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
 388{
 389    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
 390    size_t len = sizeof(hash->h);
 391    uint8_t *data = hash->h;
 392
 393    /* XXX - would be nice if we could pass in the Error **
 394     * and propagate that back, but this quorum code is
 395     * restricted to just errno values currently */
 396    if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256,
 397                            qiov->iov, qiov->niov,
 398                            &data, &len,
 399                            NULL) < 0) {
 400        return -EINVAL;
 401    }
 402
 403    return 0;
 404}
 405
 406static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
 407{
 408    int max = 0;
 409    QuorumVoteVersion *candidate, *winner = NULL;
 410
 411    QLIST_FOREACH(candidate, &votes->vote_list, next) {
 412        if (candidate->vote_count > max) {
 413            max = candidate->vote_count;
 414            winner = candidate;
 415        }
 416    }
 417
 418    return winner;
 419}
 420
 421/* qemu_iovec_compare is handy for blkverify mode because it returns the first
 422 * differing byte location. Yet it is handcoded to compare vectors one byte
 423 * after another so it does not benefit from the libc SIMD optimizations.
 424 * quorum_iovec_compare is written for speed and should be used in the non
 425 * blkverify mode of quorum.
 426 */
 427static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
 428{
 429    int i;
 430    int result;
 431
 432    assert(a->niov == b->niov);
 433    for (i = 0; i < a->niov; i++) {
 434        assert(a->iov[i].iov_len == b->iov[i].iov_len);
 435        result = memcmp(a->iov[i].iov_base,
 436                        b->iov[i].iov_base,
 437                        a->iov[i].iov_len);
 438        if (result) {
 439            return false;
 440        }
 441    }
 442
 443    return true;
 444}
 445
 446static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b)
 447{
 448    BDRVQuorumState *s = acb->bs->opaque;
 449    ssize_t offset;
 450
 451    /* This driver will replace blkverify in this particular case */
 452    if (s->is_blkverify) {
 453        offset = qemu_iovec_compare(a, b);
 454        if (offset != -1) {
 455            fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64
 456                    " contents mismatch at offset %" PRIu64 "\n",
 457                    acb->offset, acb->bytes, acb->offset + offset);
 458            exit(1);
 459        }
 460        return true;
 461    }
 462
 463    return quorum_iovec_compare(a, b);
 464}
 465
 466/* Do a vote to get the error code */
 467static int quorum_vote_error(QuorumAIOCB *acb)
 468{
 469    BDRVQuorumState *s = acb->bs->opaque;
 470    QuorumVoteVersion *winner = NULL;
 471    QuorumVotes error_votes;
 472    QuorumVoteValue result_value;
 473    int i, ret = 0;
 474    bool error = false;
 475
 476    QLIST_INIT(&error_votes.vote_list);
 477    error_votes.compare = quorum_64bits_compare;
 478
 479    for (i = 0; i < s->num_children; i++) {
 480        ret = acb->qcrs[i].ret;
 481        if (ret) {
 482            error = true;
 483            result_value.l = ret;
 484            quorum_count_vote(&error_votes, &result_value, i);
 485        }
 486    }
 487
 488    if (error) {
 489        winner = quorum_get_vote_winner(&error_votes);
 490        ret = winner->value.l;
 491    }
 492
 493    quorum_free_vote_list(&error_votes);
 494
 495    return ret;
 496}
 497
 498static void coroutine_fn GRAPH_RDLOCK quorum_vote(QuorumAIOCB *acb)
 499{
 500    bool quorum = true;
 501    int i, j, ret;
 502    QuorumVoteValue hash;
 503    BDRVQuorumState *s = acb->bs->opaque;
 504    QuorumVoteVersion *winner;
 505
 506    if (quorum_has_too_much_io_failed(acb)) {
 507        return;
 508    }
 509
 510    /* get the index of the first successful read */
 511    for (i = 0; i < s->num_children; i++) {
 512        if (!acb->qcrs[i].ret) {
 513            break;
 514        }
 515    }
 516
 517    assert(i < s->num_children);
 518
 519    /* compare this read with all other successful reads stopping at quorum
 520     * failure
 521     */
 522    for (j = i + 1; j < s->num_children; j++) {
 523        if (acb->qcrs[j].ret) {
 524            continue;
 525        }
 526        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
 527        if (!quorum) {
 528            break;
 529       }
 530    }
 531
 532    /* Every successful read agrees */
 533    if (quorum) {
 534        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
 535        return;
 536    }
 537
 538    /* compute hashes for each successful read, also store indexes */
 539    for (i = 0; i < s->num_children; i++) {
 540        if (acb->qcrs[i].ret) {
 541            continue;
 542        }
 543        ret = quorum_compute_hash(acb, i, &hash);
 544        /* if ever the hash computation failed */
 545        if (ret < 0) {
 546            acb->vote_ret = ret;
 547            goto free_exit;
 548        }
 549        quorum_count_vote(&acb->votes, &hash, i);
 550    }
 551
 552    /* vote to select the most represented version */
 553    winner = quorum_get_vote_winner(&acb->votes);
 554
 555    /* if the winner count is smaller than threshold the read fails */
 556    if (winner->vote_count < s->threshold) {
 557        quorum_report_failure(acb);
 558        acb->vote_ret = -EIO;
 559        goto free_exit;
 560    }
 561
 562    /* we have a winner: copy it */
 563    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
 564
 565    /* some versions are bad print them */
 566    quorum_report_bad_versions(s, acb, &winner->value);
 567
 568    /* corruption correction is enabled */
 569    if (s->rewrite_corrupted) {
 570        quorum_rewrite_bad_versions(acb, &winner->value);
 571    }
 572
 573free_exit:
 574    /* free lists */
 575    quorum_free_vote_list(&acb->votes);
 576}
 577
 578/*
 579 * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
 580 * the graph lock and keeps it until this coroutine has terminated.
 581 */
 582static void coroutine_fn GRAPH_RDLOCK read_quorum_children_entry(void *opaque)
 583{
 584    QuorumCo *co = opaque;
 585    QuorumAIOCB *acb = co->acb;
 586    BDRVQuorumState *s = acb->bs->opaque;
 587    int i = co->idx;
 588    QuorumChildRequest *sacb = &acb->qcrs[i];
 589
 590    sacb->bs = s->children[i]->bs;
 591    sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
 592                               &acb->qcrs[i].qiov, 0);
 593
 594    if (sacb->ret == 0) {
 595        acb->success_count++;
 596    } else {
 597        quorum_report_bad_acb(sacb, sacb->ret);
 598    }
 599
 600    acb->count++;
 601    assert(acb->count <= s->num_children);
 602    assert(acb->success_count <= s->num_children);
 603
 604    /* Wake up the caller after the last read */
 605    if (acb->count == s->num_children) {
 606        qemu_coroutine_enter_if_inactive(acb->co);
 607    }
 608}
 609
 610static int coroutine_fn GRAPH_RDLOCK read_quorum_children(QuorumAIOCB *acb)
 611{
 612    BDRVQuorumState *s = acb->bs->opaque;
 613    int i;
 614
 615    acb->children_read = s->num_children;
 616    for (i = 0; i < s->num_children; i++) {
 617        acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size);
 618        qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
 619        qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf);
 620    }
 621
 622    for (i = 0; i < s->num_children; i++) {
 623        Coroutine *co;
 624        QuorumCo data = {
 625            .acb = acb,
 626            .idx = i,
 627        };
 628
 629        co = qemu_coroutine_create(read_quorum_children_entry, &data);
 630        qemu_coroutine_enter(co);
 631    }
 632
 633    while (acb->count < s->num_children) {
 634        qemu_coroutine_yield();
 635    }
 636
 637    /* Do the vote on read */
 638    quorum_vote(acb);
 639    for (i = 0; i < s->num_children; i++) {
 640        qemu_vfree(acb->qcrs[i].buf);
 641        qemu_iovec_destroy(&acb->qcrs[i].qiov);
 642    }
 643
 644    while (acb->rewrite_count) {
 645        qemu_coroutine_yield();
 646    }
 647
 648    return acb->vote_ret;
 649}
 650
 651static int coroutine_fn GRAPH_RDLOCK read_fifo_child(QuorumAIOCB *acb)
 652{
 653    BDRVQuorumState *s = acb->bs->opaque;
 654    int n, ret;
 655
 656    /* We try to read the next child in FIFO order if we failed to read */
 657    do {
 658        n = acb->children_read++;
 659        acb->qcrs[n].bs = s->children[n]->bs;
 660        ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
 661                             acb->qiov, 0);
 662        if (ret < 0) {
 663            quorum_report_bad_acb(&acb->qcrs[n], ret);
 664        }
 665    } while (ret < 0 && acb->children_read < s->num_children);
 666
 667    /* FIXME: rewrite failed children if acb->children_read > 1? */
 668
 669    return ret;
 670}
 671
 672static int coroutine_fn GRAPH_RDLOCK
 673quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
 674                 QEMUIOVector *qiov, BdrvRequestFlags flags)
 675{
 676    BDRVQuorumState *s = bs->opaque;
 677    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
 678    int ret;
 679
 680    acb->is_read = true;
 681    acb->children_read = 0;
 682
 683    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 684        ret = read_quorum_children(acb);
 685    } else {
 686        ret = read_fifo_child(acb);
 687    }
 688    quorum_aio_finalize(acb);
 689
 690    return ret;
 691}
 692
 693/*
 694 * This function can count as GRAPH_RDLOCK because quorum_co_pwritev() holds the
 695 * graph lock and keeps it until this coroutine has terminated.
 696 */
 697static void coroutine_fn GRAPH_RDLOCK write_quorum_entry(void *opaque)
 698{
 699    QuorumCo *co = opaque;
 700    QuorumAIOCB *acb = co->acb;
 701    BDRVQuorumState *s = acb->bs->opaque;
 702    int i = co->idx;
 703    QuorumChildRequest *sacb = &acb->qcrs[i];
 704
 705    sacb->bs = s->children[i]->bs;
 706    if (acb->flags & BDRV_REQ_ZERO_WRITE) {
 707        sacb->ret = bdrv_co_pwrite_zeroes(s->children[i], acb->offset,
 708                                          acb->bytes, acb->flags);
 709    } else {
 710        sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
 711                                    acb->qiov, acb->flags);
 712    }
 713    if (sacb->ret == 0) {
 714        acb->success_count++;
 715    } else {
 716        quorum_report_bad_acb(sacb, sacb->ret);
 717    }
 718    acb->count++;
 719    assert(acb->count <= s->num_children);
 720    assert(acb->success_count <= s->num_children);
 721
 722    /* Wake up the caller after the last write */
 723    if (acb->count == s->num_children) {
 724        qemu_coroutine_enter_if_inactive(acb->co);
 725    }
 726}
 727
 728static int coroutine_fn GRAPH_RDLOCK
 729quorum_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
 730                  QEMUIOVector *qiov, BdrvRequestFlags flags)
 731{
 732    BDRVQuorumState *s = bs->opaque;
 733    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
 734    int i, ret;
 735
 736    for (i = 0; i < s->num_children; i++) {
 737        Coroutine *co;
 738        QuorumCo data = {
 739            .acb = acb,
 740            .idx = i,
 741        };
 742
 743        co = qemu_coroutine_create(write_quorum_entry, &data);
 744        qemu_coroutine_enter(co);
 745    }
 746
 747    while (acb->count < s->num_children) {
 748        qemu_coroutine_yield();
 749    }
 750
 751    quorum_has_too_much_io_failed(acb);
 752
 753    ret = acb->vote_ret;
 754    quorum_aio_finalize(acb);
 755
 756    return ret;
 757}
 758
 759static int coroutine_fn GRAPH_RDLOCK
 760quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
 761                        BdrvRequestFlags flags)
 762{
 763    return quorum_co_pwritev(bs, offset, bytes, NULL,
 764                             flags | BDRV_REQ_ZERO_WRITE);
 765}
 766
 767static int64_t coroutine_fn GRAPH_RDLOCK
 768quorum_co_getlength(BlockDriverState *bs)
 769{
 770    BDRVQuorumState *s = bs->opaque;
 771    int64_t result;
 772    int i;
 773
 774    /* check that all file have the same length */
 775    result = bdrv_co_getlength(s->children[0]->bs);
 776    if (result < 0) {
 777        return result;
 778    }
 779    for (i = 1; i < s->num_children; i++) {
 780        int64_t value = bdrv_co_getlength(s->children[i]->bs);
 781        if (value < 0) {
 782            return value;
 783        }
 784        if (value != result) {
 785            return -EIO;
 786        }
 787    }
 788
 789    return result;
 790}
 791
 792static coroutine_fn GRAPH_RDLOCK int quorum_co_flush(BlockDriverState *bs)
 793{
 794    BDRVQuorumState *s = bs->opaque;
 795    QuorumVoteVersion *winner = NULL;
 796    QuorumVotes error_votes;
 797    QuorumVoteValue result_value;
 798    int i;
 799    int result = 0;
 800    int success_count = 0;
 801
 802    QLIST_INIT(&error_votes.vote_list);
 803    error_votes.compare = quorum_64bits_compare;
 804
 805    for (i = 0; i < s->num_children; i++) {
 806        result = bdrv_co_flush(s->children[i]->bs);
 807        if (result) {
 808            quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0,
 809                              s->children[i]->bs->node_name, result);
 810            result_value.l = result;
 811            quorum_count_vote(&error_votes, &result_value, i);
 812        } else {
 813            success_count++;
 814        }
 815    }
 816
 817    if (success_count >= s->threshold) {
 818        result = 0;
 819    } else {
 820        winner = quorum_get_vote_winner(&error_votes);
 821        result = winner->value.l;
 822    }
 823    quorum_free_vote_list(&error_votes);
 824
 825    return result;
 826}
 827
 828static bool GRAPH_RDLOCK
 829quorum_recurse_can_replace(BlockDriverState *bs, BlockDriverState *to_replace)
 830{
 831    BDRVQuorumState *s = bs->opaque;
 832    int i;
 833
 834    for (i = 0; i < s->num_children; i++) {
 835        /*
 836         * We have no idea whether our children show the same data as
 837         * this node (@bs).  It is actually highly likely that
 838         * @to_replace does not, because replacing a broken child is
 839         * one of the main use cases here.
 840         *
 841         * We do know that the new BDS will match @bs, so replacing
 842         * any of our children by it will be safe.  It cannot change
 843         * the data this quorum node presents to its parents.
 844         *
 845         * However, replacing @to_replace by @bs in any of our
 846         * children's chains may change visible data somewhere in
 847         * there.  We therefore cannot recurse down those chains with
 848         * bdrv_recurse_can_replace().
 849         * (More formally, bdrv_recurse_can_replace() requires that
 850         * @to_replace will be replaced by something matching the @bs
 851         * passed to it.  We cannot guarantee that.)
 852         *
 853         * Thus, we can only check whether any of our immediate
 854         * children matches @to_replace.
 855         *
 856         * (In the future, we might add a function to recurse down a
 857         * chain that checks that nothing there cares about a change
 858         * in data from the respective child in question.  For
 859         * example, most filters do not care when their child's data
 860         * suddenly changes, as long as their parents do not care.)
 861         */
 862        if (s->children[i]->bs == to_replace) {
 863            /*
 864             * We now have to ensure that there is no other parent
 865             * that cares about replacing this child by a node with
 866             * potentially different data.
 867             * We do so by checking whether there are any other parents
 868             * at all, which is stricter than necessary, but also very
 869             * simple.  (We may decide to implement something more
 870             * complex and permissive when there is an actual need for
 871             * it.)
 872             */
 873            return QLIST_FIRST(&to_replace->parents) == s->children[i] &&
 874                QLIST_NEXT(s->children[i], next_parent) == NULL;
 875        }
 876    }
 877
 878    return false;
 879}
 880
 881static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
 882{
 883
 884    if (threshold < 1) {
 885        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
 886                   "vote-threshold", "a value >= 1");
 887        return -ERANGE;
 888    }
 889
 890    if (threshold > num_children) {
 891        error_setg(errp, "threshold may not exceed children count");
 892        return -ERANGE;
 893    }
 894
 895    return 0;
 896}
 897
 898static QemuOptsList quorum_runtime_opts = {
 899    .name = "quorum",
 900    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
 901    .desc = {
 902        {
 903            .name = QUORUM_OPT_VOTE_THRESHOLD,
 904            .type = QEMU_OPT_NUMBER,
 905            .help = "The number of vote needed for reaching quorum",
 906        },
 907        {
 908            .name = QUORUM_OPT_BLKVERIFY,
 909            .type = QEMU_OPT_BOOL,
 910            .help = "Trigger block verify mode if set",
 911        },
 912        {
 913            .name = QUORUM_OPT_REWRITE,
 914            .type = QEMU_OPT_BOOL,
 915            .help = "Rewrite corrupted block on read quorum",
 916        },
 917        {
 918            .name = QUORUM_OPT_READ_PATTERN,
 919            .type = QEMU_OPT_STRING,
 920            .help = "Allowed pattern: quorum, fifo. Quorum is default",
 921        },
 922        { /* end of list */ }
 923    },
 924};
 925
 926static void quorum_refresh_flags(BlockDriverState *bs)
 927{
 928    BDRVQuorumState *s = bs->opaque;
 929    int i;
 930
 931    bs->supported_zero_flags =
 932        BDRV_REQ_FUA | BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
 933
 934    for (i = 0; i < s->num_children; i++) {
 935        bs->supported_zero_flags &= s->children[i]->bs->supported_zero_flags;
 936    }
 937
 938    bs->supported_zero_flags |= BDRV_REQ_WRITE_UNCHANGED;
 939}
 940
 941static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
 942                       Error **errp)
 943{
 944    BDRVQuorumState *s = bs->opaque;
 945    QemuOpts *opts = NULL;
 946    const char *pattern_str;
 947    bool *opened;
 948    int i;
 949    int ret = 0;
 950
 951    qdict_flatten(options);
 952
 953    /* count how many different children are present */
 954    s->num_children = qdict_array_entries(options, "children.");
 955    if (s->num_children < 0) {
 956        error_setg(errp, "Option children is not a valid array");
 957        ret = -EINVAL;
 958        goto exit;
 959    }
 960    if (s->num_children < 1) {
 961        error_setg(errp, "Number of provided children must be 1 or more");
 962        ret = -EINVAL;
 963        goto exit;
 964    }
 965
 966    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
 967    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
 968        ret = -EINVAL;
 969        goto exit;
 970    }
 971
 972    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
 973    /* and validate it against s->num_children */
 974    ret = quorum_valid_threshold(s->threshold, s->num_children, errp);
 975    if (ret < 0) {
 976        goto exit;
 977    }
 978
 979    pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN);
 980    if (!pattern_str) {
 981        ret = QUORUM_READ_PATTERN_QUORUM;
 982    } else {
 983        ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str,
 984                              -EINVAL, NULL);
 985    }
 986    if (ret < 0) {
 987        error_setg(errp, "Please set read-pattern as fifo or quorum");
 988        goto exit;
 989    }
 990    s->read_pattern = ret;
 991
 992    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 993        s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
 994        if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
 995            error_setg(errp, "blkverify=on can only be set if there are "
 996                       "exactly two files and vote-threshold is 2");
 997            ret = -EINVAL;
 998            goto exit;
 999        }
1000
1001        s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
1002                                                 false);
1003        if (s->rewrite_corrupted && s->is_blkverify) {
1004            error_setg(errp,
1005                       "rewrite-corrupted=on cannot be used with blkverify=on");
1006            ret = -EINVAL;
1007            goto exit;
1008        }
1009    }
1010
1011    /* allocate the children array */
1012    s->children = g_new0(BdrvChild *, s->num_children);
1013    opened = g_new0(bool, s->num_children);
1014
1015    for (i = 0; i < s->num_children; i++) {
1016        char indexstr[INDEXSTR_LEN];
1017        ret = snprintf(indexstr, INDEXSTR_LEN, "children.%d", i);
1018        assert(ret < INDEXSTR_LEN);
1019
1020        s->children[i] = bdrv_open_child(NULL, options, indexstr, bs,
1021                                         &child_of_bds, BDRV_CHILD_DATA, false,
1022                                         errp);
1023        if (!s->children[i]) {
1024            ret = -EINVAL;
1025            goto close_exit;
1026        }
1027
1028        opened[i] = true;
1029    }
1030    s->next_child_index = s->num_children;
1031
1032    bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1033    quorum_refresh_flags(bs);
1034
1035    g_free(opened);
1036    goto exit;
1037
1038close_exit:
1039    /* cleanup on error */
1040    for (i = 0; i < s->num_children; i++) {
1041        if (!opened[i]) {
1042            continue;
1043        }
1044        bdrv_unref_child(bs, s->children[i]);
1045    }
1046    g_free(s->children);
1047    g_free(opened);
1048exit:
1049    qemu_opts_del(opts);
1050    return ret;
1051}
1052
1053static void quorum_close(BlockDriverState *bs)
1054{
1055    BDRVQuorumState *s = bs->opaque;
1056    int i;
1057
1058    for (i = 0; i < s->num_children; i++) {
1059        bdrv_unref_child(bs, s->children[i]);
1060    }
1061
1062    g_free(s->children);
1063}
1064
1065static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
1066                             Error **errp)
1067{
1068    BDRVQuorumState *s = bs->opaque;
1069    BdrvChild *child;
1070    char indexstr[INDEXSTR_LEN];
1071    int ret;
1072
1073    if (s->is_blkverify) {
1074        error_setg(errp, "Cannot add a child to a quorum in blkverify mode");
1075        return;
1076    }
1077
1078    assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
1079    if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
1080        s->next_child_index == UINT_MAX) {
1081        error_setg(errp, "Too many children");
1082        return;
1083    }
1084
1085    ret = snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index);
1086    if (ret < 0 || ret >= INDEXSTR_LEN) {
1087        error_setg(errp, "cannot generate child name");
1088        return;
1089    }
1090    s->next_child_index++;
1091
1092    bdrv_drained_begin(bs);
1093
1094    /* We can safely add the child now */
1095    bdrv_ref(child_bs);
1096
1097    child = bdrv_attach_child(bs, child_bs, indexstr, &child_of_bds,
1098                              BDRV_CHILD_DATA, errp);
1099    if (child == NULL) {
1100        s->next_child_index--;
1101        goto out;
1102    }
1103    s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
1104    s->children[s->num_children++] = child;
1105    quorum_refresh_flags(bs);
1106
1107out:
1108    bdrv_drained_end(bs);
1109}
1110
1111static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
1112                             Error **errp)
1113{
1114    BDRVQuorumState *s = bs->opaque;
1115    char indexstr[INDEXSTR_LEN];
1116    int i;
1117
1118    for (i = 0; i < s->num_children; i++) {
1119        if (s->children[i] == child) {
1120            break;
1121        }
1122    }
1123
1124    /* we have checked it in bdrv_del_child() */
1125    assert(i < s->num_children);
1126
1127    if (s->num_children <= s->threshold) {
1128        error_setg(errp,
1129            "The number of children cannot be lower than the vote threshold %d",
1130            s->threshold);
1131        return;
1132    }
1133
1134    /* We know now that num_children > threshold, so blkverify must be false */
1135    assert(!s->is_blkverify);
1136
1137    snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index - 1);
1138    if (!strncmp(child->name, indexstr, INDEXSTR_LEN)) {
1139        s->next_child_index--;
1140    }
1141
1142    bdrv_drained_begin(bs);
1143
1144    /* We can safely remove this child now */
1145    memmove(&s->children[i], &s->children[i + 1],
1146            (s->num_children - i - 1) * sizeof(BdrvChild *));
1147    s->children = g_renew(BdrvChild *, s->children, --s->num_children);
1148    bdrv_unref_child(bs, child);
1149
1150    quorum_refresh_flags(bs);
1151    bdrv_drained_end(bs);
1152}
1153
1154static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
1155                                        bool backing_overridden)
1156{
1157    BDRVQuorumState *s = bs->opaque;
1158    QList *children_list;
1159    int i;
1160
1161    /*
1162     * The generic implementation for gathering child options in
1163     * bdrv_refresh_filename() would use the names of the children
1164     * as specified for bdrv_open_child() or bdrv_attach_child(),
1165     * which is "children.%u" with %u being a value
1166     * (s->next_child_index) that is incremented each time a new child
1167     * is added (and never decremented).  Since children can be
1168     * deleted at runtime, there may be gaps in that enumeration.
1169     * When creating a new quorum BDS and specifying the children for
1170     * it through runtime options, the enumeration used there may not
1171     * have any gaps, though.
1172     *
1173     * Therefore, we have to create a new gap-less enumeration here
1174     * (which we can achieve by simply putting all of the children's
1175     * full_open_options into a QList).
1176     *
1177     * XXX: Note that there are issues with the current child option
1178     *      structure quorum uses (such as the fact that children do
1179     *      not really have unique permanent names).  Therefore, this
1180     *      is going to have to change in the future and ideally we
1181     *      want quorum to be covered by the generic implementation.
1182     */
1183
1184    children_list = qlist_new();
1185    qdict_put(target, "children", children_list);
1186
1187    for (i = 0; i < s->num_children; i++) {
1188        qlist_append(children_list,
1189                     qobject_ref(s->children[i]->bs->full_open_options));
1190    }
1191}
1192
1193static char *quorum_dirname(BlockDriverState *bs, Error **errp)
1194{
1195    /* In general, there are multiple BDSs with different dirnames below this
1196     * one; so there is no unique dirname we could return (unless all are equal
1197     * by chance, or there is only one). Therefore, to be consistent, just
1198     * always return NULL. */
1199    error_setg(errp, "Cannot generate a base directory for quorum nodes");
1200    return NULL;
1201}
1202
1203static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c,
1204                              BdrvChildRole role,
1205                              BlockReopenQueue *reopen_queue,
1206                              uint64_t perm, uint64_t shared,
1207                              uint64_t *nperm, uint64_t *nshared)
1208{
1209    BDRVQuorumState *s = bs->opaque;
1210
1211    *nperm = perm & DEFAULT_PERM_PASSTHROUGH;
1212    if (s->rewrite_corrupted) {
1213        *nperm |= BLK_PERM_WRITE;
1214    }
1215
1216    /*
1217     * We cannot share RESIZE or WRITE, as this would make the
1218     * children differ from each other.
1219     */
1220    *nshared = (shared & (BLK_PERM_CONSISTENT_READ |
1221                          BLK_PERM_WRITE_UNCHANGED))
1222             | DEFAULT_PERM_UNCHANGED;
1223}
1224
1225/*
1226 * Each one of the children can report different status flags even
1227 * when they contain the same data, so what this function does is
1228 * return BDRV_BLOCK_ZERO if *all* children agree that a certain
1229 * region contains zeroes, and BDRV_BLOCK_DATA otherwise.
1230 */
1231static int coroutine_fn GRAPH_RDLOCK
1232quorum_co_block_status(BlockDriverState *bs, bool want_zero,
1233                       int64_t offset, int64_t count,
1234                       int64_t *pnum, int64_t *map, BlockDriverState **file)
1235{
1236    BDRVQuorumState *s = bs->opaque;
1237    int i, ret;
1238    int64_t pnum_zero = count;
1239    int64_t pnum_data = 0;
1240
1241    for (i = 0; i < s->num_children; i++) {
1242        int64_t bytes;
1243        ret = bdrv_co_common_block_status_above(s->children[i]->bs, NULL, false,
1244                                                want_zero, offset, count,
1245                                                &bytes, NULL, NULL, NULL);
1246        if (ret < 0) {
1247            quorum_report_bad(QUORUM_OP_TYPE_READ, offset, count,
1248                              s->children[i]->bs->node_name, ret);
1249            pnum_data = count;
1250            break;
1251        }
1252        /*
1253         * Even if all children agree about whether there are zeroes
1254         * or not at @offset they might disagree on the size, so use
1255         * the smallest when reporting BDRV_BLOCK_ZERO and the largest
1256         * when reporting BDRV_BLOCK_DATA.
1257         */
1258        if (ret & BDRV_BLOCK_ZERO) {
1259            pnum_zero = MIN(pnum_zero, bytes);
1260        } else {
1261            pnum_data = MAX(pnum_data, bytes);
1262        }
1263    }
1264
1265    if (pnum_data) {
1266        *pnum = pnum_data;
1267        return BDRV_BLOCK_DATA;
1268    } else {
1269        *pnum = pnum_zero;
1270        return BDRV_BLOCK_ZERO;
1271    }
1272}
1273
1274static const char *const quorum_strong_runtime_opts[] = {
1275    QUORUM_OPT_VOTE_THRESHOLD,
1276    QUORUM_OPT_BLKVERIFY,
1277    QUORUM_OPT_REWRITE,
1278    QUORUM_OPT_READ_PATTERN,
1279
1280    NULL
1281};
1282
1283static BlockDriver bdrv_quorum = {
1284    .format_name                        = "quorum",
1285
1286    .instance_size                      = sizeof(BDRVQuorumState),
1287
1288    .bdrv_open                          = quorum_open,
1289    .bdrv_close                         = quorum_close,
1290    .bdrv_gather_child_options          = quorum_gather_child_options,
1291    .bdrv_dirname                       = quorum_dirname,
1292    .bdrv_co_block_status               = quorum_co_block_status,
1293
1294    .bdrv_co_flush                      = quorum_co_flush,
1295
1296    .bdrv_co_getlength                  = quorum_co_getlength,
1297
1298    .bdrv_co_preadv                     = quorum_co_preadv,
1299    .bdrv_co_pwritev                    = quorum_co_pwritev,
1300    .bdrv_co_pwrite_zeroes              = quorum_co_pwrite_zeroes,
1301
1302    .bdrv_add_child                     = quorum_add_child,
1303    .bdrv_del_child                     = quorum_del_child,
1304
1305    .bdrv_child_perm                    = quorum_child_perm,
1306
1307    .bdrv_recurse_can_replace           = quorum_recurse_can_replace,
1308
1309    .strong_runtime_opts                = quorum_strong_runtime_opts,
1310};
1311
1312static void bdrv_quorum_init(void)
1313{
1314    if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) {
1315        /* SHA256 hash support is required for quorum device */
1316        return;
1317    }
1318    bdrv_register(&bdrv_quorum);
1319}
1320
1321block_init(bdrv_quorum_init);
1322