qemu/block/sheepdog.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qapi/error.h"
  17#include "qapi/qapi-visit-sockets.h"
  18#include "qapi/qapi-visit-block-core.h"
  19#include "qapi/qmp/qdict.h"
  20#include "qapi/qobject-input-visitor.h"
  21#include "qapi/qobject-output-visitor.h"
  22#include "qemu/uri.h"
  23#include "qemu/error-report.h"
  24#include "qemu/option.h"
  25#include "qemu/sockets.h"
  26#include "block/block_int.h"
  27#include "sysemu/block-backend.h"
  28#include "qemu/bitops.h"
  29#include "qemu/cutils.h"
  30
     /* Sheepdog protocol version (the request/response headers carry a proto_ver field). */
   31#define SD_PROTO_VER 0x01
   32
     /* Default server host name and port. */
   33#define SD_DEFAULT_ADDR "localhost"
   34#define SD_DEFAULT_PORT 7000
   35
     /* Opcodes that operate on objects. */
   36#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
   37#define SD_OP_READ_OBJ       0x02
   38#define SD_OP_WRITE_OBJ      0x03
   39/* 0x04 is used internally by Sheepdog */
   40
     /* Opcodes that operate on VDIs or on the cluster. */
   41#define SD_OP_NEW_VDI        0x11
   42#define SD_OP_LOCK_VDI       0x12
   43#define SD_OP_RELEASE_VDI    0x13
   44#define SD_OP_GET_VDI_INFO   0x14
   45#define SD_OP_READ_VDIS      0x15
   46#define SD_OP_FLUSH_VDI      0x16
   47#define SD_OP_DEL_VDI        0x17
   48#define SD_OP_GET_CLUSTER_DEFAULT   0x18
   49
     /* Request flag bits (SD_FLAG_CMD_*); each value is a distinct bit. */
   50#define SD_FLAG_CMD_WRITE    0x01
   51#define SD_FLAG_CMD_COW      0x02
   52#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
   53#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  54
     /*
      * Result codes found in the "result" field of a response header.
      * sd_strerror() maps each of them to a human-readable string.
      */
   55#define SD_RES_SUCCESS       0x00 /* Success */
   56#define SD_RES_UNKNOWN       0x01 /* Unknown error */
   57#define SD_RES_NO_OBJ        0x02 /* No object found */
   58#define SD_RES_EIO           0x03 /* I/O error */
   59#define SD_RES_VDI_EXIST     0x04 /* VDI already exists */
   60#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
   61#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
   62#define SD_RES_VDI_LOCKED    0x07 /* VDI is locked */
   63#define SD_RES_NO_VDI        0x08 /* No VDI found */
   64#define SD_RES_NO_BASE_VDI   0x09 /* No base VDI found */
   65#define SD_RES_VDI_READ      0x0A /* Cannot read the requested VDI */
   66#define SD_RES_VDI_WRITE     0x0B /* Cannot write the requested VDI */
   67#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read the base VDI */
   68#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write the base VDI */
   69#define SD_RES_NO_TAG        0x0E /* Requested tag was not found */
   70#define SD_RES_STARTUP       0x0F /* Sheepdog is starting up */
   71#define SD_RES_VDI_NOT_LOCKED   0x10 /* VDI is not locked */
   72#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
   73#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
   74#define SD_RES_FULL_VDI      0x13 /* The maximum number of VDIs already exists */
   75#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
   76#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
   77#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
   78#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes to join */
   79#define SD_RES_JOIN_FAILED   0x18 /* Target node failed to join sheepdog */
   80#define SD_RES_HALT          0x19 /* Sheepdog has stopped serving I/O requests */
   81#define SD_RES_READONLY      0x1A /* Object is read-only */
  82
   83/*
   84 * Object ID rules
   85 *
   86 *  0 - 19 (20 bits): data object space
   87 * 20 - 31 (12 bits): reserved data object space
   88 * 32 - 55 (24 bits): vdi object space
   89 * 56 - 59 ( 4 bits): reserved vdi object space
   90 * 60 - 63 ( 4 bits): object type identifier space
      *
      * VDI_BIT (bit 63) and VMSTATE_BIT (bit 62) below live in the object
      * type identifier space; VDI_SPACE_SHIFT is the first bit of the vdi
      * object space.
   91 */
   92
   93#define VDI_SPACE_SHIFT   32
   94#define VDI_BIT (UINT64_C(1) << 63)
   95#define VMSTATE_BIT (UINT64_C(1) << 62)
     /* 2^20 data objects per VDI, matching the 20-bit data object space. */
   96#define MAX_DATA_OBJS (UINT64_C(1) << 20)
   97#define MAX_CHILDREN 1024
   98#define SD_MAX_VDI_LEN 256
   99#define SD_MAX_VDI_TAG_LEN 256
     /* 2^24 VDI ids, matching the 24-bit vdi object space. */
  100#define SD_NR_VDIS   (1U << 24)
     /* 2^22 bytes (4 MiB) per data object; 2^42 bytes (4 TiB) per VDI. */
  101#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
  102#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
  103#define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
  104/*
  105 * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
  106 * (SD_EC_MAX_STRIP - 1) for parity strips
  107 *
  108 * SD_MAX_COPIES is sum of number of data strips and parity strips.
  109 */
  110#define SD_EC_MAX_STRIP 16
  111#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
  112
     /* Size in bytes of the complete in-memory inode, data_vdi_id[] included. */
  113#define SD_INODE_SIZE (sizeof(SheepdogInode))
  114#define CURRENT_VDI_ID 0
  115
  116#define LOCK_TYPE_NORMAL 0
  117#define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
 118
     /*
      * Generic request header.  The first six fields (proto_ver .. data_length)
      * are repeated, in the same order, at the start of every request and
      * response structure declared in this file; the rest is opcode specific.
      */
  119typedef struct SheepdogReq {
  120    uint8_t proto_ver;
  121    uint8_t opcode;         /* one of SD_OP_* */
  122    uint16_t flags;         /* SD_FLAG_CMD_* bits */
  123    uint32_t epoch;
  124    uint32_t id;            /* echoed in the response; used to match replies */
  125    uint32_t data_length;   /* length of the payload following the header */
  126    uint32_t opcode_specific[8];
  127} SheepdogReq;
 128
     /*
      * Generic response header: the common six fields, a result code and
      * opcode-specific words.
      */
  129typedef struct SheepdogRsp {
  130    uint8_t proto_ver;
  131    uint8_t opcode;
  132    uint16_t flags;
  133    uint32_t epoch;
  134    uint32_t id;
  135    uint32_t data_length;
  136    uint32_t result;        /* one of SD_RES_* */
  137    uint32_t opcode_specific[7];
  138} SheepdogRsp;
 139
     /* Request header variant for object operations (read/write/create). */
  140typedef struct SheepdogObjReq {
  141    uint8_t proto_ver;
  142    uint8_t opcode;
  143    uint16_t flags;
  144    uint32_t epoch;
  145    uint32_t id;
  146    uint32_t data_length;
  147    uint64_t oid;           /* object the request targets */
  148    uint64_t cow_oid;       /* presumably the copy-on-write source object -- TODO confirm */
  149    uint8_t copies;
  150    uint8_t copy_policy;
  151    uint8_t reserved[6];
  152    uint64_t offset;        /* byte offset inside the object */
  153} SheepdogObjReq;
 154
     /*
      * Response header variant for object operations; this is the type
      * aio_read_response() reads from the socket.
      */
  155typedef struct SheepdogObjRsp {
  156    uint8_t proto_ver;
  157    uint8_t opcode;
  158    uint16_t flags;
  159    uint32_t epoch;
  160    uint32_t id;
  161    uint32_t data_length;
  162    uint32_t result;        /* one of SD_RES_* */
  163    uint8_t copies;
  164    uint8_t copy_policy;
  165    uint8_t reserved[2];
  166    uint32_t pad[6];
  167} SheepdogObjRsp;
 168
     /* Request header variant for VDI operations. */
  169typedef struct SheepdogVdiReq {
  170    uint8_t proto_ver;
  171    uint8_t opcode;
  172    uint16_t flags;
  173    uint32_t epoch;
  174    uint32_t id;
  175    uint32_t data_length;
  176    uint64_t vdi_size;
  177    uint32_t base_vdi_id;
  178    uint8_t copies;
  179    uint8_t copy_policy;
  180    uint8_t store_policy;
  181    uint8_t block_size_shift;   /* log2 of the object size, cf. SD_DEFAULT_BLOCK_SIZE_SHIFT */
  182    uint32_t snapid;
  183    uint32_t type;              /* presumably LOCK_TYPE_* for lock requests -- TODO confirm */
  184    uint32_t pad[2];
  185} SheepdogVdiReq;
 186
     /* Response header variant for VDI operations; carries the VDI id. */
  187typedef struct SheepdogVdiRsp {
  188    uint8_t proto_ver;
  189    uint8_t opcode;
  190    uint16_t flags;
  191    uint32_t epoch;
  192    uint32_t id;
  193    uint32_t data_length;
  194    uint32_t result;        /* one of SD_RES_* */
  195    uint32_t rsvd;
  196    uint32_t vdi_id;
  197    uint32_t pad[5];
  198} SheepdogVdiRsp;
 199
     /*
      * Response header variant carrying cluster-wide values (copies, copy
      * policy, block size shift); presumably the reply to
      * SD_OP_GET_CLUSTER_DEFAULT -- TODO confirm at the caller.
      */
  200typedef struct SheepdogClusterRsp {
  201    uint8_t proto_ver;
  202    uint8_t opcode;
  203    uint16_t flags;
  204    uint32_t epoch;
  205    uint32_t id;
  206    uint32_t data_length;
  207    uint32_t result;        /* one of SD_RES_* */
  208    uint8_t nr_copies;
  209    uint8_t copy_policy;
  210    uint8_t block_size_shift;
  211    uint8_t __pad1;
  212    uint32_t __pad2[6];
  213} SheepdogClusterRsp;
 214
     /*
      * VDI metadata ("inode") object.  The fixed-size header is followed by
      * data_vdi_id[], a table with one entry per data object index.
      */
  215typedef struct SheepdogInode {
  216    char name[SD_MAX_VDI_LEN];
  217    char tag[SD_MAX_VDI_TAG_LEN];
  218    uint64_t ctime;
  219    uint64_t snap_ctime;        /* non-zero means snapshot, see is_snapshot() */
  220    uint64_t vm_clock_nsec;
  221    uint64_t vdi_size;          /* size in bytes, see count_data_objs() */
  222    uint64_t vm_state_size;
  223    uint16_t copy_policy;
  224    uint8_t nr_copies;
  225    uint8_t block_size_shift;   /* log2 of the data object size */
  226    uint32_t snap_id;
  227    uint32_t vdi_id;
  228    uint32_t parent_vdi_id;
  229    uint32_t child_vdi_id[MAX_CHILDREN];
         /*
          * VDI id owning each data object; an entry equal to vdi_id means
          * the object is writable (see is_data_obj_writable()).
          */
  230    uint32_t data_vdi_id[MAX_DATA_OBJS];
  231} SheepdogInode;
  232
     /* Size of the inode up to, but not including, the data_vdi_id[] table. */
  233#define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
 234
 235/*
 236 * 64 bit FNV-1a non-zero initial basis
 237 */
 238#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 239
 240/*
 241 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 242 */
 243static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 244{
 245    unsigned char *bp = buf;
 246    unsigned char *be = bp + len;
 247    while (bp < be) {
 248        hval ^= (uint64_t) *bp++;
 249        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 250            (hval << 7) + (hval << 8) + (hval << 40);
 251    }
 252    return hval;
 253}
 254
 255static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 256{
 257    return inode->vdi_id == inode->data_vdi_id[idx];
 258}
 259
 260static inline bool is_data_obj(uint64_t oid)
 261{
 262    return !(VDI_BIT & oid);
 263}
 264
 265static inline uint64_t data_oid_to_idx(uint64_t oid)
 266{
 267    return oid & (MAX_DATA_OBJS - 1);
 268}
 269
 270static inline uint32_t oid_to_vid(uint64_t oid)
 271{
 272    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 273}
 274
 275static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 276{
 277    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 278}
 279
 280static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 281{
 282    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 283}
 284
 285static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 286{
 287    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 288}
 289
 290static inline bool is_snapshot(struct SheepdogInode *inode)
 291{
 292    return !!inode->snap_ctime;
 293}
 294
 295static inline size_t count_data_objs(const struct SheepdogInode *inode)
 296{
 297    return DIV_ROUND_UP(inode->vdi_size,
 298                        (1UL << inode->block_size_shift));
 299}
 300
 301#undef DPRINTF
 302#ifdef DEBUG_SDOG
 303#define DEBUG_SDOG_PRINT 1
 304#else
 305#define DEBUG_SDOG_PRINT 0
 306#endif
 307#define DPRINTF(fmt, args...)                                           \
 308    do {                                                                \
 309        if (DEBUG_SDOG_PRINT) {                                         \
 310            fprintf(stderr, "%s %d: " fmt, __func__, __LINE__, ##args); \
 311        }                                                               \
 312    } while (0)
 313
  314typedef struct SheepdogAIOCB SheepdogAIOCB;
  315typedef struct BDRVSheepdogState BDRVSheepdogState;
  316
     /*
      * One request sent to the server on behalf of a SheepdogAIOCB.
      * Allocated and filled in by alloc_aio_req(); freed in
      * aio_read_response() once its reply has been handled.
      */
  317typedef struct AIOReq {
  318    SheepdogAIOCB *aiocb;       /* owning I/O control block */
  319    unsigned int iov_offset;    /* offset into aiocb->qiov for this request's data */
  320
  321    uint64_t oid;
  322    uint64_t base_oid;
  323    uint64_t offset;
  324    unsigned int data_len;
  325    uint8_t flags;
  326    uint32_t id;                /* taken from s->aioreq_seq_num; matched against rsp.id */
  327    bool create;                /* request creates the object (see aio_read_response) */
  328
  329    QLIST_ENTRY(AIOReq) aio_siblings;   /* link in inflight_aio_head or failed_aio_head */
  330} AIOReq;
 331
     /*
      * Kind of operation a SheepdogAIOCB performs; aio_read_response()
      * switches on it to decide how a reply is processed.
      */
  332enum AIOCBState {
  333    AIOCB_WRITE_UDATA,
  334    AIOCB_READ_UDATA,
  335    AIOCB_FLUSH_CACHE,  /* skips the overlap queue in sd_aio_setup() */
  336    AIOCB_DISCARD_OBJ,
  337};
 338
 339#define AIOCBOverlapping(x, y)                                 \
 340    (!(x->max_affect_data_idx < y->min_affect_data_idx          \
 341       || y->max_affect_data_idx < x->min_affect_data_idx))
 342
     /*
      * Per-operation control block, initialised by sd_aio_setup().  One
      * AIOCB may fan out into several AIOReq; nr_pending counts them.
      */
  343struct SheepdogAIOCB {
  344    BDRVSheepdogState *s;
  345
  346    QEMUIOVector *qiov;
  347
  348    int64_t sector_num;
  349    int nb_sectors;
  350
  351    int ret;                    /* 0, or -EIO once any request failed */
  352    enum AIOCBState aiocb_type;
  353
  354    Coroutine *coroutine;       /* woken when nr_pending drops to zero */
  355    int nr_pending;             /* incremented per alloc_aio_req() */
  356
         /* Data object index range touched by this operation. */
  357    uint32_t min_affect_data_idx;
  358    uint32_t max_affect_data_idx;
  359
  360    /*
  361     * The difference between affect_data_idx and dirty_data_idx:
  362     * affect_data_idx represents range of index of all request types.
  363     * dirty_data_idx represents range of index updated by COW requests.
  364     * dirty_data_idx is used for updating an inode object.
  365     */
  366    uint32_t min_dirty_data_idx;
  367    uint32_t max_dirty_data_idx;
  368
  369    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;  /* link in inflight_aiocb_head */
  370};
 371
     /* Per-device driver state. */
  372struct BDRVSheepdogState {
  373    BlockDriverState *bs;
  374    AioContext *aio_context;    /* context the fd handlers are registered in */
  375
  376    SheepdogInode inode;        /* in-memory copy of the VDI inode */
  377
  378    char name[SD_MAX_VDI_LEN];
  379    bool is_snapshot;
  380    uint32_t cache_flags;       /* set to SD_FLAG_CMD_DIRECT if the server rejects a flush */
  381    bool discard_supported;     /* cleared if the server rejects a discard */
  382
  383    SocketAddress *addr;        /* server address used by connect_to_sdog() */
  384    int fd;                     /* I/O socket; -1 while reconnecting */
  385
  386    CoMutex lock;
  387    Coroutine *co_send;         /* coroutine woken by co_write_request() */
  388    Coroutine *co_recv;         /* coroutine running aio_read_response(), or NULL */
  389
  390    uint32_t aioreq_seq_num;    /* source of AIOReq.id values */
  391
  392    /* Every aio request must be linked to either of these queues. */
  393    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
  394    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
  395
  396    CoMutex queue_lock;         /* taken around updates of the queues in this file */
  397    CoQueue overlapping_queue;  /* waiters blocked in wait_for_overlapping_aiocb() */
  398    QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
  399};
 400
     /*
      * Values held for a reopen: a socket descriptor and cache flags.
      * NOTE(review): presumably staged here until the reopen is committed
      * -- confirm against the reopen handlers (not visible in this part).
      */
  401typedef struct BDRVSheepdogReopenState {
  402    int fd;
  403    int cache_flags;
  404} BDRVSheepdogReopenState;
 405
 406static const char *sd_strerror(int err)
 407{
 408    int i;
 409
 410    static const struct {
 411        int err;
 412        const char *desc;
 413    } errors[] = {
 414        {SD_RES_SUCCESS, "Success"},
 415        {SD_RES_UNKNOWN, "Unknown error"},
 416        {SD_RES_NO_OBJ, "No object found"},
 417        {SD_RES_EIO, "I/O error"},
 418        {SD_RES_VDI_EXIST, "VDI exists already"},
 419        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 420        {SD_RES_SYSTEM_ERROR, "System error"},
 421        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 422        {SD_RES_NO_VDI, "No vdi found"},
 423        {SD_RES_NO_BASE_VDI, "No base VDI found"},
 424        {SD_RES_VDI_READ, "Failed read the requested VDI"},
 425        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 426        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 427        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 428        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 429        {SD_RES_STARTUP, "The system is still booting"},
 430        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 431        {SD_RES_SHUTDOWN, "The system is shutting down"},
 432        {SD_RES_NO_MEM, "Out of memory on the server"},
 433        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 434        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 435        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 436        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 437        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
 438        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
 439        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
 440        {SD_RES_READONLY, "Object is read-only"},
 441    };
 442
 443    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 444        if (errors[i].err == err) {
 445            return errors[i].desc;
 446        }
 447    }
 448
 449    return "Invalid error code";
 450}
 451
 452/*
 453 * Sheepdog I/O handling:
 454 *
 455 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 456 *    link the requests to the inflight_list in the
 457 *    BDRVSheepdogState.  The function yields while waiting for
 458 *    receiving the response.
 459 *
  460 * 2. We receive the response in aio_read_response, the fd handler for
  461 *    the sheepdog connection.  We switch back to sd_co_readv/sd_co_writev
  462 *    after all the requests belonging to the AIOCB are finished.  If
  463 *    needed, sd_co_writev will send further requests for the vdi object.
 464 */
 465
 466static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 467                                    uint64_t oid, unsigned int data_len,
 468                                    uint64_t offset, uint8_t flags, bool create,
 469                                    uint64_t base_oid, unsigned int iov_offset)
 470{
 471    AIOReq *aio_req;
 472
 473    aio_req = g_malloc(sizeof(*aio_req));
 474    aio_req->aiocb = acb;
 475    aio_req->iov_offset = iov_offset;
 476    aio_req->oid = oid;
 477    aio_req->base_oid = base_oid;
 478    aio_req->offset = offset;
 479    aio_req->data_len = data_len;
 480    aio_req->flags = flags;
 481    aio_req->id = s->aioreq_seq_num++;
 482    aio_req->create = create;
 483
 484    acb->nr_pending++;
 485    return aio_req;
 486}
 487
 488static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
 489{
 490    SheepdogAIOCB *cb;
 491
 492retry:
 493    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
 494        if (AIOCBOverlapping(acb, cb)) {
 495            qemu_co_queue_wait(&s->overlapping_queue, &s->queue_lock);
 496            goto retry;
 497        }
 498    }
 499}
 500
 501static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s,
 502                         QEMUIOVector *qiov, int64_t sector_num, int nb_sectors,
 503                         int type)
 504{
 505    uint32_t object_size;
 506
 507    object_size = (UINT32_C(1) << s->inode.block_size_shift);
 508
 509    acb->s = s;
 510
 511    acb->qiov = qiov;
 512
 513    acb->sector_num = sector_num;
 514    acb->nb_sectors = nb_sectors;
 515
 516    acb->coroutine = qemu_coroutine_self();
 517    acb->ret = 0;
 518    acb->nr_pending = 0;
 519
 520    acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
 521    acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
 522                              acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
 523
 524    acb->min_dirty_data_idx = UINT32_MAX;
 525    acb->max_dirty_data_idx = 0;
 526    acb->aiocb_type = type;
 527
 528    if (type == AIOCB_FLUSH_CACHE) {
 529        return;
 530    }
 531
 532    qemu_co_mutex_lock(&s->queue_lock);
 533    wait_for_overlapping_aiocb(s, acb);
 534    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings);
 535    qemu_co_mutex_unlock(&s->queue_lock);
 536}
 537
 538static SocketAddress *sd_server_config(QDict *options, Error **errp)
 539{
 540    QDict *server = NULL;
 541    QObject *crumpled_server = NULL;
 542    Visitor *iv = NULL;
 543    SocketAddress *saddr = NULL;
 544    Error *local_err = NULL;
 545
 546    qdict_extract_subqdict(options, &server, "server.");
 547
 548    crumpled_server = qdict_crumple(server, errp);
 549    if (!crumpled_server) {
 550        goto done;
 551    }
 552
 553    /*
 554     * FIXME .numeric, .to, .ipv4 or .ipv6 don't work with -drive
 555     * server.type=inet.  .to doesn't matter, it's ignored anyway.
 556     * That's because when @options come from -blockdev or
 557     * blockdev_add, members are typed according to the QAPI schema,
 558     * but when they come from -drive, they're all QString.  The
 559     * visitor expects the former.
 560     */
 561    iv = qobject_input_visitor_new(crumpled_server);
 562    visit_type_SocketAddress(iv, NULL, &saddr, &local_err);
 563    if (local_err) {
 564        error_propagate(errp, local_err);
 565        goto done;
 566    }
 567
 568done:
 569    visit_free(iv);
 570    qobject_decref(crumpled_server);
 571    QDECREF(server);
 572    return saddr;
 573}
 574
 575/* Return -EIO in case of error, file descriptor on success */
 576static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 577{
 578    int fd;
 579
 580    fd = socket_connect(s->addr, errp);
 581
 582    if (s->addr->type == SOCKET_ADDRESS_TYPE_INET && fd >= 0) {
 583        int ret = socket_set_nodelay(fd);
 584        if (ret < 0) {
 585            error_report("%s", strerror(errno));
 586        }
 587    }
 588
 589    if (fd >= 0) {
 590        qemu_set_nonblock(fd);
 591    } else {
 592        fd = -EIO;
 593    }
 594
 595    return fd;
 596}
 597
 598/* Return 0 on success and -errno in case of error */
 599static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 600                                    unsigned int *wlen)
 601{
 602    int ret;
 603
 604    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 605    if (ret != sizeof(*hdr)) {
 606        error_report("failed to send a req, %s", strerror(errno));
 607        return -errno;
 608    }
 609
 610    ret = qemu_co_send(sockfd, data, *wlen);
 611    if (ret != *wlen) {
 612        error_report("failed to send a req, %s", strerror(errno));
 613        return -errno;
 614    }
 615
 616    return ret;
 617}
 618
     /*
      * Argument/result block shared between do_req() and the do_co_req()
      * coroutine that performs one synchronous request.
      */
  619typedef struct SheepdogReqCo {
  620    int sockfd;
  621    BlockDriverState *bs;       /* may be NULL; woken via bdrv_wakeup() when set */
  622    AioContext *aio_context;    /* context the temporary fd handlers are set in */
  623    SheepdogReq *hdr;           /* request on entry, overwritten with the response */
  624    void *data;                 /* payload buffer, used for both sending and receiving */
  625    unsigned int *wlen;         /* number of payload bytes to send */
  626    unsigned int *rlen;         /* in: max bytes to receive; out: clamped to data_length */
  627    int ret;                    /* result of the request */
  628    bool finished;              /* set (with a barrier) once ret is valid */
  629    Coroutine *co;              /* coroutine to wake from restart_co_req() */
  630} SheepdogReqCo;
 631
 632static void restart_co_req(void *opaque)
 633{
 634    SheepdogReqCo *srco = opaque;
 635
 636    aio_co_wake(srco->co);
 637}
 638
 639static coroutine_fn void do_co_req(void *opaque)
 640{
 641    int ret;
 642    SheepdogReqCo *srco = opaque;
 643    int sockfd = srco->sockfd;
 644    SheepdogReq *hdr = srco->hdr;
 645    void *data = srco->data;
 646    unsigned int *wlen = srco->wlen;
 647    unsigned int *rlen = srco->rlen;
 648
 649    srco->co = qemu_coroutine_self();
 650    aio_set_fd_handler(srco->aio_context, sockfd, false,
 651                       NULL, restart_co_req, NULL, srco);
 652
 653    ret = send_co_req(sockfd, hdr, data, wlen);
 654    if (ret < 0) {
 655        goto out;
 656    }
 657
 658    aio_set_fd_handler(srco->aio_context, sockfd, false,
 659                       restart_co_req, NULL, NULL, srco);
 660
 661    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 662    if (ret != sizeof(*hdr)) {
 663        error_report("failed to get a rsp, %s", strerror(errno));
 664        ret = -errno;
 665        goto out;
 666    }
 667
 668    if (*rlen > hdr->data_length) {
 669        *rlen = hdr->data_length;
 670    }
 671
 672    if (*rlen) {
 673        ret = qemu_co_recv(sockfd, data, *rlen);
 674        if (ret != *rlen) {
 675            error_report("failed to get the data, %s", strerror(errno));
 676            ret = -errno;
 677            goto out;
 678        }
 679    }
 680    ret = 0;
 681out:
 682    /* there is at most one request for this sockfd, so it is safe to
 683     * set each handler to NULL. */
 684    aio_set_fd_handler(srco->aio_context, sockfd, false,
 685                       NULL, NULL, NULL, NULL);
 686
 687    srco->co = NULL;
 688    srco->ret = ret;
 689    /* Set srco->finished before reading bs->wakeup.  */
 690    atomic_mb_set(&srco->finished, true);
 691    if (srco->bs) {
 692        bdrv_wakeup(srco->bs);
 693    }
 694}
 695
 696/*
 697 * Send the request to the sheep in a synchronous manner.
 698 *
 699 * Return 0 on success, -errno in case of error.
 700 */
 701static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
 702                  void *data, unsigned int *wlen, unsigned int *rlen)
 703{
 704    Coroutine *co;
 705    SheepdogReqCo srco = {
 706        .sockfd = sockfd,
 707        .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
 708        .bs = bs,
 709        .hdr = hdr,
 710        .data = data,
 711        .wlen = wlen,
 712        .rlen = rlen,
 713        .ret = 0,
 714        .finished = false,
 715    };
 716
 717    if (qemu_in_coroutine()) {
 718        do_co_req(&srco);
 719    } else {
 720        co = qemu_coroutine_create(do_co_req, &srco);
 721        if (bs) {
 722            bdrv_coroutine_enter(bs, co);
 723            BDRV_POLL_WHILE(bs, !srco.finished);
 724        } else {
 725            qemu_coroutine_enter(co);
 726            while (!srco.finished) {
 727                aio_poll(qemu_get_aio_context(), true);
 728            }
 729        }
 730    }
 731
 732    return srco.ret;
 733}
 734
     /*
      * Forward declarations for functions that are used by the reconnect and
      * response-handling code below before their definitions appear.
      */
  735static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
  736                                         struct iovec *iov, int niov,
  737                                         enum AIOCBState aiocb_type);
  738static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
  739static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
  740static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
  741static void co_write_request(void *opaque);
 742
     /*
      * Drop the current connection, establish a new one and resend every
      * request that was still in flight.
      *
      * @opaque is the BDRVSheepdogState.  The function does not return until
      * a connection has been established; it retries once per second and
      * reports each failed attempt.
      */
  743static coroutine_fn void reconnect_to_sdog(void *opaque)
  744{
  745    BDRVSheepdogState *s = opaque;
  746    AIOReq *aio_req, *next;
  747
         /* Unregister the handlers before the descriptor is closed. */
  748    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
  749                       NULL, NULL, NULL);
  750    close(s->fd);
  751    s->fd = -1;
  752
  753    /* Wait for outstanding write requests to be completed. */
  754    while (s->co_send != NULL) {
  755        co_write_request(opaque);
  756    }
  757
  758    /* Try to reconnect the sheepdog server every one second. */
  759    while (s->fd < 0) {
  760        Error *local_err = NULL;
  761        s->fd = get_sheep_fd(s, &local_err);
  762        if (s->fd < 0) {
  763            DPRINTF("Wait for connection to be established\n");
  764            error_report_err(local_err);
  765            qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000000ULL);
  766        }
  767    };
  768
  769    /*
  770     * Now we have to resend all the request in the inflight queue.  However,
  771     * resend_aioreq() can yield and newly created requests can be added to the
  772     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
  773     * have to move all the inflight requests to the failed queue before
  774     * resend_aioreq() is called.
  775     */
  776    qemu_co_mutex_lock(&s->queue_lock);
  777    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
  778        QLIST_REMOVE(aio_req, aio_siblings);
  779        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
  780    }
  781
  782    /* Resend all the failed aio requests. */
  783    while (!QLIST_EMPTY(&s->failed_aio_head)) {
  784        aio_req = QLIST_FIRST(&s->failed_aio_head);
  785        QLIST_REMOVE(aio_req, aio_siblings);
             /* queue_lock is dropped around resend_aioreq(), which can yield. */
  786        qemu_co_mutex_unlock(&s->queue_lock);
  787        resend_aioreq(s, aio_req);
  788        qemu_co_mutex_lock(&s->queue_lock);
  789    }
  790    qemu_co_mutex_unlock(&s->queue_lock);
  791}
 792
  793/*
  794 * Receive responses of the I/O requests.
  795 *
  796 * This function runs as the s->co_recv coroutine, which is created and
  797 * entered by co_read_response(), the read handler registered for s->fd.
      *
      * It reads one response header, looks up the matching AIOReq by id in
      * the inflight list, performs the per-type processing (for reads this
      * includes receiving the payload into the AIOCB's qiov), unlinks the
      * request and then acts on the result code:
      *   - SD_RES_SUCCESS:  the request is complete;
      *   - SD_RES_READONLY: the request is retargeted at the current
      *                      s->inode.vdi_id (after reload_inode() when the
      *                      object belongs to the current VDI) and resent;
      *   - anything else:   the AIOCB result becomes -EIO.
      * When the last pending request of an AIOCB completes, its coroutine is
      * woken.  Any receive or lookup failure leads to reconnect_to_sdog().
  798 */
  799static void coroutine_fn aio_read_response(void *opaque)
  800{
  801    SheepdogObjRsp rsp;
  802    BDRVSheepdogState *s = opaque;
  803    int fd = s->fd;
  804    int ret;
  805    AIOReq *aio_req = NULL;
  806    SheepdogAIOCB *acb;
  807    uint64_t idx;
  808
  809    /* read a header */
  810    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
  811    if (ret != sizeof(rsp)) {
  812        error_report("failed to get the header, %s", strerror(errno));
  813        goto err;
  814    }
  815
  816    /* find the right aio_req from the inflight aio list */
  817    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
  818        if (aio_req->id == rsp.id) {
  819            break;
  820        }
  821    }
         /* aio_req is NULL here when the loop ran off the end of the list. */
  822    if (!aio_req) {
  823        error_report("cannot find aio_req %x", rsp.id);
  824        goto err;
  825    }
  826
  827    acb = aio_req->aiocb;
  828
  829    switch (acb->aiocb_type) {
  830    case AIOCB_WRITE_UDATA:
             /* Nothing to track for writes to the vdi (inode) object itself. */
  831        if (!is_data_obj(aio_req->oid)) {
  832            break;
  833        }
  834        idx = data_oid_to_idx(aio_req->oid);
  835
  836        if (aio_req->create) {
  837            /*
  838             * If the object is newly created one, we need to update
  839             * the vdi object (metadata object).  min_dirty_data_idx
  840             * and max_dirty_data_idx are changed to include updated
  841             * index between them.
  842             */
  843            if (rsp.result == SD_RES_SUCCESS) {
  844                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
  845                acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
  846                acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
  847            }
  848        }
  849        break;
  850    case AIOCB_READ_UDATA:
             /* Payload goes straight into the caller's qiov at this request's offset. */
  851        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
  852                            aio_req->iov_offset, rsp.data_length);
  853        if (ret != rsp.data_length) {
  854            error_report("failed to get the data, %s", strerror(errno));
  855            goto err;
  856        }
  857        break;
  858    case AIOCB_FLUSH_CACHE:
             /* A server rejecting the flush is not an error: switch to direct mode. */
  859        if (rsp.result == SD_RES_INVALID_PARMS) {
  860            DPRINTF("disable cache since the server doesn't support it\n");
  861            s->cache_flags = SD_FLAG_CMD_DIRECT;
  862            rsp.result = SD_RES_SUCCESS;
  863        }
  864        break;
  865    case AIOCB_DISCARD_OBJ:
  866        switch (rsp.result) {
  867        case SD_RES_INVALID_PARMS:
                 /* Likewise for discard: remember it is unsupported and succeed. */
  868            error_report("server doesn't support discard command");
  869            rsp.result = SD_RES_SUCCESS;
  870            s->discard_supported = false;
  871            break;
  872        default:
  873            break;
  874        }
  875    }
  876
  877    /* No more data for this aio_req (reload_inode below uses its own file
  878     * descriptor handler which doesn't use co_recv).
          *
          * NOTE(review): the "goto err" paths above reach reconnect_to_sdog()
          * without passing through this reset of s->co_recv -- confirm that
          * this is intended.
  879    */
  880    s->co_recv = NULL;
  881
  882    qemu_co_mutex_lock(&s->queue_lock);
  883    QLIST_REMOVE(aio_req, aio_siblings);
  884    qemu_co_mutex_unlock(&s->queue_lock);
  885
  886    switch (rsp.result) {
  887    case SD_RES_SUCCESS:
  888        break;
  889    case SD_RES_READONLY:
  890        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
  891            ret = reload_inode(s, 0, "");
  892            if (ret < 0) {
                     /*
                      * NOTE(review): aio_req was already unlinked above, so it
                      * looks like it is neither resent by reconnect_to_sdog()
                      * nor freed on this path -- confirm.
                      */
  893                goto err;
  894            }
  895        }
             /* Retarget the request at the (possibly new) current vdi id. */
  896        if (is_data_obj(aio_req->oid)) {
  897            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
  898                                           data_oid_to_idx(aio_req->oid));
  899        } else {
  900            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
  901        }
  902        resend_aioreq(s, aio_req);
  903        return;
  904    default:
  905        acb->ret = -EIO;
  906        error_report("%s", sd_strerror(rsp.result));
  907        break;
  908    }
  909
  910    g_free(aio_req);
  911
  912    if (!--acb->nr_pending) {
  913        /*
  914         * We've finished all requests which belong to the AIOCB, so
  915         * we can switch back to sd_co_readv/writev now.
  916         */
  917        aio_co_wake(acb->coroutine);
  918    }
  919
  920    return;
  921
  922err:
  923    reconnect_to_sdog(opaque);
  924}
 925
 926static void co_read_response(void *opaque)
 927{
 928    BDRVSheepdogState *s = opaque;
 929
 930    if (!s->co_recv) {
 931        s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
 932    }
 933
 934    aio_co_enter(s->aio_context, s->co_recv);
 935}
 936
 937static void co_write_request(void *opaque)
 938{
 939    BDRVSheepdogState *s = opaque;
 940
 941    aio_co_wake(s->co_send);
 942}
 943
 944/*
 945 * Return a socket descriptor to read/write objects.
 946 *
 947 * We cannot use this descriptor for other operations because
 948 * the block driver may be on waiting response from the server.
 949 */
 950static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
 951{
 952    int fd;
 953
 954    fd = connect_to_sdog(s, errp);
 955    if (fd < 0) {
 956        return fd;
 957    }
 958
 959    aio_set_fd_handler(s->aio_context, fd, false,
 960                       co_read_response, NULL, NULL, s);
 961    return fd;
 962}
 963
 964/*
 965 * Parse numeric snapshot ID in @str
 966 * If @str can't be parsed as number, return false.
 967 * Else, if the number is zero or too large, set *@snapid to zero and
 968 * return true.
 969 * Else, set *@snapid to the number and return true.
 970 */
 971static bool sd_parse_snapid(const char *str, uint32_t *snapid)
 972{
 973    unsigned long ul;
 974    int ret;
 975
 976    ret = qemu_strtoul(str, NULL, 10, &ul);
 977    if (ret == -ERANGE) {
 978        ul = ret = 0;
 979    }
 980    if (ret) {
 981        return false;
 982    }
 983    if (ul > UINT32_MAX) {
 984        ul = 0;
 985    }
 986
 987    *snapid = ul;
 988    return true;
 989}
 990
 991static bool sd_parse_snapid_or_tag(const char *str,
 992                                   uint32_t *snapid, char tag[])
 993{
 994    if (!sd_parse_snapid(str, snapid)) {
 995        *snapid = 0;
 996        if (g_strlcpy(tag, str, SD_MAX_VDI_TAG_LEN) >= SD_MAX_VDI_TAG_LEN) {
 997            return false;
 998        }
 999    } else if (!*snapid) {
1000        return false;
1001    } else {
1002        tag[0] = 0;
1003    }
1004    return true;
1005}
1006
/*
 * Result of parsing a Sheepdog filename/URI (filled in by sd_parse_uri()).
 * The string pointers reference memory owned by @uri / @qp, so they are
 * only valid until sd_config_done() is called.
 */
