qemu/block/sheepdog.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qapi/error.h"
  17#include "qapi/qapi-visit-sockets.h"
  18#include "qapi/qapi-visit-block-core.h"
  19#include "qapi/qmp/qdict.h"
  20#include "qapi/qobject-input-visitor.h"
  21#include "qapi/qobject-output-visitor.h"
  22#include "qemu/uri.h"
  23#include "qemu/error-report.h"
  24#include "qemu/option.h"
  25#include "qemu/sockets.h"
  26#include "block/block_int.h"
  27#include "block/qdict.h"
  28#include "sysemu/block-backend.h"
  29#include "qemu/bitops.h"
  30#include "qemu/cutils.h"
  31#include "trace.h"
  32
  33#define SD_PROTO_VER 0x01
  34
  35#define SD_DEFAULT_ADDR "localhost"
  36#define SD_DEFAULT_PORT 7000
  37
  38#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
  39#define SD_OP_READ_OBJ       0x02
  40#define SD_OP_WRITE_OBJ      0x03
  41/* 0x04 is used internally by Sheepdog */
  42
  43#define SD_OP_NEW_VDI        0x11
  44#define SD_OP_LOCK_VDI       0x12
  45#define SD_OP_RELEASE_VDI    0x13
  46#define SD_OP_GET_VDI_INFO   0x14
  47#define SD_OP_READ_VDIS      0x15
  48#define SD_OP_FLUSH_VDI      0x16
  49#define SD_OP_DEL_VDI        0x17
  50#define SD_OP_GET_CLUSTER_DEFAULT   0x18
  51
  52#define SD_FLAG_CMD_WRITE    0x01
  53#define SD_FLAG_CMD_COW      0x02
  54#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
  55#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  56
  57#define SD_RES_SUCCESS       0x00 /* Success */
  58#define SD_RES_UNKNOWN       0x01 /* Unknown error */
  59#define SD_RES_NO_OBJ        0x02 /* No object found */
  60#define SD_RES_EIO           0x03 /* I/O error */
  61#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
  62#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
  63#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
  64#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
  65#define SD_RES_NO_VDI        0x08 /* No vdi found */
  66#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
  67#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
  68#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
  69#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
  70#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
  71#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
  72#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
  73#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
  74#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
  75#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
  76#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
  77#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
  78#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
  79#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
  80#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
  81#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
  82#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
  83#define SD_RES_READONLY      0x1A /* Object is read-only */
  84
  85/*
  86 * Object ID rules
  87 *
  88 *  0 - 19 (20 bits): data object space
  89 * 20 - 31 (12 bits): reserved data object space
  90 * 32 - 55 (24 bits): vdi object space
  91 * 56 - 59 ( 4 bits): reserved vdi object space
  92 * 60 - 63 ( 4 bits): object type identifier space
  93 */
  94
  95#define VDI_SPACE_SHIFT   32
  96#define VDI_BIT (UINT64_C(1) << 63)
  97#define VMSTATE_BIT (UINT64_C(1) << 62)
  98#define MAX_DATA_OBJS (UINT64_C(1) << 20)
  99#define MAX_CHILDREN 1024
 100#define SD_MAX_VDI_LEN 256
 101#define SD_MAX_VDI_TAG_LEN 256
 102#define SD_NR_VDIS   (1U << 24)
 103#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
 104#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
 105#define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
 106/*
 107 * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
 108 * (SD_EC_MAX_STRIP - 1) for parity strips
 109 *
 110 * SD_MAX_COPIES is sum of number of data strips and parity strips.
 111 */
 112#define SD_EC_MAX_STRIP 16
 113#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
 114
 115#define SD_INODE_SIZE (sizeof(SheepdogInode))
 116#define CURRENT_VDI_ID 0
 117
 118#define LOCK_TYPE_NORMAL 0
 119#define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
 120
 121typedef struct SheepdogReq {
 122    uint8_t proto_ver;
 123    uint8_t opcode;
 124    uint16_t flags;
 125    uint32_t epoch;
 126    uint32_t id;
 127    uint32_t data_length;
 128    uint32_t opcode_specific[8];
 129} SheepdogReq;
 130
 131typedef struct SheepdogRsp {
 132    uint8_t proto_ver;
 133    uint8_t opcode;
 134    uint16_t flags;
 135    uint32_t epoch;
 136    uint32_t id;
 137    uint32_t data_length;
 138    uint32_t result;
 139    uint32_t opcode_specific[7];
 140} SheepdogRsp;
 141
 142typedef struct SheepdogObjReq {
 143    uint8_t proto_ver;
 144    uint8_t opcode;
 145    uint16_t flags;
 146    uint32_t epoch;
 147    uint32_t id;
 148    uint32_t data_length;
 149    uint64_t oid;
 150    uint64_t cow_oid;
 151    uint8_t copies;
 152    uint8_t copy_policy;
 153    uint8_t reserved[6];
 154    uint64_t offset;
 155} SheepdogObjReq;
 156
 157typedef struct SheepdogObjRsp {
 158    uint8_t proto_ver;
 159    uint8_t opcode;
 160    uint16_t flags;
 161    uint32_t epoch;
 162    uint32_t id;
 163    uint32_t data_length;
 164    uint32_t result;
 165    uint8_t copies;
 166    uint8_t copy_policy;
 167    uint8_t reserved[2];
 168    uint32_t pad[6];
 169} SheepdogObjRsp;
 170
 171typedef struct SheepdogVdiReq {
 172    uint8_t proto_ver;
 173    uint8_t opcode;
 174    uint16_t flags;
 175    uint32_t epoch;
 176    uint32_t id;
 177    uint32_t data_length;
 178    uint64_t vdi_size;
 179    uint32_t base_vdi_id;
 180    uint8_t copies;
 181    uint8_t copy_policy;
 182    uint8_t store_policy;
 183    uint8_t block_size_shift;
 184    uint32_t snapid;
 185    uint32_t type;
 186    uint32_t pad[2];
 187} SheepdogVdiReq;
 188
 189typedef struct SheepdogVdiRsp {
 190    uint8_t proto_ver;
 191    uint8_t opcode;
 192    uint16_t flags;
 193    uint32_t epoch;
 194    uint32_t id;
 195    uint32_t data_length;
 196    uint32_t result;
 197    uint32_t rsvd;
 198    uint32_t vdi_id;
 199    uint32_t pad[5];
 200} SheepdogVdiRsp;
 201
 202typedef struct SheepdogClusterRsp {
 203    uint8_t proto_ver;
 204    uint8_t opcode;
 205    uint16_t flags;
 206    uint32_t epoch;
 207    uint32_t id;
 208    uint32_t data_length;
 209    uint32_t result;
 210    uint8_t nr_copies;
 211    uint8_t copy_policy;
 212    uint8_t block_size_shift;
 213    uint8_t __pad1;
 214    uint32_t __pad2[6];
 215} SheepdogClusterRsp;
 216
 217typedef struct SheepdogInode {
 218    char name[SD_MAX_VDI_LEN];
 219    char tag[SD_MAX_VDI_TAG_LEN];
 220    uint64_t ctime;
 221    uint64_t snap_ctime;
 222    uint64_t vm_clock_nsec;
 223    uint64_t vdi_size;
 224    uint64_t vm_state_size;
 225    uint16_t copy_policy;
 226    uint8_t nr_copies;
 227    uint8_t block_size_shift;
 228    uint32_t snap_id;
 229    uint32_t vdi_id;
 230    uint32_t parent_vdi_id;
 231    uint32_t child_vdi_id[MAX_CHILDREN];
 232    uint32_t data_vdi_id[MAX_DATA_OBJS];
 233} SheepdogInode;
 234
 235#define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
 236
 237/*
 238 * 64 bit FNV-1a non-zero initial basis
 239 */
 240#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 241
 242/*
 243 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 244 */
 245static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 246{
 247    unsigned char *bp = buf;
 248    unsigned char *be = bp + len;
 249    while (bp < be) {
 250        hval ^= (uint64_t) *bp++;
 251        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 252            (hval << 7) + (hval << 8) + (hval << 40);
 253    }
 254    return hval;
 255}
 256
 257static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 258{
 259    return inode->vdi_id == inode->data_vdi_id[idx];
 260}
 261
 262static inline bool is_data_obj(uint64_t oid)
 263{
 264    return !(VDI_BIT & oid);
 265}
 266
 267static inline uint64_t data_oid_to_idx(uint64_t oid)
 268{
 269    return oid & (MAX_DATA_OBJS - 1);
 270}
 271
 272static inline uint32_t oid_to_vid(uint64_t oid)
 273{
 274    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 275}
 276
 277static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 278{
 279    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 280}
 281
 282static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 283{
 284    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 285}
 286
 287static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 288{
 289    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 290}
 291
 292static inline bool is_snapshot(struct SheepdogInode *inode)
 293{
 294    return !!inode->snap_ctime;
 295}
 296
 297static inline size_t count_data_objs(const struct SheepdogInode *inode)
 298{
 299    return DIV_ROUND_UP(inode->vdi_size,
 300                        (1UL << inode->block_size_shift));
 301}
 302
 303typedef struct SheepdogAIOCB SheepdogAIOCB;
 304typedef struct BDRVSheepdogState BDRVSheepdogState;
 305
 306typedef struct AIOReq {
 307    SheepdogAIOCB *aiocb;
 308    unsigned int iov_offset;
 309
 310    uint64_t oid;
 311    uint64_t base_oid;
 312    uint64_t offset;
 313    unsigned int data_len;
 314    uint8_t flags;
 315    uint32_t id;
 316    bool create;
 317
 318    QLIST_ENTRY(AIOReq) aio_siblings;
 319} AIOReq;
 320
 321enum AIOCBState {
 322    AIOCB_WRITE_UDATA,
 323    AIOCB_READ_UDATA,
 324    AIOCB_FLUSH_CACHE,
 325    AIOCB_DISCARD_OBJ,
 326};
 327
 328#define AIOCBOverlapping(x, y)                                 \
 329    (!(x->max_affect_data_idx < y->min_affect_data_idx          \
 330       || y->max_affect_data_idx < x->min_affect_data_idx))
 331
 332struct SheepdogAIOCB {
 333    BDRVSheepdogState *s;
 334
 335    QEMUIOVector *qiov;
 336
 337    int64_t sector_num;
 338    int nb_sectors;
 339
 340    int ret;
 341    enum AIOCBState aiocb_type;
 342
 343    Coroutine *coroutine;
 344    int nr_pending;
 345
 346    uint32_t min_affect_data_idx;
 347    uint32_t max_affect_data_idx;
 348
 349    /*
 350     * The difference between affect_data_idx and dirty_data_idx:
 351     * affect_data_idx represents range of index of all request types.
 352     * dirty_data_idx represents range of index updated by COW requests.
 353     * dirty_data_idx is used for updating an inode object.
 354     */
 355    uint32_t min_dirty_data_idx;
 356    uint32_t max_dirty_data_idx;
 357
 358    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
 359};
 360
 361struct BDRVSheepdogState {
 362    BlockDriverState *bs;
 363    AioContext *aio_context;
 364
 365    SheepdogInode inode;
 366
 367    char name[SD_MAX_VDI_LEN];
 368    bool is_snapshot;
 369    uint32_t cache_flags;
 370    bool discard_supported;
 371
 372    SocketAddress *addr;
 373    int fd;
 374
 375    CoMutex lock;
 376    Coroutine *co_send;
 377    Coroutine *co_recv;
 378
 379    uint32_t aioreq_seq_num;
 380
 381    /* Every aio request must be linked to either of these queues. */
 382    QLIST_HEAD(, AIOReq) inflight_aio_head;
 383    QLIST_HEAD(, AIOReq) failed_aio_head;
 384
 385    CoMutex queue_lock;
 386    CoQueue overlapping_queue;
 387    QLIST_HEAD(, SheepdogAIOCB) inflight_aiocb_head;
 388};
 389
 390typedef struct BDRVSheepdogReopenState {
 391    int fd;
 392    int cache_flags;
 393} BDRVSheepdogReopenState;
 394
 395static const char *sd_strerror(int err)
 396{
 397    int i;
 398
 399    static const struct {
 400        int err;
 401        const char *desc;
 402    } errors[] = {
 403        {SD_RES_SUCCESS, "Success"},
 404        {SD_RES_UNKNOWN, "Unknown error"},
 405        {SD_RES_NO_OBJ, "No object found"},
 406        {SD_RES_EIO, "I/O error"},
 407        {SD_RES_VDI_EXIST, "VDI exists already"},
 408        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 409        {SD_RES_SYSTEM_ERROR, "System error"},
 410        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 411        {SD_RES_NO_VDI, "No vdi found"},
 412        {SD_RES_NO_BASE_VDI, "No base VDI found"},
 413        {SD_RES_VDI_READ, "Failed read the requested VDI"},
 414        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 415        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 416        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 417        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 418        {SD_RES_STARTUP, "The system is still booting"},
 419        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 420        {SD_RES_SHUTDOWN, "The system is shutting down"},
 421        {SD_RES_NO_MEM, "Out of memory on the server"},
 422        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 423        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 424        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 425        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 426        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
 427        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
 428        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
 429        {SD_RES_READONLY, "Object is read-only"},
 430    };
 431
 432    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 433        if (errors[i].err == err) {
 434            return errors[i].desc;
 435        }
 436    }
 437
 438    return "Invalid error code";
 439}
 440
 441/*
 442 * Sheepdog I/O handling:
 443 *
 444 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 445 *    link the requests to the inflight_list in the
 446 *    BDRVSheepdogState.  The function yields while waiting for
 447 *    receiving the response.
 448 *
 449 * 2. We receive the response in aio_read_response, the fd handler to
 450 *    the sheepdog connection.  We switch back to sd_co_readv/sd_writev
 451 *    after all the requests belonging to the AIOCB are finished.  If
 452 *    needed, sd_co_writev will send another requests for the vdi object.
 453 */
 454
 455static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 456                                    uint64_t oid, unsigned int data_len,
 457                                    uint64_t offset, uint8_t flags, bool create,
 458                                    uint64_t base_oid, unsigned int iov_offset)
 459{
 460    AIOReq *aio_req;
 461
 462    aio_req = g_malloc(sizeof(*aio_req));
 463    aio_req->aiocb = acb;
 464    aio_req->iov_offset = iov_offset;
 465    aio_req->oid = oid;
 466    aio_req->base_oid = base_oid;
 467    aio_req->offset = offset;
 468    aio_req->data_len = data_len;
 469    aio_req->flags = flags;
 470    aio_req->id = s->aioreq_seq_num++;
 471    aio_req->create = create;
 472
 473    acb->nr_pending++;
 474    return aio_req;
 475}
 476
 477static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
 478{
 479    SheepdogAIOCB *cb;
 480
 481retry:
 482    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
 483        if (AIOCBOverlapping(acb, cb)) {
 484            qemu_co_queue_wait(&s->overlapping_queue, &s->queue_lock);
 485            goto retry;
 486        }
 487    }
 488}
 489
 490static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s,
 491                         QEMUIOVector *qiov, int64_t sector_num, int nb_sectors,
 492                         int type)
 493{
 494    uint32_t object_size;
 495
 496    object_size = (UINT32_C(1) << s->inode.block_size_shift);
 497
 498    acb->s = s;
 499
 500    acb->qiov = qiov;
 501
 502    acb->sector_num = sector_num;
 503    acb->nb_sectors = nb_sectors;
 504
 505    acb->coroutine = qemu_coroutine_self();
 506    acb->ret = 0;
 507    acb->nr_pending = 0;
 508
 509    acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
 510    acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
 511                              acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
 512
 513    acb->min_dirty_data_idx = UINT32_MAX;
 514    acb->max_dirty_data_idx = 0;
 515    acb->aiocb_type = type;
 516
 517    if (type == AIOCB_FLUSH_CACHE) {
 518        return;
 519    }
 520
 521    qemu_co_mutex_lock(&s->queue_lock);
 522    wait_for_overlapping_aiocb(s, acb);
 523    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings);
 524    qemu_co_mutex_unlock(&s->queue_lock);
 525}
 526
 527static SocketAddress *sd_server_config(QDict *options, Error **errp)
 528{
 529    QDict *server = NULL;
 530    Visitor *iv = NULL;
 531    SocketAddress *saddr = NULL;
 532    Error *local_err = NULL;
 533
 534    qdict_extract_subqdict(options, &server, "server.");
 535
 536    iv = qobject_input_visitor_new_flat_confused(server, errp);
 537    if (!iv) {
 538        goto done;
 539    }
 540
 541    visit_type_SocketAddress(iv, NULL, &saddr, &local_err);
 542    if (local_err) {
 543        error_propagate(errp, local_err);
 544        goto done;
 545    }
 546
 547done:
 548    visit_free(iv);
 549    qobject_unref(server);
 550    return saddr;
 551}
 552
 553/* Return -EIO in case of error, file descriptor on success */
 554static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 555{
 556    int fd;
 557
 558    fd = socket_connect(s->addr, errp);
 559
 560    if (s->addr->type == SOCKET_ADDRESS_TYPE_INET && fd >= 0) {
 561        int ret = socket_set_nodelay(fd);
 562        if (ret < 0) {
 563            warn_report("can't set TCP_NODELAY: %s", strerror(errno));
 564        }
 565    }
 566
 567    if (fd >= 0) {
 568        qemu_set_nonblock(fd);
 569    } else {
 570        fd = -EIO;
 571    }
 572
 573    return fd;
 574}
 575
 576/* Return 0 on success and -errno in case of error */
 577static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 578                                    unsigned int *wlen)
 579{
 580    int ret;
 581
 582    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 583    if (ret != sizeof(*hdr)) {
 584        error_report("failed to send a req, %s", strerror(errno));
 585        return -errno;
 586    }
 587
 588    ret = qemu_co_send(sockfd, data, *wlen);
 589    if (ret != *wlen) {
 590        error_report("failed to send a req, %s", strerror(errno));
 591        return -errno;
 592    }
 593
 594    return ret;
 595}
 596
 597typedef struct SheepdogReqCo {
 598    int sockfd;
 599    BlockDriverState *bs;
 600    AioContext *aio_context;
 601    SheepdogReq *hdr;
 602    void *data;
 603    unsigned int *wlen;
 604    unsigned int *rlen;
 605    int ret;
 606    bool finished;
 607    Coroutine *co;
 608} SheepdogReqCo;
 609
 610static void restart_co_req(void *opaque)
 611{
 612    SheepdogReqCo *srco = opaque;
 613
 614    aio_co_wake(srco->co);
 615}
 616
 617static coroutine_fn void do_co_req(void *opaque)
 618{
 619    int ret;
 620    SheepdogReqCo *srco = opaque;
 621    int sockfd = srco->sockfd;
 622    SheepdogReq *hdr = srco->hdr;
 623    void *data = srco->data;
 624    unsigned int *wlen = srco->wlen;
 625    unsigned int *rlen = srco->rlen;
 626
 627    srco->co = qemu_coroutine_self();
 628    aio_set_fd_handler(srco->aio_context, sockfd, false,
 629                       NULL, restart_co_req, NULL, srco);
 630
 631    ret = send_co_req(sockfd, hdr, data, wlen);
 632    if (ret < 0) {
 633        goto out;
 634    }
 635
 636    aio_set_fd_handler(srco->aio_context, sockfd, false,
 637                       restart_co_req, NULL, NULL, srco);
 638
 639    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 640    if (ret != sizeof(*hdr)) {
 641        error_report("failed to get a rsp, %s", strerror(errno));
 642        ret = -errno;
 643        goto out;
 644    }
 645
 646    if (*rlen > hdr->data_length) {
 647        *rlen = hdr->data_length;
 648    }
 649
 650    if (*rlen) {
 651        ret = qemu_co_recv(sockfd, data, *rlen);
 652        if (ret != *rlen) {
 653            error_report("failed to get the data, %s", strerror(errno));
 654            ret = -errno;
 655            goto out;
 656        }
 657    }
 658    ret = 0;
 659out:
 660    /* there is at most one request for this sockfd, so it is safe to
 661     * set each handler to NULL. */
 662    aio_set_fd_handler(srco->aio_context, sockfd, false,
 663                       NULL, NULL, NULL, NULL);
 664
 665    srco->co = NULL;
 666    srco->ret = ret;
 667    /* Set srco->finished before reading bs->wakeup.  */
 668    atomic_mb_set(&srco->finished, true);
 669    if (srco->bs) {
 670        bdrv_wakeup(srco->bs);
 671    }
 672}
 673
 674/*
 675 * Send the request to the sheep in a synchronous manner.
 676 *
 677 * Return 0 on success, -errno in case of error.
 678 */
 679static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
 680                  void *data, unsigned int *wlen, unsigned int *rlen)
 681{
 682    Coroutine *co;
 683    SheepdogReqCo srco = {
 684        .sockfd = sockfd,
 685        .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
 686        .bs = bs,
 687        .hdr = hdr,
 688        .data = data,
 689        .wlen = wlen,
 690        .rlen = rlen,
 691        .ret = 0,
 692        .finished = false,
 693    };
 694
 695    if (qemu_in_coroutine()) {
 696        do_co_req(&srco);
 697    } else {
 698        co = qemu_coroutine_create(do_co_req, &srco);
 699        if (bs) {
 700            bdrv_coroutine_enter(bs, co);
 701            BDRV_POLL_WHILE(bs, !srco.finished);
 702        } else {
 703            qemu_coroutine_enter(co);
 704            while (!srco.finished) {
 705                aio_poll(qemu_get_aio_context(), true);
 706            }
 707        }
 708    }
 709
 710    return srco.ret;
 711}
 712
 713static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 714                                         struct iovec *iov, int niov,
 715                                         enum AIOCBState aiocb_type);
 716static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 717static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
 718static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
 719static void co_write_request(void *opaque);
 720
 721static coroutine_fn void reconnect_to_sdog(void *opaque)
 722{
 723    BDRVSheepdogState *s = opaque;
 724    AIOReq *aio_req, *next;
 725
 726    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
 727                       NULL, NULL, NULL);
 728    close(s->fd);
 729    s->fd = -1;
 730
 731    /* Wait for outstanding write requests to be completed. */
 732    while (s->co_send != NULL) {
 733        co_write_request(opaque);
 734    }
 735
 736    /* Try to reconnect the sheepdog server every one second. */
 737    while (s->fd < 0) {
 738        Error *local_err = NULL;
 739        s->fd = get_sheep_fd(s, &local_err);
 740        if (s->fd < 0) {
 741            trace_sheepdog_reconnect_to_sdog();
 742            error_report_err(local_err);
 743            qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000000ULL);
 744        }
 745    };
 746
 747    /*
 748     * Now we have to resend all the request in the inflight queue.  However,
 749     * resend_aioreq() can yield and newly created requests can be added to the
 750     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
 751     * have to move all the inflight requests to the failed queue before
 752     * resend_aioreq() is called.
 753     */
 754    qemu_co_mutex_lock(&s->queue_lock);
 755    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
 756        QLIST_REMOVE(aio_req, aio_siblings);
 757        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
 758    }
 759
 760    /* Resend all the failed aio requests. */
 761    while (!QLIST_EMPTY(&s->failed_aio_head)) {
 762        aio_req = QLIST_FIRST(&s->failed_aio_head);
 763        QLIST_REMOVE(aio_req, aio_siblings);
 764        qemu_co_mutex_unlock(&s->queue_lock);
 765        resend_aioreq(s, aio_req);
 766        qemu_co_mutex_lock(&s->queue_lock);
 767    }
 768    qemu_co_mutex_unlock(&s->queue_lock);
 769}
 770
 771/*
 772 * Receive responses of the I/O requests.
 773 *
 774 * This function is registered as a fd handler, and called from the
 775 * main loop when s->fd is ready for reading responses.
 776 */
 777static void coroutine_fn aio_read_response(void *opaque)
 778{
 779    SheepdogObjRsp rsp;
 780    BDRVSheepdogState *s = opaque;
 781    int fd = s->fd;
 782    int ret;
 783    AIOReq *aio_req = NULL;
 784    SheepdogAIOCB *acb;
 785    uint64_t idx;
 786
 787    /* read a header */
 788    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
 789    if (ret != sizeof(rsp)) {
 790        error_report("failed to get the header, %s", strerror(errno));
 791        goto err;
 792    }
 793
 794    /* find the right aio_req from the inflight aio list */
 795    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
 796        if (aio_req->id == rsp.id) {
 797            break;
 798        }
 799    }
 800    if (!aio_req) {
 801        error_report("cannot find aio_req %x", rsp.id);
 802        goto err;
 803    }
 804
 805    acb = aio_req->aiocb;
 806
 807    switch (acb->aiocb_type) {
 808    case AIOCB_WRITE_UDATA:
 809        if (!is_data_obj(aio_req->oid)) {
 810            break;
 811        }
 812        idx = data_oid_to_idx(aio_req->oid);
 813
 814        if (aio_req->create) {
 815            /*
 816             * If the object is newly created one, we need to update
 817             * the vdi object (metadata object).  min_dirty_data_idx
 818             * and max_dirty_data_idx are changed to include updated
 819             * index between them.
 820             */
 821            if (rsp.result == SD_RES_SUCCESS) {
 822                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
 823                acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
 824                acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
 825            }
 826        }
 827        break;
 828    case AIOCB_READ_UDATA:
 829        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
 830                            aio_req->iov_offset, rsp.data_length);
 831        if (ret != rsp.data_length) {
 832            error_report("failed to get the data, %s", strerror(errno));
 833            goto err;
 834        }
 835        break;
 836    case AIOCB_FLUSH_CACHE:
 837        if (rsp.result == SD_RES_INVALID_PARMS) {
 838            trace_sheepdog_aio_read_response();
 839            s->cache_flags = SD_FLAG_CMD_DIRECT;
 840            rsp.result = SD_RES_SUCCESS;
 841        }
 842        break;
 843    case AIOCB_DISCARD_OBJ:
 844        switch (rsp.result) {
 845        case SD_RES_INVALID_PARMS:
 846            error_report("server doesn't support discard command");
 847            rsp.result = SD_RES_SUCCESS;
 848            s->discard_supported = false;
 849            break;
 850        default:
 851            break;
 852        }
 853    }
 854
 855    /* No more data for this aio_req (reload_inode below uses its own file
 856     * descriptor handler which doesn't use co_recv).
 857    */
 858    s->co_recv = NULL;
 859
 860    qemu_co_mutex_lock(&s->queue_lock);
 861    QLIST_REMOVE(aio_req, aio_siblings);
 862    qemu_co_mutex_unlock(&s->queue_lock);
 863
 864    switch (rsp.result) {
 865    case SD_RES_SUCCESS:
 866        break;
 867    case SD_RES_READONLY:
 868        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
 869            ret = reload_inode(s, 0, "");
 870            if (ret < 0) {
 871                goto err;
 872            }
 873        }
 874        if (is_data_obj(aio_req->oid)) {
 875            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
 876                                           data_oid_to_idx(aio_req->oid));
 877        } else {
 878            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
 879        }
 880        resend_aioreq(s, aio_req);
 881        return;
 882    default:
 883        acb->ret = -EIO;
 884        error_report("%s", sd_strerror(rsp.result));
 885        break;
 886    }
 887
 888    g_free(aio_req);
 889
 890    if (!--acb->nr_pending) {
 891        /*
 892         * We've finished all requests which belong to the AIOCB, so
 893         * we can switch back to sd_co_readv/writev now.
 894         */
 895        aio_co_wake(acb->coroutine);
 896    }
 897
 898    return;
 899
 900err:
 901    reconnect_to_sdog(opaque);
 902}
 903
 904static void co_read_response(void *opaque)
 905{
 906    BDRVSheepdogState *s = opaque;
 907
 908    if (!s->co_recv) {
 909        s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
 910    }
 911
 912    aio_co_enter(s->aio_context, s->co_recv);
 913}
 914
 915static void co_write_request(void *opaque)
 916{
 917    BDRVSheepdogState *s = opaque;
 918
 919    aio_co_wake(s->co_send);
 920}
 921
 922/*
 923 * Return a socket descriptor to read/write objects.
 924 *
 925 * We cannot use this descriptor for other operations because
 926 * the block driver may be on waiting response from the server.
 927 */
 928static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
 929{
 930    int fd;
 931
 932    fd = connect_to_sdog(s, errp);
 933    if (fd < 0) {
 934        return fd;
 935    }
 936
 937    aio_set_fd_handler(s->aio_context, fd, false,
 938                       co_read_response, NULL, NULL, s);
 939    return fd;
 940}
 941
 942/*
 943 * Parse numeric snapshot ID in @str
 944 * If @str can't be parsed as number, return false.
 945 * Else, if the number is zero or too large, set *@snapid to zero and
 946 * return true.
 947 * Else, set *@snapid to the number and return true.
 948 */
 949static bool sd_parse_snapid(const char *str, uint32_t *snapid)
 950{
 951    unsigned long ul;
 952    int ret;
 953
 954    ret = qemu_strtoul(str, NULL, 10, &ul);
 955    if (ret == -ERANGE) {
 956        ul = ret = 0;
 957    }
 958    if (ret) {
 959        return false;
 960    }
 961    if (ul > UINT32_MAX) {
 962        ul = 0;
 963    }
 964
 965    *snapid = ul;
 966    return true;
 967}
 968
 969static bool sd_parse_snapid_or_tag(const char *str,
 970                                   uint32_t *snapid, char tag[])
 971{
 972    if (!sd_parse_snapid(str, snapid)) {
 973        *snapid = 0;
 974        if (g_strlcpy(tag, str, SD_MAX_VDI_TAG_LEN) >= SD_MAX_VDI_TAG_LEN) {
 975            return false;
 976        }
 977    } else if (!*snapid) {
 978        return false;
 979    } else {
 980        tag[0] = 0;
 981    }
 982    return true;
 983}
 984
