qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include <rbd/librbd.h>
  17#include "qapi/error.h"
  18#include "qemu/error-report.h"
  19#include "qemu/module.h"
  20#include "qemu/option.h"
  21#include "block/block_int.h"
  22#include "block/qdict.h"
  23#include "crypto/secret.h"
  24#include "qemu/cutils.h"
  25#include "sysemu/replay.h"
  26#include "qapi/qmp/qstring.h"
  27#include "qapi/qmp/qdict.h"
  28#include "qapi/qmp/qjson.h"
  29#include "qapi/qmp/qlist.h"
  30#include "qapi/qobject-input-visitor.h"
  31#include "qapi/qapi-visit-block-core.h"
  32
  33/*
  34 * When specifying the image filename use:
  35 *
  36 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  37 *
  38 * poolname must be the name of an existing rados pool.
  39 *
  40 * devicename is the name of the rbd image.
  41 *
  42 * Each option given is used to configure rados, and may be any valid
  43 * Ceph option, "id", or "conf".
  44 *
  45 * The "id" option indicates what user we should authenticate as to
  46 * the Ceph cluster.  If it is excluded we will use the Ceph default
  47 * (normally 'admin').
  48 *
  49 * The "conf" option specifies a Ceph configuration file to read.  If
  50 * it is not specified, we will read from the default Ceph locations
  51 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  52 * file, specify conf=/dev/null.
  53 *
  54 * Configuration values containing :, @, or = can be escaped with a
  55 * leading "\".
  56 */
  57
/* rbd_aio_discard added in 0.1.2 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif

/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file and OBJ_MAX_SIZE is not used there either -- confirm whether
 * this macro is still needed.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/*
 * LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.  LIBRBD_USE_IOVEC mirrors it
 * as a 0/1 constant so that it can be tested with a plain C "if" instead of
 * the preprocessor.
 */
#ifdef LIBRBD_SUPPORTS_IOVEC
#define LIBRBD_USE_IOVEC 1
#else
#define LIBRBD_USE_IOVEC 0
#endif
  75
/* Kind of asynchronous request submitted through rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH
} RBDAIOCmd;
  82
/* QEMU-side state of one in-flight request, allocated by rbd_start_aio(). */
typedef struct RBDAIOCB {
    BlockAIOCB common;      /* carries the completion callback and opaque */
    int64_t ret;            /* result recorded by qemu_rbd_complete_aio() */
    QEMUIOVector *qiov;     /* caller's I/O vector; NULL for flush */
    char *bounce;           /* linear bounce buffer, !LIBRBD_USE_IOVEC only */
    RBDAIOCmd cmd;          /* request kind */
    int error;              /* set to 1 once a failure has been recorded */
    struct BDRVRBDState *s; /* driver state of the owning node */
} RBDAIOCB;
  92
/*
 * Per-request context handed to librbd as the completion argument and then
 * passed on to the bottom half that finishes the request.
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;          /* request this completion belongs to */
    struct BDRVRBDState *s; /* copy of acb->s */
    int64_t size;           /* request length in bytes */
    char *buf;              /* acb->bounce; only set when !LIBRBD_USE_IOVEC */
    int64_t ret;            /* librbd return value, set by rbd_finish_aiocb() */
} RADOSCB;
 100
