qemu/block/sheepdog.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu-common.h"
  17#include "qapi/error.h"
  18#include "qapi/qapi-visit-sockets.h"
  19#include "qapi/qapi-visit-block-core.h"
  20#include "qapi/qmp/qdict.h"
  21#include "qapi/qobject-input-visitor.h"
  22#include "qapi/qobject-output-visitor.h"
  23#include "qemu/uri.h"
  24#include "qemu/error-report.h"
  25#include "qemu/module.h"
  26#include "qemu/option.h"
  27#include "qemu/sockets.h"
  28#include "block/block_int.h"
  29#include "block/qdict.h"
  30#include "sysemu/block-backend.h"
  31#include "qemu/bitops.h"
  32#include "qemu/cutils.h"
  33#include "trace.h"
  34
  35#define SD_PROTO_VER 0x01
  36
  37#define SD_DEFAULT_ADDR "localhost"
  38#define SD_DEFAULT_PORT 7000
  39
  40#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
  41#define SD_OP_READ_OBJ       0x02
  42#define SD_OP_WRITE_OBJ      0x03
  43/* 0x04 is used internally by Sheepdog */
  44
  45#define SD_OP_NEW_VDI        0x11
  46#define SD_OP_LOCK_VDI       0x12
  47#define SD_OP_RELEASE_VDI    0x13
  48#define SD_OP_GET_VDI_INFO   0x14
  49#define SD_OP_READ_VDIS      0x15
  50#define SD_OP_FLUSH_VDI      0x16
  51#define SD_OP_DEL_VDI        0x17
  52#define SD_OP_GET_CLUSTER_DEFAULT   0x18
  53
  54#define SD_FLAG_CMD_WRITE    0x01
  55#define SD_FLAG_CMD_COW      0x02
  56#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
  57#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  58
  59#define SD_RES_SUCCESS       0x00 /* Success */
  60#define SD_RES_UNKNOWN       0x01 /* Unknown error */
  61#define SD_RES_NO_OBJ        0x02 /* No object found */
  62#define SD_RES_EIO           0x03 /* I/O error */
  63#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
  64#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
  65#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
  66#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
  67#define SD_RES_NO_VDI        0x08 /* No vdi found */
  68#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
  69#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
  70#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
  71#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
  72#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
  73#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
  74#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
  75#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
  76#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
  77#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
  78#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
  79#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
  80#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
  81#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
  82#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
  83#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
  84#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
  85#define SD_RES_READONLY      0x1A /* Object is read-only */
  86
  87/*
  88 * Object ID rules
  89 *
  90 *  0 - 19 (20 bits): data object space
  91 * 20 - 31 (12 bits): reserved data object space
  92 * 32 - 55 (24 bits): vdi object space
  93 * 56 - 59 ( 4 bits): reserved vdi object space
  94 * 60 - 63 ( 4 bits): object type identifier space
  95 */
  96
  97#define VDI_SPACE_SHIFT   32
  98#define VDI_BIT (UINT64_C(1) << 63)
  99#define VMSTATE_BIT (UINT64_C(1) << 62)
 100#define MAX_DATA_OBJS (UINT64_C(1) << 20)
 101#define MAX_CHILDREN 1024
 102#define SD_MAX_VDI_LEN 256
 103#define SD_MAX_VDI_TAG_LEN 256
 104#define SD_NR_VDIS   (1U << 24)
 105#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
 106#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
 107#define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
 108/*
 109 * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
 110 * (SD_EC_MAX_STRIP - 1) for parity strips
 111 *
 112 * SD_MAX_COPIES is sum of number of data strips and parity strips.
 113 */
 114#define SD_EC_MAX_STRIP 16
 115#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
 116
 117#define SD_INODE_SIZE (sizeof(SheepdogInode))
 118#define CURRENT_VDI_ID 0
 119
 120#define LOCK_TYPE_NORMAL 0
 121#define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
 122
 123typedef struct SheepdogReq {
 124    uint8_t proto_ver;
 125    uint8_t opcode;
 126    uint16_t flags;
 127    uint32_t epoch;
 128    uint32_t id;
 129    uint32_t data_length;
 130    uint32_t opcode_specific[8];
 131} SheepdogReq;
 132
 133typedef struct SheepdogRsp {
 134    uint8_t proto_ver;
 135    uint8_t opcode;
 136    uint16_t flags;
 137    uint32_t epoch;
 138    uint32_t id;
 139    uint32_t data_length;
 140    uint32_t result;
 141    uint32_t opcode_specific[7];
 142} SheepdogRsp;
 143
 144typedef struct SheepdogObjReq {
 145    uint8_t proto_ver;
 146    uint8_t opcode;
 147    uint16_t flags;
 148    uint32_t epoch;
 149    uint32_t id;
 150    uint32_t data_length;
 151    uint64_t oid;
 152    uint64_t cow_oid;
 153    uint8_t copies;
 154    uint8_t copy_policy;
 155    uint8_t reserved[6];
 156    uint64_t offset;
 157} SheepdogObjReq;
 158
 159typedef struct SheepdogObjRsp {
 160    uint8_t proto_ver;
 161    uint8_t opcode;
 162    uint16_t flags;
 163    uint32_t epoch;
 164    uint32_t id;
 165    uint32_t data_length;
 166    uint32_t result;
 167    uint8_t copies;
 168    uint8_t copy_policy;
 169    uint8_t reserved[2];
 170    uint32_t pad[6];
 171} SheepdogObjRsp;
 172
 173typedef struct SheepdogVdiReq {
 174    uint8_t proto_ver;
 175    uint8_t opcode;
 176    uint16_t flags;
 177    uint32_t epoch;
 178    uint32_t id;
 179    uint32_t data_length;
 180    uint64_t vdi_size;
 181    uint32_t base_vdi_id;
 182    uint8_t copies;
 183    uint8_t copy_policy;
 184    uint8_t store_policy;
 185    uint8_t block_size_shift;
 186    uint32_t snapid;
 187    uint32_t type;
 188    uint32_t pad[2];
 189} SheepdogVdiReq;
 190
 191typedef struct SheepdogVdiRsp {
 192    uint8_t proto_ver;
 193    uint8_t opcode;
 194    uint16_t flags;
 195    uint32_t epoch;
 196    uint32_t id;
 197    uint32_t data_length;
 198    uint32_t result;
 199    uint32_t rsvd;
 200    uint32_t vdi_id;
 201    uint32_t pad[5];
 202} SheepdogVdiRsp;
 203
 204typedef struct SheepdogClusterRsp {
 205    uint8_t proto_ver;
 206    uint8_t opcode;
 207    uint16_t flags;
 208    uint32_t epoch;
 209    uint32_t id;
 210    uint32_t data_length;
 211    uint32_t result;
 212    uint8_t nr_copies;
 213    uint8_t copy_policy;
 214    uint8_t block_size_shift;
 215    uint8_t __pad1;
 216    uint32_t __pad2[6];
 217} SheepdogClusterRsp;
 218
 219typedef struct SheepdogInode {
 220    char name[SD_MAX_VDI_LEN];
 221    char tag[SD_MAX_VDI_TAG_LEN];
 222    uint64_t ctime;
 223    uint64_t snap_ctime;
 224    uint64_t vm_clock_nsec;
 225    uint64_t vdi_size;
 226    uint64_t vm_state_size;
 227    uint16_t copy_policy;
 228    uint8_t nr_copies;
 229    uint8_t block_size_shift;
 230    uint32_t snap_id;
 231    uint32_t vdi_id;
 232    uint32_t parent_vdi_id;
 233    uint32_t child_vdi_id[MAX_CHILDREN];
 234    uint32_t data_vdi_id[MAX_DATA_OBJS];
 235} SheepdogInode;
 236
 237#define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
 238
 239/*
 240 * 64 bit FNV-1a non-zero initial basis
 241 */
 242#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 243
 244/*
 245 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 246 */
 247static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 248{
 249    unsigned char *bp = buf;
 250    unsigned char *be = bp + len;
 251    while (bp < be) {
 252        hval ^= (uint64_t) *bp++;
 253        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 254            (hval << 7) + (hval << 8) + (hval << 40);
 255    }
 256    return hval;
 257}
 258
 259static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 260{
 261    return inode->vdi_id == inode->data_vdi_id[idx];
 262}
 263
 264static inline bool is_data_obj(uint64_t oid)
 265{
 266    return !(VDI_BIT & oid);
 267}
 268
 269static inline uint64_t data_oid_to_idx(uint64_t oid)
 270{
 271    return oid & (MAX_DATA_OBJS - 1);
 272}
 273
 274static inline uint32_t oid_to_vid(uint64_t oid)
 275{
 276    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 277}
 278
 279static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 280{
 281    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 282}
 283
 284static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 285{
 286    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 287}
 288
 289static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 290{
 291    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 292}
 293
 294static inline bool is_snapshot(struct SheepdogInode *inode)
 295{
 296    return !!inode->snap_ctime;
 297}
 298
 299static inline size_t count_data_objs(const struct SheepdogInode *inode)
 300{
 301    return DIV_ROUND_UP(inode->vdi_size,
 302                        (1UL << inode->block_size_shift));
 303}
 304
 305typedef struct SheepdogAIOCB SheepdogAIOCB;
 306typedef struct BDRVSheepdogState BDRVSheepdogState;
 307
 308typedef struct AIOReq {
 309    SheepdogAIOCB *aiocb;
 310    unsigned int iov_offset;
 311
 312    uint64_t oid;
 313    uint64_t base_oid;
 314    uint64_t offset;
 315    unsigned int data_len;
 316    uint8_t flags;
 317    uint32_t id;
 318    bool create;
 319
 320    QLIST_ENTRY(AIOReq) aio_siblings;
 321} AIOReq;
 322
 323enum AIOCBState {
 324    AIOCB_WRITE_UDATA,
 325    AIOCB_READ_UDATA,
 326    AIOCB_FLUSH_CACHE,
 327    AIOCB_DISCARD_OBJ,
 328};
 329
 330#define AIOCBOverlapping(x, y)                                 \
 331    (!(x->max_affect_data_idx < y->min_affect_data_idx          \
 332       || y->max_affect_data_idx < x->min_affect_data_idx))
 333
 334struct SheepdogAIOCB {
 335    BDRVSheepdogState *s;
 336
 337    QEMUIOVector *qiov;
 338
 339    int64_t sector_num;
 340    int nb_sectors;
 341
 342    int ret;
 343    enum AIOCBState aiocb_type;
 344
 345    Coroutine *coroutine;
 346    int nr_pending;
 347
 348    uint32_t min_affect_data_idx;
 349    uint32_t max_affect_data_idx;
 350
 351    /*
 352     * The difference between affect_data_idx and dirty_data_idx:
 353     * affect_data_idx represents range of index of all request types.
 354     * dirty_data_idx represents range of index updated by COW requests.
 355     * dirty_data_idx is used for updating an inode object.
 356     */
 357    uint32_t min_dirty_data_idx;
 358    uint32_t max_dirty_data_idx;
 359
 360    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
 361};
 362
 363struct BDRVSheepdogState {
 364    BlockDriverState *bs;
 365    AioContext *aio_context;
 366
 367    SheepdogInode inode;
 368
 369    char name[SD_MAX_VDI_LEN];
 370    bool is_snapshot;
 371    uint32_t cache_flags;
 372    bool discard_supported;
 373
 374    SocketAddress *addr;
 375    int fd;
 376
 377    CoMutex lock;
 378    Coroutine *co_send;
 379    Coroutine *co_recv;
 380
 381    uint32_t aioreq_seq_num;
 382
 383    /* Every aio request must be linked to either of these queues. */
 384    QLIST_HEAD(, AIOReq) inflight_aio_head;
 385    QLIST_HEAD(, AIOReq) failed_aio_head;
 386
 387    CoMutex queue_lock;
 388    CoQueue overlapping_queue;
 389    QLIST_HEAD(, SheepdogAIOCB) inflight_aiocb_head;
 390};
 391
 392typedef struct BDRVSheepdogReopenState {
 393    int fd;
 394    int cache_flags;
 395} BDRVSheepdogReopenState;
 396
 397static const char *sd_strerror(int err)
 398{
 399    int i;
 400
 401    static const struct {
 402        int err;
 403        const char *desc;
 404    } errors[] = {
 405        {SD_RES_SUCCESS, "Success"},
 406        {SD_RES_UNKNOWN, "Unknown error"},
 407        {SD_RES_NO_OBJ, "No object found"},
 408        {SD_RES_EIO, "I/O error"},
 409        {SD_RES_VDI_EXIST, "VDI exists already"},
 410        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 411        {SD_RES_SYSTEM_ERROR, "System error"},
 412        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 413        {SD_RES_NO_VDI, "No vdi found"},
 414        {SD_RES_NO_BASE_VDI, "No base VDI found"},
 415        {SD_RES_VDI_READ, "Failed read the requested VDI"},
 416        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 417        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 418        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 419        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 420        {SD_RES_STARTUP, "The system is still booting"},
 421        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 422        {SD_RES_SHUTDOWN, "The system is shutting down"},
 423        {SD_RES_NO_MEM, "Out of memory on the server"},
 424        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 425        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 426        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 427        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 428        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
 429        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
 430        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
 431        {SD_RES_READONLY, "Object is read-only"},
 432    };
 433
 434    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 435        if (errors[i].err == err) {
 436            return errors[i].desc;
 437        }
 438    }
 439
 440    return "Invalid error code";
 441}
 442
