qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include <rbd/librbd.h>
  17#include "qapi/error.h"
  18#include "qemu/error-report.h"
  19#include "qemu/module.h"
  20#include "qemu/option.h"
  21#include "block/block_int.h"
  22#include "block/qdict.h"
  23#include "crypto/secret.h"
  24#include "qemu/cutils.h"
  25#include "sysemu/replay.h"
  26#include "qapi/qmp/qstring.h"
  27#include "qapi/qmp/qdict.h"
  28#include "qapi/qmp/qjson.h"
  29#include "qapi/qmp/qlist.h"
  30#include "qapi/qobject-input-visitor.h"
  31#include "qapi/qapi-visit-block-core.h"
  32
  33/*
  34 * When specifying the image filename use:
  35 *
  36 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  37 *
  38 * poolname must be the name of an existing rados pool.
  39 *
  40 * devicename is the name of the rbd image.
  41 *
  42 * Each option given is used to configure rados, and may be any valid
  43 * Ceph option, "id", or "conf".
  44 *
  45 * The "id" option indicates what user we should authenticate as to
  46 * the Ceph cluster.  If it is excluded we will use the Ceph default
  47 * (normally 'admin').
  48 *
  49 * The "conf" option specifies a Ceph configuration file to read.  If
  50 * it is not specified, we will read from the default Ceph locations
  51 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  52 * file, specify conf=/dev/null.
  53 *
  54 * Configuration values containing :, @, or = can be escaped with a
  55 * leading "\".
  56 */
  57
/*
 * rbd_aio_discard added in 0.1.2; LIBRBD_SUPPORTS_DISCARD selects whether
 * rbd_aio_discard_wrapper() calls it or reports -ENOTSUP.
 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif

#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/*
 * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.  LIBRBD_USE_IOVEC is the
 * same information as a 0/1 value so it can be tested in ordinary C
 * conditions; when it is 0 requests go through a linear bounce buffer.
 */
#ifdef LIBRBD_SUPPORTS_IOVEC
#define LIBRBD_USE_IOVEC 1
#else
#define LIBRBD_USE_IOVEC 0
#endif
  75
/* Kind of asynchronous request that rbd_start_aio() submits to librbd. */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH
} RBDAIOCmd;
  82
/* Per-request state handed back to the block layer as a BlockAIOCB. */
typedef struct RBDAIOCB {
    BlockAIOCB common;
    /* Result recorded at completion; positive values are reported as 0 */
    int64_t ret;
    /* Caller's I/O vector (may be NULL; see the assert in rbd_start_aio()) */
    QEMUIOVector *qiov;
    /* Linear bounce buffer, only used when LIBRBD_USE_IOVEC is 0 */
    char *bounce;
    RBDAIOCmd cmd;
    /* Set to 1 once a failure has been recorded for this request */
    int error;
    struct BDRVRBDState *s;
} RBDAIOCB;
  92
/* Context passed to the librbd completion callback for one request. */
typedef struct RADOSCB {
    RBDAIOCB *acb;
    struct BDRVRBDState *s;
    /* Length of the request in bytes */
    int64_t size;
    /* Alias of acb->bounce; only assigned when LIBRBD_USE_IOVEC is 0 */
    char *buf;
    /* Value of rbd_aio_get_return_value(), stored by rbd_finish_aiocb() */
    int64_t ret;
} RADOSCB;
 100
/* Driver state stored in bs->opaque for an open RBD image. */
typedef struct BDRVRBDState {
    rados_t cluster;
    rados_ioctx_t io_ctx;
    rbd_image_t image;
    /* Copy of the image name; freed in qemu_rbd_close() */
    char *image_name;
    /* Copy of the snapshot name; tested against NULL to force read-only */
    char *snap;
    /* NOTE(review): not assigned anywhere in the visible part of the file */
    char *namespace;
    /* Cached size, set by rbd_get_size() on open and by qemu_rbd_resize() */
    uint64_t image_size;
} BDRVRBDState;
 110