/* Driver state stored in bs->opaque for an open RBD image. */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle set up by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* I/O context on the image's pool */
    rbd_image_t image;      /* handle returned by rbd_open() */
    char *image_name;       /* heap copy of the image name */
    char *snap;             /* heap copy of the snapshot name, or NULL */
    char *namespace;        /* NOTE(review): not assigned in the visible code */
    uint64_t image_size;    /* cached size; refreshed by qemu_rbd_resize() */
} BDRVRBDState;
 110
 111static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 112                            BlockdevOptionsRbd *opts, bool cache,
 113                            const char *keypairs, const char *secretid,
 114                            Error **errp);
 115
 116static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 117{
 118    char *end;
 119
 120    *p = NULL;
 121
 122    for (end = src; *end; ++end) {
 123        if (*end == delim) {
 124            break;
 125        }
 126        if (*end == '\\' && end[1] != '\0') {
 127            end++;
 128        }
 129    }
 130    if (*end == delim) {
 131        *p = end + 1;
 132        *end = '\0';
 133    }
 134    return src;
 135}
 136
 137static void qemu_rbd_unescape(char *src)
 138{
 139    char *p;
 140
 141    for (p = src; *src; ++src, ++p) {
 142        if (*src == '\\' && src[1] != '\0') {
 143            src++;
 144        }
 145        *p = *src;
 146    }
 147    *p = '\0';
 148}
 149
 150static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 151                                    Error **errp)
 152{
 153    const char *start;
 154    char *p, *buf;
 155    QList *keypairs = NULL;
 156    char *found_str, *image_name;
 157
 158    if (!strstart(filename, "rbd:", &start)) {
 159        error_setg(errp, "File name must start with 'rbd:'");
 160        return;
 161    }
 162
 163    buf = g_strdup(start);
 164    p = buf;
 165
 166    found_str = qemu_rbd_next_tok(p, '/', &p);
 167    if (!p) {
 168        error_setg(errp, "Pool name is required");
 169        goto done;
 170    }
 171    qemu_rbd_unescape(found_str);
 172    qdict_put_str(options, "pool", found_str);
 173
 174    if (strchr(p, '@')) {
 175        image_name = qemu_rbd_next_tok(p, '@', &p);
 176
 177        found_str = qemu_rbd_next_tok(p, ':', &p);
 178        qemu_rbd_unescape(found_str);
 179        qdict_put_str(options, "snapshot", found_str);
 180    } else {
 181        image_name = qemu_rbd_next_tok(p, ':', &p);
 182    }
 183    /* Check for namespace in the image_name */
 184    if (strchr(image_name, '/')) {
 185        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
 186        qemu_rbd_unescape(found_str);
 187        qdict_put_str(options, "namespace", found_str);
 188    } else {
 189        qdict_put_str(options, "namespace", "");
 190    }
 191    qemu_rbd_unescape(image_name);
 192    qdict_put_str(options, "image", image_name);
 193    if (!p) {
 194        goto done;
 195    }
 196
 197    /* The following are essentially all key/value pairs, and we treat
 198     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
 199    while (p) {
 200        char *name, *value;
 201        name = qemu_rbd_next_tok(p, '=', &p);
 202        if (!p) {
 203            error_setg(errp, "conf option %s has no value", name);
 204            break;
 205        }
 206
 207        qemu_rbd_unescape(name);
 208
 209        value = qemu_rbd_next_tok(p, ':', &p);
 210        qemu_rbd_unescape(value);
 211
 212        if (!strcmp(name, "conf")) {
 213            qdict_put_str(options, "conf", value);
 214        } else if (!strcmp(name, "id")) {
 215            qdict_put_str(options, "user", value);
 216        } else {
 217            /*
 218             * We pass these internally to qemu_rbd_set_keypairs(), so
 219             * we can get away with the simpler list of [ "key1",
 220             * "value1", "key2", "value2" ] rather than a raw dict
 221             * { "key1": "value1", "key2": "value2" } where we can't
 222             * guarantee order, or even a more correct but complex
 223             * [ { "key1": "value1" }, { "key2": "value2" } ]
 224             */
 225            if (!keypairs) {
 226                keypairs = qlist_new();
 227            }
 228            qlist_append_str(keypairs, name);
 229            qlist_append_str(keypairs, value);
 230        }
 231    }
 232
 233    if (keypairs) {
 234        qdict_put(options, "=keyvalue-pairs",
 235                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
 236    }
 237
 238done:
 239    g_free(buf);
 240    qobject_unref(keypairs);
 241    return;
 242}
 243
 244
/* Set the I/O limits of @bs: requests must be aligned to 512 bytes. */
static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
{
    /* XXX Does RBD support AIO on less than 512-byte alignment? */
    bs->bl.request_alignment = 512;
}
 250
 251
 252static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
 253                             Error **errp)
 254{
 255    char *key, *acr;
 256    int r;
 257    GString *accu;
 258    RbdAuthModeList *auth;
 259
 260    if (opts->key_secret) {
 261        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
 262        if (!key) {
 263            return -EIO;
 264        }
 265        r = rados_conf_set(cluster, "key", key);
 266        g_free(key);
 267        if (r < 0) {
 268            error_setg_errno(errp, -r, "Could not set 'key'");
 269            return r;
 270        }
 271    }
 272
 273    if (opts->has_auth_client_required) {
 274        accu = g_string_new("");
 275        for (auth = opts->auth_client_required; auth; auth = auth->next) {
 276            if (accu->str[0]) {
 277                g_string_append_c(accu, ';');
 278            }
 279            g_string_append(accu, RbdAuthMode_str(auth->value));
 280        }
 281        acr = g_string_free(accu, FALSE);
 282        r = rados_conf_set(cluster, "auth_client_required", acr);
 283        g_free(acr);
 284        if (r < 0) {
 285            error_setg_errno(errp, -r,
 286                             "Could not set 'auth_client_required'");
 287            return r;
 288        }
 289    }
 290
 291    return 0;
 292}
 293
 294static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 295                                 Error **errp)
 296{
 297    QList *keypairs;
 298    QString *name;
 299    QString *value;
 300    const char *key;
 301    size_t remaining;
 302    int ret = 0;
 303
 304    if (!keypairs_json) {
 305        return ret;
 306    }
 307    keypairs = qobject_to(QList,
 308                          qobject_from_json(keypairs_json, &error_abort));
 309    remaining = qlist_size(keypairs) / 2;
 310    assert(remaining);
 311
 312    while (remaining--) {
 313        name = qobject_to(QString, qlist_pop(keypairs));
 314        value = qobject_to(QString, qlist_pop(keypairs));
 315        assert(name && value);
 316        key = qstring_get_str(name);
 317
 318        ret = rados_conf_set(cluster, key, qstring_get_str(value));
 319        qobject_unref(value);
 320        if (ret < 0) {
 321            error_setg_errno(errp, -ret, "invalid conf option %s", key);
 322            qobject_unref(name);
 323            ret = -EINVAL;
 324            break;
 325        }
 326        qobject_unref(name);
 327    }
 328
 329    qobject_unref(keypairs);
 330    return ret;
 331}
 332
 333static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
 334{
 335    if (LIBRBD_USE_IOVEC) {
 336        RBDAIOCB *acb = rcb->acb;
 337        iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
 338                   acb->qiov->size - offs);
 339    } else {
 340        memset(rcb->buf + offs, 0, rcb->size - offs);
 341    }
 342}
 343
 344/* FIXME Deprecate and remove keypairs or make it available in QMP. */
 345static int qemu_rbd_do_create(BlockdevCreateOptions *options,
 346                              const char *keypairs, const char *password_secret,
 347                              Error **errp)
 348{
 349    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
 350    rados_t cluster;
 351    rados_ioctx_t io_ctx;
 352    int obj_order = 0;
 353    int ret;
 354
 355    assert(options->driver == BLOCKDEV_DRIVER_RBD);
 356    if (opts->location->has_snapshot) {
 357        error_setg(errp, "Can't use snapshot name for image creation");
 358        return -EINVAL;
 359    }
 360
 361    if (opts->has_cluster_size) {
 362        int64_t objsize = opts->cluster_size;
 363        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 364            error_setg(errp, "obj size needs to be power of 2");
 365            return -EINVAL;
 366        }
 367        if (objsize < 4096) {
 368            error_setg(errp, "obj size too small");
 369            return -EINVAL;
 370        }
 371        obj_order = ctz32(objsize);
 372    }
 373
 374    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
 375                           password_secret, errp);
 376    if (ret < 0) {
 377        return ret;
 378    }
 379
 380    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
 381    if (ret < 0) {
 382        error_setg_errno(errp, -ret, "error rbd create");
 383        goto out;
 384    }
 385
 386    ret = 0;
 387out:
 388    rados_ioctx_destroy(io_ctx);
 389    rados_shutdown(cluster);
 390    return ret;
 391}
 392
