qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include "qemu/osdep.h"
  15
  16#include "qapi/error.h"
  17#include "qemu/error-report.h"
  18#include "block/block_int.h"
  19#include "crypto/secret.h"
  20#include "qemu/cutils.h"
  21
  22#include <rbd/librbd.h>
  23
  24/*
  25 * When specifying the image filename use:
  26 *
  27 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  28 *
  29 * poolname must be the name of an existing rados pool.
  30 *
  31 * devicename is the name of the rbd image.
  32 *
  33 * Each option given is used to configure rados, and may be any valid
  34 * Ceph option, "id", or "conf".
  35 *
  36 * The "id" option indicates what user we should authenticate as to
  37 * the Ceph cluster.  If it is excluded we will use the Ceph default
  38 * (normally 'admin').
  39 *
  40 * The "conf" option specifies a Ceph configuration file to read.  If
  41 * it is not specified, we will read from the default Ceph locations
  42 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  43 * file, specify conf=/dev/null.
  44 *
  45 * Configuration values containing :, @, or = can be escaped with a
  46 * leading "\".
  47 */
  48
/*
 * Feature detection: rbd_aio_discard was added in librbd 0.1.2.  When the
 * library is new enough LIBRBD_SUPPORTS_DISCARD is defined and the discard
 * code paths in this file are compiled in.
 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif

/* NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file and OBJ_MAX_SIZE is not referenced in it -- confirm it is still
 * needed. */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* Sizes of the fixed buffers used while parsing the "rbd:" filename. */
#define RBD_MAX_CONF_NAME_SIZE 128  /* one option name in qemu_rbd_set_conf() */
#define RBD_MAX_CONF_VAL_SIZE 512   /* one option value in qemu_rbd_set_conf() */
#define RBD_MAX_CONF_SIZE 1024      /* whole option string / client name buffer */
#define RBD_MAX_POOL_NAME_SIZE 128  /* pool name buffer */
#define RBD_MAX_SNAP_NAME_SIZE 128  /* snapshot name buffer */
/* Initial array size passed to rbd_snap_list() in qemu_rbd_snap_list(). */
#define RBD_MAX_SNAPS 100
  64
/*
 * Kind of asynchronous request carried by an RBDAIOCB.  It selects the
 * librbd call issued by rbd_start_aio() and the completion handling in
 * qemu_rbd_complete_aio().
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH
} RBDAIOCmd;
  71
/* Per-request state for one asynchronous RBD operation. */
typedef struct RBDAIOCB {
    BlockAIOCB common;          /* generic AIOCB; holds bs, cb and opaque */
    int64_t ret;                /* result; the callback gets 0 if > 0, else ret */
    QEMUIOVector *qiov;         /* caller's I/O vector (NULL for discard/flush) */
    char *bounce;               /* linear bounce buffer, NULL for discard/flush */
    RBDAIOCmd cmd;              /* which operation this request performs */
    int error;                  /* set to 1 once a failure has been recorded */
    struct BDRVRBDState *s;     /* driver state of the owning block device */
} RBDAIOCB;
  81
/*
 * Context handed to librbd as the completion argument; allocated in
 * rbd_start_aio() and freed in qemu_rbd_complete_aio().
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;              /* request this completion belongs to */
    struct BDRVRBDState *s;     /* copy of acb->s */
    int64_t size;               /* request length in bytes */
    char *buf;                  /* same pointer as acb->bounce */
    int64_t ret;                /* value of rbd_aio_get_return_value() */
} RADOSCB;
  89
