qemu/block/rbd.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for RADOS (Ceph)
   3 *
   4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
   5 *                         Josh Durgin <josh.durgin@dreamhost.com>
   6 *
   7 * This work is licensed under the terms of the GNU GPL, version 2.  See
   8 * the COPYING file in the top-level directory.
   9 *
  10 * Contributions after 2012-01-13 are licensed under the terms of the
  11 * GNU GPL, version 2 or (at your option) any later version.
  12 */
  13
  14#include <inttypes.h>
  15
  16#include "qemu-common.h"
  17#include "qemu-error.h"
  18#include "block_int.h"
  19
  20#include <rbd/librbd.h>
  21
  22/*
  23 * When specifying the image filename use:
  24 *
  25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  26 *
  27 * poolname must be the name of an existing rados pool.
  28 *
  29 * devicename is the name of the rbd image.
  30 *
  31 * Each option given is used to configure rados, and may be any valid
  32 * Ceph option, "id", or "conf".
  33 *
  34 * The "id" option indicates what user we should authenticate as to
  35 * the Ceph cluster.  If it is excluded we will use the Ceph default
  36 * (normally 'admin').
  37 *
  38 * The "conf" option specifies a Ceph configuration file to read.  If
  39 * it is not specified, we will read from the default Ceph locations
  40 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
  41 * file, specify conf=/dev/null.
  42 *
  43 * Configuration values containing :, @, or = can be escaped with a
  44 * leading "\".
  45 */
  46
/*
 * rbd_aio_discard added in 0.1.2
 *
 * LIBRBD_SUPPORTS_DISCARD is defined only when the librbd headers we are
 * compiled against report at least that version; the discard code below
 * is conditional on it.
 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif
  53
/* Not referenced elsewhere in this file. */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* Buffer sizes (including the terminating NUL) used while parsing filenames */
#define RBD_MAX_CONF_NAME_SIZE 128   /* one option name in the conf string */
#define RBD_MAX_CONF_VAL_SIZE 512    /* one option value in the conf string */
#define RBD_MAX_CONF_SIZE 1024       /* the whole "opt=val:opt=val" string */
#define RBD_MAX_POOL_NAME_SIZE 128
#define RBD_MAX_SNAP_NAME_SIZE 128
/* Initial capacity of the array handed to rbd_snap_list() */
#define RBD_MAX_SNAPS 100
  62
/* Kind of asynchronous request issued through rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD
} RBDAIOCmd;
  68
/*
 * Per-request state on the qemu side.  qemu_rbd_aio_cancel() casts a
 * BlockDriverAIOCB pointer to RBDAIOCB, so 'common' has to stay the first
 * member.
 */
typedef struct RBDAIOCB {
    BlockDriverAIOCB common;
    QEMUBH *bh;              /* bottom half that delivers the completion */
    int ret;                 /* result; reported to the caller as 0 if > 0 */
    QEMUIOVector *qiov;      /* caller's vector (NULL for discard) */
    char *bounce;            /* linear bounce buffer (NULL for discard) */
    RBDAIOCmd cmd;
    int64_t sector_num;      /* not assigned anywhere in this file */
    int error;               /* set to 1 once a failure has been recorded */
    struct BDRVRBDState *s;
    int cancelled;           /* set by qemu_rbd_aio_cancel() */
} RBDAIOCB;
  81
/*
 * Context handed to librbd for one request.  A pointer to it is sent
 * through the notification pipe when librbd reports completion.
 */
typedef struct RADOSCB {
    int rcbid;               /* not used in this file */
    RBDAIOCB *acb;           /* owning qemu request */
    struct BDRVRBDState *s;
    int done;                /* cleared in rbd_start_aio(), not read here */
    int64_t size;            /* request length in bytes */
    char *buf;               /* same buffer as acb->bounce */
    int ret;                 /* value of rbd_aio_get_return_value() */
} RADOSCB;
  91
/* Indices into BDRVRBDState.fds (the two ends of the notification pipe) */
#define RBD_FD_READ 0
#define RBD_FD_WRITE 1