/*
 * Create an image from @options alone, i.e. without legacy key/value pairs
 * and without a legacy password secret.  Returns what qemu_rbd_do_create()
 * returns.
 */
static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
{
    return qemu_rbd_do_create(options, NULL, NULL, errp);
}
 397
 398static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
 399                                                const char *filename,
 400                                                QemuOpts *opts,
 401                                                Error **errp)
 402{
 403    BlockdevCreateOptions *create_options;
 404    BlockdevCreateOptionsRbd *rbd_opts;
 405    BlockdevOptionsRbd *loc;
 406    Error *local_err = NULL;
 407    const char *keypairs, *password_secret;
 408    QDict *options = NULL;
 409    int ret = 0;
 410
 411    create_options = g_new0(BlockdevCreateOptions, 1);
 412    create_options->driver = BLOCKDEV_DRIVER_RBD;
 413    rbd_opts = &create_options->u.rbd;
 414
 415    rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
 416
 417    password_secret = qemu_opt_get(opts, "password-secret");
 418
 419    /* Read out options */
 420    rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 421                              BDRV_SECTOR_SIZE);
 422    rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
 423                                                   BLOCK_OPT_CLUSTER_SIZE, 0);
 424    rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
 425
 426    options = qdict_new();
 427    qemu_rbd_parse_filename(filename, options, &local_err);
 428    if (local_err) {
 429        ret = -EINVAL;
 430        error_propagate(errp, local_err);
 431        goto exit;
 432    }
 433
 434    /*
 435     * Caution: while qdict_get_try_str() is fine, getting non-string
 436     * types would require more care.  When @options come from -blockdev
 437     * or blockdev_add, its members are typed according to the QAPI
 438     * schema, but when they come from -drive, they're all QString.
 439     */
 440    loc = rbd_opts->location;
 441    loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
 442    loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
 443    loc->has_conf    = !!loc->conf;
 444    loc->user        = g_strdup(qdict_get_try_str(options, "user"));
 445    loc->has_user    = !!loc->user;
 446    loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
 447    loc->has_q_namespace = !!loc->q_namespace;
 448    loc->image       = g_strdup(qdict_get_try_str(options, "image"));
 449    keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
 450
 451    ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
 452    if (ret < 0) {
 453        goto exit;
 454    }
 455
 456exit:
 457    qobject_unref(options);
 458    qapi_free_BlockdevCreateOptions(create_options);
 459    return ret;
 460}
 461
 462/*
 463 * This aio completion is being called from rbd_finish_bh() and runs in qemu
 464 * BH context.
 465 */
 466static void qemu_rbd_complete_aio(RADOSCB *rcb)
 467{
 468    RBDAIOCB *acb = rcb->acb;
 469    int64_t r;
 470
 471    r = rcb->ret;
 472
 473    if (acb->cmd != RBD_AIO_READ) {
 474        if (r < 0) {
 475            acb->ret = r;
 476            acb->error = 1;
 477        } else if (!acb->error) {
 478            acb->ret = rcb->size;
 479        }
 480    } else {
 481        if (r < 0) {
 482            qemu_rbd_memset(rcb, 0);
 483            acb->ret = r;
 484            acb->error = 1;
 485        } else if (r < rcb->size) {
 486            qemu_rbd_memset(rcb, r);
 487            if (!acb->error) {
 488                acb->ret = rcb->size;
 489            }
 490        } else if (!acb->error) {
 491            acb->ret = r;
 492        }
 493    }
 494
 495    g_free(rcb);
 496
 497    if (!LIBRBD_USE_IOVEC) {
 498        if (acb->cmd == RBD_AIO_READ) {
 499            qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 500        }
 501        qemu_vfree(acb->bounce);
 502    }
 503
 504    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 505
 506    qemu_aio_unref(acb);
 507}
 508
 509static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 510{
 511    const char **vals;
 512    const char *host, *port;
 513    char *rados_str;
 514    InetSocketAddressBaseList *p;
 515    int i, cnt;
 516
 517    if (!opts->has_server) {
 518        return NULL;
 519    }
 520
 521    for (cnt = 0, p = opts->server; p; p = p->next) {
 522        cnt++;
 523    }
 524
 525    vals = g_new(const char *, cnt + 1);
 526
 527    for (i = 0, p = opts->server; p; p = p->next, i++) {
 528        host = p->value->host;
 529        port = p->value->port;
 530
 531        if (strchr(host, ':')) {
 532            vals[i] = g_strdup_printf("[%s]:%s", host, port);
 533        } else {
 534            vals[i] = g_strdup_printf("%s:%s", host, port);
 535        }
 536    }
 537    vals[i] = NULL;
 538
 539    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
 540    g_strfreev((char **)vals);
 541    return rados_str;
 542}
 543
 544static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 545                            BlockdevOptionsRbd *opts, bool cache,
 546                            const char *keypairs, const char *secretid,
 547                            Error **errp)
 548{
 549    char *mon_host = NULL;
 550    Error *local_err = NULL;
 551    int r;
 552
 553    if (secretid) {
 554        if (opts->key_secret) {
 555            error_setg(errp,
 556                       "Legacy 'password-secret' clashes with 'key-secret'");
 557            return -EINVAL;
 558        }
 559        opts->key_secret = g_strdup(secretid);
 560        opts->has_key_secret = true;
 561    }
 562
 563    mon_host = qemu_rbd_mon_host(opts, &local_err);
 564    if (local_err) {
 565        error_propagate(errp, local_err);
 566        r = -EINVAL;
 567        goto out;
 568    }
 569
 570    r = rados_create(cluster, opts->user);
 571    if (r < 0) {
 572        error_setg_errno(errp, -r, "error initializing");
 573        goto out;
 574    }
 575
 576    /* try default location when conf=NULL, but ignore failure */
 577    r = rados_conf_read_file(*cluster, opts->conf);
 578    if (opts->has_conf && r < 0) {
 579        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
 580        goto failed_shutdown;
 581    }
 582
 583    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
 584    if (r < 0) {
 585        goto failed_shutdown;
 586    }
 587
 588    if (mon_host) {
 589        r = rados_conf_set(*cluster, "mon_host", mon_host);
 590        if (r < 0) {
 591            goto failed_shutdown;
 592        }
 593    }
 594
 595    r = qemu_rbd_set_auth(*cluster, opts, errp);
 596    if (r < 0) {
 597        goto failed_shutdown;
 598    }
 599
 600    /*
 601     * Fallback to more conservative semantics if setting cache
 602     * options fails. Ignore errors from setting rbd_cache because the
 603     * only possible error is that the option does not exist, and
 604     * librbd defaults to no caching. If write through caching cannot
 605     * be set up, fall back to no caching.
 606     */
 607    if (cache) {
 608        rados_conf_set(*cluster, "rbd_cache", "true");
 609    } else {
 610        rados_conf_set(*cluster, "rbd_cache", "false");
 611    }
 612
 613    r = rados_connect(*cluster);
 614    if (r < 0) {
 615        error_setg_errno(errp, -r, "error connecting");
 616        goto failed_shutdown;
 617    }
 618
 619    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
 620    if (r < 0) {
 621        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
 622        goto failed_shutdown;
 623    }
 624    /*
 625     * Set the namespace after opening the io context on the pool,
 626     * if nspace == NULL or if nspace == "", it is just as we did nothing
 627     */
 628    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
 629
 630    r = 0;
 631    goto out;
 632
 633failed_shutdown:
 634    rados_shutdown(*cluster);
 635out:
 636    g_free(mon_host);
 637    return r;
 638}
 639
 640static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
 641                                    Error **errp)
 642{
 643    Visitor *v;
 644
 645    /* Convert the remaining options into a QAPI object */
 646    v = qobject_input_visitor_new_flat_confused(options, errp);
 647    if (!v) {
 648        return -EINVAL;
 649    }
 650
 651    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
 652    visit_free(v);
 653    if (!opts) {
 654        return -EINVAL;
 655    }
 656
 657    return 0;
 658}
 659
 660static int qemu_rbd_attempt_legacy_options(QDict *options,
 661                                           BlockdevOptionsRbd **opts,
 662                                           char **keypairs)
 663{
 664    char *filename;
 665    int r;
 666
 667    filename = g_strdup(qdict_get_try_str(options, "filename"));
 668    if (!filename) {
 669        return -EINVAL;
 670    }
 671    qdict_del(options, "filename");
 672
 673    qemu_rbd_parse_filename(filename, options, NULL);
 674
 675    /* keypairs freed by caller */
 676    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 677    if (*keypairs) {
 678        qdict_del(options, "=keyvalue-pairs");
 679    }
 680
 681    r = qemu_rbd_convert_options(options, opts, NULL);
 682
 683    g_free(filename);
 684    return r;
 685}
 686
 687static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 688                         Error **errp)
 689{
 690    BDRVRBDState *s = bs->opaque;
 691    BlockdevOptionsRbd *opts = NULL;
 692    const QDictEntry *e;
 693    Error *local_err = NULL;
 694    char *keypairs, *secretid;
 695    int r;
 696
 697    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
 698    if (keypairs) {
 699        qdict_del(options, "=keyvalue-pairs");
 700    }
 701
 702    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
 703    if (secretid) {
 704        qdict_del(options, "password-secret");
 705    }
 706
 707    r = qemu_rbd_convert_options(options, &opts, &local_err);
 708    if (local_err) {
 709        /* If keypairs are present, that means some options are present in
 710         * the modern option format.  Don't attempt to parse legacy option
 711         * formats, as we won't support mixed usage. */
 712        if (keypairs) {
 713            error_propagate(errp, local_err);
 714            goto out;
 715        }
 716
 717        /* If the initial attempt to convert and process the options failed,
 718         * we may be attempting to open an image file that has the rbd options
 719         * specified in the older format consisting of all key/value pairs
 720         * encoded in the filename.  Go ahead and attempt to parse the
 721         * filename, and see if we can pull out the required options. */
 722        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
 723        if (r < 0) {
 724            /* Propagate the original error, not the legacy parsing fallback
 725             * error, as the latter was just a best-effort attempt. */
 726            error_propagate(errp, local_err);
 727            goto out;
 728        }
 729        /* Take care whenever deciding to actually deprecate; once this ability
 730         * is removed, we will not be able to open any images with legacy-styled
 731         * backing image strings. */
 732        warn_report("RBD options encoded in the filename as keyvalue pairs "
 733                    "is deprecated");
 734    }
 735
 736    /* Remove the processed options from the QDict (the visitor processes
 737     * _all_ options in the QDict) */
 738    while ((e = qdict_first(options))) {
 739        qdict_del(options, e->key);
 740    }
 741
 742    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
 743                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
 744    if (r < 0) {
 745        goto out;
 746    }
 747
 748    s->snap = g_strdup(opts->snapshot);
 749    s->image_name = g_strdup(opts->image);
 750
 751    /* rbd_open is always r/w */
 752    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
 753    if (r < 0) {
 754        error_setg_errno(errp, -r, "error reading header from %s",
 755                         s->image_name);
 756        goto failed_open;
 757    }
 758
 759    r = rbd_get_size(s->image, &s->image_size);
 760    if (r < 0) {
 761        error_setg_errno(errp, -r, "error getting image size from %s",
 762                         s->image_name);
 763        rbd_close(s->image);
 764        goto failed_open;
 765    }
 766
 767    /* If we are using an rbd snapshot, we must be r/o, otherwise
 768     * leave as-is */
 769    if (s->snap != NULL) {
 770        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
 771        if (r < 0) {
 772            rbd_close(s->image);
 773            goto failed_open;
 774        }
 775    }
 776
 777    /* When extending regular files, we get zeros from the OS */
 778    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
 779
 780    r = 0;
 781    goto out;
 782
 783failed_open:
 784    rados_ioctx_destroy(s->io_ctx);
 785    g_free(s->snap);
 786    g_free(s->image_name);
 787    rados_shutdown(s->cluster);
 788out:
 789    qapi_free_BlockdevOptionsRbd(opts);
 790    g_free(keypairs);
 791    g_free(secretid);
 792    return r;
 793}
 794
 795
 796/* Since RBD is currently always opened R/W via the API,
 797 * we just need to check if we are using a snapshot or not, in
 798 * order to determine if we will allow it to be R/W */
 799static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
 800                                   BlockReopenQueue *queue, Error **errp)
 801{
 802    BDRVRBDState *s = state->bs->opaque;
 803    int ret = 0;
 804
 805    if (s->snap && state->flags & BDRV_O_RDWR) {
 806        error_setg(errp,
 807                   "Cannot change node '%s' to r/w when using RBD snapshot",
 808                   bdrv_get_device_or_node_name(state->bs));
 809        ret = -EINVAL;
 810    }
 811
 812    return ret;
 813}
 814
