qemu/block/sheepdog.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qapi/error.h"
  17#include "qemu/uri.h"
  18#include "qemu/error-report.h"
  19#include "qemu/sockets.h"
  20#include "block/block_int.h"
  21#include "sysemu/block-backend.h"
  22#include "qemu/bitops.h"
  23#include "qemu/cutils.h"
  24
/* Protocol version constant (presumably the value of the proto_ver header field). */
#define SD_PROTO_VER 0x01

/* Server address and port used by sd_parse_uri() when the URI omits them. */
#define SD_DEFAULT_ADDR "localhost"
#define SD_DEFAULT_PORT 7000

/* Opcodes operating on objects (0x0x range). */
#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
#define SD_OP_READ_OBJ       0x02
#define SD_OP_WRITE_OBJ      0x03
/* 0x04 is used internally by Sheepdog */

/* Opcodes operating on vdis and on the cluster (0x1x range). */
#define SD_OP_NEW_VDI        0x11
#define SD_OP_LOCK_VDI       0x12
#define SD_OP_RELEASE_VDI    0x13
#define SD_OP_GET_VDI_INFO   0x14
#define SD_OP_READ_VDIS      0x15
#define SD_OP_FLUSH_VDI      0x16
#define SD_OP_DEL_VDI        0x17
#define SD_OP_GET_CLUSTER_DEFAULT   0x18

/* Command flag bits; SD_FLAG_CMD_DIRECT is also stored in cache_flags. */
#define SD_FLAG_CMD_WRITE    0x01
#define SD_FLAG_CMD_COW      0x02
#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  48
/*
 * Result codes found in the `result` field of a response header.
 * sd_strerror() translates them into messages.
 */
#define SD_RES_SUCCESS       0x00 /* Success */
#define SD_RES_UNKNOWN       0x01 /* Unknown error */
#define SD_RES_NO_OBJ        0x02 /* No object found */
#define SD_RES_EIO           0x03 /* I/O error */
#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
#define SD_RES_NO_VDI        0x08 /* No vdi found */
#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
#define SD_RES_STARTUP       0x0F /* Sheepdog is starting up */
#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
#define SD_RES_FULL_VDI      0x13 /* The maximum number of vdis already exists */
#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes to join */
#define SD_RES_JOIN_FAILED   0x18 /* Target node failed to join sheepdog */
#define SD_RES_HALT          0x19 /* Sheepdog has stopped serving I/O requests */
#define SD_RES_READONLY      0x1A /* Object is read-only */
  76
  77/*
  78 * Object ID rules
  79 *
  80 *  0 - 19 (20 bits): data object space
  81 * 20 - 31 (12 bits): reserved data object space
  82 * 32 - 55 (24 bits): vdi object space
  83 * 56 - 59 ( 4 bits): reserved vdi object space
  84 * 60 - 63 ( 4 bits): object type identifier space
  85 */
  86
/* Bit position at which the vdi id starts inside a 64 bit object ID. */
#define VDI_SPACE_SHIFT   32
/* Bit 63: set by vid_to_vdi_oid(); is_data_obj() tests that it is clear. */
#define VDI_BIT (UINT64_C(1) << 63)
/* Bit 62: set by vid_to_vmstate_oid(). */
#define VMSTATE_BIT (UINT64_C(1) << 62)
/* 2^20 data objects per vdi; also the length of SheepdogInode.data_vdi_id[]. */
#define MAX_DATA_OBJS (UINT64_C(1) << 20)
/* Length of SheepdogInode.child_vdi_id[]. */
#define MAX_CHILDREN 1024
/* Buffer sizes (in bytes) for vdi names and snapshot tags. */
#define SD_MAX_VDI_LEN 256
#define SD_MAX_VDI_TAG_LEN 256
/* 2^24, matching the 24 bit vdi object space of an object ID. */
#define SD_NR_VDIS   (1U << 24)
/* 2^22 bytes (4 MiB). */
#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
/* 2^22 * 2^20 = 2^42 bytes (4 TiB). */
#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
/* Same exponent as SD_DATA_OBJ_SIZE. */
#define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
/*
 * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
 * (SD_EC_MAX_STRIP - 1) for parity strips
 *
 * SD_MAX_COPIES is sum of number of data strips and parity strips.
 */
#define SD_EC_MAX_STRIP 16
#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)

/* Size of a complete inode, including the data_vdi_id[] table. */
#define SD_INODE_SIZE (sizeof(SheepdogInode))
/* Snapshot id that sd_parse_uri() reports when the URI has no fragment. */
#define CURRENT_VDI_ID 0

#define LOCK_TYPE_NORMAL 0
#define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
 112
/*
 * Generic request header.  The first six fields (proto_ver .. data_length)
 * are repeated at the start of every request/response structure in this
 * file, and the field sizes of each of those structures sum to 48 bytes.
 * do_co_req() sends this header and receives the reply into the same buffer.
 */
typedef struct SheepdogReq {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;           /* in a reply, caps *rlen in do_co_req() */
    uint32_t opcode_specific[8];    /* 32 bytes laid out per opcode */
} SheepdogReq;
 122
/*
 * Generic response header: the common six fields followed by a result code
 * (SD_RES_*) and 28 opcode-specific bytes.
 */
typedef struct SheepdogRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint32_t result;                /* SD_RES_* */
    uint32_t opcode_specific[7];
} SheepdogRsp;
 133
/* Request header for object operations: the common six fields plus object addressing. */
typedef struct SheepdogObjReq {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint64_t oid;                   /* object ID, see "Object ID rules" */
    uint64_t cow_oid;               /* presumably the copy-on-write source object */
    uint8_t copies;
    uint8_t copy_policy;
    uint8_t reserved[6];
    uint64_t offset;                /* presumably the byte offset within the object */
} SheepdogObjReq;
 148
/*
 * Response header for object operations.  aio_read_response() reads one of
 * these from the I/O socket and uses `id` to find the matching AIOReq.
 */
typedef struct SheepdogObjRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;                    /* compared against AIOReq.id */
    uint32_t data_length;           /* payload bytes that follow a read reply */
    uint32_t result;                /* SD_RES_* */
    uint8_t copies;
    uint8_t copy_policy;
    uint8_t reserved[2];
    uint32_t pad[6];
} SheepdogObjRsp;
 162
/* Request header for vdi operations: the common six fields plus vdi parameters. */
typedef struct SheepdogVdiReq {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint64_t vdi_size;
    uint32_t base_vdi_id;
    uint8_t copies;
    uint8_t copy_policy;
    uint8_t store_policy;
    uint8_t block_size_shift;       /* log2 of the object size */
    uint32_t snapid;
    uint32_t type;
    uint32_t pad[2];
} SheepdogVdiReq;
 180
/* Response header for vdi operations; carries the vdi id chosen or found by the server. */
typedef struct SheepdogVdiRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint32_t result;                /* SD_RES_* */
    uint32_t rsvd;
    uint32_t vdi_id;
    uint32_t pad[5];
} SheepdogVdiRsp;
 193
/*
 * Response header carrying cluster-wide settings (presumably the reply to
 * SD_OP_GET_CLUSTER_DEFAULT).
 */
typedef struct SheepdogClusterRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint32_t result;                /* SD_RES_* */
    uint8_t nr_copies;
    uint8_t copy_policy;
    uint8_t block_size_shift;       /* log2 of the object size */
    uint8_t __pad1;
    uint32_t __pad2[6];
} SheepdogClusterRsp;
 208
/*
 * Metadata of one vdi: a fixed header followed by a table with one entry
 * per data object.
 */
typedef struct SheepdogInode {
    char name[SD_MAX_VDI_LEN];
    char tag[SD_MAX_VDI_TAG_LEN];
    uint64_t ctime;
    uint64_t snap_ctime;            /* non-zero for a snapshot, see is_snapshot() */
    uint64_t vm_clock_nsec;
    uint64_t vdi_size;              /* in bytes, see count_data_objs() */
    uint64_t vm_state_size;
    uint16_t copy_policy;
    uint8_t nr_copies;
    uint8_t block_size_shift;       /* log2 of the data object size */
    uint32_t snap_id;
    uint32_t vdi_id;
    uint32_t parent_vdi_id;
    uint32_t child_vdi_id[MAX_CHILDREN];
    /* vdi id recorded for each data object index, see is_data_obj_writable() */
    uint32_t data_vdi_id[MAX_DATA_OBJS];
} SheepdogInode;

/* Size of the inode up to, but not including, the data_vdi_id[] table. */
#define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
 228
 229/*
 230 * 64 bit FNV-1a non-zero initial basis
 231 */
 232#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 233
 234/*
 235 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 236 */
 237static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 238{
 239    unsigned char *bp = buf;
 240    unsigned char *be = bp + len;
 241    while (bp < be) {
 242        hval ^= (uint64_t) *bp++;
 243        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 244            (hval << 7) + (hval << 8) + (hval << 40);
 245    }
 246    return hval;
 247}
 248
 249static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 250{
 251    return inode->vdi_id == inode->data_vdi_id[idx];
 252}
 253
 254static inline bool is_data_obj(uint64_t oid)
 255{
 256    return !(VDI_BIT & oid);
 257}
 258
 259static inline uint64_t data_oid_to_idx(uint64_t oid)
 260{
 261    return oid & (MAX_DATA_OBJS - 1);
 262}
 263
 264static inline uint32_t oid_to_vid(uint64_t oid)
 265{
 266    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 267}
 268
 269static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 270{
 271    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 272}
 273
 274static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 275{
 276    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 277}
 278
 279static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 280{
 281    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 282}
 283
 284static inline bool is_snapshot(struct SheepdogInode *inode)
 285{
 286    return !!inode->snap_ctime;
 287}
 288
 289static inline size_t count_data_objs(const struct SheepdogInode *inode)
 290{
 291    return DIV_ROUND_UP(inode->vdi_size,
 292                        (1UL << inode->block_size_shift));
 293}
 294
/*
 * Debug tracing.  DPRINTF() prints to stderr, prefixed with the calling
 * function and line, only when DEBUG_SDOG is defined at compile time.
 * The fprintf() call sits behind a constant condition rather than being
 * preprocessed away, so it is still compiled when tracing is off.
 */