/*
 * Sheepdog I/O handling:
 *
 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 *    link the requests to the inflight_aio_head list in the
 *    BDRVSheepdogState.  The function yields while waiting to
 *    receive the responses.
 *
 * 2. We receive the responses in aio_read_response, the fd handler for
 *    the sheepdog connection.  We switch back to sd_co_readv/sd_co_writev
 *    after all the requests belonging to the AIOCB are finished.  If
 *    needed, sd_co_writev will send further requests for the vdi object.
 */
 456
 457static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 458                                    uint64_t oid, unsigned int data_len,
 459                                    uint64_t offset, uint8_t flags, bool create,
 460                                    uint64_t base_oid, unsigned int iov_offset)
 461{
 462    AIOReq *aio_req;
 463
 464    aio_req = g_malloc(sizeof(*aio_req));
 465    aio_req->aiocb = acb;
 466    aio_req->iov_offset = iov_offset;
 467    aio_req->oid = oid;
 468    aio_req->base_oid = base_oid;
 469    aio_req->offset = offset;
 470    aio_req->data_len = data_len;
 471    aio_req->flags = flags;
 472    aio_req->id = s->aioreq_seq_num++;
 473    aio_req->create = create;
 474
 475    acb->nr_pending++;
 476    return aio_req;
 477}
 478
 479static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
 480{
 481    SheepdogAIOCB *cb;
 482
 483retry:
 484    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
 485        if (AIOCBOverlapping(acb, cb)) {
 486            qemu_co_queue_wait(&s->overlapping_queue, &s->queue_lock);
 487            goto retry;
 488        }
 489    }
 490}
 491
 492static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s,
 493                         QEMUIOVector *qiov, int64_t sector_num, int nb_sectors,
 494                         int type)
 495{
 496    uint32_t object_size;
 497
 498    object_size = (UINT32_C(1) << s->inode.block_size_shift);
 499
 500    acb->s = s;
 501
 502    acb->qiov = qiov;
 503
 504    acb->sector_num = sector_num;
 505    acb->nb_sectors = nb_sectors;
 506
 507    acb->coroutine = qemu_coroutine_self();
 508    acb->ret = 0;
 509    acb->nr_pending = 0;
 510
 511    acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
 512    acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
 513                              acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
 514
 515    acb->min_dirty_data_idx = UINT32_MAX;
 516    acb->max_dirty_data_idx = 0;
 517    acb->aiocb_type = type;
 518
 519    if (type == AIOCB_FLUSH_CACHE) {
 520        return;
 521    }
 522
 523    qemu_co_mutex_lock(&s->queue_lock);
 524    wait_for_overlapping_aiocb(s, acb);
 525    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings);
 526    qemu_co_mutex_unlock(&s->queue_lock);
 527}
 528
 529static SocketAddress *sd_server_config(QDict *options, Error **errp)
 530{
 531    QDict *server = NULL;
 532    Visitor *iv = NULL;
 533    SocketAddress *saddr = NULL;
 534    Error *local_err = NULL;
 535
 536    qdict_extract_subqdict(options, &server, "server.");
 537
 538    iv = qobject_input_visitor_new_flat_confused(server, errp);
 539    if (!iv) {
 540        goto done;
 541    }
 542
 543    visit_type_SocketAddress(iv, NULL, &saddr, &local_err);
 544    if (local_err) {
 545        error_propagate(errp, local_err);
 546        goto done;
 547    }
 548
 549done:
 550    visit_free(iv);
 551    qobject_unref(server);
 552    return saddr;
 553}
 554
 555/* Return -EIO in case of error, file descriptor on success */
 556static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 557{
 558    int fd;
 559
 560    fd = socket_connect(s->addr, errp);
 561
 562    if (s->addr->type == SOCKET_ADDRESS_TYPE_INET && fd >= 0) {
 563        int ret = socket_set_nodelay(fd);
 564        if (ret < 0) {
 565            warn_report("can't set TCP_NODELAY: %s", strerror(errno));
 566        }
 567    }
 568
 569    if (fd >= 0) {
 570        qemu_set_nonblock(fd);
 571    } else {
 572        fd = -EIO;
 573    }
 574
 575    return fd;
 576}
 577
 578/* Return 0 on success and -errno in case of error */
 579static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 580                                    unsigned int *wlen)
 581{
 582    int ret;
 583
 584    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 585    if (ret != sizeof(*hdr)) {
 586        error_report("failed to send a req, %s", strerror(errno));
 587        return -errno;
 588    }
 589
 590    ret = qemu_co_send(sockfd, data, *wlen);
 591    if (ret != *wlen) {
 592        error_report("failed to send a req, %s", strerror(errno));
 593        return -errno;
 594    }
 595
 596    return ret;
 597}
 598
 599typedef struct SheepdogReqCo {
 600    int sockfd;
 601    BlockDriverState *bs;
 602    AioContext *aio_context;
 603    SheepdogReq *hdr;
 604    void *data;
 605    unsigned int *wlen;
 606    unsigned int *rlen;
 607    int ret;
 608    bool finished;
 609    Coroutine *co;
 610} SheepdogReqCo;
 611
 612static void restart_co_req(void *opaque)
 613{
 614    SheepdogReqCo *srco = opaque;
 615
 616    aio_co_wake(srco->co);
 617}
 618
 619static coroutine_fn void do_co_req(void *opaque)
 620{
 621    int ret;
 622    SheepdogReqCo *srco = opaque;
 623    int sockfd = srco->sockfd;
 624    SheepdogReq *hdr = srco->hdr;
 625    void *data = srco->data;
 626    unsigned int *wlen = srco->wlen;
 627    unsigned int *rlen = srco->rlen;
 628
 629    srco->co = qemu_coroutine_self();
 630    aio_set_fd_handler(srco->aio_context, sockfd, false,
 631                       NULL, restart_co_req, NULL, srco);
 632
 633    ret = send_co_req(sockfd, hdr, data, wlen);
 634    if (ret < 0) {
 635        goto out;
 636    }
 637
 638    aio_set_fd_handler(srco->aio_context, sockfd, false,
 639                       restart_co_req, NULL, NULL, srco);
 640
 641    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 642    if (ret != sizeof(*hdr)) {
 643        error_report("failed to get a rsp, %s", strerror(errno));
 644        ret = -errno;
 645        goto out;
 646    }
 647
 648    if (*rlen > hdr->data_length) {
 649        *rlen = hdr->data_length;
 650    }
 651
 652    if (*rlen) {
 653        ret = qemu_co_recv(sockfd, data, *rlen);
 654        if (ret != *rlen) {
 655            error_report("failed to get the data, %s", strerror(errno));
 656            ret = -errno;
 657            goto out;
 658        }
 659    }
 660    ret = 0;
 661out:
 662    /* there is at most one request for this sockfd, so it is safe to
 663     * set each handler to NULL. */
 664    aio_set_fd_handler(srco->aio_context, sockfd, false,
 665                       NULL, NULL, NULL, NULL);
 666
 667    srco->co = NULL;
 668    srco->ret = ret;
 669    /* Set srco->finished before reading bs->wakeup.  */
 670    atomic_mb_set(&srco->finished, true);
 671    if (srco->bs) {
 672        bdrv_wakeup(srco->bs);
 673    }
 674}
 675
 676/*
 677 * Send the request to the sheep in a synchronous manner.
 678 *
 679 * Return 0 on success, -errno in case of error.
 680 */
 681static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
 682                  void *data, unsigned int *wlen, unsigned int *rlen)
 683{
 684    Coroutine *co;
 685    SheepdogReqCo srco = {
 686        .sockfd = sockfd,
 687        .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
 688        .bs = bs,
 689        .hdr = hdr,
 690        .data = data,
 691        .wlen = wlen,
 692        .rlen = rlen,
 693        .ret = 0,
 694        .finished = false,
 695    };
 696
 697    if (qemu_in_coroutine()) {
 698        do_co_req(&srco);
 699    } else {
 700        co = qemu_coroutine_create(do_co_req, &srco);
 701        if (bs) {
 702            bdrv_coroutine_enter(bs, co);
 703            BDRV_POLL_WHILE(bs, !srco.finished);
 704        } else {
 705            qemu_coroutine_enter(co);
 706            while (!srco.finished) {
 707                aio_poll(qemu_get_aio_context(), true);
 708            }
 709        }
 710    }
 711
 712    return srco.ret;
 713}
 714
 715static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 716                                         struct iovec *iov, int niov,
 717                                         enum AIOCBState aiocb_type);
 718static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 719static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
 720static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
 721static void co_write_request(void *opaque);
 722
 723static coroutine_fn void reconnect_to_sdog(void *opaque)
 724{
 725    BDRVSheepdogState *s = opaque;
 726    AIOReq *aio_req, *next;
 727
 728    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
 729                       NULL, NULL, NULL);
 730    close(s->fd);
 731    s->fd = -1;
 732
 733    /* Wait for outstanding write requests to be completed. */
 734    while (s->co_send != NULL) {
 735        co_write_request(opaque);
 736    }
 737
 738    /* Try to reconnect the sheepdog server every one second. */
 739    while (s->fd < 0) {
 740        Error *local_err = NULL;
 741        s->fd = get_sheep_fd(s, &local_err);
 742        if (s->fd < 0) {
 743            trace_sheepdog_reconnect_to_sdog();
 744            error_report_err(local_err);
 745            qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000000ULL);
 746        }
 747    };
 748
 749    /*
 750     * Now we have to resend all the request in the inflight queue.  However,
 751     * resend_aioreq() can yield and newly created requests can be added to the
 752     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
 753     * have to move all the inflight requests to the failed queue before
 754     * resend_aioreq() is called.
 755     */
 756    qemu_co_mutex_lock(&s->queue_lock);
 757    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
 758        QLIST_REMOVE(aio_req, aio_siblings);
 759        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
 760    }
 761
 762    /* Resend all the failed aio requests. */
 763    while (!QLIST_EMPTY(&s->failed_aio_head)) {
 764        aio_req = QLIST_FIRST(&s->failed_aio_head);
 765        QLIST_REMOVE(aio_req, aio_siblings);
 766        qemu_co_mutex_unlock(&s->queue_lock);
 767        resend_aioreq(s, aio_req);
 768        qemu_co_mutex_lock(&s->queue_lock);
 769    }
 770    qemu_co_mutex_unlock(&s->queue_lock);
 771}
 772
 773/*
 774 * Receive responses of the I/O requests.
 775 *
 776 * This function is registered as a fd handler, and called from the
 777 * main loop when s->fd is ready for reading responses.
 778 */
 779static void coroutine_fn aio_read_response(void *opaque)
 780{
 781    SheepdogObjRsp rsp;
 782    BDRVSheepdogState *s = opaque;
 783    int fd = s->fd;
 784    int ret;
 785    AIOReq *aio_req = NULL;
 786    SheepdogAIOCB *acb;
 787    uint64_t idx;
 788
 789    /* read a header */
 790    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
 791    if (ret != sizeof(rsp)) {
 792        error_report("failed to get the header, %s", strerror(errno));
 793        goto err;
 794    }
 795
 796    /* find the right aio_req from the inflight aio list */
 797    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
 798        if (aio_req->id == rsp.id) {
 799            break;
 800        }
 801    }
 802    if (!aio_req) {
 803        error_report("cannot find aio_req %x", rsp.id);
 804        goto err;
 805    }
 806
 807    acb = aio_req->aiocb;
 808
 809    switch (acb->aiocb_type) {
 810    case AIOCB_WRITE_UDATA:
 811        if (!is_data_obj(aio_req->oid)) {
 812            break;
 813        }
 814        idx = data_oid_to_idx(aio_req->oid);
 815
 816        if (aio_req->create) {
 817            /*
 818             * If the object is newly created one, we need to update
 819             * the vdi object (metadata object).  min_dirty_data_idx
 820             * and max_dirty_data_idx are changed to include updated
 821             * index between them.
 822             */
 823            if (rsp.result == SD_RES_SUCCESS) {
 824                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
 825                acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
 826                acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
 827            }
 828        }
 829        break;
 830    case AIOCB_READ_UDATA:
 831        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
 832                            aio_req->iov_offset, rsp.data_length);
 833        if (ret != rsp.data_length) {
 834            error_report("failed to get the data, %s", strerror(errno));
 835            goto err;
 836        }
 837        break;
 838    case AIOCB_FLUSH_CACHE:
 839        if (rsp.result == SD_RES_INVALID_PARMS) {
 840            trace_sheepdog_aio_read_response();
 841            s->cache_flags = SD_FLAG_CMD_DIRECT;
 842            rsp.result = SD_RES_SUCCESS;
 843        }
 844        break;
 845    case AIOCB_DISCARD_OBJ:
 846        switch (rsp.result) {
 847        case SD_RES_INVALID_PARMS:
 848            error_report("server doesn't support discard command");
 849            rsp.result = SD_RES_SUCCESS;
 850            s->discard_supported = false;
 851            break;
 852        default:
 853            break;
 854        }
 855    }
 856
 857    /* No more data for this aio_req (reload_inode below uses its own file
 858     * descriptor handler which doesn't use co_recv).
 859    */
 860    s->co_recv = NULL;
 861
 862    qemu_co_mutex_lock(&s->queue_lock);
 863    QLIST_REMOVE(aio_req, aio_siblings);
 864    qemu_co_mutex_unlock(&s->queue_lock);
 865
 866    switch (rsp.result) {
 867    case SD_RES_SUCCESS:
 868        break;
 869    case SD_RES_READONLY:
 870        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
 871            ret = reload_inode(s, 0, "");
 872            if (ret < 0) {
 873                goto err;
 874            }
 875        }
 876        if (is_data_obj(aio_req->oid)) {
 877            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
 878                                           data_oid_to_idx(aio_req->oid));
 879        } else {
 880            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
 881        }
 882        resend_aioreq(s, aio_req);
 883        return;
 884    default:
 885        acb->ret = -EIO;
 886        error_report("%s", sd_strerror(rsp.result));
 887        break;
 888    }
 889
 890    g_free(aio_req);
 891
 892    if (!--acb->nr_pending) {
 893        /*
 894         * We've finished all requests which belong to the AIOCB, so
 895         * we can switch back to sd_co_readv/writev now.
 896         */
 897        aio_co_wake(acb->coroutine);
 898    }
 899
 900    return;
 901
 902err:
 903    reconnect_to_sdog(opaque);
 904}
 905
 906static void co_read_response(void *opaque)
 907{
 908    BDRVSheepdogState *s = opaque;
 909
 910    if (!s->co_recv) {
 911        s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
 912    }
 913
 914    aio_co_enter(s->aio_context, s->co_recv);
 915}
 916
 917static void co_write_request(void *opaque)
 918{
 919    BDRVSheepdogState *s = opaque;
 920
 921    aio_co_wake(s->co_send);
 922}
 923
 924/*
 925 * Return a socket descriptor to read/write objects.
 926 *
 927 * We cannot use this descriptor for other operations because
 928 * the block driver may be on waiting response from the server.
 929 */
 930static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
 931{
 932    int fd;
 933
 934    fd = connect_to_sdog(s, errp);
 935    if (fd < 0) {
 936        return fd;
 937    }
 938
 939    aio_set_fd_handler(s->aio_context, fd, false,
 940                       co_read_response, NULL, NULL, s);
 941    return fd;
 942}
 943
 944/*
 945 * Parse numeric snapshot ID in @str
 946 * If @str can't be parsed as number, return false.
 947 * Else, if the number is zero or too large, set *@snapid to zero and
 948 * return true.
 949 * Else, set *@snapid to the number and return true.
 950 */
 951static bool sd_parse_snapid(const char *str, uint32_t *snapid)
 952{
 953    unsigned long ul;
 954    int ret;
 955
 956    ret = qemu_strtoul(str, NULL, 10, &ul);
 957    if (ret == -ERANGE) {
 958        ul = ret = 0;
 959    }
 960    if (ret) {
 961        return false;
 962    }
 963    if (ul > UINT32_MAX) {
 964        ul = 0;
 965    }
 966
 967    *snapid = ul;
 968    return true;
 969}
 970
 971static bool sd_parse_snapid_or_tag(const char *str,
 972                                   uint32_t *snapid, char tag[])
 973{
 974    if (!sd_parse_snapid(str, snapid)) {
 975        *snapid = 0;
 976        if (g_strlcpy(tag, str, SD_MAX_VDI_TAG_LEN) >= SD_MAX_VDI_TAG_LEN) {
 977            return false;
 978        }
 979    } else if (!*snapid) {
 980        return false;
 981    } else {
 982        tag[0] = 0;
 983    }
 984    return true;
 985}
 986