/* Driver state stored in bs->opaque for an open RBD image. */
typedef struct BDRVRBDState {
    rados_t cluster;            /* handle created by rados_create() */
    rados_ioctx_t io_ctx;       /* pool context from rados_ioctx_create() */
    rbd_image_t image;          /* image handle from rbd_open() */
    char name[RBD_MAX_IMAGE_NAME_SIZE]; /* image name parsed from the filename */
    char *snap;                 /* g_strdup()ed snapshot name, or NULL */
} BDRVRBDState;
  97
  98static int qemu_rbd_next_tok(char *dst, int dst_len,
  99                             char *src, char delim,
 100                             const char *name,
 101                             char **p, Error **errp)
 102{
 103    int l;
 104    char *end;
 105
 106    *p = NULL;
 107
 108    if (delim != '\0') {
 109        for (end = src; *end; ++end) {
 110            if (*end == delim) {
 111                break;
 112            }
 113            if (*end == '\\' && end[1] != '\0') {
 114                end++;
 115            }
 116        }
 117        if (*end == delim) {
 118            *p = end + 1;
 119            *end = '\0';
 120        }
 121    }
 122    l = strlen(src);
 123    if (l >= dst_len) {
 124        error_setg(errp, "%s too long", name);
 125        return -EINVAL;
 126    } else if (l == 0) {
 127        error_setg(errp, "%s too short", name);
 128        return -EINVAL;
 129    }
 130
 131    pstrcpy(dst, dst_len, src);
 132
 133    return 0;
 134}
 135
 136static void qemu_rbd_unescape(char *src)
 137{
 138    char *p;
 139
 140    for (p = src; *src; ++src, ++p) {
 141        if (*src == '\\' && src[1] != '\0') {
 142            src++;
 143        }
 144        *p = *src;
 145    }
 146    *p = '\0';
 147}
 148
 149static int qemu_rbd_parsename(const char *filename,
 150                              char *pool, int pool_len,
 151                              char *snap, int snap_len,
 152                              char *name, int name_len,
 153                              char *conf, int conf_len,
 154                              Error **errp)
 155{
 156    const char *start;
 157    char *p, *buf;
 158    int ret;
 159
 160    if (!strstart(filename, "rbd:", &start)) {
 161        error_setg(errp, "File name must start with 'rbd:'");
 162        return -EINVAL;
 163    }
 164
 165    buf = g_strdup(start);
 166    p = buf;
 167    *snap = '\0';
 168    *conf = '\0';
 169
 170    ret = qemu_rbd_next_tok(pool, pool_len, p,
 171                            '/', "pool name", &p, errp);
 172    if (ret < 0 || !p) {
 173        ret = -EINVAL;
 174        goto done;
 175    }
 176    qemu_rbd_unescape(pool);
 177
 178    if (strchr(p, '@')) {
 179        ret = qemu_rbd_next_tok(name, name_len, p,
 180                                '@', "object name", &p, errp);
 181        if (ret < 0) {
 182            goto done;
 183        }
 184        ret = qemu_rbd_next_tok(snap, snap_len, p,
 185                                ':', "snap name", &p, errp);
 186        qemu_rbd_unescape(snap);
 187    } else {
 188        ret = qemu_rbd_next_tok(name, name_len, p,
 189                                ':', "object name", &p, errp);
 190    }
 191    qemu_rbd_unescape(name);
 192    if (ret < 0 || !p) {
 193        goto done;
 194    }
 195
 196    ret = qemu_rbd_next_tok(conf, conf_len, p,
 197                            '\0', "configuration", &p, errp);
 198
 199done:
 200    g_free(buf);
 201    return ret;
 202}
 203
 204static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
 205{
 206    const char *p = conf;
 207
 208    while (*p) {
 209        int len;
 210        const char *end = strchr(p, ':');
 211
 212        if (end) {
 213            len = end - p;
 214        } else {
 215            len = strlen(p);
 216        }
 217
 218        if (strncmp(p, "id=", 3) == 0) {
 219            len -= 3;
 220            strncpy(clientname, p + 3, len);
 221            clientname[len] = '\0';
 222            return clientname;
 223        }
 224        if (end == NULL) {
 225            break;
 226        }
 227        p = end + 1;
 228    }
 229    return NULL;
 230}
 231
 232
 233static int qemu_rbd_set_auth(rados_t cluster, const char *secretid,
 234                             Error **errp)
 235{
 236    if (secretid == 0) {
 237        return 0;
 238    }
 239
 240    gchar *secret = qcrypto_secret_lookup_as_base64(secretid,
 241                                                    errp);
 242    if (!secret) {
 243        return -1;
 244    }
 245
 246    rados_conf_set(cluster, "key", secret);
 247    g_free(secret);
 248
 249    return 0;
 250}
 251
 252
 253static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
 254                             bool only_read_conf_file,
 255                             Error **errp)
 256{
 257    char *p, *buf;
 258    char name[RBD_MAX_CONF_NAME_SIZE];
 259    char value[RBD_MAX_CONF_VAL_SIZE];
 260    int ret = 0;
 261
 262    buf = g_strdup(conf);
 263    p = buf;
 264
 265    while (p) {
 266        ret = qemu_rbd_next_tok(name, sizeof(name), p,
 267                                '=', "conf option name", &p, errp);
 268        if (ret < 0) {
 269            break;
 270        }
 271        qemu_rbd_unescape(name);
 272
 273        if (!p) {
 274            error_setg(errp, "conf option %s has no value", name);
 275            ret = -EINVAL;
 276            break;
 277        }
 278
 279        ret = qemu_rbd_next_tok(value, sizeof(value), p,
 280                                ':', "conf option value", &p, errp);
 281        if (ret < 0) {
 282            break;
 283        }
 284        qemu_rbd_unescape(value);
 285
 286        if (strcmp(name, "conf") == 0) {
 287            /* read the conf file alone, so it doesn't override more
 288               specific settings for a particular device */
 289            if (only_read_conf_file) {
 290                ret = rados_conf_read_file(cluster, value);
 291                if (ret < 0) {
 292                    error_setg_errno(errp, -ret, "error reading conf file %s",
 293                                     value);
 294                    break;
 295                }
 296            }
 297        } else if (strcmp(name, "id") == 0) {
 298            /* ignore, this is parsed by qemu_rbd_parse_clientname() */
 299        } else if (!only_read_conf_file) {
 300            ret = rados_conf_set(cluster, name, value);
 301            if (ret < 0) {
 302                error_setg_errno(errp, -ret, "invalid conf option %s", name);
 303                ret = -EINVAL;
 304                break;
 305            }
 306        }
 307    }
 308
 309    g_free(buf);
 310    return ret;
 311}
 312
 313static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 314{
 315    Error *local_err = NULL;
 316    int64_t bytes = 0;
 317    int64_t objsize;
 318    int obj_order = 0;
 319    char pool[RBD_MAX_POOL_NAME_SIZE];
 320    char name[RBD_MAX_IMAGE_NAME_SIZE];
 321    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 322    char conf[RBD_MAX_CONF_SIZE];
 323    char clientname_buf[RBD_MAX_CONF_SIZE];
 324    char *clientname;
 325    const char *secretid;
 326    rados_t cluster;
 327    rados_ioctx_t io_ctx;
 328    int ret;
 329
 330    secretid = qemu_opt_get(opts, "password-secret");
 331
 332    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 333                           snap_buf, sizeof(snap_buf),
 334                           name, sizeof(name),
 335                           conf, sizeof(conf), &local_err) < 0) {
 336        error_propagate(errp, local_err);
 337        return -EINVAL;
 338    }
 339
 340    /* Read out options */
 341    bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 342                     BDRV_SECTOR_SIZE);
 343    objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0);
 344    if (objsize) {
 345        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 346            error_setg(errp, "obj size needs to be power of 2");
 347            return -EINVAL;
 348        }
 349        if (objsize < 4096) {
 350            error_setg(errp, "obj size too small");
 351            return -EINVAL;
 352        }
 353        obj_order = ctz32(objsize);
 354    }
 355
 356    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 357    ret = rados_create(&cluster, clientname);
 358    if (ret < 0) {
 359        error_setg_errno(errp, -ret, "error initializing");
 360        return ret;
 361    }
 362
 363    if (strstr(conf, "conf=") == NULL) {
 364        /* try default location, but ignore failure */
 365        rados_conf_read_file(cluster, NULL);
 366    } else if (conf[0] != '\0' &&
 367               qemu_rbd_set_conf(cluster, conf, true, &local_err) < 0) {
 368        error_propagate(errp, local_err);
 369        ret = -EIO;
 370        goto shutdown;
 371    }
 372
 373    if (conf[0] != '\0' &&
 374        qemu_rbd_set_conf(cluster, conf, false, &local_err) < 0) {
 375        error_propagate(errp, local_err);
 376        ret = -EIO;
 377        goto shutdown;
 378    }
 379
 380    if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) {
 381        ret = -EIO;
 382        goto shutdown;
 383    }
 384
 385    ret = rados_connect(cluster);
 386    if (ret < 0) {
 387        error_setg_errno(errp, -ret, "error connecting");
 388        goto shutdown;
 389    }
 390
 391    ret = rados_ioctx_create(cluster, pool, &io_ctx);
 392    if (ret < 0) {
 393        error_setg_errno(errp, -ret, "error opening pool %s", pool);
 394        goto shutdown;
 395    }
 396
 397    ret = rbd_create(io_ctx, name, bytes, &obj_order);
 398    if (ret < 0) {
 399        error_setg_errno(errp, -ret, "error rbd create");
 400    }
 401
 402    rados_ioctx_destroy(io_ctx);
 403
 404shutdown:
 405    rados_shutdown(cluster);
 406    return ret;
 407}
 408
 409/*
 410 * This aio completion is being called from rbd_finish_bh() and runs in qemu
 411 * BH context.
 412 */
 413static void qemu_rbd_complete_aio(RADOSCB *rcb)
 414{
 415    RBDAIOCB *acb = rcb->acb;
 416    int64_t r;
 417
 418    r = rcb->ret;
 419
 420    if (acb->cmd != RBD_AIO_READ) {
 421        if (r < 0) {
 422            acb->ret = r;
 423            acb->error = 1;
 424        } else if (!acb->error) {
 425            acb->ret = rcb->size;
 426        }
 427    } else {
 428        if (r < 0) {
 429            memset(rcb->buf, 0, rcb->size);
 430            acb->ret = r;
 431            acb->error = 1;
 432        } else if (r < rcb->size) {
 433            memset(rcb->buf + r, 0, rcb->size - r);
 434            if (!acb->error) {
 435                acb->ret = rcb->size;
 436            }
 437        } else if (!acb->error) {
 438            acb->ret = r;
 439        }
 440    }
 441
 442    g_free(rcb);
 443
 444    if (acb->cmd == RBD_AIO_READ) {
 445        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 446    }
 447    qemu_vfree(acb->bounce);
 448    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 449
 450    qemu_aio_unref(acb);
 451}
 452
