qemu/block/sheepdog.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu-common.h"
  17#include "qapi/error.h"
  18#include "qapi/qapi-visit-sockets.h"
  19#include "qapi/qapi-visit-block-core.h"
  20#include "qapi/qmp/qdict.h"
  21#include "qapi/qobject-input-visitor.h"
  22#include "qapi/qobject-output-visitor.h"
  23#include "qemu/uri.h"
  24#include "qemu/error-report.h"
  25#include "qemu/main-loop.h"
  26#include "qemu/module.h"
  27#include "qemu/option.h"
  28#include "qemu/sockets.h"
  29#include "block/block_int.h"
  30#include "block/qdict.h"
  31#include "sysemu/block-backend.h"
  32#include "qemu/bitops.h"
  33#include "qemu/cutils.h"
  34#include "trace.h"
  35
  36#define SD_PROTO_VER 0x01
  37
  38#define SD_DEFAULT_ADDR "localhost"
  39#define SD_DEFAULT_PORT 7000
  40
  41#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
  42#define SD_OP_READ_OBJ       0x02
  43#define SD_OP_WRITE_OBJ      0x03
  44/* 0x04 is used internally by Sheepdog */
  45
  46#define SD_OP_NEW_VDI        0x11
  47#define SD_OP_LOCK_VDI       0x12
  48#define SD_OP_RELEASE_VDI    0x13
  49#define SD_OP_GET_VDI_INFO   0x14
  50#define SD_OP_READ_VDIS      0x15
  51#define SD_OP_FLUSH_VDI      0x16
  52#define SD_OP_DEL_VDI        0x17
  53#define SD_OP_GET_CLUSTER_DEFAULT   0x18
  54
  55#define SD_FLAG_CMD_WRITE    0x01
  56#define SD_FLAG_CMD_COW      0x02
  57#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
  58#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  59
  60#define SD_RES_SUCCESS       0x00 /* Success */
  61#define SD_RES_UNKNOWN       0x01 /* Unknown error */
  62#define SD_RES_NO_OBJ        0x02 /* No object found */
  63#define SD_RES_EIO           0x03 /* I/O error */
  64#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
  65#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
  66#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
  67#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
  68#define SD_RES_NO_VDI        0x08 /* No vdi found */
  69#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
  70#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
  71#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
  72#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
  73#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
  74#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
  75#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
  76#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
  77#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
  78#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
  79#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
  80#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
  81#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
  82#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
  83#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
  84#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
  85#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
  86#define SD_RES_READONLY      0x1A /* Object is read-only */
  87
  88/*
  89 * Object ID rules
  90 *
  91 *  0 - 19 (20 bits): data object space
  92 * 20 - 31 (12 bits): reserved data object space
  93 * 32 - 55 (24 bits): vdi object space
  94 * 56 - 59 ( 4 bits): reserved vdi object space
  95 * 60 - 63 ( 4 bits): object type identifier space
  96 */
  97
  98#define VDI_SPACE_SHIFT   32
  99#define VDI_BIT (UINT64_C(1) << 63)
 100#define VMSTATE_BIT (UINT64_C(1) << 62)
 101#define MAX_DATA_OBJS (UINT64_C(1) << 20)
 102#define MAX_CHILDREN 1024
 103#define SD_MAX_VDI_LEN 256
 104#define SD_MAX_VDI_TAG_LEN 256
 105#define SD_NR_VDIS   (1U << 24)
 106#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
 107#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
 108#define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
 109/*
 110 * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
 111 * (SD_EC_MAX_STRIP - 1) for parity strips
 112 *
 113 * SD_MAX_COPIES is sum of number of data strips and parity strips.
 114 */
 115#define SD_EC_MAX_STRIP 16
 116#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
 117
 118#define SD_INODE_SIZE (sizeof(SheepdogInode))
 119#define CURRENT_VDI_ID 0
 120
 121#define LOCK_TYPE_NORMAL 0
 122#define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
 123
 124typedef struct SheepdogReq {
 125    uint8_t proto_ver;
 126    uint8_t opcode;
 127    uint16_t flags;
 128    uint32_t epoch;
 129    uint32_t id;
 130    uint32_t data_length;
 131    uint32_t opcode_specific[8];
 132} SheepdogReq;
 133
 134typedef struct SheepdogRsp {
 135    uint8_t proto_ver;
 136    uint8_t opcode;
 137    uint16_t flags;
 138    uint32_t epoch;
 139    uint32_t id;
 140    uint32_t data_length;
 141    uint32_t result;
 142    uint32_t opcode_specific[7];
 143} SheepdogRsp;
 144
 145typedef struct SheepdogObjReq {
 146    uint8_t proto_ver;
 147    uint8_t opcode;
 148    uint16_t flags;
 149    uint32_t epoch;
 150    uint32_t id;
 151    uint32_t data_length;
 152    uint64_t oid;
 153    uint64_t cow_oid;
 154    uint8_t copies;
 155    uint8_t copy_policy;
 156    uint8_t reserved[6];
 157    uint64_t offset;
 158} SheepdogObjReq;
 159
 160typedef struct SheepdogObjRsp {
 161    uint8_t proto_ver;
 162    uint8_t opcode;
 163    uint16_t flags;
 164    uint32_t epoch;
 165    uint32_t id;
 166    uint32_t data_length;
 167    uint32_t result;
 168    uint8_t copies;
 169    uint8_t copy_policy;
 170    uint8_t reserved[2];
 171    uint32_t pad[6];
 172} SheepdogObjRsp;
 173
 174typedef struct SheepdogVdiReq {
 175    uint8_t proto_ver;
 176    uint8_t opcode;
 177    uint16_t flags;
 178    uint32_t epoch;
 179    uint32_t id;
 180    uint32_t data_length;
 181    uint64_t vdi_size;
 182    uint32_t base_vdi_id;
 183    uint8_t copies;
 184    uint8_t copy_policy;
 185    uint8_t store_policy;
 186    uint8_t block_size_shift;
 187    uint32_t snapid;
 188    uint32_t type;
 189    uint32_t pad[2];
 190} SheepdogVdiReq;
 191
 192typedef struct SheepdogVdiRsp {
 193    uint8_t proto_ver;
 194    uint8_t opcode;
 195    uint16_t flags;
 196    uint32_t epoch;
 197    uint32_t id;
 198    uint32_t data_length;
 199    uint32_t result;
 200    uint32_t rsvd;
 201    uint32_t vdi_id;
 202    uint32_t pad[5];
 203} SheepdogVdiRsp;
 204
 205typedef struct SheepdogClusterRsp {
 206    uint8_t proto_ver;
 207    uint8_t opcode;
 208    uint16_t flags;
 209    uint32_t epoch;
 210    uint32_t id;
 211    uint32_t data_length;
 212    uint32_t result;
 213    uint8_t nr_copies;
 214    uint8_t copy_policy;
 215    uint8_t block_size_shift;
 216    uint8_t __pad1;
 217    uint32_t __pad2[6];
 218} SheepdogClusterRsp;
 219
 220typedef struct SheepdogInode {
 221    char name[SD_MAX_VDI_LEN];
 222    char tag[SD_MAX_VDI_TAG_LEN];
 223    uint64_t ctime;
 224    uint64_t snap_ctime;
 225    uint64_t vm_clock_nsec;
 226    uint64_t vdi_size;
 227    uint64_t vm_state_size;
 228    uint16_t copy_policy;
 229    uint8_t nr_copies;
 230    uint8_t block_size_shift;
 231    uint32_t snap_id;
 232    uint32_t vdi_id;
 233    uint32_t parent_vdi_id;
 234    uint32_t child_vdi_id[MAX_CHILDREN];
 235    uint32_t data_vdi_id[MAX_DATA_OBJS];
 236} SheepdogInode;
 237
 238#define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
 239
 240/*
 241 * 64 bit FNV-1a non-zero initial basis
 242 */
 243#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 244
 245static void deprecation_warning(void)
 246{
 247    static bool warned;
 248
 249    if (!warned) {
 250        warn_report("the sheepdog block driver is deprecated");
 251        warned = true;
 252    }
 253}
 254
 255/*
 256 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 257 */
 258static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 259{
 260    unsigned char *bp = buf;
 261    unsigned char *be = bp + len;
 262    while (bp < be) {
 263        hval ^= (uint64_t) *bp++;
 264        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 265            (hval << 7) + (hval << 8) + (hval << 40);
 266    }
 267    return hval;
 268}
 269
 270static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 271{
 272    return inode->vdi_id == inode->data_vdi_id[idx];
 273}
 274
 275static inline bool is_data_obj(uint64_t oid)
 276{
 277    return !(VDI_BIT & oid);
 278}
 279
 280static inline uint64_t data_oid_to_idx(uint64_t oid)
 281{
 282    return oid & (MAX_DATA_OBJS - 1);
 283}
 284
 285static inline uint32_t oid_to_vid(uint64_t oid)
 286{
 287    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 288}
 289
 290static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 291{
 292    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 293}
 294
 295static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 296{
 297    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 298}
 299
 300static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 301{
 302    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 303}
 304
 305static inline bool is_snapshot(struct SheepdogInode *inode)
 306{
 307    return !!inode->snap_ctime;
 308}
 309
 310static inline size_t count_data_objs(const struct SheepdogInode *inode)
 311{
 312    return DIV_ROUND_UP(inode->vdi_size,
 313                        (1UL << inode->block_size_shift));
 314}
 315
 316typedef struct SheepdogAIOCB SheepdogAIOCB;
 317typedef struct BDRVSheepdogState BDRVSheepdogState;
 318
 319typedef struct AIOReq {
 320    SheepdogAIOCB *aiocb;
 321    unsigned int iov_offset;
 322
 323    uint64_t oid;
 324    uint64_t base_oid;
 325    uint64_t offset;
 326    unsigned int data_len;
 327    uint8_t flags;
 328    uint32_t id;
 329    bool create;
 330
 331    QLIST_ENTRY(AIOReq) aio_siblings;
 332} AIOReq;
 333
 334enum AIOCBState {
 335    AIOCB_WRITE_UDATA,
 336    AIOCB_READ_UDATA,
 337    AIOCB_FLUSH_CACHE,
 338    AIOCB_DISCARD_OBJ,
 339};
 340
 341#define AIOCBOverlapping(x, y)                                 \
 342    (!(x->max_affect_data_idx < y->min_affect_data_idx          \
 343       || y->max_affect_data_idx < x->min_affect_data_idx))
 344
 345struct SheepdogAIOCB {
 346    BDRVSheepdogState *s;
 347
 348    QEMUIOVector *qiov;
 349
 350    int64_t sector_num;
 351    int nb_sectors;
 352
 353    int ret;
 354    enum AIOCBState aiocb_type;
 355
 356    Coroutine *coroutine;
 357    int nr_pending;
 358
 359    uint32_t min_affect_data_idx;
 360    uint32_t max_affect_data_idx;
 361
 362    /*
 363     * The difference between affect_data_idx and dirty_data_idx:
 364     * affect_data_idx represents range of index of all request types.
 365     * dirty_data_idx represents range of index updated by COW requests.
 366     * dirty_data_idx is used for updating an inode object.
 367     */
 368    uint32_t min_dirty_data_idx;
 369    uint32_t max_dirty_data_idx;
 370
 371    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
 372};
 373
 374struct BDRVSheepdogState {
 375    BlockDriverState *bs;
 376    AioContext *aio_context;
 377
 378    SheepdogInode inode;
 379
 380    char name[SD_MAX_VDI_LEN];
 381    bool is_snapshot;
 382    uint32_t cache_flags;
 383    bool discard_supported;
 384
 385    SocketAddress *addr;
 386    int fd;
 387
 388    CoMutex lock;
 389    Coroutine *co_send;
 390    Coroutine *co_recv;
 391
 392    uint32_t aioreq_seq_num;
 393
 394    /* Every aio request must be linked to either of these queues. */
 395    QLIST_HEAD(, AIOReq) inflight_aio_head;
 396    QLIST_HEAD(, AIOReq) failed_aio_head;
 397
 398    CoMutex queue_lock;
 399    CoQueue overlapping_queue;
 400    QLIST_HEAD(, SheepdogAIOCB) inflight_aiocb_head;
 401};
 402
 403typedef struct BDRVSheepdogReopenState {
 404    int fd;
 405    int cache_flags;
 406} BDRVSheepdogReopenState;
 407
 408static const char *sd_strerror(int err)
 409{
 410    int i;
 411
 412    static const struct {
 413        int err;
 414        const char *desc;
 415    } errors[] = {
 416        {SD_RES_SUCCESS, "Success"},
 417        {SD_RES_UNKNOWN, "Unknown error"},
 418        {SD_RES_NO_OBJ, "No object found"},
 419        {SD_RES_EIO, "I/O error"},
 420        {SD_RES_VDI_EXIST, "VDI exists already"},
 421        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 422        {SD_RES_SYSTEM_ERROR, "System error"},
 423        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 424        {SD_RES_NO_VDI, "No vdi found"},
 425        {SD_RES_NO_BASE_VDI, "No base VDI found"},
 426        {SD_RES_VDI_READ, "Failed read the requested VDI"},
 427        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 428        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 429        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 430        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 431        {SD_RES_STARTUP, "The system is still booting"},
 432        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 433        {SD_RES_SHUTDOWN, "The system is shutting down"},
 434        {SD_RES_NO_MEM, "Out of memory on the server"},
 435        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 436        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 437        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 438        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 439        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
 440        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
 441        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
 442        {SD_RES_READONLY, "Object is read-only"},
 443    };
 444
 445    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 446        if (errors[i].err == err) {
 447            return errors[i].desc;
 448        }
 449    }
 450
 451    return "Invalid error code";
 452}
 453
