qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include <rbd/librbd.h>
  17#include "qapi/error.h"
  18#include "qemu/error-report.h"
  19#include "qemu/module.h"
  20#include "qemu/option.h"
  21#include "block/block_int.h"
  22#include "block/qdict.h"
  23#include "crypto/secret.h"
  24#include "qemu/cutils.h"
  25#include "sysemu/replay.h"
  26#include "qapi/qmp/qstring.h"
  27#include "qapi/qmp/qdict.h"
  28#include "qapi/qmp/qjson.h"
  29#include "qapi/qmp/qlist.h"
  30#include "qapi/qobject-input-visitor.h"
  31#include "qapi/qapi-visit-block-core.h"
  32
  33/*
  34 * When specifying the image filename use:
  35 *
  36 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  37 *
  38 * poolname must be the name of an existing rados pool.
  39 *
  40 * devicename is the name of the rbd image.
  41 *
  42 * Each option given is used to configure rados, and may be any valid
  43 * Ceph option, "id", or "conf".
  44 *
  45 * The "id" option indicates what user we should authenticate as to
  46 * the Ceph cluster.  If it is excluded we will use the Ceph default
  47 * (normally 'admin').
  48 *
  49 * The "conf" option specifies a Ceph configuration file to read.  If
  50 * it is not specified, we will read from the default Ceph locations
  51 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  52 * file, specify conf=/dev/null.
  53 *
  54 * Configuration values containing :, @, or = can be escaped with a
  55 * leading "\".
  56 */
  57
  58#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
  59
  60#define RBD_MAX_SNAPS 100
  61
  62#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
  63
  64static const char rbd_luks_header_verification[
  65        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
  66    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
  67};
  68
  69static const char rbd_luks2_header_verification[
  70        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
  71    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
  72};
  73
  74typedef enum {
  75    RBD_AIO_READ,
  76    RBD_AIO_WRITE,
  77    RBD_AIO_DISCARD,
  78    RBD_AIO_FLUSH,
  79    RBD_AIO_WRITE_ZEROES
  80} RBDAIOCmd;
  81
  82typedef struct BDRVRBDState {
  83    rados_t cluster;
  84    rados_ioctx_t io_ctx;
  85    rbd_image_t image;
  86    char *image_name;
  87    char *snap;
  88    char *namespace;
  89    uint64_t image_size;
  90    uint64_t object_size;
  91} BDRVRBDState;
  92
  93typedef struct RBDTask {
  94    BlockDriverState *bs;
  95    Coroutine *co;
  96    bool complete;
  97    int64_t ret;
  98} RBDTask;
  99
 100typedef struct RBDDiffIterateReq {
 101    uint64_t offs;
 102    uint64_t bytes;
 103    bool exists;
 104} RBDDiffIterateReq;
 105
 106static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 107                            BlockdevOptionsRbd *opts, bool cache,
 108                            const char *keypairs, const char *secretid,
 109                            Error **errp);
 110
 111static char *qemu_rbd_strchr(char *src, char delim)
 112{
 113    char *p;
 114
 115    for (p = src; *p; ++p) {
 116        if (*p == delim) {
 117            return p;
 118        }
 119        if (*p == '\\' && p[1] != '\0') {
 120            ++p;
 121        }
 122    }
 123
 124    return NULL;
 125}
 126
 127
 128static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 129{
 130    char *end;
 131
 132    *p = NULL;
 133
 134    end = qemu_rbd_strchr(src, delim);
 135    if (end) {
 136        *p = end + 1;
 137        *end = '\0';
 138    }
 139    return src;
 140}
 141
 142static void qemu_rbd_unescape(char *src)
 143{
 144    char *p;
 145
 146    for (p = src; *src; ++src, ++p) {
 147        if (*src == '\\' && src[1] != '\0') {
 148            src++;
 149        }
 150        *p = *src;
 151    }
 152    *p = '\0';
 153}
 154
 155static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 156                                    Error **errp)
 157{
 158    const char *start;
 159    char *p, *buf;
 160    QList *keypairs = NULL;
 161    char *found_str, *image_name;
 162
 163    if (!strstart(filename, "rbd:", &start)) {
 164        error_setg(errp, "File name must start with 'rbd:'");
 165        return;
 166    }
 167
 168    buf = g_strdup(start);
 169    p = buf;
 170
 171    found_str = qemu_rbd_next_tok(p, '/', &p);
 172    if (!p) {
 173        error_setg(errp, "Pool name is required");
 174        goto done;
 175    }
 176    qemu_rbd_unescape(found_str);
 177    qdict_put_str(options, "pool", found_str);
 178
 179    if (qemu_rbd_strchr(p, '@')) {
 180        image_name = qemu_rbd_next_tok(p, '@', &p);
 181
 182        found_str = qemu_rbd_next_tok(p, ':', &p);
 183        qemu_rbd_unescape(found_str);
 184        qdict_put_str(options, "snapshot", found_str);
 185    } else {
 186        image_name = qemu_rbd_next_tok(p, ':', &p);
 187    }
 188    /* Check for namespace in the image_name */
 189    if (qemu_rbd_strchr(image_name, '/')) {
 190        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
 191        qemu_rbd_unescape(found_str);
 192        qdict_put_str(options, "namespace", found_str);
 193    } else {
 194        qdict_put_str(options, "namespace", "");
 195    }
 196    qemu_rbd_unescape(image_name);
 197    qdict_put_str(options, "image", image_name);
 198    if (!p) {
 199        goto done;
 200    }
 201
 202    /* The following are essentially all key/value pairs, and we treat
 203     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
 204    while (p) {
 205        char *name, *value;
 206        name = qemu_rbd_next_tok(p, '=', &p);
 207        if (!p) {
 208            error_setg(errp, "conf option %s has no value", name);
 209            break;
 210        }
 211
 212        qemu_rbd_unescape(name);
 213
 214        value = qemu_rbd_next_tok(p, ':', &p);
 215        qemu_rbd_unescape(value);
 216
 217        if (!strcmp(name, "conf")) {
 218            qdict_put_str(options, "conf", value);
 219        } else if (!strcmp(name, "id")) {
 220            qdict_put_str(options, "user", value);
 221        } else {
 222            /*
 223             * We pass these internally to qemu_rbd_set_keypairs(), so
 224             * we can get away with the simpler list of [ "key1",
 225             * "value1", "key2", "value2" ] rather than a raw dict
 226             * { "key1": "value1", "key2": "value2" } where we can't
 227             * guarantee order, or even a more correct but complex
 228             * [ { "key1": "value1" }, { "key2": "value2" } ]
 229             */
 230            if (!keypairs) {
 231                keypairs = qlist_new();
 232            }
 233            qlist_append_str(keypairs, name);
 234            qlist_append_str(keypairs, value);
 235        }
 236    }
 237
 238    if (keypairs) {
 239        qdict_put(options, "=keyvalue-pairs",
 240                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
 241    }
 242
 243done:
 244    g_free(buf);
 245    qobject_unref(keypairs);
 246    return;
 247}
 248
 249static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
 250                             Error **errp)
 251{
 252    char *key, *acr;
 253    int r;
 254    GString *accu;
 255    RbdAuthModeList *auth;
 256
 257    if (opts->key_secret) {
 258        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
 259        if (!key) {
 260            return -EIO;
 261        }
 262        r = rados_conf_set(cluster, "key", key);
 263        g_free(key);
 264        if (r < 0) {
 265            error_setg_errno(errp, -r, "Could not set 'key'");
 266            return r;
 267        }
 268    }
 269
 270    if (opts->has_auth_client_required) {
 271        accu = g_string_new("");
 272        for (auth = opts->auth_client_required; auth; auth = auth->next) {
 273            if (accu->str[0]) {
 274                g_string_append_c(accu, ';');
 275            }
 276            g_string_append(accu, RbdAuthMode_str(auth->value));
 277        }
 278        acr = g_string_free(accu, FALSE);
 279        r = rados_conf_set(cluster, "auth_client_required", acr);
 280        g_free(acr);
 281        if (r < 0) {
 282            error_setg_errno(errp, -r,
 283                             "Could not set 'auth_client_required'");
 284            return r;
 285        }
 286    }
 287
 288    return 0;
 289}
 290
 291static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 292                                 Error **errp)
 293{
 294    QList *keypairs;
 295    QString *name;
 296    QString *value;
 297    const char *key;
 298    size_t remaining;
 299    int ret = 0;
 300
 301    if (!keypairs_json) {
 302        return ret;
 303    }
 304    keypairs = qobject_to(QList,
 305                          qobject_from_json(keypairs_json, &error_abort));
 306    remaining = qlist_size(keypairs) / 2;
 307    assert(remaining);
 308
 309    while (remaining--) {
 310        name = qobject_to(QString, qlist_pop(keypairs));
 311        value = qobject_to(QString, qlist_pop(keypairs));
 312        assert(name && value);
 313        key = qstring_get_str(name);
 314
 315        ret = rados_conf_set(cluster, key, qstring_get_str(value));
 316        qobject_unref(value);
 317        if (ret < 0) {
 318            error_setg_errno(errp, -ret, "invalid conf option %s", key);
 319            qobject_unref(name);
 320            ret = -EINVAL;
 321            break;
 322        }
 323        qobject_unref(name);
 324    }
 325
 326    qobject_unref(keypairs);
 327    return ret;
 328}
 329
 330#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 331static int qemu_rbd_convert_luks_options(
 332        RbdEncryptionOptionsLUKSBase *luks_opts,
 333        char **passphrase,
 334        size_t *passphrase_len,
 335        Error **errp)
 336{
 337    return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
 338                                 passphrase_len, errp);
 339}
 340
 341static int qemu_rbd_convert_luks_create_options(
 342        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
 343        rbd_encryption_algorithm_t *alg,
 344        char **passphrase,
 345        size_t *passphrase_len,
 346        Error **errp)
 347{
 348    int r = 0;
 349
 350    r = qemu_rbd_convert_luks_options(
 351            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
 352            passphrase, passphrase_len, errp);
 353    if (r < 0) {
 354        return r;
 355    }
 356
 357    if (luks_opts->has_cipher_alg) {
 358        switch (luks_opts->cipher_alg) {
 359            case QCRYPTO_CIPHER_ALG_AES_128: {
 360                *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
 361                break;
 362            }
 363            case QCRYPTO_CIPHER_ALG_AES_256: {
 364                *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
 365                break;
 366            }
 367            default: {
 368                r = -ENOTSUP;
 369                error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
 370                                 luks_opts->cipher_alg);
 371                return r;
 372            }
 373        }
 374    } else {
 375        /* default alg */
 376        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
 377    }
 378
 379    return 0;
 380}
 381
 382static int qemu_rbd_encryption_format(rbd_image_t image,
 383                                      RbdEncryptionCreateOptions *encrypt,
 384                                      Error **errp)
 385{
 386    int r = 0;
 387    g_autofree char *passphrase = NULL;
 388    size_t passphrase_len;
 389    rbd_encryption_format_t format;
 390    rbd_encryption_options_t opts;
 391    rbd_encryption_luks1_format_options_t luks_opts;
 392    rbd_encryption_luks2_format_options_t luks2_opts;
 393    size_t opts_size;
 394    uint64_t raw_size, effective_size;
 395
 396    r = rbd_get_size(image, &raw_size);
 397    if (r < 0) {
 398        error_setg_errno(errp, -r, "cannot get raw image size");
 399        return r;
 400    }
 401
 402    switch (encrypt->format) {
 403        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
 404            memset(&luks_opts, 0, sizeof(luks_opts));
 405            format = RBD_ENCRYPTION_FORMAT_LUKS1;
 406            opts = &luks_opts;
 407            opts_size = sizeof(luks_opts);
 408            r = qemu_rbd_convert_luks_create_options(
 409                    qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
 410                    &luks_opts.alg, &passphrase, &passphrase_len, errp);
 411            if (r < 0) {
 412                return r;
 413            }
 414            luks_opts.passphrase = passphrase;
 415            luks_opts.passphrase_size = passphrase_len;
 416            break;
 417        }
 418        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
 419            memset(&luks2_opts, 0, sizeof(luks2_opts));
 420            format = RBD_ENCRYPTION_FORMAT_LUKS2;
 421            opts = &luks2_opts;
 422            opts_size = sizeof(luks2_opts);
 423            r = qemu_rbd_convert_luks_create_options(
 424                    qapi_RbdEncryptionCreateOptionsLUKS2_base(
 425                            &encrypt->u.luks2),
 426                    &luks2_opts.alg, &passphrase, &passphrase_len, errp);
 427            if (r < 0) {
 428                return r;
 429            }
 430            luks2_opts.passphrase = passphrase;
 431            luks2_opts.passphrase_size = passphrase_len;
 432            break;
 433        }
 434        default: {
 435            r = -ENOTSUP;
 436            error_setg_errno(
 437                    errp, -r, "unknown image encryption format: %u",
 438                    encrypt->format);
 439            return r;
 440        }
 441    }
 442
 443    r = rbd_encryption_format(image, format, opts, opts_size);
 444    if (r < 0) {
 445        error_setg_errno(errp, -r, "encryption format fail");
 446        return r;
 447    }
 448
 449    r = rbd_get_size(image, &effective_size);
 450    if (r < 0) {
 451        error_setg_errno(errp, -r, "cannot get effective image size");
 452        return r;
 453    }
 454
 455    r = rbd_resize(image, raw_size + (raw_size - effective_size));
 456    if (r < 0) {
 457        error_setg_errno(errp, -r, "cannot resize image after format");
 458        return r;
 459    }
 460
 461    return 0;
 462}
 463
 464static int qemu_rbd_encryption_load(rbd_image_t image,
 465                                    RbdEncryptionOptions *encrypt,
 466                                    Error **errp)
 467{
 468    int r = 0;
 469    g_autofree char *passphrase = NULL;
 470    size_t passphrase_len;
 471    rbd_encryption_luks1_format_options_t luks_opts;
 472    rbd_encryption_luks2_format_options_t luks2_opts;
 473    rbd_encryption_format_t format;
 474    rbd_encryption_options_t opts;
 475    size_t opts_size;
 476
 477    switch (encrypt->format) {
 478        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
 479            memset(&luks_opts, 0, sizeof(luks_opts));
 480            format = RBD_ENCRYPTION_FORMAT_LUKS1;
 481            opts = &luks_opts;
 482            opts_size = sizeof(luks_opts);
 483            r = qemu_rbd_convert_luks_options(
 484                    qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
 485                    &passphrase, &passphrase_len, errp);
 486            if (r < 0) {
 487                return r;
 488            }
 489            luks_opts.passphrase = passphrase;
 490            luks_opts.passphrase_size = passphrase_len;
 491            break;
 492        }
 493        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
 494            memset(&luks2_opts, 0, sizeof(luks2_opts));
 495            format = RBD_ENCRYPTION_FORMAT_LUKS2;
 496            opts = &luks2_opts;
 497            opts_size = sizeof(luks2_opts);
 498            r = qemu_rbd_convert_luks_options(
 499                    qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
 500                    &passphrase, &passphrase_len, errp);
 501            if (r < 0) {
 502                return r;
 503            }
 504            luks2_opts.passphrase = passphrase;
 505            luks2_opts.passphrase_size = passphrase_len;
 506            break;
 507        }
 508        default: {
 509            r = -ENOTSUP;
 510            error_setg_errno(
 511                    errp, -r, "unknown image encryption format: %u",
 512                    encrypt->format);
 513            return r;
 514        }
 515    }
 516
 517    r = rbd_encryption_load(image, format, opts, opts_size);
 518    if (r < 0) {
 519        error_setg_errno(errp, -r, "encryption load fail");
 520        return r;
 521    }
 522
 523    return 0;
 524}
 525#endif
 526
 527/* FIXME Deprecate and remove keypairs or make it available in QMP. */
 528static int qemu_rbd_do_create(BlockdevCreateOptions *options,
 529                              const char *keypairs, const char *password_secret,
 530                              Error **errp)
 531{
 532    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
 533    rados_t cluster;
 534    rados_ioctx_t io_ctx;
 535    int obj_order = 0;
 536    int ret;
 537
 538    assert(options->driver == BLOCKDEV_DRIVER_RBD);
 539    if (opts->location->has_snapshot) {
 540        error_setg(errp, "Can't use snapshot name for image creation");
 541        return -EINVAL;
 542    }
 543
 544#ifndef LIBRBD_SUPPORTS_ENCRYPTION
 545    if (opts->has_encrypt) {
 546        error_setg(errp, "RBD library does not support image encryption");
 547        return -ENOTSUP;
 548    }
 549#endif
 550
 551    if (opts->has_cluster_size) {
 552        int64_t objsize = opts->cluster_size;
 553        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 554            error_setg(errp, "obj size needs to be power of 2");
 555            return -EINVAL;
 556        }
 557        if (objsize < 4096) {
 558            error_setg(errp, "obj size too small");
 559            return -EINVAL;
 560        }
 561        obj_order = ctz32(objsize);
 562    }
 563
 564    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
 565                           password_secret, errp);
 566    if (ret < 0) {
 567        return ret;
 568    }
 569
 570    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
 571    if (ret < 0) {
 572        error_setg_errno(errp, -ret, "error rbd create");
 573        goto out;
 574    }
 575
 576#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 577    if (opts->has_encrypt) {
 578        rbd_image_t image;
 579
 580        ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
 581        if (ret < 0) {
 582            error_setg_errno(errp, -ret,
 583                             "error opening image '%s' for encryption format",
 584                             opts->location->image);
 585            goto out;
 586        }
 587
 588        ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
 589        rbd_close(image);
 590        if (ret < 0) {
 591            /* encryption format fail, try removing the image */
 592            rbd_remove(io_ctx, opts->location->image);
 593            goto out;
 594        }
 595    }
 596#endif
 597
 598    ret = 0;
 599out:
 600    rados_ioctx_destroy(io_ctx);
 601    rados_shutdown(cluster);
 602    return ret;
 603}
 604
 605static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
 606{
 607    return qemu_rbd_do_create(options, NULL, NULL, errp);
 608}
 609
 610static int qemu_rbd_extract_encryption_create_options(
 611        QemuOpts *opts,
 612        RbdEncryptionCreateOptions **spec,
 613        Error **errp)
 614{
 615    QDict *opts_qdict;
 616    QDict *encrypt_qdict;
 617    Visitor *v;
 618    int ret = 0;
 619
 620    opts_qdict = qemu_opts_to_qdict(opts, NULL);
 621    qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
 622    qobject_unref(opts_qdict);
 623    if (!qdict_size(encrypt_qdict)) {
 624        *spec = NULL;
 625        goto exit;
 626    }
 627
 628    /* Convert options into a QAPI object */
 629    v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
 630    if (!v) {
 631        ret = -EINVAL;
 632        goto exit;
 633    }
 634
 635    visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
 636    visit_free(v);
 637    if (!*spec) {
 638        ret = -EINVAL;
 639        goto exit;
 640    }
 641
 642exit:
 643    qobject_unref(encrypt_qdict);
 644    return ret;
 645}
 646
 647static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
 648                                                const char *filename,
 649                                                QemuOpts *opts,
 650                                                Error **errp)
 651{
 652    BlockdevCreateOptions *create_options;
 653    BlockdevCreateOptionsRbd *rbd_opts;
 654    BlockdevOptionsRbd *loc;
 655    RbdEncryptionCreateOptions *encrypt = NULL;
 656    Error *local_err = NULL;
 657    const char *keypairs, *password_secret;
 658    QDict *options = NULL;
 659    int ret = 0;
 660
 661    create_options = g_new0(BlockdevCreateOptions, 1);
 662    create_options->driver = BLOCKDEV_DRIVER_RBD;
 663    rbd_opts = &create_options->u.rbd;
 664
 665    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
 666
 667    password_secret = qemu_opt_get(opts, "password-secret");
 668
 669    /* Read out options */
 670    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 671                              BDRV_SECTOR_SIZE);
 672    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
 673                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
 674    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
 675
 676    options = qdict_new();
 677    qemu_rbd_parse_filename(filename, options, &local_err);
 678    if (local_err) {
 679        ret = -EINVAL;
 680        error_propagate(errp, local_err);
 681        goto exit;
 682    }
 683
 684    ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
 685    if (ret < 0) {
 686        goto exit;
 687    }
 688    rbd_opts->encrypt     = encrypt;
 689    rbd_opts->has_encrypt = !!encrypt;
 690
 691    /*
 692     * Caution: while qdict_get_try_str() is fine, getting non-string
 693     * types would require more care.  When @options come from -blockdev
 694     * or blockdev_add, its members are typed according to the QAPI
 695     * schema, but when they come from -drive, they're all QString.
 696     */
 697    loc = rbd_opts->location;
 698    loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
 699    loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
 700    loc->has_conf    = !!loc->conf;
 701    loc->user        = g_strdup(qdict_get_try_str(options, "user"));
 702    loc->has_user    = !!loc->user;
 703    loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
 704    loc->has_q_namespace = !!loc->q_namespace;
 705    loc->image       = g_strdup(qdict_get_try_str(options, "image"));
 706    keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
 707
 708    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
 709    if (ret < 0) {
 710        goto exit;
 711    }
 712
 713exit:
 714    qobject_unref(options);
 715    qapi_free_BlockdevCreateOptions(create_options);
 716    return ret;
 717}
 718
 719static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 720{
 721    const char **vals;
 722    const char *host, *port;
 723    char *rados_str;
 724    InetSocketAddressBaseList *p;
 725    int i, cnt;
 726
 727    if (!opts->has_server) {
 728        return NULL;
 729    }
 730
 731    for (cnt = 0, p = opts->server; p; p = p->next) {
 732        cnt++;
 733    }
 734
 735    vals = g_new(const char *, cnt + 1);
 736
 737    for (i = 0, p = opts->server; p; p = p->next, i++) {
 738        host = p->value->host;
 739        port = p->value->port;
 740
 741        if (strchr(host, ':')) {
 742            vals[i] = g_strdup_printf("[%s]:%s", host, port);
 743        } else {
 744            vals[i] = g_strdup_printf("%s:%s", host, port);
 745        }
 746    }
 747    vals[i] = NULL;
 748
 749    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
 750    g_strfreev((char **)vals);
 751    return rados_str;
 752}
 753
 754static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 755                            BlockdevOptionsRbd *opts, bool cache,
 756                            const char *keypairs, const char *secretid,
 757                            Error **errp)
 758{
 759    char *mon_host = NULL;
 760    Error *local_err = NULL;
 761    int r;
 762
 763    if (secretid) {
 764        if (opts->key_secret) {
 765            error_setg(errp,
 766                       "Legacy 'password-secret' clashes with 'key-secret'");
 767            return -EINVAL;
 768        }
 769        opts->key_secret = g_strdup(secretid);
 770        opts->has_key_secret = true;
 771    }
 772
 773    mon_host = qemu_rbd_mon_host(opts, &local_err);
 774    if (local_err) {
 775        error_propagate(errp, local_err);
 776        r = -EINVAL;
 777        goto out;
 778    }
 779
 780    r = rados_create(cluster, opts->user);
 781    if (r < 0) {
 782        error_setg_errno(errp, -r, "error initializing");
 783        goto out;
 784    }
 785
 786    /* try default location when conf=NULL, but ignore failure */
 787    r = rados_conf_read_file(*cluster, opts->conf);
 788    if (opts->has_conf && r < 0) {
 789        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
 790        goto failed_shutdown;
 791    }
 792
 793    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
 794    if (r < 0) {
 795        goto failed_shutdown;
 796    }
 797
 798    if (mon_host) {
 799        r = rados_conf_set(*cluster, "mon_host", mon_host);
 800        if (r < 0) {
 801            goto failed_shutdown;
 802        }
 803    }
 804
 805    r = qemu_rbd_set_auth(*cluster, opts, errp);
 806    if (r < 0) {
 807        goto failed_shutdown;
 808    }
 809
 810    /*
 811     * Fallback to more conservative semantics if setting cache
 812     * options fails. Ignore errors from setting rbd_cache because the
 813     * only possible error is that the option does not exist, and
 814     * librbd defaults to no caching. If write through caching cannot
 815     * be set up, fall back to no caching.
 816     */
 817    if (cache) {
 818        rados_conf_set(*cluster, "rbd_cache", "true");
 819    } else {
 820        rados_conf_set(*cluster, "rbd_cache", "false");
 821    }
 822
 823    r = rados_connect(*cluster);
 824    if (r < 0) {
 825        error_setg_errno(errp, -r, "error connecting");
 826        goto failed_shutdown;
 827    }
 828
 829    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
 830    if (r < 0) {
 831        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
 832        goto failed_shutdown;
 833    }
 834    /*
 835     * Set the namespace after opening the io context on the pool,
 836     * if nspace == NULL or if nspace == "", it is just as we did nothing
 837     */
 838    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
 839
 840    r = 0;
 841    goto out;
 842
 843failed_shutdown:
 844    rados_shutdown(*cluster);
 845out:
 846    g_free(mon_host);
 847    return r;
 848}
 849
 850static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
 851                                    Error **errp)
 852{
 853    Visitor *v;
 854
 855    /* Convert the remaining options into a QAPI object */
 856    v = qobject_input_visitor_new_flat_confused(options, errp);
 857    if (!v) {
 858        return -EINVAL;
 859    }
 860
 861    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
 862    visit_free(v);
 863    if (!opts) {
 864        return -EINVAL;
 865    }
 866
 867    return 0;
 868}
 869
 870static int qemu_rbd_attempt_legacy_options(QDict *options,
 871                                           BlockdevOptionsRbd **opts,
 872                                           char **keypairs)
 873{
 874    char *filename;
 875    int r;
 876
 877    filename = g_strdup(qdict_get_try_str(options, "filename"));
 878    if (!filename) {
 879        return -EINVAL;
 880    }
 881    qdict_del(options, "filename");
 882
 883    qemu_rbd_parse_filename(filename, options, NULL);
 884
 885    /* keypairs freed by caller */
 886    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 887    if (*keypairs) {
 888        qdict_del(options, "=keyvalue-pairs");
 889    }
 890
 891    r = qemu_rbd_convert_options(options, opts, NULL);
 892
 893    g_free(filename);
 894    return r;
 895}
 896
 897static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 898                         Error **errp)
 899{
 900    BDRVRBDState *s = bs->opaque;
 901    BlockdevOptionsRbd *opts = NULL;
 902    const QDictEntry *e;
 903    Error *local_err = NULL;
 904    char *keypairs, *secretid;
 905    rbd_image_info_t info;
 906    int r;
 907
 908    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 909    if (keypairs) {
 910        qdict_del(options, "=keyvalue-pairs");
 911    }
 912
 913    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
 914    if (secretid) {
 915        qdict_del(options, "password-secret");
 916    }
 917
 918    r = qemu_rbd_convert_options(options, &opts, &local_err);
 919    if (local_err) {
 920        /* If keypairs are present, that means some options are present in
 921         * the modern option format.  Don't attempt to parse legacy option
 922         * formats, as we won't support mixed usage. */
 923        if (keypairs) {
 924            error_propagate(errp, local_err);
 925            goto out;
 926        }
 927
 928        /* If the initial attempt to convert and process the options failed,
 929         * we may be attempting to open an image file that has the rbd options
 930         * specified in the older format consisting of all key/value pairs
 931         * encoded in the filename.  Go ahead and attempt to parse the
 932         * filename, and see if we can pull out the required options. */
 933        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
 934        if (r < 0) {
 935            /* Propagate the original error, not the legacy parsing fallback
 936             * error, as the latter was just a best-effort attempt. */
 937            error_propagate(errp, local_err);
 938            goto out;
 939        }
 940        /* Take care whenever deciding to actually deprecate; once this ability
 941         * is removed, we will not be able to open any images with legacy-styled
 942         * backing image strings. */
 943        warn_report("RBD options encoded in the filename as keyvalue pairs "
 944                    "is deprecated");
 945    }
 946
 947    /* Remove the processed options from the QDict (the visitor processes
 948     * _all_ options in the QDict) */
 949    while ((e = qdict_first(options))) {
 950        qdict_del(options, e->key);
 951    }
 952
 953    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
 954                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
 955    if (r < 0) {
 956        goto out;
 957    }
 958
 959    s->snap = g_strdup(opts->snapshot);
 960    s->image_name = g_strdup(opts->image);
 961
 962    /* rbd_open is always r/w */
 963    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
 964    if (r < 0) {
 965        error_setg_errno(errp, -r, "error reading header from %s",
 966                         s->image_name);
 967        goto failed_open;
 968    }
 969
 970    if (opts->has_encrypt) {
 971#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 972        r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
 973        if (r < 0) {
 974            goto failed_post_open;
 975        }
 976#else
 977        r = -ENOTSUP;
 978        error_setg(errp, "RBD library does not support image encryption");
 979        goto failed_post_open;
 980#endif
 981    }
 982
 983    r = rbd_stat(s->image, &info, sizeof(info));
 984    if (r < 0) {
 985        error_setg_errno(errp, -r, "error getting image info from %s",
 986                         s->image_name);
 987        goto failed_post_open;
 988    }
 989    s->image_size = info.size;
 990    s->object_size = info.obj_size;
 991
 992    /* If we are using an rbd snapshot, we must be r/o, otherwise
 993     * leave as-is */
 994    if (s->snap != NULL) {
 995        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
 996        if (r < 0) {
 997            goto failed_post_open;
 998        }
 999    }
1000
1001#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1002    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1003#endif
1004
1005    /* When extending regular files, we get zeros from the OS */
1006    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1007
1008    r = 0;
1009    goto out;
1010
1011failed_post_open:
1012    rbd_close(s->image);
1013failed_open:
1014    rados_ioctx_destroy(s->io_ctx);
1015    g_free(s->snap);
1016    g_free(s->image_name);
1017    rados_shutdown(s->cluster);
1018out:
1019    qapi_free_BlockdevOptionsRbd(opts);
1020    g_free(keypairs);
1021    g_free(secretid);
1022    return r;
1023}
1024
1025
1026/* Since RBD is currently always opened R/W via the API,
1027 * we just need to check if we are using a snapshot or not, in
1028 * order to determine if we will allow it to be R/W */
1029static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1030                                   BlockReopenQueue *queue, Error **errp)
1031{
1032    BDRVRBDState *s = state->bs->opaque;
1033    int ret = 0;
1034
1035    if (s->snap && state->flags & BDRV_O_RDWR) {
1036        error_setg(errp,
1037                   "Cannot change node '%s' to r/w when using RBD snapshot",
1038                   bdrv_get_device_or_node_name(state->bs));
1039        ret = -EINVAL;
1040    }
1041
1042    return ret;
1043}
1044
1045static void qemu_rbd_close(BlockDriverState *bs)
1046{
1047    BDRVRBDState *s = bs->opaque;
1048
1049    rbd_close(s->image);
1050    rados_ioctx_destroy(s->io_ctx);
1051    g_free(s->snap);
1052    g_free(s->image_name);
1053    rados_shutdown(s->cluster);
1054}
1055
1056/* Resize the RBD image and update the 'image_size' with the current size */
1057static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1058{
1059    BDRVRBDState *s = bs->opaque;
1060    int r;
1061
1062    r = rbd_resize(s->image, size);
1063    if (r < 0) {
1064        return r;
1065    }
1066
1067    s->image_size = size;
1068
1069    return 0;
1070}
1071
1072static void qemu_rbd_finish_bh(void *opaque)
1073{
1074    RBDTask *task = opaque;
1075    task->complete = true;
1076    aio_co_wake(task->co);
1077}
1078
1079/*
1080 * This is the completion callback function for all rbd aio calls
1081 * started from qemu_rbd_start_co().
1082 *
1083 * Note: this function is being called from a non qemu thread so
1084 * we need to be careful about what we do here. Generally we only
1085 * schedule a BH, and do the rest of the io completion handling
1086 * from qemu_rbd_finish_bh() which runs in a qemu context.
1087 */
1088static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1089{
1090    task->ret = rbd_aio_get_return_value(c);
1091    rbd_aio_release(c);
1092    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1093                            qemu_rbd_finish_bh, task);
1094}
1095
1096static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1097                                          uint64_t offset,
1098                                          uint64_t bytes,
1099                                          QEMUIOVector *qiov,
1100                                          int flags,
1101                                          RBDAIOCmd cmd)
1102{
1103    BDRVRBDState *s = bs->opaque;
1104    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1105    rbd_completion_t c;
1106    int r;
1107
1108    assert(!qiov || qiov->size == bytes);
1109
1110    r = rbd_aio_create_completion(&task,
1111                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
1112    if (r < 0) {
1113        return r;
1114    }
1115
1116    switch (cmd) {
1117    case RBD_AIO_READ:
1118        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1119        break;
1120    case RBD_AIO_WRITE:
1121        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1122        break;
1123    case RBD_AIO_DISCARD:
1124        r = rbd_aio_discard(s->image, offset, bytes, c);
1125        break;
1126    case RBD_AIO_FLUSH:
1127        r = rbd_aio_flush(s->image, c);
1128        break;
1129#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1130    case RBD_AIO_WRITE_ZEROES: {
1131        int zero_flags = 0;
1132#ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1133        if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1134            zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1135        }
1136#endif
1137        r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1138        break;
1139    }
1140#endif
1141    default:
1142        r = -EINVAL;
1143    }
1144
1145    if (r < 0) {
1146        error_report("rbd request failed early: cmd %d offset %" PRIu64
1147                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1148                     bytes, flags, r, strerror(-r));
1149        rbd_aio_release(c);
1150        return r;
1151    }
1152
1153    while (!task.complete) {
1154        qemu_coroutine_yield();
1155    }
1156
1157    if (task.ret < 0) {
1158        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1159                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1160                     bytes, flags, task.ret, strerror(-task.ret));
1161        return task.ret;
1162    }
1163
1164    /* zero pad short reads */
1165    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1166        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1167    }
1168
1169    return 0;
1170}
1171
1172static int
1173coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1174                                int64_t bytes, QEMUIOVector *qiov,
1175                                BdrvRequestFlags flags)
1176{
1177    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1178}
1179
1180static int
1181coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1182                                 int64_t bytes, QEMUIOVector *qiov,
1183                                 BdrvRequestFlags flags)
1184{
1185    BDRVRBDState *s = bs->opaque;
1186    /*
1187     * RBD APIs don't allow us to write more than actual size, so in order
1188     * to support growing images, we resize the image before write
1189     * operations that exceed the current size.
1190     */
1191    if (offset + bytes > s->image_size) {
1192        int r = qemu_rbd_resize(bs, offset + bytes);
1193        if (r < 0) {
1194            return r;
1195        }
1196    }
1197    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1198}
1199
1200static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1201{
1202    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1203}
1204
1205static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1206                                             int64_t offset, int64_t bytes)
1207{
1208    return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1209}
1210
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Write zeroes to [@offset, @offset + @bytes). Only built when librbd
 * offers write-zeroes; @flags are passed on so that qemu_rbd_start_co()
 * can honour BDRV_REQ_MAY_UNMAP.
 */
