qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include <rbd/librbd.h>
  17#include "qapi/error.h"
  18#include "qemu/error-report.h"
  19#include "qemu/module.h"
  20#include "qemu/option.h"
  21#include "block/block_int.h"
  22#include "block/qdict.h"
  23#include "crypto/secret.h"
  24#include "qemu/cutils.h"
  25#include "qapi/qmp/qstring.h"
  26#include "qapi/qmp/qdict.h"
  27#include "qapi/qmp/qjson.h"
  28#include "qapi/qmp/qlist.h"
  29#include "qapi/qobject-input-visitor.h"
  30#include "qapi/qapi-visit-block-core.h"
  31
  32/*
  33 * When specifying the image filename use:
  34 *
  35 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  36 *
  37 * poolname must be the name of an existing rados pool.
  38 *
  39 * devicename is the name of the rbd image.
  40 *
  41 * Each option given is used to configure rados, and may be any valid
  42 * Ceph option, "id", or "conf".
  43 *
  44 * The "id" option indicates what user we should authenticate as to
  45 * the Ceph cluster.  If it is excluded we will use the Ceph default
  46 * (normally 'admin').
  47 *
  48 * The "conf" option specifies a Ceph configuration file to read.  If
  49 * it is not specified, we will read from the default Ceph locations
  50 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  51 * file, specify conf=/dev/null.
  52 *
  53 * Configuration values containing :, @, or = can be escaped with a
  54 * leading "\".
  55 */
  56
  57/* rbd_aio_discard added in 0.1.2 */
  58#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
  59#define LIBRBD_SUPPORTS_DISCARD
  60#else
  61#undef LIBRBD_SUPPORTS_DISCARD
  62#endif
  63
  64#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
  65
  66#define RBD_MAX_SNAPS 100
  67
  68/* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
  69#ifdef LIBRBD_SUPPORTS_IOVEC
  70#define LIBRBD_USE_IOVEC 1
  71#else
  72#define LIBRBD_USE_IOVEC 0
  73#endif
  74
  75typedef enum {
  76    RBD_AIO_READ,
  77    RBD_AIO_WRITE,
  78    RBD_AIO_DISCARD,
  79    RBD_AIO_FLUSH
  80} RBDAIOCmd;
  81
/*
 * Per-request state for one asynchronous RBD operation.  Allocated with
 * qemu_aio_get() in rbd_start_aio() and released with qemu_aio_unref() once
 * the completion callback has run.
 */
typedef struct RBDAIOCB {
    BlockAIOCB common;      /* generic AIOCB header (bs, cb, opaque) */
    int64_t ret;            /* result reported to the completion callback */
    QEMUIOVector *qiov;     /* caller's I/O vector; NULL for flush requests */
    char *bounce;           /* linear bounce buffer, only used when
                             * LIBRBD_USE_IOVEC is 0 */
    RBDAIOCmd cmd;          /* which operation this request performs */
    int error;              /* set to 1 once a failure has been recorded */
    struct BDRVRBDState *s; /* driver state of the node being accessed */
} RBDAIOCB;
  91
/*
 * Argument handed to librbd's completion callback.  It links the librbd
 * completion back to the QEMU request and carries the raw result across
 * the bottom half scheduled by rbd_finish_aiocb().
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;          /* QEMU request this completion belongs to */
    struct BDRVRBDState *s; /* driver state, copied from acb->s */
    int64_t size;           /* length of the request in bytes */
    char *buf;              /* bounce buffer of the request; only set when
                             * LIBRBD_USE_IOVEC is 0 */
    int64_t ret;            /* value of rbd_aio_get_return_value() */
} RADOSCB;
  99