/*
 * Result of parsing a sheepdog filename or URI.
 *
 * Filled in by sd_parse_uri(); the string pointers reference memory owned
 * by @uri and @qp, so they are only valid until sd_config_done() is called.
 */
typedef struct {
    const char *path;           /* non-null iff transport is unix */
    const char *host;           /* valid when transport is tcp */
    int port;                   /* valid when transport is tcp */
    char vdi[SD_MAX_VDI_LEN];   /* VDI name, taken from the URI path */
    char tag[SD_MAX_VDI_TAG_LEN]; /* snapshot tag, "" when none was given */
    uint32_t snap_id;           /* snapshot ID, or CURRENT_VDI_ID */
    /* Remainder is only for sd_config_done() */
    URI *uri;
    QueryParams *qp;
} SheepdogConfig;
 998
 999static void sd_config_done(SheepdogConfig *cfg)
1000{
1001    if (cfg->qp) {
1002        query_params_free(cfg->qp);
1003    }
1004    uri_free(cfg->uri);
1005}
1006
1007static void sd_parse_uri(SheepdogConfig *cfg, const char *filename,
1008                         Error **errp)
1009{
1010    Error *err = NULL;
1011    QueryParams *qp = NULL;
1012    bool is_unix;
1013    URI *uri;
1014
1015    memset(cfg, 0, sizeof(*cfg));
1016
1017    cfg->uri = uri = uri_parse(filename);
1018    if (!uri) {
1019        error_setg(&err, "invalid URI '%s'", filename);
1020        goto out;
1021    }
1022
1023    /* transport */
1024    if (!g_strcmp0(uri->scheme, "sheepdog")) {
1025        is_unix = false;
1026    } else if (!g_strcmp0(uri->scheme, "sheepdog+tcp")) {
1027        is_unix = false;
1028    } else if (!g_strcmp0(uri->scheme, "sheepdog+unix")) {
1029        is_unix = true;
1030    } else {
1031        error_setg(&err, "URI scheme must be 'sheepdog', 'sheepdog+tcp',"
1032                   " or 'sheepdog+unix'");
1033        goto out;
1034    }
1035
1036    if (uri->path == NULL || !strcmp(uri->path, "/")) {
1037        error_setg(&err, "missing file path in URI");
1038        goto out;
1039    }
1040    if (g_strlcpy(cfg->vdi, uri->path + 1, SD_MAX_VDI_LEN)
1041        >= SD_MAX_VDI_LEN) {
1042        error_setg(&err, "VDI name is too long");
1043        goto out;
1044    }
1045
1046    cfg->qp = qp = query_params_parse(uri->query);
1047
1048    if (is_unix) {
1049        /* sheepdog+unix:///vdiname?socket=path */
1050        if (uri->server || uri->port) {
1051            error_setg(&err, "URI scheme %s doesn't accept a server address",
1052                       uri->scheme);
1053            goto out;
1054        }
1055        if (!qp->n) {
1056            error_setg(&err,
1057                       "URI scheme %s requires query parameter 'socket'",
1058                       uri->scheme);
1059            goto out;
1060        }
1061        if (qp->n != 1 || strcmp(qp->p[0].name, "socket")) {
1062            error_setg(&err, "unexpected query parameters");
1063            goto out;
1064        }
1065        cfg->path = qp->p[0].value;
1066    } else {
1067        /* sheepdog[+tcp]://[host:port]/vdiname */
1068        if (qp->n) {
1069            error_setg(&err, "unexpected query parameters");
1070            goto out;
1071        }
1072        cfg->host = uri->server;
1073        cfg->port = uri->port;
1074    }
1075
1076    /* snapshot tag */
1077    if (uri->fragment) {
1078        if (!sd_parse_snapid_or_tag(uri->fragment,
1079                                    &cfg->snap_id, cfg->tag)) {
1080            error_setg(&err, "'%s' is not a valid snapshot ID",
1081                       uri->fragment);
1082            goto out;
1083        }
1084    } else {
1085        cfg->snap_id = CURRENT_VDI_ID; /* search current vdi */
1086    }
1087
1088out:
1089    if (err) {
1090        error_propagate(errp, err);
1091        sd_config_done(cfg);
1092    }
1093}
1094
1095/*
1096 * Parse a filename (old syntax)
1097 *
1098 * filename must be one of the following formats:
1099 *   1. [vdiname]
1100 *   2. [vdiname]:[snapid]
1101 *   3. [vdiname]:[tag]
1102 *   4. [hostname]:[port]:[vdiname]
1103 *   5. [hostname]:[port]:[vdiname]:[snapid]
1104 *   6. [hostname]:[port]:[vdiname]:[tag]
1105 *
 * You can boot from the snapshot images by specifying `snapid' or
 * `tag'.
1108 *
1109 * You can run VMs outside the Sheepdog cluster by specifying
1110 * `hostname' and `port' (experimental).
1111 */
1112static void parse_vdiname(SheepdogConfig *cfg, const char *filename,
1113                          Error **errp)
1114{
1115    Error *err = NULL;
1116    char *p, *q, *uri;
1117    const char *host_spec, *vdi_spec;
1118    int nr_sep;
1119
1120    strstart(filename, "sheepdog:", &filename);
1121    p = q = g_strdup(filename);
1122
1123    /* count the number of separators */
1124    nr_sep = 0;
1125    while (*p) {
1126        if (*p == ':') {
1127            nr_sep++;
1128        }
1129        p++;
1130    }
1131    p = q;
1132
1133    /* use the first two tokens as host_spec. */
1134    if (nr_sep >= 2) {
1135        host_spec = p;
1136        p = strchr(p, ':');
1137        p++;
1138        p = strchr(p, ':');
1139        *p++ = '\0';
1140    } else {
1141        host_spec = "";
1142    }
1143
1144    vdi_spec = p;
1145
1146    p = strchr(vdi_spec, ':');
1147    if (p) {
1148        *p++ = '#';
1149    }
1150
1151    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1152
1153    /*
1154     * FIXME We to escape URI meta-characters, e.g. "x?y=z"
1155     * produces "sheepdog://x?y=z".  Because of that ...
1156     */
1157    sd_parse_uri(cfg, uri, &err);
1158    if (err) {
1159        /*
1160         * ... this can fail, but the error message is misleading.
1161         * Replace it by the traditional useless one until the
1162         * escaping is fixed.
1163         */
1164        error_free(err);
1165        error_setg(errp, "Can't parse filename");
1166    }
1167
1168    g_free(q);
1169    g_free(uri);
1170}
1171
1172static void sd_parse_filename(const char *filename, QDict *options,
1173                              Error **errp)
1174{
1175    Error *err = NULL;
1176    SheepdogConfig cfg;
1177    char buf[32];
1178
1179    if (strstr(filename, "://")) {
1180        sd_parse_uri(&cfg, filename, &err);
1181    } else {
1182        parse_vdiname(&cfg, filename, &err);
1183    }
1184    if (err) {
1185        error_propagate(errp, err);
1186        return;
1187    }
1188
1189    if (cfg.path) {
1190        qdict_set_default_str(options, "server.path", cfg.path);
1191        qdict_set_default_str(options, "server.type", "unix");
1192    } else {
1193        qdict_set_default_str(options, "server.type", "inet");
1194        qdict_set_default_str(options, "server.host",
1195                              cfg.host ?: SD_DEFAULT_ADDR);
1196        snprintf(buf, sizeof(buf), "%d", cfg.port ?: SD_DEFAULT_PORT);
1197        qdict_set_default_str(options, "server.port", buf);
1198    }
1199    qdict_set_default_str(options, "vdi", cfg.vdi);
1200    qdict_set_default_str(options, "tag", cfg.tag);
1201    if (cfg.snap_id) {
1202        snprintf(buf, sizeof(buf), "%d", cfg.snap_id);
1203        qdict_set_default_str(options, "snap-id", buf);
1204    }
1205
1206    sd_config_done(&cfg);
1207}
1208
1209static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1210                         uint32_t snapid, const char *tag, uint32_t *vid,
1211                         bool lock, Error **errp)
1212{
1213    int ret, fd;
1214    SheepdogVdiReq hdr;
1215    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1216    unsigned int wlen, rlen = 0;
1217    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN] QEMU_NONSTRING;
1218
1219    fd = connect_to_sdog(s, errp);
1220    if (fd < 0) {
1221        return fd;
1222    }
1223
1224    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1225     * which is desirable since we'll soon be sending those bytes, and
1226     * don't want the send_req to read uninitialized data.
1227     */
1228    strncpy(buf, filename, SD_MAX_VDI_LEN);
1229    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1230
1231    memset(&hdr, 0, sizeof(hdr));
1232    if (lock) {
1233        hdr.opcode = SD_OP_LOCK_VDI;
1234        hdr.type = LOCK_TYPE_NORMAL;
1235    } else {
1236        hdr.opcode = SD_OP_GET_VDI_INFO;
1237    }
1238    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1239    hdr.proto_ver = SD_PROTO_VER;
1240    hdr.data_length = wlen;
1241    hdr.snapid = snapid;
1242    hdr.flags = SD_FLAG_CMD_WRITE;
1243
1244    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1245    if (ret) {
1246        error_setg_errno(errp, -ret, "cannot get vdi info");
1247        goto out;
1248    }
1249
1250    if (rsp->result != SD_RES_SUCCESS) {
1251        error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1252                   sd_strerror(rsp->result), filename, snapid, tag);
1253        if (rsp->result == SD_RES_NO_VDI) {
1254            ret = -ENOENT;
1255        } else if (rsp->result == SD_RES_VDI_LOCKED) {
1256            ret = -EBUSY;
1257        } else {
1258            ret = -EIO;
1259        }
1260        goto out;
1261    }
1262    *vid = rsp->vdi_id;
1263
1264    ret = 0;
1265out:
1266    closesocket(fd);
1267    return ret;
1268}
1269
/*
 * Queue @aio_req as in flight and send it to the server over s->fd.
 *
 * @iov/@niov describe the payload; it is only transmitted for request
 * types that carry data (writes and discards).  @aiocb_type selects the
 * opcode and flags of the request header.
 *
 * Send failures are only reported with error_report(); nothing is
 * returned to the caller.
 */