/*
 * Sheepdog I/O handling:
 *
 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 *    link the requests to the inflight list in the
 *    BDRVSheepdogState.  The function yields while waiting for
 *    the response.
 *
 * 2. We receive the response in aio_read_response, the fd handler for
 *    the sheepdog connection.  We switch back to sd_co_readv/sd_co_writev
 *    after all the requests belonging to the AIOCB are finished.  If
 *    needed, sd_co_writev will send another request for the vdi object.
 */
 467
 468static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 469                                    uint64_t oid, unsigned int data_len,
 470                                    uint64_t offset, uint8_t flags, bool create,
 471                                    uint64_t base_oid, unsigned int iov_offset)
 472{
 473    AIOReq *aio_req;
 474
 475    aio_req = g_malloc(sizeof(*aio_req));
 476    aio_req->aiocb = acb;
 477    aio_req->iov_offset = iov_offset;
 478    aio_req->oid = oid;
 479    aio_req->base_oid = base_oid;
 480    aio_req->offset = offset;
 481    aio_req->data_len = data_len;
 482    aio_req->flags = flags;
 483    aio_req->id = s->aioreq_seq_num++;
 484    aio_req->create = create;
 485
 486    acb->nr_pending++;
 487    return aio_req;
 488}
 489
 490static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
 491{
 492    SheepdogAIOCB *cb;
 493
 494retry:
 495    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
 496        if (AIOCBOverlapping(acb, cb)) {
 497            qemu_co_queue_wait(&s->overlapping_queue, &s->queue_lock);
 498            goto retry;
 499        }
 500    }
 501}
 502
 503static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s,
 504                         QEMUIOVector *qiov, int64_t sector_num, int nb_sectors,
 505                         int type)
 506{
 507    uint32_t object_size;
 508
 509    object_size = (UINT32_C(1) << s->inode.block_size_shift);
 510
 511    acb->s = s;
 512
 513    acb->qiov = qiov;
 514
 515    acb->sector_num = sector_num;
 516    acb->nb_sectors = nb_sectors;
 517
 518    acb->coroutine = qemu_coroutine_self();
 519    acb->ret = 0;
 520    acb->nr_pending = 0;
 521
 522    acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
 523    acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
 524                              acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
 525
 526    acb->min_dirty_data_idx = UINT32_MAX;
 527    acb->max_dirty_data_idx = 0;
 528    acb->aiocb_type = type;
 529
 530    if (type == AIOCB_FLUSH_CACHE) {
 531        return;
 532    }
 533
 534    qemu_co_mutex_lock(&s->queue_lock);
 535    wait_for_overlapping_aiocb(s, acb);
 536    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings);
 537    qemu_co_mutex_unlock(&s->queue_lock);
 538}
 539
 540static SocketAddress *sd_server_config(QDict *options, Error **errp)
 541{
 542    QDict *server = NULL;
 543    Visitor *iv = NULL;
 544    SocketAddress *saddr = NULL;
 545
 546    qdict_extract_subqdict(options, &server, "server.");
 547
 548    iv = qobject_input_visitor_new_flat_confused(server, errp);
 549    if (!iv) {
 550        goto done;
 551    }
 552
 553    if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
 554        goto done;
 555    }
 556
 557done:
 558    visit_free(iv);
 559    qobject_unref(server);
 560    return saddr;
 561}
 562
 563/* Return -EIO in case of error, file descriptor on success */
 564static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 565{
 566    int fd;
 567
 568    fd = socket_connect(s->addr, errp);
 569
 570    if (s->addr->type == SOCKET_ADDRESS_TYPE_INET && fd >= 0) {
 571        int ret = socket_set_nodelay(fd);
 572        if (ret < 0) {
 573            warn_report("can't set TCP_NODELAY: %s", strerror(errno));
 574        }
 575    }
 576
 577    if (fd >= 0) {
 578        qemu_set_nonblock(fd);
 579    } else {
 580        fd = -EIO;
 581    }
 582
 583    return fd;
 584}
 585
 586/* Return 0 on success and -errno in case of error */
 587static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 588                                    unsigned int *wlen)
 589{
 590    int ret;
 591
 592    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 593    if (ret != sizeof(*hdr)) {
 594        error_report("failed to send a req, %s", strerror(errno));
 595        return -errno;
 596    }
 597
 598    ret = qemu_co_send(sockfd, data, *wlen);
 599    if (ret != *wlen) {
 600        error_report("failed to send a req, %s", strerror(errno));
 601        return -errno;
 602    }
 603
 604    return ret;
 605}
 606
 607typedef struct SheepdogReqCo {
 608    int sockfd;
 609    BlockDriverState *bs;
 610    AioContext *aio_context;
 611    SheepdogReq *hdr;
 612    void *data;
 613    unsigned int *wlen;
 614    unsigned int *rlen;
 615    int ret;
 616    bool finished;
 617    Coroutine *co;
 618} SheepdogReqCo;
 619
 620static void restart_co_req(void *opaque)
 621{
 622    SheepdogReqCo *srco = opaque;
 623
 624    aio_co_wake(srco->co);
 625}
 626
 627static coroutine_fn void do_co_req(void *opaque)
 628{
 629    int ret;
 630    SheepdogReqCo *srco = opaque;
 631    int sockfd = srco->sockfd;
 632    SheepdogReq *hdr = srco->hdr;
 633    void *data = srco->data;
 634    unsigned int *wlen = srco->wlen;
 635    unsigned int *rlen = srco->rlen;
 636
 637    srco->co = qemu_coroutine_self();
 638    aio_set_fd_handler(srco->aio_context, sockfd, false,
 639                       NULL, restart_co_req, NULL, srco);
 640
 641    ret = send_co_req(sockfd, hdr, data, wlen);
 642    if (ret < 0) {
 643        goto out;
 644    }
 645
 646    aio_set_fd_handler(srco->aio_context, sockfd, false,
 647                       restart_co_req, NULL, NULL, srco);
 648
 649    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 650    if (ret != sizeof(*hdr)) {
 651        error_report("failed to get a rsp, %s", strerror(errno));
 652        ret = -errno;
 653        goto out;
 654    }
 655
 656    if (*rlen > hdr->data_length) {
 657        *rlen = hdr->data_length;
 658    }
 659
 660    if (*rlen) {
 661        ret = qemu_co_recv(sockfd, data, *rlen);
 662        if (ret != *rlen) {
 663            error_report("failed to get the data, %s", strerror(errno));
 664            ret = -errno;
 665            goto out;
 666        }
 667    }
 668    ret = 0;
 669out:
 670    /* there is at most one request for this sockfd, so it is safe to
 671     * set each handler to NULL. */
 672    aio_set_fd_handler(srco->aio_context, sockfd, false,
 673                       NULL, NULL, NULL, NULL);
 674
 675    srco->co = NULL;
 676    srco->ret = ret;
 677    /* Set srco->finished before reading bs->wakeup.  */
 678    qatomic_mb_set(&srco->finished, true);
 679    if (srco->bs) {
 680        bdrv_wakeup(srco->bs);
 681    }
 682}
 683
 684/*
 685 * Send the request to the sheep in a synchronous manner.
 686 *
 687 * Return 0 on success, -errno in case of error.
 688 */
 689static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
 690                  void *data, unsigned int *wlen, unsigned int *rlen)
 691{
 692    Coroutine *co;
 693    SheepdogReqCo srco = {
 694        .sockfd = sockfd,
 695        .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
 696        .bs = bs,
 697        .hdr = hdr,
 698        .data = data,
 699        .wlen = wlen,
 700        .rlen = rlen,
 701        .ret = 0,
 702        .finished = false,
 703    };
 704
 705    if (qemu_in_coroutine()) {
 706        do_co_req(&srco);
 707    } else {
 708        co = qemu_coroutine_create(do_co_req, &srco);
 709        if (bs) {
 710            bdrv_coroutine_enter(bs, co);
 711            BDRV_POLL_WHILE(bs, !srco.finished);
 712        } else {
 713            qemu_coroutine_enter(co);
 714            while (!srco.finished) {
 715                aio_poll(qemu_get_aio_context(), true);
 716            }
 717        }
 718    }
 719
 720    return srco.ret;
 721}
 722
 723static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 724                                         struct iovec *iov, int niov,
 725                                         enum AIOCBState aiocb_type);
 726static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 727static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
 728static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
 729static void co_write_request(void *opaque);
 730
 731static coroutine_fn void reconnect_to_sdog(void *opaque)
 732{
 733    BDRVSheepdogState *s = opaque;
 734    AIOReq *aio_req, *next;
 735
 736    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
 737                       NULL, NULL, NULL);
 738    close(s->fd);
 739    s->fd = -1;
 740
 741    /* Wait for outstanding write requests to be completed. */
 742    while (s->co_send != NULL) {
 743        co_write_request(opaque);
 744    }
 745
 746    /* Try to reconnect the sheepdog server every one second. */
 747    while (s->fd < 0) {
 748        Error *local_err = NULL;
 749        s->fd = get_sheep_fd(s, &local_err);
 750        if (s->fd < 0) {
 751            trace_sheepdog_reconnect_to_sdog();
 752            error_report_err(local_err);
 753            qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, NANOSECONDS_PER_SECOND);
 754        }
 755    };
 756
 757    /*
 758     * Now we have to resend all the request in the inflight queue.  However,
 759     * resend_aioreq() can yield and newly created requests can be added to the
 760     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
 761     * have to move all the inflight requests to the failed queue before
 762     * resend_aioreq() is called.
 763     */
 764    qemu_co_mutex_lock(&s->queue_lock);
 765    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
 766        QLIST_REMOVE(aio_req, aio_siblings);
 767        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
 768    }
 769
 770    /* Resend all the failed aio requests. */
 771    while (!QLIST_EMPTY(&s->failed_aio_head)) {
 772        aio_req = QLIST_FIRST(&s->failed_aio_head);
 773        QLIST_REMOVE(aio_req, aio_siblings);
 774        qemu_co_mutex_unlock(&s->queue_lock);
 775        resend_aioreq(s, aio_req);
 776        qemu_co_mutex_lock(&s->queue_lock);
 777    }
 778    qemu_co_mutex_unlock(&s->queue_lock);
 779}
 780
 781/*
 782 * Receive responses of the I/O requests.
 783 *
 784 * This function is registered as a fd handler, and called from the
 785 * main loop when s->fd is ready for reading responses.
 786 */
 787static void coroutine_fn aio_read_response(void *opaque)
 788{
 789    SheepdogObjRsp rsp;
 790    BDRVSheepdogState *s = opaque;
 791    int fd = s->fd;
 792    int ret;
 793    AIOReq *aio_req = NULL;
 794    SheepdogAIOCB *acb;
 795    uint64_t idx;
 796
 797    /* read a header */
 798    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
 799    if (ret != sizeof(rsp)) {
 800        error_report("failed to get the header, %s", strerror(errno));
 801        goto err;
 802    }
 803
 804    /* find the right aio_req from the inflight aio list */
 805    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
 806        if (aio_req->id == rsp.id) {
 807            break;
 808        }
 809    }
 810    if (!aio_req) {
 811        error_report("cannot find aio_req %x", rsp.id);
 812        goto err;
 813    }
 814
 815    acb = aio_req->aiocb;
 816
 817    switch (acb->aiocb_type) {
 818    case AIOCB_WRITE_UDATA:
 819        if (!is_data_obj(aio_req->oid)) {
 820            break;
 821        }
 822        idx = data_oid_to_idx(aio_req->oid);
 823
 824        if (aio_req->create) {
 825            /*
 826             * If the object is newly created one, we need to update
 827             * the vdi object (metadata object).  min_dirty_data_idx
 828             * and max_dirty_data_idx are changed to include updated
 829             * index between them.
 830             */
 831            if (rsp.result == SD_RES_SUCCESS) {
 832                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
 833                acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
 834                acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
 835            }
 836        }
 837        break;
 838    case AIOCB_READ_UDATA:
 839        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
 840                            aio_req->iov_offset, rsp.data_length);
 841        if (ret != rsp.data_length) {
 842            error_report("failed to get the data, %s", strerror(errno));
 843            goto err;
 844        }
 845        break;
 846    case AIOCB_FLUSH_CACHE:
 847        if (rsp.result == SD_RES_INVALID_PARMS) {
 848            trace_sheepdog_aio_read_response();
 849            s->cache_flags = SD_FLAG_CMD_DIRECT;
 850            rsp.result = SD_RES_SUCCESS;
 851        }
 852        break;
 853    case AIOCB_DISCARD_OBJ:
 854        switch (rsp.result) {
 855        case SD_RES_INVALID_PARMS:
 856            error_report("server doesn't support discard command");
 857            rsp.result = SD_RES_SUCCESS;
 858            s->discard_supported = false;
 859            break;
 860        default:
 861            break;
 862        }
 863    }
 864
 865    /* No more data for this aio_req (reload_inode below uses its own file
 866     * descriptor handler which doesn't use co_recv).
 867    */
 868    s->co_recv = NULL;
 869
 870    qemu_co_mutex_lock(&s->queue_lock);
 871    QLIST_REMOVE(aio_req, aio_siblings);
 872    qemu_co_mutex_unlock(&s->queue_lock);
 873
 874    switch (rsp.result) {
 875    case SD_RES_SUCCESS:
 876        break;
 877    case SD_RES_READONLY:
 878        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
 879            ret = reload_inode(s, 0, "");
 880            if (ret < 0) {
 881                goto err;
 882            }
 883        }
 884        if (is_data_obj(aio_req->oid)) {
 885            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
 886                                           data_oid_to_idx(aio_req->oid));
 887        } else {
 888            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
 889        }
 890        resend_aioreq(s, aio_req);
 891        return;
 892    default:
 893        acb->ret = -EIO;
 894        error_report("%s", sd_strerror(rsp.result));
 895        break;
 896    }
 897
 898    g_free(aio_req);
 899
 900    if (!--acb->nr_pending) {
 901        /*
 902         * We've finished all requests which belong to the AIOCB, so
 903         * we can switch back to sd_co_readv/writev now.
 904         */
 905        aio_co_wake(acb->coroutine);
 906    }
 907
 908    return;
 909
 910err:
 911    reconnect_to_sdog(opaque);
 912}
 913
 914static void co_read_response(void *opaque)
 915{
 916    BDRVSheepdogState *s = opaque;
 917
 918    if (!s->co_recv) {
 919        s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
 920    }
 921
 922    aio_co_enter(s->aio_context, s->co_recv);
 923}
 924
 925static void co_write_request(void *opaque)
 926{
 927    BDRVSheepdogState *s = opaque;
 928
 929    aio_co_wake(s->co_send);
 930}
 931
 932/*
 933 * Return a socket descriptor to read/write objects.
 934 *
 935 * We cannot use this descriptor for other operations because
 936 * the block driver may be on waiting response from the server.
 937 */
 938static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
 939{
 940    int fd;
 941
 942    fd = connect_to_sdog(s, errp);
 943    if (fd < 0) {
 944        return fd;
 945    }
 946
 947    aio_set_fd_handler(s->aio_context, fd, false,
 948                       co_read_response, NULL, NULL, s);
 949    return fd;
 950}
 951
 952/*
 953 * Parse numeric snapshot ID in @str
 954 * If @str can't be parsed as number, return false.
 955 * Else, if the number is zero or too large, set *@snapid to zero and
 956 * return true.
 957 * Else, set *@snapid to the number and return true.
 958 */
 959static bool sd_parse_snapid(const char *str, uint32_t *snapid)
 960{
 961    unsigned long ul;
 962    int ret;
 963
 964    ret = qemu_strtoul(str, NULL, 10, &ul);
 965    if (ret == -ERANGE) {
 966        ul = ret = 0;
 967    }
 968    if (ret) {
 969        return false;
 970    }
 971    if (ul > UINT32_MAX) {
 972        ul = 0;
 973    }
 974
 975    *snapid = ul;
 976    return true;
 977}
 978
 979static bool sd_parse_snapid_or_tag(const char *str,
 980                                   uint32_t *snapid, char tag[])
 981{
 982    if (!sd_parse_snapid(str, snapid)) {
 983        *snapid = 0;
 984        if (g_strlcpy(tag, str, SD_MAX_VDI_TAG_LEN) >= SD_MAX_VDI_TAG_LEN) {
 985            return false;
 986        }
 987    } else if (!*snapid) {
 988        return false;
 989    } else {
 990        tag[0] = 0;
 991    }
 992    return true;
 993}
 994
 995typedef struct {
 996    const char *path;           /* non-null iff transport is tcp */
 997    const char *host;           /* valid when transport is tcp */
 998    int port;                   /* valid when transport is tcp */
 999    char vdi[SD_MAX_VDI_LEN];
1000    char tag[SD_MAX_VDI_TAG_LEN];
1001    uint32_t snap_id;
1002    /* Remainder is only for sd_config_done() */
1003    URI *uri;
1004    QueryParams *qp;
1005} SheepdogConfig;
1006
1007static void sd_config_done(SheepdogConfig *cfg)
1008{
1009    if (cfg->qp) {
1010        query_params_free(cfg->qp);
1011    }
1012    uri_free(cfg->uri);
1013}
1014
1015static void sd_parse_uri(SheepdogConfig *cfg, const char *filename,
1016                         Error **errp)
1017{
1018    Error *err = NULL;
1019    QueryParams *qp = NULL;
1020    bool is_unix;
1021    URI *uri;
1022
1023    memset(cfg, 0, sizeof(*cfg));
1024
1025    cfg->uri = uri = uri_parse(filename);
1026    if (!uri) {
1027        error_setg(&err, "invalid URI '%s'", filename);
1028        goto out;
1029    }
1030
1031    /* transport */
1032    if (!g_strcmp0(uri->scheme, "sheepdog")) {
1033        is_unix = false;
1034    } else if (!g_strcmp0(uri->scheme, "sheepdog+tcp")) {
1035        is_unix = false;
1036    } else if (!g_strcmp0(uri->scheme, "sheepdog+unix")) {
1037        is_unix = true;
1038    } else {
1039        error_setg(&err, "URI scheme must be 'sheepdog', 'sheepdog+tcp',"
1040                   " or 'sheepdog+unix'");
1041        goto out;
1042    }
1043
1044    if (uri->path == NULL || !strcmp(uri->path, "/")) {
1045        error_setg(&err, "missing file path in URI");
1046        goto out;
1047    }
1048    if (g_strlcpy(cfg->vdi, uri->path + 1, SD_MAX_VDI_LEN)
1049        >= SD_MAX_VDI_LEN) {
1050        error_setg(&err, "VDI name is too long");
1051        goto out;
1052    }
1053
1054    cfg->qp = qp = query_params_parse(uri->query);
1055
1056    if (is_unix) {
1057        /* sheepdog+unix:///vdiname?socket=path */
1058        if (uri->server || uri->port) {
1059            error_setg(&err, "URI scheme %s doesn't accept a server address",
1060                       uri->scheme);
1061            goto out;
1062        }
1063        if (!qp->n) {
1064            error_setg(&err,
1065                       "URI scheme %s requires query parameter 'socket'",
1066                       uri->scheme);
1067            goto out;
1068        }
1069        if (qp->n != 1 || strcmp(qp->p[0].name, "socket")) {
1070            error_setg(&err, "unexpected query parameters");
1071            goto out;
1072        }
1073        cfg->path = qp->p[0].value;
1074    } else {
1075        /* sheepdog[+tcp]://[host:port]/vdiname */
1076        if (qp->n) {
1077            error_setg(&err, "unexpected query parameters");
1078            goto out;
1079        }
1080        cfg->host = uri->server;
1081        cfg->port = uri->port;
1082    }
1083
1084    /* snapshot tag */
1085    if (uri->fragment) {
1086        if (!sd_parse_snapid_or_tag(uri->fragment,
1087                                    &cfg->snap_id, cfg->tag)) {
1088            error_setg(&err, "'%s' is not a valid snapshot ID",
1089                       uri->fragment);
1090            goto out;
1091        }
1092    } else {
1093        cfg->snap_id = CURRENT_VDI_ID; /* search current vdi */
1094    }
1095
1096out:
1097    if (err) {
1098        error_propagate(errp, err);
1099        sd_config_done(cfg);
1100    }
1101}
1102
1103/*
1104 * Parse a filename (old syntax)
1105 *
1106 * filename must be one of the following formats:
1107 *   1. [vdiname]
1108 *   2. [vdiname]:[snapid]
1109 *   3. [vdiname]:[tag]
1110 *   4. [hostname]:[port]:[vdiname]
1111 *   5. [hostname]:[port]:[vdiname]:[snapid]
1112 *   6. [hostname]:[port]:[vdiname]:[tag]
1113 *
1114 * You can boot from the snapshot images by specifying `snapid` or
1115 * `tag'.
1116 *
1117 * You can run VMs outside the Sheepdog cluster by specifying
1118 * `hostname' and `port' (experimental).
1119 */
1120static void parse_vdiname(SheepdogConfig *cfg, const char *filename,
1121                          Error **errp)
1122{
1123    Error *err = NULL;
1124    char *p, *q, *uri;
1125    const char *host_spec, *vdi_spec;
1126    int nr_sep;
1127
1128    strstart(filename, "sheepdog:", &filename);
1129    p = q = g_strdup(filename);
1130
1131    /* count the number of separators */
1132    nr_sep = 0;
1133    while (*p) {
1134        if (*p == ':') {
1135            nr_sep++;
1136        }
1137        p++;
1138    }
1139    p = q;
1140
1141    /* use the first two tokens as host_spec. */
1142    if (nr_sep >= 2) {
1143        host_spec = p;
1144        p = strchr(p, ':');
1145        p++;
1146        p = strchr(p, ':');
1147        *p++ = '\0';
1148    } else {
1149        host_spec = "";
1150    }
1151
1152    vdi_spec = p;
1153
1154    p = strchr(vdi_spec, ':');
1155    if (p) {
1156        *p++ = '#';
1157    }
1158
1159    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1160
1161    /*
1162     * FIXME We to escape URI meta-characters, e.g. "x?y=z"
1163     * produces "sheepdog://x?y=z".  Because of that ...
1164     */
1165    sd_parse_uri(cfg, uri, &err);
1166    if (err) {
1167        /*
1168         * ... this can fail, but the error message is misleading.
1169         * Replace it by the traditional useless one until the
1170         * escaping is fixed.
1171         */
1172        error_free(err);
1173        error_setg(errp, "Can't parse filename");
1174    }
1175
1176    g_free(q);
1177    g_free(uri);
1178}
1179
1180static void sd_parse_filename(const char *filename, QDict *options,
1181                              Error **errp)
1182{
1183    Error *err = NULL;
1184    SheepdogConfig cfg;
1185    char buf[32];
1186
1187    if (strstr(filename, "://")) {
1188        sd_parse_uri(&cfg, filename, &err);
1189    } else {
1190        parse_vdiname(&cfg, filename, &err);
1191    }
1192    if (err) {
1193        error_propagate(errp, err);
1194        return;
1195    }
1196
1197    if (cfg.path) {
1198        qdict_set_default_str(options, "server.path", cfg.path);
1199        qdict_set_default_str(options, "server.type", "unix");
1200    } else {
1201        qdict_set_default_str(options, "server.type", "inet");
1202        qdict_set_default_str(options, "server.host",
1203                              cfg.host ?: SD_DEFAULT_ADDR);
1204        snprintf(buf, sizeof(buf), "%d", cfg.port ?: SD_DEFAULT_PORT);
1205        qdict_set_default_str(options, "server.port", buf);
1206    }
1207    qdict_set_default_str(options, "vdi", cfg.vdi);
1208    qdict_set_default_str(options, "tag", cfg.tag);
1209    if (cfg.snap_id) {
1210        snprintf(buf, sizeof(buf), "%d", cfg.snap_id);
1211        qdict_set_default_str(options, "snap-id", buf);
1212    }
1213
1214    sd_config_done(&cfg);
1215}
1216
1217static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1218                         uint32_t snapid, const char *tag, uint32_t *vid,
1219                         bool lock, Error **errp)
1220{
1221    int ret, fd;
1222    SheepdogVdiReq hdr;
1223    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1224    unsigned int wlen, rlen = 0;
1225    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN] QEMU_NONSTRING;
1226
1227    fd = connect_to_sdog(s, errp);
1228    if (fd < 0) {
1229        return fd;
1230    }
1231
1232    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1233     * which is desirable since we'll soon be sending those bytes, and
1234     * don't want the send_req to read uninitialized data.
1235     */
1236    strncpy(buf, filename, SD_MAX_VDI_LEN);
1237    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1238
1239    memset(&hdr, 0, sizeof(hdr));
1240    if (lock) {
1241        hdr.opcode = SD_OP_LOCK_VDI;
1242        hdr.type = LOCK_TYPE_NORMAL;
1243    } else {
1244        hdr.opcode = SD_OP_GET_VDI_INFO;
1245    }
1246    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1247    hdr.proto_ver = SD_PROTO_VER;
1248    hdr.data_length = wlen;
1249    hdr.snapid = snapid;
1250    hdr.flags = SD_FLAG_CMD_WRITE;
1251
1252    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1253    if (ret) {
1254        error_setg_errno(errp, -ret, "cannot get vdi info");
1255        goto out;
1256    }
1257
1258    if (rsp->result != SD_RES_SUCCESS) {
1259        error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1260                   sd_strerror(rsp->result), filename, snapid, tag);
1261        if (rsp->result == SD_RES_NO_VDI) {
1262            ret = -ENOENT;
1263        } else if (rsp->result == SD_RES_VDI_LOCKED) {
1264            ret = -EBUSY;
1265        } else {
1266            ret = -EIO;
1267        }
1268        goto out;
1269    }
1270    *vid = rsp->vdi_id;
1271
1272    ret = 0;
1273out:
1274    closesocket(fd);
1275    return ret;
1276}
1277
1278static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1279                                         struct iovec *iov, int niov,
1280                                         enum AIOCBState aiocb_type)
1281{
1282    int nr_copies = s->inode.nr_copies;
1283    SheepdogObjReq hdr;
1284    unsigned int wlen = 0;
1285    int ret;
1286    uint64_t oid = aio_req->oid;
1287    unsigned int datalen = aio_req->data_len;
1288    uint64_t offset = aio_req->offset;
1289    uint8_t flags = aio_req->flags;
1290    uint64_t old_oid = aio_req->base_oid;
1291    bool create = aio_req->create;
1292
1293    qemu_co_mutex_lock(&s->queue_lock);
1294    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1295    qemu_co_mutex_unlock(&s->queue_lock);
1296
1297    if (!nr_copies) {
1298        error_report("bug");
1299    }
1300
1301    memset(&hdr, 0, sizeof(hdr));
1302
1303    switch (aiocb_type) {
1304    case AIOCB_FLUSH_CACHE:
1305        hdr.opcode = SD_OP_FLUSH_VDI;
1306        break;
1307    case AIOCB_READ_UDATA:
1308        hdr.opcode = SD_OP_READ_OBJ;
1309        hdr.flags = flags;
1310        break;
1311    case AIOCB_WRITE_UDATA:
1312        if (create) {
1313            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1314        } else {
1315            hdr.opcode = SD_OP_WRITE_OBJ;
1316        }
1317        wlen = datalen;
1318        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1319        break;
1320    case AIOCB_DISCARD_OBJ:
1321        hdr.opcode = SD_OP_WRITE_OBJ;
1322        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1323        s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
1324        offset = offsetof(SheepdogInode,
1325                          data_vdi_id[data_oid_to_idx(oid)]);
1326        oid = vid_to_vdi_oid(s->inode.vdi_id);
1327        wlen = datalen = sizeof(uint32_t);
1328        break;
1329    }
1330
1331    if (s->cache_flags) {
1332        hdr.flags |= s->cache_flags;
1333    }
1334
1335    hdr.oid = oid;
1336    hdr.cow_oid = old_oid;
1337    hdr.copies = s->inode.nr_copies;
1338
1339    hdr.data_length = datalen;
1340    hdr.offset = offset;
1341
1342    hdr.id = aio_req->id;
1343
1344    qemu_co_mutex_lock(&s->lock);
1345    s->co_send = qemu_coroutine_self();
1346    aio_set_fd_handler(s->aio_context, s->fd, false,
1347                       co_read_response, co_write_request, NULL, s);
1348    socket_set_cork(s->fd, 1);
1349
1350    /* send a header */
1351    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1352    if (ret != sizeof(hdr)) {
1353        error_report("failed to send a req, %s", strerror(errno));
1354        goto out;
1355    }
1356
1357    if (wlen) {
1358        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1359        if (ret != wlen) {
1360            error_report("failed to send a data, %s", strerror(errno));
1361        }
1362    }
1363out:
1364    socket_set_cork(s->fd, 0);
1365    aio_set_fd_handler(s->aio_context, s->fd, false,
1366                       co_read_response, NULL, NULL, s);
1367    s->co_send = NULL;
1368    qemu_co_mutex_unlock(&s->lock);
1369}
1370
1371static int read_write_object(int fd, BlockDriverState *bs, char *buf,
1372                             uint64_t oid, uint8_t copies,
1373                             unsigned int datalen, uint64_t offset,
1374                             bool write, bool create, uint32_t cache_flags)
1375{
1376    SheepdogObjReq hdr;
1377    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1378    unsigned int wlen, rlen;
1379    int ret;
1380
1381    memset(&hdr, 0, sizeof(hdr));
1382
1383    if (write) {
1384        wlen = datalen;
1385        rlen = 0;
1386        hdr.flags = SD_FLAG_CMD_WRITE;
1387        if (create) {
1388            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1389        } else {
1390            hdr.opcode = SD_OP_WRITE_OBJ;
1391        }
1392    } else {
1393        wlen = 0;
1394        rlen = datalen;
1395        hdr.opcode = SD_OP_READ_OBJ;
1396    }
1397
1398    hdr.flags |= cache_flags;
1399
1400    hdr.oid = oid;
1401    hdr.data_length = datalen;
1402    hdr.offset = offset;
1403    hdr.copies = copies;
1404
1405    ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1406    if (ret) {
1407        error_report("failed to send a request to the sheep");
1408        return ret;
1409    }
1410
1411    switch (rsp->result) {
1412    case SD_RES_SUCCESS:
1413        return 0;
1414    default:
1415        error_report("%s", sd_strerror(rsp->result));
1416        return -EIO;
1417    }
1418}
1419
1420static int read_object(int fd, BlockDriverState *bs, char *buf,
1421                       uint64_t oid, uint8_t copies,
1422                       unsigned int datalen, uint64_t offset,
1423                       uint32_t cache_flags)
1424{
1425    return read_write_object(fd, bs, buf, oid, copies,
1426                             datalen, offset, false,
1427                             false, cache_flags);
1428}
1429
1430static int write_object(int fd, BlockDriverState *bs, char *buf,
1431                        uint64_t oid, uint8_t copies,
1432                        unsigned int datalen, uint64_t offset, bool create,
1433                        uint32_t cache_flags)
1434{
1435    return read_write_object(fd, bs, buf, oid, copies,
1436                             datalen, offset, true,
1437                             create, cache_flags);
1438}
1439
1440/* update inode with the latest state */
1441static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1442{
1443    Error *local_err = NULL;
1444    SheepdogInode *inode;
1445    int ret = 0, fd;
1446    uint32_t vid = 0;
1447
1448    fd = connect_to_sdog(s, &local_err);
1449    if (fd < 0) {
1450        error_report_err(local_err);
1451        return -EIO;
1452    }
1453
1454    inode = g_malloc(SD_INODE_HEADER_SIZE);
1455
1456    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1457    if (ret) {
1458        error_report_err(local_err);
1459        goto out;
1460    }
1461
1462    ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
1463                      s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1464                      s->cache_flags);
1465    if (ret < 0) {
1466        goto out;
1467    }
1468
1469    if (inode->vdi_id != s->inode.vdi_id) {
1470        memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1471    }
1472
1473out:
1474    g_free(inode);
1475    closesocket(fd);
1476
1477    return ret;
1478}
1479
1480static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1481{
1482    SheepdogAIOCB *acb = aio_req->aiocb;
1483
1484    aio_req->create = false;
1485
1486    /* check whether this request becomes a CoW one */
1487    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1488        int idx = data_oid_to_idx(aio_req->oid);
1489
1490        if (is_data_obj_writable(&s->inode, idx)) {
1491            goto out;
1492        }
1493
1494        if (s->inode.data_vdi_id[idx]) {
1495            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1496            aio_req->flags |= SD_FLAG_CMD_COW;
1497        }
1498        aio_req->create = true;
1499    }
1500out:
1501    if (is_data_obj(aio_req->oid)) {
1502        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1503                        acb->aiocb_type);
1504    } else {
1505        struct iovec iov;
1506        iov.iov_base = &s->inode;
1507        iov.iov_len = sizeof(s->inode);
1508        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1509    }
1510}
1511
1512static void sd_detach_aio_context(BlockDriverState *bs)
1513{
1514    BDRVSheepdogState *s = bs->opaque;
1515
1516    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1517                       NULL, NULL, NULL);
1518}
1519
1520static void sd_attach_aio_context(BlockDriverState *bs,
1521                                  AioContext *new_context)
1522{
1523    BDRVSheepdogState *s = bs->opaque;
1524
1525    s->aio_context = new_context;
1526    aio_set_fd_handler(new_context, s->fd, false,
1527                       co_read_response, NULL, NULL, s);
1528}
1529
1530static QemuOptsList runtime_opts = {
1531    .name = "sheepdog",
1532    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1533    .desc = {
1534        {
1535            .name = "vdi",
1536            .type = QEMU_OPT_STRING,
1537        },
1538        {
1539            .name = "snap-id",
1540            .type = QEMU_OPT_NUMBER,
1541        },
1542        {
1543            .name = "tag",
1544            .type = QEMU_OPT_STRING,
1545        },
1546        { /* end of list */ }
1547    },
1548};
1549
1550static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1551                   Error **errp)
1552{
1553    int ret, fd;
1554    uint32_t vid = 0;
1555    BDRVSheepdogState *s = bs->opaque;
1556    const char *vdi, *snap_id_str, *tag;
1557    uint64_t snap_id;
1558    char *buf = NULL;
1559    QemuOpts *opts;
1560
1561    deprecation_warning();
1562
1563    s->bs = bs;
1564    s->aio_context = bdrv_get_aio_context(bs);
1565
1566    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1567    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
1568        ret = -EINVAL;
1569        goto err_no_fd;
1570    }
1571
1572    s->addr = sd_server_config(options, errp);
1573    if (!s->addr) {
1574        ret = -EINVAL;
1575        goto err_no_fd;
1576    }
1577
1578    vdi = qemu_opt_get(opts, "vdi");
1579    snap_id_str = qemu_opt_get(opts, "snap-id");
1580    snap_id = qemu_opt_get_number(opts, "snap-id", CURRENT_VDI_ID);
1581    tag = qemu_opt_get(opts, "tag");
1582
1583    if (!vdi) {
1584        error_setg(errp, "parameter 'vdi' is missing");
1585        ret = -EINVAL;
1586        goto err_no_fd;
1587    }
1588    if (strlen(vdi) >= SD_MAX_VDI_LEN) {
1589        error_setg(errp, "value of parameter 'vdi' is too long");
1590        ret = -EINVAL;
1591        goto err_no_fd;
1592    }
1593
1594    if (snap_id > UINT32_MAX) {
1595        snap_id = 0;
1596    }
1597    if (snap_id_str && !snap_id) {
1598        error_setg(errp, "'snap-id=%s' is not a valid snapshot ID",
1599                   snap_id_str);
1600        ret = -EINVAL;
1601        goto err_no_fd;
1602    }
1603
1604    if (!tag) {
1605        tag = "";
1606    }
1607    if (strlen(tag) >= SD_MAX_VDI_TAG_LEN) {
1608        error_setg(errp, "value of parameter 'tag' is too long");
1609        ret = -EINVAL;
1610        goto err_no_fd;
1611    }
1612
1613    QLIST_INIT(&s->inflight_aio_head);
1614    QLIST_INIT(&s->failed_aio_head);
1615    QLIST_INIT(&s->inflight_aiocb_head);
1616
1617    s->fd = get_sheep_fd(s, errp);
1618    if (s->fd < 0) {
1619        ret = s->fd;
1620        goto err_no_fd;
1621    }
1622
1623    ret = find_vdi_name(s, vdi, (uint32_t)snap_id, tag, &vid, true, errp);
1624    if (ret) {
1625        goto err;
1626    }
1627
1628    /*
1629     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1630     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1631     */
1632    s->cache_flags = SD_FLAG_CMD_CACHE;
1633    if (flags & BDRV_O_NOCACHE) {
1634        s->cache_flags = SD_FLAG_CMD_DIRECT;
1635    }
1636    s->discard_supported = true;
1637
1638    if (snap_id || tag[0]) {
1639        trace_sheepdog_open(vid);
1640        s->is_snapshot = true;
1641    }
1642
1643    fd = connect_to_sdog(s, errp);
1644    if (fd < 0) {
1645        ret = fd;
1646        goto err;
1647    }
1648
1649    buf = g_malloc(SD_INODE_SIZE);
1650    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
1651                      0, SD_INODE_SIZE, 0, s->cache_flags);
1652
1653    closesocket(fd);
1654
1655    if (ret) {
1656        error_setg(errp, "Can't read snapshot inode");
1657        goto err;
1658    }
1659
1660    memcpy(&s->inode, buf, sizeof(s->inode));
1661
1662    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1663    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1664    pstrcpy(s->name, sizeof(s->name), vdi);
1665    qemu_co_mutex_init(&s->lock);
1666    qemu_co_mutex_init(&s->queue_lock);
1667    qemu_co_queue_init(&s->overlapping_queue);
1668    qemu_opts_del(opts);
1669    g_free(buf);
1670    return 0;
1671
1672err:
1673    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1674                       false, NULL, NULL, NULL, NULL);
1675    closesocket(s->fd);
1676err_no_fd:
1677    qemu_opts_del(opts);
1678    g_free(buf);
1679    return ret;
1680}
1681
1682static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1683                             Error **errp)
1684{
1685    BDRVSheepdogState *s = state->bs->opaque;
1686    BDRVSheepdogReopenState *re_s;
1687    int ret = 0;
1688
1689    re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1690
1691    re_s->cache_flags = SD_FLAG_CMD_CACHE;
1692    if (state->flags & BDRV_O_NOCACHE) {
1693        re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1694    }
1695
1696    re_s->fd = get_sheep_fd(s, errp);
1697    if (re_s->fd < 0) {
1698        ret = re_s->fd;
1699        return ret;
1700    }
1701
1702    return ret;
1703}
1704
1705static void sd_reopen_commit(BDRVReopenState *state)
1706{
1707    BDRVSheepdogReopenState *re_s = state->opaque;
1708    BDRVSheepdogState *s = state->bs->opaque;
1709
1710    if (s->fd) {
1711        aio_set_fd_handler(s->aio_context, s->fd, false,
1712                           NULL, NULL, NULL, NULL);
1713        closesocket(s->fd);
1714    }
1715
1716    s->fd = re_s->fd;
1717    s->cache_flags = re_s->cache_flags;
1718
1719    g_free(state->opaque);
1720    state->opaque = NULL;
1721
1722    return;
1723}
1724
1725static void sd_reopen_abort(BDRVReopenState *state)
1726{
1727    BDRVSheepdogReopenState *re_s = state->opaque;
1728    BDRVSheepdogState *s = state->bs->opaque;
1729
1730    if (re_s == NULL) {
1731        return;
1732    }
1733
1734    if (re_s->fd) {
1735        aio_set_fd_handler(s->aio_context, re_s->fd, false,
1736                           NULL, NULL, NULL, NULL);
1737        closesocket(re_s->fd);
1738    }
1739
1740    g_free(state->opaque);
1741    state->opaque = NULL;
1742
1743    return;
1744}
1745
1746static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1747                        Error **errp)
1748{
1749    SheepdogVdiReq hdr;
1750    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1751    int fd, ret;
1752    unsigned int wlen, rlen = 0;
1753    char buf[SD_MAX_VDI_LEN];
1754
1755    fd = connect_to_sdog(s, errp);
1756    if (fd < 0) {
1757        return fd;
1758    }
1759
1760    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1761     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1762     */
1763    memset(buf, 0, sizeof(buf));
1764    pstrcpy(buf, sizeof(buf), s->name);
1765
1766    memset(&hdr, 0, sizeof(hdr));
1767    hdr.opcode = SD_OP_NEW_VDI;
1768    hdr.base_vdi_id = s->inode.vdi_id;
1769
1770    wlen = SD_MAX_VDI_LEN;
1771
1772    hdr.flags = SD_FLAG_CMD_WRITE;
1773    hdr.snapid = snapshot;
1774
1775    hdr.data_length = wlen;
1776    hdr.vdi_size = s->inode.vdi_size;
1777    hdr.copy_policy = s->inode.copy_policy;
1778    hdr.copies = s->inode.nr_copies;
1779    hdr.block_size_shift = s->inode.block_size_shift;
1780
1781    ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1782
1783    closesocket(fd);
1784
1785    if (ret) {
1786        error_setg_errno(errp, -ret, "create failed");
1787        return ret;
1788    }
1789
1790    if (rsp->result != SD_RES_SUCCESS) {
1791        error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1792        return -EIO;
1793    }
1794
1795    if (vdi_id) {
1796        *vdi_id = rsp->vdi_id;
1797    }
1798
1799    return 0;
1800}
1801
1802static int sd_prealloc(BlockDriverState *bs, int64_t old_size, int64_t new_size,
1803                       Error **errp)
1804{
1805    BlockBackend *blk = NULL;
1806    BDRVSheepdogState *base = bs->opaque;
1807    unsigned long buf_size;
1808    uint32_t idx, max_idx;
1809    uint32_t object_size;
1810    void *buf = NULL;
1811    int ret;
1812
1813    blk = blk_new_with_bs(bs,
1814                          BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE | BLK_PERM_RESIZE,
1815                          BLK_PERM_ALL, errp);
1816
1817    if (!blk) {
1818        ret = -EPERM;
1819        goto out_with_err_set;
1820    }
1821
1822    blk_set_allow_write_beyond_eof(blk, true);
1823
1824    object_size = (UINT32_C(1) << base->inode.block_size_shift);
1825    buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1826    buf = g_malloc0(buf_size);
1827
1828    max_idx = DIV_ROUND_UP(new_size, buf_size);
1829
1830    for (idx = old_size / buf_size; idx < max_idx; idx++) {
1831        /*
1832         * The created image can be a cloned image, so we need to read
1833         * a data from the source image.
1834         */
1835        ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1836        if (ret < 0) {
1837            goto out;
1838        }
1839        ret = blk_pwrite(blk, idx * buf_size, buf, buf_size, 0);
1840        if (ret < 0) {
1841            goto out;
1842        }
1843    }
1844
1845    ret = 0;
1846out:
1847    if (ret < 0) {
1848        error_setg_errno(errp, -ret, "Can't pre-allocate");
1849    }
1850out_with_err_set:
1851    blk_unref(blk);
1852    g_free(buf);
1853
1854    return ret;
1855}
1856
1857static int sd_create_prealloc(BlockdevOptionsSheepdog *location, int64_t size,
1858                              Error **errp)
1859{
1860    BlockDriverState *bs;
1861    Visitor *v;
1862    QObject *obj = NULL;
1863    QDict *qdict;
1864    int ret;
1865
1866    v = qobject_output_visitor_new(&obj);
1867    visit_type_BlockdevOptionsSheepdog(v, NULL, &location, &error_abort);
1868    visit_free(v);
1869
1870    qdict = qobject_to(QDict, obj);
1871    qdict_flatten(qdict);
1872
1873    qdict_put_str(qdict, "driver", "sheepdog");
1874
1875    bs = bdrv_open(NULL, NULL, qdict, BDRV_O_PROTOCOL | BDRV_O_RDWR, errp);
1876    if (bs == NULL) {
1877        ret = -EIO;
1878        goto fail;
1879    }
1880
1881    ret = sd_prealloc(bs, 0, size, errp);
1882fail:
1883    bdrv_unref(bs);
1884    qobject_unref(qdict);
1885    return ret;
1886}
1887
1888static int parse_redundancy(BDRVSheepdogState *s, SheepdogRedundancy *opt)
1889{
1890    struct SheepdogInode *inode = &s->inode;
1891
1892    switch (opt->type) {
1893    case SHEEPDOG_REDUNDANCY_TYPE_FULL:
1894        if (opt->u.full.copies > SD_MAX_COPIES || opt->u.full.copies < 1) {
1895            return -EINVAL;
1896        }
1897        inode->copy_policy = 0;
1898        inode->nr_copies = opt->u.full.copies;
1899        return 0;
1900
1901    case SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED:
1902    {
1903        int64_t copy = opt->u.erasure_coded.data_strips;
1904        int64_t parity = opt->u.erasure_coded.parity_strips;
1905
1906        if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1907            return -EINVAL;
1908        }
1909
1910        if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1911            return -EINVAL;
1912        }
1913
1914        /*
1915         * 4 bits for parity and 4 bits for data.
1916         * We have to compress upper data bits because it can't represent 16
1917         */
1918        inode->copy_policy = ((copy / 2) << 4) + parity;
1919        inode->nr_copies = copy + parity;
1920        return 0;
1921    }
1922
1923    default:
1924        g_assert_not_reached();
1925    }
1926
1927    return -EINVAL;
1928}
1929
1930/*
1931 * Sheepdog support two kinds of redundancy, full replication and erasure
1932 * coding.
1933 *
1934 * # create a fully replicated vdi with x copies
1935 * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1936 *
1937 * # create a erasure coded vdi with x data strips and y parity strips
1938 * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1939 */
1940static SheepdogRedundancy *parse_redundancy_str(const char *opt)
1941{
1942    SheepdogRedundancy *redundancy;
1943    const char *n1, *n2;
1944    long copy, parity;
1945    char p[10];
1946    int ret;
1947
1948    pstrcpy(p, sizeof(p), opt);
1949    n1 = strtok(p, ":");
1950    n2 = strtok(NULL, ":");
1951
1952    if (!n1) {
1953        return NULL;
1954    }
1955
1956    ret = qemu_strtol(n1, NULL, 10, &copy);
1957    if (ret < 0) {
1958        return NULL;
1959    }
1960
1961    redundancy = g_new0(SheepdogRedundancy, 1);
1962    if (!n2) {
1963        *redundancy = (SheepdogRedundancy) {
1964            .type               = SHEEPDOG_REDUNDANCY_TYPE_FULL,
1965            .u.full.copies      = copy,
1966        };
1967    } else {
1968        ret = qemu_strtol(n2, NULL, 10, &parity);
1969        if (ret < 0) {
1970            g_free(redundancy);
1971            return NULL;
1972        }
1973
1974        *redundancy = (SheepdogRedundancy) {
1975            .type               = SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED,
1976            .u.erasure_coded    = {
1977                .data_strips    = copy,
1978                .parity_strips  = parity,
1979            },
1980        };
1981    }
1982
1983    return redundancy;
1984}
1985
1986static int parse_block_size_shift(BDRVSheepdogState *s,
1987                                  BlockdevCreateOptionsSheepdog *opts)
1988{
1989    struct SheepdogInode *inode = &s->inode;
1990    uint64_t object_size;
1991    int obj_order;
1992
1993    if (opts->has_object_size) {
1994        object_size = opts->object_size;
1995
1996        if ((object_size - 1) & object_size) {    /* not a power of 2? */
1997            return -EINVAL;
1998        }
1999        obj_order = ctz32(object_size);
2000        if (obj_order < 20 || obj_order > 31) {
2001            return -EINVAL;
2002        }
2003        inode->block_size_shift = (uint8_t)obj_order;
2004    }
2005
2006    return 0;
2007}
2008
2009static int sd_co_create(BlockdevCreateOptions *options, Error **errp)
2010{
2011    BlockdevCreateOptionsSheepdog *opts = &options->u.sheepdog;
2012    int ret = 0;
2013    uint32_t vid = 0;
2014    char *backing_file = NULL;
2015    char *buf = NULL;
2016    BDRVSheepdogState *s;
2017    uint64_t max_vdi_size;
2018    bool prealloc = false;
2019
2020    assert(options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2021
2022    deprecation_warning();
2023
2024    s = g_new0(BDRVSheepdogState, 1);
2025
2026    /* Steal SocketAddress from QAPI, set NULL to prevent double free */
2027    s->addr = opts->location->server;
2028    opts->location->server = NULL;
2029
2030    if (strlen(opts->location->vdi) >= sizeof(s->name)) {
2031        error_setg(errp, "'vdi' string too long");
2032        ret = -EINVAL;
2033        goto out;
2034    }
2035    pstrcpy(s->name, sizeof(s->name), opts->location->vdi);
2036
2037    s->inode.vdi_size = opts->size;
2038    backing_file = opts->backing_file;
2039
2040    if (!opts->has_preallocation) {
2041        opts->preallocation = PREALLOC_MODE_OFF;
2042    }
2043    switch (opts->preallocation) {
2044    case PREALLOC_MODE_OFF:
2045        prealloc = false;
2046        break;
2047    case PREALLOC_MODE_FULL:
2048        prealloc = true;
2049        break;
2050    default:
2051        error_setg(errp, "Preallocation mode not supported for Sheepdog");
2052        ret = -EINVAL;
2053        goto out;
2054    }
2055
2056    if (opts->has_redundancy) {
2057        ret = parse_redundancy(s, opts->redundancy);
2058        if (ret < 0) {
2059            error_setg(errp, "Invalid redundancy mode");
2060            goto out;
2061        }
2062    }
2063    ret = parse_block_size_shift(s, opts);
2064    if (ret < 0) {
2065        error_setg(errp, "Invalid object_size."
2066                         " obect_size needs to be power of 2"
2067                         " and be limited from 2^20 to 2^31");
2068        goto out;
2069    }
2070
2071    if (opts->has_backing_file) {
2072        BlockBackend *blk;
2073        BDRVSheepdogState *base;
2074        BlockDriver *drv;
2075
2076        /* Currently, only Sheepdog backing image is supported. */
2077        drv = bdrv_find_protocol(opts->backing_file, true, NULL);
2078        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
2079            error_setg(errp, "backing_file must be a sheepdog image");
2080            ret = -EINVAL;
2081            goto out;
2082        }
2083
2084        blk = blk_new_open(opts->backing_file, NULL, NULL,
2085                           BDRV_O_PROTOCOL, errp);
2086        if (blk == NULL) {
2087            ret = -EIO;
2088            goto out;
2089        }
2090
2091        base = blk_bs(blk)->opaque;
2092
2093        if (!is_snapshot(&base->inode)) {
2094            error_setg(errp, "cannot clone from a non snapshot vdi");
2095            blk_unref(blk);
2096            ret = -EINVAL;
2097            goto out;
2098        }
2099        s->inode.vdi_id = base->inode.vdi_id;
2100        blk_unref(blk);
2101    }
2102
2103    s->aio_context = qemu_get_aio_context();
2104
2105    /* if block_size_shift is not specified, get cluster default value */
2106    if (s->inode.block_size_shift == 0) {
2107        SheepdogVdiReq hdr;
2108        SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
2109        int fd;
2110        unsigned int wlen = 0, rlen = 0;
2111
2112        fd = connect_to_sdog(s, errp);
2113        if (fd < 0) {
2114            ret = fd;
2115            goto out;
2116        }
2117
2118        memset(&hdr, 0, sizeof(hdr));
2119        hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
2120        hdr.proto_ver = SD_PROTO_VER;
2121
2122        ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
2123                     NULL, &wlen, &rlen);
2124        closesocket(fd);
2125        if (ret) {
2126            error_setg_errno(errp, -ret, "failed to get cluster default");
2127            goto out;
2128        }
2129        if (rsp->result == SD_RES_SUCCESS) {
2130            s->inode.block_size_shift = rsp->block_size_shift;
2131        } else {
2132            s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
2133        }
2134    }
2135
2136    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2137
2138    if (s->inode.vdi_size > max_vdi_size) {
2139        error_setg(errp, "An image is too large."
2140                         " The maximum image size is %"PRIu64 "GB",
2141                         max_vdi_size / 1024 / 1024 / 1024);
2142        ret = -EINVAL;
2143        goto out;
2144    }
2145
2146    ret = do_sd_create(s, &vid, 0, errp);
2147    if (ret) {
2148        goto out;
2149    }
2150
2151    if (prealloc) {
2152        ret = sd_create_prealloc(opts->location, opts->size, errp);
2153    }
2154out:
2155    g_free(backing_file);
2156    g_free(buf);
2157    g_free(s->addr);
2158    g_free(s);
2159    return ret;
2160}
2161
2162static int coroutine_fn sd_co_create_opts(BlockDriver *drv,
2163                                          const char *filename,
2164                                          QemuOpts *opts,
2165                                          Error **errp)
2166{
2167    BlockdevCreateOptions *create_options = NULL;
2168    QDict *qdict = NULL, *location_qdict;
2169    Visitor *v;
2170    char *redundancy = NULL;
2171    Error *local_err = NULL;
2172    int ret;
2173    char *backing_fmt = NULL;
2174
2175    redundancy = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
2176    backing_fmt = qemu_opt_get_del(opts, BLOCK_OPT_BACKING_FMT);
2177
2178    if (backing_fmt && strcmp(backing_fmt, "sheepdog") != 0) {
2179        error_setg(errp, "backing_file must be a sheepdog image");
2180        ret = -EINVAL;
2181        goto fail;
2182    }
2183
2184    qdict = qemu_opts_to_qdict(opts, NULL);
2185    qdict_put_str(qdict, "driver", "sheepdog");
2186
2187    location_qdict = qdict_new();
2188    qdict_put(qdict, "location", location_qdict);
2189
2190    sd_parse_filename(filename, location_qdict, &local_err);
2191    if (local_err) {
2192        error_propagate(errp, local_err);
2193        ret = -EINVAL;
2194        goto fail;
2195    }
2196
2197    qdict_flatten(qdict);
2198
2199    /* Change legacy command line options into QMP ones */
2200    static const QDictRenames opt_renames[] = {
2201        { BLOCK_OPT_BACKING_FILE,       "backing-file" },
2202        { BLOCK_OPT_OBJECT_SIZE,        "object-size" },
2203        { NULL, NULL },
2204    };
2205
2206    if (!qdict_rename_keys(qdict, opt_renames, errp)) {
2207        ret = -EINVAL;
2208        goto fail;
2209    }
2210
2211    /* Get the QAPI object */
2212    v = qobject_input_visitor_new_flat_confused(qdict, errp);
2213    if (!v) {
2214        ret = -EINVAL;
2215        goto fail;
2216    }
2217
2218    visit_type_BlockdevCreateOptions(v, NULL, &create_options, errp);
2219    visit_free(v);
2220    if (!create_options) {
2221        ret = -EINVAL;
2222        goto fail;
2223    }
2224
2225    assert(create_options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2226    create_options->u.sheepdog.size =
2227        ROUND_UP(create_options->u.sheepdog.size, BDRV_SECTOR_SIZE);
2228
2229    if (redundancy) {
2230        create_options->u.sheepdog.has_redundancy = true;
2231        create_options->u.sheepdog.redundancy =
2232            parse_redundancy_str(redundancy);
2233        if (create_options->u.sheepdog.redundancy == NULL) {
2234            error_setg(errp, "Invalid redundancy mode");
2235            ret = -EINVAL;
2236            goto fail;
2237        }
2238    }
2239
2240    ret = sd_co_create(create_options, errp);
2241fail:
2242    qapi_free_BlockdevCreateOptions(create_options);
2243    qobject_unref(qdict);
2244    g_free(redundancy);
2245    g_free(backing_fmt);
2246    return ret;
2247}
2248
2249static void sd_close(BlockDriverState *bs)
2250{
2251    Error *local_err = NULL;
2252    BDRVSheepdogState *s = bs->opaque;
2253    SheepdogVdiReq hdr;
2254    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2255    unsigned int wlen, rlen = 0;
2256    int fd, ret;
2257
2258    trace_sheepdog_close(s->name);
2259
2260    fd = connect_to_sdog(s, &local_err);
2261    if (fd < 0) {
2262        error_report_err(local_err);
2263        return;
2264    }
2265
2266    memset(&hdr, 0, sizeof(hdr));
2267
2268    hdr.opcode = SD_OP_RELEASE_VDI;
2269    hdr.type = LOCK_TYPE_NORMAL;
2270    hdr.base_vdi_id = s->inode.vdi_id;
2271    wlen = strlen(s->name) + 1;
2272    hdr.data_length = wlen;
2273    hdr.flags = SD_FLAG_CMD_WRITE;
2274
2275    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2276                 s->name, &wlen, &rlen);
2277
2278    closesocket(fd);
2279
2280    if (!ret && rsp->result != SD_RES_SUCCESS &&
2281        rsp->result != SD_RES_VDI_NOT_LOCKED) {
2282        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2283    }
2284
2285    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
2286                       false, NULL, NULL, NULL, NULL);
2287    closesocket(s->fd);
2288    qapi_free_SocketAddress(s->addr);
2289}
2290
2291static int64_t sd_getlength(BlockDriverState *bs)
2292{
2293    BDRVSheepdogState *s = bs->opaque;
2294
2295    return s->inode.vdi_size;
2296}
2297
2298static int coroutine_fn sd_co_truncate(BlockDriverState *bs, int64_t offset,
2299                                       bool exact, PreallocMode prealloc,
2300                                       BdrvRequestFlags flags, Error **errp)
2301{
2302    BDRVSheepdogState *s = bs->opaque;
2303    int ret, fd;
2304    unsigned int datalen;
2305    uint64_t max_vdi_size;
2306    int64_t old_size = s->inode.vdi_size;
2307
2308    if (prealloc != PREALLOC_MODE_OFF && prealloc != PREALLOC_MODE_FULL) {
2309        error_setg(errp, "Unsupported preallocation mode '%s'",
2310                   PreallocMode_str(prealloc));
2311        return -ENOTSUP;
2312    }
2313
2314    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2315    if (offset < old_size) {
2316        error_setg(errp, "shrinking is not supported");
2317        return -EINVAL;
2318    } else if (offset > max_vdi_size) {
2319        error_setg(errp, "too big image size");
2320        return -EINVAL;
2321    }
2322
2323    fd = connect_to_sdog(s, errp);
2324    if (fd < 0) {
2325        return fd;
2326    }
2327
2328    /* we don't need to update entire object */
2329    datalen = SD_INODE_HEADER_SIZE;
2330    s->inode.vdi_size = offset;
2331    ret = write_object(fd, s->bs, (char *)&s->inode,
2332                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2333                       datalen, 0, false, s->cache_flags);
2334    close(fd);
2335
2336    if (ret < 0) {
2337        error_setg_errno(errp, -ret, "failed to update an inode");
2338        return ret;
2339    }
2340
2341    if (prealloc == PREALLOC_MODE_FULL) {
2342        ret = sd_prealloc(bs, old_size, offset, errp);
2343        if (ret < 0) {
2344            return ret;
2345        }
2346    }
2347
2348    return 0;
2349}
2350
2351/*
2352 * This function is called after writing data objects.  If we need to
2353 * update metadata, this sends a write request to the vdi object.
2354 */
2355static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2356{
2357    BDRVSheepdogState *s = acb->s;
2358    struct iovec iov;
2359    AIOReq *aio_req;
2360    uint32_t offset, data_len, mn, mx;
2361
2362    mn = acb->min_dirty_data_idx;
2363    mx = acb->max_dirty_data_idx;
2364    if (mn <= mx) {
2365        /* we need to update the vdi object. */
2366        ++acb->nr_pending;
2367        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2368            mn * sizeof(s->inode.data_vdi_id[0]);
2369        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2370
2371        acb->min_dirty_data_idx = UINT32_MAX;
2372        acb->max_dirty_data_idx = 0;
2373
2374        iov.iov_base = &s->inode;
2375        iov.iov_len = sizeof(s->inode);
2376        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2377                                data_len, offset, 0, false, 0, offset);
2378        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2379        if (--acb->nr_pending) {
2380            qemu_coroutine_yield();
2381        }
2382    }
2383}
2384
2385/* Delete current working VDI on the snapshot chain */
2386static bool sd_delete(BDRVSheepdogState *s)
2387{
2388    Error *local_err = NULL;
2389    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2390    SheepdogVdiReq hdr = {
2391        .opcode = SD_OP_DEL_VDI,
2392        .base_vdi_id = s->inode.vdi_id,
2393        .data_length = wlen,
2394        .flags = SD_FLAG_CMD_WRITE,
2395    };
2396    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2397    int fd, ret;
2398
2399    fd = connect_to_sdog(s, &local_err);
2400    if (fd < 0) {
2401        error_report_err(local_err);
2402        return false;
2403    }
2404
2405    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2406                 s->name, &wlen, &rlen);
2407    closesocket(fd);
2408    if (ret) {
2409        return false;
2410    }
2411    switch (rsp->result) {
2412    case SD_RES_NO_VDI:
2413        error_report("%s was already deleted", s->name);
2414        /* fall through */
2415    case SD_RES_SUCCESS:
2416        break;
2417    default:
2418        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2419        return false;
2420    }
2421
2422    return true;
2423}
2424
2425/*
2426 * Create a writable VDI from a snapshot
2427 */
2428static int sd_create_branch(BDRVSheepdogState *s)
2429{
2430    Error *local_err = NULL;
2431    int ret, fd;
2432    uint32_t vid;
2433    char *buf;
2434    bool deleted;
2435
2436    trace_sheepdog_create_branch_snapshot(s->inode.vdi_id);
2437
2438    buf = g_malloc(SD_INODE_SIZE);
2439
2440    /*
2441     * Even If deletion fails, we will just create extra snapshot based on
2442     * the working VDI which was supposed to be deleted. So no need to
2443     * false bail out.
2444     */
2445    deleted = sd_delete(s);
2446    ret = do_sd_create(s, &vid, !deleted, &local_err);
2447    if (ret) {
2448        error_report_err(local_err);
2449        goto out;
2450    }
2451
2452    trace_sheepdog_create_branch_created(vid);
2453
2454    fd = connect_to_sdog(s, &local_err);
2455    if (fd < 0) {
2456        error_report_err(local_err);
2457        ret = fd;
2458        goto out;
2459    }
2460
2461    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
2462                      s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2463
2464    closesocket(fd);
2465
2466    if (ret < 0) {
2467        goto out;
2468    }
2469
2470    memcpy(&s->inode, buf, sizeof(s->inode));
2471
2472    s->is_snapshot = false;
2473    ret = 0;
2474    trace_sheepdog_create_branch_new(s->inode.vdi_id);
2475
2476out:
2477    g_free(buf);
2478
2479    return ret;
2480}
2481
/*
 * Send I/O requests to the server.
 *
 * The request described by @acb is split at object boundaries and one AIO
 * request is queued per object via add_aio_request().  If no request is
 * left pending when the loop finishes, the function returns without
 * yielding; otherwise it yields the coroutine.
 *
 * NOTE(review): responses are presumably handled by the fd handler
 * (`aio_read_response'), which is not part of this chunk -- confirm.
 *
 * The function returns nothing.  The only result it sets itself is
 * acb->ret = -EIO when creating a writable branch of a snapshot fails.
 */
