qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include <inttypes.h>
  15
  16#include "qemu-common.h"
  17#include "qemu/error-report.h"
  18#include "block/block_int.h"
  19
  20#include <rbd/librbd.h>
  21
  22/*
  23 * When specifying the image filename use:
  24 *
  25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  26 *
  27 * poolname must be the name of an existing rados pool.
  28 *
  29 * devicename is the name of the rbd image.
  30 *
  31 * Each option given is used to configure rados, and may be any valid
  32 * Ceph option, "id", or "conf".
  33 *
  34 * The "id" option indicates what user we should authenticate as to
  35 * the Ceph cluster.  If it is excluded we will use the Ceph default
  36 * (normally 'admin').
  37 *
  38 * The "conf" option specifies a Ceph configuration file to read.  If
  39 * it is not specified, we will read from the default Ceph locations
  40 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  41 * file, specify conf=/dev/null.
  42 *
  43 * Configuration values containing :, @, or = can be escaped with a
  44 * leading "\".
  45 */
  46
/*
 * rbd_aio_discard added in 0.1.2.  LIBRBD_SUPPORTS_DISCARD gates the
 * discard wrapper and the .bdrv_aio_discard driver callback further down.
 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif

/* NOTE(review): not referenced in the visible part of this file. */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/*
 * Sizes of the fixed buffers used while parsing an "rbd:" file name.
 * qemu_rbd_next_tok() rejects tokens whose length is >= the buffer size,
 * so each size includes room for the terminating NUL.
 */
#define RBD_MAX_CONF_NAME_SIZE 128
#define RBD_MAX_CONF_VAL_SIZE 512
#define RBD_MAX_CONF_SIZE 1024
#define RBD_MAX_POOL_NAME_SIZE 128
#define RBD_MAX_SNAP_NAME_SIZE 128
/* Initial capacity of the array handed to rbd_snap_list() */
#define RBD_MAX_SNAPS 100
  62
/* Kind of asynchronous request submitted through rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH
} RBDAIOCmd;
  69
/* Per-request state of one asynchronous rbd operation. */
typedef struct RBDAIOCB {
    BlockAIOCB common;      /* generic AIOCB; holds bs, cb and opaque */
    QEMUBH *bh;             /* bottom half created by rbd_finish_aiocb() */
    int64_t ret;            /* result handed to the completion callback */
    QEMUIOVector *qiov;     /* caller's vector; NULL for discard and flush */
    char *bounce;           /* linear buffer for read/write, else NULL */
    RBDAIOCmd cmd;          /* which operation this request performs */
    int error;              /* set to 1 once the request has failed */
    struct BDRVRBDState *s; /* driver state of the image being accessed */
} RBDAIOCB;
  80
/*
 * Context passed to librbd as the completion argument; it is allocated in
 * rbd_start_aio() and freed in qemu_rbd_complete_aio().
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;          /* request this completion belongs to */
    struct BDRVRBDState *s; /* driver state (copied from acb->s) */
    int64_t size;           /* number of bytes requested */
    char *buf;              /* same pointer as acb->bounce */
    int64_t ret;            /* value of rbd_aio_get_return_value() */
} RADOSCB;
  88