static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                                         struct iovec *iov, int niov,
                                         enum AIOCBState aiocb_type)
{
    int nr_copies = s->inode.nr_copies;
    SheepdogObjReq hdr;
    unsigned int wlen = 0;
    int ret;
    uint64_t oid = aio_req->oid;
    unsigned int datalen = aio_req->data_len;
    uint64_t offset = aio_req->offset;
    uint8_t flags = aio_req->flags;
    uint64_t old_oid = aio_req->base_oid;
    bool create = aio_req->create;

    /* Track the request on the in-flight list before anything is sent */
    qemu_co_mutex_lock(&s->queue_lock);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
    qemu_co_mutex_unlock(&s->queue_lock);

    /* A zero copy count is only reported; the request is still sent */
    if (!nr_copies) {
        error_report("bug");
    }

    memset(&hdr, 0, sizeof(hdr));

    switch (aiocb_type) {
    case AIOCB_FLUSH_CACHE:
        hdr.opcode = SD_OP_FLUSH_VDI;
        break;
    case AIOCB_READ_UDATA:
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.flags = flags;
        break;
    case AIOCB_WRITE_UDATA:
        if (create) {
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
        } else {
            hdr.opcode = SD_OP_WRITE_OBJ;
        }
        wlen = datalen;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        break;
    case AIOCB_DISCARD_OBJ:
        /*
         * A discard clears the object's slot in the in-memory inode and
         * is sent as a write of that single 32 bit slot to the VDI
         * object, at the slot's offset within SheepdogInode.
         */
        hdr.opcode = SD_OP_WRITE_OBJ;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
        offset = offsetof(SheepdogInode,
                          data_vdi_id[data_oid_to_idx(oid)]);
        oid = vid_to_vdi_oid(s->inode.vdi_id);
        wlen = datalen = sizeof(uint32_t);
        break;
    }

    if (s->cache_flags) {
        hdr.flags |= s->cache_flags;
    }

    hdr.oid = oid;
    hdr.cow_oid = old_oid;
    hdr.copies = s->inode.nr_copies;

    hdr.data_length = datalen;
    hdr.offset = offset;

    hdr.id = aio_req->id;

    /*
     * s->lock is held for the whole transmission of header and payload.
     * While sending, co_write_request is installed as an additional fd
     * handler and s->co_send records this coroutine.
     */
    qemu_co_mutex_lock(&s->lock);
    s->co_send = qemu_coroutine_self();
    aio_set_fd_handler(s->aio_context, s->fd, false,
                       co_read_response, co_write_request, NULL, s);
    socket_set_cork(s->fd, 1);

    /* send a header */
    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
    if (ret != sizeof(hdr)) {
        error_report("failed to send a req, %s", strerror(errno));
        goto out;
    }

    /* send the payload, if this request type has one */
    if (wlen) {
        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
        if (ret != wlen) {
            error_report("failed to send a data, %s", strerror(errno));
        }
    }
out:
    /* Uncork and go back to the read-only handler setup */
    socket_set_cork(s->fd, 0);
    aio_set_fd_handler(s->aio_context, s->fd, false,
                       co_read_response, NULL, NULL, s);
    s->co_send = NULL;
    qemu_co_mutex_unlock(&s->lock);
}
1362
1363static int read_write_object(int fd, BlockDriverState *bs, char *buf,
1364                             uint64_t oid, uint8_t copies,
1365                             unsigned int datalen, uint64_t offset,
1366                             bool write, bool create, uint32_t cache_flags)
1367{
1368    SheepdogObjReq hdr;
1369    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1370    unsigned int wlen, rlen;
1371    int ret;
1372
1373    memset(&hdr, 0, sizeof(hdr));
1374
1375    if (write) {
1376        wlen = datalen;
1377        rlen = 0;
1378        hdr.flags = SD_FLAG_CMD_WRITE;
1379        if (create) {
1380            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1381        } else {
1382            hdr.opcode = SD_OP_WRITE_OBJ;
1383        }
1384    } else {
1385        wlen = 0;
1386        rlen = datalen;
1387        hdr.opcode = SD_OP_READ_OBJ;
1388    }
1389
1390    hdr.flags |= cache_flags;
1391
1392    hdr.oid = oid;
1393    hdr.data_length = datalen;
1394    hdr.offset = offset;
1395    hdr.copies = copies;
1396
1397    ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1398    if (ret) {
1399        error_report("failed to send a request to the sheep");
1400        return ret;
1401    }
1402
1403    switch (rsp->result) {
1404    case SD_RES_SUCCESS:
1405        return 0;
1406    default:
1407        error_report("%s", sd_strerror(rsp->result));
1408        return -EIO;
1409    }
1410}
1411
1412static int read_object(int fd, BlockDriverState *bs, char *buf,
1413                       uint64_t oid, uint8_t copies,
1414                       unsigned int datalen, uint64_t offset,
1415                       uint32_t cache_flags)
1416{
1417    return read_write_object(fd, bs, buf, oid, copies,
1418                             datalen, offset, false,
1419                             false, cache_flags);
1420}
1421
1422static int write_object(int fd, BlockDriverState *bs, char *buf,
1423                        uint64_t oid, uint8_t copies,
1424                        unsigned int datalen, uint64_t offset, bool create,
1425                        uint32_t cache_flags)
1426{
1427    return read_write_object(fd, bs, buf, oid, copies,
1428                             datalen, offset, true,
1429                             create, cache_flags);
1430}
1431
1432/* update inode with the latest state */
1433static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1434{
1435    Error *local_err = NULL;
1436    SheepdogInode *inode;
1437    int ret = 0, fd;
1438    uint32_t vid = 0;
1439
1440    fd = connect_to_sdog(s, &local_err);
1441    if (fd < 0) {
1442        error_report_err(local_err);
1443        return -EIO;
1444    }
1445
1446    inode = g_malloc(SD_INODE_HEADER_SIZE);
1447
1448    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1449    if (ret) {
1450        error_report_err(local_err);
1451        goto out;
1452    }
1453
1454    ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
1455                      s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1456                      s->cache_flags);
1457    if (ret < 0) {
1458        goto out;
1459    }
1460
1461    if (inode->vdi_id != s->inode.vdi_id) {
1462        memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1463    }
1464
1465out:
1466    g_free(inode);
1467    closesocket(fd);
1468
1469    return ret;
1470}
1471
1472static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1473{
1474    SheepdogAIOCB *acb = aio_req->aiocb;
1475
1476    aio_req->create = false;
1477
1478    /* check whether this request becomes a CoW one */
1479    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1480        int idx = data_oid_to_idx(aio_req->oid);
1481
1482        if (is_data_obj_writable(&s->inode, idx)) {
1483            goto out;
1484        }
1485
1486        if (s->inode.data_vdi_id[idx]) {
1487            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1488            aio_req->flags |= SD_FLAG_CMD_COW;
1489        }
1490        aio_req->create = true;
1491    }
1492out:
1493    if (is_data_obj(aio_req->oid)) {
1494        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1495                        acb->aiocb_type);
1496    } else {
1497        struct iovec iov;
1498        iov.iov_base = &s->inode;
1499        iov.iov_len = sizeof(s->inode);
1500        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1501    }
1502}
1503
1504static void sd_detach_aio_context(BlockDriverState *bs)
1505{
1506    BDRVSheepdogState *s = bs->opaque;
1507
1508    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1509                       NULL, NULL, NULL);
1510}
1511
1512static void sd_attach_aio_context(BlockDriverState *bs,
1513                                  AioContext *new_context)
1514{
1515    BDRVSheepdogState *s = bs->opaque;
1516
1517    s->aio_context = new_context;
1518    aio_set_fd_handler(new_context, s->fd, false,
1519                       co_read_response, NULL, NULL, s);
1520}
1521
/*
 * Options absorbed from the options QDict by sd_open().
 */
static QemuOptsList runtime_opts = {
    .name = "sheepdog",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            /* VDI name; sd_open() fails if it is missing */
            .name = "vdi",
            .type = QEMU_OPT_STRING,
        },
        {
            /* numeric snapshot ID; sd_open() defaults to CURRENT_VDI_ID */
            .name = "snap-id",
            .type = QEMU_OPT_NUMBER,
        },
        {
            /* snapshot tag; sd_open() treats a missing tag as "" */
            .name = "tag",
            .type = QEMU_OPT_STRING,
        },
        { /* end of list */ }
    },
};
1541
/*
 * Open a sheepdog VDI.
 *
 * Reads "vdi", "snap-id" and "tag" (plus the server address) from
 * @options, connects to the server, locks the VDI and loads its inode
 * into the driver state.
 *
 * Returns 0 on success.  On failure sets @errp and returns a negative
 * errno value; the driver socket, if already opened, is closed again.
 */
static int sd_open(BlockDriverState *bs, QDict *options, int flags,
                   Error **errp)
{
    int ret, fd;
    uint32_t vid = 0;
    BDRVSheepdogState *s = bs->opaque;
    const char *vdi, *snap_id_str, *tag;
    uint64_t snap_id;
    char *buf = NULL;
    QemuOpts *opts;
    Error *local_err = NULL;

    s->bs = bs;
    s->aio_context = bdrv_get_aio_context(bs);

    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        ret = -EINVAL;
        goto err_no_fd;
    }

    s->addr = sd_server_config(options, errp);
    if (!s->addr) {
        ret = -EINVAL;
        goto err_no_fd;
    }

    vdi = qemu_opt_get(opts, "vdi");
    /* The string form is kept to tell "not given" from "given but zero" */
    snap_id_str = qemu_opt_get(opts, "snap-id");
    snap_id = qemu_opt_get_number(opts, "snap-id", CURRENT_VDI_ID);
    tag = qemu_opt_get(opts, "tag");

    if (!vdi) {
        error_setg(errp, "parameter 'vdi' is missing");
        ret = -EINVAL;
        goto err_no_fd;
    }
    if (strlen(vdi) >= SD_MAX_VDI_LEN) {
        error_setg(errp, "value of parameter 'vdi' is too long");
        ret = -EINVAL;
        goto err_no_fd;
    }

    /* An explicitly given snapshot ID must be non-zero and fit in 32 bits */
    if (snap_id > UINT32_MAX) {
        snap_id = 0;
    }
    if (snap_id_str && !snap_id) {
        error_setg(errp, "'snap-id=%s' is not a valid snapshot ID",
                   snap_id_str);
        ret = -EINVAL;
        goto err_no_fd;
    }

    if (!tag) {
        tag = "";
    }
    if (strlen(tag) >= SD_MAX_VDI_TAG_LEN) {
        error_setg(errp, "value of parameter 'tag' is too long");
        ret = -EINVAL;
        goto err_no_fd;
    }

    QLIST_INIT(&s->inflight_aio_head);
    QLIST_INIT(&s->failed_aio_head);
    QLIST_INIT(&s->inflight_aiocb_head);

    /* Long-lived socket for object I/O, with its read handler registered */
    s->fd = get_sheep_fd(s, errp);
    if (s->fd < 0) {
        ret = s->fd;
        goto err_no_fd;
    }

    /* Resolve the VDI name to its ID, taking the VDI lock (lock == true) */
    ret = find_vdi_name(s, vdi, (uint32_t)snap_id, tag, &vid, true, errp);
    if (ret) {
        goto err;
    }

    /*
     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
     */
    s->cache_flags = SD_FLAG_CMD_CACHE;
    if (flags & BDRV_O_NOCACHE) {
        s->cache_flags = SD_FLAG_CMD_DIRECT;
    }
    s->discard_supported = true;

    if (snap_id || tag[0]) {
        trace_sheepdog_open(vid);
        s->is_snapshot = true;
    }

    /* The inode is fetched over a short-lived second connection */
    fd = connect_to_sdog(s, errp);
    if (fd < 0) {
        ret = fd;
        goto err;
    }

    buf = g_malloc(SD_INODE_SIZE);
    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
                      0, SD_INODE_SIZE, 0, s->cache_flags);

    closesocket(fd);

    if (ret) {
        error_setg(errp, "Can't read snapshot inode");
        goto err;
    }

    memcpy(&s->inode, buf, sizeof(s->inode));

    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
    pstrcpy(s->name, sizeof(s->name), vdi);
    qemu_co_mutex_init(&s->lock);
    qemu_co_mutex_init(&s->queue_lock);
    qemu_co_queue_init(&s->overlapping_queue);
    qemu_opts_del(opts);
    g_free(buf);
    return 0;

err:
    /* Undo get_sheep_fd(): unregister the handler, then close the socket */
    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
                       false, NULL, NULL, NULL, NULL);
    closesocket(s->fd);
