qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include <rbd/librbd.h>
  17#include "qapi/error.h"
  18#include "qemu/error-report.h"
  19#include "qemu/module.h"
  20#include "qemu/option.h"
  21#include "block/block_int.h"
  22#include "block/qdict.h"
  23#include "crypto/secret.h"
  24#include "qemu/cutils.h"
  25#include "sysemu/replay.h"
  26#include "qapi/qmp/qstring.h"
  27#include "qapi/qmp/qdict.h"
  28#include "qapi/qmp/qjson.h"
  29#include "qapi/qmp/qlist.h"
  30#include "qapi/qobject-input-visitor.h"
  31#include "qapi/qapi-visit-block-core.h"
  32
  33/*
  34 * When specifying the image filename use:
  35 *
  36 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  37 *
  38 * poolname must be the name of an existing rados pool.
  39 *
  40 * devicename is the name of the rbd image.
  41 *
  42 * Each option given is used to configure rados, and may be any valid
  43 * Ceph option, "id", or "conf".
  44 *
  45 * The "id" option indicates what user we should authenticate as to
  46 * the Ceph cluster.  If it is excluded we will use the Ceph default
  47 * (normally 'admin').
  48 *
  49 * The "conf" option specifies a Ceph configuration file to read.  If
  50 * it is not specified, we will read from the default Ceph locations
  51 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  52 * file, specify conf=/dev/null.
  53 *
  54 * Configuration values containing :, @, or = can be escaped with a
  55 * leading "\".
  56 */
  57
  58/* rbd_aio_discard added in 0.1.2 */
  59#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
  60#define LIBRBD_SUPPORTS_DISCARD
  61#else
  62#undef LIBRBD_SUPPORTS_DISCARD
  63#endif
  64
  65#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
  66
  67#define RBD_MAX_SNAPS 100
  68
  69/* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
  70#ifdef LIBRBD_SUPPORTS_IOVEC
  71#define LIBRBD_USE_IOVEC 1
  72#else
  73#define LIBRBD_USE_IOVEC 0
  74#endif
  75
  76typedef enum {
  77    RBD_AIO_READ,
  78    RBD_AIO_WRITE,
  79    RBD_AIO_DISCARD,
  80    RBD_AIO_FLUSH
  81} RBDAIOCmd;
  82
  83typedef struct RBDAIOCB {
  84    BlockAIOCB common;
  85    int64_t ret;
  86    QEMUIOVector *qiov;
  87    char *bounce;
  88    RBDAIOCmd cmd;
  89    int error;
  90    struct BDRVRBDState *s;
  91} RBDAIOCB;
  92
  93typedef struct RADOSCB {
  94    RBDAIOCB *acb;
  95    struct BDRVRBDState *s;
  96    int64_t size;
  97    char *buf;
  98    int64_t ret;
  99} RADOSCB;
 100
 101typedef struct BDRVRBDState {
 102    rados_t cluster;
 103    rados_ioctx_t io_ctx;
 104    rbd_image_t image;
 105    char *image_name;
 106    char *snap;
 107    uint64_t image_size;
 108} BDRVRBDState;
 109
 110static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 111                            BlockdevOptionsRbd *opts, bool cache,
 112                            const char *keypairs, const char *secretid,
 113                            Error **errp);
 114
 115static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 116{
 117    char *end;
 118
 119    *p = NULL;
 120
 121    for (end = src; *end; ++end) {
 122        if (*end == delim) {
 123            break;
 124        }
 125        if (*end == '\\' && end[1] != '\0') {
 126            end++;
 127        }
 128    }
 129    if (*end == delim) {
 130        *p = end + 1;
 131        *end = '\0';
 132    }
 133    return src;
 134}
 135
 136static void qemu_rbd_unescape(char *src)
 137{
 138    char *p;
 139
 140    for (p = src; *src; ++src, ++p) {
 141        if (*src == '\\' && src[1] != '\0') {
 142            src++;
 143        }
 144        *p = *src;
 145    }
 146    *p = '\0';
 147}
 148
 149static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 150                                    Error **errp)
 151{
 152    const char *start;
 153    char *p, *buf;
 154    QList *keypairs = NULL;
 155    char *found_str;
 156
 157    if (!strstart(filename, "rbd:", &start)) {
 158        error_setg(errp, "File name must start with 'rbd:'");
 159        return;
 160    }
 161
 162    buf = g_strdup(start);
 163    p = buf;
 164
 165    found_str = qemu_rbd_next_tok(p, '/', &p);
 166    if (!p) {
 167        error_setg(errp, "Pool name is required");
 168        goto done;
 169    }
 170    qemu_rbd_unescape(found_str);
 171    qdict_put_str(options, "pool", found_str);
 172
 173    if (strchr(p, '@')) {
 174        found_str = qemu_rbd_next_tok(p, '@', &p);
 175        qemu_rbd_unescape(found_str);
 176        qdict_put_str(options, "image", found_str);
 177
 178        found_str = qemu_rbd_next_tok(p, ':', &p);
 179        qemu_rbd_unescape(found_str);
 180        qdict_put_str(options, "snapshot", found_str);
 181    } else {
 182        found_str = qemu_rbd_next_tok(p, ':', &p);
 183        qemu_rbd_unescape(found_str);
 184        qdict_put_str(options, "image", found_str);
 185    }
 186    if (!p) {
 187        goto done;
 188    }
 189
 190    /* The following are essentially all key/value pairs, and we treat
 191     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
 192    while (p) {
 193        char *name, *value;
 194        name = qemu_rbd_next_tok(p, '=', &p);
 195        if (!p) {
 196            error_setg(errp, "conf option %s has no value", name);
 197            break;
 198        }
 199
 200        qemu_rbd_unescape(name);
 201
 202        value = qemu_rbd_next_tok(p, ':', &p);
 203        qemu_rbd_unescape(value);
 204
 205        if (!strcmp(name, "conf")) {
 206            qdict_put_str(options, "conf", value);
 207        } else if (!strcmp(name, "id")) {
 208            qdict_put_str(options, "user", value);
 209        } else {
 210            /*
 211             * We pass these internally to qemu_rbd_set_keypairs(), so
 212             * we can get away with the simpler list of [ "key1",
 213             * "value1", "key2", "value2" ] rather than a raw dict
 214             * { "key1": "value1", "key2": "value2" } where we can't
 215             * guarantee order, or even a more correct but complex
 216             * [ { "key1": "value1" }, { "key2": "value2" } ]
 217             */
 218            if (!keypairs) {
 219                keypairs = qlist_new();
 220            }
 221            qlist_append_str(keypairs, name);
 222            qlist_append_str(keypairs, value);
 223        }
 224    }
 225
 226    if (keypairs) {
 227        qdict_put(options, "=keyvalue-pairs",
 228                  qobject_to_json(QOBJECT(keypairs)));
 229    }
 230
 231done:
 232    g_free(buf);
 233    qobject_unref(keypairs);
 234    return;
 235}
 236
 237
 238static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
 239{
 240    /* XXX Does RBD support AIO on less than 512-byte alignment? */
 241    bs->bl.request_alignment = 512;
 242}
 243
 244
 245static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
 246                             Error **errp)
 247{
 248    char *key, *acr;
 249    int r;
 250    GString *accu;
 251    RbdAuthModeList *auth;
 252
 253    if (opts->key_secret) {
 254        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
 255        if (!key) {
 256            return -EIO;
 257        }
 258        r = rados_conf_set(cluster, "key", key);
 259        g_free(key);
 260        if (r < 0) {
 261            error_setg_errno(errp, -r, "Could not set 'key'");
 262            return r;
 263        }
 264    }
 265
 266    if (opts->has_auth_client_required) {
 267        accu = g_string_new("");
 268        for (auth = opts->auth_client_required; auth; auth = auth->next) {
 269            if (accu->str[0]) {
 270                g_string_append_c(accu, ';');
 271            }
 272            g_string_append(accu, RbdAuthMode_str(auth->value));
 273        }
 274        acr = g_string_free(accu, FALSE);
 275        r = rados_conf_set(cluster, "auth_client_required", acr);
 276        g_free(acr);
 277        if (r < 0) {
 278            error_setg_errno(errp, -r,
 279                             "Could not set 'auth_client_required'");
 280            return r;
 281        }
 282    }
 283
 284    return 0;
 285}
 286
 287static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 288                                 Error **errp)
 289{
 290    QList *keypairs;
 291    QString *name;
 292    QString *value;
 293    const char *key;
 294    size_t remaining;
 295    int ret = 0;
 296
 297    if (!keypairs_json) {
 298        return ret;
 299    }
 300    keypairs = qobject_to(QList,
 301                          qobject_from_json(keypairs_json, &error_abort));
 302    remaining = qlist_size(keypairs) / 2;
 303    assert(remaining);
 304
 305    while (remaining--) {
 306        name = qobject_to(QString, qlist_pop(keypairs));
 307        value = qobject_to(QString, qlist_pop(keypairs));
 308        assert(name && value);
 309        key = qstring_get_str(name);
 310
 311        ret = rados_conf_set(cluster, key, qstring_get_str(value));
 312        qobject_unref(value);
 313        if (ret < 0) {
 314            error_setg_errno(errp, -ret, "invalid conf option %s", key);
 315            qobject_unref(name);
 316            ret = -EINVAL;
 317            break;
 318        }
 319        qobject_unref(name);
 320    }
 321
 322    qobject_unref(keypairs);
 323    return ret;
 324}
 325
 326static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
 327{
 328    if (LIBRBD_USE_IOVEC) {
 329        RBDAIOCB *acb = rcb->acb;
 330        iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
 331                   acb->qiov->size - offs);
 332    } else {
 333        memset(rcb->buf + offs, 0, rcb->size - offs);
 334    }
 335}
 336
 337static QemuOptsList runtime_opts = {
 338    .name = "rbd",
 339    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
 340    .desc = {
 341        {
 342            .name = "pool",
 343            .type = QEMU_OPT_STRING,
 344            .help = "Rados pool name",
 345        },
 346        {
 347            .name = "image",
 348            .type = QEMU_OPT_STRING,
 349            .help = "Image name in the pool",
 350        },
 351        {
 352            .name = "conf",
 353            .type = QEMU_OPT_STRING,
 354            .help = "Rados config file location",
 355        },
 356        {
 357            .name = "snapshot",
 358            .type = QEMU_OPT_STRING,
 359            .help = "Ceph snapshot name",
 360        },
 361        {
 362            /* maps to 'id' in rados_create() */
 363            .name = "user",
 364            .type = QEMU_OPT_STRING,
 365            .help = "Rados id name",
 366        },
 367        /*
 368         * server.* extracted manually, see qemu_rbd_mon_host()
 369         */
 370        { /* end of list */ }
 371    },
 372};
 373
 374/* FIXME Deprecate and remove keypairs or make it available in QMP. */
 375static int qemu_rbd_do_create(BlockdevCreateOptions *options,
 376                              const char *keypairs, const char *password_secret,
 377                              Error **errp)
 378{
 379    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
 380    rados_t cluster;
 381    rados_ioctx_t io_ctx;
 382    int obj_order = 0;
 383    int ret;
 384
 385    assert(options->driver == BLOCKDEV_DRIVER_RBD);
 386    if (opts->location->has_snapshot) {
 387        error_setg(errp, "Can't use snapshot name for image creation");
 388        return -EINVAL;
 389    }
 390
 391    if (opts->has_cluster_size) {
 392        int64_t objsize = opts->cluster_size;
 393        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 394            error_setg(errp, "obj size needs to be power of 2");
 395            return -EINVAL;
 396        }
 397        if (objsize < 4096) {
 398            error_setg(errp, "obj size too small");
 399            return -EINVAL;
 400        }
 401        obj_order = ctz32(objsize);
 402    }
 403
 404    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
 405                           password_secret, errp);
 406    if (ret < 0) {
 407        return ret;
 408    }
 409
 410    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
 411    if (ret < 0) {
 412        error_setg_errno(errp, -ret, "error rbd create");
 413        goto out;
 414    }
 415
 416    ret = 0;
 417out:
 418    rados_ioctx_destroy(io_ctx);
 419    rados_shutdown(cluster);
 420    return ret;
 421}
 422
 423static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
 424{
 425    return qemu_rbd_do_create(options, NULL, NULL, errp);
 426}
 427
 428static int coroutine_fn qemu_rbd_co_create_opts(const char *filename,
 429                                                QemuOpts *opts,
 430                                                Error **errp)
 431{
 432    BlockdevCreateOptions *create_options;
 433    BlockdevCreateOptionsRbd *rbd_opts;
 434    BlockdevOptionsRbd *loc;
 435    Error *local_err = NULL;
 436    const char *keypairs, *password_secret;
 437    QDict *options = NULL;
 438    int ret = 0;
 439
 440    create_options = g_new0(BlockdevCreateOptions, 1);
 441    create_options->driver = BLOCKDEV_DRIVER_RBD;
 442    rbd_opts = &create_options->u.rbd;
 443
 444    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
 445
 446    password_secret = qemu_opt_get(opts, "password-secret");
 447
 448    /* Read out options */
 449    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 450                              BDRV_SECTOR_SIZE);
 451    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
 452                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
 453    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
 454
 455    options = qdict_new();
 456    qemu_rbd_parse_filename(filename, options, &local_err);
 457    if (local_err) {
 458        ret = -EINVAL;
 459        error_propagate(errp, local_err);
 460        goto exit;
 461    }
 462
 463    /*
 464     * Caution: while qdict_get_try_str() is fine, getting non-string
 465     * types would require more care.  When @options come from -blockdev
 466     * or blockdev_add, its members are typed according to the QAPI
 467     * schema, but when they come from -drive, they're all QString.
 468     */
 469    loc = rbd_opts->location;
 470    loc->pool     = g_strdup(qdict_get_try_str(options, "pool"));
 471    loc->conf     = g_strdup(qdict_get_try_str(options, "conf"));
 472    loc->has_conf = !!loc->conf;
 473    loc->user     = g_strdup(qdict_get_try_str(options, "user"));
 474    loc->has_user = !!loc->user;
 475    loc->image    = g_strdup(qdict_get_try_str(options, "image"));
 476    keypairs      = qdict_get_try_str(options, "=keyvalue-pairs");
 477
 478    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
 479    if (ret < 0) {
 480        goto exit;
 481    }
 482
 483exit:
 484    qobject_unref(options);
 485    qapi_free_BlockdevCreateOptions(create_options);
 486    return ret;
 487}
 488
 489/*
 490 * This aio completion is being called from rbd_finish_bh() and runs in qemu
 491 * BH context.
 492 */
 493static void qemu_rbd_complete_aio(RADOSCB *rcb)
 494{
 495    RBDAIOCB *acb = rcb->acb;
 496    int64_t r;
 497
 498    r = rcb->ret;
 499
 500    if (acb->cmd != RBD_AIO_READ) {
 501        if (r < 0) {
 502            acb->ret = r;
 503            acb->error = 1;
 504        } else if (!acb->error) {
 505            acb->ret = rcb->size;
 506        }
 507    } else {
 508        if (r < 0) {
 509            qemu_rbd_memset(rcb, 0);
 510            acb->ret = r;
 511            acb->error = 1;
 512        } else if (r < rcb->size) {
 513            qemu_rbd_memset(rcb, r);
 514            if (!acb->error) {
 515                acb->ret = rcb->size;
 516            }
 517        } else if (!acb->error) {
 518            acb->ret = r;
 519        }
 520    }
 521
 522    g_free(rcb);
 523
 524    if (!LIBRBD_USE_IOVEC) {
 525        if (acb->cmd == RBD_AIO_READ) {
 526            qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 527        }
 528        qemu_vfree(acb->bounce);
 529    }
 530
 531    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 532
 533    qemu_aio_unref(acb);
 534}
 535
 536static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 537{
 538    const char **vals;
 539    const char *host, *port;
 540    char *rados_str;
 541    InetSocketAddressBaseList *p;
 542    int i, cnt;
 543
 544    if (!opts->has_server) {
 545        return NULL;
 546    }
 547
 548    for (cnt = 0, p = opts->server; p; p = p->next) {
 549        cnt++;
 550    }
 551
 552    vals = g_new(const char *, cnt + 1);
 553
 554    for (i = 0, p = opts->server; p; p = p->next, i++) {
 555        host = p->value->host;
 556        port = p->value->port;
 557
 558        if (strchr(host, ':')) {
 559            vals[i] = g_strdup_printf("[%s]:%s", host, port);
 560        } else {
 561            vals[i] = g_strdup_printf("%s:%s", host, port);
 562        }
 563    }
 564    vals[i] = NULL;
 565
 566    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
 567    g_strfreev((char **)vals);
 568    return rados_str;
 569}
 570
 571static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 572                            BlockdevOptionsRbd *opts, bool cache,
 573                            const char *keypairs, const char *secretid,
 574                            Error **errp)
 575{
 576    char *mon_host = NULL;
 577    Error *local_err = NULL;
 578    int r;
 579
 580    if (secretid) {
 581        if (opts->key_secret) {
 582            error_setg(errp,
 583                       "Legacy 'password-secret' clashes with 'key-secret'");
 584            return -EINVAL;
 585        }
 586        opts->key_secret = g_strdup(secretid);
 587        opts->has_key_secret = true;
 588    }
 589
 590    mon_host = qemu_rbd_mon_host(opts, &local_err);
 591    if (local_err) {
 592        error_propagate(errp, local_err);
 593        r = -EINVAL;
 594        goto failed_opts;
 595    }
 596
 597    r = rados_create(cluster, opts->user);
 598    if (r < 0) {
 599        error_setg_errno(errp, -r, "error initializing");
 600        goto failed_opts;
 601    }
 602
 603    /* try default location when conf=NULL, but ignore failure */
 604    r = rados_conf_read_file(*cluster, opts->conf);
 605    if (opts->has_conf && r < 0) {
 606        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
 607        goto failed_shutdown;
 608    }
 609
 610    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
 611    if (r < 0) {
 612        goto failed_shutdown;
 613    }
 614
 615    if (mon_host) {
 616        r = rados_conf_set(*cluster, "mon_host", mon_host);
 617        if (r < 0) {
 618            goto failed_shutdown;
 619        }
 620    }
 621
 622    r = qemu_rbd_set_auth(*cluster, opts, errp);
 623    if (r < 0) {
 624        goto failed_shutdown;
 625    }
 626
 627    /*
 628     * Fallback to more conservative semantics if setting cache
 629     * options fails. Ignore errors from setting rbd_cache because the
 630     * only possible error is that the option does not exist, and
 631     * librbd defaults to no caching. If write through caching cannot
 632     * be set up, fall back to no caching.
 633     */
 634    if (cache) {
 635        rados_conf_set(*cluster, "rbd_cache", "true");
 636    } else {
 637        rados_conf_set(*cluster, "rbd_cache", "false");
 638    }
 639
 640    r = rados_connect(*cluster);
 641    if (r < 0) {
 642        error_setg_errno(errp, -r, "error connecting");
 643        goto failed_shutdown;
 644    }
 645
 646    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
 647    if (r < 0) {
 648        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
 649        goto failed_shutdown;
 650    }
 651
 652    return 0;
 653
 654failed_shutdown:
 655    rados_shutdown(*cluster);
 656failed_opts:
 657    g_free(mon_host);
 658    return r;
 659}
 660
 661static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
 662                                    Error **errp)
 663{
 664    Visitor *v;
 665    Error *local_err = NULL;
 666
 667    /* Convert the remaining options into a QAPI object */
 668    v = qobject_input_visitor_new_flat_confused(options, errp);
 669    if (!v) {
 670        return -EINVAL;
 671    }
 672
 673    visit_type_BlockdevOptionsRbd(v, NULL, opts, &local_err);
 674    visit_free(v);
 675
 676    if (local_err) {
 677        error_propagate(errp, local_err);
 678        return -EINVAL;
 679    }
 680
 681    return 0;
 682}
 683
 684static int qemu_rbd_attempt_legacy_options(QDict *options,
 685                                           BlockdevOptionsRbd **opts,
 686                                           char **keypairs)
 687{
 688    char *filename;
 689    int r;
 690
 691    filename = g_strdup(qdict_get_try_str(options, "filename"));
 692    if (!filename) {
 693        return -EINVAL;
 694    }
 695    qdict_del(options, "filename");
 696
 697    qemu_rbd_parse_filename(filename, options, NULL);
 698
 699    /* keypairs freed by caller */
 700    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 701    if (*keypairs) {
 702        qdict_del(options, "=keyvalue-pairs");
 703    }
 704
 705    r = qemu_rbd_convert_options(options, opts, NULL);
 706
 707    g_free(filename);
 708    return r;
 709}
 710
 711static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 712                         Error **errp)
 713{
 714    BDRVRBDState *s = bs->opaque;
 715    BlockdevOptionsRbd *opts = NULL;
 716    const QDictEntry *e;
 717    Error *local_err = NULL;
 718    char *keypairs, *secretid;
 719    int r;
 720
 721    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 722    if (keypairs) {
 723        qdict_del(options, "=keyvalue-pairs");
 724    }
 725
 726    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
 727    if (secretid) {
 728        qdict_del(options, "password-secret");
 729    }
 730
 731    r = qemu_rbd_convert_options(options, &opts, &local_err);
 732    if (local_err) {
 733        /* If keypairs are present, that means some options are present in
 734         * the modern option format.  Don't attempt to parse legacy option
 735         * formats, as we won't support mixed usage. */
 736        if (keypairs) {
 737            error_propagate(errp, local_err);
 738            goto out;
 739        }
 740
 741        /* If the initial attempt to convert and process the options failed,
 742         * we may be attempting to open an image file that has the rbd options
 743         * specified in the older format consisting of all key/value pairs
 744         * encoded in the filename.  Go ahead and attempt to parse the
 745         * filename, and see if we can pull out the required options. */
 746        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
 747        if (r < 0) {
 748            /* Propagate the original error, not the legacy parsing fallback
 749             * error, as the latter was just a best-effort attempt. */
 750            error_propagate(errp, local_err);
 751            goto out;
 752        }
 753        /* Take care whenever deciding to actually deprecate; once this ability
 754         * is removed, we will not be able to open any images with legacy-styled
 755         * backing image strings. */
 756        warn_report("RBD options encoded in the filename as keyvalue pairs "
 757                    "is deprecated");
 758    }
 759
 760    /* Remove the processed options from the QDict (the visitor processes
 761     * _all_ options in the QDict) */
 762    while ((e = qdict_first(options))) {
 763        qdict_del(options, e->key);
 764    }
 765
 766    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
 767                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
 768    if (r < 0) {
 769        goto out;
 770    }
 771
 772    s->snap = g_strdup(opts->snapshot);
 773    s->image_name = g_strdup(opts->image);
 774
 775    /* rbd_open is always r/w */
 776    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
 777    if (r < 0) {
 778        error_setg_errno(errp, -r, "error reading header from %s",
 779                         s->image_name);
 780        goto failed_open;
 781    }
 782
 783    r = rbd_get_size(s->image, &s->image_size);
 784    if (r < 0) {
 785        error_setg_errno(errp, -r, "error getting image size from %s",
 786                         s->image_name);
 787        rbd_close(s->image);
 788        goto failed_open;
 789    }
 790
 791    /* If we are using an rbd snapshot, we must be r/o, otherwise
 792     * leave as-is */
 793    if (s->snap != NULL) {
 794        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
 795        if (r < 0) {
 796            rbd_close(s->image);
 797            goto failed_open;
 798        }
 799    }
 800
 801    r = 0;
 802    goto out;
 803
 804failed_open:
 805    rados_ioctx_destroy(s->io_ctx);
 806    g_free(s->snap);
 807    g_free(s->image_name);
 808    rados_shutdown(s->cluster);
 809out:
 810    qapi_free_BlockdevOptionsRbd(opts);
 811    g_free(keypairs);
 812    g_free(secretid);
 813    return r;
 814}
 815
 816
 817/* Since RBD is currently always opened R/W via the API,
 818 * we just need to check if we are using a snapshot or not, in
 819 * order to determine if we will allow it to be R/W */
 820static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
 821                                   BlockReopenQueue *queue, Error **errp)
 822{
 823    BDRVRBDState *s = state->bs->opaque;
 824    int ret = 0;
 825
 826    if (s->snap && state->flags & BDRV_O_RDWR) {
 827        error_setg(errp,
 828                   "Cannot change node '%s' to r/w when using RBD snapshot",
 829                   bdrv_get_device_or_node_name(state->bs));
 830        ret = -EINVAL;
 831    }
 832
 833    return ret;
 834}
 835
 836static void qemu_rbd_close(BlockDriverState *bs)
 837{
 838    BDRVRBDState *s = bs->opaque;
 839
 840    rbd_close(s->image);
 841    rados_ioctx_destroy(s->io_ctx);
 842    g_free(s->snap);
 843    g_free(s->image_name);
 844    rados_shutdown(s->cluster);
 845}
 846
 847/* Resize the RBD image and update the 'image_size' with the current size */
 848static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
 849{
 850    BDRVRBDState *s = bs->opaque;
 851    int r;
 852
 853    r = rbd_resize(s->image, size);
 854    if (r < 0) {
 855        return r;
 856    }
 857
 858    s->image_size = size;
 859
 860    return 0;
 861}
 862
 863static const AIOCBInfo rbd_aiocb_info = {
 864    .aiocb_size = sizeof(RBDAIOCB),
 865};
 866
 867static void rbd_finish_bh(void *opaque)
 868{
 869    RADOSCB *rcb = opaque;
 870    qemu_rbd_complete_aio(rcb);
 871}
 872
 873/*
 874 * This is the callback function for rbd_aio_read and _write
 875 *
 876 * Note: this function is being called from a non qemu thread so
 877 * we need to be careful about what we do here. Generally we only
 878 * schedule a BH, and do the rest of the io completion handling
 879 * from rbd_finish_bh() which runs in a qemu context.
 880 */
 881static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 882{
 883    RBDAIOCB *acb = rcb->acb;
 884
 885    rcb->ret = rbd_aio_get_return_value(c);
 886    rbd_aio_release(c);
 887
 888    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
 889                                     rbd_finish_bh, rcb);
 890}
 891
 892static int rbd_aio_discard_wrapper(rbd_image_t image,
 893                                   uint64_t off,
 894                                   uint64_t len,
 895                                   rbd_completion_t comp)
 896{
 897#ifdef LIBRBD_SUPPORTS_DISCARD
 898    return rbd_aio_discard(image, off, len, comp);
 899#else
 900    return -ENOTSUP;
 901#endif
 902}
 903
 904static int rbd_aio_flush_wrapper(rbd_image_t image,
 905                                 rbd_completion_t comp)
 906{
 907#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 908    return rbd_aio_flush(image, comp);
 909#else
 910    return -ENOTSUP;
 911#endif
 912}
 913
 914static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 915                                 int64_t off,
 916                                 QEMUIOVector *qiov,
 917                                 int64_t size,
 918                                 BlockCompletionFunc *cb,
 919                                 void *opaque,
 920                                 RBDAIOCmd cmd)
 921{
 922    RBDAIOCB *acb;
 923    RADOSCB *rcb = NULL;
 924    rbd_completion_t c;
 925    int r;
 926
 927    BDRVRBDState *s = bs->opaque;
 928
 929    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
 930    acb->cmd = cmd;
 931    acb->qiov = qiov;
 932    assert(!qiov || qiov->size == size);
 933
 934    rcb = g_new(RADOSCB, 1);
 935
 936    if (!LIBRBD_USE_IOVEC) {
 937        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 938            acb->bounce = NULL;
 939        } else {
 940            acb->bounce = qemu_try_blockalign(bs, qiov->size);
 941            if (acb->bounce == NULL) {
 942                goto failed;
 943            }
 944        }
 945        if (cmd == RBD_AIO_WRITE) {
 946            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 947        }
 948        rcb->buf = acb->bounce;
 949    }
 950
 951    acb->ret = 0;
 952    acb->error = 0;
 953    acb->s = s;
 954
 955    rcb->acb = acb;
 956    rcb->s = acb->s;
 957    rcb->size = size;
 958    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
 959    if (r < 0) {
 960        goto failed;
 961    }
 962
 963    switch (cmd) {
 964    case RBD_AIO_WRITE: {
 965        /*
 966         * RBD APIs don't allow us to write more than actual size, so in order
 967         * to support growing images, we resize the image before write
 968         * operations that exceed the current size.
 969         */
 970        if (off + size > s->image_size) {
 971            r = qemu_rbd_resize(bs, off + size);
 972            if (r < 0) {
 973                goto failed_completion;
 974            }
 975        }
 976#ifdef LIBRBD_SUPPORTS_IOVEC
 977            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
 978#else
 979            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
 980#endif
 981        break;
 982    }
 983    case RBD_AIO_READ:
 984#ifdef LIBRBD_SUPPORTS_IOVEC
 985            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
 986#else
 987            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
 988#endif
 989        break;
 990    case RBD_AIO_DISCARD:
 991        r = rbd_aio_discard_wrapper(s->image, off, size, c);
 992        break;
 993    case RBD_AIO_FLUSH:
 994        r = rbd_aio_flush_wrapper(s->image, c);
 995        break;
 996    default:
 997        r = -EINVAL;
 998    }
 999