/*
 * Release everything qemu_rbd_open() acquired: the image handle, the I/O
 * context, the name strings and finally the cluster handle.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    rbd_close(s->image);
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    g_free(s->image_name);
    rados_shutdown(s->cluster);
}
 825
 826/* Resize the RBD image and update the 'image_size' with the current size */
 827static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
 828{
 829    BDRVRBDState *s = bs->opaque;
 830    int r;
 831
 832    r = rbd_resize(s->image, size);
 833    if (r < 0) {
 834        return r;
 835    }
 836
 837    s->image_size = size;
 838
 839    return 0;
 840}
 841
/* AIOCB description passed to qemu_aio_get() when allocating an RBDAIOCB. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
 845
 846static void rbd_finish_bh(void *opaque)
 847{
 848    RADOSCB *rcb = opaque;
 849    qemu_rbd_complete_aio(rcb);
 850}
 851
 852/*
 853 * This is the callback function for rbd_aio_read and _write
 854 *
 855 * Note: this function is being called from a non qemu thread so
 856 * we need to be careful about what we do here. Generally we only
 857 * schedule a BH, and do the rest of the io completion handling
 858 * from rbd_finish_bh() which runs in a qemu context.
 859 */
 860static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 861{
 862    RBDAIOCB *acb = rcb->acb;
 863
 864    rcb->ret = rbd_aio_get_return_value(c);
 865    rbd_aio_release(c);
 866
 867    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
 868                                     rbd_finish_bh, rcb);
 869}
 870
