qemu/block/quorum.c
<<
>>
Prefs
   1/*
   2 * Quorum Block filter
   3 *
   4 * Copyright (C) 2012-2014 Nodalink, EURL.
   5 *
   6 * Author:
   7 *   BenoƮt Canet <benoit.canet@irqsave.net>
   8 *
   9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
  10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  13 * See the COPYING file in the top-level directory.
  14 */
  15
  16#include "qemu/osdep.h"
  17#include "qemu/cutils.h"
  18#include "qemu/module.h"
  19#include "qemu/option.h"
  20#include "block/block_int.h"
  21#include "block/coroutines.h"
  22#include "block/qdict.h"
  23#include "qapi/error.h"
  24#include "qapi/qapi-events-block.h"
  25#include "qapi/qmp/qdict.h"
  26#include "qapi/qmp/qerror.h"
  27#include "qapi/qmp/qlist.h"
  28#include "qapi/qmp/qstring.h"
  29#include "crypto/hash.h"
  30
  31#define HASH_LENGTH 32
  32
  33#define INDEXSTR_LEN 32
  34
  35#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
  36#define QUORUM_OPT_BLKVERIFY      "blkverify"
  37#define QUORUM_OPT_REWRITE        "rewrite-corrupted"
  38#define QUORUM_OPT_READ_PATTERN   "read-pattern"
  39
  40/* This union holds a vote hash value */
  41typedef union QuorumVoteValue {
  42    uint8_t h[HASH_LENGTH];    /* SHA-256 hash */
  43    int64_t l;                 /* simpler 64 bits hash */
  44} QuorumVoteValue;
  45
  46/* A vote item */
  47typedef struct QuorumVoteItem {
  48    int index;
  49    QLIST_ENTRY(QuorumVoteItem) next;
  50} QuorumVoteItem;
  51
  52/* this structure is a vote version. A version is the set of votes sharing the
  53 * same vote value.
  54 * The set of votes will be tracked with the items field and its cardinality is
  55 * vote_count.
  56 */
  57typedef struct QuorumVoteVersion {
  58    QuorumVoteValue value;
  59    int index;
  60    int vote_count;
  61    QLIST_HEAD(, QuorumVoteItem) items;
  62    QLIST_ENTRY(QuorumVoteVersion) next;
  63} QuorumVoteVersion;
  64
  65/* this structure holds a group of vote versions together */
  66typedef struct QuorumVotes {
  67    QLIST_HEAD(, QuorumVoteVersion) vote_list;
  68    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
  69} QuorumVotes;
  70
  71/* the following structure holds the state of one quorum instance */
  72typedef struct BDRVQuorumState {
  73    BdrvChild **children;  /* children BlockDriverStates */
  74    int num_children;      /* children count */
  75    unsigned next_child_index;  /* the index of the next child that should
  76                                 * be added
  77                                 */
  78    int threshold;         /* if less than threshold children reads gave the
  79                            * same result a quorum error occurs.
  80                            */
  81    bool is_blkverify;     /* true if the driver is in blkverify mode
  82                            * Writes are mirrored on two children devices.
  83                            * On reads the two children devices' contents are
  84                            * compared and if a difference is spotted its
  85                            * location is printed and the code aborts.
  86                            * It is useful to debug other block drivers by
  87                            * comparing them with a reference one.
  88                            */
  89    bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted
  90                            * block if Quorum is reached.
  91                            */
  92
  93    QuorumReadPattern read_pattern;
  94} BDRVQuorumState;
  95
  96typedef struct QuorumAIOCB QuorumAIOCB;
  97
  98/* Quorum will create one instance of the following structure per operation it
  99 * performs on its children.
 100 * So for each read/write operation coming from the upper layer there will be
 101 * $children_count QuorumChildRequest.
 102 */
 103typedef struct QuorumChildRequest {
 104    BlockDriverState *bs;
 105    QEMUIOVector qiov;
 106    uint8_t *buf;
 107    int ret;
 108    QuorumAIOCB *parent;
 109} QuorumChildRequest;
 110
 111/* Quorum will use the following structure to track progress of each read/write
 112 * operation received by the upper layer.
 113 * This structure hold pointers to the QuorumChildRequest structures instances
 114 * used to do operations on each children and track overall progress.
 115 */
 116struct QuorumAIOCB {
 117    BlockDriverState *bs;
 118    Coroutine *co;
 119
 120    /* Request metadata */
 121    uint64_t offset;
 122    uint64_t bytes;
 123    int flags;
 124
 125    QEMUIOVector *qiov;         /* calling IOV */
 126
 127    QuorumChildRequest *qcrs;   /* individual child requests */
 128    int count;                  /* number of completed AIOCB */
 129    int success_count;          /* number of successfully completed AIOCB */
 130
 131    int rewrite_count;          /* number of replica to rewrite: count down to
 132                                 * zero once writes are fired
 133                                 */
 134
 135    QuorumVotes votes;
 136
 137    bool is_read;
 138    int vote_ret;
 139    int children_read;          /* how many children have been read from */
 140};
 141
 142typedef struct QuorumCo {
 143    QuorumAIOCB *acb;
 144    int idx;
 145} QuorumCo;
 146
 147static void quorum_aio_finalize(QuorumAIOCB *acb)
 148{
 149    g_free(acb->qcrs);
 150    g_free(acb);
 151}
 152
 153static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 154{
 155    return !memcmp(a->h, b->h, HASH_LENGTH);
 156}
 157
 158static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
 159{
 160    return a->l == b->l;
 161}
 162
 163static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
 164                                   QEMUIOVector *qiov,
 165                                   uint64_t offset,
 166                                   uint64_t bytes,
 167                                   int flags)
 168{
 169    BDRVQuorumState *s = bs->opaque;
 170    QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
 171    int i;
 172
 173    *acb = (QuorumAIOCB) {
 174        .co                 = qemu_coroutine_self(),
 175        .bs                 = bs,
 176        .offset             = offset,
 177        .bytes              = bytes,
 178        .flags              = flags,
 179        .qiov               = qiov,
 180        .votes.compare      = quorum_sha256_compare,
 181        .votes.vote_list    = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
 182    };
 183
 184    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
 185    for (i = 0; i < s->num_children; i++) {
 186        acb->qcrs[i].buf = NULL;
 187        acb->qcrs[i].ret = 0;
 188        acb->qcrs[i].parent = acb;
 189    }
 190
 191    return acb;
 192}
 193
 194static void quorum_report_bad(QuorumOpType type, uint64_t offset,
 195                              uint64_t bytes, char *node_name, int ret)
 196{
 197    const char *msg = NULL;
 198    int64_t start_sector = offset / BDRV_SECTOR_SIZE;
 199    int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
 200
 201    if (ret < 0) {
 202        msg = strerror(-ret);
 203    }
 204
 205    qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
 206                                      end_sector - start_sector);
 207}
 208
 209static void quorum_report_failure(QuorumAIOCB *acb)
 210{
 211    const char *reference = bdrv_get_device_or_node_name(acb->bs);
 212    int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
 213    int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
 214                                      BDRV_SECTOR_SIZE);
 215
 216    qapi_event_send_quorum_failure(reference, start_sector,
 217                                   end_sector - start_sector);
 218}
 219
 220static int quorum_vote_error(QuorumAIOCB *acb);
 221
 222static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
 223{
 224    BDRVQuorumState *s = acb->bs->opaque;
 225
 226    if (acb->success_count < s->threshold) {
 227        acb->vote_ret = quorum_vote_error(acb);
 228        quorum_report_failure(acb);
 229        return true;
 230    }
 231
 232    return false;
 233}
 234
 235static int read_fifo_child(QuorumAIOCB *acb);
 236
 237static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
 238{
 239    int i;
 240    assert(dest->niov == source->niov);
 241    assert(dest->size == source->size);
 242    for (i = 0; i < source->niov; i++) {
 243        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
 244        memcpy(dest->iov[i].iov_base,
 245               source->iov[i].iov_base,
 246               source->iov[i].iov_len);
 247    }
 248}
 249
 250static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
 251{
 252    QuorumAIOCB *acb = sacb->parent;
 253    QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
 254    quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
 255}
 256
 257static void quorum_report_bad_versions(BDRVQuorumState *s,
 258                                       QuorumAIOCB *acb,
 259                                       QuorumVoteValue *value)
 260{
 261    QuorumVoteVersion *version;
 262    QuorumVoteItem *item;
 263
 264    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 265        if (acb->votes.compare(&version->value, value)) {
 266            continue;
 267        }
 268        QLIST_FOREACH(item, &version->items, next) {
 269            quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
 270                              s->children[item->index]->bs->node_name, 0);
 271        }
 272    }
 273}
 274
 275static void quorum_rewrite_entry(void *opaque)
 276{
 277    QuorumCo *co = opaque;
 278    QuorumAIOCB *acb = co->acb;
 279    BDRVQuorumState *s = acb->bs->opaque;
 280
 281    /* Ignore any errors, it's just a correction attempt for already
 282     * corrupted data.
 283     * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the
 284     * area with different data from the other children. */
 285    bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
 286                    acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED);
 287
 288    /* Wake up the caller after the last rewrite */
 289    acb->rewrite_count--;
 290    if (!acb->rewrite_count) {
 291        qemu_coroutine_enter_if_inactive(acb->co);
 292    }
 293}
 294
 295static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
 296                                        QuorumVoteValue *value)
 297{
 298    QuorumVoteVersion *version;
 299    QuorumVoteItem *item;
 300    int count = 0;
 301
 302    /* first count the number of bad versions: done first to avoid concurrency
 303     * issues.
 304     */
 305    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 306        if (acb->votes.compare(&version->value, value)) {
 307            continue;
 308        }
 309        QLIST_FOREACH(item, &version->items, next) {
 310            count++;
 311        }
 312    }
 313
 314    /* quorum_rewrite_entry will count down this to zero */
 315    acb->rewrite_count = count;
 316
 317    /* now fire the correcting rewrites */
 318    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
 319        if (acb->votes.compare(&version->value, value)) {
 320            continue;
 321        }
 322        QLIST_FOREACH(item, &version->items, next) {
 323            Coroutine *co;
 324            QuorumCo data = {
 325                .acb = acb,
 326                .idx = item->index,
 327            };
 328
 329            co = qemu_coroutine_create(quorum_rewrite_entry, &data);
 330            qemu_coroutine_enter(co);
 331        }
 332    }
 333
 334    /* return true if any rewrite is done else false */
 335    return count;
 336}
 337
 338static void quorum_count_vote(QuorumVotes *votes,
 339                              QuorumVoteValue *value,
 340                              int index)
 341{
 342    QuorumVoteVersion *v = NULL, *version = NULL;
 343    QuorumVoteItem *item;
 344
 345    /* look if we have something with this hash */
 346    QLIST_FOREACH(v, &votes->vote_list, next) {
 347        if (votes->compare(&v->value, value)) {
 348            version = v;
 349            break;
 350        }
 351    }
 352
 353    /* It's a version not yet in the list add it */
 354    if (!version) {
 355        version = g_new0(QuorumVoteVersion, 1);
 356        QLIST_INIT(&version->items);
 357        memcpy(&version->value, value, sizeof(version->value));
 358        version->index = index;
 359        version->vote_count = 0;
 360        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
 361    }
 362
 363    version->vote_count++;
 364
 365    item = g_new0(QuorumVoteItem, 1);
 366    item->index = index;
 367    QLIST_INSERT_HEAD(&version->items, item, next);
 368}
 369
 370static void quorum_free_vote_list(QuorumVotes *votes)
 371{
 372    QuorumVoteVersion *version, *next_version;
 373    QuorumVoteItem *item, *next_item;
 374
 375    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
 376        QLIST_REMOVE(version, next);
 377        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
 378            QLIST_REMOVE(item, next);
 379            g_free(item);
 380        }
 381        g_free(version);
 382    }
 383}
 384
 385static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
 386{
 387    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
 388    size_t len = sizeof(hash->h);
 389    uint8_t *data = hash->h;
 390
 391    /* XXX - would be nice if we could pass in the Error **
 392     * and propagate that back, but this quorum code is
 393     * restricted to just errno values currently */
 394    if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256,
 395                            qiov->iov, qiov->niov,
 396                            &data, &len,
 397                            NULL) < 0) {
 398        return -EINVAL;
 399    }
 400
 401    return 0;
 402}
 403
 404static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
 405{
 406    int max = 0;
 407    QuorumVoteVersion *candidate, *winner = NULL;
 408
 409    QLIST_FOREACH(candidate, &votes->vote_list, next) {
 410        if (candidate->vote_count > max) {
 411            max = candidate->vote_count;
 412            winner = candidate;
 413        }
 414    }
 415
 416    return winner;
 417}
 418
 419/* qemu_iovec_compare is handy for blkverify mode because it returns the first
 420 * differing byte location. Yet it is handcoded to compare vectors one byte
 421 * after another so it does not benefit from the libc SIMD optimizations.
 422 * quorum_iovec_compare is written for speed and should be used in the non
 423 * blkverify mode of quorum.
 424 */
 425static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
 426{
 427    int i;
 428    int result;
 429
 430    assert(a->niov == b->niov);
 431    for (i = 0; i < a->niov; i++) {
 432        assert(a->iov[i].iov_len == b->iov[i].iov_len);
 433        result = memcmp(a->iov[i].iov_base,
 434                        b->iov[i].iov_base,
 435                        a->iov[i].iov_len);
 436        if (result) {
 437            return false;
 438        }
 439    }
 440
 441    return true;
 442}
 443
 444static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b)
 445{
 446    BDRVQuorumState *s = acb->bs->opaque;
 447    ssize_t offset;
 448
 449    /* This driver will replace blkverify in this particular case */
 450    if (s->is_blkverify) {
 451        offset = qemu_iovec_compare(a, b);
 452        if (offset != -1) {
 453            fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64
 454                    " contents mismatch at offset %" PRIu64 "\n",
 455                    acb->offset, acb->bytes, acb->offset + offset);
 456            exit(1);
 457        }
 458        return true;
 459    }
 460
 461    return quorum_iovec_compare(a, b);
 462}
 463
 464/* Do a vote to get the error code */
 465static int quorum_vote_error(QuorumAIOCB *acb)
 466{
 467    BDRVQuorumState *s = acb->bs->opaque;
 468    QuorumVoteVersion *winner = NULL;
 469    QuorumVotes error_votes;
 470    QuorumVoteValue result_value;
 471    int i, ret = 0;
 472    bool error = false;
 473
 474    QLIST_INIT(&error_votes.vote_list);
 475    error_votes.compare = quorum_64bits_compare;
 476
 477    for (i = 0; i < s->num_children; i++) {
 478        ret = acb->qcrs[i].ret;
 479        if (ret) {
 480            error = true;
 481            result_value.l = ret;
 482            quorum_count_vote(&error_votes, &result_value, i);
 483        }
 484    }
 485
 486    if (error) {
 487        winner = quorum_get_vote_winner(&error_votes);
 488        ret = winner->value.l;
 489    }
 490
 491    quorum_free_vote_list(&error_votes);
 492
 493    return ret;
 494}
 495
 496static void quorum_vote(QuorumAIOCB *acb)
 497{
 498    bool quorum = true;
 499    int i, j, ret;
 500    QuorumVoteValue hash;
 501    BDRVQuorumState *s = acb->bs->opaque;
 502    QuorumVoteVersion *winner;
 503
 504    if (quorum_has_too_much_io_failed(acb)) {
 505        return;
 506    }
 507
 508    /* get the index of the first successful read */
 509    for (i = 0; i < s->num_children; i++) {
 510        if (!acb->qcrs[i].ret) {
 511            break;
 512        }
 513    }
 514
 515    assert(i < s->num_children);
 516
 517    /* compare this read with all other successful reads stopping at quorum
 518     * failure
 519     */
 520    for (j = i + 1; j < s->num_children; j++) {
 521        if (acb->qcrs[j].ret) {
 522            continue;
 523        }
 524        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
 525        if (!quorum) {
 526            break;
 527       }
 528    }
 529
 530    /* Every successful read agrees */
 531    if (quorum) {
 532        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
 533        return;
 534    }
 535
 536    /* compute hashes for each successful read, also store indexes */
 537    for (i = 0; i < s->num_children; i++) {
 538        if (acb->qcrs[i].ret) {
 539            continue;
 540        }
 541        ret = quorum_compute_hash(acb, i, &hash);
 542        /* if ever the hash computation failed */
 543        if (ret < 0) {
 544            acb->vote_ret = ret;
 545            goto free_exit;
 546        }
 547        quorum_count_vote(&acb->votes, &hash, i);
 548    }
 549
 550    /* vote to select the most represented version */
 551    winner = quorum_get_vote_winner(&acb->votes);
 552
 553    /* if the winner count is smaller than threshold the read fails */
 554    if (winner->vote_count < s->threshold) {
 555        quorum_report_failure(acb);
 556        acb->vote_ret = -EIO;
 557        goto free_exit;
 558    }
 559
 560    /* we have a winner: copy it */
 561    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
 562
 563    /* some versions are bad print them */
 564    quorum_report_bad_versions(s, acb, &winner->value);
 565
 566    /* corruption correction is enabled */
 567    if (s->rewrite_corrupted) {
 568        quorum_rewrite_bad_versions(acb, &winner->value);
 569    }
 570
 571free_exit:
 572    /* free lists */
 573    quorum_free_vote_list(&acb->votes);
 574}
 575
 576static void read_quorum_children_entry(void *opaque)
 577{
 578    QuorumCo *co = opaque;
 579    QuorumAIOCB *acb = co->acb;
 580    BDRVQuorumState *s = acb->bs->opaque;
 581    int i = co->idx;
 582    QuorumChildRequest *sacb = &acb->qcrs[i];
 583
 584    sacb->bs = s->children[i]->bs;
 585    sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
 586                               &acb->qcrs[i].qiov, 0);
 587
 588    if (sacb->ret == 0) {
 589        acb->success_count++;
 590    } else {
 591        quorum_report_bad_acb(sacb, sacb->ret);
 592    }
 593
 594    acb->count++;
 595    assert(acb->count <= s->num_children);
 596    assert(acb->success_count <= s->num_children);
 597
 598    /* Wake up the caller after the last read */
 599    if (acb->count == s->num_children) {
 600        qemu_coroutine_enter_if_inactive(acb->co);
 601    }
 602}
 603
 604static int read_quorum_children(QuorumAIOCB *acb)
 605{
 606    BDRVQuorumState *s = acb->bs->opaque;
 607    int i;
 608
 609    acb->children_read = s->num_children;
 610    for (i = 0; i < s->num_children; i++) {
 611        acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size);
 612        qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
 613        qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf);
 614    }
 615
 616    for (i = 0; i < s->num_children; i++) {
 617        Coroutine *co;
 618        QuorumCo data = {
 619            .acb = acb,
 620            .idx = i,
 621        };
 622
 623        co = qemu_coroutine_create(read_quorum_children_entry, &data);
 624        qemu_coroutine_enter(co);
 625    }
 626
 627    while (acb->count < s->num_children) {
 628        qemu_coroutine_yield();
 629    }
 630
 631    /* Do the vote on read */
 632    quorum_vote(acb);
 633    for (i = 0; i < s->num_children; i++) {
 634        qemu_vfree(acb->qcrs[i].buf);
 635        qemu_iovec_destroy(&acb->qcrs[i].qiov);
 636    }
 637
 638    while (acb->rewrite_count) {
 639        qemu_coroutine_yield();
 640    }
 641
 642    return acb->vote_ret;
 643}
 644
 645static int read_fifo_child(QuorumAIOCB *acb)
 646{
 647    BDRVQuorumState *s = acb->bs->opaque;
 648    int n, ret;
 649
 650    /* We try to read the next child in FIFO order if we failed to read */
 651    do {
 652        n = acb->children_read++;
 653        acb->qcrs[n].bs = s->children[n]->bs;
 654        ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
 655                             acb->qiov, 0);
 656        if (ret < 0) {
 657            quorum_report_bad_acb(&acb->qcrs[n], ret);
 658        }
 659    } while (ret < 0 && acb->children_read < s->num_children);
 660
 661    /* FIXME: rewrite failed children if acb->children_read > 1? */
 662
 663    return ret;
 664}
 665
 666static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
 667                            uint64_t bytes, QEMUIOVector *qiov, int flags)
 668{
 669    BDRVQuorumState *s = bs->opaque;
 670    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
 671    int ret;
 672
 673    acb->is_read = true;
 674    acb->children_read = 0;
 675
 676    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 677        ret = read_quorum_children(acb);
 678    } else {
 679        ret = read_fifo_child(acb);
 680    }
 681    quorum_aio_finalize(acb);
 682
 683    return ret;
 684}
 685
 686static void write_quorum_entry(void *opaque)
 687{
 688    QuorumCo *co = opaque;
 689    QuorumAIOCB *acb = co->acb;
 690    BDRVQuorumState *s = acb->bs->opaque;
 691    int i = co->idx;
 692    QuorumChildRequest *sacb = &acb->qcrs[i];
 693
 694    sacb->bs = s->children[i]->bs;
 695    if (acb->flags & BDRV_REQ_ZERO_WRITE) {
 696        sacb->ret = bdrv_co_pwrite_zeroes(s->children[i], acb->offset,
 697                                          acb->bytes, acb->flags);
 698    } else {
 699        sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
 700                                    acb->qiov, acb->flags);
 701    }
 702    if (sacb->ret == 0) {
 703        acb->success_count++;
 704    } else {
 705        quorum_report_bad_acb(sacb, sacb->ret);
 706    }
 707    acb->count++;
 708    assert(acb->count <= s->num_children);
 709    assert(acb->success_count <= s->num_children);
 710
 711    /* Wake up the caller after the last write */
 712    if (acb->count == s->num_children) {
 713        qemu_coroutine_enter_if_inactive(acb->co);
 714    }
 715}
 716
 717static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
 718                             uint64_t bytes, QEMUIOVector *qiov, int flags)
 719{
 720    BDRVQuorumState *s = bs->opaque;
 721    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
 722    int i, ret;
 723
 724    for (i = 0; i < s->num_children; i++) {
 725        Coroutine *co;
 726        QuorumCo data = {
 727            .acb = acb,
 728            .idx = i,
 729        };
 730
 731        co = qemu_coroutine_create(write_quorum_entry, &data);
 732        qemu_coroutine_enter(co);
 733    }
 734
 735    while (acb->count < s->num_children) {
 736        qemu_coroutine_yield();
 737    }
 738
 739    quorum_has_too_much_io_failed(acb);
 740
 741    ret = acb->vote_ret;
 742    quorum_aio_finalize(acb);
 743
 744    return ret;
 745}
 746
 747static int quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
 748                                   int bytes, BdrvRequestFlags flags)
 749
 750{
 751    return quorum_co_pwritev(bs, offset, bytes, NULL,
 752                             flags | BDRV_REQ_ZERO_WRITE);
 753}
 754
 755static int64_t quorum_getlength(BlockDriverState *bs)
 756{
 757    BDRVQuorumState *s = bs->opaque;
 758    int64_t result;
 759    int i;
 760
 761    /* check that all file have the same length */
 762    result = bdrv_getlength(s->children[0]->bs);
 763    if (result < 0) {
 764        return result;
 765    }
 766    for (i = 1; i < s->num_children; i++) {
 767        int64_t value = bdrv_getlength(s->children[i]->bs);
 768        if (value < 0) {
 769            return value;
 770        }
 771        if (value != result) {
 772            return -EIO;
 773        }
 774    }
 775
 776    return result;
 777}
 778
 779static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
 780{
 781    BDRVQuorumState *s = bs->opaque;
 782    QuorumVoteVersion *winner = NULL;
 783    QuorumVotes error_votes;
 784    QuorumVoteValue result_value;
 785    int i;
 786    int result = 0;
 787    int success_count = 0;
 788
 789    QLIST_INIT(&error_votes.vote_list);
 790    error_votes.compare = quorum_64bits_compare;
 791
 792    for (i = 0; i < s->num_children; i++) {
 793        result = bdrv_co_flush(s->children[i]->bs);
 794        if (result) {
 795            quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0,
 796                              s->children[i]->bs->node_name, result);
 797            result_value.l = result;
 798            quorum_count_vote(&error_votes, &result_value, i);
 799        } else {
 800            success_count++;
 801        }
 802    }
 803
 804    if (success_count >= s->threshold) {
 805        result = 0;
 806    } else {
 807        winner = quorum_get_vote_winner(&error_votes);
 808        result = winner->value.l;
 809    }
 810    quorum_free_vote_list(&error_votes);
 811
 812    return result;
 813}
 814
 815static bool quorum_recurse_can_replace(BlockDriverState *bs,
 816                                       BlockDriverState *to_replace)
 817{
 818    BDRVQuorumState *s = bs->opaque;
 819    int i;
 820
 821    for (i = 0; i < s->num_children; i++) {
 822        /*
 823         * We have no idea whether our children show the same data as
 824         * this node (@bs).  It is actually highly likely that
 825         * @to_replace does not, because replacing a broken child is
 826         * one of the main use cases here.
 827         *
 828         * We do know that the new BDS will match @bs, so replacing
 829         * any of our children by it will be safe.  It cannot change
 830         * the data this quorum node presents to its parents.
 831         *
 832         * However, replacing @to_replace by @bs in any of our
 833         * children's chains may change visible data somewhere in
 834         * there.  We therefore cannot recurse down those chains with
 835         * bdrv_recurse_can_replace().
 836         * (More formally, bdrv_recurse_can_replace() requires that
 837         * @to_replace will be replaced by something matching the @bs
 838         * passed to it.  We cannot guarantee that.)
 839         *
 840         * Thus, we can only check whether any of our immediate
 841         * children matches @to_replace.
 842         *
 843         * (In the future, we might add a function to recurse down a
 844         * chain that checks that nothing there cares about a change
 845         * in data from the respective child in question.  For
 846         * example, most filters do not care when their child's data
 847         * suddenly changes, as long as their parents do not care.)
 848         */
 849        if (s->children[i]->bs == to_replace) {
 850            /*
 851             * We now have to ensure that there is no other parent
 852             * that cares about replacing this child by a node with
 853             * potentially different data.
 854             * We do so by checking whether there are any other parents
 855             * at all, which is stricter than necessary, but also very
 856             * simple.  (We may decide to implement something more
 857             * complex and permissive when there is an actual need for
 858             * it.)
 859             */
 860            return QLIST_FIRST(&to_replace->parents) == s->children[i] &&
 861                QLIST_NEXT(s->children[i], next_parent) == NULL;
 862        }
 863    }
 864
 865    return false;
 866}
 867
 868static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
 869{
 870
 871    if (threshold < 1) {
 872        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
 873                   "vote-threshold", "a value >= 1");
 874        return -ERANGE;
 875    }
 876
 877    if (threshold > num_children) {
 878        error_setg(errp, "threshold may not exceed children count");
 879        return -ERANGE;
 880    }
 881
 882    return 0;
 883}
 884
 885static QemuOptsList quorum_runtime_opts = {
 886    .name = "quorum",
 887    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
 888    .desc = {
 889        {
 890            .name = QUORUM_OPT_VOTE_THRESHOLD,
 891            .type = QEMU_OPT_NUMBER,
 892            .help = "The number of vote needed for reaching quorum",
 893        },
 894        {
 895            .name = QUORUM_OPT_BLKVERIFY,
 896            .type = QEMU_OPT_BOOL,
 897            .help = "Trigger block verify mode if set",
 898        },
 899        {
 900            .name = QUORUM_OPT_REWRITE,
 901            .type = QEMU_OPT_BOOL,
 902            .help = "Rewrite corrupted block on read quorum",
 903        },
 904        {
 905            .name = QUORUM_OPT_READ_PATTERN,
 906            .type = QEMU_OPT_STRING,
 907            .help = "Allowed pattern: quorum, fifo. Quorum is default",
 908        },
 909        { /* end of list */ }
 910    },
 911};
 912
 913static void quorum_refresh_flags(BlockDriverState *bs)
 914{
 915    BDRVQuorumState *s = bs->opaque;
 916    int i;
 917
 918    bs->supported_zero_flags =
 919        BDRV_REQ_FUA | BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
 920
 921    for (i = 0; i < s->num_children; i++) {
 922        bs->supported_zero_flags &= s->children[i]->bs->supported_zero_flags;
 923    }
 924
 925    bs->supported_zero_flags |= BDRV_REQ_WRITE_UNCHANGED;
 926}
 927
 928static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
 929                       Error **errp)
 930{
 931    BDRVQuorumState *s = bs->opaque;
 932    QemuOpts *opts = NULL;
 933    const char *pattern_str;
 934    bool *opened;
 935    int i;
 936    int ret = 0;
 937
 938    qdict_flatten(options);
 939
 940    /* count how many different children are present */
 941    s->num_children = qdict_array_entries(options, "children.");
 942    if (s->num_children < 0) {
 943        error_setg(errp, "Option children is not a valid array");
 944        ret = -EINVAL;
 945        goto exit;
 946    }
 947    if (s->num_children < 1) {
 948        error_setg(errp, "Number of provided children must be 1 or more");
 949        ret = -EINVAL;
 950        goto exit;
 951    }
 952
 953    opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
 954    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
 955        ret = -EINVAL;
 956        goto exit;
 957    }
 958
 959    s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
 960    /* and validate it against s->num_children */
 961    ret = quorum_valid_threshold(s->threshold, s->num_children, errp);
 962    if (ret < 0) {
 963        goto exit;
 964    }
 965
 966    pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN);
 967    if (!pattern_str) {
 968        ret = QUORUM_READ_PATTERN_QUORUM;
 969    } else {
 970        ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str,
 971                              -EINVAL, NULL);
 972    }
 973    if (ret < 0) {
 974        error_setg(errp, "Please set read-pattern as fifo or quorum");
 975        goto exit;
 976    }
 977    s->read_pattern = ret;
 978
 979    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
 980        s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
 981        if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
 982            error_setg(errp, "blkverify=on can only be set if there are "
 983                       "exactly two files and vote-threshold is 2");
 984            ret = -EINVAL;
 985            goto exit;
 986        }
 987
 988        s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
 989                                                 false);
 990        if (s->rewrite_corrupted && s->is_blkverify) {
 991            error_setg(errp,
 992                       "rewrite-corrupted=on cannot be used with blkverify=on");
 993            ret = -EINVAL;
 994            goto exit;
 995        }
 996    }
 997
 998    /* allocate the children array */
 999    s->children = g_new0(BdrvChild *, s->num_children);