/* Driver-private state of an open RBD node (bs->opaque). */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle set up by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* I/O context for the pool holding the image */
    rbd_image_t image;      /* image handle returned by rbd_open() */
    char *image_name;       /* g_strdup()ed copy of the image name */
    char *snap;             /* g_strdup()ed snapshot name, or NULL if the
                             * image is not opened at a snapshot */
    uint64_t image_size;    /* cached size: read with rbd_get_size() at open,
                             * updated by qemu_rbd_resize() */
} BDRVRBDState;
 108
 109static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 110                            BlockdevOptionsRbd *opts, bool cache,
 111                            const char *keypairs, const char *secretid,
 112                            Error **errp);
 113
 114static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 115{
 116    char *end;
 117
 118    *p = NULL;
 119
 120    for (end = src; *end; ++end) {
 121        if (*end == delim) {
 122            break;
 123        }
 124        if (*end == '\\' && end[1] != '\0') {
 125            end++;
 126        }
 127    }
 128    if (*end == delim) {
 129        *p = end + 1;
 130        *end = '\0';
 131    }
 132    return src;
 133}
 134
 135static void qemu_rbd_unescape(char *src)
 136{
 137    char *p;
 138
 139    for (p = src; *src; ++src, ++p) {
 140        if (*src == '\\' && src[1] != '\0') {
 141            src++;
 142        }
 143        *p = *src;
 144    }
 145    *p = '\0';
 146}
 147
 148static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 149                                    Error **errp)
 150{
 151    const char *start;
 152    char *p, *buf;
 153    QList *keypairs = NULL;
 154    char *found_str;
 155
 156    if (!strstart(filename, "rbd:", &start)) {
 157        error_setg(errp, "File name must start with 'rbd:'");
 158        return;
 159    }
 160
 161    buf = g_strdup(start);
 162    p = buf;
 163
 164    found_str = qemu_rbd_next_tok(p, '/', &p);
 165    if (!p) {
 166        error_setg(errp, "Pool name is required");
 167        goto done;
 168    }
 169    qemu_rbd_unescape(found_str);
 170    qdict_put_str(options, "pool", found_str);
 171
 172    if (strchr(p, '@')) {
 173        found_str = qemu_rbd_next_tok(p, '@', &p);
 174        qemu_rbd_unescape(found_str);
 175        qdict_put_str(options, "image", found_str);
 176
 177        found_str = qemu_rbd_next_tok(p, ':', &p);
 178        qemu_rbd_unescape(found_str);
 179        qdict_put_str(options, "snapshot", found_str);
 180    } else {
 181        found_str = qemu_rbd_next_tok(p, ':', &p);
 182        qemu_rbd_unescape(found_str);
 183        qdict_put_str(options, "image", found_str);
 184    }
 185    if (!p) {
 186        goto done;
 187    }
 188
 189    /* The following are essentially all key/value pairs, and we treat
 190     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
 191    while (p) {
 192        char *name, *value;
 193        name = qemu_rbd_next_tok(p, '=', &p);
 194        if (!p) {
 195            error_setg(errp, "conf option %s has no value", name);
 196            break;
 197        }
 198
 199        qemu_rbd_unescape(name);
 200
 201        value = qemu_rbd_next_tok(p, ':', &p);
 202        qemu_rbd_unescape(value);
 203
 204        if (!strcmp(name, "conf")) {
 205            qdict_put_str(options, "conf", value);
 206        } else if (!strcmp(name, "id")) {
 207            qdict_put_str(options, "user", value);
 208        } else {
 209            /*
 210             * We pass these internally to qemu_rbd_set_keypairs(), so
 211             * we can get away with the simpler list of [ "key1",
 212             * "value1", "key2", "value2" ] rather than a raw dict
 213             * { "key1": "value1", "key2": "value2" } where we can't
 214             * guarantee order, or even a more correct but complex
 215             * [ { "key1": "value1" }, { "key2": "value2" } ]
 216             */
 217            if (!keypairs) {
 218                keypairs = qlist_new();
 219            }
 220            qlist_append_str(keypairs, name);
 221            qlist_append_str(keypairs, value);
 222        }
 223    }
 224
 225    if (keypairs) {
 226        qdict_put(options, "=keyvalue-pairs",
 227                  qobject_to_json(QOBJECT(keypairs)));
 228    }
 229
 230done:
 231    g_free(buf);
 232    qobject_unref(keypairs);
 233    return;
 234}
 235
 236
 237static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
 238{
 239    /* XXX Does RBD support AIO on less than 512-byte alignment? */
 240    bs->bl.request_alignment = 512;
 241}
 242
 243
 244static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
 245                             Error **errp)
 246{
 247    char *key, *acr;
 248    int r;
 249    GString *accu;
 250    RbdAuthModeList *auth;
 251
 252    if (opts->key_secret) {
 253        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
 254        if (!key) {
 255            return -EIO;
 256        }
 257        r = rados_conf_set(cluster, "key", key);
 258        g_free(key);
 259        if (r < 0) {
 260            error_setg_errno(errp, -r, "Could not set 'key'");
 261            return r;
 262        }
 263    }
 264
 265    if (opts->has_auth_client_required) {
 266        accu = g_string_new("");
 267        for (auth = opts->auth_client_required; auth; auth = auth->next) {
 268            if (accu->str[0]) {
 269                g_string_append_c(accu, ';');
 270            }
 271            g_string_append(accu, RbdAuthMode_str(auth->value));
 272        }
 273        acr = g_string_free(accu, FALSE);
 274        r = rados_conf_set(cluster, "auth_client_required", acr);
 275        g_free(acr);
 276        if (r < 0) {
 277            error_setg_errno(errp, -r,
 278                             "Could not set 'auth_client_required'");
 279            return r;
 280        }
 281    }
 282
 283    return 0;
 284}
 285
 286static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 287                                 Error **errp)
 288{
 289    QList *keypairs;
 290    QString *name;
 291    QString *value;
 292    const char *key;
 293    size_t remaining;
 294    int ret = 0;
 295
 296    if (!keypairs_json) {
 297        return ret;
 298    }
 299    keypairs = qobject_to(QList,
 300                          qobject_from_json(keypairs_json, &error_abort));
 301    remaining = qlist_size(keypairs) / 2;
 302    assert(remaining);
 303
 304    while (remaining--) {
 305        name = qobject_to(QString, qlist_pop(keypairs));
 306        value = qobject_to(QString, qlist_pop(keypairs));
 307        assert(name && value);
 308        key = qstring_get_str(name);
 309
 310        ret = rados_conf_set(cluster, key, qstring_get_str(value));
 311        qobject_unref(value);
 312        if (ret < 0) {
 313            error_setg_errno(errp, -ret, "invalid conf option %s", key);
 314            qobject_unref(name);
 315            ret = -EINVAL;
 316            break;
 317        }
 318        qobject_unref(name);
 319    }
 320
 321    qobject_unref(keypairs);
 322    return ret;
 323}
 324
 325static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
 326{
 327    if (LIBRBD_USE_IOVEC) {
 328        RBDAIOCB *acb = rcb->acb;
 329        iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
 330                   acb->qiov->size - offs);
 331    } else {
 332        memset(rcb->buf + offs, 0, rcb->size - offs);
 333    }
 334}
 335
 336static QemuOptsList runtime_opts = {
 337    .name = "rbd",
 338    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
 339    .desc = {
 340        {
 341            .name = "pool",
 342            .type = QEMU_OPT_STRING,
 343            .help = "Rados pool name",
 344        },
 345        {
 346            .name = "image",
 347            .type = QEMU_OPT_STRING,
 348            .help = "Image name in the pool",
 349        },
 350        {
 351            .name = "conf",
 352            .type = QEMU_OPT_STRING,
 353            .help = "Rados config file location",
 354        },
 355        {
 356            .name = "snapshot",
 357            .type = QEMU_OPT_STRING,
 358            .help = "Ceph snapshot name",
 359        },
 360        {
 361            /* maps to 'id' in rados_create() */
 362            .name = "user",
 363            .type = QEMU_OPT_STRING,
 364            .help = "Rados id name",
 365        },
 366        /*
 367         * server.* extracted manually, see qemu_rbd_mon_host()
 368         */
 369        { /* end of list */ }
 370    },
 371};
 372
 373/* FIXME Deprecate and remove keypairs or make it available in QMP. */
 374static int qemu_rbd_do_create(BlockdevCreateOptions *options,
 375                              const char *keypairs, const char *password_secret,
 376                              Error **errp)
 377{
 378    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
 379    rados_t cluster;
 380    rados_ioctx_t io_ctx;
 381    int obj_order = 0;
 382    int ret;
 383
 384    assert(options->driver == BLOCKDEV_DRIVER_RBD);
 385    if (opts->location->has_snapshot) {
 386        error_setg(errp, "Can't use snapshot name for image creation");
 387        return -EINVAL;
 388    }
 389
 390    if (opts->has_cluster_size) {
 391        int64_t objsize = opts->cluster_size;
 392        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 393            error_setg(errp, "obj size needs to be power of 2");
 394            return -EINVAL;
 395        }
 396        if (objsize < 4096) {
 397            error_setg(errp, "obj size too small");
 398            return -EINVAL;
 399        }
 400        obj_order = ctz32(objsize);
 401    }
 402
 403    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
 404                           password_secret, errp);
 405    if (ret < 0) {
 406        return ret;
 407    }
 408
 409    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
 410    if (ret < 0) {
 411        error_setg_errno(errp, -ret, "error rbd create");
 412        goto out;
 413    }
 414
 415    ret = 0;
 416out:
 417    rados_ioctx_destroy(io_ctx);
 418    rados_shutdown(cluster);
 419    return ret;
 420}
 421
 422static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
 423{
 424    return qemu_rbd_do_create(options, NULL, NULL, errp);
 425}
 426
 427static int coroutine_fn qemu_rbd_co_create_opts(const char *filename,
 428                                                QemuOpts *opts,
 429                                                Error **errp)
 430{
 431    BlockdevCreateOptions *create_options;
 432    BlockdevCreateOptionsRbd *rbd_opts;
 433    BlockdevOptionsRbd *loc;
 434    Error *local_err = NULL;
 435    const char *keypairs, *password_secret;
 436    QDict *options = NULL;
 437    int ret = 0;
 438
 439    create_options = g_new0(BlockdevCreateOptions, 1);
 440    create_options->driver = BLOCKDEV_DRIVER_RBD;
 441    rbd_opts = &create_options->u.rbd;
 442
 443    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
 444
 445    password_secret = qemu_opt_get(opts, "password-secret");
 446
 447    /* Read out options */
 448    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 449                              BDRV_SECTOR_SIZE);
 450    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
 451                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
 452    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
 453
 454    options = qdict_new();
 455    qemu_rbd_parse_filename(filename, options, &local_err);
 456    if (local_err) {
 457        ret = -EINVAL;
 458        error_propagate(errp, local_err);
 459        goto exit;
 460    }
 461
 462    /*
 463     * Caution: while qdict_get_try_str() is fine, getting non-string
 464     * types would require more care.  When @options come from -blockdev
 465     * or blockdev_add, its members are typed according to the QAPI
 466     * schema, but when they come from -drive, they're all QString.
 467     */
 468    loc = rbd_opts->location;
 469    loc->pool     = g_strdup(qdict_get_try_str(options, "pool"));
 470    loc->conf     = g_strdup(qdict_get_try_str(options, "conf"));
 471    loc->has_conf = !!loc->conf;
 472    loc->user     = g_strdup(qdict_get_try_str(options, "user"));
 473    loc->has_user = !!loc->user;
 474    loc->image    = g_strdup(qdict_get_try_str(options, "image"));
 475    keypairs      = qdict_get_try_str(options, "=keyvalue-pairs");
 476
 477    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
 478    if (ret < 0) {
 479        goto exit;
 480    }
 481
 482exit:
 483    qobject_unref(options);
 484    qapi_free_BlockdevCreateOptions(create_options);
 485    return ret;
 486}
 487
 488/*
 489 * This aio completion is being called from rbd_finish_bh() and runs in qemu
 490 * BH context.
 491 */
 492static void qemu_rbd_complete_aio(RADOSCB *rcb)
 493{
 494    RBDAIOCB *acb = rcb->acb;
 495    int64_t r;
 496
 497    r = rcb->ret;
 498
 499    if (acb->cmd != RBD_AIO_READ) {
 500        if (r < 0) {
 501            acb->ret = r;
 502            acb->error = 1;
 503        } else if (!acb->error) {
 504            acb->ret = rcb->size;
 505        }
 506    } else {
 507        if (r < 0) {
 508            qemu_rbd_memset(rcb, 0);
 509            acb->ret = r;
 510            acb->error = 1;
 511        } else if (r < rcb->size) {
 512            qemu_rbd_memset(rcb, r);
 513            if (!acb->error) {
 514                acb->ret = rcb->size;
 515            }
 516        } else if (!acb->error) {
 517            acb->ret = r;
 518        }
 519    }
 520
 521    g_free(rcb);
 522
 523    if (!LIBRBD_USE_IOVEC) {
 524        if (acb->cmd == RBD_AIO_READ) {
 525            qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 526        }
 527        qemu_vfree(acb->bounce);
 528    }
 529
 530    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 531
 532    qemu_aio_unref(acb);
 533}
 534
 535static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 536{
 537    const char **vals;
 538    const char *host, *port;
 539    char *rados_str;
 540    InetSocketAddressBaseList *p;
 541    int i, cnt;
 542
 543    if (!opts->has_server) {
 544        return NULL;
 545    }
 546
 547    for (cnt = 0, p = opts->server; p; p = p->next) {
 548        cnt++;
 549    }
 550
 551    vals = g_new(const char *, cnt + 1);
 552
 553    for (i = 0, p = opts->server; p; p = p->next, i++) {
 554        host = p->value->host;
 555        port = p->value->port;
 556
 557        if (strchr(host, ':')) {
 558            vals[i] = g_strdup_printf("[%s]:%s", host, port);
 559        } else {
 560            vals[i] = g_strdup_printf("%s:%s", host, port);
 561        }
 562    }
 563    vals[i] = NULL;
 564
 565    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
 566    g_strfreev((char **)vals);
 567    return rados_str;
 568}
 569
 570static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 571                            BlockdevOptionsRbd *opts, bool cache,
 572                            const char *keypairs, const char *secretid,
 573                            Error **errp)
 574{
 575    char *mon_host = NULL;
 576    Error *local_err = NULL;
 577    int r;
 578
 579    if (secretid) {
 580        if (opts->key_secret) {
 581            error_setg(errp,
 582                       "Legacy 'password-secret' clashes with 'key-secret'");
 583            return -EINVAL;
 584        }
 585        opts->key_secret = g_strdup(secretid);
 586        opts->has_key_secret = true;
 587    }
 588
 589    mon_host = qemu_rbd_mon_host(opts, &local_err);
 590    if (local_err) {
 591        error_propagate(errp, local_err);
 592        r = -EINVAL;
 593        goto failed_opts;
 594    }
 595
 596    r = rados_create(cluster, opts->user);
 597    if (r < 0) {
 598        error_setg_errno(errp, -r, "error initializing");
 599        goto failed_opts;
 600    }
 601
 602    /* try default location when conf=NULL, but ignore failure */
 603    r = rados_conf_read_file(*cluster, opts->conf);
 604    if (opts->has_conf && r < 0) {
 605        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
 606        goto failed_shutdown;
 607    }
 608
 609    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
 610    if (r < 0) {
 611        goto failed_shutdown;
 612    }
 613
 614    if (mon_host) {
 615        r = rados_conf_set(*cluster, "mon_host", mon_host);
 616        if (r < 0) {
 617            goto failed_shutdown;
 618        }
 619    }
 620
 621    r = qemu_rbd_set_auth(*cluster, opts, errp);
 622    if (r < 0) {
 623        goto failed_shutdown;
 624    }
 625
 626    /*
 627     * Fallback to more conservative semantics if setting cache
 628     * options fails. Ignore errors from setting rbd_cache because the
 629     * only possible error is that the option does not exist, and
 630     * librbd defaults to no caching. If write through caching cannot
 631     * be set up, fall back to no caching.
 632     */
 633    if (cache) {
 634        rados_conf_set(*cluster, "rbd_cache", "true");
 635    } else {
 636        rados_conf_set(*cluster, "rbd_cache", "false");
 637    }
 638
 639    r = rados_connect(*cluster);
 640    if (r < 0) {
 641        error_setg_errno(errp, -r, "error connecting");
 642        goto failed_shutdown;
 643    }
 644
 645    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
 646    if (r < 0) {
 647        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
 648        goto failed_shutdown;
 649    }
 650
 651    return 0;
 652
 653failed_shutdown:
 654    rados_shutdown(*cluster);
 655failed_opts:
 656    g_free(mon_host);
 657    return r;
 658}
 659
 660static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
 661                                    Error **errp)
 662{
 663    Visitor *v;
 664    Error *local_err = NULL;
 665
 666    /* Convert the remaining options into a QAPI object */
 667    v = qobject_input_visitor_new_flat_confused(options, errp);
 668    if (!v) {
 669        return -EINVAL;
 670    }
 671
 672    visit_type_BlockdevOptionsRbd(v, NULL, opts, &local_err);
 673    visit_free(v);
 674
 675    if (local_err) {
 676        error_propagate(errp, local_err);
 677        return -EINVAL;
 678    }
 679
 680    return 0;
 681}
 682
 683static int qemu_rbd_attempt_legacy_options(QDict *options,
 684                                           BlockdevOptionsRbd **opts,
 685                                           char **keypairs)
 686{
 687    char *filename;
 688    int r;
 689
 690    filename = g_strdup(qdict_get_try_str(options, "filename"));
 691    if (!filename) {
 692        return -EINVAL;
 693    }
 694    qdict_del(options, "filename");
 695
 696    qemu_rbd_parse_filename(filename, options, NULL);
 697
 698    /* keypairs freed by caller */
 699    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 700    if (*keypairs) {
 701        qdict_del(options, "=keyvalue-pairs");
 702    }
 703
 704    r = qemu_rbd_convert_options(options, opts, NULL);
 705
 706    g_free(filename);
 707    return r;
 708}
 709
 710static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 711                         Error **errp)
 712{
 713    BDRVRBDState *s = bs->opaque;
 714    BlockdevOptionsRbd *opts = NULL;
 715    const QDictEntry *e;
 716    Error *local_err = NULL;
 717    char *keypairs, *secretid;
 718    int r;
 719
 720    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 721    if (keypairs) {
 722        qdict_del(options, "=keyvalue-pairs");
 723    }
 724
 725    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
 726    if (secretid) {
 727        qdict_del(options, "password-secret");
 728    }
 729
 730    r = qemu_rbd_convert_options(options, &opts, &local_err);
 731    if (local_err) {
 732        /* If keypairs are present, that means some options are present in
 733         * the modern option format.  Don't attempt to parse legacy option
 734         * formats, as we won't support mixed usage. */
 735        if (keypairs) {
 736            error_propagate(errp, local_err);
 737            goto out;
 738        }
 739
 740        /* If the initial attempt to convert and process the options failed,
 741         * we may be attempting to open an image file that has the rbd options
 742         * specified in the older format consisting of all key/value pairs
 743         * encoded in the filename.  Go ahead and attempt to parse the
 744         * filename, and see if we can pull out the required options. */
 745        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
 746        if (r < 0) {
 747            /* Propagate the original error, not the legacy parsing fallback
 748             * error, as the latter was just a best-effort attempt. */
 749            error_propagate(errp, local_err);
 750            goto out;
 751        }
 752        /* Take care whenever deciding to actually deprecate; once this ability
 753         * is removed, we will not be able to open any images with legacy-styled
 754         * backing image strings. */
 755        warn_report("RBD options encoded in the filename as keyvalue pairs "
 756                    "is deprecated");
 757    }
 758
 759    /* Remove the processed options from the QDict (the visitor processes
 760     * _all_ options in the QDict) */
 761    while ((e = qdict_first(options))) {
 762        qdict_del(options, e->key);
 763    }
 764
 765    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
 766                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
 767    if (r < 0) {
 768        goto out;
 769    }
 770
 771    s->snap = g_strdup(opts->snapshot);
 772    s->image_name = g_strdup(opts->image);
 773
 774    /* rbd_open is always r/w */
 775    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
 776    if (r < 0) {
 777        error_setg_errno(errp, -r, "error reading header from %s",
 778                         s->image_name);
 779        goto failed_open;
 780    }
 781
 782    r = rbd_get_size(s->image, &s->image_size);
 783    if (r < 0) {
 784        error_setg_errno(errp, -r, "error getting image size from %s",
 785                         s->image_name);
 786        rbd_close(s->image);
 787        goto failed_open;
 788    }
 789
 790    /* If we are using an rbd snapshot, we must be r/o, otherwise
 791     * leave as-is */
 792    if (s->snap != NULL) {
 793        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
 794        if (r < 0) {
 795            rbd_close(s->image);
 796            goto failed_open;
 797        }
 798    }
 799
 800    r = 0;
 801    goto out;
 802
 803failed_open:
 804    rados_ioctx_destroy(s->io_ctx);
 805    g_free(s->snap);
 806    g_free(s->image_name);
 807    rados_shutdown(s->cluster);
 808out:
 809    qapi_free_BlockdevOptionsRbd(opts);
 810    g_free(keypairs);
 811    g_free(secretid);
 812    return r;
 813}
 814
 815
 816/* Since RBD is currently always opened R/W via the API,
 817 * we just need to check if we are using a snapshot or not, in
 818 * order to determine if we will allow it to be R/W */
 819static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
 820                                   BlockReopenQueue *queue, Error **errp)
 821{
 822    BDRVRBDState *s = state->bs->opaque;
 823    int ret = 0;
 824
 825    if (s->snap && state->flags & BDRV_O_RDWR) {
 826        error_setg(errp,
 827                   "Cannot change node '%s' to r/w when using RBD snapshot",
 828                   bdrv_get_device_or_node_name(state->bs));
 829        ret = -EINVAL;
 830    }
 831
 832    return ret;
 833}
 834
 835static void qemu_rbd_close(BlockDriverState *bs)
 836{
 837    BDRVRBDState *s = bs->opaque;
 838
 839    rbd_close(s->image);
 840    rados_ioctx_destroy(s->io_ctx);
 841    g_free(s->snap);
 842    g_free(s->image_name);
 843    rados_shutdown(s->cluster);
 844}
 845
 846/* Resize the RBD image and update the 'image_size' with the current size */
 847static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
 848{
 849    BDRVRBDState *s = bs->opaque;
 850    int r;
 851
 852    r = rbd_resize(s->image, size);
 853    if (r < 0) {
 854        return r;
 855    }
 856
 857    s->image_size = size;
 858
 859    return 0;
 860}
 861
 862static const AIOCBInfo rbd_aiocb_info = {
 863    .aiocb_size = sizeof(RBDAIOCB),
 864};
 865
 866static void rbd_finish_bh(void *opaque)
 867{
 868    RADOSCB *rcb = opaque;
 869    qemu_rbd_complete_aio(rcb);
 870}
 871
 872/*
 873 * This is the callback function for rbd_aio_read and _write
 874 *
 875 * Note: this function is being called from a non qemu thread so
 876 * we need to be careful about what we do here. Generally we only
 877 * schedule a BH, and do the rest of the io completion handling
 878 * from rbd_finish_bh() which runs in a qemu context.
 879 */
 880static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 881{
 882    RBDAIOCB *acb = rcb->acb;
 883
 884    rcb->ret = rbd_aio_get_return_value(c);
 885    rbd_aio_release(c);
 886
 887    aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
 888                            rbd_finish_bh, rcb);
 889}
 890
 891static int rbd_aio_discard_wrapper(rbd_image_t image,
 892                                   uint64_t off,
 893                                   uint64_t len,
 894                                   rbd_completion_t comp)
 895{
 896#ifdef LIBRBD_SUPPORTS_DISCARD
 897    return rbd_aio_discard(image, off, len, comp);
 898#else
 899    return -ENOTSUP;
 900#endif
 901}
 902
 903static int rbd_aio_flush_wrapper(rbd_image_t image,
 904                                 rbd_completion_t comp)
 905{
 906#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 907    return rbd_aio_flush(image, comp);
 908#else
 909    return -ENOTSUP;
 910#endif
 911}
 912
 913static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 914                                 int64_t off,
 915                                 QEMUIOVector *qiov,
 916                                 int64_t size,
 917                                 BlockCompletionFunc *cb,
 918                                 void *opaque,
 919                                 RBDAIOCmd cmd)
 920{
 921    RBDAIOCB *acb;
 922    RADOSCB *rcb = NULL;
 923    rbd_completion_t c;
 924    int r;
 925
 926    BDRVRBDState *s = bs->opaque;
 927
 928    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
 929    acb->cmd = cmd;
 930    acb->qiov = qiov;
 931    assert(!qiov || qiov->size == size);
 932
 933    rcb = g_new(RADOSCB, 1);
 934
 935    if (!LIBRBD_USE_IOVEC) {
 936        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 937            acb->bounce = NULL;
 938        } else {
 939            acb->bounce = qemu_try_blockalign(bs, qiov->size);
 940            if (acb->bounce == NULL) {
 941                goto failed;
 942            }
 943        }
 944        if (cmd == RBD_AIO_WRITE) {
 945            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 946        }
 947        rcb->buf = acb->bounce;
 948    }
 949
 950    acb->ret = 0;
 951    acb->error = 0;
 952    acb->s = s;
 953
 954    rcb->acb = acb;
 955    rcb->s = acb->s;
 956    rcb->size = size;
 957    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
 958    if (r < 0) {
 959        goto failed;
 960    }
 961
 962    switch (cmd) {
 963    case RBD_AIO_WRITE: {
 964        /*
 965         * RBD APIs don't allow us to write more than actual size, so in order
 966         * to support growing images, we resize the image before write
 967         * operations that exceed the current size.
 968         */
 969        if (off + size > s->image_size) {
 970            r = qemu_rbd_resize(bs, off + size);
 971            if (r < 0) {
 972                goto failed_completion;
 973            }
 974        }
 975#ifdef LIBRBD_SUPPORTS_IOVEC
 976            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
 977#else
 978            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
 979#endif
 980        break;
 981    }
 982    case RBD_AIO_READ:
 983#ifdef LIBRBD_SUPPORTS_IOVEC
 984            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
 985#else
 986            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
 987#endif
 988        break;
 989    case RBD_AIO_DISCARD:
 990        r = rbd_aio_discard_wrapper(s->image, off, size, c);
 991        break;
 992    case RBD_AIO_FLUSH:
 993        r = rbd_aio_flush_wrapper(s->image, c);
 994        break;
 995    default:
 996        r = -EINVAL;
 997    }
 998
 999    if (r < 0) {
1000        goto failed_completion;
1001    }
1002    return &acb->common;
1003
1004failed_completion:
1005    rbd_aio_release(c);
1006failed:
1007    g_free(rcb);
1008    if (!LIBRBD_USE_IOVEC) {
1009        qemu_vfree(acb->bounce);
1010    }
1011
1012    qemu_aio_unref(acb);
1013    return NULL;
1014}
1015
1016static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
1017                                       uint64_t offset, uint64_t bytes,
1018                                       QEMUIOVector *qiov, int flags,
1019                                       BlockCompletionFunc *cb,
1020                                       void *opaque)
1021{
1022    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1023                         RBD_AIO_READ);
1024}
1025
1026static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
1027                                        uint64_t offset, uint64_t bytes,
1028                                        QEMUIOVector *qiov, int flags,
1029                                        BlockCompletionFunc *cb,
1030                                        void *opaque)
1031{
1032    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1033                         RBD_AIO_WRITE);
1034}
1035
1036#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1037static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1038                                      BlockCompletionFunc *cb,
1039                                      void *opaque)
1040{
1041    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1042}
1043
1044#else
1045
1046static int qemu_rbd_co_flush(BlockDriverState *bs)
1047{
1048#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1049    /* rbd_flush added in 0.1.1 */
1050    BDRVRBDState *s = bs->opaque;
1051    return rbd_flush(s->image);
1052#else
1053    return 0;
1054#endif
1055}
1056#endif
1057
1058static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1059{
1060    BDRVRBDState *s = bs->opaque;
1061    rbd_image_info_t info;
1062    int r;
1063
1064    r = rbd_stat(s->image, &info, sizeof(info));
1065    if (r < 0) {
1066        return r;
1067    }
1068
1069    bdi->cluster_size = info.obj_size;
1070    return 0;
1071}
1072
1073static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1074{
1075    BDRVRBDState *s = bs->opaque;
1076    rbd_image_info_t info;
1077    int r;
1078
1079    r = rbd_stat(s->image, &info, sizeof(info));
1080    if (r < 0) {
1081        return r;
1082    }
1083
1084    return info.size;
1085}
1086
1087static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1088                                             int64_t offset,
1089                                             PreallocMode prealloc,
1090                                             Error **errp)
1091{
1092    int r;
1093
1094    if (prealloc != PREALLOC_MODE_OFF) {
1095        error_setg(errp, "Unsupported preallocation mode '%s'",
1096                   PreallocMode_str(prealloc));
1097        return -ENOTSUP;
1098    }
1099
1100    r = qemu_rbd_resize(bs, offset);
1101    if (r < 0) {
1102        error_setg_errno(errp, -r, "Failed to resize file");
1103        return r;
1104    }
1105
1106    return 0;
1107}
1108
1109static int qemu_rbd_snap_create(BlockDriverState *bs,
1110                                QEMUSnapshotInfo *sn_info)
1111{
1112    BDRVRBDState *s = bs->opaque;
1113    int r;
1114
1115    if (sn_info->name[0] == '\0') {
1116        return -EINVAL; /* we need a name for rbd snapshots */
1117    }
1118
1119    /*
1120     * rbd snapshots are using the name as the user controlled unique identifier
1121     * we can't use the rbd snapid for that purpose, as it can't be set
1122     */
1123    if (sn_info->id_str[0] != '\0' &&
1124        strcmp(sn_info->id_str, sn_info->name) != 0) {
1125        return -EINVAL;
1126    }
1127
1128    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1129        return -ERANGE;
1130    }
1131
1132    r = rbd_snap_create(s->image, sn_info->name);
1133    if (r < 0) {
1134        error_report("failed to create snap: %s", strerror(-r));
1135        return r;
1136    }
1137
1138    return 0;
1139}
1140
1141static int qemu_rbd_snap_remove(BlockDriverState *bs,
1142                                const char *snapshot_id,
1143                                const char *snapshot_name,
1144                                Error **errp)
1145{
1146    BDRVRBDState *s = bs->opaque;
1147    int r;
1148
1149    if (!snapshot_name) {
1150        error_setg(errp, "rbd need a valid snapshot name");
1151        return -EINVAL;
1152    }
1153
1154    /* If snapshot_id is specified, it must be equal to name, see
1155       qemu_rbd_snap_list() */
1156    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1157        error_setg(errp,
1158                   "rbd do not support snapshot id, it should be NULL or "
1159                   "equal to snapshot name");
1160        return -EINVAL;
1161    }
1162
1163    r = rbd_snap_remove(s->image, snapshot_name);
1164    if (r < 0) {
1165        error_setg_errno(errp, -r, "Failed to remove the snapshot");
1166    }
1167    return r;
1168}
1169
1170static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1171                                  const char *snapshot_name)
1172{
1173    BDRVRBDState *s = bs->opaque;
1174
1175    return rbd_snap_rollback(s->image, snapshot_name);
1176}
1177
1178static int qemu_rbd_snap_list(BlockDriverState *bs,
1179                              QEMUSnapshotInfo **psn_tab)
1180{
1181    BDRVRBDState *s = bs->opaque;
1182    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1183    int i, snap_count;
1184    rbd_snap_info_t *snaps;
1185    int max_snaps = RBD_MAX_SNAPS;
1186
1187    do {
1188        snaps = g_new(rbd_snap_info_t, max_snaps);
1189        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1190        if (snap_count <= 0) {
1191            g_free(snaps);
1192        }
1193    } while (snap_count == -ERANGE);
1194
1195    if (snap_count <= 0) {
1196        goto done;
1197    }
1198
1199    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1200
1201    for (i = 0; i < snap_count; i++) {
1202        const char *snap_name = snaps[i].name;
1203
1204        sn_info = sn_tab + i;
1205        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1206        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1207
1208        sn_info->vm_state_size = snaps[i].size;
1209        sn_info->date_sec = 0;
1210        sn_info->date_nsec = 0;
1211        sn_info->vm_clock_nsec = 0;
1212    }
1213    rbd_snap_list_end(snaps);
1214    g_free(snaps);
1215
1216 done:
1217    *psn_tab = sn_tab;
1218    return snap_count;
1219}
1220
1221#ifdef LIBRBD_SUPPORTS_DISCARD
1222static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1223                                         int64_t offset,
1224                                         int bytes,
1225                                         BlockCompletionFunc *cb,
1226                                         void *opaque)
1227{
1228    return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
1229                         RBD_AIO_DISCARD);
1230}
1231#endif
1232
1233#ifdef LIBRBD_SUPPORTS_INVALIDATE
1234static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1235                                                      Error **errp)
1236{
1237    BDRVRBDState *s = bs->opaque;
1238    int r = rbd_invalidate_cache(s->image);
1239    if (r < 0) {
1240        error_setg_errno(errp, -r, "Failed to invalidate the cache");
1241    }
1242}
1243#endif
1244
1245static QemuOptsList qemu_rbd_create_opts = {
1246    .name = "rbd-create-opts",
1247    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1248    .desc = {
1249        {
1250            .name = BLOCK_OPT_SIZE,
1251            .type = QEMU_OPT_SIZE,
1252            .help = "Virtual disk size"
1253        },
1254        {
1255            .name = BLOCK_OPT_CLUSTER_SIZE,
1256            .type = QEMU_OPT_SIZE,
1257            .help = "RBD object size"
1258        },
1259        {
1260            .name = "password-secret",
1261            .type = QEMU_OPT_STRING,
1262            .help = "ID of secret providing the password",
1263        },
1264        { /* end of list */ }
1265    }
1266};
1267
/*
 * NULL-terminated list of runtime option names, published through
 * bdrv_rbd.strong_runtime_opts.
 *
 * NOTE(review): "server." ends in a dot and is presumably matched as a
 * prefix by the block layer -- confirm against the consumer of
 * strong_runtime_opts.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* image location */
    "pool", "image", "conf", "snapshot",
    /* connection and authentication */
    "user", "server.", "password-secret",
    /* terminator */
    NULL
};
1279
1280static BlockDriver bdrv_rbd = {
1281    .format_name            = "rbd",
1282    .instance_size          = sizeof(BDRVRBDState),
1283    .bdrv_parse_filename    = qemu_rbd_parse_filename,
1284    .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1285    .bdrv_file_open         = qemu_rbd_open,
1286    .bdrv_close             = qemu_rbd_close,
1287    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1288    .bdrv_co_create         = qemu_rbd_co_create,
1289    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1290    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1291    .bdrv_get_info          = qemu_rbd_getinfo,
1292    .create_opts            = &qemu_rbd_create_opts,
1293    .bdrv_getlength         = qemu_rbd_getlength,
1294    .bdrv_co_truncate       = qemu_rbd_co_truncate,
1295    .protocol_name          = "rbd",
1296
1297    .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1298    .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1299
1300#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1301    .bdrv_aio_flush         = qemu_rbd_aio_flush,
1302#else
1303    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1304#endif
1305
1306#ifdef LIBRBD_SUPPORTS_DISCARD
1307    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1308#endif
1309
1310    .bdrv_snapshot_create   = qemu_rbd_snap_create,
1311    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1312    .bdrv_snapshot_list     = qemu_rbd_snap_list,
1313    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1314#ifdef LIBRBD_SUPPORTS_INVALIDATE
1315    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1316#endif
1317
1318    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1319};
1320
1321static void bdrv_rbd_init(void)
1322{
1323    bdrv_register(&bdrv_rbd);
1324}
1325
1326block_init(bdrv_rbd_init);
1327