typedef struct {
    const char *path;           /* non-null iff transport is unix */
    const char *host;           /* valid when transport is tcp */
    int port;                   /* valid when transport is tcp */
    char vdi[SD_MAX_VDI_LEN];
    char tag[SD_MAX_VDI_TAG_LEN];
    uint32_t snap_id;
    /* Remainder is only for sd_config_done() */
    URI *uri;
    QueryParams *qp;
} SheepdogConfig;
1018
1019static void sd_config_done(SheepdogConfig *cfg)
1020{
1021    if (cfg->qp) {
1022        query_params_free(cfg->qp);
1023    }
1024    uri_free(cfg->uri);
1025}
1026
1027static void sd_parse_uri(SheepdogConfig *cfg, const char *filename,
1028                         Error **errp)
1029{
1030    Error *err = NULL;
1031    QueryParams *qp = NULL;
1032    bool is_unix;
1033    URI *uri;
1034
1035    memset(cfg, 0, sizeof(*cfg));
1036
1037    cfg->uri = uri = uri_parse(filename);
1038    if (!uri) {
1039        error_setg(&err, "invalid URI '%s'", filename);
1040        goto out;
1041    }
1042
1043    /* transport */
1044    if (!g_strcmp0(uri->scheme, "sheepdog")) {
1045        is_unix = false;
1046    } else if (!g_strcmp0(uri->scheme, "sheepdog+tcp")) {
1047        is_unix = false;
1048    } else if (!g_strcmp0(uri->scheme, "sheepdog+unix")) {
1049        is_unix = true;
1050    } else {
1051        error_setg(&err, "URI scheme must be 'sheepdog', 'sheepdog+tcp',"
1052                   " or 'sheepdog+unix'");
1053        goto out;
1054    }
1055
1056    if (uri->path == NULL || !strcmp(uri->path, "/")) {
1057        error_setg(&err, "missing file path in URI");
1058        goto out;
1059    }
1060    if (g_strlcpy(cfg->vdi, uri->path + 1, SD_MAX_VDI_LEN)
1061        >= SD_MAX_VDI_LEN) {
1062        error_setg(&err, "VDI name is too long");
1063        goto out;
1064    }
1065
1066    cfg->qp = qp = query_params_parse(uri->query);
1067
1068    if (is_unix) {
1069        /* sheepdog+unix:///vdiname?socket=path */
1070        if (uri->server || uri->port) {
1071            error_setg(&err, "URI scheme %s doesn't accept a server address",
1072                       uri->scheme);
1073            goto out;
1074        }
1075        if (!qp->n) {
1076            error_setg(&err,
1077                       "URI scheme %s requires query parameter 'socket'",
1078                       uri->scheme);
1079            goto out;
1080        }
1081        if (qp->n != 1 || strcmp(qp->p[0].name, "socket")) {
1082            error_setg(&err, "unexpected query parameters");
1083            goto out;
1084        }
1085        cfg->path = qp->p[0].value;
1086    } else {
1087        /* sheepdog[+tcp]://[host:port]/vdiname */
1088        if (qp->n) {
1089            error_setg(&err, "unexpected query parameters");
1090            goto out;
1091        }
1092        cfg->host = uri->server;
1093        cfg->port = uri->port;
1094    }
1095
1096    /* snapshot tag */
1097    if (uri->fragment) {
1098        if (!sd_parse_snapid_or_tag(uri->fragment,
1099                                    &cfg->snap_id, cfg->tag)) {
1100            error_setg(&err, "'%s' is not a valid snapshot ID",
1101                       uri->fragment);
1102            goto out;
1103        }
1104    } else {
1105        cfg->snap_id = CURRENT_VDI_ID; /* search current vdi */
1106    }
1107
1108out:
1109    if (err) {
1110        error_propagate(errp, err);
1111        sd_config_done(cfg);
1112    }
1113}
1114
1115/*
1116 * Parse a filename (old syntax)
1117 *
1118 * filename must be one of the following formats:
1119 *   1. [vdiname]
1120 *   2. [vdiname]:[snapid]
1121 *   3. [vdiname]:[tag]
1122 *   4. [hostname]:[port]:[vdiname]
1123 *   5. [hostname]:[port]:[vdiname]:[snapid]
1124 *   6. [hostname]:[port]:[vdiname]:[tag]
1125 *
1126 * You can boot from the snapshot images by specifying `snapid` or
1127 * `tag'.
1128 *
1129 * You can run VMs outside the Sheepdog cluster by specifying
1130 * `hostname' and `port' (experimental).
1131 */
1132static void parse_vdiname(SheepdogConfig *cfg, const char *filename,
1133                          Error **errp)
1134{
1135    Error *err = NULL;
1136    char *p, *q, *uri;
1137    const char *host_spec, *vdi_spec;
1138    int nr_sep;
1139
1140    strstart(filename, "sheepdog:", &filename);
1141    p = q = g_strdup(filename);
1142
1143    /* count the number of separators */
1144    nr_sep = 0;
1145    while (*p) {
1146        if (*p == ':') {
1147            nr_sep++;
1148        }
1149        p++;
1150    }
1151    p = q;
1152
1153    /* use the first two tokens as host_spec. */
1154    if (nr_sep >= 2) {
1155        host_spec = p;
1156        p = strchr(p, ':');
1157        p++;
1158        p = strchr(p, ':');
1159        *p++ = '\0';
1160    } else {
1161        host_spec = "";
1162    }
1163
1164    vdi_spec = p;
1165
1166    p = strchr(vdi_spec, ':');
1167    if (p) {
1168        *p++ = '#';
1169    }
1170
1171    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1172
1173    /*
1174     * FIXME We to escape URI meta-characters, e.g. "x?y=z"
1175     * produces "sheepdog://x?y=z".  Because of that ...
1176     */
1177    sd_parse_uri(cfg, uri, &err);
1178    if (err) {
1179        /*
1180         * ... this can fail, but the error message is misleading.
1181         * Replace it by the traditional useless one until the
1182         * escaping is fixed.
1183         */
1184        error_free(err);
1185        error_setg(errp, "Can't parse filename");
1186    }
1187
1188    g_free(q);
1189    g_free(uri);
1190}
1191
1192static void sd_parse_filename(const char *filename, QDict *options,
1193                              Error **errp)
1194{
1195    Error *err = NULL;
1196    SheepdogConfig cfg;
1197    char buf[32];
1198
1199    if (strstr(filename, "://")) {
1200        sd_parse_uri(&cfg, filename, &err);
1201    } else {
1202        parse_vdiname(&cfg, filename, &err);
1203    }
1204    if (err) {
1205        error_propagate(errp, err);
1206        return;
1207    }
1208
1209    if (cfg.path) {
1210        qdict_set_default_str(options, "server.path", cfg.path);
1211        qdict_set_default_str(options, "server.type", "unix");
1212    } else {
1213        qdict_set_default_str(options, "server.type", "inet");
1214        qdict_set_default_str(options, "server.host",
1215                              cfg.host ?: SD_DEFAULT_ADDR);
1216        snprintf(buf, sizeof(buf), "%d", cfg.port ?: SD_DEFAULT_PORT);
1217        qdict_set_default_str(options, "server.port", buf);
1218    }
1219    qdict_set_default_str(options, "vdi", cfg.vdi);
1220    qdict_set_default_str(options, "tag", cfg.tag);
1221    if (cfg.snap_id) {
1222        snprintf(buf, sizeof(buf), "%d", cfg.snap_id);
1223        qdict_set_default_str(options, "snap-id", buf);
1224    }
1225
1226    sd_config_done(&cfg);
1227}
1228
1229static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1230                         uint32_t snapid, const char *tag, uint32_t *vid,
1231                         bool lock, Error **errp)
1232{
1233    int ret, fd;
1234    SheepdogVdiReq hdr;
1235    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1236    unsigned int wlen, rlen = 0;
1237    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1238
1239    fd = connect_to_sdog(s, errp);
1240    if (fd < 0) {
1241        return fd;
1242    }
1243
1244    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1245     * which is desirable since we'll soon be sending those bytes, and
1246     * don't want the send_req to read uninitialized data.
1247     */
1248    strncpy(buf, filename, SD_MAX_VDI_LEN);
1249    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1250
1251    memset(&hdr, 0, sizeof(hdr));
1252    if (lock) {
1253        hdr.opcode = SD_OP_LOCK_VDI;
1254        hdr.type = LOCK_TYPE_NORMAL;
1255    } else {
1256        hdr.opcode = SD_OP_GET_VDI_INFO;
1257    }
1258    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1259    hdr.proto_ver = SD_PROTO_VER;
1260    hdr.data_length = wlen;
1261    hdr.snapid = snapid;
1262    hdr.flags = SD_FLAG_CMD_WRITE;
1263
1264    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1265    if (ret) {
1266        error_setg_errno(errp, -ret, "cannot get vdi info");
1267        goto out;
1268    }
1269
1270    if (rsp->result != SD_RES_SUCCESS) {
1271        error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1272                   sd_strerror(rsp->result), filename, snapid, tag);
1273        if (rsp->result == SD_RES_NO_VDI) {
1274            ret = -ENOENT;
1275        } else if (rsp->result == SD_RES_VDI_LOCKED) {
1276            ret = -EBUSY;
1277        } else {
1278            ret = -EIO;
1279        }
1280        goto out;
1281    }
1282    *vid = rsp->vdi_id;
1283
1284    ret = 0;
1285out:
1286    closesocket(fd);
1287    return ret;
1288}
1289
/*
 * Queue @aio_req as in-flight and send it to the server over s->fd.
 *
 * @iov/@niov describe the payload buffer; for requests that carry data,
 * the bytes are taken from it starting at aio_req->iov_offset.
 * @aiocb_type selects the opcode and flags of the request header.
 *
 * Only the request is sent here; nothing is read back in this function.
 * Send failures are reported with error_report() but not returned to the
 * caller (the function returns void).
 */