static int coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs,
                                                  int64_t offset,
                                                  int64_t bytes,
                                                  BdrvRequestFlags flags)
{
    int rc;

    rc = qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                           RBD_AIO_WRITE_ZEROES);
    return rc;
}
#endif
1220
1221static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1222{
1223    BDRVRBDState *s = bs->opaque;
1224    bdi->cluster_size = s->object_size;
1225    return 0;
1226}
1227
1228static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1229                                                     Error **errp)
1230{
1231    BDRVRBDState *s = bs->opaque;
1232    ImageInfoSpecific *spec_info;
1233    char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1234    int r;
1235
1236    if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1237        r = rbd_read(s->image, 0,
1238                     RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1239        if (r < 0) {
1240            error_setg_errno(errp, -r, "cannot read image start for probe");
1241            return NULL;
1242        }
1243    }
1244
1245    spec_info = g_new(ImageInfoSpecific, 1);
1246    *spec_info = (ImageInfoSpecific){
1247        .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1248        .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1249    };
1250
1251    if (memcmp(buf, rbd_luks_header_verification,
1252               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1253        spec_info->u.rbd.data->encryption_format =
1254                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1255        spec_info->u.rbd.data->has_encryption_format = true;
1256    } else if (memcmp(buf, rbd_luks2_header_verification,
1257               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1258        spec_info->u.rbd.data->encryption_format =
1259                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1260        spec_info->u.rbd.data->has_encryption_format = true;
1261    } else {
1262        spec_info->u.rbd.data->has_encryption_format = false;
1263    }
1264
1265    return spec_info;
1266}
1267
/*
 * rbd_diff_iterate2 allows the iteration to be interrupted by returning a
 * negative value from the callback routine. Choose a value that does not
 * conflict with an existing exit code and return it when we want to stop
 * the iteration prematurely because we detected a change in the allocation
 * status.
 *
 * The value is parenthesized so that the macro expands to a single operand
 * wherever it is used in an expression.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1275
1276static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1277                                    int exists, void *opaque)
1278{
1279    RBDDiffIterateReq *req = opaque;
1280
1281    assert(req->offs + req->bytes <= offs);
1282    /*
1283     * we do not diff against a snapshot so we should never receive a callback
1284     * for a hole.
1285     */
1286    assert(exists);
1287
1288    if (!req->exists && offs > req->offs) {
1289        /*
1290         * we started in an unallocated area and hit the first allocated
1291         * block. req->bytes must be set to the length of the unallocated area
1292         * before the allocated area. stop further processing.
1293         */
1294        req->bytes = offs - req->offs;
1295        return QEMU_RBD_EXIT_DIFF_ITERATE2;
1296    }
1297
1298    if (req->exists && offs > req->offs + req->bytes) {
1299        /*
1300         * we started in an allocated area and jumped over an unallocated area,
1301         * req->bytes contains the length of the allocated area before the
1302         * unallocated area. stop further processing.
1303         */
1304        return QEMU_RBD_EXIT_DIFF_ITERATE2;
1305    }
1306
1307    req->bytes += len;
1308    req->exists = true;
1309
1310    return 0;
1311}
1312
1313static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1314                                                 bool want_zero, int64_t offset,
1315                                                 int64_t bytes, int64_t *pnum,
1316                                                 int64_t *map,
1317                                                 BlockDriverState **file)
1318{
1319    BDRVRBDState *s = bs->opaque;
1320    int status, r;
1321    RBDDiffIterateReq req = { .offs = offset };
1322    uint64_t features, flags;
1323
1324    assert(offset + bytes <= s->image_size);
1325
1326    /* default to all sectors allocated */
1327    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1328    *map = offset;
1329    *file = bs;
1330    *pnum = bytes;
1331
1332    /* check if RBD image supports fast-diff */
1333    r = rbd_get_features(s->image, &features);
1334    if (r < 0) {
1335        return status;
1336    }
1337    if (!(features & RBD_FEATURE_FAST_DIFF)) {
1338        return status;
1339    }
1340
1341    /* check if RBD fast-diff result is valid */
1342    r = rbd_get_flags(s->image, &flags);
1343    if (r < 0) {
1344        return status;
1345    }
1346    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1347        return status;
1348    }
1349
1350    r = rbd_diff_iterate2(s->image, NULL, offset, bytes, true, true,
1351                          qemu_rbd_diff_iterate_cb, &req);
1352    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1353        return status;
1354    }
1355    assert(req.bytes <= bytes);
1356    if (!req.exists) {
1357        if (r == 0) {
1358            /*
1359             * rbd_diff_iterate2 does not invoke callbacks for unallocated
1360             * areas. This here catches the case where no callback was
1361             * invoked at all (req.bytes == 0).
1362             */
1363            assert(req.bytes == 0);
1364            req.bytes = bytes;
1365        }
1366        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1367    }
1368
1369    *pnum = req.bytes;
1370    return status;
1371}
1372
1373static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1374{
1375    BDRVRBDState *s = bs->opaque;
1376    int r;
1377
1378    r = rbd_get_size(s->image, &s->image_size);
1379    if (r < 0) {
1380        return r;
1381    }
1382
1383    return s->image_size;
1384}
1385
1386static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1387                                             int64_t offset,
1388                                             bool exact,
1389                                             PreallocMode prealloc,
1390                                             BdrvRequestFlags flags,
1391                                             Error **errp)
1392{
1393    int r;
1394
1395    if (prealloc != PREALLOC_MODE_OFF) {
1396        error_setg(errp, "Unsupported preallocation mode '%s'",
1397                   PreallocMode_str(prealloc));
1398        return -ENOTSUP;
1399    }
1400
1401    r = qemu_rbd_resize(bs, offset);
1402    if (r < 0) {
1403        error_setg_errno(errp, -r, "Failed to resize file");
1404        return r;
1405    }
1406
1407    return 0;
1408}
1409
1410static int qemu_rbd_snap_create(BlockDriverState *bs,
1411                                QEMUSnapshotInfo *sn_info)
1412{
1413    BDRVRBDState *s = bs->opaque;
1414    int r;
1415
1416    if (sn_info->name[0] == '\0') {
1417        return -EINVAL; /* we need a name for rbd snapshots */
1418    }
1419
1420    /*
1421     * rbd snapshots are using the name as the user controlled unique identifier
1422     * we can't use the rbd snapid for that purpose, as it can't be set
1423     */
1424    if (sn_info->id_str[0] != '\0' &&
1425        strcmp(sn_info->id_str, sn_info->name) != 0) {
1426        return -EINVAL;
1427    }
1428
1429    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1430        return -ERANGE;
1431    }
1432
1433    r = rbd_snap_create(s->image, sn_info->name);
1434    if (r < 0) {
1435        error_report("failed to create snap: %s", strerror(-r));
1436        return r;
1437    }
1438
1439    return 0;
1440}
1441
1442static int qemu_rbd_snap_remove(BlockDriverState *bs,
1443                                const char *snapshot_id,
1444                                const char *snapshot_name,
1445                                Error **errp)
1446{
1447    BDRVRBDState *s = bs->opaque;
1448    int r;
1449
1450    if (!snapshot_name) {
1451        error_setg(errp, "rbd need a valid snapshot name");
1452        return -EINVAL;
1453    }
1454
1455    /* If snapshot_id is specified, it must be equal to name, see
1456       qemu_rbd_snap_list() */
1457    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1458        error_setg(errp,
1459                   "rbd do not support snapshot id, it should be NULL or "
1460                   "equal to snapshot name");
1461        return -EINVAL;
1462    }
1463
1464    r = rbd_snap_remove(s->image, snapshot_name);
1465    if (r < 0) {
1466        error_setg_errno(errp, -r, "Failed to remove the snapshot");
1467    }
1468    return r;
1469}
1470
1471static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1472                                  const char *snapshot_name)
1473{
1474    BDRVRBDState *s = bs->opaque;
1475
1476    return rbd_snap_rollback(s->image, snapshot_name);
1477}
1478
1479static int qemu_rbd_snap_list(BlockDriverState *bs,
1480                              QEMUSnapshotInfo **psn_tab)
1481{
1482    BDRVRBDState *s = bs->opaque;
1483    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1484    int i, snap_count;
1485    rbd_snap_info_t *snaps;
1486    int max_snaps = RBD_MAX_SNAPS;
1487
1488    do {
1489        snaps = g_new(rbd_snap_info_t, max_snaps);
1490        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1491        if (snap_count <= 0) {
1492            g_free(snaps);
1493        }
1494    } while (snap_count == -ERANGE);
1495
1496    if (snap_count <= 0) {
1497        goto done;
1498    }
1499
1500    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1501
1502    for (i = 0; i < snap_count; i++) {
1503        const char *snap_name = snaps[i].name;
1504
1505        sn_info = sn_tab + i;
1506        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1507        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1508
1509        sn_info->vm_state_size = snaps[i].size;
1510        sn_info->date_sec = 0;
1511        sn_info->date_nsec = 0;
1512        sn_info->vm_clock_nsec = 0;
1513    }
1514    rbd_snap_list_end(snaps);
1515    g_free(snaps);
1516
1517 done:
1518    *psn_tab = sn_tab;
1519    return snap_count;
1520}
1521
1522static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1523                                                      Error **errp)
1524{
1525    BDRVRBDState *s = bs->opaque;
1526    int r = rbd_invalidate_cache(s->image);
1527    if (r < 0) {
1528        error_setg_errno(errp, -r, "Failed to invalidate the cache");
1529    }
1530}
1531
1532static QemuOptsList qemu_rbd_create_opts = {
1533    .name = "rbd-create-opts",
1534    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1535    .desc = {
1536        {
1537            .name = BLOCK_OPT_SIZE,
1538            .type = QEMU_OPT_SIZE,
1539            .help = "Virtual disk size"
1540        },
1541        {
1542            .name = BLOCK_OPT_CLUSTER_SIZE,
1543            .type = QEMU_OPT_SIZE,
1544            .help = "RBD object size"
1545        },
1546        {
1547            .name = "password-secret",
1548            .type = QEMU_OPT_STRING,
1549            .help = "ID of secret providing the password",
1550        },
1551        {
1552            .name = "encrypt.format",
1553            .type = QEMU_OPT_STRING,
1554            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1555        },
1556        {
1557            .name = "encrypt.cipher-alg",
1558            .type = QEMU_OPT_STRING,
1559            .help = "Name of encryption cipher algorithm"
1560                    " (allowed values: aes-128, aes-256)",
1561        },
1562        {
1563            .name = "encrypt.key-secret",
1564            .type = QEMU_OPT_STRING,
1565            .help = "ID of secret providing LUKS passphrase",
1566        },
1567        { /* end of list */ }
1568    }
1569};
1570
/*
 * NULL-terminated list of runtime option names, hooked up as
 * .strong_runtime_opts of the RBD BlockDriver.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* image location */
    "pool", "namespace", "image", "snapshot",
    /* cluster access */
    "conf", "user", "server.", "password-secret",
    /* terminator */
    NULL,
};
1583
1584static BlockDriver bdrv_rbd = {
1585    .format_name            = "rbd",
1586    .instance_size          = sizeof(BDRVRBDState),
1587    .bdrv_parse_filename    = qemu_rbd_parse_filename,
1588    .bdrv_file_open         = qemu_rbd_open,
1589    .bdrv_close             = qemu_rbd_close,
1590    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1591    .bdrv_co_create         = qemu_rbd_co_create,
1592    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1593    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1594    .bdrv_get_info          = qemu_rbd_getinfo,
1595    .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1596    .create_opts            = &qemu_rbd_create_opts,
1597    .bdrv_getlength         = qemu_rbd_getlength,
1598    .bdrv_co_truncate       = qemu_rbd_co_truncate,
1599    .protocol_name          = "rbd",
1600
1601    .bdrv_co_preadv         = qemu_rbd_co_preadv,
1602    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1603    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1604    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1605#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1606    .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1607#endif
1608    .bdrv_co_block_status   = qemu_rbd_co_block_status,
1609
1610    .bdrv_snapshot_create   = qemu_rbd_snap_create,
1611    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1612    .bdrv_snapshot_list     = qemu_rbd_snap_list,
1613    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1614    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1615
1616    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1617};
1618
1619static void bdrv_rbd_init(void)
1620{
1621    bdrv_register(&bdrv_rbd);
1622}
1623
1624block_init(bdrv_rbd_init);
1625