/* Driver state of one open rbd image (bs->opaque). */
typedef struct BDRVRBDState {
    rados_t cluster;                    /* handle from rados_create() */
    rados_ioctx_t io_ctx;               /* I/O context of the pool */
    rbd_image_t image;                  /* handle from rbd_open() */
    char name[RBD_MAX_IMAGE_NAME_SIZE]; /* image name parsed from filename */
    char *snap;                         /* snapshot name, or NULL if none */
} BDRVRBDState;
  96
  97static int qemu_rbd_next_tok(char *dst, int dst_len,
  98                             char *src, char delim,
  99                             const char *name,
 100                             char **p, Error **errp)
 101{
 102    int l;
 103    char *end;
 104
 105    *p = NULL;
 106
 107    if (delim != '\0') {
 108        for (end = src; *end; ++end) {
 109            if (*end == delim) {
 110                break;
 111            }
 112            if (*end == '\\' && end[1] != '\0') {
 113                end++;
 114            }
 115        }
 116        if (*end == delim) {
 117            *p = end + 1;
 118            *end = '\0';
 119        }
 120    }
 121    l = strlen(src);
 122    if (l >= dst_len) {
 123        error_setg(errp, "%s too long", name);
 124        return -EINVAL;
 125    } else if (l == 0) {
 126        error_setg(errp, "%s too short", name);
 127        return -EINVAL;
 128    }
 129
 130    pstrcpy(dst, dst_len, src);
 131
 132    return 0;
 133}
 134
 135static void qemu_rbd_unescape(char *src)
 136{
 137    char *p;
 138
 139    for (p = src; *src; ++src, ++p) {
 140        if (*src == '\\' && src[1] != '\0') {
 141            src++;
 142        }
 143        *p = *src;
 144    }
 145    *p = '\0';
 146}
 147
 148static int qemu_rbd_parsename(const char *filename,
 149                              char *pool, int pool_len,
 150                              char *snap, int snap_len,
 151                              char *name, int name_len,
 152                              char *conf, int conf_len,
 153                              Error **errp)
 154{
 155    const char *start;
 156    char *p, *buf;
 157    int ret;
 158
 159    if (!strstart(filename, "rbd:", &start)) {
 160        error_setg(errp, "File name must start with 'rbd:'");
 161        return -EINVAL;
 162    }
 163
 164    buf = g_strdup(start);
 165    p = buf;
 166    *snap = '\0';
 167    *conf = '\0';
 168
 169    ret = qemu_rbd_next_tok(pool, pool_len, p,
 170                            '/', "pool name", &p, errp);
 171    if (ret < 0 || !p) {
 172        ret = -EINVAL;
 173        goto done;
 174    }
 175    qemu_rbd_unescape(pool);
 176
 177    if (strchr(p, '@')) {
 178        ret = qemu_rbd_next_tok(name, name_len, p,
 179                                '@', "object name", &p, errp);
 180        if (ret < 0) {
 181            goto done;
 182        }
 183        ret = qemu_rbd_next_tok(snap, snap_len, p,
 184                                ':', "snap name", &p, errp);
 185        qemu_rbd_unescape(snap);
 186    } else {
 187        ret = qemu_rbd_next_tok(name, name_len, p,
 188                                ':', "object name", &p, errp);
 189    }
 190    qemu_rbd_unescape(name);
 191    if (ret < 0 || !p) {
 192        goto done;
 193    }
 194
 195    ret = qemu_rbd_next_tok(conf, conf_len, p,
 196                            '\0', "configuration", &p, errp);
 197
 198done:
 199    g_free(buf);
 200    return ret;
 201}
 202
 203static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
 204{
 205    const char *p = conf;
 206
 207    while (*p) {
 208        int len;
 209        const char *end = strchr(p, ':');
 210
 211        if (end) {
 212            len = end - p;
 213        } else {
 214            len = strlen(p);
 215        }
 216
 217        if (strncmp(p, "id=", 3) == 0) {
 218            len -= 3;
 219            strncpy(clientname, p + 3, len);
 220            clientname[len] = '\0';
 221            return clientname;
 222        }
 223        if (end == NULL) {
 224            break;
 225        }
 226        p = end + 1;
 227    }
 228    return NULL;
 229}
 230
 231static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
 232                             bool only_read_conf_file,
 233                             Error **errp)
 234{
 235    char *p, *buf;
 236    char name[RBD_MAX_CONF_NAME_SIZE];
 237    char value[RBD_MAX_CONF_VAL_SIZE];
 238    int ret = 0;
 239
 240    buf = g_strdup(conf);
 241    p = buf;
 242
 243    while (p) {
 244        ret = qemu_rbd_next_tok(name, sizeof(name), p,
 245                                '=', "conf option name", &p, errp);
 246        if (ret < 0) {
 247            break;
 248        }
 249        qemu_rbd_unescape(name);
 250
 251        if (!p) {
 252            error_setg(errp, "conf option %s has no value", name);
 253            ret = -EINVAL;
 254            break;
 255        }
 256
 257        ret = qemu_rbd_next_tok(value, sizeof(value), p,
 258                                ':', "conf option value", &p, errp);
 259        if (ret < 0) {
 260            break;
 261        }
 262        qemu_rbd_unescape(value);
 263
 264        if (strcmp(name, "conf") == 0) {
 265            /* read the conf file alone, so it doesn't override more
 266               specific settings for a particular device */
 267            if (only_read_conf_file) {
 268                ret = rados_conf_read_file(cluster, value);
 269                if (ret < 0) {
 270                    error_setg(errp, "error reading conf file %s", value);
 271                    break;
 272                }
 273            }
 274        } else if (strcmp(name, "id") == 0) {
 275            /* ignore, this is parsed by qemu_rbd_parse_clientname() */
 276        } else if (!only_read_conf_file) {
 277            ret = rados_conf_set(cluster, name, value);
 278            if (ret < 0) {
 279                error_setg(errp, "invalid conf option %s", name);
 280                ret = -EINVAL;
 281                break;
 282            }
 283        }
 284    }
 285
 286    g_free(buf);
 287    return ret;
 288}
 289
 290static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 291{
 292    Error *local_err = NULL;
 293    int64_t bytes = 0;
 294    int64_t objsize;
 295    int obj_order = 0;
 296    char pool[RBD_MAX_POOL_NAME_SIZE];
 297    char name[RBD_MAX_IMAGE_NAME_SIZE];
 298    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 299    char conf[RBD_MAX_CONF_SIZE];
 300    char clientname_buf[RBD_MAX_CONF_SIZE];
 301    char *clientname;
 302    rados_t cluster;
 303    rados_ioctx_t io_ctx;
 304    int ret;
 305
 306    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 307                           snap_buf, sizeof(snap_buf),
 308                           name, sizeof(name),
 309                           conf, sizeof(conf), &local_err) < 0) {
 310        error_propagate(errp, local_err);
 311        return -EINVAL;
 312    }
 313
 314    /* Read out options */
 315    bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
 316                     BDRV_SECTOR_SIZE);
 317    objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0);
 318    if (objsize) {
 319        if ((objsize - 1) & objsize) {    /* not a power of 2? */
 320            error_setg(errp, "obj size needs to be power of 2");
 321            return -EINVAL;
 322        }
 323        if (objsize < 4096) {
 324            error_setg(errp, "obj size too small");
 325            return -EINVAL;
 326        }
 327        obj_order = ctz32(objsize);
 328    }
 329
 330    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 331    if (rados_create(&cluster, clientname) < 0) {
 332        error_setg(errp, "error initializing");
 333        return -EIO;
 334    }
 335
 336    if (strstr(conf, "conf=") == NULL) {
 337        /* try default location, but ignore failure */
 338        rados_conf_read_file(cluster, NULL);
 339    } else if (conf[0] != '\0' &&
 340               qemu_rbd_set_conf(cluster, conf, true, &local_err) < 0) {
 341        rados_shutdown(cluster);
 342        error_propagate(errp, local_err);
 343        return -EIO;
 344    }
 345
 346    if (conf[0] != '\0' &&
 347        qemu_rbd_set_conf(cluster, conf, false, &local_err) < 0) {
 348        rados_shutdown(cluster);
 349        error_propagate(errp, local_err);
 350        return -EIO;
 351    }
 352
 353    if (rados_connect(cluster) < 0) {
 354        error_setg(errp, "error connecting");
 355        rados_shutdown(cluster);
 356        return -EIO;
 357    }
 358
 359    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
 360        error_setg(errp, "error opening pool %s", pool);
 361        rados_shutdown(cluster);
 362        return -EIO;
 363    }
 364
 365    ret = rbd_create(io_ctx, name, bytes, &obj_order);
 366    rados_ioctx_destroy(io_ctx);
 367    rados_shutdown(cluster);
 368
 369    return ret;
 370}
 371
 372/*
 373 * This aio completion is being called from rbd_finish_bh() and runs in qemu
 374 * BH context.
 375 */
 376static void qemu_rbd_complete_aio(RADOSCB *rcb)
 377{
 378    RBDAIOCB *acb = rcb->acb;
 379    int64_t r;
 380
 381    r = rcb->ret;
 382
 383    if (acb->cmd != RBD_AIO_READ) {
 384        if (r < 0) {
 385            acb->ret = r;
 386            acb->error = 1;
 387        } else if (!acb->error) {
 388            acb->ret = rcb->size;
 389        }
 390    } else {
 391        if (r < 0) {
 392            memset(rcb->buf, 0, rcb->size);
 393            acb->ret = r;
 394            acb->error = 1;
 395        } else if (r < rcb->size) {
 396            memset(rcb->buf + r, 0, rcb->size - r);
 397            if (!acb->error) {
 398                acb->ret = rcb->size;
 399            }
 400        } else if (!acb->error) {
 401            acb->ret = r;
 402        }
 403    }
 404
 405    g_free(rcb);
 406
 407    if (acb->cmd == RBD_AIO_READ) {
 408        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 409    }
 410    qemu_vfree(acb->bounce);
 411    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 412
 413    qemu_aio_unref(acb);
 414}
 415
