qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include <rbd/librbd.h>
  17#include "qapi/error.h"
  18#include "qemu/error-report.h"
  19#include "qemu/module.h"
  20#include "qemu/option.h"
  21#include "block/block_int.h"
  22#include "block/qdict.h"
  23#include "crypto/secret.h"
  24#include "qemu/cutils.h"
  25#include "sysemu/replay.h"
  26#include "qapi/qmp/qstring.h"
  27#include "qapi/qmp/qdict.h"
  28#include "qapi/qmp/qjson.h"
  29#include "qapi/qmp/qlist.h"
  30#include "qapi/qobject-input-visitor.h"
  31#include "qapi/qapi-visit-block-core.h"
  32
  33/*
  34 * When specifying the image filename use:
  35 *
  36 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  37 *
  38 * poolname must be the name of an existing rados pool.
  39 *
  40 * devicename is the name of the rbd image.
  41 *
  42 * Each option given is used to configure rados, and may be any valid
  43 * Ceph option, "id", or "conf".
  44 *
  45 * The "id" option indicates what user we should authenticate as to
  46 * the Ceph cluster.  If it is excluded we will use the Ceph default
  47 * (normally 'admin').
  48 *
  49 * The "conf" option specifies a Ceph configuration file to read.  If
  50 * it is not specified, we will read from the default Ceph locations
  51 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  52 * file, specify conf=/dev/null.
  53 *
  54 * Configuration values containing :, @, or = can be escaped with a
  55 * leading "\".
  56 */
  57
  58#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
  59
  60#define RBD_MAX_SNAPS 100
  61
  62#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
  63
/*
 * Byte pattern compared against the start of an image to recognise a LUKS
 * header: "LUKS", 0xBA, 0xBE, then 0, 1 (presumably the on-disk format
 * version -- confirm against the LUKS specification).
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

/* Same pattern as above, with the trailing bytes 0, 2 (presumably LUKS2). */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};
  73
/* Kinds of I/O command the driver can issue. */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;

/* Driver state kept in bs->opaque for every open RBD image. */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle, set up by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* pool I/O context, set up by qemu_rbd_connect() */
    rbd_image_t image;      /* image handle filled in by rbd_open() */
    char *image_name;       /* copy of the "image" option */
    char *snap;             /* copy of the "snapshot" option */
    char *namespace;
    uint64_t image_size;    /* info.size as reported by rbd_stat() */
    uint64_t object_size;   /* info.obj_size as reported by rbd_stat() */
} BDRVRBDState;

/*
 * Bookkeeping for one in-flight request.
 * NOTE(review): presumably @co is the coroutine waiting for the request and
 * @ret its result -- the users are outside this part of the file; confirm.
 */
typedef struct RBDTask {
    BlockDriverState *bs;
    Coroutine *co;
    bool complete;
    int64_t ret;
} RBDTask;