/* Forward declaration: used by qemu_rbd_do_create() before its definition. */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
 115
 116static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 117{
 118    char *end;
 119
 120    *p = NULL;
 121
 122    for (end = src; *end; ++end) {
 123        if (*end == delim) {
 124            break;
 125        }
 126        if (*end == '\\' && end[1] != '\0') {
 127            end++;
 128        }
 129    }
 130    if (*end == delim) {
 131        *p = end + 1;
 132        *end = '\0';
 133    }
 134    return src;
 135}
 136
 137static void qemu_rbd_unescape(char *src)
 138{
 139    char *p;
 140
 141    for (p = src; *src; ++src, ++p) {
 142        if (*src == '\\' && src[1] != '\0') {
 143            src++;
 144        }
 145        *p = *src;
 146    }
 147    *p = '\0';
 148}
 149
 150static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 151                                    Error **errp)
 152{
 153    const char *start;
 154    char *p, *buf;
 155    QList *keypairs = NULL;
 156    char *found_str, *image_name;
 157
 158    if (!strstart(filename, "rbd:", &start)) {
 159        error_setg(errp, "File name must start with 'rbd:'");
 160        return;
 161    }
 162
 163    buf = g_strdup(start);
 164    p = buf;
 165
 166    found_str = qemu_rbd_next_tok(p, '/', &p);
 167    if (!p) {
 168        error_setg(errp, "Pool name is required");
 169        goto done;
 170    }
 171    qemu_rbd_unescape(found_str);
 172    qdict_put_str(options, "pool", found_str);
 173
 174    if (strchr(p, '@')) {
 175        image_name = qemu_rbd_next_tok(p, '@', &p);
 176
 177        found_str = qemu_rbd_next_tok(p, ':', &p);
 178        qemu_rbd_unescape(found_str);
 179        qdict_put_str(options, "snapshot", found_str);
 180    } else {
 181        image_name = qemu_rbd_next_tok(p, ':', &p);
 182    }
 183    /* Check for namespace in the image_name */
 184    if (strchr(image_name, '/')) {
 185        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
 186        qemu_rbd_unescape(found_str);
 187        qdict_put_str(options, "namespace", found_str);
 188    } else {
 189        qdict_put_str(options, "namespace", "");
 190    }
 191    qemu_rbd_unescape(image_name);
 192    qdict_put_str(options, "image", image_name);
 193    if (!p) {
 194        goto done;
 195    }
 196
 197    /* The following are essentially all key/value pairs, and we treat
 198     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
 199    while (p) {
 200        char *name, *value;
 201        name = qemu_rbd_next_tok(p, '=', &p);
 202        if (!p) {
 203            error_setg(errp, "conf option %s has no value", name);
 204            break;
 205        }
 206
 207        qemu_rbd_unescape(name);
 208
 209        value = qemu_rbd_next_tok(p, ':', &p);
 210        qemu_rbd_unescape(value);
 211
 212        if (!strcmp(name, "conf")) {
 213            qdict_put_str(options, "conf", value);
 214        } else if (!strcmp(name, "id")) {
 215            qdict_put_str(options, "user", value);
 216        } else {
 217            /*
 218             * We pass these internally to qemu_rbd_set_keypairs(), so
 219             * we can get away with the simpler list of [ "key1",
 220             * "value1", "key2", "value2" ] rather than a raw dict
 221             * { "key1": "value1", "key2": "value2" } where we can't
 222             * guarantee order, or even a more correct but complex
 223             * [ { "key1": "value1" }, { "key2": "value2" } ]
 224             */
 225            if (!keypairs) {
 226                keypairs = qlist_new();
 227            }
 228            qlist_append_str(keypairs, name);
 229            qlist_append_str(keypairs, value);
 230        }
 231    }
 232
 233    if (keypairs) {
 234        qdict_put(options, "=keyvalue-pairs",
 235                  qobject_to_json(QOBJECT(keypairs)));
 236    }
 237
 238done:
 239    g_free(buf);
 240    qobject_unref(keypairs);
 241    return;
 242}
 243
 244
/* Set the block limits for @bs: requests must be 512-byte aligned. */
static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
{
    /* XXX Does RBD support AIO on less than 512-byte alignment? */
    bs->bl.request_alignment = 512;
}
 250
 251
 252static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
 253                             Error **errp)
 254{
 255    char *key, *acr;
 256    int r;
 257    GString *accu;
 258    RbdAuthModeList *auth;
 259
 260    if (opts->key_secret) {
 261        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
 262        if (!key) {
 263            return -EIO;
 264        }
 265        r = rados_conf_set(cluster, "key", key);
 266        g_free(key);
 267        if (r < 0) {
 268            error_setg_errno(errp, -r, "Could not set 'key'");
 269            return r;
 270        }
 271    }
 272
 273    if (opts->has_auth_client_required) {
 274        accu = g_string_new("");
 275        for (auth = opts->auth_client_required; auth; auth = auth->next) {
 276            if (accu->str[0]) {
 277                g_string_append_c(accu, ';');
 278            }
 279            g_string_append(accu, RbdAuthMode_str(auth->value));
 280        }
 281        acr = g_string_free(accu, FALSE);
 282        r = rados_conf_set(cluster, "auth_client_required", acr);
 283        g_free(acr);
 284        if (r < 0) {
 285            error_setg_errno(errp, -r,
 286                             "Could not set 'auth_client_required'");
 287            return r;
 288        }
 289    }
 290
 291    return 0;
 292}
 293
 294static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 295                                 Error **errp)
 296{
 297    QList *keypairs;
 298    QString *name;
 299    QString *value;
 300    const char *key;
 301    size_t remaining;
 302    int ret = 0;
 303
 304    if (!keypairs_json) {
 305        return ret;
 306    }
 307    keypairs = qobject_to(QList,
 308                          qobject_from_json(keypairs_json, &error_abort));
 309    remaining = qlist_size(keypairs) / 2;
 310    assert(remaining);
 311
 312    while (remaining--) {
 313        name = qobject_to(QString, qlist_pop(keypairs));
 314        value = qobject_to(QString, qlist_pop(keypairs));
 315        assert(name && value);
 316        key = qstring_get_str(name);
 317
 318        ret = rados_conf_set(cluster, key, qstring_get_str(value));
 319        qobject_unref(value);
 320        if (ret < 0) {
 321            error_setg_errno(errp, -ret, "invalid conf option %s", key);
 322            qobject_unref(name);
 323            ret = -EINVAL;
 324            break;
 325        }
 326        qobject_unref(name);
 327    }
 328
 329    qobject_unref(keypairs);
 330    return ret;
 331}
 332
 333static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
 334{
 335    if (LIBRBD_USE_IOVEC) {
 336        RBDAIOCB *acb = rcb->acb;
 337        iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
 338                   acb->qiov->size - offs);
 339    } else {
 340        memset(rcb->buf + offs, 0, rcb->size - offs);
 341    }
 342}
 343