/*
 * Submit an asynchronous discard of @len bytes at offset @off if the librbd
 * in use provides rbd_aio_discard(); otherwise return -ENOTSUP.
 */
static int rbd_aio_discard_wrapper(rbd_image_t image,
                                   uint64_t off,
                                   uint64_t len,
                                   rbd_completion_t comp)
{
#ifdef LIBRBD_SUPPORTS_DISCARD
    return rbd_aio_discard(image, off, len, comp);
#else
    return -ENOTSUP;
#endif
}
 882
/*
 * Submit an asynchronous flush if LIBRBD_SUPPORTS_AIO_FLUSH is defined;
 * otherwise return -ENOTSUP.
 */
static int rbd_aio_flush_wrapper(rbd_image_t image,
                                 rbd_completion_t comp)
{
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
    return rbd_aio_flush(image, comp);
#else
    return -ENOTSUP;
#endif
}
 892
/*
 * Submit one asynchronous request of kind @cmd to librbd.
 *
 * @off and @size give the byte range; @qiov is the data vector and must
 * have exactly @size bytes when it is not NULL (the flush path in this file
 * passes NULL).  @cb is called with @opaque once the request has been
 * completed by qemu_rbd_complete_aio().
 *
 * Returns the new AIOCB, or NULL if the request could not be submitted; in
 * that case everything allocated here has been released and @cb is not
 * called.
 */