/*
 * Request/result record for a diff iteration.
 * NOTE(review): field semantics are not visible in this part of the file;
 * presumably @offs/@bytes describe an extent and @exists its allocation
 * state -- confirm against the callback that fills it in.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
 105
 106static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 107                            BlockdevOptionsRbd *opts, bool cache,
 108                            const char *keypairs, const char *secretid,
 109                            Error **errp);
 110
 111static char *qemu_rbd_strchr(char *src, char delim)
 112{
 113    char *p;
 114
 115    for (p = src; *p; ++p) {
 116        if (*p == delim) {
 117            return p;
 118        }
 119        if (*p == '\\' && p[1] != '\0') {
 120            ++p;
 121        }
 122    }
 123
 124    return NULL;
 125}
 126
 127
 128static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 129{
 130    char *end;
 131
 132    *p = NULL;
 133
 134    end = qemu_rbd_strchr(src, delim);
 135    if (end) {
 136        *p = end + 1;
 137        *end = '\0';
 138    }
 139    return src;
 140}
 141
 142static void qemu_rbd_unescape(char *src)
 143{
 144    char *p;
 145
 146    for (p = src; *src; ++src, ++p) {
 147        if (*src == '\\' && src[1] != '\0') {
 148            src++;
 149        }
 150        *p = *src;
 151    }
 152    *p = '\0';
 153}
 154
 155static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 156                                    Error **errp)
 157{
 158    const char *start;
 159    char *p, *buf;
 160    QList *keypairs = NULL;
 161    char *found_str, *image_name;
 162
 163    if (!strstart(filename, "rbd:", &start)) {
 164        error_setg(errp, "File name must start with 'rbd:'");
 165        return;
 166    }
 167
 168    buf = g_strdup(start);
 169    p = buf;
 170
 171    found_str = qemu_rbd_next_tok(p, '/', &p);
 172    if (!p) {
 173        error_setg(errp, "Pool name is required");
 174        goto done;
 175    }
 176    qemu_rbd_unescape(found_str);
 177    qdict_put_str(options, "pool", found_str);
 178
 179    if (qemu_rbd_strchr(p, '@')) {
 180        image_name = qemu_rbd_next_tok(p, '@', &p);
 181
 182        found_str = qemu_rbd_next_tok(p, ':', &p);
 183        qemu_rbd_unescape(found_str);
 184        qdict_put_str(options, "snapshot", found_str);
 185    } else {
 186        image_name = qemu_rbd_next_tok(p, ':', &p);
 187    }
 188    /* Check for namespace in the image_name */
 189    if (qemu_rbd_strchr(image_name, '/')) {
 190        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
 191        qemu_rbd_unescape(found_str);
 192        qdict_put_str(options, "namespace", found_str);
 193    } else {
 194        qdict_put_str(options, "namespace", "");
 195    }
 196    qemu_rbd_unescape(image_name);
 197    qdict_put_str(options, "image", image_name);
 198    if (!p) {
 199        goto done;
 200    }
 201
 202    /* The following are essentially all key/value pairs, and we treat
 203     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
 204    while (p) {
 205        char *name, *value;
 206        name = qemu_rbd_next_tok(p, '=', &p);
 207        if (!p) {
 208            error_setg(errp, "conf option %s has no value", name);
 209            break;
 210        }
 211
 212        qemu_rbd_unescape(name);
 213
 214        value = qemu_rbd_next_tok(p, ':', &p);
 215        qemu_rbd_unescape(value);
 216
 217        if (!strcmp(name, "conf")) {
 218            qdict_put_str(options, "conf", value);
 219        } else if (!strcmp(name, "id")) {
 220            qdict_put_str(options, "user", value);
 221        } else {
 222            /*
 223             * We pass these internally to qemu_rbd_set_keypairs(), so
 224             * we can get away with the simpler list of [ "key1",
 225             * "value1", "key2", "value2" ] rather than a raw dict
 226             * { "key1": "value1", "key2": "value2" } where we can't
 227             * guarantee order, or even a more correct but complex
 228             * [ { "key1": "value1" }, { "key2": "value2" } ]
 229             */
 230            if (!keypairs) {
 231                keypairs = qlist_new();
 232            }
 233            qlist_append_str(keypairs, name);
 234            qlist_append_str(keypairs, value);
 235        }
 236    }
 237
 238    if (keypairs) {
 239        qdict_put(options, "=keyvalue-pairs",
 240                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
 241    }
 242
 243done:
 244    g_free(buf);
 245    qobject_unref(keypairs);
 246    return;
 247}
 248
 249static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
 250                             Error **errp)
 251{
 252    char *key, *acr;
 253    int r;
 254    GString *accu;
 255    RbdAuthModeList *auth;
 256
 257    if (opts->key_secret) {
 258        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
 259        if (!key) {
 260            return -EIO;
 261        }
 262        r = rados_conf_set(cluster, "key", key);
 263        g_free(key);
 264        if (r < 0) {
 265            error_setg_errno(errp, -r, "Could not set 'key'");
 266            return r;
 267        }
 268    }
 269
 270    if (opts->has_auth_client_required) {
 271        accu = g_string_new("");
 272        for (auth = opts->auth_client_required; auth; auth = auth->next) {
 273            if (accu->str[0]) {
 274                g_string_append_c(accu, ';');
 275            }
 276            g_string_append(accu, RbdAuthMode_str(auth->value));
 277        }
 278        acr = g_string_free(accu, FALSE);
 279        r = rados_conf_set(cluster, "auth_client_required", acr);
 280        g_free(acr);
 281        if (r < 0) {
 282            error_setg_errno(errp, -r,
 283                             "Could not set 'auth_client_required'");
 284            return r;
 285        }
 286    }
 287
 288    return 0;
 289}
 290
 291static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 292                                 Error **errp)
 293{
 294    QList *keypairs;
 295    QString *name;
 296    QString *value;
 297    const char *key;
 298    size_t remaining;
 299    int ret = 0;
 300
 301    if (!keypairs_json) {
 302        return ret;
 303    }
 304    keypairs = qobject_to(QList,
 305                          qobject_from_json(keypairs_json, &error_abort));
 306    remaining = qlist_size(keypairs) / 2;
 307    assert(remaining);
 308
 309    while (remaining--) {
 310        name = qobject_to(QString, qlist_pop(keypairs));
 311        value = qobject_to(QString, qlist_pop(keypairs));
 312        assert(name && value);
 313        key = qstring_get_str(name);
 314
 315        ret = rados_conf_set(cluster, key, qstring_get_str(value));
 316        qobject_unref(value);
 317        if (ret < 0) {
 318            error_setg_errno(errp, -ret, "invalid conf option %s", key);
 319            qobject_unref(name);
 320            ret = -EINVAL;
 321            break;
 322        }
 323        qobject_unref(name);
 324    }
 325
 326    qobject_unref(keypairs);
 327    return ret;
 328}
 329
 330#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 331static int qemu_rbd_convert_luks_options(
 332        RbdEncryptionOptionsLUKSBase *luks_opts,
 333        char **passphrase,
 334        size_t *passphrase_len,
 335        Error **errp)
 336{
 337    return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
 338                                 passphrase_len, errp);
 339}
 340
 341static int qemu_rbd_convert_luks_create_options(
 342        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
 343        rbd_encryption_algorithm_t *alg,
 344        char **passphrase,
 345        size_t *passphrase_len,
 346        Error **errp)
 347{
 348    int r = 0;
 349
 350    r = qemu_rbd_convert_luks_options(
 351            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
 352            passphrase, passphrase_len, errp);
 353    if (r < 0) {
 354        return r;
 355    }
 356
 357    if (luks_opts->has_cipher_alg) {
 358        switch (luks_opts->cipher_alg) {
 359            case QCRYPTO_CIPHER_ALG_AES_128: {
 360                *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
 361                break;
 362            }
 363            case QCRYPTO_CIPHER_ALG_AES_256: {
 364                *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
 365                break;
 366            }
 367            default: {
 368                r = -ENOTSUP;
 369                error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
 370                                 luks_opts->cipher_alg);
 371                return r;
 372            }
 373        }
 374    } else {
 375        /* default alg */
 376        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
 377    }
 378
 379    return 0;
 380}
 381
 382static int qemu_rbd_encryption_format(rbd_image_t image,
 383                                      RbdEncryptionCreateOptions *encrypt,
 384                                      Error **errp)
 385{
 386    int r = 0;
 387    g_autofree char *passphrase = NULL;
 388    size_t passphrase_len;
 389    rbd_encryption_format_t format;
 390    rbd_encryption_options_t opts;
 391    rbd_encryption_luks1_format_options_t luks_opts;
 392    rbd_encryption_luks2_format_options_t luks2_opts;
 393    size_t opts_size;
 394    uint64_t raw_size, effective_size;
 395
 396    r = rbd_get_size(image, &raw_size);
 397    if (r < 0) {
 398        error_setg_errno(errp, -r, "cannot get raw image size");
 399        return r;
 400    }
 401
 402    switch (encrypt->format) {
 403        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
 404            memset(&luks_opts, 0, sizeof(luks_opts));
 405            format = RBD_ENCRYPTION_FORMAT_LUKS1;
 406            opts = &luks_opts;
 407            opts_size = sizeof(luks_opts);
 408            r = qemu_rbd_convert_luks_create_options(
 409                    qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
 410                    &luks_opts.alg, &passphrase, &passphrase_len, errp);
 411            if (r < 0) {
 412                return r;
 413            }
 414            luks_opts.passphrase = passphrase;
 415            luks_opts.passphrase_size = passphrase_len;
 416            break;
 417        }
 418        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
 419            memset(&luks2_opts, 0, sizeof(luks2_opts));
 420            format = RBD_ENCRYPTION_FORMAT_LUKS2;
 421            opts = &luks2_opts;
 422            opts_size = sizeof(luks2_opts);
 423            r = qemu_rbd_convert_luks_create_options(
 424                    qapi_RbdEncryptionCreateOptionsLUKS2_base(
 425                            &encrypt->u.luks2),
 426                    &luks2_opts.alg, &passphrase, &passphrase_len, errp);
 427            if (r < 0) {
 428                return r;
 429            }
 430            luks2_opts.passphrase = passphrase;
 431            luks2_opts.passphrase_size = passphrase_len;
 432            break;
 433        }
 434        default: {
 435            r = -ENOTSUP;
 436            error_setg_errno(
 437                    errp, -r, "unknown image encryption format: %u",
 438                    encrypt->format);
 439            return r;
 440        }
 441    }
 442
 443    r = rbd_encryption_format(image, format, opts, opts_size);
 444    if (r < 0) {
 445        error_setg_errno(errp, -r, "encryption format fail");
 446        return r;
 447    }
 448
 449    r = rbd_get_size(image, &effective_size);
 450    if (r < 0) {
 451        error_setg_errno(errp, -r, "cannot get effective image size");
 452        return r;
 453    }
 454
 455    r = rbd_resize(image, raw_size + (raw_size - effective_size));
 456    if (r < 0) {
 457        error_setg_errno(errp, -r, "cannot resize image after format");
 458        return r;
 459    }
 460
 461    return 0;
 462}
 463
 464static int qemu_rbd_encryption_load(rbd_image_t image,
 465                                    RbdEncryptionOptions *encrypt,
 466                                    Error **errp)
 467{
 468    int r = 0;
 469    g_autofree char *passphrase = NULL;
 470    size_t passphrase_len;
 471    rbd_encryption_luks1_format_options_t luks_opts;
 472    rbd_encryption_luks2_format_options_t luks2_opts;
 473    rbd_encryption_format_t format;
 474    rbd_encryption_options_t opts;
 475    size_t opts_size;
 476
 477    switch (encrypt->format) {
 478        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
 479            memset(&luks_opts, 0, sizeof(luks_opts));
 480            format = RBD_ENCRYPTION_FORMAT_LUKS1;
 481            opts = &luks_opts;
 482            opts_size = sizeof(luks_opts);
 483            r = qemu_rbd_convert_luks_options(
 484                    qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
 485                    &passphrase, &passphrase_len, errp);
 486            if (r < 0) {
 487                return r;
 488            }
 489            luks_opts.passphrase = passphrase;
 490            luks_opts.passphrase_size = passphrase_len;
 491            break;
 492        }
 493        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
 494            memset(&luks2_opts, 0, sizeof(luks2_opts));
 495            format = RBD_ENCRYPTION_FORMAT_LUKS2;
 496            opts = &luks2_opts;
 497            opts_size = sizeof(luks2_opts);
 498            r = qemu_rbd_convert_luks_options(
 499                    qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
 500                    &passphrase, &passphrase_len, errp);
 501            if (r < 0) {
 502                return r;
 503            }
 504            luks2_opts.passphrase = passphrase;
 505            luks2_opts.passphrase_size = passphrase_len;
 506            break;
 507        }
 508        default: {
 509            r = -ENOTSUP;
 510            error_setg_errno(
 511                    errp, -r, "unknown image encryption format: %u",
 512                    encrypt->format);
 513            return r;
 514        }
 515    }
 516
 517    r = rbd_encryption_load(image, format, opts, opts_size);
 518    if (r < 0) {
 519        error_setg_errno(errp, -r, "encryption load fail");
 520        return r;
 521    }
 522
 523    return 0;
 524}
 525#endif
 526
 527/* FIXME Deprecate and remove keypairs or make it available in QMP. */
 528static int qemu_rbd_do_create(BlockdevCreateOptions *options,
 529                              const char *keypairs, const char *password_secret,
 530                              Error **errp)
 531{
 532    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
 533    rados_t cluster;
 534    rados_ioctx_t io_ctx;
 535    int obj_order = 0;
 536    int ret;
 537
 538    assert(options->driver == BLOCKDEV_DRIVER_RBD);
 539    if (opts->location->has_snapshot) {
 540        error_setg(errp, "Can't use snapshot name for image creation");
 541        return -EINVAL;
 542    }
 543
 544#ifndef LIBRBD_SUPPORTS_ENCRYPTION
 545    if (opts->has_encrypt) {
 546        error_setg(errp, "RBD library does not support image encryption");
 547        return -ENOTSUP;
 548    }
 549#endif
 550
 551    if (opts->has_cluster_size) {
 552        int64_t objsize = opts->cluster_size;
 553        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 554            error_setg(errp, "obj size needs to be power of 2");
 555            return -EINVAL;
 556        }
 557        if (objsize < 4096) {
 558            error_setg(errp, "obj size too small");
 559            return -EINVAL;
 560        }
 561        obj_order = ctz32(objsize);
 562    }
 563
 564    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
 565                           password_secret, errp);
 566    if (ret < 0) {
 567        return ret;
 568    }
 569
 570    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
 571    if (ret < 0) {
 572        error_setg_errno(errp, -ret, "error rbd create");
 573        goto out;
 574    }
 575
 576#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 577    if (opts->has_encrypt) {
 578        rbd_image_t image;
 579
 580        ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
 581        if (ret < 0) {
 582            error_setg_errno(errp, -ret,
 583                             "error opening image '%s' for encryption format",
 584                             opts->location->image);
 585            goto out;
 586        }
 587
 588        ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
 589        rbd_close(image);
 590        if (ret < 0) {
 591            /* encryption format fail, try removing the image */
 592            rbd_remove(io_ctx, opts->location->image);
 593            goto out;
 594        }
 595    }
 596#endif
 597
 598    ret = 0;
 599out:
 600    rados_ioctx_destroy(io_ctx);
 601    rados_shutdown(cluster);
 602    return ret;
 603}
 604