/* String-typed runtime options recognised by the "rbd" driver. */
static QemuOptsList runtime_opts = {
    .name = "rbd",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "pool",
            .type = QEMU_OPT_STRING,
            .help = "Rados pool name",
        },
        {
            .name = "namespace",
            .type = QEMU_OPT_STRING,
            .help = "Rados namespace name in the pool",
        },
        {
            .name = "image",
            .type = QEMU_OPT_STRING,
            .help = "Image name in the pool",
        },
        {
            .name = "conf",
            .type = QEMU_OPT_STRING,
            .help = "Rados config file location",
        },
        {
            .name = "snapshot",
            .type = QEMU_OPT_STRING,
            .help = "Ceph snapshot name",
        },
        {
            /* maps to 'id' in rados_create() */
            .name = "user",
            .type = QEMU_OPT_STRING,
            .help = "Rados id name",
        },
        /*
         * server.* extracted manually, see qemu_rbd_mon_host()
         */
        { /* end of list */ }
    },
};
 385
 386/* FIXME Deprecate and remove keypairs or make it available in QMP. */
 387static int qemu_rbd_do_create(BlockdevCreateOptions *options,
 388                              const char *keypairs, const char *password_secret,
 389                              Error **errp)
 390{
 391    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
 392    rados_t cluster;
 393    rados_ioctx_t io_ctx;
 394    int obj_order = 0;
 395    int ret;
 396
 397    assert(options->driver == BLOCKDEV_DRIVER_RBD);
 398    if (opts->location->has_snapshot) {
 399        error_setg(errp, "Can't use snapshot name for image creation");
 400        return -EINVAL;
 401    }
 402
 403    if (opts->has_cluster_size) {
 404        int64_t objsize = opts->cluster_size;
 405        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 406            error_setg(errp, "obj size needs to be power of 2");
 407            return -EINVAL;
 408        }
 409        if (objsize < 4096) {
 410            error_setg(errp, "obj size too small");
 411            return -EINVAL;
 412        }
 413        obj_order = ctz32(objsize);
 414    }
 415
 416    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
 417                           password_secret, errp);
 418    if (ret < 0) {
 419        return ret;
 420    }
 421
 422    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
 423    if (ret < 0) {
 424        error_setg_errno(errp, -ret, "error rbd create");
 425        goto out;
 426    }
 427
 428    ret = 0;
 429out:
 430    rados_ioctx_destroy(io_ctx);
 431    rados_shutdown(cluster);
 432    return ret;
 433}
 434
/*
 * Create an image from QAPI creation options only, i.e. without legacy
 * key/value pairs and without a legacy password secret.
 */