static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                                         struct iovec *iov, int niov,
                                         enum AIOCBState aiocb_type)
{
    int nr_copies = s->inode.nr_copies;
    SheepdogObjReq hdr;
    unsigned int wlen = 0;
    int ret;
    uint64_t oid = aio_req->oid;
    unsigned int datalen = aio_req->data_len;
    uint64_t offset = aio_req->offset;
    uint8_t flags = aio_req->flags;
    uint64_t old_oid = aio_req->base_oid;
    bool create = aio_req->create;

    /* Track the request in the in-flight list before anything is sent */
    qemu_co_mutex_lock(&s->queue_lock);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
    qemu_co_mutex_unlock(&s->queue_lock);

    /* A zero copy count is only logged; the request is still sent */
    if (!nr_copies) {
        error_report("bug");
    }

    memset(&hdr, 0, sizeof(hdr));

    /* No default case: other values leave opcode and flags zeroed */
    switch (aiocb_type) {
    case AIOCB_FLUSH_CACHE:
        hdr.opcode = SD_OP_FLUSH_VDI;
        break;
    case AIOCB_READ_UDATA:
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.flags = flags;
        break;
    case AIOCB_WRITE_UDATA:
        if (create) {
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
        } else {
            hdr.opcode = SD_OP_WRITE_OBJ;
        }
        wlen = datalen;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        break;
    case AIOCB_DISCARD_OBJ:
        /*
         * A discard is sent as a write to the VDI (inode) object: the
         * data_vdi_id[] slot of the discarded object is cleared in the
         * in-memory inode, and the request is retargeted at that slot's
         * offset inside the inode object with a length of one uint32_t.
         */
        hdr.opcode = SD_OP_WRITE_OBJ;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
        offset = offsetof(SheepdogInode,
                          data_vdi_id[data_oid_to_idx(oid)]);
        oid = vid_to_vdi_oid(s->inode.vdi_id);
        wlen = datalen = sizeof(uint32_t);
        break;
    }