#undef DPRINTF
#ifdef DEBUG_SDOG
#define DEBUG_SDOG_PRINT 1
#else
#define DEBUG_SDOG_PRINT 0
#endif
#define DPRINTF(fmt, args...)                                           \
    do {                                                                \
        if (DEBUG_SDOG_PRINT) {                                         \
            fprintf(stderr, "%s %d: " fmt, __func__, __LINE__, ##args); \
        }                                                               \
    } while (0)
 307
 308typedef struct SheepdogAIOCB SheepdogAIOCB;
 309
/*
 * One request sent to the server on behalf of a SheepdogAIOCB.  Created by
 * alloc_aio_req() and released by free_aio_req().
 */
typedef struct AIOReq {
    SheepdogAIOCB *aiocb;           /* owning AIOCB */
    unsigned int iov_offset;        /* offset into aiocb->qiov for this request's data */

    uint64_t oid;                   /* target object ID */
    uint64_t base_oid;
    uint64_t offset;
    unsigned int data_len;
    uint8_t flags;
    uint32_t id;                    /* from aioreq_seq_num; matched against rsp.id */
    bool create;                    /* object is newly created; inode updated on success */

    QLIST_ENTRY(AIOReq) aio_siblings;   /* link in the inflight or failed queue */
} AIOReq;
 324
/*
 * Kind of operation an AIOCB performs; aio_read_response() switches on it to
 * decide how a response is handled.
 */
enum AIOCBState {
    AIOCB_WRITE_UDATA,      /* no payload to read; may update the inode */
    AIOCB_READ_UDATA,       /* response payload is received into the qiov */
    AIOCB_FLUSH_CACHE,      /* INVALID_PARMS reply switches to SD_FLAG_CMD_DIRECT */
    AIOCB_DISCARD_OBJ,      /* INVALID_PARMS reply clears discard_supported */
};
 331
 332#define AIOCBOverlapping(x, y)                                 \
 333    (!(x->max_affect_data_idx < y->min_affect_data_idx          \
 334       || y->max_affect_data_idx < x->min_affect_data_idx))
 335
/*
 * State of one guest-visible I/O operation.  It is split into one or more
 * AIOReq requests; nr_pending counts those not yet freed.
 */
struct SheepdogAIOCB {
    BlockAIOCB common;

    QEMUIOVector *qiov;

    int64_t sector_num;
    int nb_sectors;

    int ret;                        /* 0, or -EIO once any request has failed */
    enum AIOCBState aiocb_type;     /* not set by sd_aio_setup() */

    Coroutine *coroutine;           /* coroutine that called sd_aio_setup() */
    void (*aio_done_func)(SheepdogAIOCB *);     /* called when nr_pending hits 0 */

    bool cancelable;                /* cleared by the first free_aio_req() */
    int nr_pending;

    /* inclusive range of data object indexes, compared by AIOCBOverlapping() */
    uint32_t min_affect_data_idx;
    uint32_t max_affect_data_idx;

    /*
     * The difference between affect_data_idx and dirty_data_idx:
     * affect_data_idx represents range of index of all request types.
     * dirty_data_idx represents range of index updated by COW requests.
     * dirty_data_idx is used for updating an inode object.
     */
    uint32_t min_dirty_data_idx;
    uint32_t max_dirty_data_idx;

    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
};
 367
/* Per-device driver state (reached through bs->opaque). */
typedef struct BDRVSheepdogState {
    BlockDriverState *bs;
    AioContext *aio_context;

    SheepdogInode inode;            /* in-memory copy of the vdi's inode */

    char name[SD_MAX_VDI_LEN];
    bool is_snapshot;
    uint32_t cache_flags;           /* SD_FLAG_CMD_* cache mode */
    bool discard_supported;         /* cleared when the server rejects a discard */

    char *host_spec;                /* "host:port" or unix socket path, see sd_parse_uri() */
    bool is_unix;                   /* connect with unix_connect() instead of inet_connect() */
    int fd;                         /* I/O socket; -1 while reconnecting */

    CoMutex lock;
    Coroutine *co_send;             /* entered by co_write_request() */
    Coroutine *co_recv;             /* coroutine running aio_read_response(), or NULL */

    uint32_t aioreq_seq_num;        /* next AIOReq.id handed out by alloc_aio_req() */

    /* Every aio request must be linked to either of these queues. */
    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;

    CoQueue overlapping_queue;
    QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
} BDRVSheepdogState;
 396
/*
 * Holds a socket descriptor and cache flags separately from the live state
 * (presumably staged during a reopen; the users are outside this excerpt).
 */
typedef struct BDRVSheepdogReopenState {
    int fd;
    int cache_flags;
} BDRVSheepdogReopenState;
 401
 402static const char * sd_strerror(int err)
 403{
 404    int i;
 405
 406    static const struct {
 407        int err;
 408        const char *desc;
 409    } errors[] = {
 410        {SD_RES_SUCCESS, "Success"},
 411        {SD_RES_UNKNOWN, "Unknown error"},
 412        {SD_RES_NO_OBJ, "No object found"},
 413        {SD_RES_EIO, "I/O error"},
 414        {SD_RES_VDI_EXIST, "VDI exists already"},
 415        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 416        {SD_RES_SYSTEM_ERROR, "System error"},
 417        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 418        {SD_RES_NO_VDI, "No vdi found"},
 419        {SD_RES_NO_BASE_VDI, "No base VDI found"},
 420        {SD_RES_VDI_READ, "Failed read the requested VDI"},
 421        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 422        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 423        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 424        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 425        {SD_RES_STARTUP, "The system is still booting"},
 426        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 427        {SD_RES_SHUTDOWN, "The system is shutting down"},
 428        {SD_RES_NO_MEM, "Out of memory on the server"},
 429        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 430        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 431        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 432        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 433        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
 434        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
 435        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
 436        {SD_RES_READONLY, "Object is read-only"},
 437    };
 438
 439    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 440        if (errors[i].err == err) {
 441            return errors[i].desc;
 442        }
 443    }
 444
 445    return "Invalid error code";
 446}
 447
 448/*
 449 * Sheepdog I/O handling:
 450 *
 451 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 452 *    link the requests to the inflight_list in the
 453 *    BDRVSheepdogState.  The function exits without waiting for
 454 *    receiving the response.
 455 *
 456 * 2. We receive the response in aio_read_response, the fd handler to
 457 *    the sheepdog connection.  If metadata update is needed, we send
 458 *    the write request to the vdi object in sd_write_done, the write
 459 *    completion function.  We switch back to sd_co_readv/writev after
 460 *    all the requests belonging to the AIOCB are finished.
 461 */
 462
 463static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 464                                    uint64_t oid, unsigned int data_len,
 465                                    uint64_t offset, uint8_t flags, bool create,
 466                                    uint64_t base_oid, unsigned int iov_offset)
 467{
 468    AIOReq *aio_req;
 469
 470    aio_req = g_malloc(sizeof(*aio_req));
 471    aio_req->aiocb = acb;
 472    aio_req->iov_offset = iov_offset;
 473    aio_req->oid = oid;
 474    aio_req->base_oid = base_oid;
 475    aio_req->offset = offset;
 476    aio_req->data_len = data_len;
 477    aio_req->flags = flags;
 478    aio_req->id = s->aioreq_seq_num++;
 479    aio_req->create = create;
 480
 481    acb->nr_pending++;
 482    return aio_req;
 483}
 484
 485static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 486{
 487    SheepdogAIOCB *acb = aio_req->aiocb;
 488
 489    acb->cancelable = false;
 490    QLIST_REMOVE(aio_req, aio_siblings);
 491    g_free(aio_req);
 492
 493    acb->nr_pending--;
 494}
 495
/*
 * Complete an AIOCB: re-enter the coroutine recorded in acb->coroutine
 * (sd_aio_setup() stores the creating coroutine there), then drop the
 * reference on the AIOCB.
 */
static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
{
    qemu_coroutine_enter(acb->coroutine);
    qemu_aio_unref(acb);
}
 501
 502/*
 503 * Check whether the specified acb can be canceled
 504 *
 505 * We can cancel aio when any request belonging to the acb is:
 506 *  - Not processed by the sheepdog server.
 507 *  - Not linked to the inflight queue.
 508 */
 509static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
 510{
 511    BDRVSheepdogState *s = acb->common.bs->opaque;
 512    AIOReq *aioreq;
 513
 514    if (!acb->cancelable) {
 515        return false;
 516    }
 517
 518    QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
 519        if (aioreq->aiocb == acb) {
 520            return false;
 521        }
 522    }
 523
 524    return true;
 525}
 526
 527static void sd_aio_cancel(BlockAIOCB *blockacb)
 528{
 529    SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
 530    BDRVSheepdogState *s = acb->common.bs->opaque;
 531    AIOReq *aioreq, *next;
 532
 533    if (sd_acb_cancelable(acb)) {
 534        /* Remove outstanding requests from failed queue.  */
 535        QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
 536                           next) {
 537            if (aioreq->aiocb == acb) {
 538                free_aio_req(s, aioreq);
 539            }
 540        }
 541
 542        assert(acb->nr_pending == 0);
 543        if (acb->common.cb) {
 544            acb->common.cb(acb->common.opaque, -ECANCELED);
 545        }
 546        sd_finish_aiocb(acb);
 547    }
 548}
 549
/*
 * AIOCB descriptor passed to qemu_aio_get() in sd_aio_setup(): the size of
 * the AIOCB to allocate and the asynchronous cancel callback.
 */
static const AIOCBInfo sd_aiocb_info = {
    .aiocb_size     = sizeof(SheepdogAIOCB),
    .cancel_async   = sd_aio_cancel,
};
 554
 555static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
 556                                   int64_t sector_num, int nb_sectors)
 557{
 558    SheepdogAIOCB *acb;
 559    uint32_t object_size;
 560    BDRVSheepdogState *s = bs->opaque;
 561
 562    object_size = (UINT32_C(1) << s->inode.block_size_shift);
 563
 564    acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
 565
 566    acb->qiov = qiov;
 567
 568    acb->sector_num = sector_num;
 569    acb->nb_sectors = nb_sectors;
 570
 571    acb->aio_done_func = NULL;
 572    acb->cancelable = true;
 573    acb->coroutine = qemu_coroutine_self();
 574    acb->ret = 0;
 575    acb->nr_pending = 0;
 576
 577    acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
 578    acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
 579                              acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
 580
 581    acb->min_dirty_data_idx = UINT32_MAX;
 582    acb->max_dirty_data_idx = 0;
 583
 584    return acb;
 585}
 586
 587/* Return -EIO in case of error, file descriptor on success */
 588static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 589{
 590    int fd;
 591
 592    if (s->is_unix) {
 593        fd = unix_connect(s->host_spec, errp);
 594    } else {
 595        fd = inet_connect(s->host_spec, errp);
 596
 597        if (fd >= 0) {
 598            int ret = socket_set_nodelay(fd);
 599            if (ret < 0) {
 600                error_report("%s", strerror(errno));
 601            }
 602        }
 603    }
 604
 605    if (fd >= 0) {
 606        qemu_set_nonblock(fd);
 607    } else {
 608        fd = -EIO;
 609    }
 610
 611    return fd;
 612}
 613
 614/* Return 0 on success and -errno in case of error */
 615static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 616                                    unsigned int *wlen)
 617{
 618    int ret;
 619
 620    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 621    if (ret != sizeof(*hdr)) {
 622        error_report("failed to send a req, %s", strerror(errno));
 623        return -errno;
 624    }
 625
 626    ret = qemu_co_send(sockfd, data, *wlen);
 627    if (ret != *wlen) {
 628        error_report("failed to send a req, %s", strerror(errno));
 629        return -errno;
 630    }
 631
 632    return ret;
 633}
 634
 635static void restart_co_req(void *opaque)
 636{
 637    Coroutine *co = opaque;
 638
 639    qemu_coroutine_enter(co);
 640}
 641
/* Arguments and result of do_co_req(), bundled so they fit a coroutine's single opaque pointer. */
typedef struct SheepdogReqCo {
    int sockfd;
    AioContext *aio_context;
    SheepdogReq *hdr;       /* request header on entry, response header on return */
    void *data;             /* payload to send and/or buffer for received data */
    unsigned int *wlen;     /* number of payload bytes to send */
    unsigned int *rlen;     /* in: receive buffer size; out: bytes actually received */
    int ret;                /* 0 or negative errno, valid once finished is set */
    bool finished;          /* set at the end of do_co_req(); polled by do_req() */
} SheepdogReqCo;
 652
 653static coroutine_fn void do_co_req(void *opaque)
 654{
 655    int ret;
 656    Coroutine *co;
 657    SheepdogReqCo *srco = opaque;
 658    int sockfd = srco->sockfd;
 659    SheepdogReq *hdr = srco->hdr;
 660    void *data = srco->data;
 661    unsigned int *wlen = srco->wlen;
 662    unsigned int *rlen = srco->rlen;
 663
 664    co = qemu_coroutine_self();
 665    aio_set_fd_handler(srco->aio_context, sockfd, false,
 666                       NULL, restart_co_req, co);
 667
 668    ret = send_co_req(sockfd, hdr, data, wlen);
 669    if (ret < 0) {
 670        goto out;
 671    }
 672
 673    aio_set_fd_handler(srco->aio_context, sockfd, false,
 674                       restart_co_req, NULL, co);
 675
 676    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 677    if (ret != sizeof(*hdr)) {
 678        error_report("failed to get a rsp, %s", strerror(errno));
 679        ret = -errno;
 680        goto out;
 681    }
 682
 683    if (*rlen > hdr->data_length) {
 684        *rlen = hdr->data_length;
 685    }
 686
 687    if (*rlen) {
 688        ret = qemu_co_recv(sockfd, data, *rlen);
 689        if (ret != *rlen) {
 690            error_report("failed to get the data, %s", strerror(errno));
 691            ret = -errno;
 692            goto out;
 693        }
 694    }
 695    ret = 0;
 696out:
 697    /* there is at most one request for this sockfd, so it is safe to
 698     * set each handler to NULL. */
 699    aio_set_fd_handler(srco->aio_context, sockfd, false,
 700                       NULL, NULL, NULL);
 701
 702    srco->ret = ret;
 703    srco->finished = true;
 704}
 705
 706/*
 707 * Send the request to the sheep in a synchronous manner.
 708 *
 709 * Return 0 on success, -errno in case of error.
 710 */
 711static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
 712                  void *data, unsigned int *wlen, unsigned int *rlen)
 713{
 714    Coroutine *co;
 715    SheepdogReqCo srco = {
 716        .sockfd = sockfd,
 717        .aio_context = aio_context,
 718        .hdr = hdr,
 719        .data = data,
 720        .wlen = wlen,
 721        .rlen = rlen,
 722        .ret = 0,
 723        .finished = false,
 724    };
 725
 726    if (qemu_in_coroutine()) {
 727        do_co_req(&srco);
 728    } else {
 729        co = qemu_coroutine_create(do_co_req, &srco);
 730        qemu_coroutine_enter(co);
 731        while (!srco.finished) {
 732            aio_poll(aio_context, true);
 733        }
 734    }
 735
 736    return srco.ret;
 737}
 738
 739static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 740                                         struct iovec *iov, int niov,
 741                                         enum AIOCBState aiocb_type);
 742static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 743static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
 744static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
 745static void co_write_request(void *opaque);
 746