static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
                                 int64_t off,
                                 QEMUIOVector *qiov,
                                 int64_t size,
                                 BlockCompletionFunc *cb,
                                 void *opaque,
                                 RBDAIOCmd cmd)
{
    RBDAIOCB *acb;
    RADOSCB *rcb = NULL;
    rbd_completion_t c;
    int r;

    BDRVRBDState *s = bs->opaque;

    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
    acb->cmd = cmd;
    acb->qiov = qiov;
    assert(!qiov || qiov->size == size);

    rcb = g_new(RADOSCB, 1);

    /*
     * Without iovec support in librbd the data goes through a linear bounce
     * buffer; discard and flush carry no data and need none.
     */
    if (!LIBRBD_USE_IOVEC) {
        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
            acb->bounce = NULL;
        } else {
            acb->bounce = qemu_try_blockalign(bs, qiov->size);
            if (acb->bounce == NULL) {
                goto failed;
            }
        }
        if (cmd == RBD_AIO_WRITE) {
            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
        }
        rcb->buf = acb->bounce;
    }

    acb->ret = 0;
    acb->error = 0;
    acb->s = s;

    rcb->acb = acb;
    rcb->s = acb->s;
    rcb->size = size;
    /* rbd_finish_aiocb() receives @rcb when librbd completes the request */
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (r < 0) {
        goto failed;
    }

    switch (cmd) {
    case RBD_AIO_WRITE: {
        /*
         * RBD APIs don't allow us to write more than actual size, so in order
         * to support growing images, we resize the image before write
         * operations that exceed the current size.
         */
        if (off + size > s->image_size) {
            r = qemu_rbd_resize(bs, off + size);
            if (r < 0) {
                goto failed_completion;
            }
        }
#ifdef LIBRBD_SUPPORTS_IOVEC
            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
#else
            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
#endif
        break;
    }
    case RBD_AIO_READ:
#ifdef LIBRBD_SUPPORTS_IOVEC
            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
#else
            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
#endif
        break;
    case RBD_AIO_DISCARD:
        r = rbd_aio_discard_wrapper(s->image, off, size, c);
        break;
    case RBD_AIO_FLUSH:
        r = rbd_aio_flush_wrapper(s->image, c);
        break;
    default:
        r = -EINVAL;
    }

    if (r < 0) {
        goto failed_completion;
    }
    return &acb->common;

    /* The labels undo the steps above in reverse order */