/*
 * Image creation from QAPI options only: forwards to qemu_rbd_do_create()
 * with neither legacy key/value pairs nor a legacy password secret.
 */
static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
{
    return qemu_rbd_do_create(options, NULL, NULL, errp);
}
 609
 610static int qemu_rbd_extract_encryption_create_options(
 611        QemuOpts *opts,
 612        RbdEncryptionCreateOptions **spec,
 613        Error **errp)
 614{
 615    QDict *opts_qdict;
 616    QDict *encrypt_qdict;
 617    Visitor *v;
 618    int ret = 0;
 619
 620    opts_qdict = qemu_opts_to_qdict(opts, NULL);
 621    qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
 622    qobject_unref(opts_qdict);
 623    if (!qdict_size(encrypt_qdict)) {
 624        *spec = NULL;
 625        goto exit;
 626    }
 627
 628    /* Convert options into a QAPI object */
 629    v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
 630    if (!v) {
 631        ret = -EINVAL;
 632        goto exit;
 633    }
 634
 635    visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
 636    visit_free(v);
 637    if (!*spec) {
 638        ret = -EINVAL;
 639        goto exit;
 640    }
 641
 642exit:
 643    qobject_unref(encrypt_qdict);
 644    return ret;
 645}
 646
 647static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
 648                                                const char *filename,
 649                                                QemuOpts *opts,
 650                                                Error **errp)
 651{
 652    BlockdevCreateOptions *create_options;
 653    BlockdevCreateOptionsRbd *rbd_opts;
 654    BlockdevOptionsRbd *loc;
 655    RbdEncryptionCreateOptions *encrypt = NULL;
 656    Error *local_err = NULL;
 657    const char *keypairs, *password_secret;
 658    QDict *options = NULL;
 659    int ret = 0;
 660
 661    create_options = g_new0(BlockdevCreateOptions, 1);
 662    create_options->driver = BLOCKDEV_DRIVER_RBD;
 663    rbd_opts = &create_options->u.rbd;
 664
 665    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
 666
 667    password_secret = qemu_opt_get(opts, "password-secret");
 668
 669    /* Read out options */
 670    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 671                              BDRV_SECTOR_SIZE);
 672    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
 673                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
 674    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
 675
 676    options = qdict_new();
 677    qemu_rbd_parse_filename(filename, options, &local_err);
 678    if (local_err) {
 679        ret = -EINVAL;
 680        error_propagate(errp, local_err);
 681        goto exit;
 682    }
 683
 684    ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
 685    if (ret < 0) {
 686        goto exit;
 687    }
 688    rbd_opts->encrypt     = encrypt;
 689    rbd_opts->has_encrypt = !!encrypt;
 690
 691    /*
 692     * Caution: while qdict_get_try_str() is fine, getting non-string
 693     * types would require more care.  When @options come from -blockdev
 694     * or blockdev_add, its members are typed according to the QAPI
 695     * schema, but when they come from -drive, they're all QString.
 696     */
 697    loc = rbd_opts->location;
 698    loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
 699    loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
 700    loc->has_conf    = !!loc->conf;
 701    loc->user        = g_strdup(qdict_get_try_str(options, "user"));
 702    loc->has_user    = !!loc->user;
 703    loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
 704    loc->has_q_namespace = !!loc->q_namespace;
 705    loc->image       = g_strdup(qdict_get_try_str(options, "image"));
 706    keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
 707
 708    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
 709    if (ret < 0) {
 710        goto exit;
 711    }
 712
 713exit:
 714    qobject_unref(options);
 715    qapi_free_BlockdevCreateOptions(create_options);
 716    return ret;
 717}
 718
 719static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 720{
 721    const char **vals;
 722    const char *host, *port;
 723    char *rados_str;
 724    InetSocketAddressBaseList *p;
 725    int i, cnt;
 726
 727    if (!opts->has_server) {
 728        return NULL;
 729    }
 730
 731    for (cnt = 0, p = opts->server; p; p = p->next) {
 732        cnt++;
 733    }
 734
 735    vals = g_new(const char *, cnt + 1);
 736
 737    for (i = 0, p = opts->server; p; p = p->next, i++) {
 738        host = p->value->host;
 739        port = p->value->port;
 740
 741        if (strchr(host, ':')) {
 742            vals[i] = g_strdup_printf("[%s]:%s", host, port);
 743        } else {
 744            vals[i] = g_strdup_printf("%s:%s", host, port);
 745        }
 746    }
 747    vals[i] = NULL;
 748
 749    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
 750    g_strfreev((char **)vals);
 751    return rados_str;
 752}
 753
 754static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 755                            BlockdevOptionsRbd *opts, bool cache,
 756                            const char *keypairs, const char *secretid,
 757                            Error **errp)
 758{
 759    char *mon_host = NULL;
 760    Error *local_err = NULL;
 761    int r;
 762
 763    if (secretid) {
 764        if (opts->key_secret) {
 765            error_setg(errp,
 766                       "Legacy 'password-secret' clashes with 'key-secret'");
 767            return -EINVAL;
 768        }
 769        opts->key_secret = g_strdup(secretid);
 770        opts->has_key_secret = true;
 771    }
 772
 773    mon_host = qemu_rbd_mon_host(opts, &local_err);
 774    if (local_err) {
 775        error_propagate(errp, local_err);
 776        r = -EINVAL;
 777        goto out;
 778    }
 779
 780    r = rados_create(cluster, opts->user);
 781    if (r < 0) {
 782        error_setg_errno(errp, -r, "error initializing");
 783        goto out;
 784    }
 785
 786    /* try default location when conf=NULL, but ignore failure */
 787    r = rados_conf_read_file(*cluster, opts->conf);
 788    if (opts->has_conf && r < 0) {
 789        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
 790        goto failed_shutdown;
 791    }
 792
 793    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
 794    if (r < 0) {
 795        goto failed_shutdown;
 796    }
 797
 798    if (mon_host) {
 799        r = rados_conf_set(*cluster, "mon_host", mon_host);
 800        if (r < 0) {
 801            goto failed_shutdown;
 802        }
 803    }
 804
 805    r = qemu_rbd_set_auth(*cluster, opts, errp);
 806    if (r < 0) {
 807        goto failed_shutdown;
 808    }
 809
 810    /*
 811     * Fallback to more conservative semantics if setting cache
 812     * options fails. Ignore errors from setting rbd_cache because the
 813     * only possible error is that the option does not exist, and
 814     * librbd defaults to no caching. If write through caching cannot
 815     * be set up, fall back to no caching.
 816     */
 817    if (cache) {
 818        rados_conf_set(*cluster, "rbd_cache", "true");
 819    } else {
 820        rados_conf_set(*cluster, "rbd_cache", "false");
 821    }
 822
 823    r = rados_connect(*cluster);
 824    if (r < 0) {
 825        error_setg_errno(errp, -r, "error connecting");
 826        goto failed_shutdown;
 827    }
 828
 829    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
 830    if (r < 0) {
 831        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
 832        goto failed_shutdown;
 833    }
 834
 835#ifdef HAVE_RBD_NAMESPACE_EXISTS
 836    if (opts->has_q_namespace && strlen(opts->q_namespace) > 0) {
 837        bool exists;
 838
 839        r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
 840        if (r < 0) {
 841            error_setg_errno(errp, -r, "error checking namespace");
 842            goto failed_ioctx_destroy;
 843        }
 844
 845        if (!exists) {
 846            error_setg(errp, "namespace '%s' does not exist",
 847                       opts->q_namespace);
 848            r = -ENOENT;
 849            goto failed_ioctx_destroy;
 850        }
 851    }
 852#endif
 853
 854    /*
 855     * Set the namespace after opening the io context on the pool,
 856     * if nspace == NULL or if nspace == "", it is just as we did nothing
 857     */
 858    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
 859
 860    r = 0;
 861    goto out;
 862
 863#ifdef HAVE_RBD_NAMESPACE_EXISTS
 864failed_ioctx_destroy:
 865    rados_ioctx_destroy(*io_ctx);
 866#endif
 867failed_shutdown:
 868    rados_shutdown(*cluster);
 869out:
 870    g_free(mon_host);
 871    return r;
 872}
 873
 874static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
 875                                    Error **errp)
 876{
 877    Visitor *v;
 878
 879    /* Convert the remaining options into a QAPI object */
 880    v = qobject_input_visitor_new_flat_confused(options, errp);
 881    if (!v) {
 882        return -EINVAL;
 883    }
 884
 885    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
 886    visit_free(v);
 887    if (!opts) {
 888        return -EINVAL;
 889    }
 890
 891    return 0;
 892}
 893
 894static int qemu_rbd_attempt_legacy_options(QDict *options,
 895                                           BlockdevOptionsRbd **opts,
 896                                           char **keypairs)
 897{
 898    char *filename;
 899    int r;
 900
 901    filename = g_strdup(qdict_get_try_str(options, "filename"));
 902    if (!filename) {
 903        return -EINVAL;
 904    }
 905    qdict_del(options, "filename");
 906
 907    qemu_rbd_parse_filename(filename, options, NULL);
 908
 909    /* keypairs freed by caller */
 910    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 911    if (*keypairs) {
 912        qdict_del(options, "=keyvalue-pairs");
 913    }
 914
 915    r = qemu_rbd_convert_options(options, opts, NULL);
 916
 917    g_free(filename);
 918    return r;
 919}
 920
 921static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 922                         Error **errp)
 923{
 924    BDRVRBDState *s = bs->opaque;
 925    BlockdevOptionsRbd *opts = NULL;
 926    const QDictEntry *e;
 927    Error *local_err = NULL;
 928    char *keypairs, *secretid;
 929    rbd_image_info_t info;
 930    int r;
 931
 932    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 933    if (keypairs) {
 934        qdict_del(options, "=keyvalue-pairs");
 935    }
 936
 937    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
 938    if (secretid) {
 939        qdict_del(options, "password-secret");
 940    }
 941
 942    r = qemu_rbd_convert_options(options, &opts, &local_err);
 943    if (local_err) {
 944        /* If keypairs are present, that means some options are present in
 945         * the modern option format.  Don't attempt to parse legacy option
 946         * formats, as we won't support mixed usage. */
 947        if (keypairs) {
 948            error_propagate(errp, local_err);
 949            goto out;
 950        }
 951
 952        /* If the initial attempt to convert and process the options failed,
 953         * we may be attempting to open an image file that has the rbd options
 954         * specified in the older format consisting of all key/value pairs
 955         * encoded in the filename.  Go ahead and attempt to parse the
 956         * filename, and see if we can pull out the required options. */
 957        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
 958        if (r < 0) {
 959            /* Propagate the original error, not the legacy parsing fallback
 960             * error, as the latter was just a best-effort attempt. */
 961            error_propagate(errp, local_err);
 962            goto out;
 963        }
 964        /* Take care whenever deciding to actually deprecate; once this ability
 965         * is removed, we will not be able to open any images with legacy-styled
 966         * backing image strings. */
 967        warn_report("RBD options encoded in the filename as keyvalue pairs "
 968                    "is deprecated");
 969    }
 970
 971    /* Remove the processed options from the QDict (the visitor processes
 972     * _all_ options in the QDict) */
 973    while ((e = qdict_first(options))) {
 974        qdict_del(options, e->key);
 975    }
 976
 977    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
 978                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
 979    if (r < 0) {
 980        goto out;
 981    }
 982
 983    s->snap = g_strdup(opts->snapshot);
 984    s->image_name = g_strdup(opts->image);
 985
 986    /* rbd_open is always r/w */
 987    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
 988    if (r < 0) {
 989        error_setg_errno(errp, -r, "error reading header from %s",
 990                         s->image_name);
 991        goto failed_open;
 992    }
 993
 994    if (opts->has_encrypt) {
 995#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 996        r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
 997        if (r < 0) {
 998            goto failed_post_open;
 999        }
1000#else
1001        r = -ENOTSUP;
1002        error_setg(errp, "RBD library does not support image encryption");
1003        goto failed_post_open;
1004#endif
1005    }
1006
1007    r = rbd_stat(s->image, &info, sizeof(info));
1008    if (r < 0) {
1009        error_setg_errno(errp, -r, "error getting image info from %s",
1010                         s->image_name);
1011        goto failed_post_open;
1012    }
1013    s->image_size = info.size;
1014    s->object_size = info.obj_size;
1015
1016    /* If we are using an rbd snapshot, we must be r/o, otherwise
1017     * leave as-is */
1018    if (s->snap != NULL) {
1019        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1020        if (r < 0) {
1021            goto failed_post_open;
1022        }
1023    }
1024
1025#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1026    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1027#endif
1028
1029    /* When extending regular files, we get zeros from the OS */
1030    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1031
1032    r = 0;
1033    goto out;
1034
1035failed_post_open:
1036    rbd_close(s->image);
1037failed_open:
1038    rados_ioctx_destroy(s->io_ctx);
1039    g_free(s->snap);
1040    g_free(s->image_name);
1041    rados_shutdown(s->cluster);
1042out:
1043    qapi_free_BlockdevOptionsRbd(opts);
1044    g_free(keypairs);
1045    g_free(secretid);
1046    return r;
1047}
1048
1049
1050/* Since RBD is currently always opened R/W via the API,
1051 * we just need to check if we are using a snapshot or not, in
1052 * order to determine if we will allow it to be R/W */
1053static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1054                                   BlockReopenQueue *queue, Error **errp)
1055{
1056    BDRVRBDState *s = state->bs->opaque;
1057    int ret = 0;
1058
1059    if (s->snap && state->flags & BDRV_O_RDWR) {
1060        error_setg(errp,
1061                   "Cannot change node '%s' to r/w when using RBD snapshot",
1062                   bdrv_get_device_or_node_name(state->bs));
1063        ret = -EINVAL;
1064    }
1065
1066    return ret;
1067}
1068
1069static void qemu_rbd_close(BlockDriverState *bs)
1070{
1071    BDRVRBDState *s = bs->opaque;
1072
1073    rbd_close(s->image);
1074    rados_ioctx_destroy(s->io_ctx);
1075    g_free(s->snap);
1076    g_free(s->image_name);
1077    rados_shutdown(s->cluster);
1078}
1079
1080/* Resize the RBD image and update the 'image_size' with the current size */
1081static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1082{
1083    BDRVRBDState *s = bs->opaque;
1084    int r;
1085
1086    r = rbd_resize(s->image, size);
1087    if (r < 0) {
1088        return r;
1089    }
1090
1091    s->image_size = size;
1092
1093    return 0;
1094}
1095
/*
 * Bottom half scheduled by qemu_rbd_completion_cb().
 *
 * @opaque: the RBDTask of the request that has completed
 *
 * Marks the task as complete and wakes the coroutine recorded in task->co,
 * which is waiting in qemu_rbd_start_co().
 */