/*
 * Drop the current connection, establish a new one and resend every request
 * that was in flight.  Called from the error path of aio_read_response();
 * `opaque` is the BDRVSheepdogState.
 */
static coroutine_fn void reconnect_to_sdog(void *opaque)
{
    BDRVSheepdogState *s = opaque;
    AIOReq *aio_req, *next;

    /* unregister the handlers before the descriptor is closed */
    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
                       NULL, NULL);
    close(s->fd);
    s->fd = -1;

    /* Wait for outstanding write requests to be completed. */
    while (s->co_send != NULL) {
        co_write_request(opaque);
    }

    /* Try to reconnect the sheepdog server every one second. */
    while (s->fd < 0) {
        Error *local_err = NULL;
        s->fd = get_sheep_fd(s, &local_err);
        if (s->fd < 0) {
            DPRINTF("Wait for connection to be established\n");
            error_report_err(local_err);
            co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
                            1000000000ULL);
        }
    };

    /*
     * Now we have to resend all the request in the inflight queue.  However,
     * resend_aioreq() can yield and newly created requests can be added to the
     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
     * have to move all the inflight requests to the failed queue before
     * resend_aioreq() is called.
     */
    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
        QLIST_REMOVE(aio_req, aio_siblings);
        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
    }

    /* Resend all the failed aio requests. */
    while (!QLIST_EMPTY(&s->failed_aio_head)) {
        aio_req = QLIST_FIRST(&s->failed_aio_head);
        /* back onto the inflight queue before it is resent */
        QLIST_REMOVE(aio_req, aio_siblings);
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
        resend_aioreq(s, aio_req);
    }
}
 794
/*
 * Receive responses of the I/O requests.
 *
 * This function runs as the s->co_recv coroutine, which co_read_response()
 * (the read handler installed on s->fd by get_sheep_fd()) creates and
 * enters when s->fd is ready for reading responses.
 *
 * One response is handled per invocation: the header is read, the matching
 * AIOReq is looked up by id on the inflight queue, type-specific handling is
 * applied, and the request is freed.  Once the owning AIOCB has no pending
 * requests left, its aio_done_func is called.  A receive failure or an
 * unknown id leads to reconnect_to_sdog().
 */
static void coroutine_fn aio_read_response(void *opaque)
{
    SheepdogObjRsp rsp;
    BDRVSheepdogState *s = opaque;
    int fd = s->fd;
    int ret;
    AIOReq *aio_req = NULL;
    SheepdogAIOCB *acb;
    uint64_t idx;

    /* read a header */
    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
    if (ret != sizeof(rsp)) {
        error_report("failed to get the header, %s", strerror(errno));
        goto err;
    }

    /* find the right aio_req from the inflight aio list */
    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
        if (aio_req->id == rsp.id) {
            break;
        }
    }
    /* aio_req is NULL here when the loop ran to the end without a match */
    if (!aio_req) {
        error_report("cannot find aio_req %x", rsp.id);
        goto err;
    }

    acb = aio_req->aiocb;

    switch (acb->aiocb_type) {
    case AIOCB_WRITE_UDATA:
        /* this coroutine context is no longer suitable for co_recv
         * because we may send data to update vdi objects */
        s->co_recv = NULL;
        if (!is_data_obj(aio_req->oid)) {
            break;
        }
        idx = data_oid_to_idx(aio_req->oid);

        if (aio_req->create) {
            /*
             * If the object is newly created one, we need to update
             * the vdi object (metadata object).  min_dirty_data_idx
             * and max_dirty_data_idx are changed to include updated
             * index between them.
             */
            if (rsp.result == SD_RES_SUCCESS) {
                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
                acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
                acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
            }
        }
        break;
    case AIOCB_READ_UDATA:
        /* the payload follows the header; place it at this request's offset */
        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
                            aio_req->iov_offset, rsp.data_length);
        if (ret != rsp.data_length) {
            error_report("failed to get the data, %s", strerror(errno));
            goto err;
        }
        break;
    case AIOCB_FLUSH_CACHE:
        if (rsp.result == SD_RES_INVALID_PARMS) {
            DPRINTF("disable cache since the server doesn't support it\n");
            s->cache_flags = SD_FLAG_CMD_DIRECT;
            /* treated as success */
            rsp.result = SD_RES_SUCCESS;
        }
        break;
    case AIOCB_DISCARD_OBJ:
        switch (rsp.result) {
        case SD_RES_INVALID_PARMS:
            error_report("sheep(%s) doesn't support discard command",
                         s->host_spec);
            /* treated as success, and remembered in discard_supported */
            rsp.result = SD_RES_SUCCESS;
            s->discard_supported = false;
            break;
        default:
            break;
        }
    }

    switch (rsp.result) {
    case SD_RES_SUCCESS:
        break;
    case SD_RES_READONLY:
        /*
         * Reload the inode if the request still targets the vdi id we hold,
         * then point the request at the (possibly new) s->inode.vdi_id and
         * send it again.
         */
        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
            ret = reload_inode(s, 0, "");
            if (ret < 0) {
                goto err;
            }
        }
        if (is_data_obj(aio_req->oid)) {
            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
                                           data_oid_to_idx(aio_req->oid));
        } else {
            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
        }
        resend_aioreq(s, aio_req);
        /* the request stays alive, so skip free_aio_req() */
        goto out;
    default:
        acb->ret = -EIO;
        error_report("%s", sd_strerror(rsp.result));
        break;
    }

    free_aio_req(s, aio_req);
    if (!acb->nr_pending) {
        /*
         * We've finished all requests which belong to the AIOCB, so
         * we can switch back to sd_co_readv/writev now.
         */
        acb->aio_done_func(acb);
    }
out:
    s->co_recv = NULL;
    return;
err:
    s->co_recv = NULL;
    reconnect_to_sdog(opaque);
}
 922
 923static void co_read_response(void *opaque)
 924{
 925    BDRVSheepdogState *s = opaque;
 926
 927    if (!s->co_recv) {
 928        s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
 929    }
 930
 931    qemu_coroutine_enter(s->co_recv);
 932}
 933
/*
 * Enter the coroutine stored in s->co_send; `opaque` is the
 * BDRVSheepdogState.  reconnect_to_sdog() calls this directly; it has the
 * callback shape of an fd handler and is presumably also registered as the
 * write handler elsewhere in the file.
 */