/*
 * Options accepted by qemu_rbd_open().  The only one is "filename", the
 * complete "rbd:..." image specification.
 *
 * TODO Convert to fine grained options
 */
static QemuOptsList runtime_opts = {
    .name = "rbd",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "filename",
            .type = QEMU_OPT_STRING,
            .help = "Specification of the rbd image",
        },
        { /* end of list */ }
    },
};
 429
 430static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 431                         Error **errp)
 432{
 433    BDRVRBDState *s = bs->opaque;
 434    char pool[RBD_MAX_POOL_NAME_SIZE];
 435    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 436    char conf[RBD_MAX_CONF_SIZE];
 437    char clientname_buf[RBD_MAX_CONF_SIZE];
 438    char *clientname;
 439    QemuOpts *opts;
 440    Error *local_err = NULL;
 441    const char *filename;
 442    int r;
 443
 444    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
 445    qemu_opts_absorb_qdict(opts, options, &local_err);
 446    if (local_err) {
 447        error_propagate(errp, local_err);
 448        qemu_opts_del(opts);
 449        return -EINVAL;
 450    }
 451
 452    filename = qemu_opt_get(opts, "filename");
 453
 454    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 455                           snap_buf, sizeof(snap_buf),
 456                           s->name, sizeof(s->name),
 457                           conf, sizeof(conf), errp) < 0) {
 458        r = -EINVAL;
 459        goto failed_opts;
 460    }
 461
 462    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 463    r = rados_create(&s->cluster, clientname);
 464    if (r < 0) {
 465        error_setg(errp, "error initializing");
 466        goto failed_opts;
 467    }
 468
 469    s->snap = NULL;
 470    if (snap_buf[0] != '\0') {
 471        s->snap = g_strdup(snap_buf);
 472    }
 473
 474    if (strstr(conf, "conf=") == NULL) {
 475        /* try default location, but ignore failure */
 476        rados_conf_read_file(s->cluster, NULL);
 477    } else if (conf[0] != '\0') {
 478        r = qemu_rbd_set_conf(s->cluster, conf, true, errp);
 479        if (r < 0) {
 480            goto failed_shutdown;
 481        }
 482    }
 483
 484    if (conf[0] != '\0') {
 485        r = qemu_rbd_set_conf(s->cluster, conf, false, errp);
 486        if (r < 0) {
 487            goto failed_shutdown;
 488        }
 489    }
 490
 491    /*
 492     * Fallback to more conservative semantics if setting cache
 493     * options fails. Ignore errors from setting rbd_cache because the
 494     * only possible error is that the option does not exist, and
 495     * librbd defaults to no caching. If write through caching cannot
 496     * be set up, fall back to no caching.
 497     */
 498    if (flags & BDRV_O_NOCACHE) {
 499        rados_conf_set(s->cluster, "rbd_cache", "false");
 500    } else {
 501        rados_conf_set(s->cluster, "rbd_cache", "true");
 502    }
 503
 504    r = rados_connect(s->cluster);
 505    if (r < 0) {
 506        error_setg(errp, "error connecting");
 507        goto failed_shutdown;
 508    }
 509
 510    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
 511    if (r < 0) {
 512        error_setg(errp, "error opening pool %s", pool);
 513        goto failed_shutdown;
 514    }
 515
 516    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
 517    if (r < 0) {
 518        error_setg(errp, "error reading header from %s", s->name);
 519        goto failed_open;
 520    }
 521
 522    bs->read_only = (s->snap != NULL);
 523
 524    qemu_opts_del(opts);
 525    return 0;
 526
 527failed_open:
 528    rados_ioctx_destroy(s->io_ctx);
 529failed_shutdown:
 530    rados_shutdown(s->cluster);
 531    g_free(s->snap);
 532failed_opts:
 533    qemu_opts_del(opts);
 534    return r;
 535}
 536
 537static void qemu_rbd_close(BlockDriverState *bs)
 538{
 539    BDRVRBDState *s = bs->opaque;
 540
 541    rbd_close(s->image);
 542    rados_ioctx_destroy(s->io_ctx);
 543    g_free(s->snap);
 544    rados_shutdown(s->cluster);
 545}
 546