/* TODO Convert to fine grained options */
/*
 * Options accepted by qemu_rbd_open(): the complete "rbd:..." specification
 * string and the optional ID of a secret object holding the Ceph key.
 */
static QemuOptsList runtime_opts = {
    .name = "rbd",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "filename",
            .type = QEMU_OPT_STRING,
            .help = "Specification of the rbd image",
        },
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        { /* end of list */ }
    },
};
 471
 472static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 473                         Error **errp)
 474{
 475    BDRVRBDState *s = bs->opaque;
 476    char pool[RBD_MAX_POOL_NAME_SIZE];
 477    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 478    char conf[RBD_MAX_CONF_SIZE];
 479    char clientname_buf[RBD_MAX_CONF_SIZE];
 480    char *clientname;
 481    const char *secretid;
 482    QemuOpts *opts;
 483    Error *local_err = NULL;
 484    const char *filename;
 485    int r;
 486
 487    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
 488    qemu_opts_absorb_qdict(opts, options, &local_err);
 489    if (local_err) {
 490        error_propagate(errp, local_err);
 491        qemu_opts_del(opts);
 492        return -EINVAL;
 493    }
 494
 495    filename = qemu_opt_get(opts, "filename");
 496    secretid = qemu_opt_get(opts, "password-secret");
 497
 498    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 499                           snap_buf, sizeof(snap_buf),
 500                           s->name, sizeof(s->name),
 501                           conf, sizeof(conf), errp) < 0) {
 502        r = -EINVAL;
 503        goto failed_opts;
 504    }
 505
 506    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 507    r = rados_create(&s->cluster, clientname);
 508    if (r < 0) {
 509        error_setg_errno(errp, -r, "error initializing");
 510        goto failed_opts;
 511    }
 512
 513    s->snap = NULL;
 514    if (snap_buf[0] != '\0') {
 515        s->snap = g_strdup(snap_buf);
 516    }
 517
 518    if (strstr(conf, "conf=") == NULL) {
 519        /* try default location, but ignore failure */
 520        rados_conf_read_file(s->cluster, NULL);
 521    } else if (conf[0] != '\0') {
 522        r = qemu_rbd_set_conf(s->cluster, conf, true, errp);
 523        if (r < 0) {
 524            goto failed_shutdown;
 525        }
 526    }
 527
 528    if (conf[0] != '\0') {
 529        r = qemu_rbd_set_conf(s->cluster, conf, false, errp);
 530        if (r < 0) {
 531            goto failed_shutdown;
 532        }
 533    }
 534
 535    if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) {
 536        r = -EIO;
 537        goto failed_shutdown;
 538    }
 539
 540    /*
 541     * Fallback to more conservative semantics if setting cache
 542     * options fails. Ignore errors from setting rbd_cache because the
 543     * only possible error is that the option does not exist, and
 544     * librbd defaults to no caching. If write through caching cannot
 545     * be set up, fall back to no caching.
 546     */
 547    if (flags & BDRV_O_NOCACHE) {
 548        rados_conf_set(s->cluster, "rbd_cache", "false");
 549    } else {
 550        rados_conf_set(s->cluster, "rbd_cache", "true");
 551    }
 552
 553    r = rados_connect(s->cluster);
 554    if (r < 0) {
 555        error_setg_errno(errp, -r, "error connecting");
 556        goto failed_shutdown;
 557    }
 558
 559    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
 560    if (r < 0) {
 561        error_setg_errno(errp, -r, "error opening pool %s", pool);
 562        goto failed_shutdown;
 563    }
 564
 565    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
 566    if (r < 0) {
 567        error_setg_errno(errp, -r, "error reading header from %s", s->name);
 568        goto failed_open;
 569    }
 570
 571    bs->read_only = (s->snap != NULL);
 572
 573    qemu_opts_del(opts);
 574    return 0;
 575
 576failed_open:
 577    rados_ioctx_destroy(s->io_ctx);
 578failed_shutdown:
 579    rados_shutdown(s->cluster);
 580    g_free(s->snap);
 581failed_opts:
 582    qemu_opts_del(opts);
 583    return r;
 584}
 585
 586static void qemu_rbd_close(BlockDriverState *bs)
 587{
 588    BDRVRBDState *s = bs->opaque;
 589
 590    rbd_close(s->image);
 591    rados_ioctx_destroy(s->io_ctx);
 592    g_free(s->snap);
 593    rados_shutdown(s->cluster);
 594}
 595