static void qemu_rbd_finish_bh(void *opaque)
{
    RBDTask *task = opaque;
    /* Set the flag before waking: the woken coroutine loops on it */
    task->complete = true;
    aio_co_wake(task->co);
}
1102
/*
 * This is the completion callback function for all rbd aio calls
 * started from qemu_rbd_start_co().
 *
 * @c:    the librbd completion that has finished
 * @task: the RBDTask registered with rbd_aio_create_completion()
 *
 * Note: this function is being called from a non qemu thread so
 * we need to be careful about what we do here. Generally we only
 * schedule a BH, and do the rest of the io completion handling
 * from qemu_rbd_finish_bh() which runs in a qemu context.
 */
static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
{
    /* Fetch the result before the completion is released */
    task->ret = rbd_aio_get_return_value(c);
    rbd_aio_release(c);
    /* Hand over to the AioContext of the node that issued the request */
    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
                            qemu_rbd_finish_bh, task);
}
1119
1120static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1121                                          uint64_t offset,
1122                                          uint64_t bytes,
1123                                          QEMUIOVector *qiov,
1124                                          int flags,
1125                                          RBDAIOCmd cmd)
1126{
1127    BDRVRBDState *s = bs->opaque;
1128    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1129    rbd_completion_t c;
1130    int r;
1131
1132    assert(!qiov || qiov->size == bytes);
1133
1134    if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1135        /*
1136         * RBD APIs don't allow us to write more than actual size, so in order
1137         * to support growing images, we resize the image before write
1138         * operations that exceed the current size.
1139         */
1140        if (offset + bytes > s->image_size) {
1141            int r = qemu_rbd_resize(bs, offset + bytes);
1142            if (r < 0) {
1143                return r;
1144            }
1145        }
1146    }
1147
1148    r = rbd_aio_create_completion(&task,
1149                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
1150    if (r < 0) {
1151        return r;
1152    }
1153
1154    switch (cmd) {
1155    case RBD_AIO_READ:
1156        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1157        break;
1158    case RBD_AIO_WRITE:
1159        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1160        break;
1161    case RBD_AIO_DISCARD:
1162        r = rbd_aio_discard(s->image, offset, bytes, c);
1163        break;
1164    case RBD_AIO_FLUSH:
1165        r = rbd_aio_flush(s->image, c);
1166        break;
1167#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1168    case RBD_AIO_WRITE_ZEROES: {
1169        int zero_flags = 0;
1170#ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1171        if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1172            zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1173        }
1174#endif
1175        r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1176        break;
1177    }
1178#endif
1179    default:
1180        r = -EINVAL;
1181    }
1182
1183    if (r < 0) {
1184        error_report("rbd request failed early: cmd %d offset %" PRIu64
1185                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1186                     bytes, flags, r, strerror(-r));
1187        rbd_aio_release(c);
1188        return r;
1189    }
1190
1191    while (!task.complete) {
1192        qemu_coroutine_yield();
1193    }
1194
1195    if (task.ret < 0) {
1196        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1197                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1198                     bytes, flags, task.ret, strerror(-task.ret));
1199        return task.ret;
1200    }
1201
1202    /* zero pad short reads */
1203    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1204        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1205    }
1206
1207    return 0;
1208}
1209
/*
 * Read @bytes bytes starting at @offset into @qiov by forwarding the request
 * to qemu_rbd_start_co() as RBD_AIO_READ.
 *
 * Returns whatever qemu_rbd_start_co() returns.
 */