static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
{
    return qemu_rbd_do_create(options, NULL, NULL, errp);
}
 439
 440static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
 441                                                const char *filename,
 442                                                QemuOpts *opts,
 443                                                Error **errp)
 444{
 445    BlockdevCreateOptions *create_options;
 446    BlockdevCreateOptionsRbd *rbd_opts;
 447    BlockdevOptionsRbd *loc;
 448    Error *local_err = NULL;
 449    const char *keypairs, *password_secret;
 450    QDict *options = NULL;
 451    int ret = 0;
 452
 453    create_options = g_new0(BlockdevCreateOptions, 1);
 454    create_options->driver = BLOCKDEV_DRIVER_RBD;
 455    rbd_opts = &create_options->u.rbd;
 456
 457    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
 458
 459    password_secret = qemu_opt_get(opts, "password-secret");
 460
 461    /* Read out options */
 462    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 463                              BDRV_SECTOR_SIZE);
 464    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
 465                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
 466    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
 467
 468    options = qdict_new();
 469    qemu_rbd_parse_filename(filename, options, &local_err);
 470    if (local_err) {
 471        ret = -EINVAL;
 472        error_propagate(errp, local_err);
 473        goto exit;
 474    }
 475
 476    /*
 477     * Caution: while qdict_get_try_str() is fine, getting non-string
 478     * types would require more care.  When @options come from -blockdev
 479     * or blockdev_add, its members are typed according to the QAPI
 480     * schema, but when they come from -drive, they're all QString.
 481     */
 482    loc = rbd_opts->location;
 483    loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
 484    loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
 485    loc->has_conf    = !!loc->conf;
 486    loc->user        = g_strdup(qdict_get_try_str(options, "user"));
 487    loc->has_user    = !!loc->user;
 488    loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
 489    loc->image       = g_strdup(qdict_get_try_str(options, "image"));
 490    keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
 491
 492    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
 493    if (ret < 0) {
 494        goto exit;
 495    }
 496
 497exit:
 498    qobject_unref(options);
 499    qapi_free_BlockdevCreateOptions(create_options);
 500    return ret;
 501}
 502
 503/*
 504 * This aio completion is being called from rbd_finish_bh() and runs in qemu
 505 * BH context.
 506 */
 507static void qemu_rbd_complete_aio(RADOSCB *rcb)
 508{
 509    RBDAIOCB *acb = rcb->acb;
 510    int64_t r;
 511
 512    r = rcb->ret;
 513
 514    if (acb->cmd != RBD_AIO_READ) {
 515        if (r < 0) {
 516            acb->ret = r;
 517            acb->error = 1;
 518        } else if (!acb->error) {
 519            acb->ret = rcb->size;
 520        }
 521    } else {
 522        if (r < 0) {
 523            qemu_rbd_memset(rcb, 0);
 524            acb->ret = r;
 525            acb->error = 1;
 526        } else if (r < rcb->size) {
 527            qemu_rbd_memset(rcb, r);
 528            if (!acb->error) {
 529                acb->ret = rcb->size;
 530            }
 531        } else if (!acb->error) {
 532            acb->ret = r;
 533        }
 534    }
 535
 536    g_free(rcb);
 537
 538    if (!LIBRBD_USE_IOVEC) {
 539        if (acb->cmd == RBD_AIO_READ) {
 540            qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 541        }
 542        qemu_vfree(acb->bounce);
 543    }
 544
 545    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 546
 547    qemu_aio_unref(acb);
 548}
 549
 550static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 551{
 552    const char **vals;
 553    const char *host, *port;
 554    char *rados_str;
 555    InetSocketAddressBaseList *p;
 556    int i, cnt;
 557
 558    if (!opts->has_server) {
 559        return NULL;
 560    }
 561
 562    for (cnt = 0, p = opts->server; p; p = p->next) {
 563        cnt++;
 564    }
 565
 566    vals = g_new(const char *, cnt + 1);
 567
 568    for (i = 0, p = opts->server; p; p = p->next, i++) {
 569        host = p->value->host;
 570        port = p->value->port;
 571
 572        if (strchr(host, ':')) {
 573            vals[i] = g_strdup_printf("[%s]:%s", host, port);
 574        } else {
 575            vals[i] = g_strdup_printf("%s:%s", host, port);
 576        }
 577    }
 578    vals[i] = NULL;
 579
 580    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
 581    g_strfreev((char **)vals);
 582    return rados_str;
 583}
 584
 585static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 586                            BlockdevOptionsRbd *opts, bool cache,
 587                            const char *keypairs, const char *secretid,
 588                            Error **errp)
 589{
 590    char *mon_host = NULL;
 591    Error *local_err = NULL;
 592    int r;
 593
 594    if (secretid) {
 595        if (opts->key_secret) {
 596            error_setg(errp,
 597                       "Legacy 'password-secret' clashes with 'key-secret'");
 598            return -EINVAL;
 599        }
 600        opts->key_secret = g_strdup(secretid);
 601        opts->has_key_secret = true;
 602    }
 603
 604    mon_host = qemu_rbd_mon_host(opts, &local_err);
 605    if (local_err) {
 606        error_propagate(errp, local_err);
 607        r = -EINVAL;
 608        goto failed_opts;
 609    }
 610
 611    r = rados_create(cluster, opts->user);
 612    if (r < 0) {
 613        error_setg_errno(errp, -r, "error initializing");
 614        goto failed_opts;
 615    }
 616
 617    /* try default location when conf=NULL, but ignore failure */
 618    r = rados_conf_read_file(*cluster, opts->conf);
 619    if (opts->has_conf && r < 0) {
 620        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
 621        goto failed_shutdown;
 622    }
 623
 624    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
 625    if (r < 0) {
 626        goto failed_shutdown;
 627    }
 628
 629    if (mon_host) {
 630        r = rados_conf_set(*cluster, "mon_host", mon_host);
 631        if (r < 0) {
 632            goto failed_shutdown;
 633        }
 634    }
 635
 636    r = qemu_rbd_set_auth(*cluster, opts, errp);
 637    if (r < 0) {
 638        goto failed_shutdown;
 639    }
 640
 641    /*
 642     * Fallback to more conservative semantics if setting cache
 643     * options fails. Ignore errors from setting rbd_cache because the
 644     * only possible error is that the option does not exist, and
 645     * librbd defaults to no caching. If write through caching cannot
 646     * be set up, fall back to no caching.
 647     */
 648    if (cache) {
 649        rados_conf_set(*cluster, "rbd_cache", "true");
 650    } else {
 651        rados_conf_set(*cluster, "rbd_cache", "false");
 652    }
 653
 654    r = rados_connect(*cluster);
 655    if (r < 0) {
 656        error_setg_errno(errp, -r, "error connecting");
 657        goto failed_shutdown;
 658    }
 659
 660    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
 661    if (r < 0) {
 662        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
 663        goto failed_shutdown;
 664    }
 665    /*
 666     * Set the namespace after opening the io context on the pool,
 667     * if nspace == NULL or if nspace == "", it is just as we did nothing
 668     */
 669    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
 670
 671    return 0;
 672
 673failed_shutdown:
 674    rados_shutdown(*cluster);
 675failed_opts:
 676    g_free(mon_host);
 677    return r;
 678}
 679
 680static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
 681                                    Error **errp)
 682{
 683    Visitor *v;
 684
 685    /* Convert the remaining options into a QAPI object */
 686    v = qobject_input_visitor_new_flat_confused(options, errp);
 687    if (!v) {
 688        return -EINVAL;
 689    }
 690
 691    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
 692    visit_free(v);
 693    if (!opts) {
 694        return -EINVAL;
 695    }
 696
 697    return 0;
 698}
 699
 700static int qemu_rbd_attempt_legacy_options(QDict *options,
 701                                           BlockdevOptionsRbd **opts,
 702                                           char **keypairs)
 703{
 704    char *filename;
 705    int r;
 706
 707    filename = g_strdup(qdict_get_try_str(options, "filename"));
 708    if (!filename) {
 709        return -EINVAL;
 710    }
 711    qdict_del(options, "filename");
 712
 713    qemu_rbd_parse_filename(filename, options, NULL);
 714
 715    /* keypairs freed by caller */
 716    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 717    if (*keypairs) {
 718        qdict_del(options, "=keyvalue-pairs");
 719    }
 720
 721    r = qemu_rbd_convert_options(options, opts, NULL);
 722
 723    g_free(filename);
 724    return r;
 725}
 726
 727static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 728                         Error **errp)
 729{
 730    BDRVRBDState *s = bs->opaque;
 731    BlockdevOptionsRbd *opts = NULL;
 732    const QDictEntry *e;
 733    Error *local_err = NULL;
 734    char *keypairs, *secretid;
 735    int r;
 736
 737    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 738    if (keypairs) {
 739        qdict_del(options, "=keyvalue-pairs");
 740    }
 741
 742    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
 743    if (secretid) {
 744        qdict_del(options, "password-secret");
 745    }
 746
 747    r = qemu_rbd_convert_options(options, &opts, &local_err);
 748    if (local_err) {
 749        /* If keypairs are present, that means some options are present in
 750         * the modern option format.  Don't attempt to parse legacy option
 751         * formats, as we won't support mixed usage. */
 752        if (keypairs) {
 753            error_propagate(errp, local_err);
 754            goto out;
 755        }
 756
 757        /* If the initial attempt to convert and process the options failed,
 758         * we may be attempting to open an image file that has the rbd options
 759         * specified in the older format consisting of all key/value pairs
 760         * encoded in the filename.  Go ahead and attempt to parse the
 761         * filename, and see if we can pull out the required options. */
 762        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
 763        if (r < 0) {
 764            /* Propagate the original error, not the legacy parsing fallback
 765             * error, as the latter was just a best-effort attempt. */
 766            error_propagate(errp, local_err);
 767            goto out;
 768        }
 769        /* Take care whenever deciding to actually deprecate; once this ability
 770         * is removed, we will not be able to open any images with legacy-styled
 771         * backing image strings. */
 772        warn_report("RBD options encoded in the filename as keyvalue pairs "
 773                    "is deprecated");
 774    }
 775
 776    /* Remove the processed options from the QDict (the visitor processes
 777     * _all_ options in the QDict) */
 778    while ((e = qdict_first(options))) {
 779        qdict_del(options, e->key);
 780    }
 781
 782    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
 783                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
 784    if (r < 0) {
 785        goto out;
 786    }
 787
 788    s->snap = g_strdup(opts->snapshot);
 789    s->image_name = g_strdup(opts->image);
 790
 791    /* rbd_open is always r/w */
 792    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
 793    if (r < 0) {
 794        error_setg_errno(errp, -r, "error reading header from %s",
 795                         s->image_name);
 796        goto failed_open;
 797    }
 798
 799    r = rbd_get_size(s->image, &s->image_size);
 800    if (r < 0) {
 801        error_setg_errno(errp, -r, "error getting image size from %s",
 802                         s->image_name);
 803        rbd_close(s->image);
 804        goto failed_open;
 805    }
 806
 807    /* If we are using an rbd snapshot, we must be r/o, otherwise
 808     * leave as-is */
 809    if (s->snap != NULL) {
 810        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
 811        if (r < 0) {
 812            rbd_close(s->image);
 813            goto failed_open;
 814        }
 815    }
 816
 817    /* When extending regular files, we get zeros from the OS */
 818    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
 819
 820    r = 0;
 821    goto out;
 822
 823failed_open:
 824    rados_ioctx_destroy(s->io_ctx);
 825    g_free(s->snap);
 826    g_free(s->image_name);
 827    rados_shutdown(s->cluster);
 828out:
 829    qapi_free_BlockdevOptionsRbd(opts);
 830    g_free(keypairs);
 831    g_free(secretid);
 832    return r;
 833}
 834
 835
 836/* Since RBD is currently always opened R/W via the API,
 837 * we just need to check if we are using a snapshot or not, in
 838 * order to determine if we will allow it to be R/W */
 839static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
 840                                   BlockReopenQueue *queue, Error **errp)
 841{
 842    BDRVRBDState *s = state->bs->opaque;
 843    int ret = 0;
 844
 845    if (s->snap && state->flags & BDRV_O_RDWR) {
 846        error_setg(errp,
 847                   "Cannot change node '%s' to r/w when using RBD snapshot",
 848                   bdrv_get_device_or_node_name(state->bs));
 849        ret = -EINVAL;
 850    }
 851
 852    return ret;
 853}
 854