/* Per-image driver state (bs->opaque). */
typedef struct BDRVRBDState {
    int fds[2];              /* pipe carrying RADOSCB pointers to qemu */
    rados_t cluster;
    rados_ioctx_t io_ctx;
    rbd_image_t image;
    char name[RBD_MAX_IMAGE_NAME_SIZE];
    int qemu_aio_count;      /* requests submitted but not yet completed */
    char *snap;              /* snapshot name, or NULL if none was given */
    int event_reader_pos;    /* bytes of event_rcb received so far */
    RADOSCB *event_rcb;      /* pointer being reassembled from the pipe */
} BDRVRBDState;
 106
 107static void rbd_aio_bh_cb(void *opaque);
 108
 109static int qemu_rbd_next_tok(char *dst, int dst_len,
 110                             char *src, char delim,
 111                             const char *name,
 112                             char **p)
 113{
 114    int l;
 115    char *end;
 116
 117    *p = NULL;
 118
 119    if (delim != '\0') {
 120        for (end = src; *end; ++end) {
 121            if (*end == delim) {
 122                break;
 123            }
 124            if (*end == '\\' && end[1] != '\0') {
 125                end++;
 126            }
 127        }
 128        if (*end == delim) {
 129            *p = end + 1;
 130            *end = '\0';
 131        }
 132    }
 133    l = strlen(src);
 134    if (l >= dst_len) {
 135        error_report("%s too long", name);
 136        return -EINVAL;
 137    } else if (l == 0) {
 138        error_report("%s too short", name);
 139        return -EINVAL;
 140    }
 141
 142    pstrcpy(dst, dst_len, src);
 143
 144    return 0;
 145}
 146
 147static void qemu_rbd_unescape(char *src)
 148{
 149    char *p;
 150
 151    for (p = src; *src; ++src, ++p) {
 152        if (*src == '\\' && src[1] != '\0') {
 153            src++;
 154        }
 155        *p = *src;
 156    }
 157    *p = '\0';
 158}
 159
 160static int qemu_rbd_parsename(const char *filename,
 161                              char *pool, int pool_len,
 162                              char *snap, int snap_len,
 163                              char *name, int name_len,
 164                              char *conf, int conf_len)
 165{
 166    const char *start;
 167    char *p, *buf;
 168    int ret;
 169
 170    if (!strstart(filename, "rbd:", &start)) {
 171        return -EINVAL;
 172    }
 173
 174    buf = g_strdup(start);
 175    p = buf;
 176    *snap = '\0';
 177    *conf = '\0';
 178
 179    ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
 180    if (ret < 0 || !p) {
 181        ret = -EINVAL;
 182        goto done;
 183    }
 184    qemu_rbd_unescape(pool);
 185
 186    if (strchr(p, '@')) {
 187        ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
 188        if (ret < 0) {
 189            goto done;
 190        }
 191        ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
 192        qemu_rbd_unescape(snap);
 193    } else {
 194        ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
 195    }
 196    qemu_rbd_unescape(name);
 197    if (ret < 0 || !p) {
 198        goto done;
 199    }
 200
 201    ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
 202
 203done:
 204    g_free(buf);
 205    return ret;
 206}
 207
 208static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
 209{
 210    const char *p = conf;
 211
 212    while (*p) {
 213        int len;
 214        const char *end = strchr(p, ':');
 215
 216        if (end) {
 217            len = end - p;
 218        } else {
 219            len = strlen(p);
 220        }
 221
 222        if (strncmp(p, "id=", 3) == 0) {
 223            len -= 3;
 224            strncpy(clientname, p + 3, len);
 225            clientname[len] = '\0';
 226            return clientname;
 227        }
 228        if (end == NULL) {
 229            break;
 230        }
 231        p = end + 1;
 232    }
 233    return NULL;
 234}
 235
 236static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
 237{
 238    char *p, *buf;
 239    char name[RBD_MAX_CONF_NAME_SIZE];
 240    char value[RBD_MAX_CONF_VAL_SIZE];
 241    int ret = 0;
 242
 243    buf = g_strdup(conf);
 244    p = buf;
 245
 246    while (p) {
 247        ret = qemu_rbd_next_tok(name, sizeof(name), p,
 248                                '=', "conf option name", &p);
 249        if (ret < 0) {
 250            break;
 251        }
 252        qemu_rbd_unescape(name);
 253
 254        if (!p) {
 255            error_report("conf option %s has no value", name);
 256            ret = -EINVAL;
 257            break;
 258        }
 259
 260        ret = qemu_rbd_next_tok(value, sizeof(value), p,
 261                                ':', "conf option value", &p);
 262        if (ret < 0) {
 263            break;
 264        }
 265        qemu_rbd_unescape(value);
 266
 267        if (strcmp(name, "conf") == 0) {
 268            ret = rados_conf_read_file(cluster, value);
 269            if (ret < 0) {
 270                error_report("error reading conf file %s", value);
 271                break;
 272            }
 273        } else if (strcmp(name, "id") == 0) {
 274            /* ignore, this is parsed by qemu_rbd_parse_clientname() */
 275        } else {
 276            ret = rados_conf_set(cluster, name, value);
 277            if (ret < 0) {
 278                error_report("invalid conf option %s", name);
 279                ret = -EINVAL;
 280                break;
 281            }
 282        }
 283    }
 284
 285    g_free(buf);
 286    return ret;
 287}
 288
 289static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
 290{
 291    int64_t bytes = 0;
 292    int64_t objsize;
 293    int obj_order = 0;
 294    char pool[RBD_MAX_POOL_NAME_SIZE];
 295    char name[RBD_MAX_IMAGE_NAME_SIZE];
 296    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 297    char conf[RBD_MAX_CONF_SIZE];
 298    char clientname_buf[RBD_MAX_CONF_SIZE];
 299    char *clientname;
 300    rados_t cluster;
 301    rados_ioctx_t io_ctx;
 302    int ret;
 303
 304    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 305                           snap_buf, sizeof(snap_buf),
 306                           name, sizeof(name),
 307                           conf, sizeof(conf)) < 0) {
 308        return -EINVAL;
 309    }
 310
 311    /* Read out options */
 312    while (options && options->name) {
 313        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
 314            bytes = options->value.n;
 315        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
 316            if (options->value.n) {
 317                objsize = options->value.n;
 318                if ((objsize - 1) & objsize) {    /* not a power of 2? */
 319                    error_report("obj size needs to be power of 2");
 320                    return -EINVAL;
 321                }
 322                if (objsize < 4096) {
 323                    error_report("obj size too small");
 324                    return -EINVAL;
 325                }
 326                obj_order = ffs(objsize) - 1;
 327            }
 328        }
 329        options++;
 330    }
 331
 332    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 333    if (rados_create(&cluster, clientname) < 0) {
 334        error_report("error initializing");
 335        return -EIO;
 336    }
 337
 338    if (strstr(conf, "conf=") == NULL) {
 339        /* try default location, but ignore failure */
 340        rados_conf_read_file(cluster, NULL);
 341    }
 342
 343    if (conf[0] != '\0' &&
 344        qemu_rbd_set_conf(cluster, conf) < 0) {
 345        error_report("error setting config options");
 346        rados_shutdown(cluster);
 347        return -EIO;
 348    }
 349
 350    if (rados_connect(cluster) < 0) {
 351        error_report("error connecting");
 352        rados_shutdown(cluster);
 353        return -EIO;
 354    }
 355
 356    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
 357        error_report("error opening pool %s", pool);
 358        rados_shutdown(cluster);
 359        return -EIO;
 360    }
 361
 362    ret = rbd_create(io_ctx, name, bytes, &obj_order);
 363    rados_ioctx_destroy(io_ctx);
 364    rados_shutdown(cluster);
 365
 366    return ret;
 367}
 368
 369/*
 370 * This aio completion is being called from qemu_rbd_aio_event_reader()
 371 * and runs in qemu context. It schedules a bh, but just in case the aio
 372 * was not cancelled before.
 373 */
 374static void qemu_rbd_complete_aio(RADOSCB *rcb)
 375{
 376    RBDAIOCB *acb = rcb->acb;
 377    int64_t r;
 378
 379    if (acb->cancelled) {
 380        qemu_vfree(acb->bounce);
 381        qemu_aio_release(acb);
 382        goto done;
 383    }
 384
 385    r = rcb->ret;
 386
 387    if (acb->cmd == RBD_AIO_WRITE ||
 388        acb->cmd == RBD_AIO_DISCARD) {
 389        if (r < 0) {
 390            acb->ret = r;
 391            acb->error = 1;
 392        } else if (!acb->error) {
 393            acb->ret = rcb->size;
 394        }
 395    } else {
 396        if (r < 0) {
 397            memset(rcb->buf, 0, rcb->size);
 398            acb->ret = r;
 399            acb->error = 1;
 400        } else if (r < rcb->size) {
 401            memset(rcb->buf + r, 0, rcb->size - r);
 402            if (!acb->error) {
 403                acb->ret = rcb->size;
 404            }
 405        } else if (!acb->error) {
 406            acb->ret = r;
 407        }
 408    }
 409    /* Note that acb->bh can be NULL in case where the aio was cancelled */
 410    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
 411    qemu_bh_schedule(acb->bh);
 412done:
 413    g_free(rcb);
 414}
 415
 416/*
 417 * aio fd read handler. It runs in the qemu context and calls the
 418 * completion handling of completed rados aio operations.
 419 */
 420static void qemu_rbd_aio_event_reader(void *opaque)
 421{
 422    BDRVRBDState *s = opaque;
 423
 424    ssize_t ret;
 425
 426    do {
 427        char *p = (char *)&s->event_rcb;
 428
 429        /* now read the rcb pointer that was sent from a non qemu thread */
 430        ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
 431                   sizeof(s->event_rcb) - s->event_reader_pos);
 432        if (ret > 0) {
 433            s->event_reader_pos += ret;
 434            if (s->event_reader_pos == sizeof(s->event_rcb)) {
 435                s->event_reader_pos = 0;
 436                qemu_rbd_complete_aio(s->event_rcb);
 437                s->qemu_aio_count--;
 438            }
 439        }
 440    } while (ret < 0 && errno == EINTR);
 441}
 442
 443static int qemu_rbd_aio_flush_cb(void *opaque)
 444{
 445    BDRVRBDState *s = opaque;
 446
 447    return (s->qemu_aio_count > 0);
 448}
 449
 450static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
 451{
 452    BDRVRBDState *s = bs->opaque;
 453    char pool[RBD_MAX_POOL_NAME_SIZE];
 454    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 455    char conf[RBD_MAX_CONF_SIZE];
 456    char clientname_buf[RBD_MAX_CONF_SIZE];
 457    char *clientname;
 458    int r;
 459
 460    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 461                           snap_buf, sizeof(snap_buf),
 462                           s->name, sizeof(s->name),
 463                           conf, sizeof(conf)) < 0) {
 464        return -EINVAL;
 465    }
 466
 467    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 468    r = rados_create(&s->cluster, clientname);
 469    if (r < 0) {
 470        error_report("error initializing");
 471        return r;
 472    }
 473
 474    s->snap = NULL;
 475    if (snap_buf[0] != '\0') {
 476        s->snap = g_strdup(snap_buf);
 477    }
 478
 479    if (strstr(conf, "conf=") == NULL) {
 480        /* try default location, but ignore failure */
 481        rados_conf_read_file(s->cluster, NULL);
 482    }
 483
 484    if (conf[0] != '\0') {
 485        r = qemu_rbd_set_conf(s->cluster, conf);
 486        if (r < 0) {
 487            error_report("error setting config options");
 488            goto failed_shutdown;
 489        }
 490    }
 491
 492    r = rados_connect(s->cluster);
 493    if (r < 0) {
 494        error_report("error connecting");
 495        goto failed_shutdown;
 496    }
 497
 498    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
 499    if (r < 0) {
 500        error_report("error opening pool %s", pool);
 501        goto failed_shutdown;
 502    }
 503
 504    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
 505    if (r < 0) {
 506        error_report("error reading header from %s", s->name);
 507        goto failed_open;
 508    }
 509
 510    bs->read_only = (s->snap != NULL);
 511
 512    s->event_reader_pos = 0;
 513    r = qemu_pipe(s->fds);
 514    if (r < 0) {
 515        error_report("error opening eventfd");
 516        goto failed;
 517    }
 518    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
 519    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
 520    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
 521                            NULL, qemu_rbd_aio_flush_cb, s);
 522
 523
 524    return 0;
 525
 526failed:
 527    rbd_close(s->image);
 528failed_open:
 529    rados_ioctx_destroy(s->io_ctx);
 530failed_shutdown:
 531    rados_shutdown(s->cluster);
 532    g_free(s->snap);
 533    return r;
 534}
 535
 536static void qemu_rbd_close(BlockDriverState *bs)
 537{
 538    BDRVRBDState *s = bs->opaque;
 539
 540    close(s->fds[0]);
 541    close(s->fds[1]);
 542    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL);
 543
 544    rbd_close(s->image);
 545    rados_ioctx_destroy(s->io_ctx);
 546    g_free(s->snap);
 547    rados_shutdown(s->cluster);
 548}
 549
 550/*
 551 * Cancel aio. Since we don't reference acb in a non qemu threads,
 552 * it is safe to access it here.
 553 */
 554static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
 555{
 556    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
 557    acb->cancelled = 1;
 558}
 559