/* AIOCB descriptor passed to qemu_aio_get() so that it allocates RBDAIOCBs. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
 599
 600static void rbd_finish_bh(void *opaque)
 601{
 602    RADOSCB *rcb = opaque;
 603    qemu_rbd_complete_aio(rcb);
 604}
 605
 606/*
 607 * This is the callback function for rbd_aio_read and _write
 608 *
 609 * Note: this function is being called from a non qemu thread so
 610 * we need to be careful about what we do here. Generally we only
 611 * schedule a BH, and do the rest of the io completion handling
 612 * from rbd_finish_bh() which runs in a qemu context.
 613 */
 614static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 615{
 616    RBDAIOCB *acb = rcb->acb;
 617
 618    rcb->ret = rbd_aio_get_return_value(c);
 619    rbd_aio_release(c);
 620
 621    aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
 622                            rbd_finish_bh, rcb);
 623}
 624
/*
 * Issue an asynchronous discard of @len bytes at @off on @image, completing
 * through @comp.  Returns -ENOTSUP when built against a librbd without
 * rbd_aio_discard (LIBRBD_SUPPORTS_DISCARD undefined).
 */
static int rbd_aio_discard_wrapper(rbd_image_t image,
                                   uint64_t off,
                                   uint64_t len,
                                   rbd_completion_t comp)
{
#ifdef LIBRBD_SUPPORTS_DISCARD
    return rbd_aio_discard(image, off, len, comp);
#else
    return -ENOTSUP;
#endif
}
 636