/* AIOCB descriptor passed to qemu_aio_get(); requests are RBDAIOCB-sized. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
 550
 551static void rbd_finish_bh(void *opaque)
 552{
 553    RADOSCB *rcb = opaque;
 554    qemu_bh_delete(rcb->acb->bh);
 555    qemu_rbd_complete_aio(rcb);
 556}
 557
 558/*
 559 * This is the callback function for rbd_aio_read and _write
 560 *
 561 * Note: this function is being called from a non qemu thread so
 562 * we need to be careful about what we do here. Generally we only
 563 * schedule a BH, and do the rest of the io completion handling
 564 * from rbd_finish_bh() which runs in a qemu context.
 565 */
 566static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 567{
 568    RBDAIOCB *acb = rcb->acb;
 569
 570    rcb->ret = rbd_aio_get_return_value(c);
 571    rbd_aio_release(c);
 572
 573    acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
 574                         rbd_finish_bh, rcb);
 575    qemu_bh_schedule(acb->bh);
 576}
 577
 578static int rbd_aio_discard_wrapper(rbd_image_t image,
 579                                   uint64_t off,
 580                                   uint64_t len,
 581                                   rbd_completion_t comp)
 582{
 583#ifdef LIBRBD_SUPPORTS_DISCARD
 584    return rbd_aio_discard(image, off, len, comp);
 585#else
 586    return -ENOTSUP;
 587#endif
 588}
 589
 590static int rbd_aio_flush_wrapper(rbd_image_t image,
 591                                 rbd_completion_t comp)
 592{
 593#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 594    return rbd_aio_flush(image, comp);
 595#else
 596    return -ENOTSUP;
 597#endif
 598}
 599
 600static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 601                                 int64_t sector_num,
 602                                 QEMUIOVector *qiov,
 603                                 int nb_sectors,
 604                                 BlockCompletionFunc *cb,
 605                                 void *opaque,
 606                                 RBDAIOCmd cmd)
 607{
 608    RBDAIOCB *acb;
 609    RADOSCB *rcb = NULL;
 610    rbd_completion_t c;
 611    int64_t off, size;
 612    char *buf;
 613    int r;
 614
 615    BDRVRBDState *s = bs->opaque;
 616
 617    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
 618    acb->cmd = cmd;
 619    acb->qiov = qiov;
 620    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 621        acb->bounce = NULL;
 622    } else {
 623        acb->bounce = qemu_try_blockalign(bs, qiov->size);
 624        if (acb->bounce == NULL) {
 625            goto failed;
 626        }
 627    }
 628    acb->ret = 0;
 629    acb->error = 0;
 630    acb->s = s;
 631    acb->bh = NULL;
 632
 633    if (cmd == RBD_AIO_WRITE) {
 634        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 635    }
 636
 637    buf = acb->bounce;
 638
 639    off = sector_num * BDRV_SECTOR_SIZE;
 640    size = nb_sectors * BDRV_SECTOR_SIZE;
 641
 642    rcb = g_new(RADOSCB, 1);
 643    rcb->acb = acb;
 644    rcb->buf = buf;
 645    rcb->s = acb->s;
 646    rcb->size = size;
 647    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
 648    if (r < 0) {
 649        goto failed;
 650    }
 651
 652    switch (cmd) {
 653    case RBD_AIO_WRITE:
 654        r = rbd_aio_write(s->image, off, size, buf, c);
 655        break;
 656    case RBD_AIO_READ:
 657        r = rbd_aio_read(s->image, off, size, buf, c);
 658        break;
 659    case RBD_AIO_DISCARD:
 660        r = rbd_aio_discard_wrapper(s->image, off, size, c);
 661        break;
 662    case RBD_AIO_FLUSH:
 663        r = rbd_aio_flush_wrapper(s->image, c);
 664        break;
 665    default:
 666        r = -EINVAL;
 667    }
 668
 669    if (r < 0) {
 670        goto failed_completion;
 671    }
 672
 673    return &acb->common;
 674
 675failed_completion:
 676    rbd_aio_release(c);
 677failed:
 678    g_free(rcb);
 679    qemu_vfree(acb->bounce);
 680    qemu_aio_unref(acb);
 681    return NULL;
 682}
 683
 684static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
 685                                      int64_t sector_num,
 686                                      QEMUIOVector *qiov,
 687                                      int nb_sectors,
 688                                      BlockCompletionFunc *cb,
 689                                      void *opaque)
 690{
 691    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
 692                         RBD_AIO_READ);
 693}
 694
 695static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
 696                                       int64_t sector_num,
 697                                       QEMUIOVector *qiov,
 698                                       int nb_sectors,
 699                                       BlockCompletionFunc *cb,
 700                                       void *opaque)
 701{
 702    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
 703                         RBD_AIO_WRITE);
 704}
 705
 706#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 707static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
 708                                      BlockCompletionFunc *cb,
 709                                      void *opaque)
 710{
 711    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
 712}
 713
 714#else
 715
 716static int qemu_rbd_co_flush(BlockDriverState *bs)
 717{
 718#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
 719    /* rbd_flush added in 0.1.1 */
 720    BDRVRBDState *s = bs->opaque;
 721    return rbd_flush(s->image);
 722#else
 723    return 0;
 724#endif
 725}
 726#endif
 727
 728static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 729{
 730    BDRVRBDState *s = bs->opaque;
 731    rbd_image_info_t info;
 732    int r;
 733
 734    r = rbd_stat(s->image, &info, sizeof(info));
 735    if (r < 0) {
 736        return r;
 737    }
 738
 739    bdi->cluster_size = info.obj_size;
 740    return 0;
 741}
 742
 743static int64_t qemu_rbd_getlength(BlockDriverState *bs)
 744{
 745    BDRVRBDState *s = bs->opaque;
 746    rbd_image_info_t info;
 747    int r;
 748
 749    r = rbd_stat(s->image, &info, sizeof(info));
 750    if (r < 0) {
 751        return r;
 752    }
 753
 754    return info.size;
 755}
 756
 757static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
 758{
 759    BDRVRBDState *s = bs->opaque;
 760    int r;
 761
 762    r = rbd_resize(s->image, offset);
 763    if (r < 0) {
 764        return r;
 765    }
 766
 767    return 0;
 768}
 769
 770static int qemu_rbd_snap_create(BlockDriverState *bs,
 771                                QEMUSnapshotInfo *sn_info)
 772{
 773    BDRVRBDState *s = bs->opaque;
 774    int r;
 775
 776    if (sn_info->name[0] == '\0') {
 777        return -EINVAL; /* we need a name for rbd snapshots */
 778    }
 779
 780    /*
 781     * rbd snapshots are using the name as the user controlled unique identifier
 782     * we can't use the rbd snapid for that purpose, as it can't be set
 783     */
 784    if (sn_info->id_str[0] != '\0' &&
 785        strcmp(sn_info->id_str, sn_info->name) != 0) {
 786        return -EINVAL;
 787    }
 788
 789    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
 790        return -ERANGE;
 791    }
 792
 793    r = rbd_snap_create(s->image, sn_info->name);
 794    if (r < 0) {
 795        error_report("failed to create snap: %s", strerror(-r));
 796        return r;
 797    }
 798
 799    return 0;
 800}
 801
 802static int qemu_rbd_snap_remove(BlockDriverState *bs,
 803                                const char *snapshot_id,
 804                                const char *snapshot_name,
 805                                Error **errp)
 806{
 807    BDRVRBDState *s = bs->opaque;
 808    int r;
 809
 810    if (!snapshot_name) {
 811        error_setg(errp, "rbd need a valid snapshot name");
 812        return -EINVAL;
 813    }
 814
 815    /* If snapshot_id is specified, it must be equal to name, see
 816       qemu_rbd_snap_list() */
 817    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
 818        error_setg(errp,
 819                   "rbd do not support snapshot id, it should be NULL or "
 820                   "equal to snapshot name");
 821        return -EINVAL;
 822    }
 823
 824    r = rbd_snap_remove(s->image, snapshot_name);
 825    if (r < 0) {
 826        error_setg_errno(errp, -r, "Failed to remove the snapshot");
 827    }
 828    return r;
 829}
 830
 831static int qemu_rbd_snap_rollback(BlockDriverState *bs,
 832                                  const char *snapshot_name)
 833{
 834    BDRVRBDState *s = bs->opaque;
 835    int r;
 836
 837    r = rbd_snap_rollback(s->image, snapshot_name);
 838    return r;
 839}
 840
 841static int qemu_rbd_snap_list(BlockDriverState *bs,
 842                              QEMUSnapshotInfo **psn_tab)
 843{
 844    BDRVRBDState *s = bs->opaque;
 845    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
 846    int i, snap_count;
 847    rbd_snap_info_t *snaps;
 848    int max_snaps = RBD_MAX_SNAPS;
 849
 850    do {
 851        snaps = g_new(rbd_snap_info_t, max_snaps);
 852        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
 853        if (snap_count <= 0) {
 854            g_free(snaps);
 855        }
 856    } while (snap_count == -ERANGE);
 857
 858    if (snap_count <= 0) {
 859        goto done;
 860    }
 861
 862    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
 863
 864    for (i = 0; i < snap_count; i++) {
 865        const char *snap_name = snaps[i].name;
 866
 867        sn_info = sn_tab + i;
 868        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
 869        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
 870
 871        sn_info->vm_state_size = snaps[i].size;
 872        sn_info->date_sec = 0;
 873        sn_info->date_nsec = 0;
 874        sn_info->vm_clock_nsec = 0;
 875    }
 876    rbd_snap_list_end(snaps);
 877    g_free(snaps);
 878
 879 done:
 880    *psn_tab = sn_tab;
 881    return snap_count;
 882}
 883
 884#ifdef LIBRBD_SUPPORTS_DISCARD
 885static BlockAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
 886                                        int64_t sector_num,
 887                                        int nb_sectors,
 888                                        BlockCompletionFunc *cb,
 889                                        void *opaque)
 890{
 891    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
 892                         RBD_AIO_DISCARD);
 893}
 894#endif
 895
 896#ifdef LIBRBD_SUPPORTS_INVALIDATE
 897static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
 898                                      Error **errp)
 899{
 900    BDRVRBDState *s = bs->opaque;
 901    int r = rbd_invalidate_cache(s->image);
 902    if (r < 0) {
 903        error_setg_errno(errp, -r, "Failed to invalidate the cache");
 904    }
 905}
 906#endif
 907