1000    if (r < 0) {
1001        goto failed_completion;
1002    }
1003    return &acb->common;
1004
1005failed_completion:
1006    rbd_aio_release(c);
1007failed:
1008    g_free(rcb);
1009    if (!LIBRBD_USE_IOVEC) {
1010        qemu_vfree(acb->bounce);
1011    }
1012
1013    qemu_aio_unref(acb);
1014    return NULL;
1015}
1016
1017static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
1018                                       uint64_t offset, uint64_t bytes,
1019                                       QEMUIOVector *qiov, int flags,
1020                                       BlockCompletionFunc *cb,
1021                                       void *opaque)
1022{
1023    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1024                         RBD_AIO_READ);
1025}
1026
1027static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
1028                                        uint64_t offset, uint64_t bytes,
1029                                        QEMUIOVector *qiov, int flags,
1030                                        BlockCompletionFunc *cb,
1031                                        void *opaque)
1032{
1033    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1034                         RBD_AIO_WRITE);
1035}
1036
1037#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1038static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1039                                      BlockCompletionFunc *cb,
1040                                      void *opaque)
1041{
1042    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1043}
1044
1045#else
1046
1047static int qemu_rbd_co_flush(BlockDriverState *bs)
1048{
1049#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1050    /* rbd_flush added in 0.1.1 */
1051    BDRVRBDState *s = bs->opaque;
1052    return rbd_flush(s->image);
1053#else
1054    return 0;
1055#endif
1056}
1057#endif
1058
1059static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1060{
1061    BDRVRBDState *s = bs->opaque;
1062    rbd_image_info_t info;
1063    int r;
1064
1065    r = rbd_stat(s->image, &info, sizeof(info));
1066    if (r < 0) {
1067        return r;
1068    }
1069
1070    bdi->cluster_size = info.obj_size;
1071    return 0;
1072}
1073
1074static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1075{
1076    BDRVRBDState *s = bs->opaque;
1077    rbd_image_info_t info;
1078    int r;
1079
1080    r = rbd_stat(s->image, &info, sizeof(info));
1081    if (r < 0) {
1082        return r;
1083    }
1084
1085    return info.size;
1086}
1087
1088static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1089                                             int64_t offset,
1090                                             bool exact,
1091                                             PreallocMode prealloc,
1092                                             Error **errp)
1093{
1094    int r;
1095
1096    if (prealloc != PREALLOC_MODE_OFF) {
1097        error_setg(errp, "Unsupported preallocation mode '%s'",
1098                   PreallocMode_str(prealloc));
1099        return -ENOTSUP;
1100    }
1101
1102    r = qemu_rbd_resize(bs, offset);
1103    if (r < 0) {
1104        error_setg_errno(errp, -r, "Failed to resize file");
1105        return r;
1106    }
1107
1108    return 0;
1109}
1110
1111static int qemu_rbd_snap_create(BlockDriverState *bs,
1112                                QEMUSnapshotInfo *sn_info)
1113{
1114    BDRVRBDState *s = bs->opaque;
1115    int r;
1116
1117    if (sn_info->name[0] == '\0') {
1118        return -EINVAL; /* we need a name for rbd snapshots */
1119    }
1120
1121    /*
1122     * rbd snapshots are using the name as the user controlled unique identifier
1123     * we can't use the rbd snapid for that purpose, as it can't be set
1124     */
1125    if (sn_info->id_str[0] != '\0' &&
1126        strcmp(sn_info->id_str, sn_info->name) != 0) {
1127        return -EINVAL;
1128    }
1129
1130    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1131        return -ERANGE;
1132    }
1133
1134    r = rbd_snap_create(s->image, sn_info->name);
1135    if (r < 0) {
1136        error_report("failed to create snap: %s", strerror(-r));
1137        return r;
1138    }
1139
1140    return 0;
1141}
1142
1143static int qemu_rbd_snap_remove(BlockDriverState *bs,
1144                                const char *snapshot_id,
1145                                const char *snapshot_name,
1146                                Error **errp)
1147{
1148    BDRVRBDState *s = bs->opaque;
1149    int r;
1150
1151    if (!snapshot_name) {
1152        error_setg(errp, "rbd need a valid snapshot name");
1153        return -EINVAL;
1154    }
1155
1156    /* If snapshot_id is specified, it must be equal to name, see
1157       qemu_rbd_snap_list() */
1158    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1159        error_setg(errp,
1160                   "rbd do not support snapshot id, it should be NULL or "
1161                   "equal to snapshot name");
1162        return -EINVAL;
1163    }
1164
1165    r = rbd_snap_remove(s->image, snapshot_name);
1166    if (r < 0) {
1167        error_setg_errno(errp, -r, "Failed to remove the snapshot");
1168    }
1169    return r;
1170}
1171
1172static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1173                                  const char *snapshot_name)
1174{
1175    BDRVRBDState *s = bs->opaque;
1176
1177    return rbd_snap_rollback(s->image, snapshot_name);
1178}
1179
1180static int qemu_rbd_snap_list(BlockDriverState *bs,
1181                              QEMUSnapshotInfo **psn_tab)
1182{
1183    BDRVRBDState *s = bs->opaque;
1184    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1185    int i, snap_count;
1186    rbd_snap_info_t *snaps;
1187    int max_snaps = RBD_MAX_SNAPS;
1188
1189    do {
1190        snaps = g_new(rbd_snap_info_t, max_snaps);
1191        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1192        if (snap_count <= 0) {
1193            g_free(snaps);
1194        }
1195    } while (snap_count == -ERANGE);
1196
1197    if (snap_count <= 0) {
1198        goto done;
1199    }
1200
1201    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1202
1203    for (i = 0; i < snap_count; i++) {
1204        const char *snap_name = snaps[i].name;
1205
1206        sn_info = sn_tab + i;
1207        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1208        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1209
1210        sn_info->vm_state_size = snaps[i].size;
1211        sn_info->date_sec = 0;
1212        sn_info->date_nsec = 0;
1213        sn_info->vm_clock_nsec = 0;
1214    }
1215    rbd_snap_list_end(snaps);
1216    g_free(snaps);
1217
1218 done:
1219    *psn_tab = sn_tab;
1220    return snap_count;
1221}
1222
1223#ifdef LIBRBD_SUPPORTS_DISCARD
1224static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1225                                         int64_t offset,
1226                                         int bytes,
1227                                         BlockCompletionFunc *cb,
1228                                         void *opaque)
1229{
1230    return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
1231                         RBD_AIO_DISCARD);
1232}
1233#endif
1234
1235#ifdef LIBRBD_SUPPORTS_INVALIDATE
1236static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1237                                                      Error **errp)
1238{
1239    BDRVRBDState *s = bs->opaque;
1240    int r = rbd_invalidate_cache(s->image);
1241    if (r < 0) {
1242        error_setg_errno(errp, -r, "Failed to invalidate the cache");
1243    }
1244}
1245#endif
1246
1247static QemuOptsList qemu_rbd_create_opts = {
1248    .name = "rbd-create-opts",
1249    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1250    .desc = {
1251        {
1252            .name = BLOCK_OPT_SIZE,
1253            .type = QEMU_OPT_SIZE,
1254            .help = "Virtual disk size"
1255        },
1256        {
1257            .name = BLOCK_OPT_CLUSTER_SIZE,
1258            .type = QEMU_OPT_SIZE,
1259            .help = "RBD object size"
1260        },
1261        {
1262            .name = "password-secret",
1263            .type = QEMU_OPT_STRING,
1264            .help = "ID of secret providing the password",
1265        },
1266        { /* end of list */ }
1267    }
1268};
1269
/*
 * NULL-terminated list of runtime option names registered as the driver's
 * .strong_runtime_opts.  The trailing dot in "server." is part of the entry.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* which image to open */
    "pool", "image", "conf", "snapshot",
    /* how to reach the cluster and authenticate */
    "user", "server.", "password-secret",

    NULL /* terminator */
};
1281
1282static BlockDriver bdrv_rbd = {
1283    .format_name            = "rbd",
1284    .instance_size          = sizeof(BDRVRBDState),
1285    .bdrv_parse_filename    = qemu_rbd_parse_filename,
1286    .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1287    .bdrv_file_open         = qemu_rbd_open,
1288    .bdrv_close             = qemu_rbd_close,
1289    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1290    .bdrv_co_create         = qemu_rbd_co_create,
1291    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1292    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1293    .bdrv_has_zero_init_truncate = bdrv_has_zero_init_1,
1294    .bdrv_get_info          = qemu_rbd_getinfo,
1295    .create_opts            = &qemu_rbd_create_opts,
1296    .bdrv_getlength         = qemu_rbd_getlength,
1297    .bdrv_co_truncate       = qemu_rbd_co_truncate,
1298    .protocol_name          = "rbd",
1299
1300    .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1301    .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1302
1303#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1304    .bdrv_aio_flush         = qemu_rbd_aio_flush,
1305#else
1306    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1307#endif
1308
1309#ifdef LIBRBD_SUPPORTS_DISCARD
1310    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1311#endif
1312
1313    .bdrv_snapshot_create   = qemu_rbd_snap_create,
1314    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1315    .bdrv_snapshot_list     = qemu_rbd_snap_list,
1316    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1317#ifdef LIBRBD_SUPPORTS_INVALIDATE
1318    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1319#endif
1320
1321    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1322};
1323
1324static void bdrv_rbd_init(void)
1325{
1326    bdrv_register(&bdrv_rbd);
1327}
1328
1329block_init(bdrv_rbd_init);
1330