static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb)
{
    int ret = 0;
    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
    unsigned long idx;
    uint32_t object_size;
    uint64_t oid;
    uint64_t offset;
    BDRVSheepdogState *s = acb->s;
    SheepdogInode *inode = &s->inode;
    AIOReq *aio_req;

    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
        /*
         * In the case we open the snapshot VDI, Sheepdog creates the
         * writable VDI when we do a write operation first.
         */
        ret = sd_create_branch(s);
        if (ret) {
            acb->ret = -EIO;
            return;
        }
    }

    /* Index of the first object touched and the byte offset inside it. */
    object_size = (UINT32_C(1) << inode->block_size_shift);
    idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
    offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;

    /*
     * Make sure we don't free the aiocb before we are done with all requests.
     * This additional reference is dropped at the end of this function.
     */
    acb->nr_pending++;

    while (done != total) {
        uint8_t flags = 0;
        uint64_t old_oid = 0;
        bool create = false;

        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);

        /* Never cross an object boundary within one request. */
        len = MIN(total - done, object_size - offset);

        switch (acb->aiocb_type) {
        case AIOCB_READ_UDATA:
            /* Unallocated objects read as zeroes; no request is needed. */
            if (!inode->data_vdi_id[idx]) {
                qemu_iovec_memset(acb->qiov, done, 0, len);
                goto done;
            }
            break;
        case AIOCB_WRITE_UDATA:
            if (!inode->data_vdi_id[idx]) {
                create = true;
            } else if (!is_data_obj_writable(inode, idx)) {
                /* Copy-On-Write */
                create = true;
                old_oid = oid;
                flags = SD_FLAG_CMD_COW;
            }
            break;
        case AIOCB_DISCARD_OBJ:
            /*
             * We discard the object only when the whole object is
             * 1) allocated 2) trimmed. Otherwise, simply skip it.
             */
            if (len != object_size || inode->data_vdi_id[idx] == 0) {
                goto done;
            }
            break;
        default:
            break;
        }

        if (create) {
            trace_sheepdog_co_rw_vector_update(inode->vdi_id, oid,
                                  vid_to_data_oid(inode->data_vdi_id[idx], idx),
                                  idx);
            /* A newly created object is addressed by this VDI's own id. */
            oid = vid_to_data_oid(inode->vdi_id, idx);
            trace_sheepdog_co_rw_vector_new(oid);
        }

        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
                                old_oid,
                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
                                0 : done);
        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                        acb->aiocb_type);
    done:
        /* Only the first object can start at a non-zero offset. */
        offset = 0;
        idx++;
        done += len;
    }
    /* Drop the reference taken above; wait if requests are still pending. */
    if (--acb->nr_pending) {
        qemu_coroutine_yield();
    }
}
2590
2591static void sd_aio_complete(SheepdogAIOCB *acb)
2592{
2593    BDRVSheepdogState *s;
2594    if (acb->aiocb_type == AIOCB_FLUSH_CACHE) {
2595        return;
2596    }
2597
2598    s = acb->s;
2599    qemu_co_mutex_lock(&s->queue_lock);
2600    QLIST_REMOVE(acb, aiocb_siblings);
2601    qemu_co_queue_restart_all(&s->overlapping_queue);
2602    qemu_co_mutex_unlock(&s->queue_lock);
2603}
2604
2605static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2606                                     int nb_sectors, QEMUIOVector *qiov,
2607                                     int flags)
2608{
2609    SheepdogAIOCB acb;
2610    int ret;
2611    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2612    BDRVSheepdogState *s = bs->opaque;
2613
2614    assert(!flags);
2615    if (offset > s->inode.vdi_size) {
2616        ret = sd_co_truncate(bs, offset, false, PREALLOC_MODE_OFF, 0, NULL);
2617        if (ret < 0) {
2618            return ret;
2619        }
2620    }
2621
2622    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_WRITE_UDATA);
2623    sd_co_rw_vector(&acb);
2624    sd_write_done(&acb);
2625    sd_aio_complete(&acb);
2626
2627    return acb.ret;
2628}
2629
2630static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2631                       int nb_sectors, QEMUIOVector *qiov)
2632{
2633    SheepdogAIOCB acb;
2634    BDRVSheepdogState *s = bs->opaque;
2635
2636    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_READ_UDATA);
2637    sd_co_rw_vector(&acb);
2638    sd_aio_complete(&acb);
2639
2640    return acb.ret;
2641}
2642
2643static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2644{
2645    BDRVSheepdogState *s = bs->opaque;
2646    SheepdogAIOCB acb;
2647    AIOReq *aio_req;
2648
2649    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2650        return 0;
2651    }
2652
2653    sd_aio_setup(&acb, s, NULL, 0, 0, AIOCB_FLUSH_CACHE);
2654
2655    acb.nr_pending++;
2656    aio_req = alloc_aio_req(s, &acb, vid_to_vdi_oid(s->inode.vdi_id),
2657                            0, 0, 0, false, 0, 0);
2658    add_aio_request(s, aio_req, NULL, 0, acb.aiocb_type);
2659
2660    if (--acb.nr_pending) {
2661        qemu_coroutine_yield();
2662    }
2663
2664    sd_aio_complete(&acb);
2665    return acb.ret;
2666}
2667
2668static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2669{
2670    Error *local_err = NULL;
2671    BDRVSheepdogState *s = bs->opaque;
2672    int ret, fd;
2673    uint32_t new_vid;
2674    SheepdogInode *inode;
2675    unsigned int datalen;
2676
2677    trace_sheepdog_snapshot_create_info(sn_info->name, sn_info->id_str, s->name,
2678                                        sn_info->vm_state_size, s->is_snapshot);
2679
2680    if (s->is_snapshot) {
2681        error_report("You can't create a snapshot of a snapshot VDI, "
2682                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2683
2684        return -EINVAL;
2685    }
2686
2687    trace_sheepdog_snapshot_create(sn_info->name, sn_info->id_str);
2688
2689    s->inode.vm_state_size = sn_info->vm_state_size;
2690    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2691    /* It appears that inode.tag does not require a NUL terminator,
2692     * which means this use of strncpy is ok.
2693     */
2694    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2695    /* we don't need to update entire object */
2696    datalen = SD_INODE_HEADER_SIZE;
2697    inode = g_malloc(datalen);
2698
2699    /* refresh inode. */
2700    fd = connect_to_sdog(s, &local_err);
2701    if (fd < 0) {
2702        error_report_err(local_err);
2703        ret = fd;
2704        goto cleanup;
2705    }
2706
2707    ret = write_object(fd, s->bs, (char *)&s->inode,
2708                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2709                       datalen, 0, false, s->cache_flags);
2710    if (ret < 0) {
2711        error_report("failed to write snapshot's inode.");
2712        goto cleanup;
2713    }
2714
2715    ret = do_sd_create(s, &new_vid, 1, &local_err);
2716    if (ret < 0) {
2717        error_reportf_err(local_err,
2718                          "failed to create inode for snapshot: ");
2719        goto cleanup;
2720    }
2721
2722    ret = read_object(fd, s->bs, (char *)inode,
2723                      vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2724                      s->cache_flags);
2725
2726    if (ret < 0) {
2727        error_report("failed to read new inode info. %s", strerror(errno));
2728        goto cleanup;
2729    }
2730
2731    memcpy(&s->inode, inode, datalen);
2732    trace_sheepdog_snapshot_create_inode(s->inode.name, s->inode.snap_id,
2733                                         s->inode.vdi_id);
2734
2735cleanup:
2736    g_free(inode);
2737    closesocket(fd);
2738    return ret;
2739}
2740
2741/*
2742 * We implement rollback(loadvm) operation to the specified snapshot by
2743 * 1) switch to the snapshot
2744 * 2) rely on sd_create_branch to delete working VDI and
2745 * 3) create a new working VDI based on the specified snapshot
2746 */
2747static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2748{
2749    BDRVSheepdogState *s = bs->opaque;
2750    BDRVSheepdogState *old_s;
2751    char tag[SD_MAX_VDI_TAG_LEN];
2752    uint32_t snapid = 0;
2753    int ret;
2754
2755    if (!sd_parse_snapid_or_tag(snapshot_id, &snapid, tag)) {
2756        return -EINVAL;
2757    }
2758
2759    old_s = g_new(BDRVSheepdogState, 1);
2760
2761    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2762
2763    ret = reload_inode(s, snapid, tag);
2764    if (ret) {
2765        goto out;
2766    }
2767
2768    ret = sd_create_branch(s);
2769    if (ret) {
2770        goto out;
2771    }
2772
2773    g_free(old_s);
2774
2775    return 0;
2776out:
2777    /* recover bdrv_sd_state */
2778    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2779    g_free(old_s);
2780
2781    error_report("failed to open. recover old bdrv_sd_state.");
2782
2783    return ret;
2784}
2785
2786#define NR_BATCHED_DISCARD 128
2787
2788static int remove_objects(BDRVSheepdogState *s, Error **errp)
2789{
2790    int fd, i = 0, nr_objs = 0;
2791    int ret;
2792    SheepdogInode *inode = &s->inode;
2793
2794    fd = connect_to_sdog(s, errp);
2795    if (fd < 0) {
2796        return fd;
2797    }
2798
2799    nr_objs = count_data_objs(inode);
2800    while (i < nr_objs) {
2801        int start_idx, nr_filled_idx;
2802
2803        while (i < nr_objs && !inode->data_vdi_id[i]) {
2804            i++;
2805        }
2806        start_idx = i;
2807
2808        nr_filled_idx = 0;
2809        while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2810            if (inode->data_vdi_id[i]) {
2811                inode->data_vdi_id[i] = 0;
2812                nr_filled_idx++;
2813            }
2814
2815            i++;
2816        }
2817
2818        ret = write_object(fd, s->bs,
2819                           (char *)&inode->data_vdi_id[start_idx],
2820                           vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2821                           (i - start_idx) * sizeof(uint32_t),
2822                           offsetof(struct SheepdogInode,
2823                                    data_vdi_id[start_idx]),
2824                           false, s->cache_flags);
2825        if (ret < 0) {
2826            error_setg(errp, "Failed to discard snapshot inode");
2827            goto out;
2828        }
2829    }
2830
2831    ret = 0;
2832out:
2833    closesocket(fd);
2834    return ret;
2835}
2836
2837static int sd_snapshot_delete(BlockDriverState *bs,
2838                              const char *snapshot_id,
2839                              const char *name,
2840                              Error **errp)
2841{
2842    /*
2843     * FIXME should delete the snapshot matching both @snapshot_id and
2844     * @name, but @name not used here
2845     */
2846    unsigned long snap_id = 0;
2847    char snap_tag[SD_MAX_VDI_TAG_LEN];
2848    int fd, ret;
2849    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2850    BDRVSheepdogState *s = bs->opaque;
2851    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2852    uint32_t vid;
2853    SheepdogVdiReq hdr = {
2854        .opcode = SD_OP_DEL_VDI,
2855        .data_length = wlen,
2856        .flags = SD_FLAG_CMD_WRITE,
2857    };
2858    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2859
2860    ret = remove_objects(s, errp);
2861    if (ret) {
2862        return ret;
2863    }
2864
2865    memset(buf, 0, sizeof(buf));
2866    memset(snap_tag, 0, sizeof(snap_tag));
2867    pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2868    /* TODO Use sd_parse_snapid() once this mess is cleaned up */
2869    ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2870    if (ret || snap_id > UINT32_MAX) {
2871        /*
2872         * FIXME Since qemu_strtoul() returns -EINVAL when
2873         * @snapshot_id is null, @snapshot_id is mandatory.  Correct
2874         * would be to require at least one of @snapshot_id and @name.
2875         */
2876        error_setg(errp, "Invalid snapshot ID: %s",
2877                         snapshot_id ? snapshot_id : "<null>");
2878        return -EINVAL;
2879    }
2880
2881    if (snap_id) {
2882        hdr.snapid = (uint32_t) snap_id;
2883    } else {
2884        /* FIXME I suspect we should use @name here */
2885        /* FIXME don't truncate silently */
2886        pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2887        pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2888    }
2889
2890    ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true, errp);
2891    if (ret) {
2892        return ret;
2893    }
2894
2895    fd = connect_to_sdog(s, errp);
2896    if (fd < 0) {
2897        return fd;
2898    }
2899
2900    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2901                 buf, &wlen, &rlen);
2902    closesocket(fd);
2903    if (ret) {
2904        error_setg_errno(errp, -ret, "Couldn't send request to server");
2905        return ret;
2906    }
2907
2908    switch (rsp->result) {
2909    case SD_RES_NO_VDI:
2910        error_setg(errp, "Can't find the snapshot");
2911        return -ENOENT;
2912    case SD_RES_SUCCESS:
2913        break;
2914    default:
2915        error_setg(errp, "%s", sd_strerror(rsp->result));
2916        return -EIO;
2917    }
2918
2919    return 0;
2920}
2921
2922static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2923{
2924    Error *local_err = NULL;
2925    BDRVSheepdogState *s = bs->opaque;
2926    SheepdogReq req;
2927    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2928    QEMUSnapshotInfo *sn_tab = NULL;
2929    unsigned wlen, rlen;
2930    int found = 0;
2931    SheepdogInode *inode;
2932    unsigned long *vdi_inuse;
2933    unsigned int start_nr;
2934    uint64_t hval;
2935    uint32_t vid;
2936
2937    vdi_inuse = g_malloc(max);
2938    inode = g_malloc(SD_INODE_HEADER_SIZE);
2939
2940    fd = connect_to_sdog(s, &local_err);
2941    if (fd < 0) {
2942        error_report_err(local_err);
2943        ret = fd;
2944        goto out;
2945    }
2946
2947    rlen = max;
2948    wlen = 0;
2949
2950    memset(&req, 0, sizeof(req));
2951
2952    req.opcode = SD_OP_READ_VDIS;
2953    req.data_length = max;
2954
2955    ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
2956
2957    closesocket(fd);
2958    if (ret) {
2959        goto out;
2960    }
2961
2962    sn_tab = g_new0(QEMUSnapshotInfo, nr);
2963
2964    /* calculate a vdi id with hash function */
2965    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2966    start_nr = hval & (SD_NR_VDIS - 1);
2967
2968    fd = connect_to_sdog(s, &local_err);
2969    if (fd < 0) {
2970        error_report_err(local_err);
2971        ret = fd;
2972        goto out;
2973    }
2974
2975    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2976        if (!test_bit(vid, vdi_inuse)) {
2977            break;
2978        }
2979
2980        /* we don't need to read entire object */
2981        ret = read_object(fd, s->bs, (char *)inode,
2982                          vid_to_vdi_oid(vid),
2983                          0, SD_INODE_HEADER_SIZE, 0,
2984                          s->cache_flags);
2985
2986        if (ret) {
2987            continue;
2988        }
2989
2990        if (!strcmp(inode->name, s->name) && is_snapshot(inode)) {
2991            sn_tab[found].date_sec = inode->snap_ctime >> 32;
2992            sn_tab[found].date_nsec = inode->snap_ctime & 0xffffffff;
2993            sn_tab[found].vm_state_size = inode->vm_state_size;
2994            sn_tab[found].vm_clock_nsec = inode->vm_clock_nsec;
2995
2996            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2997                     "%" PRIu32, inode->snap_id);
2998            pstrcpy(sn_tab[found].name,
2999                    MIN(sizeof(sn_tab[found].name), sizeof(inode->tag)),
3000                    inode->tag);
3001            found++;
3002        }
3003    }
3004
3005    closesocket(fd);
3006out:
3007    *psn_tab = sn_tab;
3008
3009    g_free(vdi_inuse);
3010    g_free(inode);
3011
3012    if (ret < 0) {
3013        return ret;
3014    }
3015
3016    return found;
3017}
3018
3019static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
3020                                int64_t pos, int size, int load)
3021{
3022    Error *local_err = NULL;
3023    bool create;
3024    int fd, ret = 0, remaining = size;
3025    unsigned int data_len;
3026    uint64_t vmstate_oid;
3027    uint64_t offset;
3028    uint32_t vdi_index;
3029    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
3030    uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
3031
3032    fd = connect_to_sdog(s, &local_err);
3033    if (fd < 0) {
3034        error_report_err(local_err);
3035        return fd;
3036    }
3037
3038    while (remaining) {
3039        vdi_index = pos / object_size;
3040        offset = pos % object_size;
3041
3042        data_len = MIN(remaining, object_size - offset);
3043
3044        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
3045
3046        create = (offset == 0);
3047        if (load) {
3048            ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
3049                              s->inode.nr_copies, data_len, offset,
3050                              s->cache_flags);
3051        } else {
3052            ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
3053                               s->inode.nr_copies, data_len, offset, create,
3054                               s->cache_flags);
3055        }
3056
3057        if (ret < 0) {
3058            error_report("failed to save vmstate %s", strerror(errno));
3059            goto cleanup;
3060        }
3061
3062        pos += data_len;
3063        data += data_len;
3064        remaining -= data_len;
3065    }
3066    ret = size;
3067cleanup:
3068    closesocket(fd);
3069    return ret;
3070}
3071
3072static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3073                           int64_t pos)
3074{
3075    BDRVSheepdogState *s = bs->opaque;
3076    void *buf;
3077    int ret;
3078
3079    buf = qemu_blockalign(bs, qiov->size);
3080    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
3081    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
3082    qemu_vfree(buf);
3083
3084    return ret;
3085}
3086
3087static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3088                           int64_t pos)
3089{
3090    BDRVSheepdogState *s = bs->opaque;
3091    void *buf;
3092    int ret;
3093
3094    buf = qemu_blockalign(bs, qiov->size);
3095    ret = do_load_save_vmstate(s, buf, pos, qiov->size, 1);
3096    qemu_iovec_from_buf(qiov, 0, buf, qiov->size);
3097    qemu_vfree(buf);
3098
3099    return ret;
3100}
3101
3102
3103static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
3104                                      int bytes)
3105{
3106    SheepdogAIOCB acb;
3107    BDRVSheepdogState *s = bs->opaque;
3108    QEMUIOVector discard_iov;
3109    struct iovec iov;
3110    uint32_t zero = 0;
3111
3112    if (!s->discard_supported) {
3113        return 0;
3114    }
3115
3116    memset(&discard_iov, 0, sizeof(discard_iov));
3117    memset(&iov, 0, sizeof(iov));
3118    iov.iov_base = &zero;
3119    iov.iov_len = sizeof(zero);
3120    discard_iov.iov = &iov;
3121    discard_iov.niov = 1;
3122    if (!QEMU_IS_ALIGNED(offset | bytes, BDRV_SECTOR_SIZE)) {
3123        return -ENOTSUP;
3124    }
3125    sd_aio_setup(&acb, s, &discard_iov, offset >> BDRV_SECTOR_BITS,
3126                 bytes >> BDRV_SECTOR_BITS, AIOCB_DISCARD_OBJ);
3127    sd_co_rw_vector(&acb);
3128    sd_aio_complete(&acb);
3129
3130    return acb.ret;
3131}
3132
3133static coroutine_fn int
3134sd_co_block_status(BlockDriverState *bs, bool want_zero, int64_t offset,
3135                   int64_t bytes, int64_t *pnum, int64_t *map,
3136                   BlockDriverState **file)
3137{
3138    BDRVSheepdogState *s = bs->opaque;
3139    SheepdogInode *inode = &s->inode;
3140    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3141    unsigned long start = offset / object_size,
3142                  end = DIV_ROUND_UP(offset + bytes, object_size);
3143    unsigned long idx;
3144    *map = offset;
3145    int ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
3146
3147    for (idx = start; idx < end; idx++) {
3148        if (inode->data_vdi_id[idx] == 0) {
3149            break;
3150        }
3151    }
3152    if (idx == start) {
3153        /* Get the longest length of unallocated sectors */
3154        ret = 0;
3155        for (idx = start + 1; idx < end; idx++) {
3156            if (inode->data_vdi_id[idx] != 0) {
3157                break;
3158            }
3159        }
3160    }
3161
3162    *pnum = (idx - start) * object_size;
3163    if (*pnum > bytes) {
3164        *pnum = bytes;
3165    }
3166    if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
3167        *file = bs;
3168    }
3169    return ret;
3170}
3171
3172static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
3173{
3174    BDRVSheepdogState *s = bs->opaque;
3175    SheepdogInode *inode = &s->inode;
3176    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3177    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
3178    uint64_t size = 0;
3179
3180    for (i = 0; i < last; i++) {
3181        if (inode->data_vdi_id[i] == 0) {
3182            continue;
3183        }
3184        size += object_size;
3185    }
3186    return size;
3187}
3188
3189static QemuOptsList sd_create_opts = {
3190    .name = "sheepdog-create-opts",
3191    .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
3192    .desc = {
3193        {
3194            .name = BLOCK_OPT_SIZE,
3195            .type = QEMU_OPT_SIZE,
3196            .help = "Virtual disk size"
3197        },
3198        {
3199            .name = BLOCK_OPT_BACKING_FILE,
3200            .type = QEMU_OPT_STRING,
3201            .help = "File name of a base image"
3202        },
3203        {
3204            .name = BLOCK_OPT_BACKING_FMT,
3205            .type = QEMU_OPT_STRING,
3206            .help = "Must be 'sheepdog' if present",
3207        },
3208        {
3209            .name = BLOCK_OPT_PREALLOC,
3210            .type = QEMU_OPT_STRING,
3211            .help = "Preallocation mode (allowed values: off, full)"
3212        },
3213        {
3214            .name = BLOCK_OPT_REDUNDANCY,
3215            .type = QEMU_OPT_STRING,
3216            .help = "Redundancy of the image"
3217        },
3218        {
3219            .name = BLOCK_OPT_OBJECT_SIZE,
3220            .type = QEMU_OPT_SIZE,
3221            .help = "Object size of the image"
3222        },
3223        { /* end of list */ }
3224    }
3225};
3226
/*
 * NULL-terminated list of runtime option names, referenced by the
 * .strong_runtime_opts field of the sheepdog BlockDriver definitions.
 * "server." ends in a dot, unlike the other entries.
 */