err_no_fd:
    qemu_opts_del(opts);
    g_free(buf);
    return ret;
}
1673
1674static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1675                             Error **errp)
1676{
1677    BDRVSheepdogState *s = state->bs->opaque;
1678    BDRVSheepdogReopenState *re_s;
1679    int ret = 0;
1680
1681    re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1682
1683    re_s->cache_flags = SD_FLAG_CMD_CACHE;
1684    if (state->flags & BDRV_O_NOCACHE) {
1685        re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1686    }
1687
1688    re_s->fd = get_sheep_fd(s, errp);
1689    if (re_s->fd < 0) {
1690        ret = re_s->fd;
1691        return ret;
1692    }
1693
1694    return ret;
1695}
1696
1697static void sd_reopen_commit(BDRVReopenState *state)
1698{
1699    BDRVSheepdogReopenState *re_s = state->opaque;
1700    BDRVSheepdogState *s = state->bs->opaque;
1701
1702    if (s->fd) {
1703        aio_set_fd_handler(s->aio_context, s->fd, false,
1704                           NULL, NULL, NULL, NULL);
1705        closesocket(s->fd);
1706    }
1707
1708    s->fd = re_s->fd;
1709    s->cache_flags = re_s->cache_flags;
1710
1711    g_free(state->opaque);
1712    state->opaque = NULL;
1713
1714    return;
1715}
1716
1717static void sd_reopen_abort(BDRVReopenState *state)
1718{
1719    BDRVSheepdogReopenState *re_s = state->opaque;
1720    BDRVSheepdogState *s = state->bs->opaque;
1721
1722    if (re_s == NULL) {
1723        return;
1724    }
1725
1726    if (re_s->fd) {
1727        aio_set_fd_handler(s->aio_context, re_s->fd, false,
1728                           NULL, NULL, NULL, NULL);
1729        closesocket(re_s->fd);
1730    }
1731
1732    g_free(state->opaque);
1733    state->opaque = NULL;
1734
1735    return;
1736}
1737
1738static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1739                        Error **errp)
1740{
1741    SheepdogVdiReq hdr;
1742    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1743    int fd, ret;
1744    unsigned int wlen, rlen = 0;
1745    char buf[SD_MAX_VDI_LEN];
1746
1747    fd = connect_to_sdog(s, errp);
1748    if (fd < 0) {
1749        return fd;
1750    }
1751
1752    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1753     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1754     */
1755    memset(buf, 0, sizeof(buf));
1756    pstrcpy(buf, sizeof(buf), s->name);
1757
1758    memset(&hdr, 0, sizeof(hdr));
1759    hdr.opcode = SD_OP_NEW_VDI;
1760    hdr.base_vdi_id = s->inode.vdi_id;
1761
1762    wlen = SD_MAX_VDI_LEN;
1763
1764    hdr.flags = SD_FLAG_CMD_WRITE;
1765    hdr.snapid = snapshot;
1766
1767    hdr.data_length = wlen;
1768    hdr.vdi_size = s->inode.vdi_size;
1769    hdr.copy_policy = s->inode.copy_policy;
1770    hdr.copies = s->inode.nr_copies;
1771    hdr.block_size_shift = s->inode.block_size_shift;
1772
1773    ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1774
1775    closesocket(fd);
1776
1777    if (ret) {
1778        error_setg_errno(errp, -ret, "create failed");
1779        return ret;
1780    }
1781
1782    if (rsp->result != SD_RES_SUCCESS) {
1783        error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1784        return -EIO;
1785    }
1786
1787    if (vdi_id) {
1788        *vdi_id = rsp->vdi_id;
1789    }
1790
1791    return 0;
1792}
1793
1794static int sd_prealloc(BlockDriverState *bs, int64_t old_size, int64_t new_size,
1795                       Error **errp)
1796{
1797    BlockBackend *blk = NULL;
1798    BDRVSheepdogState *base = bs->opaque;
1799    unsigned long buf_size;
1800    uint32_t idx, max_idx;
1801    uint32_t object_size;
1802    void *buf = NULL;
1803    int ret;
1804
1805    blk = blk_new(bdrv_get_aio_context(bs),
1806                  BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE | BLK_PERM_RESIZE,
1807                  BLK_PERM_ALL);
1808
1809    ret = blk_insert_bs(blk, bs, errp);
1810    if (ret < 0) {
1811        goto out_with_err_set;
1812    }
1813
1814    blk_set_allow_write_beyond_eof(blk, true);
1815
1816    object_size = (UINT32_C(1) << base->inode.block_size_shift);
1817    buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1818    buf = g_malloc0(buf_size);
1819
1820    max_idx = DIV_ROUND_UP(new_size, buf_size);
1821
1822    for (idx = old_size / buf_size; idx < max_idx; idx++) {
1823        /*
1824         * The created image can be a cloned image, so we need to read
1825         * a data from the source image.
1826         */
1827        ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1828        if (ret < 0) {
1829            goto out;
1830        }
1831        ret = blk_pwrite(blk, idx * buf_size, buf, buf_size, 0);
1832        if (ret < 0) {
1833            goto out;
1834        }
1835    }
1836
1837    ret = 0;
1838out:
1839    if (ret < 0) {
1840        error_setg_errno(errp, -ret, "Can't pre-allocate");
1841    }
1842out_with_err_set:
1843    blk_unref(blk);
1844    g_free(buf);
1845
1846    return ret;
1847}
1848
1849static int sd_create_prealloc(BlockdevOptionsSheepdog *location, int64_t size,
1850                              Error **errp)
1851{
1852    BlockDriverState *bs;
1853    Visitor *v;
1854    QObject *obj = NULL;
1855    QDict *qdict;
1856    Error *local_err = NULL;
1857    int ret;
1858
1859    v = qobject_output_visitor_new(&obj);
1860    visit_type_BlockdevOptionsSheepdog(v, NULL, &location, &local_err);
1861    visit_free(v);
1862
1863    if (local_err) {
1864        error_propagate(errp, local_err);
1865        qobject_unref(obj);
1866        return -EINVAL;
1867    }
1868
1869    qdict = qobject_to(QDict, obj);
1870    qdict_flatten(qdict);
1871
1872    qdict_put_str(qdict, "driver", "sheepdog");
1873
1874    bs = bdrv_open(NULL, NULL, qdict, BDRV_O_PROTOCOL | BDRV_O_RDWR, errp);
1875    if (bs == NULL) {
1876        ret = -EIO;
1877        goto fail;
1878    }
1879
1880    ret = sd_prealloc(bs, 0, size, errp);
1881fail:
1882    bdrv_unref(bs);
1883    qobject_unref(qdict);
1884    return ret;
1885}
1886
1887static int parse_redundancy(BDRVSheepdogState *s, SheepdogRedundancy *opt)
1888{
1889    struct SheepdogInode *inode = &s->inode;
1890
1891    switch (opt->type) {
1892    case SHEEPDOG_REDUNDANCY_TYPE_FULL:
1893        if (opt->u.full.copies > SD_MAX_COPIES || opt->u.full.copies < 1) {
1894            return -EINVAL;
1895        }
1896        inode->copy_policy = 0;
1897        inode->nr_copies = opt->u.full.copies;
1898        return 0;
1899
1900    case SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED:
1901    {
1902        int64_t copy = opt->u.erasure_coded.data_strips;
1903        int64_t parity = opt->u.erasure_coded.parity_strips;
1904
1905        if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1906            return -EINVAL;
1907        }
1908
1909        if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1910            return -EINVAL;
1911        }
1912
1913        /*
1914         * 4 bits for parity and 4 bits for data.
1915         * We have to compress upper data bits because it can't represent 16
1916         */
1917        inode->copy_policy = ((copy / 2) << 4) + parity;
1918        inode->nr_copies = copy + parity;
1919        return 0;
1920    }
1921
1922    default:
1923        g_assert_not_reached();
1924    }
1925
1926    return -EINVAL;
1927}
1928
1929/*
 * Sheepdog supports two kinds of redundancy: full replication and erasure
 * coding.
1932 *
1933 * # create a fully replicated vdi with x copies
1934 * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1935 *
 * # create an erasure coded vdi with x data strips and y parity strips
1937 * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1938 */
1939static SheepdogRedundancy *parse_redundancy_str(const char *opt)
1940{
1941    SheepdogRedundancy *redundancy;
1942    const char *n1, *n2;
1943    long copy, parity;
1944    char p[10];
1945    int ret;
1946
1947    pstrcpy(p, sizeof(p), opt);
1948    n1 = strtok(p, ":");
1949    n2 = strtok(NULL, ":");
1950
1951    if (!n1) {
1952        return NULL;
1953    }
1954
1955    ret = qemu_strtol(n1, NULL, 10, &copy);
1956    if (ret < 0) {
1957        return NULL;
1958    }
1959
1960    redundancy = g_new0(SheepdogRedundancy, 1);
1961    if (!n2) {
1962        *redundancy = (SheepdogRedundancy) {
1963            .type               = SHEEPDOG_REDUNDANCY_TYPE_FULL,
1964            .u.full.copies      = copy,
1965        };
1966    } else {
1967        ret = qemu_strtol(n2, NULL, 10, &parity);
1968        if (ret < 0) {
1969            g_free(redundancy);
1970            return NULL;
1971        }
1972
1973        *redundancy = (SheepdogRedundancy) {
1974            .type               = SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED,
1975            .u.erasure_coded    = {
1976                .data_strips    = copy,
1977                .parity_strips  = parity,
1978            },
1979        };
1980    }
1981
1982    return redundancy;
1983}
1984
1985static int parse_block_size_shift(BDRVSheepdogState *s,
1986                                  BlockdevCreateOptionsSheepdog *opts)
1987{
1988    struct SheepdogInode *inode = &s->inode;
1989    uint64_t object_size;
1990    int obj_order;
1991
1992    if (opts->has_object_size) {
1993        object_size = opts->object_size;
1994
1995        if ((object_size - 1) & object_size) {    /* not a power of 2? */
1996            return -EINVAL;
1997        }
1998        obj_order = ctz32(object_size);
1999        if (obj_order < 20 || obj_order > 31) {
2000            return -EINVAL;
2001        }
2002        inode->block_size_shift = (uint8_t)obj_order;
2003    }
2004
2005    return 0;
2006}
2007
2008static int sd_co_create(BlockdevCreateOptions *options, Error **errp)
2009{
2010    BlockdevCreateOptionsSheepdog *opts = &options->u.sheepdog;
2011    int ret = 0;
2012    uint32_t vid = 0;
2013    char *backing_file = NULL;
2014    char *buf = NULL;
2015    BDRVSheepdogState *s;
2016    uint64_t max_vdi_size;
2017    bool prealloc = false;
2018
2019    assert(options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2020
2021    s = g_new0(BDRVSheepdogState, 1);
2022
2023    /* Steal SocketAddress from QAPI, set NULL to prevent double free */
2024    s->addr = opts->location->server;
2025    opts->location->server = NULL;
2026
2027    if (strlen(opts->location->vdi) >= sizeof(s->name)) {
2028        error_setg(errp, "'vdi' string too long");
2029        ret = -EINVAL;
2030        goto out;
2031    }
2032    pstrcpy(s->name, sizeof(s->name), opts->location->vdi);
2033
2034    s->inode.vdi_size = opts->size;
2035    backing_file = opts->backing_file;
2036
2037    if (!opts->has_preallocation) {
2038        opts->preallocation = PREALLOC_MODE_OFF;
2039    }
2040    switch (opts->preallocation) {
2041    case PREALLOC_MODE_OFF:
2042        prealloc = false;
2043        break;
2044    case PREALLOC_MODE_FULL:
2045        prealloc = true;
2046        break;
2047    default:
2048        error_setg(errp, "Preallocation mode not supported for Sheepdog");
2049        ret = -EINVAL;
2050        goto out;
2051    }
2052
2053    if (opts->has_redundancy) {
2054        ret = parse_redundancy(s, opts->redundancy);
2055        if (ret < 0) {
2056            error_setg(errp, "Invalid redundancy mode");
2057            goto out;
2058        }
2059    }
2060    ret = parse_block_size_shift(s, opts);
2061    if (ret < 0) {
2062        error_setg(errp, "Invalid object_size."
2063                         " obect_size needs to be power of 2"
2064                         " and be limited from 2^20 to 2^31");
2065        goto out;
2066    }
2067
2068    if (opts->has_backing_file) {
2069        BlockBackend *blk;
2070        BDRVSheepdogState *base;
2071        BlockDriver *drv;
2072
2073        /* Currently, only Sheepdog backing image is supported. */
2074        drv = bdrv_find_protocol(opts->backing_file, true, NULL);
2075        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
2076            error_setg(errp, "backing_file must be a sheepdog image");
2077            ret = -EINVAL;
2078            goto out;
2079        }
2080
2081        blk = blk_new_open(opts->backing_file, NULL, NULL,
2082                           BDRV_O_PROTOCOL, errp);
2083        if (blk == NULL) {
2084            ret = -EIO;
2085            goto out;
2086        }
2087
2088        base = blk_bs(blk)->opaque;
2089
2090        if (!is_snapshot(&base->inode)) {
2091            error_setg(errp, "cannot clone from a non snapshot vdi");
2092            blk_unref(blk);
2093            ret = -EINVAL;
2094            goto out;
2095        }
2096        s->inode.vdi_id = base->inode.vdi_id;
2097        blk_unref(blk);
2098    }
2099
2100    s->aio_context = qemu_get_aio_context();
2101
2102    /* if block_size_shift is not specified, get cluster default value */
2103    if (s->inode.block_size_shift == 0) {
2104        SheepdogVdiReq hdr;
2105        SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
2106        int fd;
2107        unsigned int wlen = 0, rlen = 0;
2108
2109        fd = connect_to_sdog(s, errp);
2110        if (fd < 0) {
2111            ret = fd;
2112            goto out;
2113        }
2114
2115        memset(&hdr, 0, sizeof(hdr));
2116        hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
2117        hdr.proto_ver = SD_PROTO_VER;
2118
2119        ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
2120                     NULL, &wlen, &rlen);
2121        closesocket(fd);
2122        if (ret) {
2123            error_setg_errno(errp, -ret, "failed to get cluster default");
2124            goto out;
2125        }
2126        if (rsp->result == SD_RES_SUCCESS) {
2127            s->inode.block_size_shift = rsp->block_size_shift;
2128        } else {
2129            s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
2130        }
2131    }
2132
2133    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2134
2135    if (s->inode.vdi_size > max_vdi_size) {
2136        error_setg(errp, "An image is too large."
2137                         " The maximum image size is %"PRIu64 "GB",
2138                         max_vdi_size / 1024 / 1024 / 1024);
2139        ret = -EINVAL;
2140        goto out;
2141    }
2142
2143    ret = do_sd_create(s, &vid, 0, errp);
2144    if (ret) {
2145        goto out;
2146    }
2147
2148    if (prealloc) {
2149        ret = sd_create_prealloc(opts->location, opts->size, errp);
2150    }
2151out:
2152    g_free(backing_file);
2153    g_free(buf);
2154    g_free(s->addr);
2155    g_free(s);
2156    return ret;
2157}
2158
2159static int coroutine_fn sd_co_create_opts(const char *filename, QemuOpts *opts,
2160                                          Error **errp)
2161{
2162    BlockdevCreateOptions *create_options = NULL;
2163    QDict *qdict, *location_qdict;
2164    Visitor *v;
2165    char *redundancy;
2166    Error *local_err = NULL;
2167    int ret;
2168
2169    redundancy = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
2170
2171    qdict = qemu_opts_to_qdict(opts, NULL);
2172    qdict_put_str(qdict, "driver", "sheepdog");
2173
2174    location_qdict = qdict_new();
2175    qdict_put(qdict, "location", location_qdict);
2176
2177    sd_parse_filename(filename, location_qdict, &local_err);
2178    if (local_err) {
2179        error_propagate(errp, local_err);
2180        ret = -EINVAL;
2181        goto fail;
2182    }
2183
2184    qdict_flatten(qdict);
2185
2186    /* Change legacy command line options into QMP ones */
2187    static const QDictRenames opt_renames[] = {
2188        { BLOCK_OPT_BACKING_FILE,       "backing-file" },
2189        { BLOCK_OPT_OBJECT_SIZE,        "object-size" },
2190        { NULL, NULL },
2191    };
2192
2193    if (!qdict_rename_keys(qdict, opt_renames, errp)) {
2194        ret = -EINVAL;
2195        goto fail;
2196    }
2197
2198    /* Get the QAPI object */
2199    v = qobject_input_visitor_new_flat_confused(qdict, errp);
2200    if (!v) {
2201        ret = -EINVAL;
2202        goto fail;
2203    }
2204
2205    visit_type_BlockdevCreateOptions(v, NULL, &create_options, &local_err);
2206    visit_free(v);
2207
2208    if (local_err) {
2209        error_propagate(errp, local_err);
2210        ret = -EINVAL;
2211        goto fail;
2212    }
2213
2214    assert(create_options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2215    create_options->u.sheepdog.size =
2216        ROUND_UP(create_options->u.sheepdog.size, BDRV_SECTOR_SIZE);
2217
2218    if (redundancy) {
2219        create_options->u.sheepdog.has_redundancy = true;
2220        create_options->u.sheepdog.redundancy =
2221            parse_redundancy_str(redundancy);
2222        if (create_options->u.sheepdog.redundancy == NULL) {
2223            error_setg(errp, "Invalid redundancy mode");
2224            ret = -EINVAL;
2225            goto fail;
2226        }
2227    }
2228
2229    ret = sd_co_create(create_options, errp);
2230fail:
2231    qapi_free_BlockdevCreateOptions(create_options);
2232    qobject_unref(qdict);
2233    g_free(redundancy);
2234    return ret;
2235}
2236
2237static void sd_close(BlockDriverState *bs)
2238{
2239    Error *local_err = NULL;
2240    BDRVSheepdogState *s = bs->opaque;
2241    SheepdogVdiReq hdr;
2242    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2243    unsigned int wlen, rlen = 0;
2244    int fd, ret;
2245
2246    trace_sheepdog_close(s->name);
2247
2248    fd = connect_to_sdog(s, &local_err);
2249    if (fd < 0) {
2250        error_report_err(local_err);
2251        return;
2252    }
2253
2254    memset(&hdr, 0, sizeof(hdr));
2255
2256    hdr.opcode = SD_OP_RELEASE_VDI;
2257    hdr.type = LOCK_TYPE_NORMAL;
2258    hdr.base_vdi_id = s->inode.vdi_id;
2259    wlen = strlen(s->name) + 1;
2260    hdr.data_length = wlen;
2261    hdr.flags = SD_FLAG_CMD_WRITE;
2262
2263    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2264                 s->name, &wlen, &rlen);
2265
2266    closesocket(fd);
2267
2268    if (!ret && rsp->result != SD_RES_SUCCESS &&
2269        rsp->result != SD_RES_VDI_NOT_LOCKED) {
2270        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2271    }
2272
2273    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
2274                       false, NULL, NULL, NULL, NULL);
2275    closesocket(s->fd);
2276    qapi_free_SocketAddress(s->addr);
2277}
2278
2279static int64_t sd_getlength(BlockDriverState *bs)
2280{
2281    BDRVSheepdogState *s = bs->opaque;
2282
2283    return s->inode.vdi_size;
2284}
2285
2286static int coroutine_fn sd_co_truncate(BlockDriverState *bs, int64_t offset,
2287                                       PreallocMode prealloc, Error **errp)
2288{
2289    BDRVSheepdogState *s = bs->opaque;
2290    int ret, fd;
2291    unsigned int datalen;
2292    uint64_t max_vdi_size;
2293    int64_t old_size = s->inode.vdi_size;
2294
2295    if (prealloc != PREALLOC_MODE_OFF && prealloc != PREALLOC_MODE_FULL) {
2296        error_setg(errp, "Unsupported preallocation mode '%s'",
2297                   PreallocMode_str(prealloc));
2298        return -ENOTSUP;
2299    }
2300
2301    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2302    if (offset < old_size) {
2303        error_setg(errp, "shrinking is not supported");
2304        return -EINVAL;
2305    } else if (offset > max_vdi_size) {
2306        error_setg(errp, "too big image size");
2307        return -EINVAL;
2308    }
2309
2310    fd = connect_to_sdog(s, errp);
2311    if (fd < 0) {
2312        return fd;
2313    }
2314
2315    /* we don't need to update entire object */
2316    datalen = SD_INODE_HEADER_SIZE;
2317    s->inode.vdi_size = offset;
2318    ret = write_object(fd, s->bs, (char *)&s->inode,
2319                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2320                       datalen, 0, false, s->cache_flags);
2321    close(fd);
2322
2323    if (ret < 0) {
2324        error_setg_errno(errp, -ret, "failed to update an inode");
2325        return ret;
2326    }
2327
2328    if (prealloc == PREALLOC_MODE_FULL) {
2329        ret = sd_prealloc(bs, old_size, offset, errp);
2330        if (ret < 0) {
2331            return ret;
2332        }
2333    }
2334
2335    return 0;
2336}
2337
2338/*
2339 * This function is called after writing data objects.  If we need to
2340 * update metadata, this sends a write request to the vdi object.
2341 */
2342static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2343{
2344    BDRVSheepdogState *s = acb->s;
2345    struct iovec iov;
2346    AIOReq *aio_req;
2347    uint32_t offset, data_len, mn, mx;
2348
2349    mn = acb->min_dirty_data_idx;
2350    mx = acb->max_dirty_data_idx;
2351    if (mn <= mx) {
2352        /* we need to update the vdi object. */
2353        ++acb->nr_pending;
2354        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2355            mn * sizeof(s->inode.data_vdi_id[0]);
2356        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2357
2358        acb->min_dirty_data_idx = UINT32_MAX;
2359        acb->max_dirty_data_idx = 0;
2360
2361        iov.iov_base = &s->inode;
2362        iov.iov_len = sizeof(s->inode);
2363        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2364                                data_len, offset, 0, false, 0, offset);
2365        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2366        if (--acb->nr_pending) {
2367            qemu_coroutine_yield();
2368        }
2369    }
2370}
2371
2372/* Delete current working VDI on the snapshot chain */
2373static bool sd_delete(BDRVSheepdogState *s)
2374{
2375    Error *local_err = NULL;
2376    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2377    SheepdogVdiReq hdr = {
2378        .opcode = SD_OP_DEL_VDI,
2379        .base_vdi_id = s->inode.vdi_id,
2380        .data_length = wlen,
2381        .flags = SD_FLAG_CMD_WRITE,
2382    };
2383    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2384    int fd, ret;
2385
2386    fd = connect_to_sdog(s, &local_err);
2387    if (fd < 0) {
2388        error_report_err(local_err);
2389        return false;
2390    }
2391
2392    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2393                 s->name, &wlen, &rlen);
2394    closesocket(fd);
2395    if (ret) {
2396        return false;
2397    }
2398    switch (rsp->result) {
2399    case SD_RES_NO_VDI:
2400        error_report("%s was already deleted", s->name);
2401        /* fall through */
2402    case SD_RES_SUCCESS:
2403        break;
2404    default:
2405        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2406        return false;
2407    }
2408
2409    return true;
2410}
2411
2412/*
2413 * Create a writable VDI from a snapshot
2414 */
2415static int sd_create_branch(BDRVSheepdogState *s)
2416{
2417    Error *local_err = NULL;
2418    int ret, fd;
2419    uint32_t vid;
2420    char *buf;
2421    bool deleted;
2422
2423    trace_sheepdog_create_branch_snapshot(s->inode.vdi_id);
2424
2425    buf = g_malloc(SD_INODE_SIZE);
2426
2427    /*
2428     * Even If deletion fails, we will just create extra snapshot based on
2429     * the working VDI which was supposed to be deleted. So no need to
2430     * false bail out.
2431     */
2432    deleted = sd_delete(s);
2433    ret = do_sd_create(s, &vid, !deleted, &local_err);
2434    if (ret) {
2435        error_report_err(local_err);
2436        goto out;
2437    }
2438
2439    trace_sheepdog_create_branch_created(vid);
2440
2441    fd = connect_to_sdog(s, &local_err);
2442    if (fd < 0) {
2443        error_report_err(local_err);
2444        ret = fd;
2445        goto out;
2446    }
2447
2448    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
2449                      s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2450
2451    closesocket(fd);
2452
2453    if (ret < 0) {
2454        goto out;
2455    }
2456
2457    memcpy(&s->inode, buf, sizeof(s->inode));
2458
2459    s->is_snapshot = false;
2460    ret = 0;
2461    trace_sheepdog_create_branch_new(s->inode.vdi_id);
2462
2463out:
2464    g_free(buf);
2465
2466    return ret;
2467}
2468
/*
 * Send I/O requests to the server.
 *
 * The range described by @acb is split along object boundaries and one
 * request per object is sent to the server and linked to the
 * inflight_list in BDRVSheepdogState.  The responses are received in the
 * `aio_read_response' function which is called from the main loop as
 * a fd handler.
 *
 * After all requests have been issued, the coroutine yields if any of
 * them is still pending.
 *
 * This function returns nothing.  If a writable branch has to be created
 * first and that fails, acb->ret is set to -EIO and no request is sent.
 */