static int
coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
                                int64_t bytes, QEMUIOVector *qiov,
                                BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
}
1217
/*
 * Write @bytes bytes from @qiov starting at @offset by forwarding the
 * request to qemu_rbd_start_co() as RBD_AIO_WRITE.
 *
 * Returns whatever qemu_rbd_start_co() returns.
 */
static int
coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
                                 int64_t bytes, QEMUIOVector *qiov,
                                 BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
}
1225
/*
 * Flush the image by issuing RBD_AIO_FLUSH through qemu_rbd_start_co().
 * Offset and length are passed as 0 and no I/O vector is supplied.
 *
 * Returns whatever qemu_rbd_start_co() returns.
 */
static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
{
    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
}
1230
/*
 * Discard @bytes bytes starting at @offset by issuing RBD_AIO_DISCARD
 * through qemu_rbd_start_co().  No I/O vector or flags are passed.
 *
 * Returns whatever qemu_rbd_start_co() returns.
 */
static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
                                             int64_t offset, int64_t bytes)
{
    return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
}
1236
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Zero @bytes bytes starting at @offset by issuing RBD_AIO_WRITE_ZEROES
 * through qemu_rbd_start_co().  @flags is forwarded unchanged; no I/O
 * vector is passed.  Only built when LIBRBD_SUPPORTS_WRITE_ZEROES is
 * defined.
 *
 * Returns whatever qemu_rbd_start_co() returns.
 */