1000    opened = g_new0(bool, s->num_children);
1001
1002    for (i = 0; i < s->num_children; i++) {
1003        char indexstr[INDEXSTR_LEN];
1004        ret = snprintf(indexstr, INDEXSTR_LEN, "children.%d", i);
1005        assert(ret < INDEXSTR_LEN);
1006
1007        s->children[i] = bdrv_open_child(NULL, options, indexstr, bs,
1008                                         &child_of_bds, BDRV_CHILD_DATA, false,
1009                                         errp);
1010        if (!s->children[i]) {
1011            ret = -EINVAL;
1012            goto close_exit;
1013        }
1014
1015        opened[i] = true;
1016    }
1017    s->next_child_index = s->num_children;
1018
1019    bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1020    quorum_refresh_flags(bs);
1021
1022    g_free(opened);
1023    goto exit;
1024
1025close_exit:
1026    /* cleanup on error */
1027    for (i = 0; i < s->num_children; i++) {
1028        if (!opened[i]) {
1029            continue;
1030        }
1031        bdrv_unref_child(bs, s->children[i]);
1032    }
1033    g_free(s->children);
1034    g_free(opened);
1035exit:
1036    qemu_opts_del(opts);
1037    return ret;
1038}
1039
1040static void quorum_close(BlockDriverState *bs)
1041{
1042    BDRVQuorumState *s = bs->opaque;
1043    int i;
1044
1045    for (i = 0; i < s->num_children; i++) {
1046        bdrv_unref_child(bs, s->children[i]);
1047    }
1048
1049    g_free(s->children);
1050}
1051
1052static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
1053                             Error **errp)
1054{
1055    BDRVQuorumState *s = bs->opaque;
1056    BdrvChild *child;
1057    char indexstr[INDEXSTR_LEN];
1058    int ret;
1059
1060    if (s->is_blkverify) {
1061        error_setg(errp, "Cannot add a child to a quorum in blkverify mode");
1062        return;
1063    }
1064
1065    assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
1066    if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
1067        s->next_child_index == UINT_MAX) {
1068        error_setg(errp, "Too many children");
1069        return;
1070    }
1071
1072    ret = snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index);
1073    if (ret < 0 || ret >= INDEXSTR_LEN) {
1074        error_setg(errp, "cannot generate child name");
1075        return;
1076    }
1077    s->next_child_index++;
1078
1079    bdrv_drained_begin(bs);
1080
1081    /* We can safely add the child now */
1082    bdrv_ref(child_bs);
1083
1084    child = bdrv_attach_child(bs, child_bs, indexstr, &child_of_bds,
1085                              BDRV_CHILD_DATA, errp);
1086    if (child == NULL) {
1087        s->next_child_index--;
1088        goto out;
1089    }
1090    s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
1091    s->children[s->num_children++] = child;
1092    quorum_refresh_flags(bs);
1093
1094out:
1095    bdrv_drained_end(bs);
1096}
1097
1098static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
1099                             Error **errp)
1100{
1101    BDRVQuorumState *s = bs->opaque;
1102    char indexstr[INDEXSTR_LEN];
1103    int i;
1104
1105    for (i = 0; i < s->num_children; i++) {
1106        if (s->children[i] == child) {
1107            break;
1108        }
1109    }
1110
1111    /* we have checked it in bdrv_del_child() */
1112    assert(i < s->num_children);
1113
1114    if (s->num_children <= s->threshold) {
1115        error_setg(errp,
1116            "The number of children cannot be lower than the vote threshold %d",
1117            s->threshold);
1118        return;
1119    }
1120
1121    /* We know now that num_children > threshold, so blkverify must be false */
1122    assert(!s->is_blkverify);
1123
1124    snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index - 1);
1125    if (!strncmp(child->name, indexstr, INDEXSTR_LEN)) {
1126        s->next_child_index--;
1127    }
1128
1129    bdrv_drained_begin(bs);
1130
1131    /* We can safely remove this child now */
1132    memmove(&s->children[i], &s->children[i + 1],
1133            (s->num_children - i - 1) * sizeof(BdrvChild *));
1134    s->children = g_renew(BdrvChild *, s->children, --s->num_children);
1135    bdrv_unref_child(bs, child);
1136
1137    quorum_refresh_flags(bs);
1138    bdrv_drained_end(bs);
1139}
1140
1141static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
1142                                        bool backing_overridden)
1143{
1144    BDRVQuorumState *s = bs->opaque;
1145    QList *children_list;
1146    int i;
1147
1148    /*
1149     * The generic implementation for gathering child options in
1150     * bdrv_refresh_filename() would use the names of the children
1151     * as specified for bdrv_open_child() or bdrv_attach_child(),
1152     * which is "children.%u" with %u being a value
1153     * (s->next_child_index) that is incremented each time a new child
1154     * is added (and never decremented).  Since children can be
1155     * deleted at runtime, there may be gaps in that enumeration.
1156     * When creating a new quorum BDS and specifying the children for
1157     * it through runtime options, the enumeration used there may not
1158     * have any gaps, though.
1159     *
1160     * Therefore, we have to create a new gap-less enumeration here
1161     * (which we can achieve by simply putting all of the children's
1162     * full_open_options into a QList).
1163     *
1164     * XXX: Note that there are issues with the current child option
1165     *      structure quorum uses (such as the fact that children do
1166     *      not really have unique permanent names).  Therefore, this
1167     *      is going to have to change in the future and ideally we
1168     *      want quorum to be covered by the generic implementation.
1169     */
1170
1171    children_list = qlist_new();
1172    qdict_put(target, "children", children_list);
1173
1174    for (i = 0; i < s->num_children; i++) {
1175        qlist_append(children_list,
1176                     qobject_ref(s->children[i]->bs->full_open_options));
1177    }
1178}
1179
1180static char *quorum_dirname(BlockDriverState *bs, Error **errp)
1181{
1182    /* In general, there are multiple BDSs with different dirnames below this
1183     * one; so there is no unique dirname we could return (unless all are equal
1184     * by chance, or there is only one). Therefore, to be consistent, just
1185     * always return NULL. */
1186    error_setg(errp, "Cannot generate a base directory for quorum nodes");
1187    return NULL;
1188}
1189
1190static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c,
1191                              BdrvChildRole role,
1192                              BlockReopenQueue *reopen_queue,
1193                              uint64_t perm, uint64_t shared,
1194                              uint64_t *nperm, uint64_t *nshared)
1195{
1196    BDRVQuorumState *s = bs->opaque;
1197
1198    *nperm = perm & DEFAULT_PERM_PASSTHROUGH;
1199    if (s->rewrite_corrupted) {
1200        *nperm |= BLK_PERM_WRITE;
1201    }
1202
1203    /*
1204     * We cannot share RESIZE or WRITE, as this would make the
1205     * children differ from each other.
1206     */
1207    *nshared = (shared & (BLK_PERM_CONSISTENT_READ |
1208                          BLK_PERM_WRITE_UNCHANGED))
1209             | DEFAULT_PERM_UNCHANGED;
1210}
1211
1212/*
1213 * Each one of the children can report different status flags even
1214 * when they contain the same data, so what this function does is
1215 * return BDRV_BLOCK_ZERO if *all* children agree that a certain
1216 * region contains zeroes, and BDRV_BLOCK_DATA otherwise.
1217 */
1218static int coroutine_fn quorum_co_block_status(BlockDriverState *bs,
1219                                               bool want_zero,
1220                                               int64_t offset, int64_t count,
1221                                               int64_t *pnum, int64_t *map,
1222                                               BlockDriverState **file)
1223{
1224    BDRVQuorumState *s = bs->opaque;
1225    int i, ret;
1226    int64_t pnum_zero = count;
1227    int64_t pnum_data = 0;
1228
1229    for (i = 0; i < s->num_children; i++) {
1230        int64_t bytes;
1231        ret = bdrv_co_common_block_status_above(s->children[i]->bs, NULL, false,
1232                                                want_zero, offset, count,
1233                                                &bytes, NULL, NULL, NULL);
1234        if (ret < 0) {
1235            quorum_report_bad(QUORUM_OP_TYPE_READ, offset, count,
1236                              s->children[i]->bs->node_name, ret);
1237            pnum_data = count;
1238            break;
1239        }
1240        /*
1241         * Even if all children agree about whether there are zeroes
1242         * or not at @offset they might disagree on the size, so use
1243         * the smallest when reporting BDRV_BLOCK_ZERO and the largest
1244         * when reporting BDRV_BLOCK_DATA.
1245         */
1246        if (ret & BDRV_BLOCK_ZERO) {
1247            pnum_zero = MIN(pnum_zero, bytes);
1248        } else {
1249            pnum_data = MAX(pnum_data, bytes);
1250        }
1251    }
1252
1253    if (pnum_data) {
1254        *pnum = pnum_data;
1255        return BDRV_BLOCK_DATA;
1256    } else {
1257        *pnum = pnum_zero;
1258        return BDRV_BLOCK_ZERO;
1259    }
1260}
1261
1262static const char *const quorum_strong_runtime_opts[] = {
1263    QUORUM_OPT_VOTE_THRESHOLD,
1264    QUORUM_OPT_BLKVERIFY,
1265    QUORUM_OPT_REWRITE,
1266    QUORUM_OPT_READ_PATTERN,
1267
1268    NULL
1269};
1270
1271static BlockDriver bdrv_quorum = {
1272    .format_name                        = "quorum",
1273
1274    .instance_size                      = sizeof(BDRVQuorumState),
1275
1276    .bdrv_open                          = quorum_open,
1277    .bdrv_close                         = quorum_close,
1278    .bdrv_gather_child_options          = quorum_gather_child_options,
1279    .bdrv_dirname                       = quorum_dirname,
1280    .bdrv_co_block_status               = quorum_co_block_status,
1281
1282    .bdrv_co_flush                      = quorum_co_flush,
1283
1284    .bdrv_getlength                     = quorum_getlength,
1285
1286    .bdrv_co_preadv                     = quorum_co_preadv,
1287    .bdrv_co_pwritev                    = quorum_co_pwritev,
1288    .bdrv_co_pwrite_zeroes              = quorum_co_pwrite_zeroes,
1289
1290    .bdrv_add_child                     = quorum_add_child,
1291    .bdrv_del_child                     = quorum_del_child,
1292
1293    .bdrv_child_perm                    = quorum_child_perm,
1294
1295    .bdrv_recurse_can_replace           = quorum_recurse_can_replace,
1296
1297    .strong_runtime_opts                = quorum_strong_runtime_opts,
1298};
1299
1300static void bdrv_quorum_init(void)
1301{
1302    if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) {
1303        /* SHA256 hash support is required for quorum device */
1304        return;
1305    }
1306    bdrv_register(&bdrv_quorum);
1307}
1308
1309block_init(bdrv_quorum_init);
1310