static const char *const sd_strong_runtime_opts[] = {
    "vdi",
    "snap-id",
    "tag",
    "server.",
    NULL    /* terminator */
};
3235
3236static BlockDriver bdrv_sheepdog = {
3237    .format_name                  = "sheepdog",
3238    .protocol_name                = "sheepdog",
3239    .instance_size                = sizeof(BDRVSheepdogState),
3240    .bdrv_parse_filename          = sd_parse_filename,
3241    .bdrv_file_open               = sd_open,
3242    .bdrv_reopen_prepare          = sd_reopen_prepare,
3243    .bdrv_reopen_commit           = sd_reopen_commit,
3244    .bdrv_reopen_abort            = sd_reopen_abort,
3245    .bdrv_close                   = sd_close,
3246    .bdrv_co_create               = sd_co_create,
3247    .bdrv_co_create_opts          = sd_co_create_opts,
3248    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3249    .bdrv_getlength               = sd_getlength,
3250    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3251    .bdrv_co_truncate             = sd_co_truncate,
3252
3253    .bdrv_co_readv                = sd_co_readv,
3254    .bdrv_co_writev               = sd_co_writev,
3255    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3256    .bdrv_co_pdiscard             = sd_co_pdiscard,
3257    .bdrv_co_block_status         = sd_co_block_status,
3258
3259    .bdrv_snapshot_create         = sd_snapshot_create,
3260    .bdrv_snapshot_goto           = sd_snapshot_goto,
3261    .bdrv_snapshot_delete         = sd_snapshot_delete,
3262    .bdrv_snapshot_list           = sd_snapshot_list,
3263
3264    .bdrv_save_vmstate            = sd_save_vmstate,
3265    .bdrv_load_vmstate            = sd_load_vmstate,
3266
3267    .bdrv_detach_aio_context      = sd_detach_aio_context,
3268    .bdrv_attach_aio_context      = sd_attach_aio_context,
3269
3270    .create_opts                  = &sd_create_opts,
3271    .strong_runtime_opts          = sd_strong_runtime_opts,
3272};
3273
3274static BlockDriver bdrv_sheepdog_tcp = {
3275    .format_name                  = "sheepdog",
3276    .protocol_name                = "sheepdog+tcp",
3277    .instance_size                = sizeof(BDRVSheepdogState),
3278    .bdrv_parse_filename          = sd_parse_filename,
3279    .bdrv_file_open               = sd_open,
3280    .bdrv_reopen_prepare          = sd_reopen_prepare,
3281    .bdrv_reopen_commit           = sd_reopen_commit,
3282    .bdrv_reopen_abort            = sd_reopen_abort,
3283    .bdrv_close                   = sd_close,
3284    .bdrv_co_create               = sd_co_create,
3285    .bdrv_co_create_opts          = sd_co_create_opts,
3286    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3287    .bdrv_getlength               = sd_getlength,
3288    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3289    .bdrv_co_truncate             = sd_co_truncate,
3290
3291    .bdrv_co_readv                = sd_co_readv,
3292    .bdrv_co_writev               = sd_co_writev,
3293    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3294    .bdrv_co_pdiscard             = sd_co_pdiscard,
3295    .bdrv_co_block_status         = sd_co_block_status,
3296
3297    .bdrv_snapshot_create         = sd_snapshot_create,
3298    .bdrv_snapshot_goto           = sd_snapshot_goto,
3299    .bdrv_snapshot_delete         = sd_snapshot_delete,
3300    .bdrv_snapshot_list           = sd_snapshot_list,
3301
3302    .bdrv_save_vmstate            = sd_save_vmstate,
3303    .bdrv_load_vmstate            = sd_load_vmstate,
3304
3305    .bdrv_detach_aio_context      = sd_detach_aio_context,
3306    .bdrv_attach_aio_context      = sd_attach_aio_context,
3307
3308    .create_opts                  = &sd_create_opts,
3309    .strong_runtime_opts          = sd_strong_runtime_opts,
3310};
3311
3312static BlockDriver bdrv_sheepdog_unix = {
3313    .format_name                  = "sheepdog",
3314    .protocol_name                = "sheepdog+unix",
3315    .instance_size                = sizeof(BDRVSheepdogState),
3316    .bdrv_parse_filename          = sd_parse_filename,
3317    .bdrv_file_open               = sd_open,
3318    .bdrv_reopen_prepare          = sd_reopen_prepare,
3319    .bdrv_reopen_commit           = sd_reopen_commit,
3320    .bdrv_reopen_abort            = sd_reopen_abort,
3321    .bdrv_close                   = sd_close,
3322    .bdrv_co_create               = sd_co_create,
3323    .bdrv_co_create_opts          = sd_co_create_opts,
3324    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3325    .bdrv_getlength               = sd_getlength,
3326    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3327    .bdrv_co_truncate             = sd_co_truncate,
3328
3329    .bdrv_co_readv                = sd_co_readv,
3330    .bdrv_co_writev               = sd_co_writev,
3331    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3332    .bdrv_co_pdiscard             = sd_co_pdiscard,
3333    .bdrv_co_block_status         = sd_co_block_status,
3334
3335    .bdrv_snapshot_create         = sd_snapshot_create,
3336    .bdrv_snapshot_goto           = sd_snapshot_goto,
3337    .bdrv_snapshot_delete         = sd_snapshot_delete,
3338    .bdrv_snapshot_list           = sd_snapshot_list,
3339
3340    .bdrv_save_vmstate            = sd_save_vmstate,
3341    .bdrv_load_vmstate            = sd_load_vmstate,
3342
3343    .bdrv_detach_aio_context      = sd_detach_aio_context,
3344    .bdrv_attach_aio_context      = sd_attach_aio_context,
3345
3346    .create_opts                  = &sd_create_opts,
3347    .strong_runtime_opts          = sd_strong_runtime_opts,
3348};
3349
3350static void bdrv_sheepdog_init(void)
3351{
3352    bdrv_register(&bdrv_sheepdog);
3353    bdrv_register(&bdrv_sheepdog_tcp);
3354    bdrv_register(&bdrv_sheepdog_unix);
3355}
3356block_init(bdrv_sheepdog_init);
3357