    if (s->cache_flags) {
        hdr.flags |= s->cache_flags;
    }

    hdr.oid = oid;
    hdr.cow_oid = old_oid;
    hdr.copies = s->inode.nr_copies;

    hdr.data_length = datalen;
    hdr.offset = offset;

    hdr.id = aio_req->id;

    /*
     * s->lock is held for the whole send.  While sending, co_write_request
     * is installed as an additional fd handler and s->co_send names this
     * coroutine, which is what co_write_request() wakes.
     */
    qemu_co_mutex_lock(&s->lock);
    s->co_send = qemu_coroutine_self();
    aio_set_fd_handler(s->aio_context, s->fd, false,
                       co_read_response, co_write_request, NULL, s);
    socket_set_cork(s->fd, 1);

    /* send a header */
    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
    if (ret != sizeof(hdr)) {
        error_report("failed to send a req, %s", strerror(errno));
        goto out;
    }

    /* send the payload, if this request type carries one */
    if (wlen) {
        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
        if (ret != wlen) {
            error_report("failed to send a data, %s", strerror(errno));
        }
    }
out:
    /* Uncork and go back to handling only co_read_response on the fd */
    socket_set_cork(s->fd, 0);
    aio_set_fd_handler(s->aio_context, s->fd, false,
                       co_read_response, NULL, NULL, s);
    s->co_send = NULL;
    qemu_co_mutex_unlock(&s->lock);
}
1382
1383static int read_write_object(int fd, BlockDriverState *bs, char *buf,
1384                             uint64_t oid, uint8_t copies,
1385                             unsigned int datalen, uint64_t offset,
1386                             bool write, bool create, uint32_t cache_flags)
1387{
1388    SheepdogObjReq hdr;
1389    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1390    unsigned int wlen, rlen;
1391    int ret;
1392
1393    memset(&hdr, 0, sizeof(hdr));
1394
1395    if (write) {
1396        wlen = datalen;
1397        rlen = 0;
1398        hdr.flags = SD_FLAG_CMD_WRITE;
1399        if (create) {
1400            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1401        } else {
1402            hdr.opcode = SD_OP_WRITE_OBJ;
1403        }
1404    } else {
1405        wlen = 0;
1406        rlen = datalen;
1407        hdr.opcode = SD_OP_READ_OBJ;
1408    }
1409
1410    hdr.flags |= cache_flags;
1411
1412    hdr.oid = oid;
1413    hdr.data_length = datalen;
1414    hdr.offset = offset;
1415    hdr.copies = copies;
1416
1417    ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1418    if (ret) {
1419        error_report("failed to send a request to the sheep");
1420        return ret;
1421    }
1422
1423    switch (rsp->result) {
1424    case SD_RES_SUCCESS:
1425        return 0;
1426    default:
1427        error_report("%s", sd_strerror(rsp->result));
1428        return -EIO;
1429    }
1430}
1431
1432static int read_object(int fd, BlockDriverState *bs, char *buf,
1433                       uint64_t oid, uint8_t copies,
1434                       unsigned int datalen, uint64_t offset,
1435                       uint32_t cache_flags)
1436{
1437    return read_write_object(fd, bs, buf, oid, copies,
1438                             datalen, offset, false,
1439                             false, cache_flags);
1440}
1441
1442static int write_object(int fd, BlockDriverState *bs, char *buf,
1443                        uint64_t oid, uint8_t copies,
1444                        unsigned int datalen, uint64_t offset, bool create,
1445                        uint32_t cache_flags)
1446{
1447    return read_write_object(fd, bs, buf, oid, copies,
1448                             datalen, offset, true,
1449                             create, cache_flags);
1450}
1451
1452/* update inode with the latest state */
1453static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1454{
1455    Error *local_err = NULL;
1456    SheepdogInode *inode;
1457    int ret = 0, fd;
1458    uint32_t vid = 0;
1459
1460    fd = connect_to_sdog(s, &local_err);
1461    if (fd < 0) {
1462        error_report_err(local_err);
1463        return -EIO;
1464    }
1465
1466    inode = g_malloc(SD_INODE_HEADER_SIZE);
1467
1468    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1469    if (ret) {
1470        error_report_err(local_err);
1471        goto out;
1472    }
1473
1474    ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
1475                      s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1476                      s->cache_flags);
1477    if (ret < 0) {
1478        goto out;
1479    }
1480
1481    if (inode->vdi_id != s->inode.vdi_id) {
1482        memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1483    }
1484
1485out:
1486    g_free(inode);
1487    closesocket(fd);
1488
1489    return ret;
1490}
1491
1492static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1493{
1494    SheepdogAIOCB *acb = aio_req->aiocb;
1495
1496    aio_req->create = false;
1497
1498    /* check whether this request becomes a CoW one */
1499    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1500        int idx = data_oid_to_idx(aio_req->oid);
1501
1502        if (is_data_obj_writable(&s->inode, idx)) {
1503            goto out;
1504        }
1505
1506        if (s->inode.data_vdi_id[idx]) {
1507            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1508            aio_req->flags |= SD_FLAG_CMD_COW;
1509        }
1510        aio_req->create = true;
1511    }
1512out:
1513    if (is_data_obj(aio_req->oid)) {
1514        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1515                        acb->aiocb_type);
1516    } else {
1517        struct iovec iov;
1518        iov.iov_base = &s->inode;
1519        iov.iov_len = sizeof(s->inode);
1520        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1521    }
1522}
1523
1524static void sd_detach_aio_context(BlockDriverState *bs)
1525{
1526    BDRVSheepdogState *s = bs->opaque;
1527
1528    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1529                       NULL, NULL, NULL);
1530}
1531
1532static void sd_attach_aio_context(BlockDriverState *bs,
1533                                  AioContext *new_context)
1534{
1535    BDRVSheepdogState *s = bs->opaque;
1536
1537    s->aio_context = new_context;
1538    aio_set_fd_handler(new_context, s->fd, false,
1539                       co_read_response, NULL, NULL, s);
1540}
1541
/*
 * Runtime options absorbed from the options QDict in sd_open().
 * The server address is not listed here; sd_open() obtains it separately
 * through sd_server_config().
 */