/*
 * Issue an asynchronous flush on @image, completing through @comp.
 * Returns -ENOTSUP when LIBRBD_SUPPORTS_AIO_FLUSH is not defined.
 */
static int rbd_aio_flush_wrapper(rbd_image_t image,
                                 rbd_completion_t comp)
{
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
    return rbd_aio_flush(image, comp);
#else
    return -ENOTSUP;
#endif
}
 646
 647static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 648                                 int64_t off,
 649                                 QEMUIOVector *qiov,
 650                                 int64_t size,
 651                                 BlockCompletionFunc *cb,
 652                                 void *opaque,
 653                                 RBDAIOCmd cmd)
 654{
 655    RBDAIOCB *acb;
 656    RADOSCB *rcb = NULL;
 657    rbd_completion_t c;
 658    char *buf;
 659    int r;
 660
 661    BDRVRBDState *s = bs->opaque;
 662
 663    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
 664    acb->cmd = cmd;
 665    acb->qiov = qiov;
 666    assert(!qiov || qiov->size == size);
 667    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 668        acb->bounce = NULL;
 669    } else {
 670        acb->bounce = qemu_try_blockalign(bs, qiov->size);
 671        if (acb->bounce == NULL) {
 672            goto failed;
 673        }
 674    }
 675    acb->ret = 0;
 676    acb->error = 0;
 677    acb->s = s;
 678
 679    if (cmd == RBD_AIO_WRITE) {
 680        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 681    }
 682
 683    buf = acb->bounce;
 684
 685    rcb = g_new(RADOSCB, 1);
 686    rcb->acb = acb;
 687    rcb->buf = buf;
 688    rcb->s = acb->s;
 689    rcb->size = size;
 690    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
 691    if (r < 0) {
 692        goto failed;
 693    }
 694
 695    switch (cmd) {
 696    case RBD_AIO_WRITE:
 697        r = rbd_aio_write(s->image, off, size, buf, c);
 698        break;
 699    case RBD_AIO_READ:
 700        r = rbd_aio_read(s->image, off, size, buf, c);
 701        break;
 702    case RBD_AIO_DISCARD:
 703        r = rbd_aio_discard_wrapper(s->image, off, size, c);
 704        break;
 705    case RBD_AIO_FLUSH:
 706        r = rbd_aio_flush_wrapper(s->image, c);
 707        break;
 708    default:
 709        r = -EINVAL;
 710    }
 711
 712    if (r < 0) {
 713        goto failed_completion;
 714    }
 715
 716    return &acb->common;
 717
 718failed_completion:
 719    rbd_aio_release(c);
 720failed:
 721    g_free(rcb);
 722    qemu_vfree(acb->bounce);
 723    qemu_aio_unref(acb);
 724    return NULL;
 725}
 726
 727static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
 728                                      int64_t sector_num,
 729                                      QEMUIOVector *qiov,
 730                                      int nb_sectors,
 731                                      BlockCompletionFunc *cb,
 732                                      void *opaque)
 733{
 734    return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
 735                         (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
 736                         RBD_AIO_READ);
 737}
 738
 739static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
 740                                       int64_t sector_num,
 741                                       QEMUIOVector *qiov,
 742                                       int nb_sectors,
 743                                       BlockCompletionFunc *cb,
 744                                       void *opaque)
 745{
 746    return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
 747                         (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
 748                         RBD_AIO_WRITE);
 749}
 750