/*
 * Connection and image parameters extracted from a Sheepdog filename.
 * Filled in by sd_parse_uri(); on success the owner releases it with
 * sd_config_done().
 */
typedef struct {
    const char *path;           /* non-null iff transport is unix; points into @qp */
    const char *host;           /* valid when transport is tcp; points into @uri */
    int port;                   /* valid when transport is tcp */
    char vdi[SD_MAX_VDI_LEN];   /* VDI name taken from the URI path */
    char tag[SD_MAX_VDI_TAG_LEN]; /* snapshot tag, empty string when none */
    uint32_t snap_id;           /* snapshot ID, or CURRENT_VDI_ID without fragment */
    /* Remainder is only for sd_config_done() */
    URI *uri;
    QueryParams *qp;
} SheepdogConfig;
 996
 997static void sd_config_done(SheepdogConfig *cfg)
 998{
 999    if (cfg->qp) {
1000        query_params_free(cfg->qp);
1001    }
1002    uri_free(cfg->uri);
1003}
1004
1005static void sd_parse_uri(SheepdogConfig *cfg, const char *filename,
1006                         Error **errp)
1007{
1008    Error *err = NULL;
1009    QueryParams *qp = NULL;
1010    bool is_unix;
1011    URI *uri;
1012
1013    memset(cfg, 0, sizeof(*cfg));
1014
1015    cfg->uri = uri = uri_parse(filename);
1016    if (!uri) {
1017        error_setg(&err, "invalid URI '%s'", filename);
1018        goto out;
1019    }
1020
1021    /* transport */
1022    if (!g_strcmp0(uri->scheme, "sheepdog")) {
1023        is_unix = false;
1024    } else if (!g_strcmp0(uri->scheme, "sheepdog+tcp")) {
1025        is_unix = false;
1026    } else if (!g_strcmp0(uri->scheme, "sheepdog+unix")) {
1027        is_unix = true;
1028    } else {
1029        error_setg(&err, "URI scheme must be 'sheepdog', 'sheepdog+tcp',"
1030                   " or 'sheepdog+unix'");
1031        goto out;
1032    }
1033
1034    if (uri->path == NULL || !strcmp(uri->path, "/")) {
1035        error_setg(&err, "missing file path in URI");
1036        goto out;
1037    }
1038    if (g_strlcpy(cfg->vdi, uri->path + 1, SD_MAX_VDI_LEN)
1039        >= SD_MAX_VDI_LEN) {
1040        error_setg(&err, "VDI name is too long");
1041        goto out;
1042    }
1043
1044    cfg->qp = qp = query_params_parse(uri->query);
1045
1046    if (is_unix) {
1047        /* sheepdog+unix:///vdiname?socket=path */
1048        if (uri->server || uri->port) {
1049            error_setg(&err, "URI scheme %s doesn't accept a server address",
1050                       uri->scheme);
1051            goto out;
1052        }
1053        if (!qp->n) {
1054            error_setg(&err,
1055                       "URI scheme %s requires query parameter 'socket'",
1056                       uri->scheme);
1057            goto out;
1058        }
1059        if (qp->n != 1 || strcmp(qp->p[0].name, "socket")) {
1060            error_setg(&err, "unexpected query parameters");
1061            goto out;
1062        }
1063        cfg->path = qp->p[0].value;
1064    } else {
1065        /* sheepdog[+tcp]://[host:port]/vdiname */
1066        if (qp->n) {
1067            error_setg(&err, "unexpected query parameters");
1068            goto out;
1069        }
1070        cfg->host = uri->server;
1071        cfg->port = uri->port;
1072    }
1073
1074    /* snapshot tag */
1075    if (uri->fragment) {
1076        if (!sd_parse_snapid_or_tag(uri->fragment,
1077                                    &cfg->snap_id, cfg->tag)) {
1078            error_setg(&err, "'%s' is not a valid snapshot ID",
1079                       uri->fragment);
1080            goto out;
1081        }
1082    } else {
1083        cfg->snap_id = CURRENT_VDI_ID; /* search current vdi */
1084    }
1085
1086out:
1087    if (err) {
1088        error_propagate(errp, err);
1089        sd_config_done(cfg);
1090    }
1091}
1092
1093/*
1094 * Parse a filename (old syntax)
1095 *
1096 * filename must be one of the following formats:
1097 *   1. [vdiname]
1098 *   2. [vdiname]:[snapid]
1099 *   3. [vdiname]:[tag]
1100 *   4. [hostname]:[port]:[vdiname]
1101 *   5. [hostname]:[port]:[vdiname]:[snapid]
1102 *   6. [hostname]:[port]:[vdiname]:[tag]
1103 *
1104 * You can boot from the snapshot images by specifying `snapid` or
1105 * `tag'.
1106 *
1107 * You can run VMs outside the Sheepdog cluster by specifying
1108 * `hostname' and `port' (experimental).
1109 */
1110static void parse_vdiname(SheepdogConfig *cfg, const char *filename,
1111                          Error **errp)
1112{
1113    Error *err = NULL;
1114    char *p, *q, *uri;
1115    const char *host_spec, *vdi_spec;
1116    int nr_sep;
1117
1118    strstart(filename, "sheepdog:", &filename);
1119    p = q = g_strdup(filename);
1120
1121    /* count the number of separators */
1122    nr_sep = 0;
1123    while (*p) {
1124        if (*p == ':') {
1125            nr_sep++;
1126        }
1127        p++;
1128    }
1129    p = q;
1130
1131    /* use the first two tokens as host_spec. */
1132    if (nr_sep >= 2) {
1133        host_spec = p;
1134        p = strchr(p, ':');
1135        p++;
1136        p = strchr(p, ':');
1137        *p++ = '\0';
1138    } else {
1139        host_spec = "";
1140    }
1141
1142    vdi_spec = p;
1143
1144    p = strchr(vdi_spec, ':');
1145    if (p) {
1146        *p++ = '#';
1147    }
1148
1149    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1150
1151    /*
1152     * FIXME We to escape URI meta-characters, e.g. "x?y=z"
1153     * produces "sheepdog://x?y=z".  Because of that ...
1154     */
1155    sd_parse_uri(cfg, uri, &err);
1156    if (err) {
1157        /*
1158         * ... this can fail, but the error message is misleading.
1159         * Replace it by the traditional useless one until the
1160         * escaping is fixed.
1161         */
1162        error_free(err);
1163        error_setg(errp, "Can't parse filename");
1164    }
1165
1166    g_free(q);
1167    g_free(uri);
1168}
1169
1170static void sd_parse_filename(const char *filename, QDict *options,
1171                              Error **errp)
1172{
1173    Error *err = NULL;
1174    SheepdogConfig cfg;
1175    char buf[32];
1176
1177    if (strstr(filename, "://")) {
1178        sd_parse_uri(&cfg, filename, &err);
1179    } else {
1180        parse_vdiname(&cfg, filename, &err);
1181    }
1182    if (err) {
1183        error_propagate(errp, err);
1184        return;
1185    }
1186
1187    if (cfg.path) {
1188        qdict_set_default_str(options, "server.path", cfg.path);
1189        qdict_set_default_str(options, "server.type", "unix");
1190    } else {
1191        qdict_set_default_str(options, "server.type", "inet");
1192        qdict_set_default_str(options, "server.host",
1193                              cfg.host ?: SD_DEFAULT_ADDR);
1194        snprintf(buf, sizeof(buf), "%d", cfg.port ?: SD_DEFAULT_PORT);
1195        qdict_set_default_str(options, "server.port", buf);
1196    }
1197    qdict_set_default_str(options, "vdi", cfg.vdi);
1198    qdict_set_default_str(options, "tag", cfg.tag);
1199    if (cfg.snap_id) {
1200        snprintf(buf, sizeof(buf), "%d", cfg.snap_id);
1201        qdict_set_default_str(options, "snap-id", buf);
1202    }
1203
1204    sd_config_done(&cfg);
1205}
1206
1207static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1208                         uint32_t snapid, const char *tag, uint32_t *vid,
1209                         bool lock, Error **errp)
1210{
1211    int ret, fd;
1212    SheepdogVdiReq hdr;
1213    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1214    unsigned int wlen, rlen = 0;
1215    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN] QEMU_NONSTRING;
1216
1217    fd = connect_to_sdog(s, errp);
1218    if (fd < 0) {
1219        return fd;
1220    }
1221
1222    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1223     * which is desirable since we'll soon be sending those bytes, and
1224     * don't want the send_req to read uninitialized data.
1225     */
1226    strncpy(buf, filename, SD_MAX_VDI_LEN);
1227    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1228
1229    memset(&hdr, 0, sizeof(hdr));
1230    if (lock) {
1231        hdr.opcode = SD_OP_LOCK_VDI;
1232        hdr.type = LOCK_TYPE_NORMAL;
1233    } else {
1234        hdr.opcode = SD_OP_GET_VDI_INFO;
1235    }
1236    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1237    hdr.proto_ver = SD_PROTO_VER;
1238    hdr.data_length = wlen;
1239    hdr.snapid = snapid;
1240    hdr.flags = SD_FLAG_CMD_WRITE;
1241
1242    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1243    if (ret) {
1244        error_setg_errno(errp, -ret, "cannot get vdi info");
1245        goto out;
1246    }
1247
1248    if (rsp->result != SD_RES_SUCCESS) {
1249        error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1250                   sd_strerror(rsp->result), filename, snapid, tag);
1251        if (rsp->result == SD_RES_NO_VDI) {
1252            ret = -ENOENT;
1253        } else if (rsp->result == SD_RES_VDI_LOCKED) {
1254            ret = -EBUSY;
1255        } else {
1256            ret = -EIO;
1257        }
1258        goto out;
1259    }
1260    *vid = rsp->vdi_id;
1261
1262    ret = 0;
1263out:
1264    closesocket(fd);
1265    return ret;
1266}
1267
1268static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1269                                         struct iovec *iov, int niov,
1270                                         enum AIOCBState aiocb_type)
1271{
1272    int nr_copies = s->inode.nr_copies;
1273    SheepdogObjReq hdr;
1274    unsigned int wlen = 0;
1275    int ret;
1276    uint64_t oid = aio_req->oid;
1277    unsigned int datalen = aio_req->data_len;
1278    uint64_t offset = aio_req->offset;
1279    uint8_t flags = aio_req->flags;
1280    uint64_t old_oid = aio_req->base_oid;
1281    bool create = aio_req->create;
1282
1283    qemu_co_mutex_lock(&s->queue_lock);
1284    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1285    qemu_co_mutex_unlock(&s->queue_lock);
1286
1287    if (!nr_copies) {
1288        error_report("bug");
1289    }
1290
1291    memset(&hdr, 0, sizeof(hdr));
1292
1293    switch (aiocb_type) {
1294    case AIOCB_FLUSH_CACHE:
1295        hdr.opcode = SD_OP_FLUSH_VDI;
1296        break;
1297    case AIOCB_READ_UDATA:
1298        hdr.opcode = SD_OP_READ_OBJ;
1299        hdr.flags = flags;
1300        break;
1301    case AIOCB_WRITE_UDATA:
1302        if (create) {
1303            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1304        } else {
1305            hdr.opcode = SD_OP_WRITE_OBJ;
1306        }
1307        wlen = datalen;
1308        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1309        break;
1310    case AIOCB_DISCARD_OBJ:
1311        hdr.opcode = SD_OP_WRITE_OBJ;
1312        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1313        s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
1314        offset = offsetof(SheepdogInode,
1315                          data_vdi_id[data_oid_to_idx(oid)]);
1316        oid = vid_to_vdi_oid(s->inode.vdi_id);
1317        wlen = datalen = sizeof(uint32_t);
1318        break;
1319    }
1320
1321    if (s->cache_flags) {
1322        hdr.flags |= s->cache_flags;
1323    }
1324
1325    hdr.oid = oid;
1326    hdr.cow_oid = old_oid;
1327    hdr.copies = s->inode.nr_copies;
1328
1329    hdr.data_length = datalen;
1330    hdr.offset = offset;
1331
1332    hdr.id = aio_req->id;
1333
1334    qemu_co_mutex_lock(&s->lock);
1335    s->co_send = qemu_coroutine_self();
1336    aio_set_fd_handler(s->aio_context, s->fd, false,
1337                       co_read_response, co_write_request, NULL, s);
1338    socket_set_cork(s->fd, 1);
1339
1340    /* send a header */
1341    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1342    if (ret != sizeof(hdr)) {
1343        error_report("failed to send a req, %s", strerror(errno));
1344        goto out;
1345    }
1346
1347    if (wlen) {
1348        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1349        if (ret != wlen) {
1350            error_report("failed to send a data, %s", strerror(errno));
1351        }
1352    }
1353out:
1354    socket_set_cork(s->fd, 0);
1355    aio_set_fd_handler(s->aio_context, s->fd, false,
1356                       co_read_response, NULL, NULL, s);
1357    s->co_send = NULL;
1358    qemu_co_mutex_unlock(&s->lock);
1359}
1360
1361static int read_write_object(int fd, BlockDriverState *bs, char *buf,
1362                             uint64_t oid, uint8_t copies,
1363                             unsigned int datalen, uint64_t offset,
1364                             bool write, bool create, uint32_t cache_flags)
1365{
1366    SheepdogObjReq hdr;
1367    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1368    unsigned int wlen, rlen;
1369    int ret;
1370
1371    memset(&hdr, 0, sizeof(hdr));
1372
1373    if (write) {
1374        wlen = datalen;
1375        rlen = 0;
1376        hdr.flags = SD_FLAG_CMD_WRITE;
1377        if (create) {
1378            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1379        } else {
1380            hdr.opcode = SD_OP_WRITE_OBJ;
1381        }
1382    } else {
1383        wlen = 0;
1384        rlen = datalen;
1385        hdr.opcode = SD_OP_READ_OBJ;
1386    }
1387
1388    hdr.flags |= cache_flags;
1389
1390    hdr.oid = oid;
1391    hdr.data_length = datalen;
1392    hdr.offset = offset;
1393    hdr.copies = copies;
1394
1395    ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1396    if (ret) {
1397        error_report("failed to send a request to the sheep");
1398        return ret;
1399    }
1400
1401    switch (rsp->result) {
1402    case SD_RES_SUCCESS:
1403        return 0;
1404    default:
1405        error_report("%s", sd_strerror(rsp->result));
1406        return -EIO;
1407    }
1408}
1409
1410static int read_object(int fd, BlockDriverState *bs, char *buf,
1411                       uint64_t oid, uint8_t copies,
1412                       unsigned int datalen, uint64_t offset,
1413                       uint32_t cache_flags)
1414{
1415    return read_write_object(fd, bs, buf, oid, copies,
1416                             datalen, offset, false,
1417                             false, cache_flags);
1418}
1419
1420static int write_object(int fd, BlockDriverState *bs, char *buf,
1421                        uint64_t oid, uint8_t copies,
1422                        unsigned int datalen, uint64_t offset, bool create,
1423                        uint32_t cache_flags)
1424{
1425    return read_write_object(fd, bs, buf, oid, copies,
1426                             datalen, offset, true,
1427                             create, cache_flags);
1428}
1429
1430/* update inode with the latest state */
1431static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1432{
1433    Error *local_err = NULL;
1434    SheepdogInode *inode;
1435    int ret = 0, fd;
1436    uint32_t vid = 0;
1437
1438    fd = connect_to_sdog(s, &local_err);
1439    if (fd < 0) {
1440        error_report_err(local_err);
1441        return -EIO;
1442    }
1443
1444    inode = g_malloc(SD_INODE_HEADER_SIZE);
1445
1446    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1447    if (ret) {
1448        error_report_err(local_err);
1449        goto out;
1450    }
1451
1452    ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
1453                      s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1454                      s->cache_flags);
1455    if (ret < 0) {
1456        goto out;
1457    }
1458
1459    if (inode->vdi_id != s->inode.vdi_id) {
1460        memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1461    }
1462
1463out:
1464    g_free(inode);
1465    closesocket(fd);
1466
1467    return ret;
1468}
1469
1470static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1471{
1472    SheepdogAIOCB *acb = aio_req->aiocb;
1473
1474    aio_req->create = false;
1475
1476    /* check whether this request becomes a CoW one */
1477    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1478        int idx = data_oid_to_idx(aio_req->oid);
1479
1480        if (is_data_obj_writable(&s->inode, idx)) {
1481            goto out;
1482        }
1483
1484        if (s->inode.data_vdi_id[idx]) {
1485            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1486            aio_req->flags |= SD_FLAG_CMD_COW;
1487        }
1488        aio_req->create = true;
1489    }
1490out:
1491    if (is_data_obj(aio_req->oid)) {
1492        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1493                        acb->aiocb_type);
1494    } else {
1495        struct iovec iov;
1496        iov.iov_base = &s->inode;
1497        iov.iov_len = sizeof(s->inode);
1498        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1499    }
1500}
1501
1502static void sd_detach_aio_context(BlockDriverState *bs)
1503{
1504    BDRVSheepdogState *s = bs->opaque;
1505
1506    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1507                       NULL, NULL, NULL);
1508}
1509
1510static void sd_attach_aio_context(BlockDriverState *bs,
1511                                  AioContext *new_context)
1512{
1513    BDRVSheepdogState *s = bs->opaque;
1514
1515    s->aio_context = new_context;
1516    aio_set_fd_handler(new_context, s->fd, false,
1517                       co_read_response, NULL, NULL, s);
1518}
1519
/*
 * Options sd_open() extracts from the options QDict, apart from the
 * server address, which is handled by sd_server_config().
 */