static QemuOptsList runtime_opts = {
    .name = "sheepdog",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            /* name of the VDI to open (mandatory in sd_open()) */
            .name = "vdi",
            .type = QEMU_OPT_STRING,
        },
        {
            /* numeric snapshot ID; sd_open() rejects zero and > UINT32_MAX */
            .name = "snap-id",
            .type = QEMU_OPT_NUMBER,
        },
        {
            /* snapshot tag; sd_open() treats a missing tag as "" */
            .name = "tag",
            .type = QEMU_OPT_STRING,
        },
        { /* end of list */ }
    },
};
1561
/*
 * Open a Sheepdog VDI.
 *
 * Validates the "vdi", "snap-id" and "tag" options, connects to the
 * server, locks the VDI, reads its inode into s->inode and initializes
 * the driver state.
 *
 * Returns 0 on success.  On failure sets @errp and returns a negative
 * errno value; the I/O socket, if already opened, is unregistered and
 * closed again.
 */
static int sd_open(BlockDriverState *bs, QDict *options, int flags,
                   Error **errp)
{
    int ret, fd;
    uint32_t vid = 0;
    BDRVSheepdogState *s = bs->opaque;
    const char *vdi, *snap_id_str, *tag;
    uint64_t snap_id;
    char *buf = NULL;
    QemuOpts *opts;
    Error *local_err = NULL;

    s->bs = bs;
    s->aio_context = bdrv_get_aio_context(bs);

    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        ret = -EINVAL;
        goto err_no_fd;
    }

    /*
     * NOTE(review): s->addr is not released on the error paths below;
     * confirm whether something else frees it when open fails.
     */
    s->addr = sd_server_config(options, errp);
    if (!s->addr) {
        ret = -EINVAL;
        goto err_no_fd;
    }

    vdi = qemu_opt_get(opts, "vdi");
    snap_id_str = qemu_opt_get(opts, "snap-id");
    snap_id = qemu_opt_get_number(opts, "snap-id", CURRENT_VDI_ID);
    tag = qemu_opt_get(opts, "tag");

    if (!vdi) {
        error_setg(errp, "parameter 'vdi' is missing");
        ret = -EINVAL;
        goto err_no_fd;
    }
    if (strlen(vdi) >= SD_MAX_VDI_LEN) {
        error_setg(errp, "value of parameter 'vdi' is too long");
        ret = -EINVAL;
        goto err_no_fd;
    }

    /* An explicitly given snap-id must be non-zero and fit in 32 bits */
    if (snap_id > UINT32_MAX) {
        snap_id = 0;
    }
    if (snap_id_str && !snap_id) {
        error_setg(errp, "'snap-id=%s' is not a valid snapshot ID",
                   snap_id_str);
        ret = -EINVAL;
        goto err_no_fd;
    }

    if (!tag) {
        tag = "";
    }
    if (strlen(tag) >= SD_MAX_VDI_TAG_LEN) {
        error_setg(errp, "value of parameter 'tag' is too long");
        ret = -EINVAL;
        goto err_no_fd;
    }

    QLIST_INIT(&s->inflight_aio_head);
    QLIST_INIT(&s->failed_aio_head);
    QLIST_INIT(&s->inflight_aiocb_head);

    /* Long-lived socket for object I/O, registered in the AioContext */
    s->fd = get_sheep_fd(s, errp);
    if (s->fd < 0) {
        ret = s->fd;
        goto err_no_fd;
    }

    /* Look up the VDI ID, taking the VDI lock (lock == true) */
    ret = find_vdi_name(s, vdi, (uint32_t)snap_id, tag, &vid, true, errp);
    if (ret) {
        goto err;
    }

    /*
     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
     */
    s->cache_flags = SD_FLAG_CMD_CACHE;
    if (flags & BDRV_O_NOCACHE) {
        s->cache_flags = SD_FLAG_CMD_DIRECT;
    }
    s->discard_supported = true;

    if (snap_id || tag[0]) {
        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
        s->is_snapshot = true;
    }

    /* Separate short-lived connection, used only to read the inode */
    fd = connect_to_sdog(s, errp);
    if (fd < 0) {
        ret = fd;
        goto err;
    }

    buf = g_malloc(SD_INODE_SIZE);
    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
                      0, SD_INODE_SIZE, 0, s->cache_flags);

    closesocket(fd);

    if (ret) {
        error_setg(errp, "Can't read snapshot inode");
        goto err;
    }

    memcpy(&s->inode, buf, sizeof(s->inode));

    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
    pstrcpy(s->name, sizeof(s->name), vdi);
    qemu_co_mutex_init(&s->lock);
    qemu_co_mutex_init(&s->queue_lock);
    qemu_co_queue_init(&s->overlapping_queue);
    qemu_opts_del(opts);
    g_free(buf);
    return 0;

err:
    /* Undo get_sheep_fd(): drop the fd handler and close the socket */
    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
                       false, NULL, NULL, NULL, NULL);
    closesocket(s->fd);