static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb)
{
    int ret = 0;
    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
    unsigned long idx;
    uint32_t object_size;
    uint64_t oid;
    uint64_t offset;
    BDRVSheepdogState *s = acb->s;
    SheepdogInode *inode = &s->inode;
    AIOReq *aio_req;

    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
        /*
         * In the case we open the snapshot VDI, Sheepdog creates the
         * writable VDI when we do a write operation first.
         */
        ret = sd_create_branch(s);
        if (ret) {
            acb->ret = -EIO;
            return;
        }
    }

    /* index of the first object touched and the byte offset inside it */
    object_size = (UINT32_C(1) << inode->block_size_shift);
    idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
    offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;

    /*
     * Make sure we don't free the aiocb before we are done with all requests.
     * This additional reference is dropped at the end of this function.
     */
    acb->nr_pending++;

    while (done != total) {
        uint8_t flags = 0;
        uint64_t old_oid = 0;
        bool create = false;

        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);

        /* never cross an object boundary within one request */
        len = MIN(total - done, object_size - offset);

        switch (acb->aiocb_type) {
        case AIOCB_READ_UDATA:
            /* an unallocated object reads as zeroes, no request needed */
            if (!inode->data_vdi_id[idx]) {
                qemu_iovec_memset(acb->qiov, done, 0, len);
                goto done;
            }
            break;
        case AIOCB_WRITE_UDATA:
            if (!inode->data_vdi_id[idx]) {
                create = true;
            } else if (!is_data_obj_writable(inode, idx)) {
                /* Copy-On-Write */
                create = true;
                old_oid = oid;
                flags = SD_FLAG_CMD_COW;
            }
            break;
        case AIOCB_DISCARD_OBJ:
            /*
             * We discard the object only when the whole object is
             * 1) allocated 2) trimmed. Otherwise, simply skip it.
             */
            if (len != object_size || inode->data_vdi_id[idx] == 0) {
                goto done;
            }
            break;
        default:
            break;
        }

        if (create) {
            trace_sheepdog_co_rw_vector_update(inode->vdi_id, oid,
                                  vid_to_data_oid(inode->data_vdi_id[idx], idx),
                                  idx);
            /* a newly created object is addressed through this VDI's id */
            oid = vid_to_data_oid(inode->vdi_id, idx);
            trace_sheepdog_co_rw_vector_new(oid);
        }

        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
                                old_oid,
                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
                                0 : done);
        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                        acb->aiocb_type);
    done:
        /* every object after the first one is accessed from its start */
        offset = 0;
        idx++;
        done += len;
    }
    /* drop the reference taken above; wait if requests are still pending */
    if (--acb->nr_pending) {
        qemu_coroutine_yield();
    }
}
2577
2578static void sd_aio_complete(SheepdogAIOCB *acb)
2579{
2580    BDRVSheepdogState *s;
2581    if (acb->aiocb_type == AIOCB_FLUSH_CACHE) {
2582        return;
2583    }
2584
2585    s = acb->s;
2586    qemu_co_mutex_lock(&s->queue_lock);
2587    QLIST_REMOVE(acb, aiocb_siblings);
2588    qemu_co_queue_restart_all(&s->overlapping_queue);
2589    qemu_co_mutex_unlock(&s->queue_lock);
2590}
2591
2592static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2593                                     int nb_sectors, QEMUIOVector *qiov,
2594                                     int flags)
2595{
2596    SheepdogAIOCB acb;
2597    int ret;
2598    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2599    BDRVSheepdogState *s = bs->opaque;
2600
2601    assert(!flags);
2602    if (offset > s->inode.vdi_size) {
2603        ret = sd_co_truncate(bs, offset, PREALLOC_MODE_OFF, NULL);
2604        if (ret < 0) {
2605            return ret;
2606        }
2607    }
2608
2609    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_WRITE_UDATA);
2610    sd_co_rw_vector(&acb);
2611    sd_write_done(&acb);
2612    sd_aio_complete(&acb);
2613
2614    return acb.ret;
2615}
2616
2617static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2618                       int nb_sectors, QEMUIOVector *qiov)
2619{
2620    SheepdogAIOCB acb;
2621    BDRVSheepdogState *s = bs->opaque;
2622
2623    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_READ_UDATA);
2624    sd_co_rw_vector(&acb);
2625    sd_aio_complete(&acb);
2626
2627    return acb.ret;
2628}
2629
2630static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2631{
2632    BDRVSheepdogState *s = bs->opaque;
2633    SheepdogAIOCB acb;
2634    AIOReq *aio_req;
2635
2636    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2637        return 0;
2638    }
2639
2640    sd_aio_setup(&acb, s, NULL, 0, 0, AIOCB_FLUSH_CACHE);
2641
2642    acb.nr_pending++;
2643    aio_req = alloc_aio_req(s, &acb, vid_to_vdi_oid(s->inode.vdi_id),
2644                            0, 0, 0, false, 0, 0);
2645    add_aio_request(s, aio_req, NULL, 0, acb.aiocb_type);
2646
2647    if (--acb.nr_pending) {
2648        qemu_coroutine_yield();
2649    }
2650
2651    sd_aio_complete(&acb);
2652    return acb.ret;
2653}
2654
2655static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2656{
2657    Error *local_err = NULL;
2658    BDRVSheepdogState *s = bs->opaque;
2659    int ret, fd;
2660    uint32_t new_vid;
2661    SheepdogInode *inode;
2662    unsigned int datalen;
2663
2664    trace_sheepdog_snapshot_create_info(sn_info->name, sn_info->id_str, s->name,
2665                                        sn_info->vm_state_size, s->is_snapshot);
2666
2667    if (s->is_snapshot) {
2668        error_report("You can't create a snapshot of a snapshot VDI, "
2669                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2670
2671        return -EINVAL;
2672    }
2673
2674    trace_sheepdog_snapshot_create(sn_info->name, sn_info->id_str);
2675
2676    s->inode.vm_state_size = sn_info->vm_state_size;
2677    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2678    /* It appears that inode.tag does not require a NUL terminator,
2679     * which means this use of strncpy is ok.
2680     */
2681    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2682    /* we don't need to update entire object */
2683    datalen = SD_INODE_HEADER_SIZE;
2684    inode = g_malloc(datalen);
2685
2686    /* refresh inode. */
2687    fd = connect_to_sdog(s, &local_err);
2688    if (fd < 0) {
2689        error_report_err(local_err);
2690        ret = fd;
2691        goto cleanup;
2692    }
2693
2694    ret = write_object(fd, s->bs, (char *)&s->inode,
2695                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2696                       datalen, 0, false, s->cache_flags);
2697    if (ret < 0) {
2698        error_report("failed to write snapshot's inode.");
2699        goto cleanup;
2700    }
2701
2702    ret = do_sd_create(s, &new_vid, 1, &local_err);
2703    if (ret < 0) {
2704        error_reportf_err(local_err,
2705                          "failed to create inode for snapshot: ");
2706        goto cleanup;
2707    }
2708
2709    ret = read_object(fd, s->bs, (char *)inode,
2710                      vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2711                      s->cache_flags);
2712
2713    if (ret < 0) {
2714        error_report("failed to read new inode info. %s", strerror(errno));
2715        goto cleanup;
2716    }
2717
2718    memcpy(&s->inode, inode, datalen);
2719    trace_sheepdog_snapshot_create_inode(s->inode.name, s->inode.snap_id,
2720                                         s->inode.vdi_id);
2721
2722cleanup:
2723    g_free(inode);
2724    closesocket(fd);
2725    return ret;
2726}
2727
2728/*
2729 * We implement rollback(loadvm) operation to the specified snapshot by
2730 * 1) switch to the snapshot
2731 * 2) rely on sd_create_branch to delete working VDI and
2732 * 3) create a new working VDI based on the specified snapshot
2733 */
2734static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2735{
2736    BDRVSheepdogState *s = bs->opaque;
2737    BDRVSheepdogState *old_s;
2738    char tag[SD_MAX_VDI_TAG_LEN];
2739    uint32_t snapid = 0;
2740    int ret;
2741
2742    if (!sd_parse_snapid_or_tag(snapshot_id, &snapid, tag)) {
2743        return -EINVAL;
2744    }
2745
2746    old_s = g_new(BDRVSheepdogState, 1);
2747
2748    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2749
2750    ret = reload_inode(s, snapid, tag);
2751    if (ret) {
2752        goto out;
2753    }
2754
2755    ret = sd_create_branch(s);
2756    if (ret) {
2757        goto out;
2758    }
2759
2760    g_free(old_s);
2761
2762    return 0;
2763out:
2764    /* recover bdrv_sd_state */
2765    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2766    g_free(old_s);
2767
2768    error_report("failed to open. recover old bdrv_sd_state.");
2769
2770    return ret;
2771}
2772
2773#define NR_BATCHED_DISCARD 128
2774
2775static int remove_objects(BDRVSheepdogState *s, Error **errp)
2776{
2777    int fd, i = 0, nr_objs = 0;
2778    int ret;
2779    SheepdogInode *inode = &s->inode;
2780
2781    fd = connect_to_sdog(s, errp);
2782    if (fd < 0) {
2783        return fd;
2784    }
2785
2786    nr_objs = count_data_objs(inode);
2787    while (i < nr_objs) {
2788        int start_idx, nr_filled_idx;
2789
2790        while (i < nr_objs && !inode->data_vdi_id[i]) {
2791            i++;
2792        }
2793        start_idx = i;
2794
2795        nr_filled_idx = 0;
2796        while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2797            if (inode->data_vdi_id[i]) {
2798                inode->data_vdi_id[i] = 0;
2799                nr_filled_idx++;
2800            }
2801
2802            i++;
2803        }
2804
2805        ret = write_object(fd, s->bs,
2806                           (char *)&inode->data_vdi_id[start_idx],
2807                           vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2808                           (i - start_idx) * sizeof(uint32_t),
2809                           offsetof(struct SheepdogInode,
2810                                    data_vdi_id[start_idx]),
2811                           false, s->cache_flags);
2812        if (ret < 0) {
2813            error_setg(errp, "Failed to discard snapshot inode");
2814            goto out;
2815        }
2816    }
2817
2818    ret = 0;
2819out:
2820    closesocket(fd);
2821    return ret;
2822}
2823
2824static int sd_snapshot_delete(BlockDriverState *bs,
2825                              const char *snapshot_id,
2826                              const char *name,
2827                              Error **errp)
2828{
2829    /*
2830     * FIXME should delete the snapshot matching both @snapshot_id and
2831     * @name, but @name not used here
2832     */
2833    unsigned long snap_id = 0;
2834    char snap_tag[SD_MAX_VDI_TAG_LEN];
2835    int fd, ret;
2836    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2837    BDRVSheepdogState *s = bs->opaque;
2838    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2839    uint32_t vid;
2840    SheepdogVdiReq hdr = {
2841        .opcode = SD_OP_DEL_VDI,
2842        .data_length = wlen,
2843        .flags = SD_FLAG_CMD_WRITE,
2844    };
2845    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2846
2847    ret = remove_objects(s, errp);
2848    if (ret) {
2849        return ret;
2850    }
2851
2852    memset(buf, 0, sizeof(buf));
2853    memset(snap_tag, 0, sizeof(snap_tag));
2854    pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2855    /* TODO Use sd_parse_snapid() once this mess is cleaned up */
2856    ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2857    if (ret || snap_id > UINT32_MAX) {
2858        /*
2859         * FIXME Since qemu_strtoul() returns -EINVAL when
2860         * @snapshot_id is null, @snapshot_id is mandatory.  Correct
2861         * would be to require at least one of @snapshot_id and @name.
2862         */
2863        error_setg(errp, "Invalid snapshot ID: %s",
2864                         snapshot_id ? snapshot_id : "<null>");
2865        return -EINVAL;
2866    }
2867
2868    if (snap_id) {
2869        hdr.snapid = (uint32_t) snap_id;
2870    } else {
2871        /* FIXME I suspect we should use @name here */
2872        /* FIXME don't truncate silently */
2873        pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2874        pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2875    }
2876
2877    ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true, errp);
2878    if (ret) {
2879        return ret;
2880    }
2881
2882    fd = connect_to_sdog(s, errp);
2883    if (fd < 0) {
2884        return fd;
2885    }
2886
2887    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2888                 buf, &wlen, &rlen);
2889    closesocket(fd);
2890    if (ret) {
2891        error_setg_errno(errp, -ret, "Couldn't send request to server");
2892        return ret;
2893    }
2894
2895    switch (rsp->result) {
2896    case SD_RES_NO_VDI:
2897        error_setg(errp, "Can't find the snapshot");
2898        return -ENOENT;
2899    case SD_RES_SUCCESS:
2900        break;
2901    default:
2902        error_setg(errp, "%s", sd_strerror(rsp->result));
2903        return -EIO;
2904    }
2905
2906    return 0;
2907}
2908
2909static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2910{
2911    Error *local_err = NULL;
2912    BDRVSheepdogState *s = bs->opaque;
2913    SheepdogReq req;
2914    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2915    QEMUSnapshotInfo *sn_tab = NULL;
2916    unsigned wlen, rlen;
2917    int found = 0;
2918    SheepdogInode *inode;
2919    unsigned long *vdi_inuse;
2920    unsigned int start_nr;
2921    uint64_t hval;
2922    uint32_t vid;
2923
2924    vdi_inuse = g_malloc(max);
2925    inode = g_malloc(SD_INODE_HEADER_SIZE);
2926
2927    fd = connect_to_sdog(s, &local_err);
2928    if (fd < 0) {
2929        error_report_err(local_err);
2930        ret = fd;
2931        goto out;
2932    }
2933
2934    rlen = max;
2935    wlen = 0;
2936
2937    memset(&req, 0, sizeof(req));
2938
2939    req.opcode = SD_OP_READ_VDIS;
2940    req.data_length = max;
2941
2942    ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
2943
2944    closesocket(fd);
2945    if (ret) {
2946        goto out;
2947    }
2948
2949    sn_tab = g_new0(QEMUSnapshotInfo, nr);
2950
2951    /* calculate a vdi id with hash function */
2952    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2953    start_nr = hval & (SD_NR_VDIS - 1);
2954
2955    fd = connect_to_sdog(s, &local_err);
2956    if (fd < 0) {
2957        error_report_err(local_err);
2958        ret = fd;
2959        goto out;
2960    }
2961
2962    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2963        if (!test_bit(vid, vdi_inuse)) {
2964            break;
2965        }
2966
2967        /* we don't need to read entire object */
2968        ret = read_object(fd, s->bs, (char *)inode,
2969                          vid_to_vdi_oid(vid),
2970                          0, SD_INODE_HEADER_SIZE, 0,
2971                          s->cache_flags);
2972
2973        if (ret) {
2974            continue;
2975        }
2976
2977        if (!strcmp(inode->name, s->name) && is_snapshot(inode)) {
2978            sn_tab[found].date_sec = inode->snap_ctime >> 32;
2979            sn_tab[found].date_nsec = inode->snap_ctime & 0xffffffff;
2980            sn_tab[found].vm_state_size = inode->vm_state_size;
2981            sn_tab[found].vm_clock_nsec = inode->vm_clock_nsec;
2982
2983            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2984                     "%" PRIu32, inode->snap_id);
2985            pstrcpy(sn_tab[found].name,
2986                    MIN(sizeof(sn_tab[found].name), sizeof(inode->tag)),
2987                    inode->tag);
2988            found++;
2989        }
2990    }
2991
2992    closesocket(fd);
2993out:
2994    *psn_tab = sn_tab;
2995
2996    g_free(vdi_inuse);
2997    g_free(inode);
2998
2999    if (ret < 0) {
3000        return ret;
3001    }
3002
3003    return found;
3004}
3005
3006static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
3007                                int64_t pos, int size, int load)
3008{
3009    Error *local_err = NULL;
3010    bool create;
3011    int fd, ret = 0, remaining = size;
3012    unsigned int data_len;
3013    uint64_t vmstate_oid;
3014    uint64_t offset;
3015    uint32_t vdi_index;
3016    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
3017    uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
3018
3019    fd = connect_to_sdog(s, &local_err);
3020    if (fd < 0) {
3021        error_report_err(local_err);
3022        return fd;
3023    }
3024
3025    while (remaining) {
3026        vdi_index = pos / object_size;
3027        offset = pos % object_size;
3028
3029        data_len = MIN(remaining, object_size - offset);
3030
3031        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
3032
3033        create = (offset == 0);
3034        if (load) {
3035            ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
3036                              s->inode.nr_copies, data_len, offset,
3037                              s->cache_flags);
3038        } else {
3039            ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
3040                               s->inode.nr_copies, data_len, offset, create,
3041                               s->cache_flags);
3042        }
3043
3044        if (ret < 0) {
3045            error_report("failed to save vmstate %s", strerror(errno));
3046            goto cleanup;
3047        }
3048
3049        pos += data_len;
3050        data += data_len;
3051        remaining -= data_len;
3052    }
3053    ret = size;
3054cleanup:
3055    closesocket(fd);
3056    return ret;
3057}
3058
3059static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3060                           int64_t pos)
3061{
3062    BDRVSheepdogState *s = bs->opaque;
3063    void *buf;
3064    int ret;
3065
3066    buf = qemu_blockalign(bs, qiov->size);
3067    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
3068    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
3069    qemu_vfree(buf);
3070
3071    return ret;
3072}
3073
3074static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3075                           int64_t pos)
3076{
3077    BDRVSheepdogState *s = bs->opaque;
3078    void *buf;
3079    int ret;
3080
3081    buf = qemu_blockalign(bs, qiov->size);
3082    ret = do_load_save_vmstate(s, buf, pos, qiov->size, 1);
3083    qemu_iovec_from_buf(qiov, 0, buf, qiov->size);
3084    qemu_vfree(buf);
3085
3086    return ret;
3087}
3088
3089
3090static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
3091                                      int bytes)
3092{
3093    SheepdogAIOCB acb;
3094    BDRVSheepdogState *s = bs->opaque;
3095    QEMUIOVector discard_iov;
3096    struct iovec iov;
3097    uint32_t zero = 0;
3098
3099    if (!s->discard_supported) {
3100        return 0;
3101    }
3102
3103    memset(&discard_iov, 0, sizeof(discard_iov));
3104    memset(&iov, 0, sizeof(iov));
3105    iov.iov_base = &zero;
3106    iov.iov_len = sizeof(zero);
3107    discard_iov.iov = &iov;
3108    discard_iov.niov = 1;
3109    if (!QEMU_IS_ALIGNED(offset | bytes, BDRV_SECTOR_SIZE)) {
3110        return -ENOTSUP;
3111    }
3112    sd_aio_setup(&acb, s, &discard_iov, offset >> BDRV_SECTOR_BITS,
3113                 bytes >> BDRV_SECTOR_BITS, AIOCB_DISCARD_OBJ);
3114    sd_co_rw_vector(&acb);
3115    sd_aio_complete(&acb);
3116
3117    return acb.ret;
3118}
3119
3120static coroutine_fn int
3121sd_co_block_status(BlockDriverState *bs, bool want_zero, int64_t offset,
3122                   int64_t bytes, int64_t *pnum, int64_t *map,
3123                   BlockDriverState **file)
3124{
3125    BDRVSheepdogState *s = bs->opaque;
3126    SheepdogInode *inode = &s->inode;
3127    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3128    unsigned long start = offset / object_size,
3129                  end = DIV_ROUND_UP(offset + bytes, object_size);
3130    unsigned long idx;
3131    *map = offset;
3132    int ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
3133
3134    for (idx = start; idx < end; idx++) {
3135        if (inode->data_vdi_id[idx] == 0) {
3136            break;
3137        }
3138    }
3139    if (idx == start) {
3140        /* Get the longest length of unallocated sectors */
3141        ret = 0;
3142        for (idx = start + 1; idx < end; idx++) {
3143            if (inode->data_vdi_id[idx] != 0) {
3144                break;
3145            }
3146        }
3147    }
3148
3149    *pnum = (idx - start) * object_size;
3150    if (*pnum > bytes) {
3151        *pnum = bytes;
3152    }
3153    if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
3154        *file = bs;
3155    }
3156    return ret;
3157}
3158
3159static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
3160{
3161    BDRVSheepdogState *s = bs->opaque;
3162    SheepdogInode *inode = &s->inode;
3163    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3164    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
3165    uint64_t size = 0;
3166
3167    for (i = 0; i < last; i++) {
3168        if (inode->data_vdi_id[i] == 0) {
3169            continue;
3170        }
3171        size += object_size;
3172    }
3173    return size;
3174}
3175
/*
 * Option list named "sheepdog-create-opts"; the BlockDriver definitions
 * in this file point their .create_opts at it.  Each entry gives the
 * option name, its value type and the help text shown for it.
 */