static int
coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int64_t bytes, BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                             RBD_AIO_WRITE_ZEROES);
}
#endif
1246
/*
 * Fill in driver information for the node: the cluster size is reported as
 * the RBD object size cached in the driver state.
 *
 * Always returns 0.
 */
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
{
    BDRVRBDState *s = bs->opaque;
    bdi->cluster_size = s->object_size;
    return 0;
}
1253
1254static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1255                                                     Error **errp)
1256{
1257    BDRVRBDState *s = bs->opaque;
1258    ImageInfoSpecific *spec_info;
1259    char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1260    int r;
1261
1262    if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1263        r = rbd_read(s->image, 0,
1264                     RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1265        if (r < 0) {
1266            error_setg_errno(errp, -r, "cannot read image start for probe");
1267            return NULL;
1268        }
1269    }
1270
1271    spec_info = g_new(ImageInfoSpecific, 1);
1272    *spec_info = (ImageInfoSpecific){
1273        .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1274        .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1275    };
1276
1277    if (memcmp(buf, rbd_luks_header_verification,
1278               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1279        spec_info->u.rbd.data->encryption_format =
1280                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1281        spec_info->u.rbd.data->has_encryption_format = true;
1282    } else if (memcmp(buf, rbd_luks2_header_verification,
1283               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1284        spec_info->u.rbd.data->encryption_format =
1285                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1286        spec_info->u.rbd.data->has_encryption_format = true;
1287    } else {
1288        spec_info->u.rbd.data->has_encryption_format = false;
1289    }
1290
1291    return spec_info;
1292}
1293
/*
 * rbd_diff_iterate2 allows the execution to be interrupted by returning a
 * negative value from the callback routine. Choose a value that does not
 * conflict with an existing exit code and return it if we want to
 * prematurely stop the execution because we detected a change in the
 * allocation status.
 *
 * The expansion is parenthesized so that the negative constant stays a
 * single operand wherever the macro is used in an expression.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1301
1302static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1303                                    int exists, void *opaque)
1304{
1305    RBDDiffIterateReq *req = opaque;
1306
1307    assert(req->offs + req->bytes <= offs);
1308
1309    /* treat a hole like an unallocated area and bail out */
1310    if (!exists) {
1311        return 0;
1312    }
1313
1314    if (!req->exists && offs > req->offs) {
1315        /*
1316         * we started in an unallocated area and hit the first allocated
1317         * block. req->bytes must be set to the length of the unallocated area
1318         * before the allocated area. stop further processing.
1319         */
1320        req->bytes = offs - req->offs;
1321        return QEMU_RBD_EXIT_DIFF_ITERATE2;
1322    }
1323
1324    if (req->exists && offs > req->offs + req->bytes) {
1325        /*
1326         * we started in an allocated area and jumped over an unallocated area,
1327         * req->bytes contains the length of the allocated area before the
1328         * unallocated area. stop further processing.
1329         */
1330        return QEMU_RBD_EXIT_DIFF_ITERATE2;
1331    }
1332
1333    req->bytes += len;
1334    req->exists = true;
1335
1336    return 0;
1337}
1338
1339static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1340                                                 bool want_zero, int64_t offset,
1341                                                 int64_t bytes, int64_t *pnum,
1342                                                 int64_t *map,
1343                                                 BlockDriverState **file)
1344{
1345    BDRVRBDState *s = bs->opaque;
1346    int status, r;
1347    RBDDiffIterateReq req = { .offs = offset };
1348    uint64_t features, flags;
1349    uint64_t head = 0;
1350
1351    assert(offset + bytes <= s->image_size);
1352
1353    /* default to all sectors allocated */
1354    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1355    *map = offset;
1356    *file = bs;
1357    *pnum = bytes;
1358
1359    /* check if RBD image supports fast-diff */
1360    r = rbd_get_features(s->image, &features);
1361    if (r < 0) {
1362        return status;
1363    }
1364    if (!(features & RBD_FEATURE_FAST_DIFF)) {
1365        return status;
1366    }
1367
1368    /* check if RBD fast-diff result is valid */
1369    r = rbd_get_flags(s->image, &flags);
1370    if (r < 0) {
1371        return status;
1372    }
1373    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1374        return status;
1375    }
1376
1377#if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1378    /*
1379     * librbd had a bug until early 2022 that affected all versions of ceph that
1380     * supported fast-diff. This bug results in reporting of incorrect offsets
1381     * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1382     * Work around this bug by rounding down the offset to object boundaries.
1383     * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1384     * However, this workaround only works for non cloned images with default
1385     * striping.
1386     *
1387     * See: https://tracker.ceph.com/issues/53784
1388     */
1389
1390    /* check if RBD image has non-default striping enabled */
1391    if (features & RBD_FEATURE_STRIPINGV2) {
1392        return status;
1393    }
1394
1395#pragma GCC diagnostic push
1396#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1397    /*
1398     * check if RBD image is a clone (= has a parent).
1399     *
1400     * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1401     * replacement rbd_get_parent is not present in Luminous and Mimic.
1402     */
1403    if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1404        return status;
1405    }
1406#pragma GCC diagnostic pop
1407
1408    head = req.offs & (s->object_size - 1);
1409    req.offs -= head;
1410    bytes += head;
1411#endif
1412
1413    r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1414                          qemu_rbd_diff_iterate_cb, &req);
1415    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1416        return status;
1417    }
1418    assert(req.bytes <= bytes);
1419    if (!req.exists) {
1420        if (r == 0) {
1421            /*
1422             * rbd_diff_iterate2 does not invoke callbacks for unallocated
1423             * areas. This here catches the case where no callback was
1424             * invoked at all (req.bytes == 0).
1425             */
1426            assert(req.bytes == 0);
1427            req.bytes = bytes;
1428        }
1429        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1430    }
1431
1432    assert(req.bytes > head);
1433    *pnum = req.bytes - head;
1434    return status;
1435}
1436
1437static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1438{
1439    BDRVRBDState *s = bs->opaque;
1440    int r;
1441
1442    r = rbd_get_size(s->image, &s->image_size);
1443    if (r < 0) {
1444        return r;
1445    }
1446
1447    return s->image_size;
1448}
1449
1450static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1451                                             int64_t offset,
1452                                             bool exact,
1453                                             PreallocMode prealloc,
1454                                             BdrvRequestFlags flags,
1455                                             Error **errp)
1456{
1457    int r;
1458
1459    if (prealloc != PREALLOC_MODE_OFF) {
1460        error_setg(errp, "Unsupported preallocation mode '%s'",
1461                   PreallocMode_str(prealloc));
1462        return -ENOTSUP;
1463    }
1464
1465    r = qemu_rbd_resize(bs, offset);
1466    if (r < 0) {
1467        error_setg_errno(errp, -r, "Failed to resize file");
1468        return r;
1469    }
1470
1471    return 0;
1472}
1473
1474static int qemu_rbd_snap_create(BlockDriverState *bs,
1475                                QEMUSnapshotInfo *sn_info)
1476{
1477    BDRVRBDState *s = bs->opaque;
1478    int r;
1479
1480    if (sn_info->name[0] == '\0') {
1481        return -EINVAL; /* we need a name for rbd snapshots */
1482    }
1483
1484    /*
1485     * rbd snapshots are using the name as the user controlled unique identifier
1486     * we can't use the rbd snapid for that purpose, as it can't be set
1487     */
1488    if (sn_info->id_str[0] != '\0' &&
1489        strcmp(sn_info->id_str, sn_info->name) != 0) {
1490        return -EINVAL;
1491    }
1492
1493    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1494        return -ERANGE;
1495    }
1496
1497    r = rbd_snap_create(s->image, sn_info->name);
1498    if (r < 0) {
1499        error_report("failed to create snap: %s", strerror(-r));
1500        return r;
1501    }
1502
1503    return 0;
1504}
1505
1506static int qemu_rbd_snap_remove(BlockDriverState *bs,
1507                                const char *snapshot_id,
1508                                const char *snapshot_name,
1509                                Error **errp)
1510{
1511    BDRVRBDState *s = bs->opaque;
1512    int r;
1513
1514    if (!snapshot_name) {
1515        error_setg(errp, "rbd need a valid snapshot name");
1516        return -EINVAL;
1517    }
1518
1519    /* If snapshot_id is specified, it must be equal to name, see
1520       qemu_rbd_snap_list() */
1521    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1522        error_setg(errp,
1523                   "rbd do not support snapshot id, it should be NULL or "
1524                   "equal to snapshot name");
1525        return -EINVAL;
1526    }
1527
1528    r = rbd_snap_remove(s->image, snapshot_name);
1529    if (r < 0) {
1530        error_setg_errno(errp, -r, "Failed to remove the snapshot");
1531    }
1532    return r;
1533}
1534
/*
 * Roll the image back to the snapshot called @snapshot_name.
 *
 * Returns the result of rbd_snap_rollback() unchanged.
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *s = bs->opaque;

    return rbd_snap_rollback(s->image, snapshot_name);
}
1542
1543static int qemu_rbd_snap_list(BlockDriverState *bs,
1544                              QEMUSnapshotInfo **psn_tab)
1545{
1546    BDRVRBDState *s = bs->opaque;
1547    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1548    int i, snap_count;
1549    rbd_snap_info_t *snaps;
1550    int max_snaps = RBD_MAX_SNAPS;
1551
1552    do {
1553        snaps = g_new(rbd_snap_info_t, max_snaps);
1554        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1555        if (snap_count <= 0) {
1556            g_free(snaps);
1557        }
1558    } while (snap_count == -ERANGE);
1559
1560    if (snap_count <= 0) {
1561        goto done;
1562    }
1563
1564    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1565
1566    for (i = 0; i < snap_count; i++) {
1567        const char *snap_name = snaps[i].name;
1568
1569        sn_info = sn_tab + i;
1570        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1571        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1572
1573        sn_info->vm_state_size = snaps[i].size;
1574        sn_info->date_sec = 0;
1575        sn_info->date_nsec = 0;
1576        sn_info->vm_clock_nsec = 0;
1577    }
1578    rbd_snap_list_end(snaps);
1579    g_free(snaps);
1580
1581 done:
1582    *psn_tab = sn_tab;
1583    return snap_count;
1584}
1585
1586static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1587                                                      Error **errp)
1588{
1589    BDRVRBDState *s = bs->opaque;
1590    int r = rbd_invalidate_cache(s->image);
1591    if (r < 0) {
1592        error_setg_errno(errp, -r, "Failed to invalidate the cache");
1593    }
1594}
1595
1596static QemuOptsList qemu_rbd_create_opts = {
1597    .name = "rbd-create-opts",
1598    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1599    .desc = {
1600        {
1601            .name = BLOCK_OPT_SIZE,
1602            .type = QEMU_OPT_SIZE,
1603            .help = "Virtual disk size"
1604        },
1605        {
1606            .name = BLOCK_OPT_CLUSTER_SIZE,
1607            .type = QEMU_OPT_SIZE,
1608            .help = "RBD object size"
1609        },
1610        {
1611            .name = "password-secret",
1612            .type = QEMU_OPT_STRING,
1613            .help = "ID of secret providing the password",
1614        },
1615        {
1616            .name = "encrypt.format",
1617            .type = QEMU_OPT_STRING,
1618            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1619        },
1620        {
1621            .name = "encrypt.cipher-alg",
1622            .type = QEMU_OPT_STRING,
1623            .help = "Name of encryption cipher algorithm"
1624                    " (allowed values: aes-128, aes-256)",
1625        },
1626        {
1627            .name = "encrypt.key-secret",
1628            .type = QEMU_OPT_STRING,
1629            .help = "ID of secret providing LUKS passphrase",
1630        },
1631        { /* end of list */ }
1632    }
1633};
1634
/*
 * NULL-terminated list of runtime option names, referenced by the
 * strong_runtime_opts member of the bdrv_rbd driver definition.  The
 * "server." entry ends in a dot.
 * NOTE(review): presumably it is matched as a prefix covering all
 * "server.*" keys - confirm against the block layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* what to open */
    "pool", "namespace", "image", "conf", "snapshot",
    /* how to reach it and authenticate */
    "user", "server.", "password-secret",

    NULL
};
1647
1648static BlockDriver bdrv_rbd = {
1649    .format_name            = "rbd",
1650    .instance_size          = sizeof(BDRVRBDState),
1651    .bdrv_parse_filename    = qemu_rbd_parse_filename,
1652    .bdrv_file_open         = qemu_rbd_open,
1653    .bdrv_close             = qemu_rbd_close,
1654    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1655    .bdrv_co_create         = qemu_rbd_co_create,
1656    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1657    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1658    .bdrv_get_info          = qemu_rbd_getinfo,
1659    .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1660    .create_opts            = &qemu_rbd_create_opts,
1661    .bdrv_getlength         = qemu_rbd_getlength,
1662    .bdrv_co_truncate       = qemu_rbd_co_truncate,
1663    .protocol_name          = "rbd",
1664
1665    .bdrv_co_preadv         = qemu_rbd_co_preadv,
1666    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1667    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1668    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1669#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1670    .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1671#endif
1672    .bdrv_co_block_status   = qemu_rbd_co_block_status,
1673
1674    .bdrv_snapshot_create   = qemu_rbd_snap_create,
1675    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1676    .bdrv_snapshot_list     = qemu_rbd_snap_list,
1677    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1678    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1679
1680    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1681};
1682
/* Register the RBD driver definition with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/*
 * NOTE(review): block_init() is not defined in this file; presumably it
 * arranges for bdrv_rbd_init() to run during block module initialisation -
 * confirm against qemu/module.h.
 */
block_init(bdrv_rbd_init);
1689