/*
 * Release everything qemu_rbd_open() stored in bs->opaque: the image handle,
 * the I/O context, the copied snapshot and image names, and finally the
 * cluster connection.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    rbd_close(s->image);
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    g_free(s->image_name);
    rados_shutdown(s->cluster);
}
 865
 866/* Resize the RBD image and update the 'image_size' with the current size */
 867static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
 868{
 869    BDRVRBDState *s = bs->opaque;
 870    int r;
 871
 872    r = rbd_resize(s->image, size);
 873    if (r < 0) {
 874        return r;
 875    }
 876
 877    s->image_size = size;
 878
 879    return 0;
 880}
 881
/* AIOCB description passed to qemu_aio_get(); sized for RBDAIOCB. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
 885
 886static void rbd_finish_bh(void *opaque)
 887{
 888    RADOSCB *rcb = opaque;
 889    qemu_rbd_complete_aio(rcb);
 890}
 891
/*
 * This is the callback function for rbd_aio_read and _write
 *
 * Note: this function is being called from a non qemu thread so
 * we need to be careful about what we do here. Generally we only
 * schedule a BH, and do the rest of the io completion handling
 * from rbd_finish_bh() which runs in a qemu context.
 */
static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
{
    RBDAIOCB *acb = rcb->acb;

    /* Fetch the result before the completion object is released */
    rcb->ret = rbd_aio_get_return_value(c);
    rbd_aio_release(c);

    /* Hand the rest of the work to the AioContext of the request's node */
    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
                                     rbd_finish_bh, rcb);
}
 910
 911static int rbd_aio_discard_wrapper(rbd_image_t image,
 912                                   uint64_t off,
 913                                   uint64_t len,
 914                                   rbd_completion_t comp)
 915{
 916#ifdef LIBRBD_SUPPORTS_DISCARD
 917    return rbd_aio_discard(image, off, len, comp);
 918#else
 919    return -ENOTSUP;
 920#endif
 921}
 922
 923static int rbd_aio_flush_wrapper(rbd_image_t image,
 924                                 rbd_completion_t comp)
 925{
 926#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 927    return rbd_aio_flush(image, comp);
 928#else
 929    return -ENOTSUP;
 930#endif
 931}
 932
 933static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 934                                 int64_t off,
 935                                 QEMUIOVector *qiov,
 936                                 int64_t size,
 937                                 BlockCompletionFunc *cb,
 938                                 void *opaque,
 939                                 RBDAIOCmd cmd)
 940{
 941    RBDAIOCB *acb;
 942    RADOSCB *rcb = NULL;
 943    rbd_completion_t c;
 944    int r;
 945
 946    BDRVRBDState *s = bs->opaque;
 947
 948    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
 949    acb->cmd = cmd;
 950    acb->qiov = qiov;
 951    assert(!qiov || qiov->size == size);
 952
 953    rcb = g_new(RADOSCB, 1);
 954
 955    if (!LIBRBD_USE_IOVEC) {
 956        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 957            acb->bounce = NULL;
 958        } else {
 959            acb->bounce = qemu_try_blockalign(bs, qiov->size);
 960            if (acb->bounce == NULL) {
 961                goto failed;
 962            }
 963        }
 964        if (cmd == RBD_AIO_WRITE) {
 965            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 966        }
 967        rcb->buf = acb->bounce;
 968    }
 969
 970    acb->ret = 0;
 971    acb->error = 0;
 972    acb->s = s;
 973
 974    rcb->acb = acb;
 975    rcb->s = acb->s;
 976    rcb->size = size;
 977    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
 978    if (r < 0) {
 979        goto failed;
 980    }
 981
 982    switch (cmd) {
 983    case RBD_AIO_WRITE: {
 984        /*
 985         * RBD APIs don't allow us to write more than actual size, so in order
 986         * to support growing images, we resize the image before write
 987         * operations that exceed the current size.
 988         */
 989        if (off + size > s->image_size) {
 990            r = qemu_rbd_resize(bs, off + size);
 991            if (r < 0) {
 992                goto failed_completion;
 993            }
 994        }
 995#ifdef LIBRBD_SUPPORTS_IOVEC
 996            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
 997#else
 998            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
 999#endif
1000        break;
1001    }
1002    case RBD_AIO_READ:
1003#ifdef LIBRBD_SUPPORTS_IOVEC
1004            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
1005#else
1006            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
1007#endif
1008        break;
1009    case RBD_AIO_DISCARD:
1010        r = rbd_aio_discard_wrapper(s->image, off, size, c);
1011        break;
1012    case RBD_AIO_FLUSH:
1013        r = rbd_aio_flush_wrapper(s->image, c);
1014        break;
1015    default:
1016        r = -EINVAL;
1017    }
1018
1019    if (r < 0) {
1020        goto failed_completion;
1021    }
1022    return &acb->common;
1023
1024failed_completion:
1025    rbd_aio_release(c);
1026failed:
1027    g_free(rcb);
1028    if (!LIBRBD_USE_IOVEC) {
1029        qemu_vfree(acb->bounce);
1030    }
1031
1032    qemu_aio_unref(acb);
1033    return NULL;
1034}
1035
1036static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
1037                                       uint64_t offset, uint64_t bytes,
1038                                       QEMUIOVector *qiov, int flags,
1039                                       BlockCompletionFunc *cb,
1040                                       void *opaque)
1041{
1042    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1043                         RBD_AIO_READ);
1044}
1045
1046static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
1047                                        uint64_t offset, uint64_t bytes,
1048                                        QEMUIOVector *qiov, int flags,
1049                                        BlockCompletionFunc *cb,
1050                                        void *opaque)
1051{
1052    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1053                         RBD_AIO_WRITE);
1054}
1055
1056#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1057static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1058                                      BlockCompletionFunc *cb,
1059                                      void *opaque)
1060{
1061    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1062}
1063
1064#else
1065
1066static int qemu_rbd_co_flush(BlockDriverState *bs)
1067{
1068#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1069    /* rbd_flush added in 0.1.1 */
1070    BDRVRBDState *s = bs->opaque;
1071    return rbd_flush(s->image);
1072#else
1073    return 0;
1074#endif
1075}
1076#endif
1077
1078static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1079{
1080    BDRVRBDState *s = bs->opaque;
1081    rbd_image_info_t info;
1082    int r;
1083
1084    r = rbd_stat(s->image, &info, sizeof(info));
1085    if (r < 0) {
1086        return r;
1087    }
1088
1089    bdi->cluster_size = info.obj_size;
1090    return 0;
1091}
1092
1093static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1094{
1095    BDRVRBDState *s = bs->opaque;
1096    rbd_image_info_t info;
1097    int r;
1098
1099    r = rbd_stat(s->image, &info, sizeof(info));
1100    if (r < 0) {
1101        return r;
1102    }
1103
1104    return info.size;
1105}
1106
1107static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1108                                             int64_t offset,
1109                                             bool exact,
1110                                             PreallocMode prealloc,
1111                                             BdrvRequestFlags flags,
1112                                             Error **errp)
1113{
1114    int r;
1115
1116    if (prealloc != PREALLOC_MODE_OFF) {
1117        error_setg(errp, "Unsupported preallocation mode '%s'",
1118                   PreallocMode_str(prealloc));
1119        return -ENOTSUP;
1120    }
1121
1122    r = qemu_rbd_resize(bs, offset);
1123    if (r < 0) {
1124        error_setg_errno(errp, -r, "Failed to resize file");
1125        return r;
1126    }
1127
1128    return 0;
1129}
1130
1131static int qemu_rbd_snap_create(BlockDriverState *bs,
1132                                QEMUSnapshotInfo *sn_info)
1133{
1134    BDRVRBDState *s = bs->opaque;
1135    int r;
1136
1137    if (sn_info->name[0] == '\0') {
1138        return -EINVAL; /* we need a name for rbd snapshots */
1139    }
1140
1141    /*
1142     * rbd snapshots are using the name as the user controlled unique identifier
1143     * we can't use the rbd snapid for that purpose, as it can't be set
1144     */
1145    if (sn_info->id_str[0] != '\0' &&
1146        strcmp(sn_info->id_str, sn_info->name) != 0) {
1147        return -EINVAL;
1148    }
1149
1150    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1151        return -ERANGE;
1152    }
1153
1154    r = rbd_snap_create(s->image, sn_info->name);
1155    if (r < 0) {
1156        error_report("failed to create snap: %s", strerror(-r));
1157        return r;
1158    }
1159
1160    return 0;
1161}
1162
1163static int qemu_rbd_snap_remove(BlockDriverState *bs,
1164                                const char *snapshot_id,
1165                                const char *snapshot_name,
1166                                Error **errp)
1167{
1168    BDRVRBDState *s = bs->opaque;
1169    int r;
1170
1171    if (!snapshot_name) {
1172        error_setg(errp, "rbd need a valid snapshot name");
1173        return -EINVAL;
1174    }
1175
1176    /* If snapshot_id is specified, it must be equal to name, see
1177       qemu_rbd_snap_list() */
1178    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1179        error_setg(errp,
1180                   "rbd do not support snapshot id, it should be NULL or "
1181                   "equal to snapshot name");
1182        return -EINVAL;
1183    }
1184
1185    r = rbd_snap_remove(s->image, snapshot_name);
1186    if (r < 0) {
1187        error_setg_errno(errp, -r, "Failed to remove the snapshot");
1188    }
1189    return r;
1190}
1191
1192static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1193                                  const char *snapshot_name)
1194{
1195    BDRVRBDState *s = bs->opaque;
1196
1197    return rbd_snap_rollback(s->image, snapshot_name);
1198}
1199
1200static int qemu_rbd_snap_list(BlockDriverState *bs,
1201                              QEMUSnapshotInfo **psn_tab)
1202{
1203    BDRVRBDState *s = bs->opaque;
1204    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1205    int i, snap_count;
1206    rbd_snap_info_t *snaps;
1207    int max_snaps = RBD_MAX_SNAPS;
1208
1209    do {
1210        snaps = g_new(rbd_snap_info_t, max_snaps);
1211        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1212        if (snap_count <= 0) {
1213            g_free(snaps);
1214        }
1215    } while (snap_count == -ERANGE);
1216
1217    if (snap_count <= 0) {
1218        goto done;
1219    }
1220
1221    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1222
1223    for (i = 0; i < snap_count; i++) {
1224        const char *snap_name = snaps[i].name;
1225
1226        sn_info = sn_tab + i;
1227        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1228        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1229
1230        sn_info->vm_state_size = snaps[i].size;
1231        sn_info->date_sec = 0;
1232        sn_info->date_nsec = 0;
1233        sn_info->vm_clock_nsec = 0;
1234    }
1235    rbd_snap_list_end(snaps);
1236    g_free(snaps);
1237
1238 done:
1239    *psn_tab = sn_tab;
1240    return snap_count;
1241}
1242
1243#ifdef LIBRBD_SUPPORTS_DISCARD
1244static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1245                                         int64_t offset,
1246                                         int bytes,
1247                                         BlockCompletionFunc *cb,
1248                                         void *opaque)
1249{
1250    return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
1251                         RBD_AIO_DISCARD);
1252}
1253#endif
1254
1255#ifdef LIBRBD_SUPPORTS_INVALIDATE
1256static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1257                                                      Error **errp)
1258{
1259    BDRVRBDState *s = bs->opaque;
1260    int r = rbd_invalidate_cache(s->image);
1261    if (r < 0) {
1262        error_setg_errno(errp, -r, "Failed to invalidate the cache");
1263    }
1264}
1265#endif
1266
/*
 * Option list "rbd-create-opts"; installed as .create_opts of the rbd
 * BlockDriver below.  Each entry gives the option name, its value type and
 * the help text for it.
 */