/*
 * Flush support.  Exactly one of the two functions below is compiled,
 * depending on whether LIBRBD_SUPPORTS_AIO_FLUSH is defined.
 */
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
/* Asynchronous flush, submitted through the common rbd_start_aio() path. */
static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
}

#else

/*
 * Synchronous fallback: calls rbd_flush() when the library provides it,
 * otherwise reports success without doing anything.
 */
static int qemu_rbd_co_flush(BlockDriverState *bs)
{
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
    /* rbd_flush added in 0.1.1 */
    BDRVRBDState *s = bs->opaque;
    return rbd_flush(s->image);
#else
    return 0;
#endif
}
#endif
 772
 773static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 774{
 775    BDRVRBDState *s = bs->opaque;
 776    rbd_image_info_t info;
 777    int r;
 778
 779    r = rbd_stat(s->image, &info, sizeof(info));
 780    if (r < 0) {
 781        return r;
 782    }
 783
 784    bdi->cluster_size = info.obj_size;
 785    return 0;
 786}
 787
 788static int64_t qemu_rbd_getlength(BlockDriverState *bs)
 789{
 790    BDRVRBDState *s = bs->opaque;
 791    rbd_image_info_t info;
 792    int r;
 793
 794    r = rbd_stat(s->image, &info, sizeof(info));
 795    if (r < 0) {
 796        return r;
 797    }
 798
 799    return info.size;
 800}
 801
 802static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
 803{
 804    BDRVRBDState *s = bs->opaque;
 805    int r;
 806
 807    r = rbd_resize(s->image, offset);
 808    if (r < 0) {
 809        return r;
 810    }
 811
 812    return 0;
 813}
 814
 815static int qemu_rbd_snap_create(BlockDriverState *bs,
 816                                QEMUSnapshotInfo *sn_info)
 817{
 818    BDRVRBDState *s = bs->opaque;
 819    int r;
 820
 821    if (sn_info->name[0] == '\0') {
 822        return -EINVAL; /* we need a name for rbd snapshots */
 823    }
 824
 825    /*
 826     * rbd snapshots are using the name as the user controlled unique identifier
 827     * we can't use the rbd snapid for that purpose, as it can't be set
 828     */
 829    if (sn_info->id_str[0] != '\0' &&
 830        strcmp(sn_info->id_str, sn_info->name) != 0) {
 831        return -EINVAL;
 832    }
 833
 834    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
 835        return -ERANGE;
 836    }
 837
 838    r = rbd_snap_create(s->image, sn_info->name);
 839    if (r < 0) {
 840        error_report("failed to create snap: %s", strerror(-r));
 841        return r;
 842    }
 843
 844    return 0;
 845}
 846
 847static int qemu_rbd_snap_remove(BlockDriverState *bs,
 848                                const char *snapshot_id,
 849                                const char *snapshot_name,
 850                                Error **errp)
 851{
 852    BDRVRBDState *s = bs->opaque;
 853    int r;
 854
 855    if (!snapshot_name) {
 856        error_setg(errp, "rbd need a valid snapshot name");
 857        return -EINVAL;
 858    }
 859
 860    /* If snapshot_id is specified, it must be equal to name, see
 861       qemu_rbd_snap_list() */
 862    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
 863        error_setg(errp,
 864                   "rbd do not support snapshot id, it should be NULL or "
 865                   "equal to snapshot name");
 866        return -EINVAL;
 867    }
 868
 869    r = rbd_snap_remove(s->image, snapshot_name);
 870    if (r < 0) {
 871        error_setg_errno(errp, -r, "Failed to remove the snapshot");
 872    }
 873    return r;
 874}
 875
 876static int qemu_rbd_snap_rollback(BlockDriverState *bs,
 877                                  const char *snapshot_name)
 878{
 879    BDRVRBDState *s = bs->opaque;
 880
 881    return rbd_snap_rollback(s->image, snapshot_name);
 882}
 883
 884static int qemu_rbd_snap_list(BlockDriverState *bs,
 885                              QEMUSnapshotInfo **psn_tab)
 886{
 887    BDRVRBDState *s = bs->opaque;
 888    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
 889    int i, snap_count;
 890    rbd_snap_info_t *snaps;
 891    int max_snaps = RBD_MAX_SNAPS;
 892
 893    do {
 894        snaps = g_new(rbd_snap_info_t, max_snaps);
 895        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
 896        if (snap_count <= 0) {
 897            g_free(snaps);
 898        }
 899    } while (snap_count == -ERANGE);
 900
 901    if (snap_count <= 0) {
 902        goto done;
 903    }
 904
 905    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
 906
 907    for (i = 0; i < snap_count; i++) {
 908        const char *snap_name = snaps[i].name;
 909
 910        sn_info = sn_tab + i;
 911        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
 912        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
 913
 914        sn_info->vm_state_size = snaps[i].size;
 915        sn_info->date_sec = 0;
 916        sn_info->date_nsec = 0;
 917        sn_info->vm_clock_nsec = 0;
 918    }
 919    rbd_snap_list_end(snaps);
 920    g_free(snaps);
 921
 922 done:
 923    *psn_tab = sn_tab;
 924    return snap_count;
 925}
 926