static void co_write_request(void *opaque)
{
    BDRVSheepdogState *s = opaque;

    qemu_coroutine_enter(s->co_send);
}
 940
 941/*
 942 * Return a socket descriptor to read/write objects.
 943 *
 944 * We cannot use this descriptor for other operations because
 945 * the block driver may be on waiting response from the server.
 946 */
 947static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
 948{
 949    int fd;
 950
 951    fd = connect_to_sdog(s, errp);
 952    if (fd < 0) {
 953        return fd;
 954    }
 955
 956    aio_set_fd_handler(s->aio_context, fd, false,
 957                       co_read_response, NULL, s);
 958    return fd;
 959}
 960
 961static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
 962                        char *vdi, uint32_t *snapid, char *tag)
 963{
 964    URI *uri;
 965    QueryParams *qp = NULL;
 966    int ret = 0;
 967
 968    uri = uri_parse(filename);
 969    if (!uri) {
 970        return -EINVAL;
 971    }
 972
 973    /* transport */
 974    if (!strcmp(uri->scheme, "sheepdog")) {
 975        s->is_unix = false;
 976    } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
 977        s->is_unix = false;
 978    } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
 979        s->is_unix = true;
 980    } else {
 981        ret = -EINVAL;
 982        goto out;
 983    }
 984
 985    if (uri->path == NULL || !strcmp(uri->path, "/")) {
 986        ret = -EINVAL;
 987        goto out;
 988    }
 989    pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
 990
 991    qp = query_params_parse(uri->query);
 992    if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
 993        ret = -EINVAL;
 994        goto out;
 995    }
 996
 997    if (s->is_unix) {
 998        /* sheepdog+unix:///vdiname?socket=path */
 999        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
1000            ret = -EINVAL;
1001            goto out;
1002        }
1003        s->host_spec = g_strdup(qp->p[0].value);
1004    } else {
1005        /* sheepdog[+tcp]://[host:port]/vdiname */
1006        s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
1007                                       uri->port ?: SD_DEFAULT_PORT);
1008    }
1009
1010    /* snapshot tag */
1011    if (uri->fragment) {
1012        *snapid = strtoul(uri->fragment, NULL, 10);
1013        if (*snapid == 0) {
1014            pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
1015        }
1016    } else {
1017        *snapid = CURRENT_VDI_ID; /* search current vdi */
1018    }
1019
1020out:
1021    if (qp) {
1022        query_params_free(qp);
1023    }
1024    uri_free(uri);
1025    return ret;
1026}
1027
1028/*
1029 * Parse a filename (old syntax)
1030 *
1031 * filename must be one of the following formats:
1032 *   1. [vdiname]
1033 *   2. [vdiname]:[snapid]
1034 *   3. [vdiname]:[tag]
1035 *   4. [hostname]:[port]:[vdiname]
1036 *   5. [hostname]:[port]:[vdiname]:[snapid]
1037 *   6. [hostname]:[port]:[vdiname]:[tag]
1038 *
1039 * You can boot from the snapshot images by specifying `snapid` or
1040 * `tag'.
1041 *
1042 * You can run VMs outside the Sheepdog cluster by specifying
1043 * `hostname' and `port' (experimental).
1044 */
1045static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
1046                         char *vdi, uint32_t *snapid, char *tag)
1047{
1048    char *p, *q, *uri;
1049    const char *host_spec, *vdi_spec;
1050    int nr_sep, ret;
1051
1052    strstart(filename, "sheepdog:", (const char **)&filename);
1053    p = q = g_strdup(filename);
1054
1055    /* count the number of separators */
1056    nr_sep = 0;
1057    while (*p) {
1058        if (*p == ':') {
1059            nr_sep++;
1060        }
1061        p++;
1062    }
1063    p = q;
1064
1065    /* use the first two tokens as host_spec. */
1066    if (nr_sep >= 2) {
1067        host_spec = p;
1068        p = strchr(p, ':');
1069        p++;
1070        p = strchr(p, ':');
1071        *p++ = '\0';
1072    } else {
1073        host_spec = "";
1074    }
1075
1076    vdi_spec = p;
1077
1078    p = strchr(vdi_spec, ':');
1079    if (p) {
1080        *p++ = '#';
1081    }
1082
1083    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1084
1085    ret = sd_parse_uri(s, uri, vdi, snapid, tag);
1086
1087    g_free(q);
1088    g_free(uri);
1089
1090    return ret;
1091}
1092
1093static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1094                         uint32_t snapid, const char *tag, uint32_t *vid,
1095                         bool lock, Error **errp)
1096{
1097    int ret, fd;
1098    SheepdogVdiReq hdr;
1099    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1100    unsigned int wlen, rlen = 0;
1101    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1102
1103    fd = connect_to_sdog(s, errp);
1104    if (fd < 0) {
1105        return fd;
1106    }
1107
1108    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1109     * which is desirable since we'll soon be sending those bytes, and
1110     * don't want the send_req to read uninitialized data.
1111     */
1112    strncpy(buf, filename, SD_MAX_VDI_LEN);
1113    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1114
1115    memset(&hdr, 0, sizeof(hdr));
1116    if (lock) {
1117        hdr.opcode = SD_OP_LOCK_VDI;
1118        hdr.type = LOCK_TYPE_NORMAL;
1119    } else {
1120        hdr.opcode = SD_OP_GET_VDI_INFO;
1121    }
1122    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1123    hdr.proto_ver = SD_PROTO_VER;
1124    hdr.data_length = wlen;
1125    hdr.snapid = snapid;
1126    hdr.flags = SD_FLAG_CMD_WRITE;
1127
1128    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1129    if (ret) {
1130        error_setg_errno(errp, -ret, "cannot get vdi info");
1131        goto out;
1132    }
1133
1134    if (rsp->result != SD_RES_SUCCESS) {
1135        error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1136                   sd_strerror(rsp->result), filename, snapid, tag);
1137        if (rsp->result == SD_RES_NO_VDI) {
1138            ret = -ENOENT;
1139        } else if (rsp->result == SD_RES_VDI_LOCKED) {
1140            ret = -EBUSY;
1141        } else {
1142            ret = -EIO;
1143        }
1144        goto out;
1145    }
1146    *vid = rsp->vdi_id;
1147
1148    ret = 0;
1149out:
1150    closesocket(fd);
1151    return ret;
1152}
1153
1154static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1155                                         struct iovec *iov, int niov,
1156                                         enum AIOCBState aiocb_type)
1157{
1158    int nr_copies = s->inode.nr_copies;
1159    SheepdogObjReq hdr;
1160    unsigned int wlen = 0;
1161    int ret;
1162    uint64_t oid = aio_req->oid;
1163    unsigned int datalen = aio_req->data_len;
1164    uint64_t offset = aio_req->offset;
1165    uint8_t flags = aio_req->flags;
1166    uint64_t old_oid = aio_req->base_oid;
1167    bool create = aio_req->create;
1168
1169    if (!nr_copies) {
1170        error_report("bug");
1171    }
1172
1173    memset(&hdr, 0, sizeof(hdr));
1174
1175    switch (aiocb_type) {
1176    case AIOCB_FLUSH_CACHE:
1177        hdr.opcode = SD_OP_FLUSH_VDI;
1178        break;
1179    case AIOCB_READ_UDATA:
1180        hdr.opcode = SD_OP_READ_OBJ;
1181        hdr.flags = flags;
1182        break;
1183    case AIOCB_WRITE_UDATA:
1184        if (create) {
1185            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1186        } else {
1187            hdr.opcode = SD_OP_WRITE_OBJ;
1188        }
1189        wlen = datalen;
1190        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1191        break;
1192    case AIOCB_DISCARD_OBJ:
1193        hdr.opcode = SD_OP_WRITE_OBJ;
1194        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1195        s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
1196        offset = offsetof(SheepdogInode,
1197                          data_vdi_id[data_oid_to_idx(oid)]);
1198        oid = vid_to_vdi_oid(s->inode.vdi_id);
1199        wlen = datalen = sizeof(uint32_t);
1200        break;
1201    }
1202
1203    if (s->cache_flags) {
1204        hdr.flags |= s->cache_flags;
1205    }
1206
1207    hdr.oid = oid;
1208    hdr.cow_oid = old_oid;
1209    hdr.copies = s->inode.nr_copies;
1210
1211    hdr.data_length = datalen;
1212    hdr.offset = offset;
1213
1214    hdr.id = aio_req->id;
1215
1216    qemu_co_mutex_lock(&s->lock);
1217    s->co_send = qemu_coroutine_self();
1218    aio_set_fd_handler(s->aio_context, s->fd, false,
1219                       co_read_response, co_write_request, s);
1220    socket_set_cork(s->fd, 1);
1221
1222    /* send a header */
1223    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1224    if (ret != sizeof(hdr)) {
1225        error_report("failed to send a req, %s", strerror(errno));
1226        goto out;
1227    }
1228
1229    if (wlen) {
1230        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1231        if (ret != wlen) {
1232            error_report("failed to send a data, %s", strerror(errno));
1233        }
1234    }
1235out:
1236    socket_set_cork(s->fd, 0);
1237    aio_set_fd_handler(s->aio_context, s->fd, false,
1238                       co_read_response, NULL, s);
1239    s->co_send = NULL;
1240    qemu_co_mutex_unlock(&s->lock);
1241}
1242
1243static int read_write_object(int fd, AioContext *aio_context, char *buf,
1244                             uint64_t oid, uint8_t copies,
1245                             unsigned int datalen, uint64_t offset,
1246                             bool write, bool create, uint32_t cache_flags)
1247{
1248    SheepdogObjReq hdr;
1249    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1250    unsigned int wlen, rlen;
1251    int ret;
1252
1253    memset(&hdr, 0, sizeof(hdr));
1254
1255    if (write) {
1256        wlen = datalen;
1257        rlen = 0;
1258        hdr.flags = SD_FLAG_CMD_WRITE;
1259        if (create) {
1260            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1261        } else {
1262            hdr.opcode = SD_OP_WRITE_OBJ;
1263        }
1264    } else {
1265        wlen = 0;
1266        rlen = datalen;
1267        hdr.opcode = SD_OP_READ_OBJ;
1268    }
1269
1270    hdr.flags |= cache_flags;
1271
1272    hdr.oid = oid;
1273    hdr.data_length = datalen;
1274    hdr.offset = offset;
1275    hdr.copies = copies;
1276
1277    ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1278    if (ret) {
1279        error_report("failed to send a request to the sheep");
1280        return ret;
1281    }
1282
1283    switch (rsp->result) {
1284    case SD_RES_SUCCESS:
1285        return 0;
1286    default:
1287        error_report("%s", sd_strerror(rsp->result));
1288        return -EIO;
1289    }
1290}
1291
1292static int read_object(int fd, AioContext *aio_context, char *buf,
1293                       uint64_t oid, uint8_t copies,
1294                       unsigned int datalen, uint64_t offset,
1295                       uint32_t cache_flags)
1296{
1297    return read_write_object(fd, aio_context, buf, oid, copies,
1298                             datalen, offset, false,
1299                             false, cache_flags);
1300}
1301
1302static int write_object(int fd, AioContext *aio_context, char *buf,
1303                        uint64_t oid, uint8_t copies,
1304                        unsigned int datalen, uint64_t offset, bool create,
1305                        uint32_t cache_flags)
1306{
1307    return read_write_object(fd, aio_context, buf, oid, copies,
1308                             datalen, offset, true,
1309                             create, cache_flags);
1310}
1311
1312/* update inode with the latest state */
1313static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1314{
1315    Error *local_err = NULL;
1316    SheepdogInode *inode;
1317    int ret = 0, fd;
1318    uint32_t vid = 0;
1319
1320    fd = connect_to_sdog(s, &local_err);
1321    if (fd < 0) {
1322        error_report_err(local_err);
1323        return -EIO;
1324    }
1325
1326    inode = g_malloc(SD_INODE_HEADER_SIZE);
1327
1328    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1329    if (ret) {
1330        error_report_err(local_err);
1331        goto out;
1332    }
1333
1334    ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
1335                      s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1336                      s->cache_flags);
1337    if (ret < 0) {
1338        goto out;
1339    }
1340
1341    if (inode->vdi_id != s->inode.vdi_id) {
1342        memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1343    }
1344
1345out:
1346    g_free(inode);
1347    closesocket(fd);
1348
1349    return ret;
1350}
1351
1352static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1353{
1354    SheepdogAIOCB *acb = aio_req->aiocb;
1355
1356    aio_req->create = false;
1357
1358    /* check whether this request becomes a CoW one */
1359    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1360        int idx = data_oid_to_idx(aio_req->oid);
1361
1362        if (is_data_obj_writable(&s->inode, idx)) {
1363            goto out;
1364        }
1365
1366        if (s->inode.data_vdi_id[idx]) {
1367            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1368            aio_req->flags |= SD_FLAG_CMD_COW;
1369        }
1370        aio_req->create = true;
1371    }
1372out:
1373    if (is_data_obj(aio_req->oid)) {
1374        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1375                        acb->aiocb_type);
1376    } else {
1377        struct iovec iov;
1378        iov.iov_base = &s->inode;
1379        iov.iov_len = sizeof(s->inode);
1380        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1381    }
1382}
1383
1384static void sd_detach_aio_context(BlockDriverState *bs)
1385{
1386    BDRVSheepdogState *s = bs->opaque;
1387
1388    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1389                       NULL, NULL);
1390}
1391
1392static void sd_attach_aio_context(BlockDriverState *bs,
1393                                  AioContext *new_context)
1394{
1395    BDRVSheepdogState *s = bs->opaque;
1396
1397    s->aio_context = new_context;
1398    aio_set_fd_handler(new_context, s->fd, false,
1399                       co_read_response, NULL, s);
1400}
1401
1402/* TODO Convert to fine grained options */
1403static QemuOptsList runtime_opts = {
1404    .name = "sheepdog",
1405    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1406    .desc = {
1407        {
1408            .name = "filename",
1409            .type = QEMU_OPT_STRING,
1410            .help = "URL to the sheepdog image",
1411        },
1412        { /* end of list */ }
1413    },
1414};
1415
1416static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1417                   Error **errp)
1418{
1419    int ret, fd;
1420    uint32_t vid = 0;
1421    BDRVSheepdogState *s = bs->opaque;
1422    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1423    uint32_t snapid;
1424    char *buf = NULL;
1425    QemuOpts *opts;
1426    Error *local_err = NULL;
1427    const char *filename;
1428
1429    s->bs = bs;
1430    s->aio_context = bdrv_get_aio_context(bs);
1431
1432    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1433    qemu_opts_absorb_qdict(opts, options, &local_err);
1434    if (local_err) {
1435        error_propagate(errp, local_err);
1436        ret = -EINVAL;
1437        goto out;
1438    }
1439
1440    filename = qemu_opt_get(opts, "filename");
1441
1442    QLIST_INIT(&s->inflight_aio_head);
1443    QLIST_INIT(&s->failed_aio_head);
1444    QLIST_INIT(&s->inflight_aiocb_head);
1445    s->fd = -1;
1446
1447    memset(vdi, 0, sizeof(vdi));
1448    memset(tag, 0, sizeof(tag));
1449
1450    if (strstr(filename, "://")) {
1451        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1452    } else {
1453        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1454    }
1455    if (ret < 0) {
1456        error_setg(errp, "Can't parse filename");
1457        goto out;
1458    }
1459    s->fd = get_sheep_fd(s, errp);
1460    if (s->fd < 0) {
1461        ret = s->fd;
1462        goto out;
1463    }
1464
1465    ret = find_vdi_name(s, vdi, snapid, tag, &vid, true, errp);
1466    if (ret) {
1467        goto out;
1468    }
1469
1470    /*
1471     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1472     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1473     */
1474    s->cache_flags = SD_FLAG_CMD_CACHE;
1475    if (flags & BDRV_O_NOCACHE) {
1476        s->cache_flags = SD_FLAG_CMD_DIRECT;
1477    }
1478    s->discard_supported = true;
1479
1480    if (snapid || tag[0] != '\0') {
1481        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1482        s->is_snapshot = true;
1483    }
1484
1485    fd = connect_to_sdog(s, errp);
1486    if (fd < 0) {
1487        ret = fd;
1488        goto out;
1489    }
1490
1491    buf = g_malloc(SD_INODE_SIZE);
1492    ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
1493                      0, SD_INODE_SIZE, 0, s->cache_flags);
1494
1495    closesocket(fd);
1496
1497    if (ret) {
1498        error_setg(errp, "Can't read snapshot inode");
1499        goto out;
1500    }
1501
1502    memcpy(&s->inode, buf, sizeof(s->inode));
1503
1504    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1505    pstrcpy(s->name, sizeof(s->name), vdi);
1506    qemu_co_mutex_init(&s->lock);
1507    qemu_co_queue_init(&s->overlapping_queue);
1508    qemu_opts_del(opts);
1509    g_free(buf);
1510    return 0;
1511out:
1512    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1513                       false, NULL, NULL, NULL);
1514    if (s->fd >= 0) {
1515        closesocket(s->fd);
1516    }
1517    qemu_opts_del(opts);
1518    g_free(buf);
1519    return ret;
1520}
1521
1522static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1523                             Error **errp)
1524{
1525    BDRVSheepdogState *s = state->bs->opaque;
1526    BDRVSheepdogReopenState *re_s;
1527    int ret = 0;
1528
1529    re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1530
1531    re_s->cache_flags = SD_FLAG_CMD_CACHE;
1532    if (state->flags & BDRV_O_NOCACHE) {
1533        re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1534    }
1535
1536    re_s->fd = get_sheep_fd(s, errp);
1537    if (re_s->fd < 0) {
1538        ret = re_s->fd;
1539        return ret;
1540    }
1541
1542    return ret;
1543}
1544
1545static void sd_reopen_commit(BDRVReopenState *state)
1546{
1547    BDRVSheepdogReopenState *re_s = state->opaque;
1548    BDRVSheepdogState *s = state->bs->opaque;
1549
1550    if (s->fd) {
1551        aio_set_fd_handler(s->aio_context, s->fd, false,
1552                           NULL, NULL, NULL);
1553        closesocket(s->fd);
1554    }
1555
1556    s->fd = re_s->fd;
1557    s->cache_flags = re_s->cache_flags;
1558
1559    g_free(state->opaque);
1560    state->opaque = NULL;
1561
1562    return;
1563}
1564
1565static void sd_reopen_abort(BDRVReopenState *state)
1566{
1567    BDRVSheepdogReopenState *re_s = state->opaque;
1568    BDRVSheepdogState *s = state->bs->opaque;
1569
1570    if (re_s == NULL) {
1571        return;
1572    }
1573
1574    if (re_s->fd) {
1575        aio_set_fd_handler(s->aio_context, re_s->fd, false,
1576                           NULL, NULL, NULL);
1577        closesocket(re_s->fd);
1578    }
1579
1580    g_free(state->opaque);
1581    state->opaque = NULL;
1582
1583    return;
1584}
1585
1586static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1587                        Error **errp)
1588{
1589    SheepdogVdiReq hdr;
1590    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1591    int fd, ret;
1592    unsigned int wlen, rlen = 0;
1593    char buf[SD_MAX_VDI_LEN];
1594
1595    fd = connect_to_sdog(s, errp);
1596    if (fd < 0) {
1597        return fd;
1598    }
1599
1600    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1601     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1602     */
1603    memset(buf, 0, sizeof(buf));
1604    pstrcpy(buf, sizeof(buf), s->name);
1605
1606    memset(&hdr, 0, sizeof(hdr));
1607    hdr.opcode = SD_OP_NEW_VDI;
1608    hdr.base_vdi_id = s->inode.vdi_id;
1609
1610    wlen = SD_MAX_VDI_LEN;
1611
1612    hdr.flags = SD_FLAG_CMD_WRITE;
1613    hdr.snapid = snapshot;
1614
1615    hdr.data_length = wlen;
1616    hdr.vdi_size = s->inode.vdi_size;
1617    hdr.copy_policy = s->inode.copy_policy;
1618    hdr.copies = s->inode.nr_copies;
1619    hdr.block_size_shift = s->inode.block_size_shift;
1620
1621    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1622
1623    closesocket(fd);
1624
1625    if (ret) {
1626        error_setg_errno(errp, -ret, "create failed");
1627        return ret;
1628    }
1629
1630    if (rsp->result != SD_RES_SUCCESS) {
1631        error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1632        return -EIO;
1633    }
1634
1635    if (vdi_id) {
1636        *vdi_id = rsp->vdi_id;
1637    }
1638
1639    return 0;
1640}
1641
1642static int sd_prealloc(const char *filename, Error **errp)
1643{
1644    BlockBackend *blk = NULL;
1645    BDRVSheepdogState *base = NULL;
1646    unsigned long buf_size;
1647    uint32_t idx, max_idx;
1648    uint32_t object_size;
1649    int64_t vdi_size;
1650    void *buf = NULL;
1651    int ret;
1652
1653    blk = blk_new_open(filename, NULL, NULL,
1654                       BDRV_O_RDWR | BDRV_O_PROTOCOL, errp);
1655    if (blk == NULL) {
1656        ret = -EIO;
1657        goto out_with_err_set;
1658    }
1659
1660    blk_set_allow_write_beyond_eof(blk, true);
1661
1662    vdi_size = blk_getlength(blk);
1663    if (vdi_size < 0) {
1664        ret = vdi_size;
1665        goto out;
1666    }
1667
1668    base = blk_bs(blk)->opaque;
1669    object_size = (UINT32_C(1) << base->inode.block_size_shift);
1670    buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1671    buf = g_malloc0(buf_size);
1672
1673    max_idx = DIV_ROUND_UP(vdi_size, buf_size);
1674
1675    for (idx = 0; idx < max_idx; idx++) {
1676        /*
1677         * The created image can be a cloned image, so we need to read
1678         * a data from the source image.
1679         */
1680        ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1681        if (ret < 0) {
1682            goto out;
1683        }
1684        ret = blk_pwrite(blk, idx * buf_size, buf, buf_size, 0);
1685        if (ret < 0) {
1686            goto out;
1687        }
1688    }
1689
1690    ret = 0;
1691out:
1692    if (ret < 0) {
1693        error_setg_errno(errp, -ret, "Can't pre-allocate");
1694    }
1695out_with_err_set:
1696    if (blk) {
1697        blk_unref(blk);
1698    }
1699    g_free(buf);
1700
1701    return ret;
1702}
1703
1704/*
1705 * Sheepdog support two kinds of redundancy, full replication and erasure
1706 * coding.
1707 *
1708 * # create a fully replicated vdi with x copies
1709 * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1710 *
1711 * # create a erasure coded vdi with x data strips and y parity strips
1712 * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1713 */
1714static int parse_redundancy(BDRVSheepdogState *s, const char *opt)
1715{
1716    struct SheepdogInode *inode = &s->inode;
1717    const char *n1, *n2;
1718    long copy, parity;
1719    char p[10];
1720
1721    pstrcpy(p, sizeof(p), opt);
1722    n1 = strtok(p, ":");
1723    n2 = strtok(NULL, ":");
1724
1725    if (!n1) {
1726        return -EINVAL;
1727    }
1728
1729    copy = strtol(n1, NULL, 10);
1730    if (copy > SD_MAX_COPIES || copy < 1) {
1731        return -EINVAL;
1732    }
1733    if (!n2) {
1734        inode->copy_policy = 0;
1735        inode->nr_copies = copy;
1736        return 0;
1737    }
1738
1739    if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1740        return -EINVAL;
1741    }
1742
1743    parity = strtol(n2, NULL, 10);
1744    if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1745        return -EINVAL;
1746    }
1747
1748    /*
1749     * 4 bits for parity and 4 bits for data.
1750     * We have to compress upper data bits because it can't represent 16
1751     */
1752    inode->copy_policy = ((copy / 2) << 4) + parity;
1753    inode->nr_copies = copy + parity;
1754
1755    return 0;
1756}
1757
1758static int parse_block_size_shift(BDRVSheepdogState *s, QemuOpts *opt)
1759{
1760    struct SheepdogInode *inode = &s->inode;
1761    uint64_t object_size;
1762    int obj_order;
1763
1764    object_size = qemu_opt_get_size_del(opt, BLOCK_OPT_OBJECT_SIZE, 0);
1765    if (object_size) {
1766        if ((object_size - 1) & object_size) {    /* not a power of 2? */
1767            return -EINVAL;
1768        }
1769        obj_order = ctz32(object_size);
1770        if (obj_order < 20 || obj_order > 31) {
1771            return -EINVAL;
1772        }
1773        inode->block_size_shift = (uint8_t)obj_order;
1774    }
1775
1776    return 0;
1777}
1778
1779static int sd_create(const char *filename, QemuOpts *opts,
1780                     Error **errp)
1781{
1782    int ret = 0;
1783    uint32_t vid = 0;
1784    char *backing_file = NULL;
1785    char *buf = NULL;
1786    BDRVSheepdogState *s;
1787    char tag[SD_MAX_VDI_TAG_LEN];
1788    uint32_t snapid;
1789    uint64_t max_vdi_size;
1790    bool prealloc = false;
1791
1792    s = g_new0(BDRVSheepdogState, 1);
1793
1794    memset(tag, 0, sizeof(tag));
1795    if (strstr(filename, "://")) {
1796        ret = sd_parse_uri(s, filename, s->name, &snapid, tag);
1797    } else {
1798        ret = parse_vdiname(s, filename, s->name, &snapid, tag);
1799    }
1800    if (ret < 0) {
1801        error_setg(errp, "Can't parse filename");
1802        goto out;
1803    }
1804
1805    s->inode.vdi_size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
1806                                 BDRV_SECTOR_SIZE);
1807    backing_file = qemu_opt_get_del(opts, BLOCK_OPT_BACKING_FILE);
1808    buf = qemu_opt_get_del(opts, BLOCK_OPT_PREALLOC);
1809    if (!buf || !strcmp(buf, "off")) {
1810        prealloc = false;
1811    } else if (!strcmp(buf, "full")) {
1812        prealloc = true;
1813    } else {
1814        error_setg(errp, "Invalid preallocation mode: '%s'", buf);
1815        ret = -EINVAL;
1816        goto out;
1817    }
1818
1819    g_free(buf);
1820    buf = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
1821    if (buf) {
1822        ret = parse_redundancy(s, buf);
1823        if (ret < 0) {
1824            error_setg(errp, "Invalid redundancy mode: '%s'", buf);
1825            goto out;
1826        }
1827    }
1828    ret = parse_block_size_shift(s, opts);
1829    if (ret < 0) {
1830        error_setg(errp, "Invalid object_size."
1831                         " obect_size needs to be power of 2"
1832                         " and be limited from 2^20 to 2^31");
1833        goto out;
1834    }
1835
1836    if (backing_file) {
1837        BlockBackend *blk;
1838        BDRVSheepdogState *base;
1839        BlockDriver *drv;
1840
1841        /* Currently, only Sheepdog backing image is supported. */
1842        drv = bdrv_find_protocol(backing_file, true, NULL);
1843        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1844            error_setg(errp, "backing_file must be a sheepdog image");
1845            ret = -EINVAL;
1846            goto out;
1847        }
1848
1849        blk = blk_new_open(backing_file, NULL, NULL,
1850                           BDRV_O_PROTOCOL, errp);
1851        if (blk == NULL) {
1852            ret = -EIO;
1853            goto out;
1854        }
1855
1856        base = blk_bs(blk)->opaque;
1857
1858        if (!is_snapshot(&base->inode)) {
1859            error_setg(errp, "cannot clone from a non snapshot vdi");
1860            blk_unref(blk);
1861            ret = -EINVAL;
1862            goto out;
1863        }
1864        s->inode.vdi_id = base->inode.vdi_id;
1865        blk_unref(blk);
1866    }
1867
1868    s->aio_context = qemu_get_aio_context();
1869
1870    /* if block_size_shift is not specified, get cluster default value */
1871    if (s->inode.block_size_shift == 0) {
1872        SheepdogVdiReq hdr;
1873        SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
1874        Error *local_err = NULL;
1875        int fd;
1876        unsigned int wlen = 0, rlen = 0;
1877
1878        fd = connect_to_sdog(s, &local_err);
1879        if (fd < 0) {
1880            error_report_err(local_err);
1881            ret = -EIO;
1882            goto out;
1883        }
1884
1885        memset(&hdr, 0, sizeof(hdr));
1886        hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
1887        hdr.proto_ver = SD_PROTO_VER;
1888
1889        ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1890                     NULL, &wlen, &rlen);
1891        closesocket(fd);
1892        if (ret) {
1893            error_setg_errno(errp, -ret, "failed to get cluster default");
1894            goto out;
1895        }
1896        if (rsp->result == SD_RES_SUCCESS) {
1897            s->inode.block_size_shift = rsp->block_size_shift;
1898        } else {
1899            s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
1900        }
1901    }
1902
1903    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1904
1905    if (s->inode.vdi_size > max_vdi_size) {
1906        error_setg(errp, "An image is too large."
1907                         " The maximum image size is %"PRIu64 "GB",
1908                         max_vdi_size / 1024 / 1024 / 1024);
1909        ret = -EINVAL;
1910        goto out;
1911    }
1912
1913    ret = do_sd_create(s, &vid, 0, errp);
1914    if (ret) {
1915        goto out;
1916    }
1917
1918    if (prealloc) {
1919        ret = sd_prealloc(filename, errp);
1920    }
1921out:
1922    g_free(backing_file);
1923    g_free(buf);
1924    g_free(s);
1925    return ret;
1926}
1927
1928static void sd_close(BlockDriverState *bs)
1929{
1930    Error *local_err = NULL;
1931    BDRVSheepdogState *s = bs->opaque;
1932    SheepdogVdiReq hdr;
1933    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1934    unsigned int wlen, rlen = 0;
1935    int fd, ret;
1936
1937    DPRINTF("%s\n", s->name);
1938
1939    fd = connect_to_sdog(s, &local_err);
1940    if (fd < 0) {
1941        error_report_err(local_err);
1942        return;
1943    }
1944
1945    memset(&hdr, 0, sizeof(hdr));
1946
1947    hdr.opcode = SD_OP_RELEASE_VDI;
1948    hdr.type = LOCK_TYPE_NORMAL;
1949    hdr.base_vdi_id = s->inode.vdi_id;
1950    wlen = strlen(s->name) + 1;
1951    hdr.data_length = wlen;
1952    hdr.flags = SD_FLAG_CMD_WRITE;
1953
1954    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1955                 s->name, &wlen, &rlen);
1956
1957    closesocket(fd);
1958
1959    if (!ret && rsp->result != SD_RES_SUCCESS &&
1960        rsp->result != SD_RES_VDI_NOT_LOCKED) {
1961        error_report("%s, %s", sd_strerror(rsp->result), s->name);
1962    }
1963
1964    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1965                       false, NULL, NULL, NULL);
1966    closesocket(s->fd);
1967    g_free(s->host_spec);
1968}
1969
1970static int64_t sd_getlength(BlockDriverState *bs)
1971{
1972    BDRVSheepdogState *s = bs->opaque;
1973
1974    return s->inode.vdi_size;
1975}
1976
1977static int sd_truncate(BlockDriverState *bs, int64_t offset)
1978{
1979    Error *local_err = NULL;
1980    BDRVSheepdogState *s = bs->opaque;
1981    int ret, fd;
1982    unsigned int datalen;
1983    uint64_t max_vdi_size;
1984
1985    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1986    if (offset < s->inode.vdi_size) {
1987        error_report("shrinking is not supported");
1988        return -EINVAL;
1989    } else if (offset > max_vdi_size) {
1990        error_report("too big image size");
1991        return -EINVAL;
1992    }
1993
1994    fd = connect_to_sdog(s, &local_err);
1995    if (fd < 0) {
1996        error_report_err(local_err);
1997        return fd;
1998    }
1999
2000    /* we don't need to update entire object */
2001    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2002    s->inode.vdi_size = offset;
2003    ret = write_object(fd, s->aio_context, (char *)&s->inode,
2004                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2005                       datalen, 0, false, s->cache_flags);
2006    close(fd);
2007
2008    if (ret < 0) {
2009        error_report("failed to update an inode.");
2010    }
2011
2012    return ret;
2013}
2014
2015/*
2016 * This function is called after writing data objects.  If we need to
2017 * update metadata, this sends a write request to the vdi object.
2018 * Otherwise, this switches back to sd_co_readv/writev.
2019 */
2020static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2021{
2022    BDRVSheepdogState *s = acb->common.bs->opaque;
2023    struct iovec iov;
2024    AIOReq *aio_req;
2025    uint32_t offset, data_len, mn, mx;
2026
2027    mn = acb->min_dirty_data_idx;
2028    mx = acb->max_dirty_data_idx;
2029    if (mn <= mx) {
2030        /* we need to update the vdi object. */
2031        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2032            mn * sizeof(s->inode.data_vdi_id[0]);
2033        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2034
2035        acb->min_dirty_data_idx = UINT32_MAX;
2036        acb->max_dirty_data_idx = 0;
2037
2038        iov.iov_base = &s->inode;
2039        iov.iov_len = sizeof(s->inode);
2040        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2041                                data_len, offset, 0, false, 0, offset);
2042        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2043        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2044
2045        acb->aio_done_func = sd_finish_aiocb;
2046        acb->aiocb_type = AIOCB_WRITE_UDATA;
2047        return;
2048    }
2049
2050    sd_finish_aiocb(acb);
2051}
2052
2053/* Delete current working VDI on the snapshot chain */
2054static bool sd_delete(BDRVSheepdogState *s)
2055{
2056    Error *local_err = NULL;
2057    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2058    SheepdogVdiReq hdr = {
2059        .opcode = SD_OP_DEL_VDI,
2060        .base_vdi_id = s->inode.vdi_id,
2061        .data_length = wlen,
2062        .flags = SD_FLAG_CMD_WRITE,
2063    };
2064    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2065    int fd, ret;
2066
2067    fd = connect_to_sdog(s, &local_err);
2068    if (fd < 0) {
2069        error_report_err(local_err);
2070        return false;
2071    }
2072
2073    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
2074                 s->name, &wlen, &rlen);
2075    closesocket(fd);
2076    if (ret) {
2077        return false;
2078    }
2079    switch (rsp->result) {
2080    case SD_RES_NO_VDI:
2081        error_report("%s was already deleted", s->name);
2082        /* fall through */
2083    case SD_RES_SUCCESS:
2084        break;
2085    default:
2086        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2087        return false;
2088    }
2089
2090    return true;
2091}
2092
2093/*
2094 * Create a writable VDI from a snapshot
2095 */
2096static int sd_create_branch(BDRVSheepdogState *s)
2097{
2098    Error *local_err = NULL;
2099    int ret, fd;
2100    uint32_t vid;
2101    char *buf;
2102    bool deleted;
2103
2104    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2105
2106    buf = g_malloc(SD_INODE_SIZE);
2107
2108    /*
2109     * Even If deletion fails, we will just create extra snapshot based on
2110     * the working VDI which was supposed to be deleted. So no need to
2111     * false bail out.
2112     */
2113    deleted = sd_delete(s);
2114    ret = do_sd_create(s, &vid, !deleted, &local_err);
2115    if (ret) {
2116        error_report_err(local_err);
2117        goto out;
2118    }
2119
2120    DPRINTF("%" PRIx32 " is created.\n", vid);
2121
2122    fd = connect_to_sdog(s, &local_err);
2123    if (fd < 0) {
2124        error_report_err(local_err);
2125        ret = fd;
2126        goto out;
2127    }
2128
2129    ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
2130                      s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2131
2132    closesocket(fd);
2133
2134    if (ret < 0) {
2135        goto out;
2136    }
2137
2138    memcpy(&s->inode, buf, sizeof(s->inode));
2139
2140    s->is_snapshot = false;
2141    ret = 0;
2142    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2143
2144out:
2145    g_free(buf);
2146
2147    return ret;
2148}
2149
2150/*
2151 * Send I/O requests to the server.
2152 *
2153 * This function sends requests to the server, links the requests to
2154 * the inflight_list in BDRVSheepdogState, and exits without
2155 * waiting the response.  The responses are received in the
2156 * `aio_read_response' function which is called from the main loop as
2157 * a fd handler.
2158 *
2159 * Returns 1 when we need to wait a response, 0 when there is no sent
2160 * request and -errno in error cases.
2161 */
2162static int coroutine_fn sd_co_rw_vector(void *p)
2163{
2164    SheepdogAIOCB *acb = p;
2165    int ret = 0;
2166    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2167    unsigned long idx;
2168    uint32_t object_size;
2169    uint64_t oid;
2170    uint64_t offset;
2171    BDRVSheepdogState *s = acb->common.bs->opaque;
2172    SheepdogInode *inode = &s->inode;
2173    AIOReq *aio_req;
2174
2175    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2176        /*
2177         * In the case we open the snapshot VDI, Sheepdog creates the
2178         * writable VDI when we do a write operation first.
2179         */
2180        ret = sd_create_branch(s);
2181        if (ret) {
2182            acb->ret = -EIO;
2183            goto out;
2184        }
2185    }
2186
2187    object_size = (UINT32_C(1) << inode->block_size_shift);
2188    idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2189    offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2190
2191    /*
2192     * Make sure we don't free the aiocb before we are done with all requests.
2193     * This additional reference is dropped at the end of this function.
2194     */
2195    acb->nr_pending++;
2196
2197    while (done != total) {
2198        uint8_t flags = 0;
2199        uint64_t old_oid = 0;
2200        bool create = false;
2201
2202        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2203
2204        len = MIN(total - done, object_size - offset);
2205
2206        switch (acb->aiocb_type) {
2207        case AIOCB_READ_UDATA:
2208            if (!inode->data_vdi_id[idx]) {
2209                qemu_iovec_memset(acb->qiov, done, 0, len);
2210                goto done;
2211            }
2212            break;
2213        case AIOCB_WRITE_UDATA:
2214            if (!inode->data_vdi_id[idx]) {
2215                create = true;
2216            } else if (!is_data_obj_writable(inode, idx)) {
2217                /* Copy-On-Write */
2218                create = true;
2219                old_oid = oid;
2220                flags = SD_FLAG_CMD_COW;
2221            }
2222            break;
2223        case AIOCB_DISCARD_OBJ:
2224            /*
2225             * We discard the object only when the whole object is
2226             * 1) allocated 2) trimmed. Otherwise, simply skip it.
2227             */
2228            if (len != object_size || inode->data_vdi_id[idx] == 0) {
2229                goto done;
2230            }
2231            break;
2232        default:
2233            break;
2234        }
2235
2236        if (create) {
2237            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2238                    inode->vdi_id, oid,
2239                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2240            oid = vid_to_data_oid(inode->vdi_id, idx);
2241            DPRINTF("new oid %" PRIx64 "\n", oid);
2242        }
2243
2244        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2245                                old_oid,
2246                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2247                                0 : done);
2248        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2249
2250        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2251                        acb->aiocb_type);
2252    done:
2253        offset = 0;
2254        idx++;
2255        done += len;
2256    }
2257out:
2258    if (!--acb->nr_pending) {
2259        return acb->ret;
2260    }
2261    return 1;
2262}
2263
2264static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
2265{
2266    SheepdogAIOCB *cb;
2267
2268    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
2269        if (AIOCBOverlapping(aiocb, cb)) {
2270            return true;
2271        }
2272    }
2273
2274    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
2275    return false;
2276}
2277
2278static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2279                        int nb_sectors, QEMUIOVector *qiov)
2280{
2281    SheepdogAIOCB *acb;
2282    int ret;
2283    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2284    BDRVSheepdogState *s = bs->opaque;
2285
2286    if (offset > s->inode.vdi_size) {
2287        ret = sd_truncate(bs, offset);
2288        if (ret < 0) {
2289            return ret;
2290        }
2291    }
2292
2293    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2294    acb->aio_done_func = sd_write_done;
2295    acb->aiocb_type = AIOCB_WRITE_UDATA;
2296
2297retry:
2298    if (check_overlapping_aiocb(s, acb)) {
2299        qemu_co_queue_wait(&s->overlapping_queue);
2300        goto retry;
2301    }
2302
2303    ret = sd_co_rw_vector(acb);
2304    if (ret <= 0) {
2305        QLIST_REMOVE(acb, aiocb_siblings);
2306        qemu_co_queue_restart_all(&s->overlapping_queue);
2307        qemu_aio_unref(acb);
2308        return ret;
2309    }
2310
2311    qemu_coroutine_yield();
2312
2313    QLIST_REMOVE(acb, aiocb_siblings);
2314    qemu_co_queue_restart_all(&s->overlapping_queue);
2315
2316    return acb->ret;
2317}
2318
2319static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2320                       int nb_sectors, QEMUIOVector *qiov)
2321{
2322    SheepdogAIOCB *acb;
2323    int ret;
2324    BDRVSheepdogState *s = bs->opaque;
2325
2326    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2327    acb->aiocb_type = AIOCB_READ_UDATA;
2328    acb->aio_done_func = sd_finish_aiocb;
2329
2330retry:
2331    if (check_overlapping_aiocb(s, acb)) {
2332        qemu_co_queue_wait(&s->overlapping_queue);
2333        goto retry;
2334    }
2335
2336    ret = sd_co_rw_vector(acb);
2337    if (ret <= 0) {
2338        QLIST_REMOVE(acb, aiocb_siblings);
2339        qemu_co_queue_restart_all(&s->overlapping_queue);
2340        qemu_aio_unref(acb);
2341        return ret;
2342    }
2343
2344    qemu_coroutine_yield();
2345
2346    QLIST_REMOVE(acb, aiocb_siblings);
2347    qemu_co_queue_restart_all(&s->overlapping_queue);
2348    return acb->ret;
2349}
2350
2351static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2352{
2353    BDRVSheepdogState *s = bs->opaque;
2354    SheepdogAIOCB *acb;
2355    AIOReq *aio_req;
2356
2357    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2358        return 0;
2359    }
2360
2361    acb = sd_aio_setup(bs, NULL, 0, 0);
2362    acb->aiocb_type = AIOCB_FLUSH_CACHE;
2363    acb->aio_done_func = sd_finish_aiocb;
2364
2365    aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2366                            0, 0, 0, false, 0, 0);
2367    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2368    add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
2369
2370    qemu_coroutine_yield();
2371    return acb->ret;
2372}
2373
2374static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2375{
2376    Error *local_err = NULL;
2377    BDRVSheepdogState *s = bs->opaque;
2378    int ret, fd;
2379    uint32_t new_vid;
2380    SheepdogInode *inode;
2381    unsigned int datalen;
2382
2383    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2384            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2385            s->name, sn_info->vm_state_size, s->is_snapshot);
2386
2387    if (s->is_snapshot) {
2388        error_report("You can't create a snapshot of a snapshot VDI, "
2389                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2390
2391        return -EINVAL;
2392    }
2393
2394    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2395
2396    s->inode.vm_state_size = sn_info->vm_state_size;
2397    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2398    /* It appears that inode.tag does not require a NUL terminator,
2399     * which means this use of strncpy is ok.
2400     */
2401    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2402    /* we don't need to update entire object */
2403    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2404    inode = g_malloc(datalen);
2405
2406    /* refresh inode. */
2407    fd = connect_to_sdog(s, &local_err);
2408    if (fd < 0) {
2409        error_report_err(local_err);
2410        ret = fd;
2411        goto cleanup;
2412    }
2413
2414    ret = write_object(fd, s->aio_context, (char *)&s->inode,
2415                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2416                       datalen, 0, false, s->cache_flags);
2417    if (ret < 0) {
2418        error_report("failed to write snapshot's inode.");
2419        goto cleanup;
2420    }
2421
2422    ret = do_sd_create(s, &new_vid, 1, &local_err);
2423    if (ret < 0) {
2424        error_reportf_err(local_err,
2425                          "failed to create inode for snapshot: ");
2426        goto cleanup;
2427    }
2428
2429    ret = read_object(fd, s->aio_context, (char *)inode,
2430                      vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2431                      s->cache_flags);
2432
2433    if (ret < 0) {
2434        error_report("failed to read new inode info. %s", strerror(errno));
2435        goto cleanup;
2436    }
2437
2438    memcpy(&s->inode, inode, datalen);
2439    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2440            s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2441
2442cleanup:
2443    g_free(inode);
2444    closesocket(fd);
2445    return ret;
2446}
2447
2448/*
2449 * We implement rollback(loadvm) operation to the specified snapshot by
2450 * 1) switch to the snapshot
2451 * 2) rely on sd_create_branch to delete working VDI and
2452 * 3) create a new working VDI based on the specified snapshot
2453 */
2454static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2455{
2456    BDRVSheepdogState *s = bs->opaque;
2457    BDRVSheepdogState *old_s;
2458    char tag[SD_MAX_VDI_TAG_LEN];
2459    uint32_t snapid = 0;
2460    int ret = 0;
2461
2462    old_s = g_new(BDRVSheepdogState, 1);
2463
2464    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2465
2466    snapid = strtoul(snapshot_id, NULL, 10);
2467    if (snapid) {
2468        tag[0] = 0;
2469    } else {
2470        pstrcpy(tag, sizeof(tag), snapshot_id);
2471    }
2472
2473    ret = reload_inode(s, snapid, tag);
2474    if (ret) {
2475        goto out;
2476    }
2477
2478    ret = sd_create_branch(s);
2479    if (ret) {
2480        goto out;
2481    }
2482
2483    g_free(old_s);
2484
2485    return 0;
2486out:
2487    /* recover bdrv_sd_state */
2488    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2489    g_free(old_s);
2490
2491    error_report("failed to open. recover old bdrv_sd_state.");
2492
2493    return ret;
2494}
2495
2496#define NR_BATCHED_DISCARD 128
2497
2498static bool remove_objects(BDRVSheepdogState *s)
2499{
2500    int fd, i = 0, nr_objs = 0;
2501    Error *local_err = NULL;
2502    int ret = 0;
2503    bool result = true;
2504    SheepdogInode *inode = &s->inode;
2505
2506    fd = connect_to_sdog(s, &local_err);
2507    if (fd < 0) {
2508        error_report_err(local_err);
2509        return false;
2510    }
2511
2512    nr_objs = count_data_objs(inode);
2513    while (i < nr_objs) {
2514        int start_idx, nr_filled_idx;
2515
2516        while (i < nr_objs && !inode->data_vdi_id[i]) {
2517            i++;
2518        }
2519        start_idx = i;
2520
2521        nr_filled_idx = 0;
2522        while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2523            if (inode->data_vdi_id[i]) {
2524                inode->data_vdi_id[i] = 0;
2525                nr_filled_idx++;
2526            }
2527
2528            i++;
2529        }
2530
2531        ret = write_object(fd, s->aio_context,
2532                           (char *)&inode->data_vdi_id[start_idx],
2533                           vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2534                           (i - start_idx) * sizeof(uint32_t),
2535                           offsetof(struct SheepdogInode,
2536                                    data_vdi_id[start_idx]),
2537                           false, s->cache_flags);
2538        if (ret < 0) {
2539            error_report("failed to discard snapshot inode.");
2540            result = false;
2541            goto out;
2542        }
2543    }
2544
2545out:
2546    closesocket(fd);
2547    return result;
2548}
2549
2550static int sd_snapshot_delete(BlockDriverState *bs,
2551                              const char *snapshot_id,
2552                              const char *name,
2553                              Error **errp)
2554{
2555    unsigned long snap_id = 0;
2556    char snap_tag[SD_MAX_VDI_TAG_LEN];
2557    Error *local_err = NULL;
2558    int fd, ret;
2559    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2560    BDRVSheepdogState *s = bs->opaque;
2561    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2562    uint32_t vid;
2563    SheepdogVdiReq hdr = {
2564        .opcode = SD_OP_DEL_VDI,
2565        .data_length = wlen,
2566        .flags = SD_FLAG_CMD_WRITE,
2567    };
2568    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2569
2570    if (!remove_objects(s)) {
2571        return -1;
2572    }
2573
2574    memset(buf, 0, sizeof(buf));
2575    memset(snap_tag, 0, sizeof(snap_tag));
2576    pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2577    ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2578    if (ret || snap_id > UINT32_MAX) {
2579        error_setg(errp, "Invalid snapshot ID: %s",
2580                         snapshot_id ? snapshot_id : "<null>");
2581        return -EINVAL;
2582    }
2583
2584    if (snap_id) {
2585        hdr.snapid = (uint32_t) snap_id;
2586    } else {
2587        pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2588        pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2589    }
2590
2591    ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true,
2592                        &local_err);
2593    if (ret) {
2594        return ret;
2595    }
2596
2597    fd = connect_to_sdog(s, &local_err);
2598    if (fd < 0) {
2599        error_report_err(local_err);
2600        return -1;
2601    }
2602
2603    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
2604                 buf, &wlen, &rlen);
2605    closesocket(fd);
2606    if (ret) {
2607        return ret;
2608    }
2609
2610    switch (rsp->result) {
2611    case SD_RES_NO_VDI:
2612        error_report("%s was already deleted", s->name);
2613    case SD_RES_SUCCESS:
2614        break;
2615    default:
2616        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2617        return -1;
2618    }
2619
2620    return ret;
2621}
2622
2623static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2624{
2625    Error *local_err = NULL;
2626    BDRVSheepdogState *s = bs->opaque;
2627    SheepdogReq req;
2628    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2629    QEMUSnapshotInfo *sn_tab = NULL;
2630    unsigned wlen, rlen;
2631    int found = 0;
2632    static SheepdogInode inode;
2633    unsigned long *vdi_inuse;
2634    unsigned int start_nr;
2635    uint64_t hval;
2636    uint32_t vid;
2637
2638    vdi_inuse = g_malloc(max);
2639
2640    fd = connect_to_sdog(s, &local_err);
2641    if (fd < 0) {
2642        error_report_err(local_err);
2643        ret = fd;
2644        goto out;
2645    }
2646
2647    rlen = max;
2648    wlen = 0;
2649
2650    memset(&req, 0, sizeof(req));
2651
2652    req.opcode = SD_OP_READ_VDIS;
2653    req.data_length = max;
2654
2655    ret = do_req(fd, s->aio_context, (SheepdogReq *)&req,
2656                 vdi_inuse, &wlen, &rlen);
2657
2658    closesocket(fd);
2659    if (ret) {
2660        goto out;
2661    }
2662
2663    sn_tab = g_new0(QEMUSnapshotInfo, nr);
2664
2665    /* calculate a vdi id with hash function */
2666    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2667    start_nr = hval & (SD_NR_VDIS - 1);
2668
2669    fd = connect_to_sdog(s, &local_err);
2670    if (fd < 0) {
2671        error_report_err(local_err);
2672        ret = fd;
2673        goto out;
2674    }
2675
2676    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2677        if (!test_bit(vid, vdi_inuse)) {
2678            break;
2679        }
2680
2681        /* we don't need to read entire object */
2682        ret = read_object(fd, s->aio_context, (char *)&inode,
2683                          vid_to_vdi_oid(vid),
2684                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2685                          s->cache_flags);
2686
2687        if (ret) {
2688            continue;
2689        }
2690
2691        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2692            sn_tab[found].date_sec = inode.snap_ctime >> 32;
2693            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2694            sn_tab[found].vm_state_size = inode.vm_state_size;
2695            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2696
2697            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2698                     "%" PRIu32, inode.snap_id);
2699            pstrcpy(sn_tab[found].name,
2700                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2701                    inode.tag);
2702            found++;
2703        }
2704    }
2705
2706    closesocket(fd);
2707out:
2708    *psn_tab = sn_tab;
2709
2710    g_free(vdi_inuse);
2711
2712    if (ret < 0) {
2713        return ret;
2714    }
2715
2716    return found;
2717}
2718
2719static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2720                                int64_t pos, int size, int load)
2721{
2722    Error *local_err = NULL;
2723    bool create;
2724    int fd, ret = 0, remaining = size;
2725    unsigned int data_len;
2726    uint64_t vmstate_oid;
2727    uint64_t offset;
2728    uint32_t vdi_index;
2729    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2730    uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
2731
2732    fd = connect_to_sdog(s, &local_err);
2733    if (fd < 0) {
2734        error_report_err(local_err);
2735        return fd;
2736    }
2737
2738    while (remaining) {
2739        vdi_index = pos / object_size;
2740        offset = pos % object_size;
2741
2742        data_len = MIN(remaining, object_size - offset);
2743
2744        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2745
2746        create = (offset == 0);
2747        if (load) {
2748            ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
2749                              s->inode.nr_copies, data_len, offset,
2750                              s->cache_flags);
2751        } else {
2752            ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
2753                               s->inode.nr_copies, data_len, offset, create,
2754                               s->cache_flags);
2755        }
2756
2757        if (ret < 0) {
2758            error_report("failed to save vmstate %s", strerror(errno));
2759            goto cleanup;
2760        }
2761
2762        pos += data_len;
2763        data += data_len;
2764        remaining -= data_len;
2765    }
2766    ret = size;
2767cleanup:
2768    closesocket(fd);
2769    return ret;
2770}
2771
2772static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2773                           int64_t pos)
2774{
2775    BDRVSheepdogState *s = bs->opaque;
2776    void *buf;
2777    int ret;
2778
2779    buf = qemu_blockalign(bs, qiov->size);
2780    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2781    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2782    qemu_vfree(buf);
2783
2784    return ret;
2785}
2786
2787static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2788                           int64_t pos)
2789{
2790    BDRVSheepdogState *s = bs->opaque;
2791    void *buf;
2792    int ret;
2793
2794    buf = qemu_blockalign(bs, qiov->size);
2795    ret = do_load_save_vmstate(s, buf, pos, qiov->size, 1);
2796    qemu_iovec_from_buf(qiov, 0, buf, qiov->size);
2797    qemu_vfree(buf);
2798
2799    return ret;
2800}
2801
2802
2803static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
2804                                      int count)
2805{
2806    SheepdogAIOCB *acb;
2807    BDRVSheepdogState *s = bs->opaque;
2808    int ret;
2809    QEMUIOVector discard_iov;
2810    struct iovec iov;
2811    uint32_t zero = 0;
2812
2813    if (!s->discard_supported) {
2814        return 0;
2815    }
2816
2817    memset(&discard_iov, 0, sizeof(discard_iov));
2818    memset(&iov, 0, sizeof(iov));
2819    iov.iov_base = &zero;
2820    iov.iov_len = sizeof(zero);
2821    discard_iov.iov = &iov;
2822    discard_iov.niov = 1;
2823    assert((offset & (BDRV_SECTOR_SIZE - 1)) == 0);
2824    assert((count & (BDRV_SECTOR_SIZE - 1)) == 0);
2825    acb = sd_aio_setup(bs, &discard_iov, offset >> BDRV_SECTOR_BITS,
2826                       count >> BDRV_SECTOR_BITS);
2827    acb->aiocb_type = AIOCB_DISCARD_OBJ;
2828    acb->aio_done_func = sd_finish_aiocb;
2829
2830retry:
2831    if (check_overlapping_aiocb(s, acb)) {
2832        qemu_co_queue_wait(&s->overlapping_queue);
2833        goto retry;
2834    }
2835
2836    ret = sd_co_rw_vector(acb);
2837    if (ret <= 0) {
2838        QLIST_REMOVE(acb, aiocb_siblings);
2839        qemu_co_queue_restart_all(&s->overlapping_queue);
2840        qemu_aio_unref(acb);
2841        return ret;
2842    }
2843
2844    qemu_coroutine_yield();
2845
2846    QLIST_REMOVE(acb, aiocb_siblings);
2847    qemu_co_queue_restart_all(&s->overlapping_queue);
2848
2849    return acb->ret;
2850}
2851
2852static coroutine_fn int64_t
2853sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2854                       int *pnum, BlockDriverState **file)
2855{
2856    BDRVSheepdogState *s = bs->opaque;
2857    SheepdogInode *inode = &s->inode;
2858    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2859    uint64_t offset = sector_num * BDRV_SECTOR_SIZE;
2860    unsigned long start = offset / object_size,
2861                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
2862                                     BDRV_SECTOR_SIZE, object_size);
2863    unsigned long idx;
2864    int64_t ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | offset;
2865
2866    for (idx = start; idx < end; idx++) {
2867        if (inode->data_vdi_id[idx] == 0) {
2868            break;
2869        }
2870    }
2871    if (idx == start) {
2872        /* Get the longest length of unallocated sectors */
2873        ret = 0;
2874        for (idx = start + 1; idx < end; idx++) {
2875            if (inode->data_vdi_id[idx] != 0) {
2876                break;
2877            }
2878        }
2879    }
2880
2881    *pnum = (idx - start) * object_size / BDRV_SECTOR_SIZE;
2882    if (*pnum > nb_sectors) {
2883        *pnum = nb_sectors;
2884    }
2885    if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
2886        *file = bs;
2887    }
2888    return ret;
2889}
2890
2891static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
2892{
2893    BDRVSheepdogState *s = bs->opaque;
2894    SheepdogInode *inode = &s->inode;
2895    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2896    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
2897    uint64_t size = 0;
2898
2899    for (i = 0; i < last; i++) {
2900        if (inode->data_vdi_id[i] == 0) {
2901            continue;
2902        }
2903        size += object_size;
2904    }
2905    return size;
2906}
2907
/*
 * Options accepted when creating a Sheepdog image.  This list is what the
 * .create_opts field of the BlockDriver definitions in this file points to.
 */