/* AIOCB pool used by rbd_start_aio() to allocate RBDAIOCB requests. */
static AIOPool rbd_aio_pool = {
    .aiocb_size = sizeof(RBDAIOCB),
    .cancel = qemu_rbd_aio_cancel,
};
 564
 565static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
 566{
 567    int ret = 0;
 568    while (1) {
 569        fd_set wfd;
 570        int fd = s->fds[RBD_FD_WRITE];
 571
 572        /* send the op pointer to the qemu thread that is responsible
 573           for the aio/op completion. Must do it in a qemu thread context */
 574        ret = write(fd, (void *)&rcb, sizeof(rcb));
 575        if (ret >= 0) {
 576            break;
 577        }
 578        if (errno == EINTR) {
 579            continue;
 580        }
 581        if (errno != EAGAIN) {
 582            break;
 583        }
 584
 585        FD_ZERO(&wfd);
 586        FD_SET(fd, &wfd);
 587        do {
 588            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
 589        } while (ret < 0 && errno == EINTR);
 590    }
 591
 592    return ret;
 593}
 594
 595/*
 596 * This is the callback function for rbd_aio_read and _write
 597 *
 598 * Note: this function is being called from a non qemu thread so
 599 * we need to be careful about what we do here. Generally we only
 600 * write to the block notification pipe, and do the rest of the
 601 * io completion handling from qemu_rbd_aio_event_reader() which
 602 * runs in a qemu context.
 603 */
 604static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 605{
 606    int ret;
 607    rcb->ret = rbd_aio_get_return_value(c);
 608    rbd_aio_release(c);
 609    ret = qemu_rbd_send_pipe(rcb->s, rcb);
 610    if (ret < 0) {
 611        error_report("failed writing to acb->s->fds");
 612        g_free(rcb);
 613    }
 614}
 615
 616/* Callback when all queued rbd_aio requests are complete */
 617
 618static void rbd_aio_bh_cb(void *opaque)
 619{
 620    RBDAIOCB *acb = opaque;
 621
 622    if (acb->cmd == RBD_AIO_READ) {
 623        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
 624    }
 625    qemu_vfree(acb->bounce);
 626    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 627    qemu_bh_delete(acb->bh);
 628    acb->bh = NULL;
 629
 630    qemu_aio_release(acb);
 631}
 632
 633static int rbd_aio_discard_wrapper(rbd_image_t image,
 634                                   uint64_t off,
 635                                   uint64_t len,
 636                                   rbd_completion_t comp)
 637{
 638#ifdef LIBRBD_SUPPORTS_DISCARD
 639    return rbd_aio_discard(image, off, len, comp);
 640#else
 641    return -ENOTSUP;
 642#endif
 643}
 644
 645static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
 646                                       int64_t sector_num,
 647                                       QEMUIOVector *qiov,
 648                                       int nb_sectors,
 649                                       BlockDriverCompletionFunc *cb,
 650                                       void *opaque,
 651                                       RBDAIOCmd cmd)
 652{
 653    RBDAIOCB *acb;
 654    RADOSCB *rcb;
 655    rbd_completion_t c;
 656    int64_t off, size;
 657    char *buf;
 658    int r;
 659
 660    BDRVRBDState *s = bs->opaque;
 661
 662    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
 663    acb->cmd = cmd;
 664    acb->qiov = qiov;
 665    if (cmd == RBD_AIO_DISCARD) {
 666        acb->bounce = NULL;
 667    } else {
 668        acb->bounce = qemu_blockalign(bs, qiov->size);
 669    }
 670    acb->ret = 0;
 671    acb->error = 0;
 672    acb->s = s;
 673    acb->cancelled = 0;
 674    acb->bh = NULL;
 675
 676    if (cmd == RBD_AIO_WRITE) {
 677        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
 678    }
 679
 680    buf = acb->bounce;
 681
 682    off = sector_num * BDRV_SECTOR_SIZE;
 683    size = nb_sectors * BDRV_SECTOR_SIZE;
 684
 685    s->qemu_aio_count++; /* All the RADOSCB */
 686
 687    rcb = g_malloc(sizeof(RADOSCB));
 688    rcb->done = 0;
 689    rcb->acb = acb;
 690    rcb->buf = buf;
 691    rcb->s = acb->s;
 692    rcb->size = size;
 693    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
 694    if (r < 0) {
 695        goto failed;
 696    }
 697
 698    switch (cmd) {
 699    case RBD_AIO_WRITE:
 700        r = rbd_aio_write(s->image, off, size, buf, c);
 701        break;
 702    case RBD_AIO_READ:
 703        r = rbd_aio_read(s->image, off, size, buf, c);
 704        break;
 705    case RBD_AIO_DISCARD:
 706        r = rbd_aio_discard_wrapper(s->image, off, size, c);
 707        break;
 708    default:
 709        r = -EINVAL;
 710    }
 711
 712    if (r < 0) {
 713        goto failed;
 714    }
 715
 716    return &acb->common;
 717
 718failed:
 719    g_free(rcb);
 720    s->qemu_aio_count--;
 721    qemu_aio_release(acb);
 722    return NULL;
 723}
 724
 725static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
 726                                            int64_t sector_num,
 727                                            QEMUIOVector *qiov,
 728                                            int nb_sectors,
 729                                            BlockDriverCompletionFunc *cb,
 730                                            void *opaque)
 731{
 732    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
 733                         RBD_AIO_READ);
 734}
 735
 736static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
 737                                             int64_t sector_num,
 738                                             QEMUIOVector *qiov,
 739                                             int nb_sectors,
 740                                             BlockDriverCompletionFunc *cb,
 741                                             void *opaque)
 742{
 743    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
 744                         RBD_AIO_WRITE);
 745}
 746
 747static int qemu_rbd_co_flush(BlockDriverState *bs)
 748{
 749#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
 750    /* rbd_flush added in 0.1.1 */
 751    BDRVRBDState *s = bs->opaque;
 752    return rbd_flush(s->image);
 753#else
 754    return 0;
 755#endif
 756}
 757
 758static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 759{
 760    BDRVRBDState *s = bs->opaque;
 761    rbd_image_info_t info;
 762    int r;
 763
 764    r = rbd_stat(s->image, &info, sizeof(info));
 765    if (r < 0) {
 766        return r;
 767    }
 768
 769    bdi->cluster_size = info.obj_size;
 770    return 0;
 771}
 772
 773static int64_t qemu_rbd_getlength(BlockDriverState *bs)
 774{
 775    BDRVRBDState *s = bs->opaque;
 776    rbd_image_info_t info;
 777    int r;
 778
 779    r = rbd_stat(s->image, &info, sizeof(info));
 780    if (r < 0) {
 781        return r;
 782    }
 783
 784    return info.size;
 785}
 786
 787static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
 788{
 789    BDRVRBDState *s = bs->opaque;
 790    int r;
 791
 792    r = rbd_resize(s->image, offset);
 793    if (r < 0) {
 794        return r;
 795    }
 796
 797    return 0;
 798}
 799
 800static int qemu_rbd_snap_create(BlockDriverState *bs,
 801                                QEMUSnapshotInfo *sn_info)
 802{
 803    BDRVRBDState *s = bs->opaque;
 804    int r;
 805
 806    if (sn_info->name[0] == '\0') {
 807        return -EINVAL; /* we need a name for rbd snapshots */
 808    }
 809
 810    /*
 811     * rbd snapshots are using the name as the user controlled unique identifier
 812     * we can't use the rbd snapid for that purpose, as it can't be set
 813     */
 814    if (sn_info->id_str[0] != '\0' &&
 815        strcmp(sn_info->id_str, sn_info->name) != 0) {
 816        return -EINVAL;
 817    }
 818
 819    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
 820        return -ERANGE;
 821    }
 822
 823    r = rbd_snap_create(s->image, sn_info->name);
 824    if (r < 0) {
 825        error_report("failed to create snap: %s", strerror(-r));
 826        return r;
 827    }
 828
 829    return 0;
 830}
 831
 832static int qemu_rbd_snap_remove(BlockDriverState *bs,
 833                                const char *snapshot_name)
 834{
 835    BDRVRBDState *s = bs->opaque;
 836    int r;
 837
 838    r = rbd_snap_remove(s->image, snapshot_name);
 839    return r;
 840}
 841
 842static int qemu_rbd_snap_rollback(BlockDriverState *bs,
 843                                  const char *snapshot_name)
 844{
 845    BDRVRBDState *s = bs->opaque;
 846    int r;
 847
 848    r = rbd_snap_rollback(s->image, snapshot_name);
 849    return r;
 850}
 851
 852static int qemu_rbd_snap_list(BlockDriverState *bs,
 853                              QEMUSnapshotInfo **psn_tab)
 854{
 855    BDRVRBDState *s = bs->opaque;
 856    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
 857    int i, snap_count;
 858    rbd_snap_info_t *snaps;
 859    int max_snaps = RBD_MAX_SNAPS;
 860
 861    do {
 862        snaps = g_malloc(sizeof(*snaps) * max_snaps);
 863        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
 864        if (snap_count < 0) {
 865            g_free(snaps);
 866        }
 867    } while (snap_count == -ERANGE);
 868
 869    if (snap_count <= 0) {
 870        goto done;
 871    }
 872
 873    sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
 874
 875    for (i = 0; i < snap_count; i++) {
 876        const char *snap_name = snaps[i].name;
 877
 878        sn_info = sn_tab + i;
 879        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
 880        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
 881
 882        sn_info->vm_state_size = snaps[i].size;
 883        sn_info->date_sec = 0;
 884        sn_info->date_nsec = 0;
 885        sn_info->vm_clock_nsec = 0;
 886    }
 887    rbd_snap_list_end(snaps);
 888
 889 done:
 890    *psn_tab = sn_tab;
 891    return snap_count;
 892}
 893
 894#ifdef LIBRBD_SUPPORTS_DISCARD
 895static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
 896                                              int64_t sector_num,
 897                                              int nb_sectors,
 898                                              BlockDriverCompletionFunc *cb,
 899                                              void *opaque)
 900{
 901    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
 902                         RBD_AIO_DISCARD);
 903}
 904#endif
 905