failed_completion:
    rbd_aio_release(c);
failed:
    g_free(rcb);
    if (!LIBRBD_USE_IOVEC) {
        qemu_vfree(acb->bounce);
    }

    qemu_aio_unref(acb);
    return NULL;
}
 995
/*
 * Start an asynchronous read of @bytes bytes at @offset into @qiov.
 * @flags is not used.  Returns what rbd_start_aio() returns.
 */
static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
                                       uint64_t offset, uint64_t bytes,
                                       QEMUIOVector *qiov, int flags,
                                       BlockCompletionFunc *cb,
                                       void *opaque)
{
    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
                         RBD_AIO_READ);
}
1005
1006static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
1007                                        uint64_t offset, uint64_t bytes,
1008                                        QEMUIOVector *qiov, int flags,
1009                                        BlockCompletionFunc *cb,
1010                                        void *opaque)
1011{
1012    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1013                         RBD_AIO_WRITE);
1014}
1015
1016#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1017static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1018                                      BlockCompletionFunc *cb,
1019                                      void *opaque)
1020{
1021    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1022}
1023
1024#else
1025
1026static int qemu_rbd_co_flush(BlockDriverState *bs)
1027{
1028#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1029    /* rbd_flush added in 0.1.1 */
1030    BDRVRBDState *s = bs->opaque;
1031    return rbd_flush(s->image);
1032#else
1033    return 0;
1034#endif
1035}
1036#endif
1037
1038static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1039{
1040    BDRVRBDState *s = bs->opaque;
1041    rbd_image_info_t info;
1042    int r;
1043
1044    r = rbd_stat(s->image, &info, sizeof(info));
1045    if (r < 0) {
1046        return r;
1047    }
1048
1049    bdi->cluster_size = info.obj_size;
1050    return 0;
1051}
1052
1053static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1054{
1055    BDRVRBDState *s = bs->opaque;
1056    rbd_image_info_t info;
1057    int r;
1058
1059    r = rbd_stat(s->image, &info, sizeof(info));
1060    if (r < 0) {
1061        return r;
1062    }
1063
1064    return info.size;
1065}
1066
1067static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1068                                             int64_t offset,
1069                                             bool exact,
1070                                             PreallocMode prealloc,
1071                                             BdrvRequestFlags flags,
1072                                             Error **errp)
1073{
1074    int r;
1075
1076    if (prealloc != PREALLOC_MODE_OFF) {
1077        error_setg(errp, "Unsupported preallocation mode '%s'",
1078                   PreallocMode_str(prealloc));
1079        return -ENOTSUP;
1080    }
1081
1082    r = qemu_rbd_resize(bs, offset);
1083    if (r < 0) {
1084        error_setg_errno(errp, -r, "Failed to resize file");
1085        return r;
1086    }
1087
1088    return 0;
1089}
1090
1091static int qemu_rbd_snap_create(BlockDriverState *bs,
1092                                QEMUSnapshotInfo *sn_info)
1093{
1094    BDRVRBDState *s = bs->opaque;
1095    int r;
1096
1097    if (sn_info->name[0] == '\0') {
1098        return -EINVAL; /* we need a name for rbd snapshots */
1099    }
1100
1101    /*
1102     * rbd snapshots are using the name as the user controlled unique identifier
1103     * we can't use the rbd snapid for that purpose, as it can't be set
1104     */
1105    if (sn_info->id_str[0] != '\0' &&
1106        strcmp(sn_info->id_str, sn_info->name) != 0) {
1107        return -EINVAL;
1108    }
1109
1110    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1111        return -ERANGE;
1112    }
1113
1114    r = rbd_snap_create(s->image, sn_info->name);
1115    if (r < 0) {
1116        error_report("failed to create snap: %s", strerror(-r));
1117        return r;
1118    }
1119
1120    return 0;
1121}
1122
1123static int qemu_rbd_snap_remove(BlockDriverState *bs,
1124                                const char *snapshot_id,
1125                                const char *snapshot_name,
1126                                Error **errp)
1127{
1128    BDRVRBDState *s = bs->opaque;
1129    int r;
1130
1131    if (!snapshot_name) {
1132        error_setg(errp, "rbd need a valid snapshot name");
1133        return -EINVAL;
1134    }
1135
1136    /* If snapshot_id is specified, it must be equal to name, see
1137       qemu_rbd_snap_list() */
1138    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1139        error_setg(errp,
1140                   "rbd do not support snapshot id, it should be NULL or "
1141                   "equal to snapshot name");
1142        return -EINVAL;
1143    }
1144
1145    r = rbd_snap_remove(s->image, snapshot_name);
1146    if (r < 0) {
1147        error_setg_errno(errp, -r, "Failed to remove the snapshot");
1148    }
1149    return r;
1150}
1151
1152static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1153                                  const char *snapshot_name)
1154{
1155    BDRVRBDState *s = bs->opaque;
1156
1157    return rbd_snap_rollback(s->image, snapshot_name);
1158}
1159
1160static int qemu_rbd_snap_list(BlockDriverState *bs,
1161                              QEMUSnapshotInfo **psn_tab)
1162{
1163    BDRVRBDState *s = bs->opaque;
1164    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1165    int i, snap_count;
1166    rbd_snap_info_t *snaps;
1167    int max_snaps = RBD_MAX_SNAPS;
1168
1169    do {
1170        snaps = g_new(rbd_snap_info_t, max_snaps);
1171        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1172        if (snap_count <= 0) {
1173            g_free(snaps);
1174        }
1175    } while (snap_count == -ERANGE);
1176
1177    if (snap_count <= 0) {
1178        goto done;
1179    }
1180
1181    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1182
1183    for (i = 0; i < snap_count; i++) {
1184        const char *snap_name = snaps[i].name;
1185
1186        sn_info = sn_tab + i;
1187        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1188        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1189
1190        sn_info->vm_state_size = snaps[i].size;
1191        sn_info->date_sec = 0;
1192        sn_info->date_nsec = 0;
1193        sn_info->vm_clock_nsec = 0;
1194    }
1195    rbd_snap_list_end(snaps);
1196    g_free(snaps);
1197
1198 done:
1199    *psn_tab = sn_tab;
1200    return snap_count;
1201}
1202
1203#ifdef LIBRBD_SUPPORTS_DISCARD
1204static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1205                                         int64_t offset,
1206                                         int bytes,
1207                                         BlockCompletionFunc *cb,
1208                                         void *opaque)
1209{
1210    return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
1211                         RBD_AIO_DISCARD);
1212}
1213#endif
1214
1215#ifdef LIBRBD_SUPPORTS_INVALIDATE
1216static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1217                                                      Error **errp)
1218{
1219    BDRVRBDState *s = bs->opaque;
1220    int r = rbd_invalidate_cache(s->image);
1221    if (r < 0) {
1222        error_setg_errno(errp, -r, "Failed to invalidate the cache");
1223    }
1224}
1225#endif
1226
1227static QemuOptsList qemu_rbd_create_opts = {
1228    .name = "rbd-create-opts",
1229    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1230    .desc = {
1231        {
1232            .name = BLOCK_OPT_SIZE,
1233            .type = QEMU_OPT_SIZE,
1234            .help = "Virtual disk size"
1235        },
1236        {
1237            .name = BLOCK_OPT_CLUSTER_SIZE,
1238            .type = QEMU_OPT_SIZE,
1239            .help = "RBD object size"
1240        },
1241        {
1242            .name = "password-secret",
1243            .type = QEMU_OPT_STRING,
1244            .help = "ID of secret providing the password",
1245        },
1246        { /* end of list */ }
1247    }
1248};
1249
/*
 * NULL-terminated list of runtime option names, exported through
 * bdrv_rbd.strong_runtime_opts.
 * NOTE(review): "server." ends in a dot, so it presumably matches every
 * "server.*" key as a prefix -- confirm against the block layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* what to open */
    "pool", "namespace", "image",
    "conf", "snapshot",

    /* how to connect and authenticate */
    "user", "server.", "password-secret",

    NULL
};
1262
1263static BlockDriver bdrv_rbd = {
1264    .format_name            = "rbd",
1265    .instance_size          = sizeof(BDRVRBDState),
1266    .bdrv_parse_filename    = qemu_rbd_parse_filename,
1267    .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1268    .bdrv_file_open         = qemu_rbd_open,
1269    .bdrv_close             = qemu_rbd_close,
1270    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1271    .bdrv_co_create         = qemu_rbd_co_create,
1272    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1273    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1274    .bdrv_get_info          = qemu_rbd_getinfo,
1275    .create_opts            = &qemu_rbd_create_opts,
1276    .bdrv_getlength         = qemu_rbd_getlength,
1277    .bdrv_co_truncate       = qemu_rbd_co_truncate,
1278    .protocol_name          = "rbd",
1279
1280    .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1281    .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1282
1283#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1284    .bdrv_aio_flush         = qemu_rbd_aio_flush,
1285#else
1286    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1287#endif
1288
1289#ifdef LIBRBD_SUPPORTS_DISCARD
1290    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1291#endif
1292
1293    .bdrv_snapshot_create   = qemu_rbd_snap_create,
1294    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1295    .bdrv_snapshot_list     = qemu_rbd_snap_list,
1296    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1297#ifdef LIBRBD_SUPPORTS_INVALIDATE
1298    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1299#endif
1300
1301    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1302};
1303
1304static void bdrv_rbd_init(void)
1305{
1306    bdrv_register(&bdrv_rbd);
1307}
1308
1309block_init(bdrv_rbd_init);
1310