#ifdef LIBRBD_SUPPORTS_DISCARD
/*
 * Start an asynchronous discard of @count bytes at byte @offset.  Only
 * compiled when the library provides rbd_aio_discard.
 */
static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
                                         int64_t offset,
                                         int count,
                                         BlockCompletionFunc *cb,
                                         void *opaque)
{
    return rbd_start_aio(bs, offset, NULL, count, cb, opaque,
                         RBD_AIO_DISCARD);
}
#endif
 938
 939#ifdef LIBRBD_SUPPORTS_INVALIDATE
 940static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
 941                                      Error **errp)
 942{
 943    BDRVRBDState *s = bs->opaque;
 944    int r = rbd_invalidate_cache(s->image);
 945    if (r < 0) {
 946        error_setg_errno(errp, -r, "Failed to invalidate the cache");
 947    }
 948}
 949#endif
 950
/*
 * Options accepted when creating an image (see qemu_rbd_create()): the
 * virtual disk size, the RBD object size and the optional ID of a secret
 * object holding the Ceph key.
 */
static QemuOptsList qemu_rbd_create_opts = {
    .name = "rbd-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
    .desc = {
        {
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            .name = BLOCK_OPT_CLUSTER_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "RBD object size"
        },
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        { /* end of list */ }
    }
};
 973