static QemuOptsList sd_create_opts = {
    .name = "sheepdog-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
    .desc = {
        {
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            .name = BLOCK_OPT_BACKING_FILE,
            .type = QEMU_OPT_STRING,
            .help = "File name of a base image"
        },
        {
            .name = BLOCK_OPT_PREALLOC,
            .type = QEMU_OPT_STRING,
            .help = "Preallocation mode (allowed values: off, full)"
        },
        {
            /* passed as a string, not as a number */
            .name = BLOCK_OPT_REDUNDANCY,
            .type = QEMU_OPT_STRING,
            .help = "Redundancy of the image"
        },
        {
            .name = BLOCK_OPT_OBJECT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Object size of the image"
        },
        { /* end of list */ }
    }
};
2940
/*
 * Driver definition for the "sheepdog" protocol name.  bdrv_sheepdog_tcp
 * and bdrv_sheepdog_unix use the same callbacks and differ only in
 * .protocol_name.
 */
static BlockDriver bdrv_sheepdog = {
    .format_name    = "sheepdog",
    .protocol_name  = "sheepdog",
    .instance_size  = sizeof(BDRVSheepdogState),
    .bdrv_needs_filename = true,
    /* open / reopen / close / create */
    .bdrv_file_open = sd_open,
    .bdrv_reopen_prepare    = sd_reopen_prepare,
    .bdrv_reopen_commit     = sd_reopen_commit,
    .bdrv_reopen_abort      = sd_reopen_abort,
    .bdrv_close     = sd_close,
    .bdrv_create    = sd_create,
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
    /* size queries and resizing */
    .bdrv_getlength = sd_getlength,
    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
    .bdrv_truncate  = sd_truncate,

    /* coroutine-based I/O */
    .bdrv_co_readv  = sd_co_readv,
    .bdrv_co_writev = sd_co_writev,
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
    .bdrv_co_pdiscard = sd_co_pdiscard,
    .bdrv_co_get_block_status = sd_co_get_block_status,

    /* snapshots */
    .bdrv_snapshot_create   = sd_snapshot_create,
    .bdrv_snapshot_goto     = sd_snapshot_goto,
    .bdrv_snapshot_delete   = sd_snapshot_delete,
    .bdrv_snapshot_list     = sd_snapshot_list,

    /* VM state */
    .bdrv_save_vmstate  = sd_save_vmstate,
    .bdrv_load_vmstate  = sd_load_vmstate,

    /* AioContext switching */
    .bdrv_detach_aio_context = sd_detach_aio_context,
    .bdrv_attach_aio_context = sd_attach_aio_context,

    .create_opts    = &sd_create_opts,
};
2976
/*
 * Driver definition for the "sheepdog+tcp" protocol name.  The callbacks
 * are the same as in bdrv_sheepdog; only .protocol_name differs.
 */