/*
 * Options understood by qemu_rbd_create(): the virtual disk size and,
 * optionally, the rbd object size (passed as the cluster size option).
 */
static QemuOptsList qemu_rbd_create_opts = {
    .name = "rbd-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
    .desc = {
        {
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            /* must be a power of two and at least 4096, see qemu_rbd_create() */
            .name = BLOCK_OPT_CLUSTER_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "RBD object size"
        },
        { /* end of list */ }
    }
};
 925
/*
 * Driver description registered with the block layer.  Which flush,
 * discard and cache-invalidation callbacks are present depends on the
 * capabilities of the librbd this file was compiled against.
 */
static BlockDriver bdrv_rbd = {
    .format_name        = "rbd",
    .instance_size      = sizeof(BDRVRBDState),
    .bdrv_needs_filename = true,
    .bdrv_file_open     = qemu_rbd_open,
    .bdrv_close         = qemu_rbd_close,
    .bdrv_create        = qemu_rbd_create,
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
    .bdrv_get_info      = qemu_rbd_getinfo,
    .create_opts        = &qemu_rbd_create_opts,
    .bdrv_getlength     = qemu_rbd_getlength,
    .bdrv_truncate      = qemu_rbd_truncate,
    .protocol_name      = "rbd",

    /* data path: both go through rbd_start_aio() */
    .bdrv_aio_readv         = qemu_rbd_aio_readv,
    .bdrv_aio_writev        = qemu_rbd_aio_writev,

    /* asynchronous flush if librbd has it, synchronous fallback otherwise */
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
    .bdrv_aio_flush         = qemu_rbd_aio_flush,
#else
    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
#endif

#ifdef LIBRBD_SUPPORTS_DISCARD
    .bdrv_aio_discard       = qemu_rbd_aio_discard,
#endif

    /* snapshot management */
    .bdrv_snapshot_create   = qemu_rbd_snap_create,
    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
    .bdrv_snapshot_list     = qemu_rbd_snap_list,
    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
#ifdef LIBRBD_SUPPORTS_INVALIDATE
    .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
#endif
};
 961
/* Register the rbd driver with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() into QEMU's block-driver initialization. */
block_init(bdrv_rbd_init);
 968