err_no_fd:
    qemu_opts_del(opts);
    g_free(buf);
    return ret;
}
1693
1694static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1695                             Error **errp)
1696{
1697    BDRVSheepdogState *s = state->bs->opaque;
1698    BDRVSheepdogReopenState *re_s;
1699    int ret = 0;
1700
1701    re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1702
1703    re_s->cache_flags = SD_FLAG_CMD_CACHE;
1704    if (state->flags & BDRV_O_NOCACHE) {
1705        re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1706    }
1707
1708    re_s->fd = get_sheep_fd(s, errp);
1709    if (re_s->fd < 0) {
1710        ret = re_s->fd;
1711        return ret;
1712    }
1713
1714    return ret;
1715}
1716
1717static void sd_reopen_commit(BDRVReopenState *state)
1718{
1719    BDRVSheepdogReopenState *re_s = state->opaque;
1720    BDRVSheepdogState *s = state->bs->opaque;
1721
1722    if (s->fd) {
1723        aio_set_fd_handler(s->aio_context, s->fd, false,
1724                           NULL, NULL, NULL, NULL);
1725        closesocket(s->fd);
1726    }
1727
1728    s->fd = re_s->fd;
1729    s->cache_flags = re_s->cache_flags;
1730
1731    g_free(state->opaque);
1732    state->opaque = NULL;
1733
1734    return;
1735}
1736
1737static void sd_reopen_abort(BDRVReopenState *state)
1738{
1739    BDRVSheepdogReopenState *re_s = state->opaque;
1740    BDRVSheepdogState *s = state->bs->opaque;
1741
1742    if (re_s == NULL) {
1743        return;
1744    }
1745
1746    if (re_s->fd) {
1747        aio_set_fd_handler(s->aio_context, re_s->fd, false,
1748                           NULL, NULL, NULL, NULL);
1749        closesocket(re_s->fd);
1750    }
1751
1752    g_free(state->opaque);
1753    state->opaque = NULL;
1754
1755    return;
1756}
1757
1758static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1759                        Error **errp)
1760{
1761    SheepdogVdiReq hdr;
1762    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1763    int fd, ret;
1764    unsigned int wlen, rlen = 0;
1765    char buf[SD_MAX_VDI_LEN];
1766
1767    fd = connect_to_sdog(s, errp);
1768    if (fd < 0) {
1769        return fd;
1770    }
1771
1772    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1773     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1774     */
1775    memset(buf, 0, sizeof(buf));
1776    pstrcpy(buf, sizeof(buf), s->name);
1777
1778    memset(&hdr, 0, sizeof(hdr));
1779    hdr.opcode = SD_OP_NEW_VDI;
1780    hdr.base_vdi_id = s->inode.vdi_id;
1781
1782    wlen = SD_MAX_VDI_LEN;
1783
1784    hdr.flags = SD_FLAG_CMD_WRITE;
1785    hdr.snapid = snapshot;
1786
1787    hdr.data_length = wlen;
1788    hdr.vdi_size = s->inode.vdi_size;
1789    hdr.copy_policy = s->inode.copy_policy;
1790    hdr.copies = s->inode.nr_copies;
1791    hdr.block_size_shift = s->inode.block_size_shift;
1792
1793    ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1794
1795    closesocket(fd);
1796
1797    if (ret) {
1798        error_setg_errno(errp, -ret, "create failed");
1799        return ret;
1800    }
1801
1802    if (rsp->result != SD_RES_SUCCESS) {
1803        error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1804        return -EIO;
1805    }
1806
1807    if (vdi_id) {
1808        *vdi_id = rsp->vdi_id;
1809    }
1810
1811    return 0;
1812}
1813
1814static int sd_prealloc(BlockDriverState *bs, int64_t old_size, int64_t new_size,
1815                       Error **errp)
1816{
1817    BlockBackend *blk = NULL;
1818    BDRVSheepdogState *base = bs->opaque;
1819    unsigned long buf_size;
1820    uint32_t idx, max_idx;
1821    uint32_t object_size;
1822    void *buf = NULL;
1823    int ret;
1824
1825    blk = blk_new(BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE | BLK_PERM_RESIZE,
1826                  BLK_PERM_ALL);
1827
1828    ret = blk_insert_bs(blk, bs, errp);
1829    if (ret < 0) {
1830        goto out_with_err_set;
1831    }
1832
1833    blk_set_allow_write_beyond_eof(blk, true);
1834
1835    object_size = (UINT32_C(1) << base->inode.block_size_shift);
1836    buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1837    buf = g_malloc0(buf_size);
1838
1839    max_idx = DIV_ROUND_UP(new_size, buf_size);
1840
1841    for (idx = old_size / buf_size; idx < max_idx; idx++) {
1842        /*
1843         * The created image can be a cloned image, so we need to read
1844         * a data from the source image.
1845         */
1846        ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1847        if (ret < 0) {
1848            goto out;
1849        }
1850        ret = blk_pwrite(blk, idx * buf_size, buf, buf_size, 0);
1851        if (ret < 0) {
1852            goto out;
1853        }
1854    }
1855
1856    ret = 0;
1857out:
1858    if (ret < 0) {
1859        error_setg_errno(errp, -ret, "Can't pre-allocate");
1860    }
1861out_with_err_set:
1862    if (blk) {
1863        blk_unref(blk);
1864    }
1865    g_free(buf);
1866
1867    return ret;
1868}
1869
1870static int sd_create_prealloc(BlockdevOptionsSheepdog *location, int64_t size,
1871                              Error **errp)
1872{
1873    BlockDriverState *bs;
1874    Visitor *v;
1875    QObject *obj = NULL;
1876    QDict *qdict;
1877    Error *local_err = NULL;
1878    int ret;
1879
1880    v = qobject_output_visitor_new(&obj);
1881    visit_type_BlockdevOptionsSheepdog(v, NULL, &location, &local_err);
1882    visit_free(v);
1883
1884    if (local_err) {
1885        error_propagate(errp, local_err);
1886        qobject_decref(obj);
1887        return -EINVAL;
1888    }
1889
1890    qdict = qobject_to(QDict, obj);
1891    qdict_flatten(qdict);
1892
1893    qdict_put_str(qdict, "driver", "sheepdog");
1894
1895    bs = bdrv_open(NULL, NULL, qdict, BDRV_O_PROTOCOL | BDRV_O_RDWR, errp);
1896    if (bs == NULL) {
1897        ret = -EIO;
1898        goto fail;
1899    }
1900
1901    ret = sd_prealloc(bs, 0, size, errp);
1902fail:
1903    bdrv_unref(bs);
1904    QDECREF(qdict);
1905    return ret;
1906}
1907
1908static int parse_redundancy(BDRVSheepdogState *s, SheepdogRedundancy *opt)
1909{
1910    struct SheepdogInode *inode = &s->inode;
1911
1912    switch (opt->type) {
1913    case SHEEPDOG_REDUNDANCY_TYPE_FULL:
1914        if (opt->u.full.copies > SD_MAX_COPIES || opt->u.full.copies < 1) {
1915            return -EINVAL;
1916        }
1917        inode->copy_policy = 0;
1918        inode->nr_copies = opt->u.full.copies;
1919        return 0;
1920
1921    case SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED:
1922    {
1923        int64_t copy = opt->u.erasure_coded.data_strips;
1924        int64_t parity = opt->u.erasure_coded.parity_strips;
1925
1926        if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1927            return -EINVAL;
1928        }
1929
1930        if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1931            return -EINVAL;
1932        }
1933
1934        /*
1935         * 4 bits for parity and 4 bits for data.
1936         * We have to compress upper data bits because it can't represent 16
1937         */
1938        inode->copy_policy = ((copy / 2) << 4) + parity;
1939        inode->nr_copies = copy + parity;
1940        return 0;
1941    }
1942
1943    default:
1944        g_assert_not_reached();
1945    }
1946
1947    return -EINVAL;
1948}
1949
1950/*
1951 * Sheepdog support two kinds of redundancy, full replication and erasure
1952 * coding.
1953 *
1954 * # create a fully replicated vdi with x copies
1955 * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1956 *
1957 * # create a erasure coded vdi with x data strips and y parity strips
1958 * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1959 */
1960static SheepdogRedundancy *parse_redundancy_str(const char *opt)
1961{
1962    SheepdogRedundancy *redundancy;
1963    const char *n1, *n2;
1964    long copy, parity;
1965    char p[10];
1966    int ret;
1967
1968    pstrcpy(p, sizeof(p), opt);
1969    n1 = strtok(p, ":");
1970    n2 = strtok(NULL, ":");
1971
1972    if (!n1) {
1973        return NULL;
1974    }
1975
1976    ret = qemu_strtol(n1, NULL, 10, &copy);
1977    if (ret < 0) {
1978        return NULL;
1979    }
1980
1981    redundancy = g_new0(SheepdogRedundancy, 1);
1982    if (!n2) {
1983        *redundancy = (SheepdogRedundancy) {
1984            .type               = SHEEPDOG_REDUNDANCY_TYPE_FULL,
1985            .u.full.copies      = copy,
1986        };
1987    } else {
1988        ret = qemu_strtol(n2, NULL, 10, &parity);
1989        if (ret < 0) {
1990            return NULL;
1991        }
1992
1993        *redundancy = (SheepdogRedundancy) {
1994            .type               = SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED,
1995            .u.erasure_coded    = {
1996                .data_strips    = copy,
1997                .parity_strips  = parity,
1998            },
1999        };
2000    }
2001
2002    return redundancy;
2003}
2004
2005static int parse_block_size_shift(BDRVSheepdogState *s,
2006                                  BlockdevCreateOptionsSheepdog *opts)
2007{
2008    struct SheepdogInode *inode = &s->inode;
2009    uint64_t object_size;
2010    int obj_order;
2011
2012    if (opts->has_object_size) {
2013        object_size = opts->object_size;
2014
2015        if ((object_size - 1) & object_size) {    /* not a power of 2? */
2016            return -EINVAL;
2017        }
2018        obj_order = ctz32(object_size);
2019        if (obj_order < 20 || obj_order > 31) {
2020            return -EINVAL;
2021        }
2022        inode->block_size_shift = (uint8_t)obj_order;
2023    }
2024
2025    return 0;
2026}
2027
2028static int sd_co_create(BlockdevCreateOptions *options, Error **errp)
2029{
2030    BlockdevCreateOptionsSheepdog *opts = &options->u.sheepdog;
2031    int ret = 0;
2032    uint32_t vid = 0;
2033    char *backing_file = NULL;
2034    char *buf = NULL;
2035    BDRVSheepdogState *s;
2036    uint64_t max_vdi_size;
2037    bool prealloc = false;
2038
2039    assert(options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2040
2041    s = g_new0(BDRVSheepdogState, 1);
2042
2043    /* Steal SocketAddress from QAPI, set NULL to prevent double free */
2044    s->addr = opts->location->server;
2045    opts->location->server = NULL;
2046
2047    if (strlen(opts->location->vdi) >= sizeof(s->name)) {
2048        error_setg(errp, "'vdi' string too long");
2049        ret = -EINVAL;
2050        goto out;
2051    }
2052    pstrcpy(s->name, sizeof(s->name), opts->location->vdi);
2053
2054    s->inode.vdi_size = opts->size;
2055    backing_file = opts->backing_file;
2056
2057    if (!opts->has_preallocation) {
2058        opts->preallocation = PREALLOC_MODE_OFF;
2059    }
2060    switch (opts->preallocation) {
2061    case PREALLOC_MODE_OFF:
2062        prealloc = false;
2063        break;
2064    case PREALLOC_MODE_FULL:
2065        prealloc = true;
2066        break;
2067    default:
2068        error_setg(errp, "Preallocation mode not supported for Sheepdog");
2069        ret = -EINVAL;
2070        goto out;
2071    }
2072
2073    if (opts->has_redundancy) {
2074        ret = parse_redundancy(s, opts->redundancy);
2075        if (ret < 0) {
2076            error_setg(errp, "Invalid redundancy mode");
2077            goto out;
2078        }
2079    }
2080    ret = parse_block_size_shift(s, opts);
2081    if (ret < 0) {
2082        error_setg(errp, "Invalid object_size."
2083                         " obect_size needs to be power of 2"
2084                         " and be limited from 2^20 to 2^31");
2085        goto out;
2086    }
2087
2088    if (opts->has_backing_file) {
2089        BlockBackend *blk;
2090        BDRVSheepdogState *base;
2091        BlockDriver *drv;
2092
2093        /* Currently, only Sheepdog backing image is supported. */
2094        drv = bdrv_find_protocol(opts->backing_file, true, NULL);
2095        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
2096            error_setg(errp, "backing_file must be a sheepdog image");
2097            ret = -EINVAL;
2098            goto out;
2099        }
2100
2101        blk = blk_new_open(opts->backing_file, NULL, NULL,
2102                           BDRV_O_PROTOCOL, errp);
2103        if (blk == NULL) {
2104            ret = -EIO;
2105            goto out;
2106        }
2107
2108        base = blk_bs(blk)->opaque;
2109
2110        if (!is_snapshot(&base->inode)) {
2111            error_setg(errp, "cannot clone from a non snapshot vdi");
2112            blk_unref(blk);
2113            ret = -EINVAL;
2114            goto out;
2115        }
2116        s->inode.vdi_id = base->inode.vdi_id;
2117        blk_unref(blk);
2118    }
2119
2120    s->aio_context = qemu_get_aio_context();
2121
2122    /* if block_size_shift is not specified, get cluster default value */
2123    if (s->inode.block_size_shift == 0) {
2124        SheepdogVdiReq hdr;
2125        SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
2126        int fd;
2127        unsigned int wlen = 0, rlen = 0;
2128
2129        fd = connect_to_sdog(s, errp);
2130        if (fd < 0) {
2131            ret = fd;
2132            goto out;
2133        }
2134
2135        memset(&hdr, 0, sizeof(hdr));
2136        hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
2137        hdr.proto_ver = SD_PROTO_VER;
2138
2139        ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
2140                     NULL, &wlen, &rlen);
2141        closesocket(fd);
2142        if (ret) {
2143            error_setg_errno(errp, -ret, "failed to get cluster default");
2144            goto out;
2145        }
2146        if (rsp->result == SD_RES_SUCCESS) {
2147            s->inode.block_size_shift = rsp->block_size_shift;
2148        } else {
2149            s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
2150        }
2151    }
2152
2153    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2154
2155    if (s->inode.vdi_size > max_vdi_size) {
2156        error_setg(errp, "An image is too large."
2157                         " The maximum image size is %"PRIu64 "GB",
2158                         max_vdi_size / 1024 / 1024 / 1024);
2159        ret = -EINVAL;
2160        goto out;
2161    }
2162
2163    ret = do_sd_create(s, &vid, 0, errp);
2164    if (ret) {
2165        goto out;
2166    }
2167
2168    if (prealloc) {
2169        ret = sd_create_prealloc(opts->location, opts->size, errp);
2170    }
2171out:
2172    g_free(backing_file);
2173    g_free(buf);
2174    g_free(s->addr);
2175    g_free(s);
2176    return ret;
2177}
2178
2179static int coroutine_fn sd_co_create_opts(const char *filename, QemuOpts *opts,
2180                                          Error **errp)
2181{
2182    BlockdevCreateOptions *create_options = NULL;
2183    QDict *qdict, *location_qdict;
2184    QObject *crumpled;
2185    Visitor *v;
2186    const char *redundancy;
2187    Error *local_err = NULL;
2188    int ret;
2189
2190    redundancy = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
2191
2192    qdict = qemu_opts_to_qdict(opts, NULL);
2193    qdict_put_str(qdict, "driver", "sheepdog");
2194
2195    location_qdict = qdict_new();
2196    qdict_put(qdict, "location", location_qdict);
2197
2198    sd_parse_filename(filename, location_qdict, &local_err);
2199    if (local_err) {
2200        error_propagate(errp, local_err);
2201        ret = -EINVAL;
2202        goto fail;
2203    }
2204
2205    qdict_flatten(qdict);
2206
2207    /* Change legacy command line options into QMP ones */
2208    static const QDictRenames opt_renames[] = {
2209        { BLOCK_OPT_BACKING_FILE,       "backing-file" },
2210        { BLOCK_OPT_OBJECT_SIZE,        "object-size" },
2211        { NULL, NULL },
2212    };
2213
2214    if (!qdict_rename_keys(qdict, opt_renames, errp)) {
2215        ret = -EINVAL;
2216        goto fail;
2217    }
2218
2219    /* Get the QAPI object */
2220    crumpled = qdict_crumple(qdict, errp);
2221    if (crumpled == NULL) {
2222        ret = -EINVAL;
2223        goto fail;
2224    }
2225
2226    v = qobject_input_visitor_new_keyval(crumpled);
2227    visit_type_BlockdevCreateOptions(v, NULL, &create_options, &local_err);
2228    visit_free(v);
2229    qobject_decref(crumpled);
2230
2231    if (local_err) {
2232        error_propagate(errp, local_err);
2233        ret = -EINVAL;
2234        goto fail;
2235    }
2236
2237    assert(create_options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2238    create_options->u.sheepdog.size =
2239        ROUND_UP(create_options->u.sheepdog.size, BDRV_SECTOR_SIZE);
2240
2241    if (redundancy) {
2242        create_options->u.sheepdog.has_redundancy = true;
2243        create_options->u.sheepdog.redundancy =
2244            parse_redundancy_str(redundancy);
2245        if (create_options->u.sheepdog.redundancy == NULL) {
2246            error_setg(errp, "Invalid redundancy mode");
2247            ret = -EINVAL;
2248            goto fail;
2249        }
2250    }
2251
2252    ret = sd_co_create(create_options, errp);
2253fail:
2254    qapi_free_BlockdevCreateOptions(create_options);
2255    QDECREF(qdict);
2256    return ret;
2257}
2258
2259static void sd_close(BlockDriverState *bs)
2260{
2261    Error *local_err = NULL;
2262    BDRVSheepdogState *s = bs->opaque;
2263    SheepdogVdiReq hdr;
2264    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2265    unsigned int wlen, rlen = 0;
2266    int fd, ret;
2267
2268    DPRINTF("%s\n", s->name);
2269
2270    fd = connect_to_sdog(s, &local_err);
2271    if (fd < 0) {
2272        error_report_err(local_err);
2273        return;
2274    }
2275
2276    memset(&hdr, 0, sizeof(hdr));
2277
2278    hdr.opcode = SD_OP_RELEASE_VDI;
2279    hdr.type = LOCK_TYPE_NORMAL;
2280    hdr.base_vdi_id = s->inode.vdi_id;
2281    wlen = strlen(s->name) + 1;
2282    hdr.data_length = wlen;
2283    hdr.flags = SD_FLAG_CMD_WRITE;
2284
2285    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2286                 s->name, &wlen, &rlen);
2287
2288    closesocket(fd);
2289
2290    if (!ret && rsp->result != SD_RES_SUCCESS &&
2291        rsp->result != SD_RES_VDI_NOT_LOCKED) {
2292        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2293    }
2294
2295    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
2296                       false, NULL, NULL, NULL, NULL);
2297    closesocket(s->fd);
2298    qapi_free_SocketAddress(s->addr);
2299}
2300
2301static int64_t sd_getlength(BlockDriverState *bs)
2302{
2303    BDRVSheepdogState *s = bs->opaque;
2304
2305    return s->inode.vdi_size;
2306}
2307
2308static int sd_truncate(BlockDriverState *bs, int64_t offset,
2309                       PreallocMode prealloc, Error **errp)
2310{
2311    BDRVSheepdogState *s = bs->opaque;
2312    int ret, fd;
2313    unsigned int datalen;
2314    uint64_t max_vdi_size;
2315    int64_t old_size = s->inode.vdi_size;
2316
2317    if (prealloc != PREALLOC_MODE_OFF && prealloc != PREALLOC_MODE_FULL) {
2318        error_setg(errp, "Unsupported preallocation mode '%s'",
2319                   PreallocMode_str(prealloc));
2320        return -ENOTSUP;
2321    }
2322
2323    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2324    if (offset < old_size) {
2325        error_setg(errp, "shrinking is not supported");
2326        return -EINVAL;
2327    } else if (offset > max_vdi_size) {
2328        error_setg(errp, "too big image size");
2329        return -EINVAL;
2330    }
2331
2332    fd = connect_to_sdog(s, errp);
2333    if (fd < 0) {
2334        return fd;
2335    }
2336
2337    /* we don't need to update entire object */
2338    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2339    s->inode.vdi_size = offset;
2340    ret = write_object(fd, s->bs, (char *)&s->inode,
2341                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2342                       datalen, 0, false, s->cache_flags);
2343    close(fd);
2344
2345    if (ret < 0) {
2346        error_setg_errno(errp, -ret, "failed to update an inode");
2347        return ret;
2348    }
2349
2350    if (prealloc == PREALLOC_MODE_FULL) {
2351        ret = sd_prealloc(bs, old_size, offset, errp);
2352        if (ret < 0) {
2353            return ret;
2354        }
2355    }
2356
2357    return 0;
2358}
2359
2360/*
2361 * This function is called after writing data objects.  If we need to
2362 * update metadata, this sends a write request to the vdi object.
2363 */
2364static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2365{
2366    BDRVSheepdogState *s = acb->s;
2367    struct iovec iov;
2368    AIOReq *aio_req;
2369    uint32_t offset, data_len, mn, mx;
2370
2371    mn = acb->min_dirty_data_idx;
2372    mx = acb->max_dirty_data_idx;
2373    if (mn <= mx) {
2374        /* we need to update the vdi object. */
2375        ++acb->nr_pending;
2376        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2377            mn * sizeof(s->inode.data_vdi_id[0]);
2378        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2379
2380        acb->min_dirty_data_idx = UINT32_MAX;
2381        acb->max_dirty_data_idx = 0;
2382
2383        iov.iov_base = &s->inode;
2384        iov.iov_len = sizeof(s->inode);
2385        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2386                                data_len, offset, 0, false, 0, offset);
2387        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2388        if (--acb->nr_pending) {
2389            qemu_coroutine_yield();
2390        }
2391    }
2392}
2393
2394/* Delete current working VDI on the snapshot chain */
2395static bool sd_delete(BDRVSheepdogState *s)
2396{
2397    Error *local_err = NULL;
2398    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2399    SheepdogVdiReq hdr = {
2400        .opcode = SD_OP_DEL_VDI,
2401        .base_vdi_id = s->inode.vdi_id,
2402        .data_length = wlen,
2403        .flags = SD_FLAG_CMD_WRITE,
2404    };
2405    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2406    int fd, ret;
2407
2408    fd = connect_to_sdog(s, &local_err);
2409    if (fd < 0) {
2410        error_report_err(local_err);
2411        return false;
2412    }
2413
2414    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2415                 s->name, &wlen, &rlen);
2416    closesocket(fd);
2417    if (ret) {
2418        return false;
2419    }
2420    switch (rsp->result) {
2421    case SD_RES_NO_VDI:
2422        error_report("%s was already deleted", s->name);
2423        /* fall through */
2424    case SD_RES_SUCCESS:
2425        break;
2426    default:
2427        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2428        return false;
2429    }
2430
2431    return true;
2432}
2433
2434/*
2435 * Create a writable VDI from a snapshot
2436 */
2437static int sd_create_branch(BDRVSheepdogState *s)
2438{
2439    Error *local_err = NULL;
2440    int ret, fd;
2441    uint32_t vid;
2442    char *buf;
2443    bool deleted;
2444
2445    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2446
2447    buf = g_malloc(SD_INODE_SIZE);
2448
2449    /*
2450     * Even If deletion fails, we will just create extra snapshot based on
2451     * the working VDI which was supposed to be deleted. So no need to
2452     * false bail out.
2453     */
2454    deleted = sd_delete(s);
2455    ret = do_sd_create(s, &vid, !deleted, &local_err);
2456    if (ret) {
2457        error_report_err(local_err);
2458        goto out;
2459    }
2460
2461    DPRINTF("%" PRIx32 " is created.\n", vid);
2462
2463    fd = connect_to_sdog(s, &local_err);
2464    if (fd < 0) {
2465        error_report_err(local_err);
2466        ret = fd;
2467        goto out;
2468    }
2469
2470    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
2471                      s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2472
2473    closesocket(fd);
2474
2475    if (ret < 0) {
2476        goto out;
2477    }
2478
2479    memcpy(&s->inode, buf, sizeof(s->inode));
2480
2481    s->is_snapshot = false;
2482    ret = 0;
2483    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2484
2485out:
2486    g_free(buf);
2487
2488    return ret;
2489}
2490
2491/*
2492 * Send I/O requests to the server.
2493 *
2494 * This function sends requests to the server, links the requests to
2495 * the inflight_list in BDRVSheepdogState, and exits without
2496 * waiting the response.  The responses are received in the
2497 * `aio_read_response' function which is called from the main loop as
2498 * a fd handler.
2499 *
2500 * Returns 1 when we need to wait a response, 0 when there is no sent
2501 * request and -errno in error cases.
2502 */
2503static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb)
2504{
2505    int ret = 0;
2506    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2507    unsigned long idx;
2508    uint32_t object_size;
2509    uint64_t oid;
2510    uint64_t offset;
2511    BDRVSheepdogState *s = acb->s;
2512    SheepdogInode *inode = &s->inode;
2513    AIOReq *aio_req;
2514
2515    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2516        /*
2517         * In the case we open the snapshot VDI, Sheepdog creates the
2518         * writable VDI when we do a write operation first.
2519         */
2520        ret = sd_create_branch(s);
2521        if (ret) {
2522            acb->ret = -EIO;
2523            return;
2524        }
2525    }
2526
2527    object_size = (UINT32_C(1) << inode->block_size_shift);
2528    idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2529    offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2530
2531    /*
2532     * Make sure we don't free the aiocb before we are done with all requests.
2533     * This additional reference is dropped at the end of this function.
2534     */
2535    acb->nr_pending++;
2536
2537    while (done != total) {
2538        uint8_t flags = 0;
2539        uint64_t old_oid = 0;
2540        bool create = false;
2541
2542        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2543
2544        len = MIN(total - done, object_size - offset);
2545
2546        switch (acb->aiocb_type) {
2547        case AIOCB_READ_UDATA:
2548            if (!inode->data_vdi_id[idx]) {
2549                qemu_iovec_memset(acb->qiov, done, 0, len);
2550                goto done;
2551            }
2552            break;
2553        case AIOCB_WRITE_UDATA:
2554            if (!inode->data_vdi_id[idx]) {
2555                create = true;
2556            } else if (!is_data_obj_writable(inode, idx)) {
2557                /* Copy-On-Write */
2558                create = true;
2559                old_oid = oid;
2560                flags = SD_FLAG_CMD_COW;
2561            }
2562            break;
2563        case AIOCB_DISCARD_OBJ:
2564            /*
2565             * We discard the object only when the whole object is
2566             * 1) allocated 2) trimmed. Otherwise, simply skip it.
2567             */
2568            if (len != object_size || inode->data_vdi_id[idx] == 0) {
2569                goto done;
2570            }
2571            break;
2572        default:
2573            break;
2574        }
2575
2576        if (create) {
2577            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2578                    inode->vdi_id, oid,
2579                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2580            oid = vid_to_data_oid(inode->vdi_id, idx);
2581            DPRINTF("new oid %" PRIx64 "\n", oid);
2582        }
2583
2584        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2585                                old_oid,
2586                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2587                                0 : done);
2588        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2589                        acb->aiocb_type);
2590    done:
2591        offset = 0;
2592        idx++;
2593        done += len;
2594    }
2595    if (--acb->nr_pending) {
2596        qemu_coroutine_yield();
2597    }
2598}
2599
2600static void sd_aio_complete(SheepdogAIOCB *acb)
2601{
2602    BDRVSheepdogState *s;
2603    if (acb->aiocb_type == AIOCB_FLUSH_CACHE) {
2604        return;
2605    }
2606
2607    s = acb->s;
2608    qemu_co_mutex_lock(&s->queue_lock);
2609    QLIST_REMOVE(acb, aiocb_siblings);
2610    qemu_co_queue_restart_all(&s->overlapping_queue);
2611    qemu_co_mutex_unlock(&s->queue_lock);
2612}
2613
2614static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2615                        int nb_sectors, QEMUIOVector *qiov)
2616{
2617    SheepdogAIOCB acb;
2618    int ret;
2619    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2620    BDRVSheepdogState *s = bs->opaque;
2621
2622    if (offset > s->inode.vdi_size) {
2623        ret = sd_truncate(bs, offset, PREALLOC_MODE_OFF, NULL);
2624        if (ret < 0) {
2625            return ret;
2626        }
2627    }
2628
2629    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_WRITE_UDATA);
2630    sd_co_rw_vector(&acb);
2631    sd_write_done(&acb);
2632    sd_aio_complete(&acb);
2633
2634    return acb.ret;
2635}
2636
2637static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2638                       int nb_sectors, QEMUIOVector *qiov)
2639{
2640    SheepdogAIOCB acb;
2641    BDRVSheepdogState *s = bs->opaque;
2642
2643    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_READ_UDATA);
2644    sd_co_rw_vector(&acb);
2645    sd_aio_complete(&acb);
2646
2647    return acb.ret;
2648}
2649
2650static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2651{
2652    BDRVSheepdogState *s = bs->opaque;
2653    SheepdogAIOCB acb;
2654    AIOReq *aio_req;
2655
2656    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2657        return 0;
2658    }
2659
2660    sd_aio_setup(&acb, s, NULL, 0, 0, AIOCB_FLUSH_CACHE);
2661
2662    acb.nr_pending++;
2663    aio_req = alloc_aio_req(s, &acb, vid_to_vdi_oid(s->inode.vdi_id),
2664                            0, 0, 0, false, 0, 0);
2665    add_aio_request(s, aio_req, NULL, 0, acb.aiocb_type);
2666
2667    if (--acb.nr_pending) {
2668        qemu_coroutine_yield();
2669    }
2670
2671    sd_aio_complete(&acb);
2672    return acb.ret;
2673}
2674
2675static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2676{
2677    Error *local_err = NULL;
2678    BDRVSheepdogState *s = bs->opaque;
2679    int ret, fd;
2680    uint32_t new_vid;
2681    SheepdogInode *inode;
2682    unsigned int datalen;
2683
2684    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2685            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2686            s->name, sn_info->vm_state_size, s->is_snapshot);
2687
2688    if (s->is_snapshot) {
2689        error_report("You can't create a snapshot of a snapshot VDI, "
2690                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2691
2692        return -EINVAL;
2693    }
2694
2695    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2696
2697    s->inode.vm_state_size = sn_info->vm_state_size;
2698    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2699    /* It appears that inode.tag does not require a NUL terminator,
2700     * which means this use of strncpy is ok.
2701     */
2702    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2703    /* we don't need to update entire object */
2704    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2705    inode = g_malloc(datalen);
2706
2707    /* refresh inode. */
2708    fd = connect_to_sdog(s, &local_err);
2709    if (fd < 0) {
2710        error_report_err(local_err);
2711        ret = fd;
2712        goto cleanup;
2713    }
2714
2715    ret = write_object(fd, s->bs, (char *)&s->inode,
2716                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2717                       datalen, 0, false, s->cache_flags);
2718    if (ret < 0) {
2719        error_report("failed to write snapshot's inode.");
2720        goto cleanup;
2721    }
2722
2723    ret = do_sd_create(s, &new_vid, 1, &local_err);
2724    if (ret < 0) {
2725        error_reportf_err(local_err,
2726                          "failed to create inode for snapshot: ");
2727        goto cleanup;
2728    }
2729
2730    ret = read_object(fd, s->bs, (char *)inode,
2731                      vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2732                      s->cache_flags);
2733
2734    if (ret < 0) {
2735        error_report("failed to read new inode info. %s", strerror(errno));
2736        goto cleanup;
2737    }
2738
2739    memcpy(&s->inode, inode, datalen);
2740    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2741            s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2742
2743cleanup:
2744    g_free(inode);
2745    closesocket(fd);
2746    return ret;
2747}
2748
2749/*
2750 * We implement rollback(loadvm) operation to the specified snapshot by
2751 * 1) switch to the snapshot
2752 * 2) rely on sd_create_branch to delete working VDI and
2753 * 3) create a new working VDI based on the specified snapshot
2754 */
2755static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2756{
2757    BDRVSheepdogState *s = bs->opaque;
2758    BDRVSheepdogState *old_s;
2759    char tag[SD_MAX_VDI_TAG_LEN];
2760    uint32_t snapid = 0;
2761    int ret;
2762
2763    if (!sd_parse_snapid_or_tag(snapshot_id, &snapid, tag)) {
2764        return -EINVAL;
2765    }
2766
2767    old_s = g_new(BDRVSheepdogState, 1);
2768
2769    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2770
2771    ret = reload_inode(s, snapid, tag);
2772    if (ret) {
2773        goto out;
2774    }
2775
2776    ret = sd_create_branch(s);
2777    if (ret) {
2778        goto out;
2779    }
2780
2781    g_free(old_s);
2782
2783    return 0;
2784out:
2785    /* recover bdrv_sd_state */
2786    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2787    g_free(old_s);
2788
2789    error_report("failed to open. recover old bdrv_sd_state.");
2790
2791    return ret;
2792}
2793
2794#define NR_BATCHED_DISCARD 128
2795
2796static int remove_objects(BDRVSheepdogState *s, Error **errp)
2797{
2798    int fd, i = 0, nr_objs = 0;
2799    int ret;
2800    SheepdogInode *inode = &s->inode;
2801
2802    fd = connect_to_sdog(s, errp);
2803    if (fd < 0) {
2804        return fd;
2805    }
2806
2807    nr_objs = count_data_objs(inode);
2808    while (i < nr_objs) {
2809        int start_idx, nr_filled_idx;
2810
2811        while (i < nr_objs && !inode->data_vdi_id[i]) {
2812            i++;
2813        }
2814        start_idx = i;
2815
2816        nr_filled_idx = 0;
2817        while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2818            if (inode->data_vdi_id[i]) {
2819                inode->data_vdi_id[i] = 0;
2820                nr_filled_idx++;
2821            }
2822
2823            i++;
2824        }
2825
2826        ret = write_object(fd, s->bs,
2827                           (char *)&inode->data_vdi_id[start_idx],
2828                           vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2829                           (i - start_idx) * sizeof(uint32_t),
2830                           offsetof(struct SheepdogInode,
2831                                    data_vdi_id[start_idx]),
2832                           false, s->cache_flags);
2833        if (ret < 0) {
2834            error_setg(errp, "Failed to discard snapshot inode");
2835            goto out;
2836        }
2837    }
2838
2839    ret = 0;
2840out:
2841    closesocket(fd);
2842    return ret;
2843}
2844
2845static int sd_snapshot_delete(BlockDriverState *bs,
2846                              const char *snapshot_id,
2847                              const char *name,
2848                              Error **errp)
2849{
2850    /*
2851     * FIXME should delete the snapshot matching both @snapshot_id and
2852     * @name, but @name not used here
2853     */
2854    unsigned long snap_id = 0;
2855    char snap_tag[SD_MAX_VDI_TAG_LEN];
2856    int fd, ret;
2857    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2858    BDRVSheepdogState *s = bs->opaque;
2859    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2860    uint32_t vid;
2861    SheepdogVdiReq hdr = {
2862        .opcode = SD_OP_DEL_VDI,
2863        .data_length = wlen,
2864        .flags = SD_FLAG_CMD_WRITE,
2865    };
2866    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2867
2868    ret = remove_objects(s, errp);
2869    if (ret) {
2870        return ret;
2871    }
2872
2873    memset(buf, 0, sizeof(buf));
2874    memset(snap_tag, 0, sizeof(snap_tag));
2875    pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2876    /* TODO Use sd_parse_snapid() once this mess is cleaned up */
2877    ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2878    if (ret || snap_id > UINT32_MAX) {
2879        /*
2880         * FIXME Since qemu_strtoul() returns -EINVAL when
2881         * @snapshot_id is null, @snapshot_id is mandatory.  Correct
2882         * would be to require at least one of @snapshot_id and @name.
2883         */
2884        error_setg(errp, "Invalid snapshot ID: %s",
2885                         snapshot_id ? snapshot_id : "<null>");
2886        return -EINVAL;
2887    }
2888
2889    if (snap_id) {
2890        hdr.snapid = (uint32_t) snap_id;
2891    } else {
2892        /* FIXME I suspect we should use @name here */
2893        /* FIXME don't truncate silently */
2894        pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2895        pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2896    }
2897
2898    ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true, errp);
2899    if (ret) {
2900        return ret;
2901    }
2902
2903    fd = connect_to_sdog(s, errp);
2904    if (fd < 0) {
2905        return fd;
2906    }
2907
2908    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2909                 buf, &wlen, &rlen);
2910    closesocket(fd);
2911    if (ret) {
2912        error_setg_errno(errp, -ret, "Couldn't send request to server");
2913        return ret;
2914    }
2915
2916    switch (rsp->result) {
2917    case SD_RES_NO_VDI:
2918        error_setg(errp, "Can't find the snapshot");
2919        return -ENOENT;
2920    case SD_RES_SUCCESS:
2921        break;
2922    default:
2923        error_setg(errp, "%s", sd_strerror(rsp->result));
2924        return -EIO;
2925    }
2926
2927    return 0;
2928}
2929
2930static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2931{
2932    Error *local_err = NULL;
2933    BDRVSheepdogState *s = bs->opaque;
2934    SheepdogReq req;
2935    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2936    QEMUSnapshotInfo *sn_tab = NULL;
2937    unsigned wlen, rlen;
2938    int found = 0;
2939    static SheepdogInode inode;
2940    unsigned long *vdi_inuse;
2941    unsigned int start_nr;
2942    uint64_t hval;
2943    uint32_t vid;
2944
2945    vdi_inuse = g_malloc(max);
2946
2947    fd = connect_to_sdog(s, &local_err);
2948    if (fd < 0) {
2949        error_report_err(local_err);
2950        ret = fd;
2951        goto out;
2952    }
2953
2954    rlen = max;
2955    wlen = 0;
2956
2957    memset(&req, 0, sizeof(req));
2958
2959    req.opcode = SD_OP_READ_VDIS;
2960    req.data_length = max;
2961
2962    ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
2963
2964    closesocket(fd);
2965    if (ret) {
2966        goto out;
2967    }
2968
2969    sn_tab = g_new0(QEMUSnapshotInfo, nr);
2970
2971    /* calculate a vdi id with hash function */
2972    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2973    start_nr = hval & (SD_NR_VDIS - 1);
2974
2975    fd = connect_to_sdog(s, &local_err);
2976    if (fd < 0) {
2977        error_report_err(local_err);
2978        ret = fd;
2979        goto out;
2980    }
2981
2982    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2983        if (!test_bit(vid, vdi_inuse)) {
2984            break;
2985        }
2986
2987        /* we don't need to read entire object */
2988        ret = read_object(fd, s->bs, (char *)&inode,
2989                          vid_to_vdi_oid(vid),
2990                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2991                          s->cache_flags);
2992
2993        if (ret) {
2994            continue;
2995        }
2996
2997        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2998            sn_tab[found].date_sec = inode.snap_ctime >> 32;
2999            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
3000            sn_tab[found].vm_state_size = inode.vm_state_size;
3001            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
3002
3003            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
3004                     "%" PRIu32, inode.snap_id);
3005            pstrcpy(sn_tab[found].name,
3006                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
3007                    inode.tag);
3008            found++;
3009        }
3010    }
3011
3012    closesocket(fd);
3013out:
3014    *psn_tab = sn_tab;
3015
3016    g_free(vdi_inuse);
3017
3018    if (ret < 0) {
3019        return ret;
3020    }
3021
3022    return found;
3023}
3024
3025static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
3026                                int64_t pos, int size, int load)
3027{
3028    Error *local_err = NULL;
3029    bool create;
3030    int fd, ret = 0, remaining = size;
3031    unsigned int data_len;
3032    uint64_t vmstate_oid;
3033    uint64_t offset;
3034    uint32_t vdi_index;
3035    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
3036    uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
3037
3038    fd = connect_to_sdog(s, &local_err);
3039    if (fd < 0) {
3040        error_report_err(local_err);
3041        return fd;
3042    }
3043
3044    while (remaining) {
3045        vdi_index = pos / object_size;
3046        offset = pos % object_size;
3047
3048        data_len = MIN(remaining, object_size - offset);
3049
3050        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
3051
3052        create = (offset == 0);
3053        if (load) {
3054            ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
3055                              s->inode.nr_copies, data_len, offset,
3056                              s->cache_flags);
3057        } else {
3058            ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
3059                               s->inode.nr_copies, data_len, offset, create,
3060                               s->cache_flags);
3061        }
3062
3063        if (ret < 0) {
3064            error_report("failed to save vmstate %s", strerror(errno));
3065            goto cleanup;
3066        }
3067
3068        pos += data_len;
3069        data += data_len;
3070        remaining -= data_len;
3071    }
3072    ret = size;
3073cleanup:
3074    closesocket(fd);
3075    return ret;
3076}
3077
3078static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3079                           int64_t pos)
3080{
3081    BDRVSheepdogState *s = bs->opaque;
3082    void *buf;
3083    int ret;
3084
3085    buf = qemu_blockalign(bs, qiov->size);
3086    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
3087    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
3088    qemu_vfree(buf);
3089
3090    return ret;
3091}
3092
3093static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3094                           int64_t pos)
3095{
3096    BDRVSheepdogState *s = bs->opaque;
3097    void *buf;
3098    int ret;
3099
3100    buf = qemu_blockalign(bs, qiov->size);
3101    ret = do_load_save_vmstate(s, buf, pos, qiov->size, 1);
3102    qemu_iovec_from_buf(qiov, 0, buf, qiov->size);
3103    qemu_vfree(buf);
3104
3105    return ret;
3106}
3107
3108
3109static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
3110                                      int bytes)
3111{
3112    SheepdogAIOCB acb;
3113    BDRVSheepdogState *s = bs->opaque;
3114    QEMUIOVector discard_iov;
3115    struct iovec iov;
3116    uint32_t zero = 0;
3117
3118    if (!s->discard_supported) {
3119        return 0;
3120    }
3121
3122    memset(&discard_iov, 0, sizeof(discard_iov));
3123    memset(&iov, 0, sizeof(iov));
3124    iov.iov_base = &zero;
3125    iov.iov_len = sizeof(zero);
3126    discard_iov.iov = &iov;
3127    discard_iov.niov = 1;
3128    if (!QEMU_IS_ALIGNED(offset | bytes, BDRV_SECTOR_SIZE)) {
3129        return -ENOTSUP;
3130    }
3131    sd_aio_setup(&acb, s, &discard_iov, offset >> BDRV_SECTOR_BITS,
3132                 bytes >> BDRV_SECTOR_BITS, AIOCB_DISCARD_OBJ);
3133    sd_co_rw_vector(&acb);
3134    sd_aio_complete(&acb);
3135
3136    return acb.ret;
3137}
3138
3139static coroutine_fn int
3140sd_co_block_status(BlockDriverState *bs, bool want_zero, int64_t offset,
3141                   int64_t bytes, int64_t *pnum, int64_t *map,
3142                   BlockDriverState **file)
3143{
3144    BDRVSheepdogState *s = bs->opaque;
3145    SheepdogInode *inode = &s->inode;
3146    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3147    unsigned long start = offset / object_size,
3148                  end = DIV_ROUND_UP(offset + bytes, object_size);
3149    unsigned long idx;
3150    *map = offset;
3151    int ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
3152
3153    for (idx = start; idx < end; idx++) {
3154        if (inode->data_vdi_id[idx] == 0) {
3155            break;
3156        }
3157    }
3158    if (idx == start) {
3159        /* Get the longest length of unallocated sectors */
3160        ret = 0;
3161        for (idx = start + 1; idx < end; idx++) {
3162            if (inode->data_vdi_id[idx] != 0) {
3163                break;
3164            }
3165        }
3166    }
3167
3168    *pnum = (idx - start) * object_size;
3169    if (*pnum > bytes) {
3170        *pnum = bytes;
3171    }
3172    if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
3173        *file = bs;
3174    }
3175    return ret;
3176}
3177
3178static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
3179{
3180    BDRVSheepdogState *s = bs->opaque;
3181    SheepdogInode *inode = &s->inode;
3182    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3183    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
3184    uint64_t size = 0;
3185
3186    for (i = 0; i < last; i++) {
3187        if (inode->data_vdi_id[i] == 0) {
3188            continue;
3189        }
3190        size += object_size;
3191    }
3192    return size;
3193}
3194
3195static QemuOptsList sd_create_opts = {
3196    .name = "sheepdog-create-opts",
3197    .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
3198    .desc = {
3199        {
3200            .name = BLOCK_OPT_SIZE,
3201            .type = QEMU_OPT_SIZE,
3202            .help = "Virtual disk size"
3203        },
3204        {
3205            .name = BLOCK_OPT_BACKING_FILE,
3206            .type = QEMU_OPT_STRING,
3207            .help = "File name of a base image"
3208        },
3209        {
3210            .name = BLOCK_OPT_PREALLOC,
3211            .type = QEMU_OPT_STRING,
3212            .help = "Preallocation mode (allowed values: off, full)"
3213        },
3214        {
3215            .name = BLOCK_OPT_REDUNDANCY,
3216            .type = QEMU_OPT_STRING,
3217            .help = "Redundancy of the image"
3218        },
3219        {
3220            .name = BLOCK_OPT_OBJECT_SIZE,
3221            .type = QEMU_OPT_SIZE,
3222            .help = "Object size of the image"
3223        },
3224        { /* end of list */ }
3225    }
3226};
3227
3228static BlockDriver bdrv_sheepdog = {
3229    .format_name                  = "sheepdog",
3230    .protocol_name                = "sheepdog",
3231    .instance_size                = sizeof(BDRVSheepdogState),
3232    .bdrv_parse_filename          = sd_parse_filename,
3233    .bdrv_file_open               = sd_open,
3234    .bdrv_reopen_prepare          = sd_reopen_prepare,
3235    .bdrv_reopen_commit           = sd_reopen_commit,
3236    .bdrv_reopen_abort            = sd_reopen_abort,
3237    .bdrv_close                   = sd_close,
3238    .bdrv_co_create               = sd_co_create,
3239    .bdrv_co_create_opts          = sd_co_create_opts,
3240    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3241    .bdrv_getlength               = sd_getlength,
3242    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3243    .bdrv_truncate                = sd_truncate,
3244
3245    .bdrv_co_readv                = sd_co_readv,
3246    .bdrv_co_writev               = sd_co_writev,
3247    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3248    .bdrv_co_pdiscard             = sd_co_pdiscard,
3249    .bdrv_co_block_status         = sd_co_block_status,
3250
3251    .bdrv_snapshot_create         = sd_snapshot_create,
3252    .bdrv_snapshot_goto           = sd_snapshot_goto,
3253    .bdrv_snapshot_delete         = sd_snapshot_delete,
3254    .bdrv_snapshot_list           = sd_snapshot_list,
3255
3256    .bdrv_save_vmstate            = sd_save_vmstate,
3257    .bdrv_load_vmstate            = sd_load_vmstate,
3258
3259    .bdrv_detach_aio_context      = sd_detach_aio_context,
3260    .bdrv_attach_aio_context      = sd_attach_aio_context,
3261
3262    .create_opts                  = &sd_create_opts,
3263};
3264
3265static BlockDriver bdrv_sheepdog_tcp = {
3266    .format_name                  = "sheepdog",
3267    .protocol_name                = "sheepdog+tcp",
3268    .instance_size                = sizeof(BDRVSheepdogState),
3269    .bdrv_parse_filename          = sd_parse_filename,
3270    .bdrv_file_open               = sd_open,
3271    .bdrv_reopen_prepare          = sd_reopen_prepare,
3272    .bdrv_reopen_commit           = sd_reopen_commit,
3273    .bdrv_reopen_abort            = sd_reopen_abort,
3274    .bdrv_close                   = sd_close,
3275    .bdrv_co_create               = sd_co_create,
3276    .bdrv_co_create_opts          = sd_co_create_opts,
3277    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3278    .bdrv_getlength               = sd_getlength,
3279    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3280    .bdrv_truncate                = sd_truncate,
3281
3282    .bdrv_co_readv                = sd_co_readv,
3283    .bdrv_co_writev               = sd_co_writev,
3284    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3285    .bdrv_co_pdiscard             = sd_co_pdiscard,
3286    .bdrv_co_block_status         = sd_co_block_status,
3287
3288    .bdrv_snapshot_create         = sd_snapshot_create,
3289    .bdrv_snapshot_goto           = sd_snapshot_goto,
3290    .bdrv_snapshot_delete         = sd_snapshot_delete,
3291    .bdrv_snapshot_list           = sd_snapshot_list,
3292
3293    .bdrv_save_vmstate            = sd_save_vmstate,
3294    .bdrv_load_vmstate            = sd_load_vmstate,
3295
3296    .bdrv_detach_aio_context      = sd_detach_aio_context,
3297    .bdrv_attach_aio_context      = sd_attach_aio_context,
3298
3299    .create_opts                  = &sd_create_opts,
3300};
3301
3302static BlockDriver bdrv_sheepdog_unix = {
3303    .format_name                  = "sheepdog",
3304    .protocol_name                = "sheepdog+unix",
3305    .instance_size                = sizeof(BDRVSheepdogState),
3306    .bdrv_parse_filename          = sd_parse_filename,
3307    .bdrv_file_open               = sd_open,
3308    .bdrv_reopen_prepare          = sd_reopen_prepare,
3309    .bdrv_reopen_commit           = sd_reopen_commit,
3310    .bdrv_reopen_abort            = sd_reopen_abort,
3311    .bdrv_close                   = sd_close,
3312    .bdrv_co_create               = sd_co_create,
3313    .bdrv_co_create_opts          = sd_co_create_opts,
3314    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3315    .bdrv_getlength               = sd_getlength,
3316    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3317    .bdrv_truncate                = sd_truncate,
3318
3319    .bdrv_co_readv                = sd_co_readv,
3320    .bdrv_co_writev               = sd_co_writev,
3321    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3322    .bdrv_co_pdiscard             = sd_co_pdiscard,
3323    .bdrv_co_block_status         = sd_co_block_status,
3324
3325    .bdrv_snapshot_create         = sd_snapshot_create,
3326    .bdrv_snapshot_goto           = sd_snapshot_goto,
3327    .bdrv_snapshot_delete         = sd_snapshot_delete,
3328    .bdrv_snapshot_list           = sd_snapshot_list,
3329
3330    .bdrv_save_vmstate            = sd_save_vmstate,
3331    .bdrv_load_vmstate            = sd_load_vmstate,
3332
3333    .bdrv_detach_aio_context      = sd_detach_aio_context,
3334    .bdrv_attach_aio_context      = sd_attach_aio_context,
3335
3336    .create_opts                  = &sd_create_opts,
3337};
3338
3339static void bdrv_sheepdog_init(void)
3340{
3341    bdrv_register(&bdrv_sheepdog);
3342    bdrv_register(&bdrv_sheepdog_tcp);
3343    bdrv_register(&bdrv_sheepdog_unix);
3344}
3345block_init(bdrv_sheepdog_init);
3346