static BlockDriver bdrv_sheepdog_tcp = {
    .format_name    = "sheepdog",
    .protocol_name  = "sheepdog+tcp",
    .instance_size  = sizeof(BDRVSheepdogState),
    .bdrv_needs_filename = true,
    /* open / reopen / close / create */
    .bdrv_file_open = sd_open,
    .bdrv_reopen_prepare    = sd_reopen_prepare,
    .bdrv_reopen_commit     = sd_reopen_commit,
    .bdrv_reopen_abort      = sd_reopen_abort,
    .bdrv_close     = sd_close,
    .bdrv_create    = sd_create,
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
    /* size queries and resizing */
    .bdrv_getlength = sd_getlength,
    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
    .bdrv_truncate  = sd_truncate,

    /* coroutine-based I/O */
    .bdrv_co_readv  = sd_co_readv,
    .bdrv_co_writev = sd_co_writev,
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
    .bdrv_co_pdiscard = sd_co_pdiscard,
    .bdrv_co_get_block_status = sd_co_get_block_status,

    /* snapshots */
    .bdrv_snapshot_create   = sd_snapshot_create,
    .bdrv_snapshot_goto     = sd_snapshot_goto,
    .bdrv_snapshot_delete   = sd_snapshot_delete,
    .bdrv_snapshot_list     = sd_snapshot_list,

    /* VM state */
    .bdrv_save_vmstate  = sd_save_vmstate,
    .bdrv_load_vmstate  = sd_load_vmstate,

    /* AioContext switching */
    .bdrv_detach_aio_context = sd_detach_aio_context,
    .bdrv_attach_aio_context = sd_attach_aio_context,

    .create_opts    = &sd_create_opts,
};
3012
/*
 * Driver definition for the "sheepdog+unix" protocol name.  The callbacks
 * are the same as in bdrv_sheepdog; only .protocol_name differs.
 */