/*
 * Driver description registered with the block layer.  Flush, discard and
 * cache-invalidation callbacks are only present when the librbd in use
 * supports them (see the LIBRBD_SUPPORTS_* macros).
 */
static BlockDriver bdrv_rbd = {
    .format_name        = "rbd",
    .instance_size      = sizeof(BDRVRBDState),
    .bdrv_needs_filename = true,
    .bdrv_file_open     = qemu_rbd_open,
    .bdrv_close         = qemu_rbd_close,
    .bdrv_create        = qemu_rbd_create,
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
    .bdrv_get_info      = qemu_rbd_getinfo,
    .create_opts        = &qemu_rbd_create_opts,
    .bdrv_getlength     = qemu_rbd_getlength,
    .bdrv_truncate      = qemu_rbd_truncate,
    .protocol_name      = "rbd",

    /* data path */
    .bdrv_aio_readv         = qemu_rbd_aio_readv,
    .bdrv_aio_writev        = qemu_rbd_aio_writev,

    /* asynchronous flush if available, synchronous fallback otherwise */
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
    .bdrv_aio_flush         = qemu_rbd_aio_flush,
#else
    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
#endif

#ifdef LIBRBD_SUPPORTS_DISCARD
    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
#endif

    /* snapshot management */
    .bdrv_snapshot_create   = qemu_rbd_snap_create,
    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
    .bdrv_snapshot_list     = qemu_rbd_snap_list,
    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
#ifdef LIBRBD_SUPPORTS_INVALIDATE
    .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
#endif
};
1009
/* Register the RBD driver with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() into the block-layer initialization. */
block_init(bdrv_rbd_init);
1016