/*
 * Options accepted by qemu_rbd_create(); the list is terminated by an
 * entry whose name is NULL.
 */
static QEMUOptionParameter qemu_rbd_create_options[] = {
    {
     .name = BLOCK_OPT_SIZE,
     .type = OPT_SIZE,
     .help = "Virtual disk size"
    },
    {
     /* must be a power of two and at least 4096, see qemu_rbd_create() */
     .name = BLOCK_OPT_CLUSTER_SIZE,
     .type = OPT_SIZE,
     .help = "RBD object size"
    },
    {NULL}
};
 919
/* Driver description registered with the block layer by bdrv_rbd_init(). */
static BlockDriver bdrv_rbd = {
    .format_name        = "rbd",
    .instance_size      = sizeof(BDRVRBDState),
    .bdrv_file_open     = qemu_rbd_open,
    .bdrv_close         = qemu_rbd_close,
    .bdrv_create        = qemu_rbd_create,
    .bdrv_get_info      = qemu_rbd_getinfo,
    .create_options     = qemu_rbd_create_options,
    .bdrv_getlength     = qemu_rbd_getlength,
    .bdrv_truncate      = qemu_rbd_truncate,
    .protocol_name      = "rbd",

    .bdrv_aio_readv         = qemu_rbd_aio_readv,
    .bdrv_aio_writev        = qemu_rbd_aio_writev,
    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,

    /* only available when librbd provides rbd_aio_discard() */
#ifdef LIBRBD_SUPPORTS_DISCARD
    .bdrv_aio_discard       = qemu_rbd_aio_discard,
#endif

    .bdrv_snapshot_create   = qemu_rbd_snap_create,
    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
    .bdrv_snapshot_list     = qemu_rbd_snap_list,
    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
};
 945
/* Register the rbd driver with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() into the block-driver initialisation list. */
block_init(bdrv_rbd_init);
 952