static BlockDriver bdrv_sheepdog_unix = {
    .format_name    = "sheepdog",
    .protocol_name  = "sheepdog+unix",
    .instance_size  = sizeof(BDRVSheepdogState),
    .bdrv_needs_filename = true,
    /* open / reopen / close / create */
    .bdrv_file_open = sd_open,
    .bdrv_reopen_prepare    = sd_reopen_prepare,
    .bdrv_reopen_commit     = sd_reopen_commit,
    .bdrv_reopen_abort      = sd_reopen_abort,
    .bdrv_close     = sd_close,
    .bdrv_create    = sd_create,
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
    /* size queries and resizing */
    .bdrv_getlength = sd_getlength,
    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
    .bdrv_truncate  = sd_truncate,

    /* coroutine-based I/O */
    .bdrv_co_readv  = sd_co_readv,
    .bdrv_co_writev = sd_co_writev,
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
    .bdrv_co_pdiscard = sd_co_pdiscard,
    .bdrv_co_get_block_status = sd_co_get_block_status,

    /* snapshots */
    .bdrv_snapshot_create   = sd_snapshot_create,
    .bdrv_snapshot_goto     = sd_snapshot_goto,
    .bdrv_snapshot_delete   = sd_snapshot_delete,
    .bdrv_snapshot_list     = sd_snapshot_list,

    /* VM state */
    .bdrv_save_vmstate  = sd_save_vmstate,
    .bdrv_load_vmstate  = sd_load_vmstate,

    /* AioContext switching */
    .bdrv_detach_aio_context = sd_detach_aio_context,
    .bdrv_attach_aio_context = sd_attach_aio_context,

    .create_opts    = &sd_create_opts,
};
3048
3049static void bdrv_sheepdog_init(void)
3050{
3051    bdrv_register(&bdrv_sheepdog);
3052    bdrv_register(&bdrv_sheepdog_tcp);
3053    bdrv_register(&bdrv_sheepdog_unix);
3054}
3055block_init(bdrv_sheepdog_init);
3056