static QemuOptsList sd_create_opts = {
    .name = "sheepdog-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
    .desc = {
        {
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            .name = BLOCK_OPT_BACKING_FILE,
            .type = QEMU_OPT_STRING,
            .help = "File name of a base image"
        },
        {
            .name = BLOCK_OPT_PREALLOC,
            .type = QEMU_OPT_STRING,
            .help = "Preallocation mode (allowed values: off, full)"
        },
        {
            /* Taken as a string, not a number */
            .name = BLOCK_OPT_REDUNDANCY,
            .type = QEMU_OPT_STRING,
            .help = "Redundancy of the image"
        },
        {
            .name = BLOCK_OPT_OBJECT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Object size of the image"
        },
        { /* end of list */ }
    }
};
3208
/*
 * NULL-terminated list of runtime option names that the BlockDriver
 * definitions in this file publish through .strong_runtime_opts.
 * NOTE(review): presumably these are the options that determine which
 * image is opened, and the trailing dot in "server." presumably matches
 * every "server.*" key - confirm against the block layer.
 */
static const char *const sd_strong_runtime_opts[] = {
    "vdi",
    "snap-id",
    "tag",
    "server.",

    NULL
};
3217
/*
 * Driver definition for the "sheepdog" protocol name.  The callback set
 * is the same as in the "sheepdog+tcp" and "sheepdog+unix" definitions
 * in this file; only .protocol_name differs.
 */