static QemuOptsList qemu_rbd_create_opts = {
    .name = "rbd-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
    .desc = {
        {
            /* size of the virtual disk */
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            /* the generic cluster-size option is described as the RBD object size */
            .name = BLOCK_OPT_CLUSTER_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "RBD object size"
        },
        {
            /* ID of the secret that provides the password */
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        { /* end of list */ }
    }
};
1289
/*
 * NULL-terminated list of option names installed as .strong_runtime_opts
 * of the rbd BlockDriver below.
 * NOTE(review): "server." ends in a dot, presumably so that it is matched
 * as a prefix covering every server.* sub-option -- confirm against the
 * block layer's handling of strong_runtime_opts.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL
};
1301
1302static BlockDriver bdrv_rbd = {
1303    .format_name            = "rbd",
1304    .instance_size          = sizeof(BDRVRBDState),
1305    .bdrv_parse_filename    = qemu_rbd_parse_filename,
1306    .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1307    .bdrv_file_open         = qemu_rbd_open,
1308    .bdrv_close             = qemu_rbd_close,
1309    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1310    .bdrv_co_create         = qemu_rbd_co_create,
1311    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1312    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1313    .bdrv_get_info          = qemu_rbd_getinfo,
1314    .create_opts            = &qemu_rbd_create_opts,
1315    .bdrv_getlength         = qemu_rbd_getlength,
1316    .bdrv_co_truncate       = qemu_rbd_co_truncate,
1317    .protocol_name          = "rbd",
1318
1319    .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1320    .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1321
1322#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1323    .bdrv_aio_flush         = qemu_rbd_aio_flush,
1324#else
1325    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1326#endif
1327
1328#ifdef LIBRBD_SUPPORTS_DISCARD
1329    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1330#endif
1331
1332    .bdrv_snapshot_create   = qemu_rbd_snap_create,
1333    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1334    .bdrv_snapshot_list     = qemu_rbd_snap_list,
1335    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1336#ifdef LIBRBD_SUPPORTS_INVALIDATE
1337    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1338#endif
1339
1340    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1341};
1342
/* Register the rbd block driver description with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() into QEMU's block module initialisation. */
block_init(bdrv_rbd_init);
1349