static QemuOptsList runtime_opts = {
    .name = "sheepdog",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            /* VDI name; mandatory, see sd_open() */
            .name = "vdi",
            .type = QEMU_OPT_STRING,
        },
        {
            /* numeric snapshot ID; sd_open() rejects zero and too-large values */
            .name = "snap-id",
            .type = QEMU_OPT_NUMBER,
        },
        {
            /* snapshot tag; sd_open() treats a missing tag as "" */
            .name = "tag",
            .type = QEMU_OPT_STRING,
        },
        { /* end of list */ }
    },
};
1539
1540static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1541                   Error **errp)
1542{
1543    int ret, fd;
1544    uint32_t vid = 0;
1545    BDRVSheepdogState *s = bs->opaque;
1546    const char *vdi, *snap_id_str, *tag;
1547    uint64_t snap_id;
1548    char *buf = NULL;
1549    QemuOpts *opts;
1550    Error *local_err = NULL;
1551
1552    s->bs = bs;
1553    s->aio_context = bdrv_get_aio_context(bs);
1554
1555    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1556    qemu_opts_absorb_qdict(opts, options, &local_err);
1557    if (local_err) {
1558        error_propagate(errp, local_err);
1559        ret = -EINVAL;
1560        goto err_no_fd;
1561    }
1562
1563    s->addr = sd_server_config(options, errp);
1564    if (!s->addr) {
1565        ret = -EINVAL;
1566        goto err_no_fd;
1567    }
1568
1569    vdi = qemu_opt_get(opts, "vdi");
1570    snap_id_str = qemu_opt_get(opts, "snap-id");
1571    snap_id = qemu_opt_get_number(opts, "snap-id", CURRENT_VDI_ID);
1572    tag = qemu_opt_get(opts, "tag");
1573
1574    if (!vdi) {
1575        error_setg(errp, "parameter 'vdi' is missing");
1576        ret = -EINVAL;
1577        goto err_no_fd;
1578    }
1579    if (strlen(vdi) >= SD_MAX_VDI_LEN) {
1580        error_setg(errp, "value of parameter 'vdi' is too long");
1581        ret = -EINVAL;
1582        goto err_no_fd;
1583    }
1584
1585    if (snap_id > UINT32_MAX) {
1586        snap_id = 0;
1587    }
1588    if (snap_id_str && !snap_id) {
1589        error_setg(errp, "'snap-id=%s' is not a valid snapshot ID",
1590                   snap_id_str);
1591        ret = -EINVAL;
1592        goto err_no_fd;
1593    }
1594
1595    if (!tag) {
1596        tag = "";
1597    }
1598    if (strlen(tag) >= SD_MAX_VDI_TAG_LEN) {
1599        error_setg(errp, "value of parameter 'tag' is too long");
1600        ret = -EINVAL;
1601        goto err_no_fd;
1602    }
1603
1604    QLIST_INIT(&s->inflight_aio_head);
1605    QLIST_INIT(&s->failed_aio_head);
1606    QLIST_INIT(&s->inflight_aiocb_head);
1607
1608    s->fd = get_sheep_fd(s, errp);
1609    if (s->fd < 0) {
1610        ret = s->fd;
1611        goto err_no_fd;
1612    }
1613
1614    ret = find_vdi_name(s, vdi, (uint32_t)snap_id, tag, &vid, true, errp);
1615    if (ret) {
1616        goto err;
1617    }
1618
1619    /*
1620     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1621     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1622     */
1623    s->cache_flags = SD_FLAG_CMD_CACHE;
1624    if (flags & BDRV_O_NOCACHE) {
1625        s->cache_flags = SD_FLAG_CMD_DIRECT;
1626    }
1627    s->discard_supported = true;
1628
1629    if (snap_id || tag[0]) {
1630        trace_sheepdog_open(vid);
1631        s->is_snapshot = true;
1632    }
1633
1634    fd = connect_to_sdog(s, errp);
1635    if (fd < 0) {
1636        ret = fd;
1637        goto err;
1638    }
1639
1640    buf = g_malloc(SD_INODE_SIZE);
1641    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
1642                      0, SD_INODE_SIZE, 0, s->cache_flags);
1643
1644    closesocket(fd);
1645
1646    if (ret) {
1647        error_setg(errp, "Can't read snapshot inode");
1648        goto err;
1649    }
1650
1651    memcpy(&s->inode, buf, sizeof(s->inode));
1652
1653    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1654    pstrcpy(s->name, sizeof(s->name), vdi);
1655    qemu_co_mutex_init(&s->lock);
1656    qemu_co_mutex_init(&s->queue_lock);
1657    qemu_co_queue_init(&s->overlapping_queue);
1658    qemu_opts_del(opts);
1659    g_free(buf);
1660    return 0;
1661
1662err:
1663    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1664                       false, NULL, NULL, NULL, NULL);
1665    closesocket(s->fd);
1666err_no_fd:
1667    qemu_opts_del(opts);
1668    g_free(buf);
1669    return ret;
1670}
1671
1672static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1673                             Error **errp)
1674{
1675    BDRVSheepdogState *s = state->bs->opaque;
1676    BDRVSheepdogReopenState *re_s;
1677    int ret = 0;
1678
1679    re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1680
1681    re_s->cache_flags = SD_FLAG_CMD_CACHE;
1682    if (state->flags & BDRV_O_NOCACHE) {
1683        re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1684    }
1685
1686    re_s->fd = get_sheep_fd(s, errp);
1687    if (re_s->fd < 0) {
1688        ret = re_s->fd;
1689        return ret;
1690    }
1691
1692    return ret;
1693}
1694
1695static void sd_reopen_commit(BDRVReopenState *state)
1696{
1697    BDRVSheepdogReopenState *re_s = state->opaque;
1698    BDRVSheepdogState *s = state->bs->opaque;
1699
1700    if (s->fd) {
1701        aio_set_fd_handler(s->aio_context, s->fd, false,
1702                           NULL, NULL, NULL, NULL);
1703        closesocket(s->fd);
1704    }
1705
1706    s->fd = re_s->fd;
1707    s->cache_flags = re_s->cache_flags;
1708
1709    g_free(state->opaque);
1710    state->opaque = NULL;
1711
1712    return;
1713}
1714
1715static void sd_reopen_abort(BDRVReopenState *state)
1716{
1717    BDRVSheepdogReopenState *re_s = state->opaque;
1718    BDRVSheepdogState *s = state->bs->opaque;
1719
1720    if (re_s == NULL) {
1721        return;
1722    }
1723
1724    if (re_s->fd) {
1725        aio_set_fd_handler(s->aio_context, re_s->fd, false,
1726                           NULL, NULL, NULL, NULL);
1727        closesocket(re_s->fd);
1728    }
1729
1730    g_free(state->opaque);
1731    state->opaque = NULL;
1732
1733    return;
1734}
1735
1736static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1737                        Error **errp)
1738{
1739    SheepdogVdiReq hdr;
1740    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1741    int fd, ret;
1742    unsigned int wlen, rlen = 0;
1743    char buf[SD_MAX_VDI_LEN];
1744
1745    fd = connect_to_sdog(s, errp);
1746    if (fd < 0) {
1747        return fd;
1748    }
1749
1750    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1751     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1752     */
1753    memset(buf, 0, sizeof(buf));
1754    pstrcpy(buf, sizeof(buf), s->name);
1755
1756    memset(&hdr, 0, sizeof(hdr));
1757    hdr.opcode = SD_OP_NEW_VDI;
1758    hdr.base_vdi_id = s->inode.vdi_id;
1759
1760    wlen = SD_MAX_VDI_LEN;
1761
1762    hdr.flags = SD_FLAG_CMD_WRITE;
1763    hdr.snapid = snapshot;
1764
1765    hdr.data_length = wlen;
1766    hdr.vdi_size = s->inode.vdi_size;
1767    hdr.copy_policy = s->inode.copy_policy;
1768    hdr.copies = s->inode.nr_copies;
1769    hdr.block_size_shift = s->inode.block_size_shift;
1770
1771    ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1772
1773    closesocket(fd);
1774
1775    if (ret) {
1776        error_setg_errno(errp, -ret, "create failed");
1777        return ret;
1778    }
1779
1780    if (rsp->result != SD_RES_SUCCESS) {
1781        error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1782        return -EIO;
1783    }
1784
1785    if (vdi_id) {
1786        *vdi_id = rsp->vdi_id;
1787    }
1788
1789    return 0;
1790}
1791
1792static int sd_prealloc(BlockDriverState *bs, int64_t old_size, int64_t new_size,
1793                       Error **errp)
1794{
1795    BlockBackend *blk = NULL;
1796    BDRVSheepdogState *base = bs->opaque;
1797    unsigned long buf_size;
1798    uint32_t idx, max_idx;
1799    uint32_t object_size;
1800    void *buf = NULL;
1801    int ret;
1802
1803    blk = blk_new(BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE | BLK_PERM_RESIZE,
1804                  BLK_PERM_ALL);
1805
1806    ret = blk_insert_bs(blk, bs, errp);
1807    if (ret < 0) {
1808        goto out_with_err_set;
1809    }
1810
1811    blk_set_allow_write_beyond_eof(blk, true);
1812
1813    object_size = (UINT32_C(1) << base->inode.block_size_shift);
1814    buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1815    buf = g_malloc0(buf_size);
1816
1817    max_idx = DIV_ROUND_UP(new_size, buf_size);
1818
1819    for (idx = old_size / buf_size; idx < max_idx; idx++) {
1820        /*
1821         * The created image can be a cloned image, so we need to read
1822         * a data from the source image.
1823         */
1824        ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1825        if (ret < 0) {
1826            goto out;
1827        }
1828        ret = blk_pwrite(blk, idx * buf_size, buf, buf_size, 0);
1829        if (ret < 0) {
1830            goto out;
1831        }
1832    }
1833
1834    ret = 0;
1835out:
1836    if (ret < 0) {
1837        error_setg_errno(errp, -ret, "Can't pre-allocate");
1838    }
1839out_with_err_set:
1840    blk_unref(blk);
1841    g_free(buf);
1842
1843    return ret;
1844}
1845
1846static int sd_create_prealloc(BlockdevOptionsSheepdog *location, int64_t size,
1847                              Error **errp)
1848{
1849    BlockDriverState *bs;
1850    Visitor *v;
1851    QObject *obj = NULL;
1852    QDict *qdict;
1853    Error *local_err = NULL;
1854    int ret;
1855
1856    v = qobject_output_visitor_new(&obj);
1857    visit_type_BlockdevOptionsSheepdog(v, NULL, &location, &local_err);
1858    visit_free(v);
1859
1860    if (local_err) {
1861        error_propagate(errp, local_err);
1862        qobject_unref(obj);
1863        return -EINVAL;
1864    }
1865
1866    qdict = qobject_to(QDict, obj);
1867    qdict_flatten(qdict);
1868
1869    qdict_put_str(qdict, "driver", "sheepdog");
1870
1871    bs = bdrv_open(NULL, NULL, qdict, BDRV_O_PROTOCOL | BDRV_O_RDWR, errp);
1872    if (bs == NULL) {
1873        ret = -EIO;
1874        goto fail;
1875    }
1876
1877    ret = sd_prealloc(bs, 0, size, errp);
1878fail:
1879    bdrv_unref(bs);
1880    qobject_unref(qdict);
1881    return ret;
1882}
1883
1884static int parse_redundancy(BDRVSheepdogState *s, SheepdogRedundancy *opt)
1885{
1886    struct SheepdogInode *inode = &s->inode;
1887
1888    switch (opt->type) {
1889    case SHEEPDOG_REDUNDANCY_TYPE_FULL:
1890        if (opt->u.full.copies > SD_MAX_COPIES || opt->u.full.copies < 1) {
1891            return -EINVAL;
1892        }
1893        inode->copy_policy = 0;
1894        inode->nr_copies = opt->u.full.copies;
1895        return 0;
1896
1897    case SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED:
1898    {
1899        int64_t copy = opt->u.erasure_coded.data_strips;
1900        int64_t parity = opt->u.erasure_coded.parity_strips;
1901
1902        if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1903            return -EINVAL;
1904        }
1905
1906        if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1907            return -EINVAL;
1908        }
1909
1910        /*
1911         * 4 bits for parity and 4 bits for data.
1912         * We have to compress upper data bits because it can't represent 16
1913         */
1914        inode->copy_policy = ((copy / 2) << 4) + parity;
1915        inode->nr_copies = copy + parity;
1916        return 0;
1917    }
1918
1919    default:
1920        g_assert_not_reached();
1921    }
1922
1923    return -EINVAL;
1924}
1925
1926/*
1927 * Sheepdog support two kinds of redundancy, full replication and erasure
1928 * coding.
1929 *
1930 * # create a fully replicated vdi with x copies
1931 * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1932 *
1933 * # create a erasure coded vdi with x data strips and y parity strips
1934 * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1935 */
1936static SheepdogRedundancy *parse_redundancy_str(const char *opt)
1937{
1938    SheepdogRedundancy *redundancy;
1939    const char *n1, *n2;
1940    long copy, parity;
1941    char p[10];
1942    int ret;
1943
1944    pstrcpy(p, sizeof(p), opt);
1945    n1 = strtok(p, ":");
1946    n2 = strtok(NULL, ":");
1947
1948    if (!n1) {
1949        return NULL;
1950    }
1951
1952    ret = qemu_strtol(n1, NULL, 10, &copy);
1953    if (ret < 0) {
1954        return NULL;
1955    }
1956
1957    redundancy = g_new0(SheepdogRedundancy, 1);
1958    if (!n2) {
1959        *redundancy = (SheepdogRedundancy) {
1960            .type               = SHEEPDOG_REDUNDANCY_TYPE_FULL,
1961            .u.full.copies      = copy,
1962        };
1963    } else {
1964        ret = qemu_strtol(n2, NULL, 10, &parity);
1965        if (ret < 0) {
1966            g_free(redundancy);
1967            return NULL;
1968        }
1969
1970        *redundancy = (SheepdogRedundancy) {
1971            .type               = SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED,
1972            .u.erasure_coded    = {
1973                .data_strips    = copy,
1974                .parity_strips  = parity,
1975            },
1976        };
1977    }
1978
1979    return redundancy;
1980}
1981
1982static int parse_block_size_shift(BDRVSheepdogState *s,
1983                                  BlockdevCreateOptionsSheepdog *opts)
1984{
1985    struct SheepdogInode *inode = &s->inode;
1986    uint64_t object_size;
1987    int obj_order;
1988
1989    if (opts->has_object_size) {
1990        object_size = opts->object_size;
1991
1992        if ((object_size - 1) & object_size) {    /* not a power of 2? */
1993            return -EINVAL;
1994        }
1995        obj_order = ctz32(object_size);
1996        if (obj_order < 20 || obj_order > 31) {
1997            return -EINVAL;
1998        }
1999        inode->block_size_shift = (uint8_t)obj_order;
2000    }
2001
2002    return 0;
2003}
2004
2005static int sd_co_create(BlockdevCreateOptions *options, Error **errp)
2006{
2007    BlockdevCreateOptionsSheepdog *opts = &options->u.sheepdog;
2008    int ret = 0;
2009    uint32_t vid = 0;
2010    char *backing_file = NULL;
2011    char *buf = NULL;
2012    BDRVSheepdogState *s;
2013    uint64_t max_vdi_size;
2014    bool prealloc = false;
2015
2016    assert(options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2017
2018    s = g_new0(BDRVSheepdogState, 1);
2019
2020    /* Steal SocketAddress from QAPI, set NULL to prevent double free */
2021    s->addr = opts->location->server;
2022    opts->location->server = NULL;
2023
2024    if (strlen(opts->location->vdi) >= sizeof(s->name)) {
2025        error_setg(errp, "'vdi' string too long");
2026        ret = -EINVAL;
2027        goto out;
2028    }
2029    pstrcpy(s->name, sizeof(s->name), opts->location->vdi);
2030
2031    s->inode.vdi_size = opts->size;
2032    backing_file = opts->backing_file;
2033
2034    if (!opts->has_preallocation) {
2035        opts->preallocation = PREALLOC_MODE_OFF;
2036    }
2037    switch (opts->preallocation) {
2038    case PREALLOC_MODE_OFF:
2039        prealloc = false;
2040        break;
2041    case PREALLOC_MODE_FULL:
2042        prealloc = true;
2043        break;
2044    default:
2045        error_setg(errp, "Preallocation mode not supported for Sheepdog");
2046        ret = -EINVAL;
2047        goto out;
2048    }
2049
2050    if (opts->has_redundancy) {
2051        ret = parse_redundancy(s, opts->redundancy);
2052        if (ret < 0) {
2053            error_setg(errp, "Invalid redundancy mode");
2054            goto out;
2055        }
2056    }
2057    ret = parse_block_size_shift(s, opts);
2058    if (ret < 0) {
2059        error_setg(errp, "Invalid object_size."
2060                         " obect_size needs to be power of 2"
2061                         " and be limited from 2^20 to 2^31");
2062        goto out;
2063    }
2064
2065    if (opts->has_backing_file) {
2066        BlockBackend *blk;
2067        BDRVSheepdogState *base;
2068        BlockDriver *drv;
2069
2070        /* Currently, only Sheepdog backing image is supported. */
2071        drv = bdrv_find_protocol(opts->backing_file, true, NULL);
2072        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
2073            error_setg(errp, "backing_file must be a sheepdog image");
2074            ret = -EINVAL;
2075            goto out;
2076        }
2077
2078        blk = blk_new_open(opts->backing_file, NULL, NULL,
2079                           BDRV_O_PROTOCOL, errp);
2080        if (blk == NULL) {
2081            ret = -EIO;
2082            goto out;
2083        }
2084
2085        base = blk_bs(blk)->opaque;
2086
2087        if (!is_snapshot(&base->inode)) {
2088            error_setg(errp, "cannot clone from a non snapshot vdi");
2089            blk_unref(blk);
2090            ret = -EINVAL;
2091            goto out;
2092        }
2093        s->inode.vdi_id = base->inode.vdi_id;
2094        blk_unref(blk);
2095    }
2096
2097    s->aio_context = qemu_get_aio_context();
2098
2099    /* if block_size_shift is not specified, get cluster default value */
2100    if (s->inode.block_size_shift == 0) {
2101        SheepdogVdiReq hdr;
2102        SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
2103        int fd;
2104        unsigned int wlen = 0, rlen = 0;
2105
2106        fd = connect_to_sdog(s, errp);
2107        if (fd < 0) {
2108            ret = fd;
2109            goto out;
2110        }
2111
2112        memset(&hdr, 0, sizeof(hdr));
2113        hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
2114        hdr.proto_ver = SD_PROTO_VER;
2115
2116        ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
2117                     NULL, &wlen, &rlen);
2118        closesocket(fd);
2119        if (ret) {
2120            error_setg_errno(errp, -ret, "failed to get cluster default");
2121            goto out;
2122        }
2123        if (rsp->result == SD_RES_SUCCESS) {
2124            s->inode.block_size_shift = rsp->block_size_shift;
2125        } else {
2126            s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
2127        }
2128    }
2129
2130    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2131
2132    if (s->inode.vdi_size > max_vdi_size) {
2133        error_setg(errp, "An image is too large."
2134                         " The maximum image size is %"PRIu64 "GB",
2135                         max_vdi_size / 1024 / 1024 / 1024);
2136        ret = -EINVAL;
2137        goto out;
2138    }
2139
2140    ret = do_sd_create(s, &vid, 0, errp);
2141    if (ret) {
2142        goto out;
2143    }
2144
2145    if (prealloc) {
2146        ret = sd_create_prealloc(opts->location, opts->size, errp);
2147    }
2148out:
2149    g_free(backing_file);
2150    g_free(buf);
2151    g_free(s->addr);
2152    g_free(s);
2153    return ret;
2154}
2155
2156static int coroutine_fn sd_co_create_opts(const char *filename, QemuOpts *opts,
2157                                          Error **errp)
2158{
2159    BlockdevCreateOptions *create_options = NULL;
2160    QDict *qdict, *location_qdict;
2161    Visitor *v;
2162    char *redundancy;
2163    Error *local_err = NULL;
2164    int ret;
2165
2166    redundancy = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
2167
2168    qdict = qemu_opts_to_qdict(opts, NULL);
2169    qdict_put_str(qdict, "driver", "sheepdog");
2170
2171    location_qdict = qdict_new();
2172    qdict_put(qdict, "location", location_qdict);
2173
2174    sd_parse_filename(filename, location_qdict, &local_err);
2175    if (local_err) {
2176        error_propagate(errp, local_err);
2177        ret = -EINVAL;
2178        goto fail;
2179    }
2180
2181    qdict_flatten(qdict);
2182
2183    /* Change legacy command line options into QMP ones */
2184    static const QDictRenames opt_renames[] = {
2185        { BLOCK_OPT_BACKING_FILE,       "backing-file" },
2186        { BLOCK_OPT_OBJECT_SIZE,        "object-size" },
2187        { NULL, NULL },
2188    };
2189
2190    if (!qdict_rename_keys(qdict, opt_renames, errp)) {
2191        ret = -EINVAL;
2192        goto fail;
2193    }
2194
2195    /* Get the QAPI object */
2196    v = qobject_input_visitor_new_flat_confused(qdict, errp);
2197    if (!v) {
2198        ret = -EINVAL;
2199        goto fail;
2200    }
2201
2202    visit_type_BlockdevCreateOptions(v, NULL, &create_options, &local_err);
2203    visit_free(v);
2204
2205    if (local_err) {
2206        error_propagate(errp, local_err);
2207        ret = -EINVAL;
2208        goto fail;
2209    }
2210
2211    assert(create_options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2212    create_options->u.sheepdog.size =
2213        ROUND_UP(create_options->u.sheepdog.size, BDRV_SECTOR_SIZE);
2214
2215    if (redundancy) {
2216        create_options->u.sheepdog.has_redundancy = true;
2217        create_options->u.sheepdog.redundancy =
2218            parse_redundancy_str(redundancy);
2219        if (create_options->u.sheepdog.redundancy == NULL) {
2220            error_setg(errp, "Invalid redundancy mode");
2221            ret = -EINVAL;
2222            goto fail;
2223        }
2224    }
2225
2226    ret = sd_co_create(create_options, errp);
2227fail:
2228    qapi_free_BlockdevCreateOptions(create_options);
2229    qobject_unref(qdict);
2230    g_free(redundancy);
2231    return ret;
2232}
2233
2234static void sd_close(BlockDriverState *bs)
2235{
2236    Error *local_err = NULL;
2237    BDRVSheepdogState *s = bs->opaque;
2238    SheepdogVdiReq hdr;
2239    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2240    unsigned int wlen, rlen = 0;
2241    int fd, ret;
2242
2243    trace_sheepdog_close(s->name);
2244
2245    fd = connect_to_sdog(s, &local_err);
2246    if (fd < 0) {
2247        error_report_err(local_err);
2248        return;
2249    }
2250
2251    memset(&hdr, 0, sizeof(hdr));
2252
2253    hdr.opcode = SD_OP_RELEASE_VDI;
2254    hdr.type = LOCK_TYPE_NORMAL;
2255    hdr.base_vdi_id = s->inode.vdi_id;
2256    wlen = strlen(s->name) + 1;
2257    hdr.data_length = wlen;
2258    hdr.flags = SD_FLAG_CMD_WRITE;
2259
2260    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2261                 s->name, &wlen, &rlen);
2262
2263    closesocket(fd);
2264
2265    if (!ret && rsp->result != SD_RES_SUCCESS &&
2266        rsp->result != SD_RES_VDI_NOT_LOCKED) {
2267        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2268    }
2269
2270    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
2271                       false, NULL, NULL, NULL, NULL);
2272    closesocket(s->fd);
2273    qapi_free_SocketAddress(s->addr);
2274}
2275
2276static int64_t sd_getlength(BlockDriverState *bs)
2277{
2278    BDRVSheepdogState *s = bs->opaque;
2279
2280    return s->inode.vdi_size;
2281}
2282
2283static int coroutine_fn sd_co_truncate(BlockDriverState *bs, int64_t offset,
2284                                       PreallocMode prealloc, Error **errp)
2285{
2286    BDRVSheepdogState *s = bs->opaque;
2287    int ret, fd;
2288    unsigned int datalen;
2289    uint64_t max_vdi_size;
2290    int64_t old_size = s->inode.vdi_size;
2291
2292    if (prealloc != PREALLOC_MODE_OFF && prealloc != PREALLOC_MODE_FULL) {
2293        error_setg(errp, "Unsupported preallocation mode '%s'",
2294                   PreallocMode_str(prealloc));
2295        return -ENOTSUP;
2296    }
2297
2298    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2299    if (offset < old_size) {
2300        error_setg(errp, "shrinking is not supported");
2301        return -EINVAL;
2302    } else if (offset > max_vdi_size) {
2303        error_setg(errp, "too big image size");
2304        return -EINVAL;
2305    }
2306
2307    fd = connect_to_sdog(s, errp);
2308    if (fd < 0) {
2309        return fd;
2310    }
2311
2312    /* we don't need to update entire object */
2313    datalen = SD_INODE_HEADER_SIZE;
2314    s->inode.vdi_size = offset;
2315    ret = write_object(fd, s->bs, (char *)&s->inode,
2316                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2317                       datalen, 0, false, s->cache_flags);
2318    close(fd);
2319
2320    if (ret < 0) {
2321        error_setg_errno(errp, -ret, "failed to update an inode");
2322        return ret;
2323    }
2324
2325    if (prealloc == PREALLOC_MODE_FULL) {
2326        ret = sd_prealloc(bs, old_size, offset, errp);
2327        if (ret < 0) {
2328            return ret;
2329        }
2330    }
2331
2332    return 0;
2333}
2334
2335/*
2336 * This function is called after writing data objects.  If we need to
2337 * update metadata, this sends a write request to the vdi object.
2338 */
2339static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2340{
2341    BDRVSheepdogState *s = acb->s;
2342    struct iovec iov;
2343    AIOReq *aio_req;
2344    uint32_t offset, data_len, mn, mx;
2345
2346    mn = acb->min_dirty_data_idx;
2347    mx = acb->max_dirty_data_idx;
2348    if (mn <= mx) {
2349        /* we need to update the vdi object. */
2350        ++acb->nr_pending;
2351        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2352            mn * sizeof(s->inode.data_vdi_id[0]);
2353        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2354
2355        acb->min_dirty_data_idx = UINT32_MAX;
2356        acb->max_dirty_data_idx = 0;
2357
2358        iov.iov_base = &s->inode;
2359        iov.iov_len = sizeof(s->inode);
2360        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2361                                data_len, offset, 0, false, 0, offset);
2362        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2363        if (--acb->nr_pending) {
2364            qemu_coroutine_yield();
2365        }
2366    }
2367}
2368
2369/* Delete current working VDI on the snapshot chain */
2370static bool sd_delete(BDRVSheepdogState *s)
2371{
2372    Error *local_err = NULL;
2373    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2374    SheepdogVdiReq hdr = {
2375        .opcode = SD_OP_DEL_VDI,
2376        .base_vdi_id = s->inode.vdi_id,
2377        .data_length = wlen,
2378        .flags = SD_FLAG_CMD_WRITE,
2379    };
2380    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2381    int fd, ret;
2382
2383    fd = connect_to_sdog(s, &local_err);
2384    if (fd < 0) {
2385        error_report_err(local_err);
2386        return false;
2387    }
2388
2389    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2390                 s->name, &wlen, &rlen);
2391    closesocket(fd);
2392    if (ret) {
2393        return false;
2394    }
2395    switch (rsp->result) {
2396    case SD_RES_NO_VDI:
2397        error_report("%s was already deleted", s->name);
2398        /* fall through */
2399    case SD_RES_SUCCESS:
2400        break;
2401    default:
2402        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2403        return false;
2404    }
2405
2406    return true;
2407}
2408
2409/*
2410 * Create a writable VDI from a snapshot
2411 */
2412static int sd_create_branch(BDRVSheepdogState *s)
2413{
2414    Error *local_err = NULL;
2415    int ret, fd;
2416    uint32_t vid;
2417    char *buf;
2418    bool deleted;
2419
2420    trace_sheepdog_create_branch_snapshot(s->inode.vdi_id);
2421
2422    buf = g_malloc(SD_INODE_SIZE);
2423
2424    /*
2425     * Even If deletion fails, we will just create extra snapshot based on
2426     * the working VDI which was supposed to be deleted. So no need to
2427     * false bail out.
2428     */
2429    deleted = sd_delete(s);
2430    ret = do_sd_create(s, &vid, !deleted, &local_err);
2431    if (ret) {
2432        error_report_err(local_err);
2433        goto out;
2434    }
2435
2436    trace_sheepdog_create_branch_created(vid);
2437
2438    fd = connect_to_sdog(s, &local_err);
2439    if (fd < 0) {
2440        error_report_err(local_err);
2441        ret = fd;
2442        goto out;
2443    }
2444
2445    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
2446                      s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2447
2448    closesocket(fd);
2449
2450    if (ret < 0) {
2451        goto out;
2452    }
2453
2454    memcpy(&s->inode, buf, sizeof(s->inode));
2455
2456    s->is_snapshot = false;
2457    ret = 0;
2458    trace_sheepdog_create_branch_new(s->inode.vdi_id);
2459
2460out:
2461    g_free(buf);
2462
2463    return ret;
2464}
2465
2466/*
2467 * Send I/O requests to the server.
2468 *
2469 * This function sends requests to the server, links the requests to
2470 * the inflight_list in BDRVSheepdogState, and exits without
2471 * waiting the response.  The responses are received in the
2472 * `aio_read_response' function which is called from the main loop as
2473 * a fd handler.
2474 *
2475 * Returns 1 when we need to wait a response, 0 when there is no sent
2476 * request and -errno in error cases.
2477 */
2478static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb)
2479{
2480    int ret = 0;
2481    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2482    unsigned long idx;
2483    uint32_t object_size;
2484    uint64_t oid;
2485    uint64_t offset;
2486    BDRVSheepdogState *s = acb->s;
2487    SheepdogInode *inode = &s->inode;
2488    AIOReq *aio_req;
2489
2490    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2491        /*
2492         * In the case we open the snapshot VDI, Sheepdog creates the
2493         * writable VDI when we do a write operation first.
2494         */
2495        ret = sd_create_branch(s);
2496        if (ret) {
2497            acb->ret = -EIO;
2498            return;
2499        }
2500    }
2501
2502    object_size = (UINT32_C(1) << inode->block_size_shift);
2503    idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2504    offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2505
2506    /*
2507     * Make sure we don't free the aiocb before we are done with all requests.
2508     * This additional reference is dropped at the end of this function.
2509     */
2510    acb->nr_pending++;
2511
2512    while (done != total) {
2513        uint8_t flags = 0;
2514        uint64_t old_oid = 0;
2515        bool create = false;
2516
2517        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2518
2519        len = MIN(total - done, object_size - offset);
2520
2521        switch (acb->aiocb_type) {
2522        case AIOCB_READ_UDATA:
2523            if (!inode->data_vdi_id[idx]) {
2524                qemu_iovec_memset(acb->qiov, done, 0, len);
2525                goto done;
2526            }
2527            break;
2528        case AIOCB_WRITE_UDATA:
2529            if (!inode->data_vdi_id[idx]) {
2530                create = true;
2531            } else if (!is_data_obj_writable(inode, idx)) {
2532                /* Copy-On-Write */
2533                create = true;
2534                old_oid = oid;
2535                flags = SD_FLAG_CMD_COW;
2536            }
2537            break;
2538        case AIOCB_DISCARD_OBJ:
2539            /*
2540             * We discard the object only when the whole object is
2541             * 1) allocated 2) trimmed. Otherwise, simply skip it.
2542             */
2543            if (len != object_size || inode->data_vdi_id[idx] == 0) {
2544                goto done;
2545            }
2546            break;
2547        default:
2548            break;
2549        }
2550
2551        if (create) {
2552            trace_sheepdog_co_rw_vector_update(inode->vdi_id, oid,
2553                                  vid_to_data_oid(inode->data_vdi_id[idx], idx),
2554                                  idx);
2555            oid = vid_to_data_oid(inode->vdi_id, idx);
2556            trace_sheepdog_co_rw_vector_new(oid);
2557        }
2558
2559        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2560                                old_oid,
2561                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2562                                0 : done);
2563        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2564                        acb->aiocb_type);
2565    done:
2566        offset = 0;
2567        idx++;
2568        done += len;
2569    }
2570    if (--acb->nr_pending) {
2571        qemu_coroutine_yield();
2572    }
2573}
2574
2575static void sd_aio_complete(SheepdogAIOCB *acb)
2576{
2577    BDRVSheepdogState *s;
2578    if (acb->aiocb_type == AIOCB_FLUSH_CACHE) {
2579        return;
2580    }
2581
2582    s = acb->s;
2583    qemu_co_mutex_lock(&s->queue_lock);
2584    QLIST_REMOVE(acb, aiocb_siblings);
2585    qemu_co_queue_restart_all(&s->overlapping_queue);
2586    qemu_co_mutex_unlock(&s->queue_lock);
2587}
2588
2589static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2590                                     int nb_sectors, QEMUIOVector *qiov,
2591                                     int flags)
2592{
2593    SheepdogAIOCB acb;
2594    int ret;
2595    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2596    BDRVSheepdogState *s = bs->opaque;
2597
2598    assert(!flags);
2599    if (offset > s->inode.vdi_size) {
2600        ret = sd_co_truncate(bs, offset, PREALLOC_MODE_OFF, NULL);
2601        if (ret < 0) {
2602            return ret;
2603        }
2604    }
2605
2606    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_WRITE_UDATA);
2607    sd_co_rw_vector(&acb);
2608    sd_write_done(&acb);
2609    sd_aio_complete(&acb);
2610
2611    return acb.ret;
2612}
2613
2614static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2615                       int nb_sectors, QEMUIOVector *qiov)
2616{
2617    SheepdogAIOCB acb;
2618    BDRVSheepdogState *s = bs->opaque;
2619
2620    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_READ_UDATA);
2621    sd_co_rw_vector(&acb);
2622    sd_aio_complete(&acb);
2623
2624    return acb.ret;
2625}
2626
2627static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2628{
2629    BDRVSheepdogState *s = bs->opaque;
2630    SheepdogAIOCB acb;
2631    AIOReq *aio_req;
2632
2633    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2634        return 0;
2635    }
2636
2637    sd_aio_setup(&acb, s, NULL, 0, 0, AIOCB_FLUSH_CACHE);
2638
2639    acb.nr_pending++;
2640    aio_req = alloc_aio_req(s, &acb, vid_to_vdi_oid(s->inode.vdi_id),
2641                            0, 0, 0, false, 0, 0);
2642    add_aio_request(s, aio_req, NULL, 0, acb.aiocb_type);
2643
2644    if (--acb.nr_pending) {
2645        qemu_coroutine_yield();
2646    }
2647
2648    sd_aio_complete(&acb);
2649    return acb.ret;
2650}
2651
2652static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2653{
2654    Error *local_err = NULL;
2655    BDRVSheepdogState *s = bs->opaque;
2656    int ret, fd;
2657    uint32_t new_vid;
2658    SheepdogInode *inode;
2659    unsigned int datalen;
2660
2661    trace_sheepdog_snapshot_create_info(sn_info->name, sn_info->id_str, s->name,
2662                                        sn_info->vm_state_size, s->is_snapshot);
2663
2664    if (s->is_snapshot) {
2665        error_report("You can't create a snapshot of a snapshot VDI, "
2666                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2667
2668        return -EINVAL;
2669    }
2670
2671    trace_sheepdog_snapshot_create(sn_info->name, sn_info->id_str);
2672
2673    s->inode.vm_state_size = sn_info->vm_state_size;
2674    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2675    /* It appears that inode.tag does not require a NUL terminator,
2676     * which means this use of strncpy is ok.
2677     */
2678    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2679    /* we don't need to update entire object */
2680    datalen = SD_INODE_HEADER_SIZE;
2681    inode = g_malloc(datalen);
2682
2683    /* refresh inode. */
2684    fd = connect_to_sdog(s, &local_err);
2685    if (fd < 0) {
2686        error_report_err(local_err);
2687        ret = fd;
2688        goto cleanup;
2689    }
2690
2691    ret = write_object(fd, s->bs, (char *)&s->inode,
2692                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2693                       datalen, 0, false, s->cache_flags);
2694    if (ret < 0) {
2695        error_report("failed to write snapshot's inode.");
2696        goto cleanup;
2697    }
2698
2699    ret = do_sd_create(s, &new_vid, 1, &local_err);
2700    if (ret < 0) {
2701        error_reportf_err(local_err,
2702                          "failed to create inode for snapshot: ");
2703        goto cleanup;
2704    }
2705
2706    ret = read_object(fd, s->bs, (char *)inode,
2707                      vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2708                      s->cache_flags);
2709
2710    if (ret < 0) {
2711        error_report("failed to read new inode info. %s", strerror(errno));
2712        goto cleanup;
2713    }
2714
2715    memcpy(&s->inode, inode, datalen);
2716    trace_sheepdog_snapshot_create_inode(s->inode.name, s->inode.snap_id,
2717                                         s->inode.vdi_id);
2718
2719cleanup:
2720    g_free(inode);
2721    closesocket(fd);
2722    return ret;
2723}
2724
2725/*
2726 * We implement rollback(loadvm) operation to the specified snapshot by
2727 * 1) switch to the snapshot
2728 * 2) rely on sd_create_branch to delete working VDI and
2729 * 3) create a new working VDI based on the specified snapshot
2730 */
2731static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2732{
2733    BDRVSheepdogState *s = bs->opaque;
2734    BDRVSheepdogState *old_s;
2735    char tag[SD_MAX_VDI_TAG_LEN];
2736    uint32_t snapid = 0;
2737    int ret;
2738
2739    if (!sd_parse_snapid_or_tag(snapshot_id, &snapid, tag)) {
2740        return -EINVAL;
2741    }
2742
2743    old_s = g_new(BDRVSheepdogState, 1);
2744
2745    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2746
2747    ret = reload_inode(s, snapid, tag);
2748    if (ret) {
2749        goto out;
2750    }
2751
2752    ret = sd_create_branch(s);
2753    if (ret) {
2754        goto out;
2755    }
2756
2757    g_free(old_s);
2758
2759    return 0;
2760out:
2761    /* recover bdrv_sd_state */
2762    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2763    g_free(old_s);
2764
2765    error_report("failed to open. recover old bdrv_sd_state.");
2766
2767    return ret;
2768}
2769
2770#define NR_BATCHED_DISCARD 128
2771
2772static int remove_objects(BDRVSheepdogState *s, Error **errp)
2773{
2774    int fd, i = 0, nr_objs = 0;
2775    int ret;
2776    SheepdogInode *inode = &s->inode;
2777
2778    fd = connect_to_sdog(s, errp);
2779    if (fd < 0) {
2780        return fd;
2781    }
2782
2783    nr_objs = count_data_objs(inode);
2784    while (i < nr_objs) {
2785        int start_idx, nr_filled_idx;
2786
2787        while (i < nr_objs && !inode->data_vdi_id[i]) {
2788            i++;
2789        }
2790        start_idx = i;
2791
2792        nr_filled_idx = 0;
2793        while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2794            if (inode->data_vdi_id[i]) {
2795                inode->data_vdi_id[i] = 0;
2796                nr_filled_idx++;
2797            }
2798
2799            i++;
2800        }
2801
2802        ret = write_object(fd, s->bs,
2803                           (char *)&inode->data_vdi_id[start_idx],
2804                           vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2805                           (i - start_idx) * sizeof(uint32_t),
2806                           offsetof(struct SheepdogInode,
2807                                    data_vdi_id[start_idx]),
2808                           false, s->cache_flags);
2809        if (ret < 0) {
2810            error_setg(errp, "Failed to discard snapshot inode");
2811            goto out;
2812        }
2813    }
2814
2815    ret = 0;
2816out:
2817    closesocket(fd);
2818    return ret;
2819}
2820
2821static int sd_snapshot_delete(BlockDriverState *bs,
2822                              const char *snapshot_id,
2823                              const char *name,
2824                              Error **errp)
2825{
2826    /*
2827     * FIXME should delete the snapshot matching both @snapshot_id and
2828     * @name, but @name not used here
2829     */
2830    unsigned long snap_id = 0;
2831    char snap_tag[SD_MAX_VDI_TAG_LEN];
2832    int fd, ret;
2833    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2834    BDRVSheepdogState *s = bs->opaque;
2835    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2836    uint32_t vid;
2837    SheepdogVdiReq hdr = {
2838        .opcode = SD_OP_DEL_VDI,
2839        .data_length = wlen,
2840        .flags = SD_FLAG_CMD_WRITE,
2841    };
2842    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2843
2844    ret = remove_objects(s, errp);
2845    if (ret) {
2846        return ret;
2847    }
2848
2849    memset(buf, 0, sizeof(buf));
2850    memset(snap_tag, 0, sizeof(snap_tag));
2851    pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2852    /* TODO Use sd_parse_snapid() once this mess is cleaned up */
2853    ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2854    if (ret || snap_id > UINT32_MAX) {
2855        /*
2856         * FIXME Since qemu_strtoul() returns -EINVAL when
2857         * @snapshot_id is null, @snapshot_id is mandatory.  Correct
2858         * would be to require at least one of @snapshot_id and @name.
2859         */
2860        error_setg(errp, "Invalid snapshot ID: %s",
2861                         snapshot_id ? snapshot_id : "<null>");
2862        return -EINVAL;
2863    }
2864
2865    if (snap_id) {
2866        hdr.snapid = (uint32_t) snap_id;
2867    } else {
2868        /* FIXME I suspect we should use @name here */
2869        /* FIXME don't truncate silently */
2870        pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2871        pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2872    }
2873
2874    ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true, errp);
2875    if (ret) {
2876        return ret;
2877    }
2878
2879    fd = connect_to_sdog(s, errp);
2880    if (fd < 0) {
2881        return fd;
2882    }
2883
2884    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2885                 buf, &wlen, &rlen);
2886    closesocket(fd);
2887    if (ret) {
2888        error_setg_errno(errp, -ret, "Couldn't send request to server");
2889        return ret;
2890    }
2891
2892    switch (rsp->result) {
2893    case SD_RES_NO_VDI:
2894        error_setg(errp, "Can't find the snapshot");
2895        return -ENOENT;
2896    case SD_RES_SUCCESS:
2897        break;
2898    default:
2899        error_setg(errp, "%s", sd_strerror(rsp->result));
2900        return -EIO;
2901    }
2902
2903    return 0;
2904}
2905
2906static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2907{
2908    Error *local_err = NULL;
2909    BDRVSheepdogState *s = bs->opaque;
2910    SheepdogReq req;
2911    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2912    QEMUSnapshotInfo *sn_tab = NULL;
2913    unsigned wlen, rlen;
2914    int found = 0;
2915    SheepdogInode *inode;
2916    unsigned long *vdi_inuse;
2917    unsigned int start_nr;
2918    uint64_t hval;
2919    uint32_t vid;
2920
2921    vdi_inuse = g_malloc(max);
2922    inode = g_malloc(SD_INODE_HEADER_SIZE);
2923
2924    fd = connect_to_sdog(s, &local_err);
2925    if (fd < 0) {
2926        error_report_err(local_err);
2927        ret = fd;
2928        goto out;
2929    }
2930
2931    rlen = max;
2932    wlen = 0;
2933
2934    memset(&req, 0, sizeof(req));
2935
2936    req.opcode = SD_OP_READ_VDIS;
2937    req.data_length = max;
2938
2939    ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
2940
2941    closesocket(fd);
2942    if (ret) {
2943        goto out;
2944    }
2945
2946    sn_tab = g_new0(QEMUSnapshotInfo, nr);
2947
2948    /* calculate a vdi id with hash function */
2949    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2950    start_nr = hval & (SD_NR_VDIS - 1);
2951
2952    fd = connect_to_sdog(s, &local_err);
2953    if (fd < 0) {
2954        error_report_err(local_err);
2955        ret = fd;
2956        goto out;
2957    }
2958
2959    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2960        if (!test_bit(vid, vdi_inuse)) {
2961            break;
2962        }
2963
2964        /* we don't need to read entire object */
2965        ret = read_object(fd, s->bs, (char *)inode,
2966                          vid_to_vdi_oid(vid),
2967                          0, SD_INODE_HEADER_SIZE, 0,
2968                          s->cache_flags);
2969
2970        if (ret) {
2971            continue;
2972        }
2973
2974        if (!strcmp(inode->name, s->name) && is_snapshot(inode)) {
2975            sn_tab[found].date_sec = inode->snap_ctime >> 32;
2976            sn_tab[found].date_nsec = inode->snap_ctime & 0xffffffff;
2977            sn_tab[found].vm_state_size = inode->vm_state_size;
2978            sn_tab[found].vm_clock_nsec = inode->vm_clock_nsec;
2979
2980            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2981                     "%" PRIu32, inode->snap_id);
2982            pstrcpy(sn_tab[found].name,
2983                    MIN(sizeof(sn_tab[found].name), sizeof(inode->tag)),
2984                    inode->tag);
2985            found++;
2986        }
2987    }
2988
2989    closesocket(fd);
2990out:
2991    *psn_tab = sn_tab;
2992
2993    g_free(vdi_inuse);
2994    g_free(inode);
2995
2996    if (ret < 0) {
2997        return ret;
2998    }
2999
3000    return found;
3001}
3002
3003static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
3004                                int64_t pos, int size, int load)
3005{
3006    Error *local_err = NULL;
3007    bool create;
3008    int fd, ret = 0, remaining = size;
3009    unsigned int data_len;
3010    uint64_t vmstate_oid;
3011    uint64_t offset;
3012    uint32_t vdi_index;
3013    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
3014    uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
3015
3016    fd = connect_to_sdog(s, &local_err);
3017    if (fd < 0) {
3018        error_report_err(local_err);
3019        return fd;
3020    }
3021
3022    while (remaining) {
3023        vdi_index = pos / object_size;
3024        offset = pos % object_size;
3025
3026        data_len = MIN(remaining, object_size - offset);
3027
3028        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
3029
3030        create = (offset == 0);
3031        if (load) {
3032            ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
3033                              s->inode.nr_copies, data_len, offset,
3034                              s->cache_flags);
3035        } else {
3036            ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
3037                               s->inode.nr_copies, data_len, offset, create,
3038                               s->cache_flags);
3039        }
3040
3041        if (ret < 0) {
3042            error_report("failed to save vmstate %s", strerror(errno));
3043            goto cleanup;
3044        }
3045
3046        pos += data_len;
3047        data += data_len;
3048        remaining -= data_len;
3049    }
3050    ret = size;
3051cleanup:
3052    closesocket(fd);
3053    return ret;
3054}
3055
3056static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3057                           int64_t pos)
3058{
3059    BDRVSheepdogState *s = bs->opaque;
3060    void *buf;
3061    int ret;
3062
3063    buf = qemu_blockalign(bs, qiov->size);
3064    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
3065    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
3066    qemu_vfree(buf);
3067
3068    return ret;
3069}
3070
3071static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3072                           int64_t pos)
3073{
3074    BDRVSheepdogState *s = bs->opaque;
3075    void *buf;
3076    int ret;
3077
3078    buf = qemu_blockalign(bs, qiov->size);
3079    ret = do_load_save_vmstate(s, buf, pos, qiov->size, 1);
3080    qemu_iovec_from_buf(qiov, 0, buf, qiov->size);
3081    qemu_vfree(buf);
3082
3083    return ret;
3084}
3085
3086
3087static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
3088                                      int bytes)
3089{
3090    SheepdogAIOCB acb;
3091    BDRVSheepdogState *s = bs->opaque;
3092    QEMUIOVector discard_iov;
3093    struct iovec iov;
3094    uint32_t zero = 0;
3095
3096    if (!s->discard_supported) {
3097        return 0;
3098    }
3099
3100    memset(&discard_iov, 0, sizeof(discard_iov));
3101    memset(&iov, 0, sizeof(iov));
3102    iov.iov_base = &zero;
3103    iov.iov_len = sizeof(zero);
3104    discard_iov.iov = &iov;
3105    discard_iov.niov = 1;
3106    if (!QEMU_IS_ALIGNED(offset | bytes, BDRV_SECTOR_SIZE)) {
3107        return -ENOTSUP;
3108    }
3109    sd_aio_setup(&acb, s, &discard_iov, offset >> BDRV_SECTOR_BITS,
3110                 bytes >> BDRV_SECTOR_BITS, AIOCB_DISCARD_OBJ);
3111    sd_co_rw_vector(&acb);
3112    sd_aio_complete(&acb);
3113
3114    return acb.ret;
3115}
3116
3117static coroutine_fn int
3118sd_co_block_status(BlockDriverState *bs, bool want_zero, int64_t offset,
3119                   int64_t bytes, int64_t *pnum, int64_t *map,
3120                   BlockDriverState **file)
3121{
3122    BDRVSheepdogState *s = bs->opaque;
3123    SheepdogInode *inode = &s->inode;
3124    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3125    unsigned long start = offset / object_size,
3126                  end = DIV_ROUND_UP(offset + bytes, object_size);
3127    unsigned long idx;
3128    *map = offset;
3129    int ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
3130
3131    for (idx = start; idx < end; idx++) {
3132        if (inode->data_vdi_id[idx] == 0) {
3133            break;
3134        }
3135    }
3136    if (idx == start) {
3137        /* Get the longest length of unallocated sectors */
3138        ret = 0;
3139        for (idx = start + 1; idx < end; idx++) {
3140            if (inode->data_vdi_id[idx] != 0) {
3141                break;
3142            }
3143        }
3144    }
3145
3146    *pnum = (idx - start) * object_size;
3147    if (*pnum > bytes) {
3148        *pnum = bytes;
3149    }
3150    if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
3151        *file = bs;
3152    }
3153    return ret;
3154}
3155
3156static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
3157{
3158    BDRVSheepdogState *s = bs->opaque;
3159    SheepdogInode *inode = &s->inode;
3160    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3161    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
3162    uint64_t size = 0;
3163
3164    for (i = 0; i < last; i++) {
3165        if (inode->data_vdi_id[i] == 0) {
3166            continue;
3167        }
3168        size += object_size;
3169    }
3170    return size;
3171}
3172
3173static QemuOptsList sd_create_opts = {
3174    .name = "sheepdog-create-opts",
3175    .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
3176    .desc = {
3177        {
3178            .name = BLOCK_OPT_SIZE,
3179            .type = QEMU_OPT_SIZE,
3180            .help = "Virtual disk size"
3181        },
3182        {
3183            .name = BLOCK_OPT_BACKING_FILE,
3184            .type = QEMU_OPT_STRING,
3185            .help = "File name of a base image"
3186        },
3187        {
3188            .name = BLOCK_OPT_PREALLOC,
3189            .type = QEMU_OPT_STRING,
3190            .help = "Preallocation mode (allowed values: off, full)"
3191        },
3192        {
3193            .name = BLOCK_OPT_REDUNDANCY,
3194            .type = QEMU_OPT_STRING,
3195            .help = "Redundancy of the image"
3196        },
3197        {
3198            .name = BLOCK_OPT_OBJECT_SIZE,
3199            .type = QEMU_OPT_SIZE,
3200            .help = "Object size of the image"
3201        },
3202        { /* end of list */ }
3203    }
3204};
3205
3206static const char *const sd_strong_runtime_opts[] = {
3207    "vdi",
3208    "snap-id",
3209    "tag",
3210    "server.",
3211
3212    NULL
3213};
3214
3215static BlockDriver bdrv_sheepdog = {
3216    .format_name                  = "sheepdog",
3217    .protocol_name                = "sheepdog",
3218    .instance_size                = sizeof(BDRVSheepdogState),
3219    .bdrv_parse_filename          = sd_parse_filename,
3220    .bdrv_file_open               = sd_open,
3221    .bdrv_reopen_prepare          = sd_reopen_prepare,
3222    .bdrv_reopen_commit           = sd_reopen_commit,
3223    .bdrv_reopen_abort            = sd_reopen_abort,
3224    .bdrv_close                   = sd_close,
3225    .bdrv_co_create               = sd_co_create,
3226    .bdrv_co_create_opts          = sd_co_create_opts,
3227    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3228    .bdrv_getlength               = sd_getlength,
3229    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3230    .bdrv_co_truncate             = sd_co_truncate,
3231
3232    .bdrv_co_readv                = sd_co_readv,
3233    .bdrv_co_writev               = sd_co_writev,
3234    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3235    .bdrv_co_pdiscard             = sd_co_pdiscard,
3236    .bdrv_co_block_status         = sd_co_block_status,
3237
3238    .bdrv_snapshot_create         = sd_snapshot_create,
3239    .bdrv_snapshot_goto           = sd_snapshot_goto,
3240    .bdrv_snapshot_delete         = sd_snapshot_delete,
3241    .bdrv_snapshot_list           = sd_snapshot_list,
3242
3243    .bdrv_save_vmstate            = sd_save_vmstate,
3244    .bdrv_load_vmstate            = sd_load_vmstate,
3245
3246    .bdrv_detach_aio_context      = sd_detach_aio_context,
3247    .bdrv_attach_aio_context      = sd_attach_aio_context,
3248
3249    .create_opts                  = &sd_create_opts,
3250    .strong_runtime_opts          = sd_strong_runtime_opts,
3251};
3252
3253static BlockDriver bdrv_sheepdog_tcp = {
3254    .format_name                  = "sheepdog",
3255    .protocol_name                = "sheepdog+tcp",
3256    .instance_size                = sizeof(BDRVSheepdogState),
3257    .bdrv_parse_filename          = sd_parse_filename,
3258    .bdrv_file_open               = sd_open,
3259    .bdrv_reopen_prepare          = sd_reopen_prepare,
3260    .bdrv_reopen_commit           = sd_reopen_commit,
3261    .bdrv_reopen_abort            = sd_reopen_abort,
3262    .bdrv_close                   = sd_close,
3263    .bdrv_co_create               = sd_co_create,
3264    .bdrv_co_create_opts          = sd_co_create_opts,
3265    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3266    .bdrv_getlength               = sd_getlength,
3267    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3268    .bdrv_co_truncate             = sd_co_truncate,
3269
3270    .bdrv_co_readv                = sd_co_readv,
3271    .bdrv_co_writev               = sd_co_writev,
3272    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3273    .bdrv_co_pdiscard             = sd_co_pdiscard,
3274    .bdrv_co_block_status         = sd_co_block_status,
3275
3276    .bdrv_snapshot_create         = sd_snapshot_create,
3277    .bdrv_snapshot_goto           = sd_snapshot_goto,
3278    .bdrv_snapshot_delete         = sd_snapshot_delete,
3279    .bdrv_snapshot_list           = sd_snapshot_list,
3280
3281    .bdrv_save_vmstate            = sd_save_vmstate,
3282    .bdrv_load_vmstate            = sd_load_vmstate,
3283
3284    .bdrv_detach_aio_context      = sd_detach_aio_context,
3285    .bdrv_attach_aio_context      = sd_attach_aio_context,
3286
3287    .create_opts                  = &sd_create_opts,
3288    .strong_runtime_opts          = sd_strong_runtime_opts,
3289};
3290
3291static BlockDriver bdrv_sheepdog_unix = {
3292    .format_name                  = "sheepdog",
3293    .protocol_name                = "sheepdog+unix",
3294    .instance_size                = sizeof(BDRVSheepdogState),
3295    .bdrv_parse_filename          = sd_parse_filename,
3296    .bdrv_file_open               = sd_open,
3297    .bdrv_reopen_prepare          = sd_reopen_prepare,
3298    .bdrv_reopen_commit           = sd_reopen_commit,
3299    .bdrv_reopen_abort            = sd_reopen_abort,
3300    .bdrv_close                   = sd_close,
3301    .bdrv_co_create               = sd_co_create,
3302    .bdrv_co_create_opts          = sd_co_create_opts,
3303    .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3304    .bdrv_getlength               = sd_getlength,
3305    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3306    .bdrv_co_truncate             = sd_co_truncate,
3307
3308    .bdrv_co_readv                = sd_co_readv,
3309    .bdrv_co_writev               = sd_co_writev,
3310    .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3311    .bdrv_co_pdiscard             = sd_co_pdiscard,
3312    .bdrv_co_block_status         = sd_co_block_status,
3313
3314    .bdrv_snapshot_create         = sd_snapshot_create,
3315    .bdrv_snapshot_goto           = sd_snapshot_goto,
3316    .bdrv_snapshot_delete         = sd_snapshot_delete,
3317    .bdrv_snapshot_list           = sd_snapshot_list,
3318
3319    .bdrv_save_vmstate            = sd_save_vmstate,
3320    .bdrv_load_vmstate            = sd_load_vmstate,
3321
3322    .bdrv_detach_aio_context      = sd_detach_aio_context,
3323    .bdrv_attach_aio_context      = sd_attach_aio_context,
3324
3325    .create_opts                  = &sd_create_opts,
3326    .strong_runtime_opts          = sd_strong_runtime_opts,
3327};
3328
3329static void bdrv_sheepdog_init(void)
3330{
3331    bdrv_register(&bdrv_sheepdog);
3332    bdrv_register(&bdrv_sheepdog_tcp);
3333    bdrv_register(&bdrv_sheepdog_unix);
3334}
3335block_init(bdrv_sheepdog_init);
3336