static BlockDriver bdrv_sheepdog = {
    .format_name                  = "sheepdog",
    .protocol_name                = "sheepdog",
    .instance_size                = sizeof(BDRVSheepdogState),
    /* open, reopen, close, create and size handling */
    .bdrv_parse_filename          = sd_parse_filename,
    .bdrv_file_open               = sd_open,
    .bdrv_reopen_prepare          = sd_reopen_prepare,
    .bdrv_reopen_commit           = sd_reopen_commit,
    .bdrv_reopen_abort            = sd_reopen_abort,
    .bdrv_close                   = sd_close,
    .bdrv_co_create               = sd_co_create,
    .bdrv_co_create_opts          = sd_co_create_opts,
    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
    .bdrv_getlength               = sd_getlength,
    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
    .bdrv_co_truncate             = sd_co_truncate,

    /* coroutine I/O entry points */
    .bdrv_co_readv                = sd_co_readv,
    .bdrv_co_writev               = sd_co_writev,
    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
    .bdrv_co_pdiscard             = sd_co_pdiscard,
    .bdrv_co_block_status         = sd_co_block_status,

    /* snapshot handling */
    .bdrv_snapshot_create         = sd_snapshot_create,
    .bdrv_snapshot_goto           = sd_snapshot_goto,
    .bdrv_snapshot_delete         = sd_snapshot_delete,
    .bdrv_snapshot_list           = sd_snapshot_list,

    /* VM state save/load */
    .bdrv_save_vmstate            = sd_save_vmstate,
    .bdrv_load_vmstate            = sd_load_vmstate,

    /* AioContext attach/detach */
    .bdrv_detach_aio_context      = sd_detach_aio_context,
    .bdrv_attach_aio_context      = sd_attach_aio_context,

    /* option tables */
    .create_opts                  = &sd_create_opts,
    .strong_runtime_opts          = sd_strong_runtime_opts,
};
3255
/*
 * Driver definition for the "sheepdog+tcp" protocol name.  The callback
 * set is the same as in the "sheepdog" and "sheepdog+unix" definitions
 * in this file; only .protocol_name differs.
 */
static BlockDriver bdrv_sheepdog_tcp = {
    .format_name                  = "sheepdog",
    .protocol_name                = "sheepdog+tcp",
    .instance_size                = sizeof(BDRVSheepdogState),
    /* open, reopen, close, create and size handling */
    .bdrv_parse_filename          = sd_parse_filename,
    .bdrv_file_open               = sd_open,
    .bdrv_reopen_prepare          = sd_reopen_prepare,
    .bdrv_reopen_commit           = sd_reopen_commit,
    .bdrv_reopen_abort            = sd_reopen_abort,
    .bdrv_close                   = sd_close,
    .bdrv_co_create               = sd_co_create,
    .bdrv_co_create_opts          = sd_co_create_opts,
    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
    .bdrv_getlength               = sd_getlength,
    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
    .bdrv_co_truncate             = sd_co_truncate,

    /* coroutine I/O entry points */
    .bdrv_co_readv                = sd_co_readv,
    .bdrv_co_writev               = sd_co_writev,
    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
    .bdrv_co_pdiscard             = sd_co_pdiscard,
    .bdrv_co_block_status         = sd_co_block_status,

    /* snapshot handling */
    .bdrv_snapshot_create         = sd_snapshot_create,
    .bdrv_snapshot_goto           = sd_snapshot_goto,
    .bdrv_snapshot_delete         = sd_snapshot_delete,
    .bdrv_snapshot_list           = sd_snapshot_list,

    /* VM state save/load */
    .bdrv_save_vmstate            = sd_save_vmstate,
    .bdrv_load_vmstate            = sd_load_vmstate,

    /* AioContext attach/detach */
    .bdrv_detach_aio_context      = sd_detach_aio_context,
    .bdrv_attach_aio_context      = sd_attach_aio_context,

    /* option tables */
    .create_opts                  = &sd_create_opts,
    .strong_runtime_opts          = sd_strong_runtime_opts,
};
3293
/*
 * Driver definition for the "sheepdog+unix" protocol name.  The callback
 * set is the same as in the "sheepdog" and "sheepdog+tcp" definitions in
 * this file; only .protocol_name differs.
 */
static BlockDriver bdrv_sheepdog_unix = {
    .format_name                  = "sheepdog",
    .protocol_name                = "sheepdog+unix",
    .instance_size                = sizeof(BDRVSheepdogState),
    /* open, reopen, close, create and size handling */
    .bdrv_parse_filename          = sd_parse_filename,
    .bdrv_file_open               = sd_open,
    .bdrv_reopen_prepare          = sd_reopen_prepare,
    .bdrv_reopen_commit           = sd_reopen_commit,
    .bdrv_reopen_abort            = sd_reopen_abort,
    .bdrv_close                   = sd_close,
    .bdrv_co_create               = sd_co_create,
    .bdrv_co_create_opts          = sd_co_create_opts,
    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
    .bdrv_getlength               = sd_getlength,
    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
    .bdrv_co_truncate             = sd_co_truncate,

    /* coroutine I/O entry points */
    .bdrv_co_readv                = sd_co_readv,
    .bdrv_co_writev               = sd_co_writev,
    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
    .bdrv_co_pdiscard             = sd_co_pdiscard,
    .bdrv_co_block_status         = sd_co_block_status,

    /* snapshot handling */
    .bdrv_snapshot_create         = sd_snapshot_create,
    .bdrv_snapshot_goto           = sd_snapshot_goto,
    .bdrv_snapshot_delete         = sd_snapshot_delete,
    .bdrv_snapshot_list           = sd_snapshot_list,

    /* VM state save/load */
    .bdrv_save_vmstate            = sd_save_vmstate,
    .bdrv_load_vmstate            = sd_load_vmstate,

    /* AioContext attach/detach */
    .bdrv_detach_aio_context      = sd_detach_aio_context,
    .bdrv_attach_aio_context      = sd_attach_aio_context,

    /* option tables */
    .create_opts                  = &sd_create_opts,
    .strong_runtime_opts          = sd_strong_runtime_opts,
};
3331
3332static void bdrv_sheepdog_init(void)
3333{
3334    bdrv_register(&bdrv_sheepdog);
3335    bdrv_register(&bdrv_sheepdog_tcp);
3336    bdrv_register(&bdrv_sheepdog_unix);
3337}
3338block_init(bdrv_sheepdog_init);
3339