qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include <rbd/librbd.h>
  17#include "qapi/error.h"
  18#include "qemu/error-report.h"
  19#include "qemu/module.h"
  20#include "qemu/option.h"
  21#include "block/block-io.h"
  22#include "block/block_int.h"
  23#include "block/qdict.h"
  24#include "crypto/secret.h"
  25#include "qemu/cutils.h"
  26#include "sysemu/replay.h"
  27#include "qapi/qmp/qstring.h"
  28#include "qapi/qmp/qdict.h"
  29#include "qapi/qmp/qjson.h"
  30#include "qapi/qmp/qlist.h"
  31#include "qapi/qobject-input-visitor.h"
  32#include "qapi/qapi-visit-block-core.h"
  33
/*
 * When specifying the image filename use:
 *
 * rbd:poolname/[namespace/]devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * namespace is optional.  When it is omitted, an empty namespace string
 * is used.
 *
 * devicename is the name of the rbd image.
 *
 * Each option given is used to configure rados, and may be any valid
 * Ceph option, "id", or "conf".
 *
 * The "id" option indicates what user we should authenticate as to
 * the Ceph cluster.  If it is excluded we will use the Ceph default
 * (normally 'admin').
 *
 * The "conf" option specifies a Ceph configuration file to read.  If
 * it is not specified, we will read from the default Ceph locations
 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
 * file, specify conf=/dev/null.
 *
 * Configuration values containing :, @, or = can be escaped with a
 * leading "\".
 */
  58
  59#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
  60
  61#define RBD_MAX_SNAPS 100
  62
  63#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
  64
  65static const char rbd_luks_header_verification[
  66        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
  67    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
  68};
  69
  70static const char rbd_luks2_header_verification[
  71        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
  72    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
  73};
  74
  75static const char rbd_layered_luks_header_verification[
  76        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
  77    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
  78};
  79
  80static const char rbd_layered_luks2_header_verification[
  81        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
  82    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
  83};
  84
  85typedef enum {
  86    RBD_AIO_READ,
  87    RBD_AIO_WRITE,
  88    RBD_AIO_DISCARD,
  89    RBD_AIO_FLUSH,
  90    RBD_AIO_WRITE_ZEROES
  91} RBDAIOCmd;
  92
  93typedef struct BDRVRBDState {
  94    rados_t cluster;
  95    rados_ioctx_t io_ctx;
  96    rbd_image_t image;
  97    char *image_name;
  98    char *snap;
  99    char *namespace;
 100    uint64_t image_size;
 101    uint64_t object_size;
 102} BDRVRBDState;
 103
 104typedef struct RBDTask {
 105    BlockDriverState *bs;
 106    Coroutine *co;
 107    bool complete;
 108    int64_t ret;
 109} RBDTask;
 110
 111typedef struct RBDDiffIterateReq {
 112    uint64_t offs;
 113    uint64_t bytes;
 114    bool exists;
 115} RBDDiffIterateReq;
 116
 117static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 118                            BlockdevOptionsRbd *opts, bool cache,
 119                            const char *keypairs, const char *secretid,
 120                            Error **errp);
 121
 122static char *qemu_rbd_strchr(char *src, char delim)
 123{
 124    char *p;
 125
 126    for (p = src; *p; ++p) {
 127        if (*p == delim) {
 128            return p;
 129        }
 130        if (*p == '\\' && p[1] != '\0') {
 131            ++p;
 132        }
 133    }
 134
 135    return NULL;
 136}
 137
 138
 139static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 140{
 141    char *end;
 142
 143    *p = NULL;
 144
 145    end = qemu_rbd_strchr(src, delim);
 146    if (end) {
 147        *p = end + 1;
 148        *end = '\0';
 149    }
 150    return src;
 151}
 152
 153static void qemu_rbd_unescape(char *src)
 154{
 155    char *p;
 156
 157    for (p = src; *src; ++src, ++p) {
 158        if (*src == '\\' && src[1] != '\0') {
 159            src++;
 160        }
 161        *p = *src;
 162    }
 163    *p = '\0';
 164}
 165
 166static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 167                                    Error **errp)
 168{
 169    const char *start;
 170    char *p, *buf;
 171    QList *keypairs = NULL;
 172    char *found_str, *image_name;
 173
 174    if (!strstart(filename, "rbd:", &start)) {
 175        error_setg(errp, "File name must start with 'rbd:'");
 176        return;
 177    }
 178
 179    buf = g_strdup(start);
 180    p = buf;
 181
 182    found_str = qemu_rbd_next_tok(p, '/', &p);
 183    if (!p) {
 184        error_setg(errp, "Pool name is required");
 185        goto done;
 186    }
 187    qemu_rbd_unescape(found_str);
 188    qdict_put_str(options, "pool", found_str);
 189
 190    if (qemu_rbd_strchr(p, '@')) {
 191        image_name = qemu_rbd_next_tok(p, '@', &p);
 192
 193        found_str = qemu_rbd_next_tok(p, ':', &p);
 194        qemu_rbd_unescape(found_str);
 195        qdict_put_str(options, "snapshot", found_str);
 196    } else {
 197        image_name = qemu_rbd_next_tok(p, ':', &p);
 198    }
 199    /* Check for namespace in the image_name */
 200    if (qemu_rbd_strchr(image_name, '/')) {
 201        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
 202        qemu_rbd_unescape(found_str);
 203        qdict_put_str(options, "namespace", found_str);
 204    } else {
 205        qdict_put_str(options, "namespace", "");
 206    }
 207    qemu_rbd_unescape(image_name);
 208    qdict_put_str(options, "image", image_name);
 209    if (!p) {
 210        goto done;
 211    }
 212
 213    /* The following are essentially all key/value pairs, and we treat
 214     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
 215    while (p) {
 216        char *name, *value;
 217        name = qemu_rbd_next_tok(p, '=', &p);
 218        if (!p) {
 219            error_setg(errp, "conf option %s has no value", name);
 220            break;
 221        }
 222
 223        qemu_rbd_unescape(name);
 224
 225        value = qemu_rbd_next_tok(p, ':', &p);
 226        qemu_rbd_unescape(value);
 227
 228        if (!strcmp(name, "conf")) {
 229            qdict_put_str(options, "conf", value);
 230        } else if (!strcmp(name, "id")) {
 231            qdict_put_str(options, "user", value);
 232        } else {
 233            /*
 234             * We pass these internally to qemu_rbd_set_keypairs(), so
 235             * we can get away with the simpler list of [ "key1",
 236             * "value1", "key2", "value2" ] rather than a raw dict
 237             * { "key1": "value1", "key2": "value2" } where we can't
 238             * guarantee order, or even a more correct but complex
 239             * [ { "key1": "value1" }, { "key2": "value2" } ]
 240             */
 241            if (!keypairs) {
 242                keypairs = qlist_new();
 243            }
 244            qlist_append_str(keypairs, name);
 245            qlist_append_str(keypairs, value);
 246        }
 247    }
 248
 249    if (keypairs) {
 250        qdict_put(options, "=keyvalue-pairs",
 251                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
 252    }
 253
 254done:
 255    g_free(buf);
 256    qobject_unref(keypairs);
 257    return;
 258}
 259
 260static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
 261                             Error **errp)
 262{
 263    char *key, *acr;
 264    int r;
 265    GString *accu;
 266    RbdAuthModeList *auth;
 267
 268    if (opts->key_secret) {
 269        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
 270        if (!key) {
 271            return -EIO;
 272        }
 273        r = rados_conf_set(cluster, "key", key);
 274        g_free(key);
 275        if (r < 0) {
 276            error_setg_errno(errp, -r, "Could not set 'key'");
 277            return r;
 278        }
 279    }
 280
 281    if (opts->has_auth_client_required) {
 282        accu = g_string_new("");
 283        for (auth = opts->auth_client_required; auth; auth = auth->next) {
 284            if (accu->str[0]) {
 285                g_string_append_c(accu, ';');
 286            }
 287            g_string_append(accu, RbdAuthMode_str(auth->value));
 288        }
 289        acr = g_string_free(accu, FALSE);
 290        r = rados_conf_set(cluster, "auth_client_required", acr);
 291        g_free(acr);
 292        if (r < 0) {
 293            error_setg_errno(errp, -r,
 294                             "Could not set 'auth_client_required'");
 295            return r;
 296        }
 297    }
 298
 299    return 0;
 300}
 301
 302static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 303                                 Error **errp)
 304{
 305    QList *keypairs;
 306    QString *name;
 307    QString *value;
 308    const char *key;
 309    size_t remaining;
 310    int ret = 0;
 311
 312    if (!keypairs_json) {
 313        return ret;
 314    }
 315    keypairs = qobject_to(QList,
 316                          qobject_from_json(keypairs_json, &error_abort));
 317    remaining = qlist_size(keypairs) / 2;
 318    assert(remaining);
 319
 320    while (remaining--) {
 321        name = qobject_to(QString, qlist_pop(keypairs));
 322        value = qobject_to(QString, qlist_pop(keypairs));
 323        assert(name && value);
 324        key = qstring_get_str(name);
 325
 326        ret = rados_conf_set(cluster, key, qstring_get_str(value));
 327        qobject_unref(value);
 328        if (ret < 0) {
 329            error_setg_errno(errp, -ret, "invalid conf option %s", key);
 330            qobject_unref(name);
 331            ret = -EINVAL;
 332            break;
 333        }
 334        qobject_unref(name);
 335    }
 336
 337    qobject_unref(keypairs);
 338    return ret;
 339}
 340
 341#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 342static int qemu_rbd_convert_luks_options(
 343        RbdEncryptionOptionsLUKSBase *luks_opts,
 344        char **passphrase,
 345        size_t *passphrase_len,
 346        Error **errp)
 347{
 348    return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
 349                                 passphrase_len, errp);
 350}
 351
 352static int qemu_rbd_convert_luks_create_options(
 353        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
 354        rbd_encryption_algorithm_t *alg,
 355        char **passphrase,
 356        size_t *passphrase_len,
 357        Error **errp)
 358{
 359    int r = 0;
 360
 361    r = qemu_rbd_convert_luks_options(
 362            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
 363            passphrase, passphrase_len, errp);
 364    if (r < 0) {
 365        return r;
 366    }
 367
 368    if (luks_opts->has_cipher_alg) {
 369        switch (luks_opts->cipher_alg) {
 370            case QCRYPTO_CIPHER_ALG_AES_128: {
 371                *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
 372                break;
 373            }
 374            case QCRYPTO_CIPHER_ALG_AES_256: {
 375                *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
 376                break;
 377            }
 378            default: {
 379                r = -ENOTSUP;
 380                error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
 381                                 luks_opts->cipher_alg);
 382                return r;
 383            }
 384        }
 385    } else {
 386        /* default alg */
 387        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
 388    }
 389
 390    return 0;
 391}
 392
 393static int qemu_rbd_encryption_format(rbd_image_t image,
 394                                      RbdEncryptionCreateOptions *encrypt,
 395                                      Error **errp)
 396{
 397    int r = 0;
 398    g_autofree char *passphrase = NULL;
 399    rbd_encryption_format_t format;
 400    rbd_encryption_options_t opts;
 401    rbd_encryption_luks1_format_options_t luks_opts;
 402    rbd_encryption_luks2_format_options_t luks2_opts;
 403    size_t opts_size;
 404    uint64_t raw_size, effective_size;
 405
 406    r = rbd_get_size(image, &raw_size);
 407    if (r < 0) {
 408        error_setg_errno(errp, -r, "cannot get raw image size");
 409        return r;
 410    }
 411
 412    switch (encrypt->format) {
 413        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
 414            memset(&luks_opts, 0, sizeof(luks_opts));
 415            format = RBD_ENCRYPTION_FORMAT_LUKS1;
 416            opts = &luks_opts;
 417            opts_size = sizeof(luks_opts);
 418            r = qemu_rbd_convert_luks_create_options(
 419                    qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
 420                    &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
 421                    errp);
 422            if (r < 0) {
 423                return r;
 424            }
 425            luks_opts.passphrase = passphrase;
 426            break;
 427        }
 428        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
 429            memset(&luks2_opts, 0, sizeof(luks2_opts));
 430            format = RBD_ENCRYPTION_FORMAT_LUKS2;
 431            opts = &luks2_opts;
 432            opts_size = sizeof(luks2_opts);
 433            r = qemu_rbd_convert_luks_create_options(
 434                    qapi_RbdEncryptionCreateOptionsLUKS2_base(
 435                            &encrypt->u.luks2),
 436                    &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
 437                    errp);
 438            if (r < 0) {
 439                return r;
 440            }
 441            luks2_opts.passphrase = passphrase;
 442            break;
 443        }
 444        default: {
 445            r = -ENOTSUP;
 446            error_setg_errno(
 447                    errp, -r, "unknown image encryption format: %u",
 448                    encrypt->format);
 449            return r;
 450        }
 451    }
 452
 453    r = rbd_encryption_format(image, format, opts, opts_size);
 454    if (r < 0) {
 455        error_setg_errno(errp, -r, "encryption format fail");
 456        return r;
 457    }
 458
 459    r = rbd_get_size(image, &effective_size);
 460    if (r < 0) {
 461        error_setg_errno(errp, -r, "cannot get effective image size");
 462        return r;
 463    }
 464
 465    r = rbd_resize(image, raw_size + (raw_size - effective_size));
 466    if (r < 0) {
 467        error_setg_errno(errp, -r, "cannot resize image after format");
 468        return r;
 469    }
 470
 471    return 0;
 472}
 473
 474static int qemu_rbd_encryption_load(rbd_image_t image,
 475                                    RbdEncryptionOptions *encrypt,
 476                                    Error **errp)
 477{
 478    int r = 0;
 479    g_autofree char *passphrase = NULL;
 480    rbd_encryption_luks1_format_options_t luks_opts;
 481    rbd_encryption_luks2_format_options_t luks2_opts;
 482#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
 483    rbd_encryption_luks_format_options_t luks_any_opts;
 484#endif
 485    rbd_encryption_format_t format;
 486    rbd_encryption_options_t opts;
 487    size_t opts_size;
 488
 489    switch (encrypt->format) {
 490        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
 491            memset(&luks_opts, 0, sizeof(luks_opts));
 492            format = RBD_ENCRYPTION_FORMAT_LUKS1;
 493            opts = &luks_opts;
 494            opts_size = sizeof(luks_opts);
 495            r = qemu_rbd_convert_luks_options(
 496                    qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
 497                    &passphrase, &luks_opts.passphrase_size, errp);
 498            if (r < 0) {
 499                return r;
 500            }
 501            luks_opts.passphrase = passphrase;
 502            break;
 503        }
 504        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
 505            memset(&luks2_opts, 0, sizeof(luks2_opts));
 506            format = RBD_ENCRYPTION_FORMAT_LUKS2;
 507            opts = &luks2_opts;
 508            opts_size = sizeof(luks2_opts);
 509            r = qemu_rbd_convert_luks_options(
 510                    qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
 511                    &passphrase, &luks2_opts.passphrase_size, errp);
 512            if (r < 0) {
 513                return r;
 514            }
 515            luks2_opts.passphrase = passphrase;
 516            break;
 517        }
 518#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
 519        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
 520            memset(&luks_any_opts, 0, sizeof(luks_any_opts));
 521            format = RBD_ENCRYPTION_FORMAT_LUKS;
 522            opts = &luks_any_opts;
 523            opts_size = sizeof(luks_any_opts);
 524            r = qemu_rbd_convert_luks_options(
 525                    qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
 526                    &passphrase, &luks_any_opts.passphrase_size, errp);
 527            if (r < 0) {
 528                return r;
 529            }
 530            luks_any_opts.passphrase = passphrase;
 531            break;
 532        }
 533#endif
 534        default: {
 535            r = -ENOTSUP;
 536            error_setg_errno(
 537                    errp, -r, "unknown image encryption format: %u",
 538                    encrypt->format);
 539            return r;
 540        }
 541    }
 542
 543    r = rbd_encryption_load(image, format, opts, opts_size);
 544    if (r < 0) {
 545        error_setg_errno(errp, -r, "encryption load fail");
 546        return r;
 547    }
 548
 549    return 0;
 550}
 551
 552#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
 553static int qemu_rbd_encryption_load2(rbd_image_t image,
 554                                     RbdEncryptionOptions *encrypt,
 555                                     Error **errp)
 556{
 557    int r = 0;
 558    int encrypt_count = 1;
 559    int i;
 560    RbdEncryptionOptions *curr_encrypt;
 561    rbd_encryption_spec_t *specs;
 562    rbd_encryption_luks1_format_options_t *luks_opts;
 563    rbd_encryption_luks2_format_options_t *luks2_opts;
 564    rbd_encryption_luks_format_options_t *luks_any_opts;
 565
 566    /* count encryption options */
 567    for (curr_encrypt = encrypt->parent; curr_encrypt;
 568         curr_encrypt = curr_encrypt->parent) {
 569        ++encrypt_count;
 570    }
 571
 572    specs = g_new0(rbd_encryption_spec_t, encrypt_count);
 573
 574    curr_encrypt = encrypt;
 575    for (i = 0; i < encrypt_count; ++i) {
 576        switch (curr_encrypt->format) {
 577            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
 578                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;
 579
 580                luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
 581                specs[i].opts = luks_opts;
 582                specs[i].opts_size = sizeof(*luks_opts);
 583
 584                r = qemu_rbd_convert_luks_options(
 585                        qapi_RbdEncryptionOptionsLUKS_base(
 586                                &curr_encrypt->u.luks),
 587                        (char **)&luks_opts->passphrase,
 588                        &luks_opts->passphrase_size,
 589                        errp);
 590                break;
 591            }
 592            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
 593                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;
 594
 595                luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
 596                specs[i].opts = luks2_opts;
 597                specs[i].opts_size = sizeof(*luks2_opts);
 598
 599                r = qemu_rbd_convert_luks_options(
 600                        qapi_RbdEncryptionOptionsLUKS2_base(
 601                                &curr_encrypt->u.luks2),
 602                        (char **)&luks2_opts->passphrase,
 603                        &luks2_opts->passphrase_size,
 604                        errp);
 605                break;
 606            }
 607            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
 608                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;
 609
 610                luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
 611                specs[i].opts = luks_any_opts;
 612                specs[i].opts_size = sizeof(*luks_any_opts);
 613
 614                r = qemu_rbd_convert_luks_options(
 615                        qapi_RbdEncryptionOptionsLUKSAny_base(
 616                                &curr_encrypt->u.luks_any),
 617                        (char **)&luks_any_opts->passphrase,
 618                        &luks_any_opts->passphrase_size,
 619                        errp);
 620                break;
 621            }
 622            default: {
 623                r = -ENOTSUP;
 624                error_setg_errno(
 625                        errp, -r, "unknown image encryption format: %u",
 626                        curr_encrypt->format);
 627            }
 628        }
 629
 630        if (r < 0) {
 631            goto exit;
 632        }
 633
 634        curr_encrypt = curr_encrypt->parent;
 635    }
 636
 637    r = rbd_encryption_load2(image, specs, encrypt_count);
 638    if (r < 0) {
 639        error_setg_errno(errp, -r, "layered encryption load fail");
 640        goto exit;
 641    }
 642
 643exit:
 644    for (i = 0; i < encrypt_count; ++i) {
 645        if (!specs[i].opts) {
 646            break;
 647        }
 648
 649        switch (specs[i].format) {
 650            case RBD_ENCRYPTION_FORMAT_LUKS1: {
 651                luks_opts = specs[i].opts;
 652                g_free((void *)luks_opts->passphrase);
 653                break;
 654            }
 655            case RBD_ENCRYPTION_FORMAT_LUKS2: {
 656                luks2_opts = specs[i].opts;
 657                g_free((void *)luks2_opts->passphrase);
 658                break;
 659            }
 660            case RBD_ENCRYPTION_FORMAT_LUKS: {
 661                luks_any_opts = specs[i].opts;
 662                g_free((void *)luks_any_opts->passphrase);
 663                break;
 664            }
 665        }
 666
 667        g_free(specs[i].opts);
 668    }
 669    g_free(specs);
 670    return r;
 671}
 672#endif
 673#endif
 674
 675/* FIXME Deprecate and remove keypairs or make it available in QMP. */
 676static int qemu_rbd_do_create(BlockdevCreateOptions *options,
 677                              const char *keypairs, const char *password_secret,
 678                              Error **errp)
 679{
 680    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
 681    rados_t cluster;
 682    rados_ioctx_t io_ctx;
 683    int obj_order = 0;
 684    int ret;
 685
 686    assert(options->driver == BLOCKDEV_DRIVER_RBD);
 687    if (opts->location->snapshot) {
 688        error_setg(errp, "Can't use snapshot name for image creation");
 689        return -EINVAL;
 690    }
 691
 692#ifndef LIBRBD_SUPPORTS_ENCRYPTION
 693    if (opts->encrypt) {
 694        error_setg(errp, "RBD library does not support image encryption");
 695        return -ENOTSUP;
 696    }
 697#endif
 698
 699    if (opts->has_cluster_size) {
 700        int64_t objsize = opts->cluster_size;
 701        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 702            error_setg(errp, "obj size needs to be power of 2");
 703            return -EINVAL;
 704        }
 705        if (objsize < 4096) {
 706            error_setg(errp, "obj size too small");
 707            return -EINVAL;
 708        }
 709        obj_order = ctz32(objsize);
 710    }
 711
 712    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
 713                           password_secret, errp);
 714    if (ret < 0) {
 715        return ret;
 716    }
 717
 718    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
 719    if (ret < 0) {
 720        error_setg_errno(errp, -ret, "error rbd create");
 721        goto out;
 722    }
 723
 724#ifdef LIBRBD_SUPPORTS_ENCRYPTION
 725    if (opts->encrypt) {
 726        rbd_image_t image;
 727
 728        ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
 729        if (ret < 0) {
 730            error_setg_errno(errp, -ret,
 731                             "error opening image '%s' for encryption format",
 732                             opts->location->image);
 733            goto out;
 734        }
 735
 736        ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
 737        rbd_close(image);
 738        if (ret < 0) {
 739            /* encryption format fail, try removing the image */
 740            rbd_remove(io_ctx, opts->location->image);
 741            goto out;
 742        }
 743    }
 744#endif
 745
 746    ret = 0;
 747out:
 748    rados_ioctx_destroy(io_ctx);
 749    rados_shutdown(cluster);
 750    return ret;
 751}
 752
 753static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
 754{
 755    return qemu_rbd_do_create(options, NULL, NULL, errp);
 756}
 757
 758static int qemu_rbd_extract_encryption_create_options(
 759        QemuOpts *opts,
 760        RbdEncryptionCreateOptions **spec,
 761        Error **errp)
 762{
 763    QDict *opts_qdict;
 764    QDict *encrypt_qdict;
 765    Visitor *v;
 766    int ret = 0;
 767
 768    opts_qdict = qemu_opts_to_qdict(opts, NULL);
 769    qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
 770    qobject_unref(opts_qdict);
 771    if (!qdict_size(encrypt_qdict)) {
 772        *spec = NULL;
 773        goto exit;
 774    }
 775
 776    /* Convert options into a QAPI object */
 777    v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
 778    if (!v) {
 779        ret = -EINVAL;
 780        goto exit;
 781    }
 782
 783    visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
 784    visit_free(v);
 785    if (!*spec) {
 786        ret = -EINVAL;
 787        goto exit;
 788    }
 789
 790exit:
 791    qobject_unref(encrypt_qdict);
 792    return ret;
 793}
 794
 795static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
 796                                                const char *filename,
 797                                                QemuOpts *opts,
 798                                                Error **errp)
 799{
 800    BlockdevCreateOptions *create_options;
 801    BlockdevCreateOptionsRbd *rbd_opts;
 802    BlockdevOptionsRbd *loc;
 803    RbdEncryptionCreateOptions *encrypt = NULL;
 804    Error *local_err = NULL;
 805    const char *keypairs, *password_secret;
 806    QDict *options = NULL;
 807    int ret = 0;
 808
 809    create_options = g_new0(BlockdevCreateOptions, 1);
 810    create_options->driver = BLOCKDEV_DRIVER_RBD;
 811    rbd_opts = &create_options->u.rbd;
 812
 813    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
 814
 815    password_secret = qemu_opt_get(opts, "password-secret");
 816
 817    /* Read out options */
 818    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 819                              BDRV_SECTOR_SIZE);
 820    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
 821                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
 822    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
 823
 824    options = qdict_new();
 825    qemu_rbd_parse_filename(filename, options, &local_err);
 826    if (local_err) {
 827        ret = -EINVAL;
 828        error_propagate(errp, local_err);
 829        goto exit;
 830    }
 831
 832    ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
 833    if (ret < 0) {
 834        goto exit;
 835    }
 836    rbd_opts->encrypt     = encrypt;
 837
 838    /*
 839     * Caution: while qdict_get_try_str() is fine, getting non-string
 840     * types would require more care.  When @options come from -blockdev
 841     * or blockdev_add, its members are typed according to the QAPI
 842     * schema, but when they come from -drive, they're all QString.
 843     */
 844    loc = rbd_opts->location;
 845    loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
 846    loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
 847    loc->user        = g_strdup(qdict_get_try_str(options, "user"));
 848    loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
 849    loc->image       = g_strdup(qdict_get_try_str(options, "image"));
 850    keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
 851
 852    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
 853    if (ret < 0) {
 854        goto exit;
 855    }
 856
 857exit:
 858    qobject_unref(options);
 859    qapi_free_BlockdevCreateOptions(create_options);
 860    return ret;
 861}
 862
 863static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 864{
 865    const char **vals;
 866    const char *host, *port;
 867    char *rados_str;
 868    InetSocketAddressBaseList *p;
 869    int i, cnt;
 870
 871    if (!opts->has_server) {
 872        return NULL;
 873    }
 874
 875    for (cnt = 0, p = opts->server; p; p = p->next) {
 876        cnt++;
 877    }
 878
 879    vals = g_new(const char *, cnt + 1);
 880
 881    for (i = 0, p = opts->server; p; p = p->next, i++) {
 882        host = p->value->host;
 883        port = p->value->port;
 884
 885        if (strchr(host, ':')) {
 886            vals[i] = g_strdup_printf("[%s]:%s", host, port);
 887        } else {
 888            vals[i] = g_strdup_printf("%s:%s", host, port);
 889        }
 890    }
 891    vals[i] = NULL;
 892
 893    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
 894    g_strfreev((char **)vals);
 895    return rados_str;
 896}
 897
 898static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 899                            BlockdevOptionsRbd *opts, bool cache,
 900                            const char *keypairs, const char *secretid,
 901                            Error **errp)
 902{
 903    char *mon_host = NULL;
 904    Error *local_err = NULL;
 905    int r;
 906
 907    if (secretid) {
 908        if (opts->key_secret) {
 909            error_setg(errp,
 910                       "Legacy 'password-secret' clashes with 'key-secret'");
 911            return -EINVAL;
 912        }
 913        opts->key_secret = g_strdup(secretid);
 914    }
 915
 916    mon_host = qemu_rbd_mon_host(opts, &local_err);
 917    if (local_err) {
 918        error_propagate(errp, local_err);
 919        r = -EINVAL;
 920        goto out;
 921    }
 922
 923    r = rados_create(cluster, opts->user);
 924    if (r < 0) {
 925        error_setg_errno(errp, -r, "error initializing");
 926        goto out;
 927    }
 928
 929    /* try default location when conf=NULL, but ignore failure */
 930    r = rados_conf_read_file(*cluster, opts->conf);
 931    if (opts->conf && r < 0) {
 932        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
 933        goto failed_shutdown;
 934    }
 935
 936    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
 937    if (r < 0) {
 938        goto failed_shutdown;
 939    }
 940
 941    if (mon_host) {
 942        r = rados_conf_set(*cluster, "mon_host", mon_host);
 943        if (r < 0) {
 944            goto failed_shutdown;
 945        }
 946    }
 947
 948    r = qemu_rbd_set_auth(*cluster, opts, errp);
 949    if (r < 0) {
 950        goto failed_shutdown;
 951    }
 952
 953    /*
 954     * Fallback to more conservative semantics if setting cache
 955     * options fails. Ignore errors from setting rbd_cache because the
 956     * only possible error is that the option does not exist, and
 957     * librbd defaults to no caching. If write through caching cannot
 958     * be set up, fall back to no caching.
 959     */
 960    if (cache) {
 961        rados_conf_set(*cluster, "rbd_cache", "true");
 962    } else {
 963        rados_conf_set(*cluster, "rbd_cache", "false");
 964    }
 965
 966    r = rados_connect(*cluster);
 967    if (r < 0) {
 968        error_setg_errno(errp, -r, "error connecting");
 969        goto failed_shutdown;
 970    }
 971
 972    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
 973    if (r < 0) {
 974        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
 975        goto failed_shutdown;
 976    }
 977
 978#ifdef HAVE_RBD_NAMESPACE_EXISTS
 979    if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
 980        bool exists;
 981
 982        r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
 983        if (r < 0) {
 984            error_setg_errno(errp, -r, "error checking namespace");
 985            goto failed_ioctx_destroy;
 986        }
 987
 988        if (!exists) {
 989            error_setg(errp, "namespace '%s' does not exist",
 990                       opts->q_namespace);
 991            r = -ENOENT;
 992            goto failed_ioctx_destroy;
 993        }
 994    }
 995#endif
 996
 997    /*
 998     * Set the namespace after opening the io context on the pool,
 999     * if nspace == NULL or if nspace == "", it is just as we did nothing
1000     */
1001    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
1002
1003    r = 0;
1004    goto out;
1005
1006#ifdef HAVE_RBD_NAMESPACE_EXISTS
1007failed_ioctx_destroy:
1008    rados_ioctx_destroy(*io_ctx);
1009#endif
1010failed_shutdown:
1011    rados_shutdown(*cluster);
1012out:
1013    g_free(mon_host);
1014    return r;
1015}
1016
1017static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
1018                                    Error **errp)
1019{
1020    Visitor *v;
1021
1022    /* Convert the remaining options into a QAPI object */
1023    v = qobject_input_visitor_new_flat_confused(options, errp);
1024    if (!v) {
1025        return -EINVAL;
1026    }
1027
1028    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
1029    visit_free(v);
1030    if (!opts) {
1031        return -EINVAL;
1032    }
1033
1034    return 0;
1035}
1036
1037static int qemu_rbd_attempt_legacy_options(QDict *options,
1038                                           BlockdevOptionsRbd **opts,
1039                                           char **keypairs)
1040{
1041    char *filename;
1042    int r;
1043
1044    filename = g_strdup(qdict_get_try_str(options, "filename"));
1045    if (!filename) {
1046        return -EINVAL;
1047    }
1048    qdict_del(options, "filename");
1049
1050    qemu_rbd_parse_filename(filename, options, NULL);
1051
1052    /* keypairs freed by caller */
1053    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1054    if (*keypairs) {
1055        qdict_del(options, "=keyvalue-pairs");
1056    }
1057
1058    r = qemu_rbd_convert_options(options, opts, NULL);
1059
1060    g_free(filename);
1061    return r;
1062}
1063
1064static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
1065                         Error **errp)
1066{
1067    BDRVRBDState *s = bs->opaque;
1068    BlockdevOptionsRbd *opts = NULL;
1069    const QDictEntry *e;
1070    Error *local_err = NULL;
1071    char *keypairs, *secretid;
1072    rbd_image_info_t info;
1073    int r;
1074
1075    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1076    if (keypairs) {
1077        qdict_del(options, "=keyvalue-pairs");
1078    }
1079
1080    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
1081    if (secretid) {
1082        qdict_del(options, "password-secret");
1083    }
1084
1085    r = qemu_rbd_convert_options(options, &opts, &local_err);
1086    if (local_err) {
1087        /* If keypairs are present, that means some options are present in
1088         * the modern option format.  Don't attempt to parse legacy option
1089         * formats, as we won't support mixed usage. */
1090        if (keypairs) {
1091            error_propagate(errp, local_err);
1092            goto out;
1093        }
1094
1095        /* If the initial attempt to convert and process the options failed,
1096         * we may be attempting to open an image file that has the rbd options
1097         * specified in the older format consisting of all key/value pairs
1098         * encoded in the filename.  Go ahead and attempt to parse the
1099         * filename, and see if we can pull out the required options. */
1100        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
1101        if (r < 0) {
1102            /* Propagate the original error, not the legacy parsing fallback
1103             * error, as the latter was just a best-effort attempt. */
1104            error_propagate(errp, local_err);
1105            goto out;
1106        }
1107        /* Take care whenever deciding to actually deprecate; once this ability
1108         * is removed, we will not be able to open any images with legacy-styled
1109         * backing image strings. */
1110        warn_report("RBD options encoded in the filename as keyvalue pairs "
1111                    "is deprecated");
1112    }
1113
1114    /* Remove the processed options from the QDict (the visitor processes
1115     * _all_ options in the QDict) */
1116    while ((e = qdict_first(options))) {
1117        qdict_del(options, e->key);
1118    }
1119
1120    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
1121                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
1122    if (r < 0) {
1123        goto out;
1124    }
1125
1126    s->snap = g_strdup(opts->snapshot);
1127    s->image_name = g_strdup(opts->image);
1128
1129    /* rbd_open is always r/w */
1130    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
1131    if (r < 0) {
1132        error_setg_errno(errp, -r, "error reading header from %s",
1133                         s->image_name);
1134        goto failed_open;
1135    }
1136
1137    if (opts->encrypt) {
1138#ifdef LIBRBD_SUPPORTS_ENCRYPTION
1139        if (opts->encrypt->parent) {
1140#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
1141            r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp);
1142#else
1143            r = -ENOTSUP;
1144            error_setg(errp, "RBD library does not support layered encryption");
1145#endif
1146        } else {
1147            r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
1148        }
1149        if (r < 0) {
1150            goto failed_post_open;
1151        }
1152#else
1153        r = -ENOTSUP;
1154        error_setg(errp, "RBD library does not support image encryption");
1155        goto failed_post_open;
1156#endif
1157    }
1158
1159    r = rbd_stat(s->image, &info, sizeof(info));
1160    if (r < 0) {
1161        error_setg_errno(errp, -r, "error getting image info from %s",
1162                         s->image_name);
1163        goto failed_post_open;
1164    }
1165    s->image_size = info.size;
1166    s->object_size = info.obj_size;
1167
1168    /* If we are using an rbd snapshot, we must be r/o, otherwise
1169     * leave as-is */
1170    if (s->snap != NULL) {
1171        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1172        if (r < 0) {
1173            goto failed_post_open;
1174        }
1175    }
1176
1177#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1178    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1179#endif
1180
1181    /* When extending regular files, we get zeros from the OS */
1182    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1183
1184    r = 0;
1185    goto out;
1186
1187failed_post_open:
1188    rbd_close(s->image);
1189failed_open:
1190    rados_ioctx_destroy(s->io_ctx);
1191    g_free(s->snap);
1192    g_free(s->image_name);
1193    rados_shutdown(s->cluster);
1194out:
1195    qapi_free_BlockdevOptionsRbd(opts);
1196    g_free(keypairs);
1197    g_free(secretid);
1198    return r;
1199}
1200
1201
1202/* Since RBD is currently always opened R/W via the API,
1203 * we just need to check if we are using a snapshot or not, in
1204 * order to determine if we will allow it to be R/W */
1205static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1206                                   BlockReopenQueue *queue, Error **errp)
1207{
1208    BDRVRBDState *s = state->bs->opaque;
1209    int ret = 0;
1210
1211    if (s->snap && state->flags & BDRV_O_RDWR) {
1212        error_setg(errp,
1213                   "Cannot change node '%s' to r/w when using RBD snapshot",
1214                   bdrv_get_device_or_node_name(state->bs));
1215        ret = -EINVAL;
1216    }
1217
1218    return ret;
1219}
1220
1221static void qemu_rbd_close(BlockDriverState *bs)
1222{
1223    BDRVRBDState *s = bs->opaque;
1224
1225    rbd_close(s->image);
1226    rados_ioctx_destroy(s->io_ctx);
1227    g_free(s->snap);
1228    g_free(s->image_name);
1229    rados_shutdown(s->cluster);
1230}
1231
1232/* Resize the RBD image and update the 'image_size' with the current size */
1233static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1234{
1235    BDRVRBDState *s = bs->opaque;
1236    int r;
1237
1238    r = rbd_resize(s->image, size);
1239    if (r < 0) {
1240        return r;
1241    }
1242
1243    s->image_size = size;
1244
1245    return 0;
1246}
1247
1248static void qemu_rbd_finish_bh(void *opaque)
1249{
1250    RBDTask *task = opaque;
1251    task->complete = true;
1252    aio_co_wake(task->co);
1253}
1254
1255/*
1256 * This is the completion callback function for all rbd aio calls
1257 * started from qemu_rbd_start_co().
1258 *
1259 * Note: this function is being called from a non qemu thread so
1260 * we need to be careful about what we do here. Generally we only
1261 * schedule a BH, and do the rest of the io completion handling
1262 * from qemu_rbd_finish_bh() which runs in a qemu context.
1263 */
1264static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1265{
1266    task->ret = rbd_aio_get_return_value(c);
1267    rbd_aio_release(c);
1268    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1269                            qemu_rbd_finish_bh, task);
1270}
1271
1272static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1273                                          uint64_t offset,
1274                                          uint64_t bytes,
1275                                          QEMUIOVector *qiov,
1276                                          int flags,
1277                                          RBDAIOCmd cmd)
1278{
1279    BDRVRBDState *s = bs->opaque;
1280    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1281    rbd_completion_t c;
1282    int r;
1283
1284    assert(!qiov || qiov->size == bytes);
1285
1286    if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1287        /*
1288         * RBD APIs don't allow us to write more than actual size, so in order
1289         * to support growing images, we resize the image before write
1290         * operations that exceed the current size.
1291         */
1292        if (offset + bytes > s->image_size) {
1293            int r = qemu_rbd_resize(bs, offset + bytes);
1294            if (r < 0) {
1295                return r;
1296            }
1297        }
1298    }
1299
1300    r = rbd_aio_create_completion(&task,
1301                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
1302    if (r < 0) {
1303        return r;
1304    }
1305
1306    switch (cmd) {
1307    case RBD_AIO_READ:
1308        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1309        break;
1310    case RBD_AIO_WRITE:
1311        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1312        break;
1313    case RBD_AIO_DISCARD:
1314        r = rbd_aio_discard(s->image, offset, bytes, c);
1315        break;
1316    case RBD_AIO_FLUSH:
1317        r = rbd_aio_flush(s->image, c);
1318        break;
1319#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1320    case RBD_AIO_WRITE_ZEROES: {
1321        int zero_flags = 0;
1322#ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1323        if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1324            zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1325        }
1326#endif
1327        r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1328        break;
1329    }
1330#endif
1331    default:
1332        r = -EINVAL;
1333    }
1334
1335    if (r < 0) {
1336        error_report("rbd request failed early: cmd %d offset %" PRIu64
1337                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1338                     bytes, flags, r, strerror(-r));
1339        rbd_aio_release(c);
1340        return r;
1341    }
1342
1343    while (!task.complete) {
1344        qemu_coroutine_yield();
1345    }
1346
1347    if (task.ret < 0) {
1348        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1349                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1350                     bytes, flags, task.ret, strerror(-task.ret));
1351        return task.ret;
1352    }
1353
1354    /* zero pad short reads */
1355    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1356        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1357    }
1358
1359    return 0;
1360}
1361
1362static int
1363coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1364                                int64_t bytes, QEMUIOVector *qiov,
1365                                BdrvRequestFlags flags)
1366{
1367    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1368}
1369
1370static int
1371coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1372                                 int64_t bytes, QEMUIOVector *qiov,
1373                                 BdrvRequestFlags flags)
1374{
1375    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1376}
1377
1378static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1379{
1380    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1381}
1382
1383static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1384                                             int64_t offset, int64_t bytes)
1385{
1386    return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1387}
1388
1389#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1390static int
1391coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1392                                       int64_t bytes, BdrvRequestFlags flags)
1393{
1394    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
1395                             RBD_AIO_WRITE_ZEROES);
1396}
1397#endif
1398
1399static int coroutine_fn
1400qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1401{
1402    BDRVRBDState *s = bs->opaque;
1403    bdi->cluster_size = s->object_size;
1404    return 0;
1405}
1406
1407static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1408                                                     Error **errp)
1409{
1410    BDRVRBDState *s = bs->opaque;
1411    ImageInfoSpecific *spec_info;
1412    char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1413    int r;
1414
1415    if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1416        r = rbd_read(s->image, 0,
1417                     RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1418        if (r < 0) {
1419            error_setg_errno(errp, -r, "cannot read image start for probe");
1420            return NULL;
1421        }
1422    }
1423
1424    spec_info = g_new(ImageInfoSpecific, 1);
1425    *spec_info = (ImageInfoSpecific){
1426        .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1427        .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1428    };
1429
1430    if (memcmp(buf, rbd_luks_header_verification,
1431               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1432        spec_info->u.rbd.data->encryption_format =
1433                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1434        spec_info->u.rbd.data->has_encryption_format = true;
1435    } else if (memcmp(buf, rbd_luks2_header_verification,
1436               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1437        spec_info->u.rbd.data->encryption_format =
1438                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1439        spec_info->u.rbd.data->has_encryption_format = true;
1440    } else if (memcmp(buf, rbd_layered_luks_header_verification,
1441               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1442        spec_info->u.rbd.data->encryption_format =
1443                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1444        spec_info->u.rbd.data->has_encryption_format = true;
1445    } else if (memcmp(buf, rbd_layered_luks2_header_verification,
1446               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1447        spec_info->u.rbd.data->encryption_format =
1448                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1449        spec_info->u.rbd.data->has_encryption_format = true;
1450    } else {
1451        spec_info->u.rbd.data->has_encryption_format = false;
1452    }
1453
1454    return spec_info;
1455}
1456
/*
 * rbd_diff_iterate2 allows interrupting the execution by returning a
 * negative value from the callback routine. Choose a value that does not
 * conflict with an existing exit code and return it if we want to
 * prematurely stop the execution because we detected a change in the
 * allocation status.
 *
 * The value is parenthesized so that the macro expands safely inside any
 * expression.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1464
1465static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1466                                    int exists, void *opaque)
1467{
1468    RBDDiffIterateReq *req = opaque;
1469
1470    assert(req->offs + req->bytes <= offs);
1471
1472    /* treat a hole like an unallocated area and bail out */
1473    if (!exists) {
1474        return 0;
1475    }
1476
1477    if (!req->exists && offs > req->offs) {
1478        /*
1479         * we started in an unallocated area and hit the first allocated
1480         * block. req->bytes must be set to the length of the unallocated area
1481         * before the allocated area. stop further processing.
1482         */
1483        req->bytes = offs - req->offs;
1484        return QEMU_RBD_EXIT_DIFF_ITERATE2;
1485    }
1486
1487    if (req->exists && offs > req->offs + req->bytes) {
1488        /*
1489         * we started in an allocated area and jumped over an unallocated area,
1490         * req->bytes contains the length of the allocated area before the
1491         * unallocated area. stop further processing.
1492         */
1493        return QEMU_RBD_EXIT_DIFF_ITERATE2;
1494    }
1495
1496    req->bytes += len;
1497    req->exists = true;
1498
1499    return 0;
1500}
1501
1502static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1503                                                 bool want_zero, int64_t offset,
1504                                                 int64_t bytes, int64_t *pnum,
1505                                                 int64_t *map,
1506                                                 BlockDriverState **file)
1507{
1508    BDRVRBDState *s = bs->opaque;
1509    int status, r;
1510    RBDDiffIterateReq req = { .offs = offset };
1511    uint64_t features, flags;
1512    uint64_t head = 0;
1513
1514    assert(offset + bytes <= s->image_size);
1515
1516    /* default to all sectors allocated */
1517    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1518    *map = offset;
1519    *file = bs;
1520    *pnum = bytes;
1521
1522    /* check if RBD image supports fast-diff */
1523    r = rbd_get_features(s->image, &features);
1524    if (r < 0) {
1525        return status;
1526    }
1527    if (!(features & RBD_FEATURE_FAST_DIFF)) {
1528        return status;
1529    }
1530
1531    /* check if RBD fast-diff result is valid */
1532    r = rbd_get_flags(s->image, &flags);
1533    if (r < 0) {
1534        return status;
1535    }
1536    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1537        return status;
1538    }
1539
1540#if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1541    /*
1542     * librbd had a bug until early 2022 that affected all versions of ceph that
1543     * supported fast-diff. This bug results in reporting of incorrect offsets
1544     * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1545     * Work around this bug by rounding down the offset to object boundaries.
1546     * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1547     * However, this workaround only works for non cloned images with default
1548     * striping.
1549     *
1550     * See: https://tracker.ceph.com/issues/53784
1551     */
1552
1553    /* check if RBD image has non-default striping enabled */
1554    if (features & RBD_FEATURE_STRIPINGV2) {
1555        return status;
1556    }
1557
1558#pragma GCC diagnostic push
1559#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1560    /*
1561     * check if RBD image is a clone (= has a parent).
1562     *
1563     * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1564     * replacement rbd_get_parent is not present in Luminous and Mimic.
1565     */
1566    if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1567        return status;
1568    }
1569#pragma GCC diagnostic pop
1570
1571    head = req.offs & (s->object_size - 1);
1572    req.offs -= head;
1573    bytes += head;
1574#endif
1575
1576    r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1577                          qemu_rbd_diff_iterate_cb, &req);
1578    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1579        return status;
1580    }
1581    assert(req.bytes <= bytes);
1582    if (!req.exists) {
1583        if (r == 0) {
1584            /*
1585             * rbd_diff_iterate2 does not invoke callbacks for unallocated
1586             * areas. This here catches the case where no callback was
1587             * invoked at all (req.bytes == 0).
1588             */
1589            assert(req.bytes == 0);
1590            req.bytes = bytes;
1591        }
1592        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1593    }
1594
1595    assert(req.bytes > head);
1596    *pnum = req.bytes - head;
1597    return status;
1598}
1599
1600static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1601{
1602    BDRVRBDState *s = bs->opaque;
1603    int r;
1604
1605    r = rbd_get_size(s->image, &s->image_size);
1606    if (r < 0) {
1607        return r;
1608    }
1609
1610    return s->image_size;
1611}
1612
1613static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1614                                             int64_t offset,
1615                                             bool exact,
1616                                             PreallocMode prealloc,
1617                                             BdrvRequestFlags flags,
1618                                             Error **errp)
1619{
1620    int r;
1621
1622    if (prealloc != PREALLOC_MODE_OFF) {
1623        error_setg(errp, "Unsupported preallocation mode '%s'",
1624                   PreallocMode_str(prealloc));
1625        return -ENOTSUP;
1626    }
1627
1628    r = qemu_rbd_resize(bs, offset);
1629    if (r < 0) {
1630        error_setg_errno(errp, -r, "Failed to resize file");
1631        return r;
1632    }
1633
1634    return 0;
1635}
1636
1637static int qemu_rbd_snap_create(BlockDriverState *bs,
1638                                QEMUSnapshotInfo *sn_info)
1639{
1640    BDRVRBDState *s = bs->opaque;
1641    int r;
1642
1643    if (sn_info->name[0] == '\0') {
1644        return -EINVAL; /* we need a name for rbd snapshots */
1645    }
1646
1647    /*
1648     * rbd snapshots are using the name as the user controlled unique identifier
1649     * we can't use the rbd snapid for that purpose, as it can't be set
1650     */
1651    if (sn_info->id_str[0] != '\0' &&
1652        strcmp(sn_info->id_str, sn_info->name) != 0) {
1653        return -EINVAL;
1654    }
1655
1656    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1657        return -ERANGE;
1658    }
1659
1660    r = rbd_snap_create(s->image, sn_info->name);
1661    if (r < 0) {
1662        error_report("failed to create snap: %s", strerror(-r));
1663        return r;
1664    }
1665
1666    return 0;
1667}
1668
1669static int qemu_rbd_snap_remove(BlockDriverState *bs,
1670                                const char *snapshot_id,
1671                                const char *snapshot_name,
1672                                Error **errp)
1673{
1674    BDRVRBDState *s = bs->opaque;
1675    int r;
1676
1677    if (!snapshot_name) {
1678        error_setg(errp, "rbd need a valid snapshot name");
1679        return -EINVAL;
1680    }
1681
1682    /* If snapshot_id is specified, it must be equal to name, see
1683       qemu_rbd_snap_list() */
1684    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1685        error_setg(errp,
1686                   "rbd do not support snapshot id, it should be NULL or "
1687                   "equal to snapshot name");
1688        return -EINVAL;
1689    }
1690
1691    r = rbd_snap_remove(s->image, snapshot_name);
1692    if (r < 0) {
1693        error_setg_errno(errp, -r, "Failed to remove the snapshot");
1694    }
1695    return r;
1696}
1697
1698static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1699                                  const char *snapshot_name)
1700{
1701    BDRVRBDState *s = bs->opaque;
1702
1703    return rbd_snap_rollback(s->image, snapshot_name);
1704}
1705
1706static int qemu_rbd_snap_list(BlockDriverState *bs,
1707                              QEMUSnapshotInfo **psn_tab)
1708{
1709    BDRVRBDState *s = bs->opaque;
1710    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1711    int i, snap_count;
1712    rbd_snap_info_t *snaps;
1713    int max_snaps = RBD_MAX_SNAPS;
1714
1715    do {
1716        snaps = g_new(rbd_snap_info_t, max_snaps);
1717        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1718        if (snap_count <= 0) {
1719            g_free(snaps);
1720        }
1721    } while (snap_count == -ERANGE);
1722
1723    if (snap_count <= 0) {
1724        goto done;
1725    }
1726
1727    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1728
1729    for (i = 0; i < snap_count; i++) {
1730        const char *snap_name = snaps[i].name;
1731
1732        sn_info = sn_tab + i;
1733        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1734        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1735
1736        sn_info->vm_state_size = snaps[i].size;
1737        sn_info->date_sec = 0;
1738        sn_info->date_nsec = 0;
1739        sn_info->vm_clock_nsec = 0;
1740    }
1741    rbd_snap_list_end(snaps);
1742    g_free(snaps);
1743
1744 done:
1745    *psn_tab = sn_tab;
1746    return snap_count;
1747}
1748
1749static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1750                                                      Error **errp)
1751{
1752    BDRVRBDState *s = bs->opaque;
1753    int r = rbd_invalidate_cache(s->image);
1754    if (r < 0) {
1755        error_setg_errno(errp, -r, "Failed to invalidate the cache");
1756    }
1757}
1758
1759static QemuOptsList qemu_rbd_create_opts = {
1760    .name = "rbd-create-opts",
1761    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1762    .desc = {
1763        {
1764            .name = BLOCK_OPT_SIZE,
1765            .type = QEMU_OPT_SIZE,
1766            .help = "Virtual disk size"
1767        },
1768        {
1769            .name = BLOCK_OPT_CLUSTER_SIZE,
1770            .type = QEMU_OPT_SIZE,
1771            .help = "RBD object size"
1772        },
1773        {
1774            .name = "password-secret",
1775            .type = QEMU_OPT_STRING,
1776            .help = "ID of secret providing the password",
1777        },
1778        {
1779            .name = "encrypt.format",
1780            .type = QEMU_OPT_STRING,
1781            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1782        },
1783        {
1784            .name = "encrypt.cipher-alg",
1785            .type = QEMU_OPT_STRING,
1786            .help = "Name of encryption cipher algorithm"
1787                    " (allowed values: aes-128, aes-256)",
1788        },
1789        {
1790            .name = "encrypt.key-secret",
1791            .type = QEMU_OPT_STRING,
1792            .help = "ID of secret providing LUKS passphrase",
1793        },
1794        { /* end of list */ }
1795    }
1796};
1797
1798static const char *const qemu_rbd_strong_runtime_opts[] = {
1799    "pool",
1800    "namespace",
1801    "image",
1802    "conf",
1803    "snapshot",
1804    "user",
1805    "server.",
1806    "password-secret",
1807
1808    NULL
1809};
1810
1811static BlockDriver bdrv_rbd = {
1812    .format_name            = "rbd",
1813    .instance_size          = sizeof(BDRVRBDState),
1814    .bdrv_parse_filename    = qemu_rbd_parse_filename,
1815    .bdrv_file_open         = qemu_rbd_open,
1816    .bdrv_close             = qemu_rbd_close,
1817    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1818    .bdrv_co_create         = qemu_rbd_co_create,
1819    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1820    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1821    .bdrv_co_get_info       = qemu_rbd_co_get_info,
1822    .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1823    .create_opts            = &qemu_rbd_create_opts,
1824    .bdrv_co_getlength      = qemu_rbd_co_getlength,
1825    .bdrv_co_truncate       = qemu_rbd_co_truncate,
1826    .protocol_name          = "rbd",
1827
1828    .bdrv_co_preadv         = qemu_rbd_co_preadv,
1829    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1830    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1831    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1832#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1833    .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1834#endif
1835    .bdrv_co_block_status   = qemu_rbd_co_block_status,
1836
1837    .bdrv_snapshot_create   = qemu_rbd_snap_create,
1838    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1839    .bdrv_snapshot_list     = qemu_rbd_snap_list,
1840    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1841    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1842
1843    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1844};
1845
1846static void bdrv_rbd_init(void)
1847{
1848    bdrv_register(&bdrv_rbd);
1849}
1850
1851block_init(bdrv_rbd_init);
1852