qemu/block/sheepdog.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qapi/error.h"
  17#include "qemu/uri.h"
  18#include "qemu/error-report.h"
  19#include "qemu/sockets.h"
  20#include "block/block_int.h"
  21#include "sysemu/block-backend.h"
  22#include "qemu/bitops.h"
  23#include "qemu/cutils.h"
  24
   /*
    * Protocol version constant.  Presumably stored in the proto_ver byte of
    * every request header -- TODO confirm against the request builders.
    */
   25#define SD_PROTO_VER 0x01
   26
   /* Host and port sd_parse_uri() falls back to when the URI omits them. */
   27#define SD_DEFAULT_ADDR "localhost"
   28#define SD_DEFAULT_PORT 7000
   29
   /* Object opcodes (going by their names; presumably opcode-byte values). */
   30#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
   31#define SD_OP_READ_OBJ       0x02
   32#define SD_OP_WRITE_OBJ      0x03
   33/* 0x04 is used internally by Sheepdog */
   34
   /* VDI and cluster opcodes (going by their names). */
   35#define SD_OP_NEW_VDI        0x11
   36#define SD_OP_LOCK_VDI       0x12
   37#define SD_OP_RELEASE_VDI    0x13
   38#define SD_OP_GET_VDI_INFO   0x14
   39#define SD_OP_READ_VDIS      0x15
   40#define SD_OP_FLUSH_VDI      0x16
   41#define SD_OP_DEL_VDI        0x17
   42#define SD_OP_GET_CLUSTER_DEFAULT   0x18
   43
   /*
    * Request flag bits.  SD_FLAG_CMD_DIRECT is also what aio_read_response()
    * stores in BDRVSheepdogState.cache_flags when the server rejects a flush.
    */
   44#define SD_FLAG_CMD_WRITE    0x01
   45#define SD_FLAG_CMD_COW      0x02
   46#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
   47#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  48
   /*
    * Result codes.  aio_read_response() compares the `result` field of a
    * response header against these, and sd_strerror() maps each of them to a
    * printable message.
    */
   49#define SD_RES_SUCCESS       0x00 /* Success */
   50#define SD_RES_UNKNOWN       0x01 /* Unknown error */
   51#define SD_RES_NO_OBJ        0x02 /* No object found */
   52#define SD_RES_EIO           0x03 /* I/O error */
   53#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
   54#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
   55#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
   56#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
   57#define SD_RES_NO_VDI        0x08 /* No vdi found */
   58#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
   59#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
   60#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
   61#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
   62#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
   63#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
   64#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
   65#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
   66#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
   67#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
   68#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
   69#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
   70#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
   71#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
   72#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
   73#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
   74#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
   75#define SD_RES_READONLY      0x1A /* Object is read-only */
  76
  77/*
  78 * Object ID rules
  79 *
  80 *  0 - 19 (20 bits): data object space
  81 * 20 - 31 (12 bits): reserved data object space
  82 * 32 - 55 (24 bits): vdi object space
  83 * 56 - 59 ( 4 bits): reserved vdi object space
  84 * 60 - 63 ( 4 bits): object type identifier space
  85 */
  86
   /* Bit position at which a VDI id is placed inside a 64-bit object id. */
   87#define VDI_SPACE_SHIFT   32
   /* Set in the object id of a VDI (inode) object; clear for data objects. */
   88#define VDI_BIT (UINT64_C(1) << 63)
   /* Set in object ids built by vid_to_vmstate_oid(). */
   89#define VMSTATE_BIT (UINT64_C(1) << 62)
   /* Length of SheepdogInode.data_vdi_id[]; also the data-index mask + 1. */
   90#define MAX_DATA_OBJS (UINT64_C(1) << 20)
   /* Length of SheepdogInode.child_vdi_id[]. */
   91#define MAX_CHILDREN 1024
   /* Sizes of the name[] and tag[] buffers, terminating NUL included. */
   92#define SD_MAX_VDI_LEN 256
   93#define SD_MAX_VDI_TAG_LEN 256
   94#define SD_NR_VDIS   (1U << 24)
   /* 4 MiB (1 << 22); SD_MAX_VDI_SIZE is therefore 1 << 42 bytes. */
   95#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
   96#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
   97#define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
   98/*
   99 * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
  100 * (SD_EC_MAX_STRIP - 1) for parity strips
  101 *
  102 * SD_MAX_COPIES is sum of number of data strips and parity strips.
  103 */
  104#define SD_EC_MAX_STRIP 16
  105#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
  106
  /* Size in bytes of a complete inode object, data_vdi_id[] table included. */
  107#define SD_INODE_SIZE (sizeof(SheepdogInode))
  /* Snapshot id meaning "the current (non-snapshot) VDI"; see sd_parse_uri(). */
  108#define CURRENT_VDI_ID 0
  109
  110#define LOCK_TYPE_NORMAL 0
  111#define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
 112
  /*
   * Generic request header: a 16-byte common prefix (proto_ver .. data_length)
   * followed by 32 opcode-specific bytes, 48 bytes in total.
   *
   * send_co_req() transmits exactly sizeof(SheepdogReq) bytes of it, and
   * do_co_req() then receives the response header into the same buffer.
   */
  113typedef struct SheepdogReq {
  114    uint8_t proto_ver;
  115    uint8_t opcode;
  116    uint16_t flags;
  117    uint32_t epoch;
  118    uint32_t id;
  119    uint32_t data_length; /* do_co_req() reads this from the response */
  120    uint32_t opcode_specific[8];
  121} SheepdogReq;
 122
  /*
   * Generic response header.  Same 16-byte prefix and same 48-byte total size
   * as SheepdogReq; `result` occupies the first opcode-specific word
   * (presumably an SD_RES_* code, as it is in SheepdogObjRsp).
   */
  123typedef struct SheepdogRsp {
  124    uint8_t proto_ver;
  125    uint8_t opcode;
  126    uint16_t flags;
  127    uint32_t epoch;
  128    uint32_t id;
  129    uint32_t data_length;
  130    uint32_t result;
  131    uint32_t opcode_specific[7];
  132} SheepdogRsp;
 133
  /*
   * Object request header: the common 16-byte prefix followed by the
   * object-specific fields below (48 bytes in total, like SheepdogReq).
   */
  134typedef struct SheepdogObjReq {
  135    uint8_t proto_ver;
  136    uint8_t opcode;
  137    uint16_t flags;
  138    uint32_t epoch;
  139    uint32_t id;
  140    uint32_t data_length;
  141    uint64_t oid;         /* target object id */
  142    uint64_t cow_oid;     /* presumably the copy-on-write source -- TODO confirm */
  143    uint8_t copies;
  144    uint8_t copy_policy;
  145    uint8_t reserved[6];
  146    uint64_t offset;      /* presumably the byte offset inside the object */
  147} SheepdogObjReq;
 148
  /*
   * Object response header (48 bytes).  This is the type aio_read_response()
   * receives for every asynchronous reply.
   */
  149typedef struct SheepdogObjRsp {
  150    uint8_t proto_ver;
  151    uint8_t opcode;
  152    uint16_t flags;
  153    uint32_t epoch;
  154    uint32_t id;          /* matched against AIOReq.id */
  155    uint32_t data_length; /* payload bytes that follow a read response */
  156    uint32_t result;      /* compared against SD_RES_* codes */
  157    uint8_t copies;
  158    uint8_t copy_policy;
  159    uint8_t reserved[2];
  160    uint32_t pad[6];
  161} SheepdogObjRsp;
 162
  /*
   * VDI request header: the common 16-byte prefix followed by VDI-specific
   * fields (48 bytes in total, like SheepdogReq).
   */
  163typedef struct SheepdogVdiReq {
  164    uint8_t proto_ver;
  165    uint8_t opcode;
  166    uint16_t flags;
  167    uint32_t epoch;
  168    uint32_t id;
  169    uint32_t data_length;
  170    uint64_t vdi_size;
  171    uint32_t base_vdi_id;
  172    uint8_t copies;
  173    uint8_t copy_policy;
  174    uint8_t store_policy;
  175    uint8_t block_size_shift;
  176    uint32_t snapid;
  177    uint32_t type;        /* presumably a LOCK_TYPE_* value -- TODO confirm */
  178    uint32_t pad[2];
  179} SheepdogVdiReq;
 180
  /*
   * VDI response header (48 bytes): common prefix, result code, one reserved
   * word and the VDI id.
   */
  181typedef struct SheepdogVdiRsp {
  182    uint8_t proto_ver;
  183    uint8_t opcode;
  184    uint16_t flags;
  185    uint32_t epoch;
  186    uint32_t id;
  187    uint32_t data_length;
  188    uint32_t result;
  189    uint32_t rsvd;
  190    uint32_t vdi_id;
  191    uint32_t pad[5];
  192} SheepdogVdiRsp;
 193
  /*
   * Cluster response header (48 bytes).  Going by the field names it carries
   * the cluster's copy count, copy policy and block size shift; presumably the
   * reply to SD_OP_GET_CLUSTER_DEFAULT -- TODO confirm.
   */
  194typedef struct SheepdogClusterRsp {
  195    uint8_t proto_ver;
  196    uint8_t opcode;
  197    uint16_t flags;
  198    uint32_t epoch;
  199    uint32_t id;
  200    uint32_t data_length;
  201    uint32_t result;
  202    uint8_t nr_copies;
  203    uint8_t copy_policy;
  204    uint8_t block_size_shift;
  205    uint8_t __pad1;
  206    uint32_t __pad2[6];
  207} SheepdogClusterRsp;
 208
  /*
   * VDI inode (metadata object): a fixed header followed by the
   * data_vdi_id[] table, which has one slot per data object index.
   */
  209typedef struct SheepdogInode {
  210    char name[SD_MAX_VDI_LEN];
  211    char tag[SD_MAX_VDI_TAG_LEN];
  212    uint64_t ctime;
  213    uint64_t snap_ctime;      /* non-zero marks a snapshot; see is_snapshot() */
  214    uint64_t vm_clock_nsec;
  215    uint64_t vdi_size;        /* divided by the object size in count_data_objs() */
  216    uint64_t vm_state_size;
  217    uint16_t copy_policy;
  218    uint8_t nr_copies;
  219    uint8_t block_size_shift; /* object size is 1 << block_size_shift */
  220    uint32_t snap_id;
  221    uint32_t vdi_id;
  222    uint32_t parent_vdi_id;
  223    uint32_t child_vdi_id[MAX_CHILDREN];
  /* A slot equal to vdi_id means this VDI owns the object (writable). */
  224    uint32_t data_vdi_id[MAX_DATA_OBJS];
  225} SheepdogInode;
  226
  /* Size of the inode up to, but not including, the data_vdi_id[] table. */
  227#define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
 228
 229/*
 230 * 64 bit FNV-1a non-zero initial basis
 231 */
 232#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 233
 234/*
 235 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 236 */
 237static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 238{
 239    unsigned char *bp = buf;
 240    unsigned char *be = bp + len;
 241    while (bp < be) {
 242        hval ^= (uint64_t) *bp++;
 243        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 244            (hval << 7) + (hval << 8) + (hval << 40);
 245    }
 246    return hval;
 247}
 248
 249static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 250{
 251    return inode->vdi_id == inode->data_vdi_id[idx];
 252}
 253
 254static inline bool is_data_obj(uint64_t oid)
 255{
 256    return !(VDI_BIT & oid);
 257}
 258
 259static inline uint64_t data_oid_to_idx(uint64_t oid)
 260{
 261    return oid & (MAX_DATA_OBJS - 1);
 262}
 263
 264static inline uint32_t oid_to_vid(uint64_t oid)
 265{
 266    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 267}
 268
 269static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 270{
 271    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 272}
 273
 274static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 275{
 276    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 277}
 278
 279static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 280{
 281    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 282}
 283
 284static inline bool is_snapshot(struct SheepdogInode *inode)
 285{
 286    return !!inode->snap_ctime;
 287}
 288
 289static inline size_t count_data_objs(const struct SheepdogInode *inode)
 290{
 291    return DIV_ROUND_UP(inode->vdi_size,
 292                        (1UL << inode->block_size_shift));
 293}
 294
  /*
   * Debug trace macro.  When DEBUG_SDOG is defined it prints the calling
   * function name, the line number and the formatted message to stdout;
   * otherwise it expands to nothing, so its arguments are not evaluated.
   */
  295#undef DPRINTF
  296#ifdef DEBUG_SDOG
  297#define DPRINTF(fmt, args...)                                       \
  298    do {                                                            \
  299        fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
  300    } while (0)
  301#else
  302#define DPRINTF(fmt, args...)
  303#endif
 304
  305typedef struct SheepdogAIOCB SheepdogAIOCB;
  306
  /*
   * One request sent to the server on behalf of a SheepdogAIOCB.  Allocated by
   * alloc_aio_req() and released by free_aio_req(); while alive it is linked
   * through aio_siblings.
   */
  307typedef struct AIOReq {
  308    SheepdogAIOCB *aiocb;        /* owning AIOCB; its nr_pending counts us */
  309    unsigned int iov_offset;     /* offset into aiocb->qiov for read payloads */
  310
  311    uint64_t oid;
  312    uint64_t base_oid;
  313    uint64_t offset;
  314    unsigned int data_len;
  315    uint8_t flags;
  316    uint32_t id;                 /* taken from s->aioreq_seq_num; matched to rsp.id */
  317    bool create;                 /* on success the inode slot is set to our vdi_id */
  318
  319    QLIST_ENTRY(AIOReq) aio_siblings;
  320} AIOReq;
 321
  /*
   * Kind of operation an AIOCB performs; aio_read_response() switches on it
   * to decide how a response is post-processed.
   */
  322enum AIOCBState {
  323    AIOCB_WRITE_UDATA,  /* may update the inode's data_vdi_id[] on create */
  324    AIOCB_READ_UDATA,   /* response payload is received into acb->qiov */
  325    AIOCB_FLUSH_CACHE,  /* SD_RES_INVALID_PARMS switches the cache off */
  326    AIOCB_DISCARD_OBJ,  /* SD_RES_INVALID_PARMS clears discard_supported */
  327};
 328
 329#define AIOCBOverlapping(x, y)                                 \
 330    (!(x->max_affect_data_idx < y->min_affect_data_idx          \
 331       || y->max_affect_data_idx < x->min_affect_data_idx))
 332
  /*
   * Per-operation state for one block-layer request.  It is split into one
   * AIOReq per server request; nr_pending counts the AIOReqs that are still
   * allocated.
   */
  333struct SheepdogAIOCB {
  334    BlockAIOCB common;
  335
  336    QEMUIOVector *qiov;
  337
  338    int64_t sector_num;
  339    int nb_sectors;
  340
  341    int ret;                 /* set to -EIO when a response reports an error */
  342    enum AIOCBState aiocb_type;
  343
  344    Coroutine *coroutine;    /* coroutine that called sd_aio_setup() */
  345    void (*aio_done_func)(SheepdogAIOCB *); /* called once nr_pending hits 0 */
  346
  347    bool cancelable;         /* cleared by free_aio_req() */
  348    int nr_pending;
  349
  350    uint32_t min_affect_data_idx;
  351    uint32_t max_affect_data_idx;
  352
  353    /*
  354     * The difference between affect_data_idx and dirty_data_idx:
  355     * affect_data_idx represents range of index of all request types.
  356     * dirty_data_idx represents range of index updated by COW requests.
  357     * dirty_data_idx is used for updating an inode object.
  358     */
  359    uint32_t min_dirty_data_idx;
  360    uint32_t max_dirty_data_idx;
  361
  362    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
  363};
 364
  /* Driver state for one open Sheepdog VDI. */
  365typedef struct BDRVSheepdogState {
  366    BlockDriverState *bs;
  367    AioContext *aio_context;     /* context the socket handlers are registered in */
  368
  369    SheepdogInode inode;         /* in-memory copy of the VDI inode */
  370
  371    char name[SD_MAX_VDI_LEN];
  372    bool is_snapshot;
  373    uint32_t cache_flags;        /* becomes SD_FLAG_CMD_DIRECT if flush is rejected */
  374    bool discard_supported;      /* cleared if the server rejects a discard */
  375
  376    char *host_spec;             /* "host:port" or unix socket path; set by sd_parse_uri() */
  377    bool is_unix;                /* selects unix_connect() over inet_connect() */
  378    int fd;                      /* I/O socket; -1 while reconnecting */
  379
  380    CoMutex lock;
  381    Coroutine *co_send;          /* entered by co_write_request() */
  382    Coroutine *co_recv;          /* coroutine running aio_read_response(), or NULL */
  383
  384    uint32_t aioreq_seq_num;     /* source of AIOReq.id values */
  385
  386    /* Every aio request must be linked to either of these queues. */
  387    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
  388    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
  389
  390    CoQueue overlapping_queue;
  391    QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
  392} BDRVSheepdogState;
 393
  /*
   * State kept for a reopen operation.  Presumably holds the replacement
   * socket and cache flags until the reopen is committed or aborted -- TODO
   * confirm against the reopen callbacks.
   */
  394typedef struct BDRVSheepdogReopenState {
  395    int fd;
  396    int cache_flags;
  397} BDRVSheepdogReopenState;
 398
 399static const char * sd_strerror(int err)
 400{
 401    int i;
 402
 403    static const struct {
 404        int err;
 405        const char *desc;
 406    } errors[] = {
 407        {SD_RES_SUCCESS, "Success"},
 408        {SD_RES_UNKNOWN, "Unknown error"},
 409        {SD_RES_NO_OBJ, "No object found"},
 410        {SD_RES_EIO, "I/O error"},
 411        {SD_RES_VDI_EXIST, "VDI exists already"},
 412        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 413        {SD_RES_SYSTEM_ERROR, "System error"},
 414        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 415        {SD_RES_NO_VDI, "No vdi found"},
 416        {SD_RES_NO_BASE_VDI, "No base VDI found"},
 417        {SD_RES_VDI_READ, "Failed read the requested VDI"},
 418        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 419        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 420        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 421        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 422        {SD_RES_STARTUP, "The system is still booting"},
 423        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 424        {SD_RES_SHUTDOWN, "The system is shutting down"},
 425        {SD_RES_NO_MEM, "Out of memory on the server"},
 426        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 427        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 428        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 429        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 430        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
 431        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
 432        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
 433        {SD_RES_READONLY, "Object is read-only"},
 434    };
 435
 436    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 437        if (errors[i].err == err) {
 438            return errors[i].desc;
 439        }
 440    }
 441
 442    return "Invalid error code";
 443}
 444
 445/*
 446 * Sheepdog I/O handling:
 447 *
 448 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
  449 *    link the requests to the inflight_aio_head list in the
 450 *    BDRVSheepdogState.  The function exits without waiting for
 451 *    receiving the response.
 452 *
 453 * 2. We receive the response in aio_read_response, the fd handler to
 454 *    the sheepdog connection.  If metadata update is needed, we send
 455 *    the write request to the vdi object in sd_write_done, the write
 456 *    completion function.  We switch back to sd_co_readv/writev after
 457 *    all the requests belonging to the AIOCB are finished.
 458 */
 459
 460static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 461                                    uint64_t oid, unsigned int data_len,
 462                                    uint64_t offset, uint8_t flags, bool create,
 463                                    uint64_t base_oid, unsigned int iov_offset)
 464{
 465    AIOReq *aio_req;
 466
 467    aio_req = g_malloc(sizeof(*aio_req));
 468    aio_req->aiocb = acb;
 469    aio_req->iov_offset = iov_offset;
 470    aio_req->oid = oid;
 471    aio_req->base_oid = base_oid;
 472    aio_req->offset = offset;
 473    aio_req->data_len = data_len;
 474    aio_req->flags = flags;
 475    aio_req->id = s->aioreq_seq_num++;
 476    aio_req->create = create;
 477
 478    acb->nr_pending++;
 479    return aio_req;
 480}
 481
 482static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 483{
 484    SheepdogAIOCB *acb = aio_req->aiocb;
 485
 486    acb->cancelable = false;
 487    QLIST_REMOVE(aio_req, aio_siblings);
 488    g_free(aio_req);
 489
 490    acb->nr_pending--;
 491}
 492
 493static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
 494{
 495    qemu_coroutine_enter(acb->coroutine, NULL);
 496    qemu_aio_unref(acb);
 497}
 498
 499/*
 500 * Check whether the specified acb can be canceled
 501 *
 502 * We can cancel aio when any request belonging to the acb is:
 503 *  - Not processed by the sheepdog server.
 504 *  - Not linked to the inflight queue.
 505 */
 506static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
 507{
 508    BDRVSheepdogState *s = acb->common.bs->opaque;
 509    AIOReq *aioreq;
 510
 511    if (!acb->cancelable) {
 512        return false;
 513    }
 514
 515    QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
 516        if (aioreq->aiocb == acb) {
 517            return false;
 518        }
 519    }
 520
 521    return true;
 522}
 523
 524static void sd_aio_cancel(BlockAIOCB *blockacb)
 525{
 526    SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
 527    BDRVSheepdogState *s = acb->common.bs->opaque;
 528    AIOReq *aioreq, *next;
 529
 530    if (sd_acb_cancelable(acb)) {
 531        /* Remove outstanding requests from failed queue.  */
 532        QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
 533                           next) {
 534            if (aioreq->aiocb == acb) {
 535                free_aio_req(s, aioreq);
 536            }
 537        }
 538
 539        assert(acb->nr_pending == 0);
 540        if (acb->common.cb) {
 541            acb->common.cb(acb->common.opaque, -ECANCELED);
 542        }
 543        sd_finish_aiocb(acb);
 544    }
 545}
 546
 547static const AIOCBInfo sd_aiocb_info = {
 548    .aiocb_size     = sizeof(SheepdogAIOCB),
 549    .cancel_async   = sd_aio_cancel,
 550};
 551
 552static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
 553                                   int64_t sector_num, int nb_sectors)
 554{
 555    SheepdogAIOCB *acb;
 556    uint32_t object_size;
 557    BDRVSheepdogState *s = bs->opaque;
 558
 559    object_size = (UINT32_C(1) << s->inode.block_size_shift);
 560
 561    acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
 562
 563    acb->qiov = qiov;
 564
 565    acb->sector_num = sector_num;
 566    acb->nb_sectors = nb_sectors;
 567
 568    acb->aio_done_func = NULL;
 569    acb->cancelable = true;
 570    acb->coroutine = qemu_coroutine_self();
 571    acb->ret = 0;
 572    acb->nr_pending = 0;
 573
 574    acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
 575    acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
 576                              acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
 577
 578    acb->min_dirty_data_idx = UINT32_MAX;
 579    acb->max_dirty_data_idx = 0;
 580
 581    return acb;
 582}
 583
 584/* Return -EIO in case of error, file descriptor on success */
 585static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 586{
 587    int fd;
 588
 589    if (s->is_unix) {
 590        fd = unix_connect(s->host_spec, errp);
 591    } else {
 592        fd = inet_connect(s->host_spec, errp);
 593
 594        if (fd >= 0) {
 595            int ret = socket_set_nodelay(fd);
 596            if (ret < 0) {
 597                error_report("%s", strerror(errno));
 598            }
 599        }
 600    }
 601
 602    if (fd >= 0) {
 603        qemu_set_nonblock(fd);
 604    } else {
 605        fd = -EIO;
 606    }
 607
 608    return fd;
 609}
 610
 611/* Return 0 on success and -errno in case of error */
 612static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 613                                    unsigned int *wlen)
 614{
 615    int ret;
 616
 617    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 618    if (ret != sizeof(*hdr)) {
 619        error_report("failed to send a req, %s", strerror(errno));
 620        return -errno;
 621    }
 622
 623    ret = qemu_co_send(sockfd, data, *wlen);
 624    if (ret != *wlen) {
 625        error_report("failed to send a req, %s", strerror(errno));
 626        return -errno;
 627    }
 628
 629    return ret;
 630}
 631
 632static void restart_co_req(void *opaque)
 633{
 634    Coroutine *co = opaque;
 635
 636    qemu_coroutine_enter(co, NULL);
 637}
 638
  /* Argument and result block passed from do_req() to do_co_req(). */
  639typedef struct SheepdogReqCo {
  640    int sockfd;
  641    AioContext *aio_context; /* context the fd handlers are registered in */
  642    SheepdogReq *hdr;        /* request on entry, overwritten by the response */
  643    void *data;              /* payload to send, then buffer for the reply */
  644    unsigned int *wlen;      /* number of payload bytes to send */
  645    unsigned int *rlen;      /* in: reply buffer size; out: bytes received */
  646    int ret;                 /* 0 or -errno, valid once finished is set */
  647    bool finished;           /* set by do_co_req() just before it returns */
  648} SheepdogReqCo;
 649
 650static coroutine_fn void do_co_req(void *opaque)
 651{
 652    int ret;
 653    Coroutine *co;
 654    SheepdogReqCo *srco = opaque;
 655    int sockfd = srco->sockfd;
 656    SheepdogReq *hdr = srco->hdr;
 657    void *data = srco->data;
 658    unsigned int *wlen = srco->wlen;
 659    unsigned int *rlen = srco->rlen;
 660
 661    co = qemu_coroutine_self();
 662    aio_set_fd_handler(srco->aio_context, sockfd, false,
 663                       NULL, restart_co_req, co);
 664
 665    ret = send_co_req(sockfd, hdr, data, wlen);
 666    if (ret < 0) {
 667        goto out;
 668    }
 669
 670    aio_set_fd_handler(srco->aio_context, sockfd, false,
 671                       restart_co_req, NULL, co);
 672
 673    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 674    if (ret != sizeof(*hdr)) {
 675        error_report("failed to get a rsp, %s", strerror(errno));
 676        ret = -errno;
 677        goto out;
 678    }
 679
 680    if (*rlen > hdr->data_length) {
 681        *rlen = hdr->data_length;
 682    }
 683
 684    if (*rlen) {
 685        ret = qemu_co_recv(sockfd, data, *rlen);
 686        if (ret != *rlen) {
 687            error_report("failed to get the data, %s", strerror(errno));
 688            ret = -errno;
 689            goto out;
 690        }
 691    }
 692    ret = 0;
 693out:
 694    /* there is at most one request for this sockfd, so it is safe to
 695     * set each handler to NULL. */
 696    aio_set_fd_handler(srco->aio_context, sockfd, false,
 697                       NULL, NULL, NULL);
 698
 699    srco->ret = ret;
 700    srco->finished = true;
 701}
 702
 703/*
 704 * Send the request to the sheep in a synchronous manner.
 705 *
 706 * Return 0 on success, -errno in case of error.
 707 */
 708static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
 709                  void *data, unsigned int *wlen, unsigned int *rlen)
 710{
 711    Coroutine *co;
 712    SheepdogReqCo srco = {
 713        .sockfd = sockfd,
 714        .aio_context = aio_context,
 715        .hdr = hdr,
 716        .data = data,
 717        .wlen = wlen,
 718        .rlen = rlen,
 719        .ret = 0,
 720        .finished = false,
 721    };
 722
 723    if (qemu_in_coroutine()) {
 724        do_co_req(&srco);
 725    } else {
 726        co = qemu_coroutine_create(do_co_req);
 727        qemu_coroutine_enter(co, &srco);
 728        while (!srco.finished) {
 729            aio_poll(aio_context, true);
 730        }
 731    }
 732
 733    return srco.ret;
 734}
 735
 736static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 737                                         struct iovec *iov, int niov,
 738                                         enum AIOCBState aiocb_type);
 739static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 740static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
 741static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
 742static void co_write_request(void *opaque);
 743
  /*
   * Drop the current connection, establish a new one and resend every
   * request that was in flight.  @opaque is the BDRVSheepdogState.
   *
   * Called from the error path of aio_read_response().  Retries forever,
   * sleeping one second between attempts.
   */
  744static coroutine_fn void reconnect_to_sdog(void *opaque)
  745{
  746    BDRVSheepdogState *s = opaque;
  747    AIOReq *aio_req, *next;
  748
  /* Unregister the handlers before closing so none fires on a dead fd. */
  749    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
  750                       NULL, NULL);
  751    close(s->fd);
  752    s->fd = -1;
  753
  754    /* Wait for outstanding write requests to be completed. */
  /* NOTE(review): relies on the sender clearing s->co_send -- not visible here. */
  755    while (s->co_send != NULL) {
  756        co_write_request(opaque);
  757    }
  758
  759    /* Try to reconnect the sheepdog server every one second. */
  760    while (s->fd < 0) {
  761        Error *local_err = NULL;
  762        s->fd = get_sheep_fd(s, &local_err);
  763        if (s->fd < 0) {
  764            DPRINTF("Wait for connection to be established\n");
  765            error_report_err(local_err);
  766            co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
  767                            1000000000ULL);
  768        }
  769    };
  770
  771    /*
  772     * Now we have to resend all the request in the inflight queue.  However,
  773     * resend_aioreq() can yield and newly created requests can be added to the
  774     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
  775     * have to move all the inflight requests to the failed queue before
  776     * resend_aioreq() is called.
  777     */
  778    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
  779        QLIST_REMOVE(aio_req, aio_siblings);
  780        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
  781    }
  782
  783    /* Resend all the failed aio requests. */
  784    while (!QLIST_EMPTY(&s->failed_aio_head)) {
  785        aio_req = QLIST_FIRST(&s->failed_aio_head);
  /* Move it back to the inflight queue before resending it. */
  786        QLIST_REMOVE(aio_req, aio_siblings);
  787        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
  788        resend_aioreq(s, aio_req);
  789    }
  790}
 791
  792/*
  793 * Receive responses of the I/O requests.
  794 *
  795 * This function is registered as a fd handler, and called from the
  796 * main loop when s->fd is ready for reading responses.
   *
   * It receives one response header, finds the inflight AIOReq whose id
   * matches, applies the per-type handling selected by acb->aiocb_type and
   * then looks at the result code.  Unless the request is resent, the
   * AIOReq is freed and acb->aio_done_func is called once the AIOCB has no
   * pending requests left.  A receive failure or an unknown id leads to
   * reconnect_to_sdog().  s->co_recv is reset to NULL on every exit path.
  797 */
  798static void coroutine_fn aio_read_response(void *opaque)
  799{
  800    SheepdogObjRsp rsp;
  801    BDRVSheepdogState *s = opaque;
  802    int fd = s->fd;
  803    int ret;
  804    AIOReq *aio_req = NULL;
  805    SheepdogAIOCB *acb;
  806    uint64_t idx;
  807
  808    /* read a header */
  809    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
  810    if (ret != sizeof(rsp)) {
  811        error_report("failed to get the header, %s", strerror(errno));
  812        goto err;
  813    }
  814
  815    /* find the right aio_req from the inflight aio list */
  816    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
  817        if (aio_req->id == rsp.id) {
  818            break;
  819        }
  820    }
  /* Relies on QLIST_FOREACH leaving aio_req NULL when nothing matched. */
  821    if (!aio_req) {
  822        error_report("cannot find aio_req %x", rsp.id);
  823        goto err;
  824    }
  825
  826    acb = aio_req->aiocb;
  827
  828    switch (acb->aiocb_type) {
  829    case AIOCB_WRITE_UDATA:
  830        /* this coroutine context is no longer suitable for co_recv
  831         * because we may send data to update vdi objects */
  832        s->co_recv = NULL;
  /* Responses to writes of the vdi object itself need no inode update. */
  833        if (!is_data_obj(aio_req->oid)) {
  834            break;
  835        }
  836        idx = data_oid_to_idx(aio_req->oid);
  837
  838        if (aio_req->create) {
  839            /*
  840             * If the object is newly created one, we need to update
  841             * the vdi object (metadata object).  min_dirty_data_idx
  842             * and max_dirty_data_idx are changed to include updated
  843             * index between them.
  844             */
  845            if (rsp.result == SD_RES_SUCCESS) {
  846                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
  847                acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
  848                acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
  849            }
  850        }
  851        break;
  852    case AIOCB_READ_UDATA:
  /*
   * NOTE(review): rsp.data_length comes from the server and is not compared
   * with aio_req->data_len here -- confirm that qemu_co_recvv() bounds it.
   */
  853        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
  854                            aio_req->iov_offset, rsp.data_length);
  855        if (ret != rsp.data_length) {
  856            error_report("failed to get the data, %s", strerror(errno));
  857            goto err;
  858        }
  859        break;
  860    case AIOCB_FLUSH_CACHE:
  /* A server without cache support: fall back to direct mode, not an error. */
  861        if (rsp.result == SD_RES_INVALID_PARMS) {
  862            DPRINTF("disable cache since the server doesn't support it\n");
  863            s->cache_flags = SD_FLAG_CMD_DIRECT;
  864            rsp.result = SD_RES_SUCCESS;
  865        }
  866        break;
  867    case AIOCB_DISCARD_OBJ:
  868        switch (rsp.result) {
  869        case SD_RES_INVALID_PARMS:
  /* Unsupported discard is treated as success and remembered. */
  870            error_report("sheep(%s) doesn't support discard command",
  871                         s->host_spec);
  872            rsp.result = SD_RES_SUCCESS;
  873            s->discard_supported = false;
  874            break;
  875        default:
  876            break;
  877        }
  878    }
  879
  880    switch (rsp.result) {
  881    case SD_RES_SUCCESS:
  882        break;
  883    case SD_RES_READONLY:
  /*
   * Reload the inode when the request targeted the vdi id we currently
   * hold, retarget the request at s->inode.vdi_id and resend it.  The
   * request stays allocated, hence the jump past free_aio_req().
   */
  884        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
  885            ret = reload_inode(s, 0, "");
  886            if (ret < 0) {
  887                goto err;
  888            }
  889        }
  890        if (is_data_obj(aio_req->oid)) {
  891            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
  892                                           data_oid_to_idx(aio_req->oid));
  893        } else {
  894            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
  895        }
  896        resend_aioreq(s, aio_req);
  897        goto out;
  898    default:
  /* Any other result fails the whole AIOCB; the request is still freed below. */
  899        acb->ret = -EIO;
  900        error_report("%s", sd_strerror(rsp.result));
  901        break;
  902    }
  903
  904    free_aio_req(s, aio_req);
  905    if (!acb->nr_pending) {
  906        /*
  907         * We've finished all requests which belong to the AIOCB, so
  908         * we can switch back to sd_co_readv/writev now.
  909         */
  910        acb->aio_done_func(acb);
  911    }
  912out:
  913    s->co_recv = NULL;
  914    return;
  915err:
  916    s->co_recv = NULL;
  917    reconnect_to_sdog(opaque);
  918}
 919
 920static void co_read_response(void *opaque)
 921{
 922    BDRVSheepdogState *s = opaque;
 923
 924    if (!s->co_recv) {
 925        s->co_recv = qemu_coroutine_create(aio_read_response);
 926    }
 927
 928    qemu_coroutine_enter(s->co_recv, opaque);
 929}
 930
 931static void co_write_request(void *opaque)
 932{
 933    BDRVSheepdogState *s = opaque;
 934
 935    qemu_coroutine_enter(s->co_send, NULL);
 936}
 937
 938/*
 939 * Return a socket descriptor to read/write objects.
 940 *
 941 * We cannot use this descriptor for other operations because
 942 * the block driver may be on waiting response from the server.
 943 */
 944static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
 945{
 946    int fd;
 947
 948    fd = connect_to_sdog(s, errp);
 949    if (fd < 0) {
 950        return fd;
 951    }
 952
 953    aio_set_fd_handler(s->aio_context, fd, false,
 954                       co_read_response, NULL, s);
 955    return fd;
 956}
 957
 958static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
 959                        char *vdi, uint32_t *snapid, char *tag)
 960{
 961    URI *uri;
 962    QueryParams *qp = NULL;
 963    int ret = 0;
 964
 965    uri = uri_parse(filename);
 966    if (!uri) {
 967        return -EINVAL;
 968    }
 969
 970    /* transport */
 971    if (!strcmp(uri->scheme, "sheepdog")) {
 972        s->is_unix = false;
 973    } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
 974        s->is_unix = false;
 975    } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
 976        s->is_unix = true;
 977    } else {
 978        ret = -EINVAL;
 979        goto out;
 980    }
 981
 982    if (uri->path == NULL || !strcmp(uri->path, "/")) {
 983        ret = -EINVAL;
 984        goto out;
 985    }
 986    pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
 987
 988    qp = query_params_parse(uri->query);
 989    if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
 990        ret = -EINVAL;
 991        goto out;
 992    }
 993
 994    if (s->is_unix) {
 995        /* sheepdog+unix:///vdiname?socket=path */
 996        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
 997            ret = -EINVAL;
 998            goto out;
 999        }
1000        s->host_spec = g_strdup(qp->p[0].value);
1001    } else {
1002        /* sheepdog[+tcp]://[host:port]/vdiname */
1003        s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
1004                                       uri->port ?: SD_DEFAULT_PORT);
1005    }
1006
1007    /* snapshot tag */
1008    if (uri->fragment) {
1009        *snapid = strtoul(uri->fragment, NULL, 10);
1010        if (*snapid == 0) {
1011            pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
1012        }
1013    } else {
1014        *snapid = CURRENT_VDI_ID; /* search current vdi */
1015    }
1016
1017out:
1018    if (qp) {
1019        query_params_free(qp);
1020    }
1021    uri_free(uri);
1022    return ret;
1023}
1024
1025/*
1026 * Parse a filename (old syntax)
1027 *
1028 * filename must be one of the following formats:
1029 *   1. [vdiname]
1030 *   2. [vdiname]:[snapid]
1031 *   3. [vdiname]:[tag]
1032 *   4. [hostname]:[port]:[vdiname]
1033 *   5. [hostname]:[port]:[vdiname]:[snapid]
1034 *   6. [hostname]:[port]:[vdiname]:[tag]
1035 *
1036 * You can boot from the snapshot images by specifying `snapid` or
1037 * `tag'.
1038 *
1039 * You can run VMs outside the Sheepdog cluster by specifying
1040 * `hostname' and `port' (experimental).
1041 */
1042static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
1043                         char *vdi, uint32_t *snapid, char *tag)
1044{
1045    char *p, *q, *uri;
1046    const char *host_spec, *vdi_spec;
1047    int nr_sep, ret;
1048
1049    strstart(filename, "sheepdog:", (const char **)&filename);
1050    p = q = g_strdup(filename);
1051
1052    /* count the number of separators */
1053    nr_sep = 0;
1054    while (*p) {
1055        if (*p == ':') {
1056            nr_sep++;
1057        }
1058        p++;
1059    }
1060    p = q;
1061
1062    /* use the first two tokens as host_spec. */
1063    if (nr_sep >= 2) {
1064        host_spec = p;
1065        p = strchr(p, ':');
1066        p++;
1067        p = strchr(p, ':');
1068        *p++ = '\0';
1069    } else {
1070        host_spec = "";
1071    }
1072
1073    vdi_spec = p;
1074
1075    p = strchr(vdi_spec, ':');
1076    if (p) {
1077        *p++ = '#';
1078    }
1079
1080    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1081
1082    ret = sd_parse_uri(s, uri, vdi, snapid, tag);
1083
1084    g_free(q);
1085    g_free(uri);
1086
1087    return ret;
1088}
1089
1090static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1091                         uint32_t snapid, const char *tag, uint32_t *vid,
1092                         bool lock, Error **errp)
1093{
1094    int ret, fd;
1095    SheepdogVdiReq hdr;
1096    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1097    unsigned int wlen, rlen = 0;
1098    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1099
1100    fd = connect_to_sdog(s, errp);
1101    if (fd < 0) {
1102        return fd;
1103    }
1104
1105    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1106     * which is desirable since we'll soon be sending those bytes, and
1107     * don't want the send_req to read uninitialized data.
1108     */
1109    strncpy(buf, filename, SD_MAX_VDI_LEN);
1110    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1111
1112    memset(&hdr, 0, sizeof(hdr));
1113    if (lock) {
1114        hdr.opcode = SD_OP_LOCK_VDI;
1115        hdr.type = LOCK_TYPE_NORMAL;
1116    } else {
1117        hdr.opcode = SD_OP_GET_VDI_INFO;
1118    }
1119    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1120    hdr.proto_ver = SD_PROTO_VER;
1121    hdr.data_length = wlen;
1122    hdr.snapid = snapid;
1123    hdr.flags = SD_FLAG_CMD_WRITE;
1124
1125    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1126    if (ret) {
1127        error_setg_errno(errp, -ret, "cannot get vdi info");
1128        goto out;
1129    }
1130
1131    if (rsp->result != SD_RES_SUCCESS) {
1132        error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1133                   sd_strerror(rsp->result), filename, snapid, tag);
1134        if (rsp->result == SD_RES_NO_VDI) {
1135            ret = -ENOENT;
1136        } else if (rsp->result == SD_RES_VDI_LOCKED) {
1137            ret = -EBUSY;
1138        } else {
1139            ret = -EIO;
1140        }
1141        goto out;
1142    }
1143    *vid = rsp->vdi_id;
1144
1145    ret = 0;
1146out:
1147    closesocket(fd);
1148    return ret;
1149}
1150
1151static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1152                                         struct iovec *iov, int niov,
1153                                         enum AIOCBState aiocb_type)
1154{
1155    int nr_copies = s->inode.nr_copies;
1156    SheepdogObjReq hdr;
1157    unsigned int wlen = 0;
1158    int ret;
1159    uint64_t oid = aio_req->oid;
1160    unsigned int datalen = aio_req->data_len;
1161    uint64_t offset = aio_req->offset;
1162    uint8_t flags = aio_req->flags;
1163    uint64_t old_oid = aio_req->base_oid;
1164    bool create = aio_req->create;
1165
1166    if (!nr_copies) {
1167        error_report("bug");
1168    }
1169
1170    memset(&hdr, 0, sizeof(hdr));
1171
1172    switch (aiocb_type) {
1173    case AIOCB_FLUSH_CACHE:
1174        hdr.opcode = SD_OP_FLUSH_VDI;
1175        break;
1176    case AIOCB_READ_UDATA:
1177        hdr.opcode = SD_OP_READ_OBJ;
1178        hdr.flags = flags;
1179        break;
1180    case AIOCB_WRITE_UDATA:
1181        if (create) {
1182            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1183        } else {
1184            hdr.opcode = SD_OP_WRITE_OBJ;
1185        }
1186        wlen = datalen;
1187        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1188        break;
1189    case AIOCB_DISCARD_OBJ:
1190        hdr.opcode = SD_OP_WRITE_OBJ;
1191        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1192        s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
1193        offset = offsetof(SheepdogInode,
1194                          data_vdi_id[data_oid_to_idx(oid)]);
1195        oid = vid_to_vdi_oid(s->inode.vdi_id);
1196        wlen = datalen = sizeof(uint32_t);
1197        break;
1198    }
1199
1200    if (s->cache_flags) {
1201        hdr.flags |= s->cache_flags;
1202    }
1203
1204    hdr.oid = oid;
1205    hdr.cow_oid = old_oid;
1206    hdr.copies = s->inode.nr_copies;
1207
1208    hdr.data_length = datalen;
1209    hdr.offset = offset;
1210
1211    hdr.id = aio_req->id;
1212
1213    qemu_co_mutex_lock(&s->lock);
1214    s->co_send = qemu_coroutine_self();
1215    aio_set_fd_handler(s->aio_context, s->fd, false,
1216                       co_read_response, co_write_request, s);
1217    socket_set_cork(s->fd, 1);
1218
1219    /* send a header */
1220    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1221    if (ret != sizeof(hdr)) {
1222        error_report("failed to send a req, %s", strerror(errno));
1223        goto out;
1224    }
1225
1226    if (wlen) {
1227        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1228        if (ret != wlen) {
1229            error_report("failed to send a data, %s", strerror(errno));
1230        }
1231    }
1232out:
1233    socket_set_cork(s->fd, 0);
1234    aio_set_fd_handler(s->aio_context, s->fd, false,
1235                       co_read_response, NULL, s);
1236    s->co_send = NULL;
1237    qemu_co_mutex_unlock(&s->lock);
1238}
1239
1240static int read_write_object(int fd, AioContext *aio_context, char *buf,
1241                             uint64_t oid, uint8_t copies,
1242                             unsigned int datalen, uint64_t offset,
1243                             bool write, bool create, uint32_t cache_flags)
1244{
1245    SheepdogObjReq hdr;
1246    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1247    unsigned int wlen, rlen;
1248    int ret;
1249
1250    memset(&hdr, 0, sizeof(hdr));
1251
1252    if (write) {
1253        wlen = datalen;
1254        rlen = 0;
1255        hdr.flags = SD_FLAG_CMD_WRITE;
1256        if (create) {
1257            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1258        } else {
1259            hdr.opcode = SD_OP_WRITE_OBJ;
1260        }
1261    } else {
1262        wlen = 0;
1263        rlen = datalen;
1264        hdr.opcode = SD_OP_READ_OBJ;
1265    }
1266
1267    hdr.flags |= cache_flags;
1268
1269    hdr.oid = oid;
1270    hdr.data_length = datalen;
1271    hdr.offset = offset;
1272    hdr.copies = copies;
1273
1274    ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1275    if (ret) {
1276        error_report("failed to send a request to the sheep");
1277        return ret;
1278    }
1279
1280    switch (rsp->result) {
1281    case SD_RES_SUCCESS:
1282        return 0;
1283    default:
1284        error_report("%s", sd_strerror(rsp->result));
1285        return -EIO;
1286    }
1287}
1288
1289static int read_object(int fd, AioContext *aio_context, char *buf,
1290                       uint64_t oid, uint8_t copies,
1291                       unsigned int datalen, uint64_t offset,
1292                       uint32_t cache_flags)
1293{
1294    return read_write_object(fd, aio_context, buf, oid, copies,
1295                             datalen, offset, false,
1296                             false, cache_flags);
1297}
1298
1299static int write_object(int fd, AioContext *aio_context, char *buf,
1300                        uint64_t oid, uint8_t copies,
1301                        unsigned int datalen, uint64_t offset, bool create,
1302                        uint32_t cache_flags)
1303{
1304    return read_write_object(fd, aio_context, buf, oid, copies,
1305                             datalen, offset, true,
1306                             create, cache_flags);
1307}
1308
1309/* update inode with the latest state */
1310static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1311{
1312    Error *local_err = NULL;
1313    SheepdogInode *inode;
1314    int ret = 0, fd;
1315    uint32_t vid = 0;
1316
1317    fd = connect_to_sdog(s, &local_err);
1318    if (fd < 0) {
1319        error_report_err(local_err);
1320        return -EIO;
1321    }
1322
1323    inode = g_malloc(SD_INODE_HEADER_SIZE);
1324
1325    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1326    if (ret) {
1327        error_report_err(local_err);
1328        goto out;
1329    }
1330
1331    ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
1332                      s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1333                      s->cache_flags);
1334    if (ret < 0) {
1335        goto out;
1336    }
1337
1338    if (inode->vdi_id != s->inode.vdi_id) {
1339        memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1340    }
1341
1342out:
1343    g_free(inode);
1344    closesocket(fd);
1345
1346    return ret;
1347}
1348
1349static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1350{
1351    SheepdogAIOCB *acb = aio_req->aiocb;
1352
1353    aio_req->create = false;
1354
1355    /* check whether this request becomes a CoW one */
1356    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1357        int idx = data_oid_to_idx(aio_req->oid);
1358
1359        if (is_data_obj_writable(&s->inode, idx)) {
1360            goto out;
1361        }
1362
1363        if (s->inode.data_vdi_id[idx]) {
1364            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1365            aio_req->flags |= SD_FLAG_CMD_COW;
1366        }
1367        aio_req->create = true;
1368    }
1369out:
1370    if (is_data_obj(aio_req->oid)) {
1371        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1372                        acb->aiocb_type);
1373    } else {
1374        struct iovec iov;
1375        iov.iov_base = &s->inode;
1376        iov.iov_len = sizeof(s->inode);
1377        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1378    }
1379}
1380
1381static void sd_detach_aio_context(BlockDriverState *bs)
1382{
1383    BDRVSheepdogState *s = bs->opaque;
1384
1385    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1386                       NULL, NULL);
1387}
1388
1389static void sd_attach_aio_context(BlockDriverState *bs,
1390                                  AioContext *new_context)
1391{
1392    BDRVSheepdogState *s = bs->opaque;
1393
1394    s->aio_context = new_context;
1395    aio_set_fd_handler(new_context, s->fd, false,
1396                       co_read_response, NULL, s);
1397}
1398
1399/* TODO Convert to fine grained options */
1400static QemuOptsList runtime_opts = {
1401    .name = "sheepdog",
1402    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1403    .desc = {
1404        {
1405            .name = "filename",
1406            .type = QEMU_OPT_STRING,
1407            .help = "URL to the sheepdog image",
1408        },
1409        { /* end of list */ }
1410    },
1411};
1412
1413static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1414                   Error **errp)
1415{
1416    int ret, fd;
1417    uint32_t vid = 0;
1418    BDRVSheepdogState *s = bs->opaque;
1419    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1420    uint32_t snapid;
1421    char *buf = NULL;
1422    QemuOpts *opts;
1423    Error *local_err = NULL;
1424    const char *filename;
1425
1426    s->bs = bs;
1427    s->aio_context = bdrv_get_aio_context(bs);
1428
1429    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1430    qemu_opts_absorb_qdict(opts, options, &local_err);
1431    if (local_err) {
1432        error_propagate(errp, local_err);
1433        ret = -EINVAL;
1434        goto out;
1435    }
1436
1437    filename = qemu_opt_get(opts, "filename");
1438
1439    QLIST_INIT(&s->inflight_aio_head);
1440    QLIST_INIT(&s->failed_aio_head);
1441    QLIST_INIT(&s->inflight_aiocb_head);
1442    s->fd = -1;
1443
1444    memset(vdi, 0, sizeof(vdi));
1445    memset(tag, 0, sizeof(tag));
1446
1447    if (strstr(filename, "://")) {
1448        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1449    } else {
1450        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1451    }
1452    if (ret < 0) {
1453        error_setg(errp, "Can't parse filename");
1454        goto out;
1455    }
1456    s->fd = get_sheep_fd(s, errp);
1457    if (s->fd < 0) {
1458        ret = s->fd;
1459        goto out;
1460    }
1461
1462    ret = find_vdi_name(s, vdi, snapid, tag, &vid, true, errp);
1463    if (ret) {
1464        goto out;
1465    }
1466
1467    /*
1468     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1469     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1470     */
1471    s->cache_flags = SD_FLAG_CMD_CACHE;
1472    if (flags & BDRV_O_NOCACHE) {
1473        s->cache_flags = SD_FLAG_CMD_DIRECT;
1474    }
1475    s->discard_supported = true;
1476
1477    if (snapid || tag[0] != '\0') {
1478        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1479        s->is_snapshot = true;
1480    }
1481
1482    fd = connect_to_sdog(s, errp);
1483    if (fd < 0) {
1484        ret = fd;
1485        goto out;
1486    }
1487
1488    buf = g_malloc(SD_INODE_SIZE);
1489    ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
1490                      0, SD_INODE_SIZE, 0, s->cache_flags);
1491
1492    closesocket(fd);
1493
1494    if (ret) {
1495        error_setg(errp, "Can't read snapshot inode");
1496        goto out;
1497    }
1498
1499    memcpy(&s->inode, buf, sizeof(s->inode));
1500
1501    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1502    pstrcpy(s->name, sizeof(s->name), vdi);
1503    qemu_co_mutex_init(&s->lock);
1504    qemu_co_queue_init(&s->overlapping_queue);
1505    qemu_opts_del(opts);
1506    g_free(buf);
1507    return 0;
1508out:
1509    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1510                       false, NULL, NULL, NULL);
1511    if (s->fd >= 0) {
1512        closesocket(s->fd);
1513    }
1514    qemu_opts_del(opts);
1515    g_free(buf);
1516    return ret;
1517}
1518
1519static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1520                             Error **errp)
1521{
1522    BDRVSheepdogState *s = state->bs->opaque;
1523    BDRVSheepdogReopenState *re_s;
1524    int ret = 0;
1525
1526    re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1527
1528    re_s->cache_flags = SD_FLAG_CMD_CACHE;
1529    if (state->flags & BDRV_O_NOCACHE) {
1530        re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1531    }
1532
1533    re_s->fd = get_sheep_fd(s, errp);
1534    if (re_s->fd < 0) {
1535        ret = re_s->fd;
1536        return ret;
1537    }
1538
1539    return ret;
1540}
1541
1542static void sd_reopen_commit(BDRVReopenState *state)
1543{
1544    BDRVSheepdogReopenState *re_s = state->opaque;
1545    BDRVSheepdogState *s = state->bs->opaque;
1546
1547    if (s->fd) {
1548        aio_set_fd_handler(s->aio_context, s->fd, false,
1549                           NULL, NULL, NULL);
1550        closesocket(s->fd);
1551    }
1552
1553    s->fd = re_s->fd;
1554    s->cache_flags = re_s->cache_flags;
1555
1556    g_free(state->opaque);
1557    state->opaque = NULL;
1558
1559    return;
1560}
1561
1562static void sd_reopen_abort(BDRVReopenState *state)
1563{
1564    BDRVSheepdogReopenState *re_s = state->opaque;
1565    BDRVSheepdogState *s = state->bs->opaque;
1566
1567    if (re_s == NULL) {
1568        return;
1569    }
1570
1571    if (re_s->fd) {
1572        aio_set_fd_handler(s->aio_context, re_s->fd, false,
1573                           NULL, NULL, NULL);
1574        closesocket(re_s->fd);
1575    }
1576
1577    g_free(state->opaque);
1578    state->opaque = NULL;
1579
1580    return;
1581}
1582
1583static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1584                        Error **errp)
1585{
1586    SheepdogVdiReq hdr;
1587    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1588    int fd, ret;
1589    unsigned int wlen, rlen = 0;
1590    char buf[SD_MAX_VDI_LEN];
1591
1592    fd = connect_to_sdog(s, errp);
1593    if (fd < 0) {
1594        return fd;
1595    }
1596
1597    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1598     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1599     */
1600    memset(buf, 0, sizeof(buf));
1601    pstrcpy(buf, sizeof(buf), s->name);
1602
1603    memset(&hdr, 0, sizeof(hdr));
1604    hdr.opcode = SD_OP_NEW_VDI;
1605    hdr.base_vdi_id = s->inode.vdi_id;
1606
1607    wlen = SD_MAX_VDI_LEN;
1608
1609    hdr.flags = SD_FLAG_CMD_WRITE;
1610    hdr.snapid = snapshot;
1611
1612    hdr.data_length = wlen;
1613    hdr.vdi_size = s->inode.vdi_size;
1614    hdr.copy_policy = s->inode.copy_policy;
1615    hdr.copies = s->inode.nr_copies;
1616    hdr.block_size_shift = s->inode.block_size_shift;
1617
1618    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1619
1620    closesocket(fd);
1621
1622    if (ret) {
1623        error_setg_errno(errp, -ret, "create failed");
1624        return ret;
1625    }
1626
1627    if (rsp->result != SD_RES_SUCCESS) {
1628        error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1629        return -EIO;
1630    }
1631
1632    if (vdi_id) {
1633        *vdi_id = rsp->vdi_id;
1634    }
1635
1636    return 0;
1637}
1638
1639static int sd_prealloc(const char *filename, Error **errp)
1640{
1641    BlockBackend *blk = NULL;
1642    BDRVSheepdogState *base = NULL;
1643    unsigned long buf_size;
1644    uint32_t idx, max_idx;
1645    uint32_t object_size;
1646    int64_t vdi_size;
1647    void *buf = NULL;
1648    int ret;
1649
1650    blk = blk_new_open(filename, NULL, NULL,
1651                       BDRV_O_RDWR | BDRV_O_PROTOCOL, errp);
1652    if (blk == NULL) {
1653        ret = -EIO;
1654        goto out_with_err_set;
1655    }
1656
1657    blk_set_allow_write_beyond_eof(blk, true);
1658
1659    vdi_size = blk_getlength(blk);
1660    if (vdi_size < 0) {
1661        ret = vdi_size;
1662        goto out;
1663    }
1664
1665    base = blk_bs(blk)->opaque;
1666    object_size = (UINT32_C(1) << base->inode.block_size_shift);
1667    buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1668    buf = g_malloc0(buf_size);
1669
1670    max_idx = DIV_ROUND_UP(vdi_size, buf_size);
1671
1672    for (idx = 0; idx < max_idx; idx++) {
1673        /*
1674         * The created image can be a cloned image, so we need to read
1675         * a data from the source image.
1676         */
1677        ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1678        if (ret < 0) {
1679            goto out;
1680        }
1681        ret = blk_pwrite(blk, idx * buf_size, buf, buf_size);
1682        if (ret < 0) {
1683            goto out;
1684        }
1685    }
1686
1687    ret = 0;
1688out:
1689    if (ret < 0) {
1690        error_setg_errno(errp, -ret, "Can't pre-allocate");
1691    }
1692out_with_err_set:
1693    if (blk) {
1694        blk_unref(blk);
1695    }
1696    g_free(buf);
1697
1698    return ret;
1699}
1700
1701/*
1702 * Sheepdog support two kinds of redundancy, full replication and erasure
1703 * coding.
1704 *
1705 * # create a fully replicated vdi with x copies
1706 * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1707 *
1708 * # create a erasure coded vdi with x data strips and y parity strips
1709 * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1710 */
1711static int parse_redundancy(BDRVSheepdogState *s, const char *opt)
1712{
1713    struct SheepdogInode *inode = &s->inode;
1714    const char *n1, *n2;
1715    long copy, parity;
1716    char p[10];
1717
1718    pstrcpy(p, sizeof(p), opt);
1719    n1 = strtok(p, ":");
1720    n2 = strtok(NULL, ":");
1721
1722    if (!n1) {
1723        return -EINVAL;
1724    }
1725
1726    copy = strtol(n1, NULL, 10);
1727    if (copy > SD_MAX_COPIES || copy < 1) {
1728        return -EINVAL;
1729    }
1730    if (!n2) {
1731        inode->copy_policy = 0;
1732        inode->nr_copies = copy;
1733        return 0;
1734    }
1735
1736    if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1737        return -EINVAL;
1738    }
1739
1740    parity = strtol(n2, NULL, 10);
1741    if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1742        return -EINVAL;
1743    }
1744
1745    /*
1746     * 4 bits for parity and 4 bits for data.
1747     * We have to compress upper data bits because it can't represent 16
1748     */
1749    inode->copy_policy = ((copy / 2) << 4) + parity;
1750    inode->nr_copies = copy + parity;
1751
1752    return 0;
1753}
1754
1755static int parse_block_size_shift(BDRVSheepdogState *s, QemuOpts *opt)
1756{
1757    struct SheepdogInode *inode = &s->inode;
1758    uint64_t object_size;
1759    int obj_order;
1760
1761    object_size = qemu_opt_get_size_del(opt, BLOCK_OPT_OBJECT_SIZE, 0);
1762    if (object_size) {
1763        if ((object_size - 1) & object_size) {    /* not a power of 2? */
1764            return -EINVAL;
1765        }
1766        obj_order = ctz32(object_size);
1767        if (obj_order < 20 || obj_order > 31) {
1768            return -EINVAL;
1769        }
1770        inode->block_size_shift = (uint8_t)obj_order;
1771    }
1772
1773    return 0;
1774}
1775
1776static int sd_create(const char *filename, QemuOpts *opts,
1777                     Error **errp)
1778{
1779    int ret = 0;
1780    uint32_t vid = 0;
1781    char *backing_file = NULL;
1782    char *buf = NULL;
1783    BDRVSheepdogState *s;
1784    char tag[SD_MAX_VDI_TAG_LEN];
1785    uint32_t snapid;
1786    uint64_t max_vdi_size;
1787    bool prealloc = false;
1788
1789    s = g_new0(BDRVSheepdogState, 1);
1790
1791    memset(tag, 0, sizeof(tag));
1792    if (strstr(filename, "://")) {
1793        ret = sd_parse_uri(s, filename, s->name, &snapid, tag);
1794    } else {
1795        ret = parse_vdiname(s, filename, s->name, &snapid, tag);
1796    }
1797    if (ret < 0) {
1798        error_setg(errp, "Can't parse filename");
1799        goto out;
1800    }
1801
1802    s->inode.vdi_size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
1803                                 BDRV_SECTOR_SIZE);
1804    backing_file = qemu_opt_get_del(opts, BLOCK_OPT_BACKING_FILE);
1805    buf = qemu_opt_get_del(opts, BLOCK_OPT_PREALLOC);
1806    if (!buf || !strcmp(buf, "off")) {
1807        prealloc = false;
1808    } else if (!strcmp(buf, "full")) {
1809        prealloc = true;
1810    } else {
1811        error_setg(errp, "Invalid preallocation mode: '%s'", buf);
1812        ret = -EINVAL;
1813        goto out;
1814    }
1815
1816    g_free(buf);
1817    buf = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
1818    if (buf) {
1819        ret = parse_redundancy(s, buf);
1820        if (ret < 0) {
1821            error_setg(errp, "Invalid redundancy mode: '%s'", buf);
1822            goto out;
1823        }
1824    }
1825    ret = parse_block_size_shift(s, opts);
1826    if (ret < 0) {
1827        error_setg(errp, "Invalid object_size."
1828                         " obect_size needs to be power of 2"
1829                         " and be limited from 2^20 to 2^31");
1830        goto out;
1831    }
1832
1833    if (backing_file) {
1834        BlockBackend *blk;
1835        BDRVSheepdogState *base;
1836        BlockDriver *drv;
1837
1838        /* Currently, only Sheepdog backing image is supported. */
1839        drv = bdrv_find_protocol(backing_file, true, NULL);
1840        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1841            error_setg(errp, "backing_file must be a sheepdog image");
1842            ret = -EINVAL;
1843            goto out;
1844        }
1845
1846        blk = blk_new_open(backing_file, NULL, NULL,
1847                           BDRV_O_PROTOCOL, errp);
1848        if (blk == NULL) {
1849            ret = -EIO;
1850            goto out;
1851        }
1852
1853        base = blk_bs(blk)->opaque;
1854
1855        if (!is_snapshot(&base->inode)) {
1856            error_setg(errp, "cannot clone from a non snapshot vdi");
1857            blk_unref(blk);
1858            ret = -EINVAL;
1859            goto out;
1860        }
1861        s->inode.vdi_id = base->inode.vdi_id;
1862        blk_unref(blk);
1863    }
1864
1865    s->aio_context = qemu_get_aio_context();
1866
1867    /* if block_size_shift is not specified, get cluster default value */
1868    if (s->inode.block_size_shift == 0) {
1869        SheepdogVdiReq hdr;
1870        SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
1871        Error *local_err = NULL;
1872        int fd;
1873        unsigned int wlen = 0, rlen = 0;
1874
1875        fd = connect_to_sdog(s, &local_err);
1876        if (fd < 0) {
1877            error_report_err(local_err);
1878            ret = -EIO;
1879            goto out;
1880        }
1881
1882        memset(&hdr, 0, sizeof(hdr));
1883        hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
1884        hdr.proto_ver = SD_PROTO_VER;
1885
1886        ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1887                     NULL, &wlen, &rlen);
1888        closesocket(fd);
1889        if (ret) {
1890            error_setg_errno(errp, -ret, "failed to get cluster default");
1891            goto out;
1892        }
1893        if (rsp->result == SD_RES_SUCCESS) {
1894            s->inode.block_size_shift = rsp->block_size_shift;
1895        } else {
1896            s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
1897        }
1898    }
1899
1900    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1901
1902    if (s->inode.vdi_size > max_vdi_size) {
1903        error_setg(errp, "An image is too large."
1904                         " The maximum image size is %"PRIu64 "GB",
1905                         max_vdi_size / 1024 / 1024 / 1024);
1906        ret = -EINVAL;
1907        goto out;
1908    }
1909
1910    ret = do_sd_create(s, &vid, 0, errp);
1911    if (ret) {
1912        goto out;
1913    }
1914
1915    if (prealloc) {
1916        ret = sd_prealloc(filename, errp);
1917    }
1918out:
1919    g_free(backing_file);
1920    g_free(buf);
1921    g_free(s);
1922    return ret;
1923}
1924
1925static void sd_close(BlockDriverState *bs)
1926{
1927    Error *local_err = NULL;
1928    BDRVSheepdogState *s = bs->opaque;
1929    SheepdogVdiReq hdr;
1930    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1931    unsigned int wlen, rlen = 0;
1932    int fd, ret;
1933
1934    DPRINTF("%s\n", s->name);
1935
1936    fd = connect_to_sdog(s, &local_err);
1937    if (fd < 0) {
1938        error_report_err(local_err);
1939        return;
1940    }
1941
1942    memset(&hdr, 0, sizeof(hdr));
1943
1944    hdr.opcode = SD_OP_RELEASE_VDI;
1945    hdr.type = LOCK_TYPE_NORMAL;
1946    hdr.base_vdi_id = s->inode.vdi_id;
1947    wlen = strlen(s->name) + 1;
1948    hdr.data_length = wlen;
1949    hdr.flags = SD_FLAG_CMD_WRITE;
1950
1951    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1952                 s->name, &wlen, &rlen);
1953
1954    closesocket(fd);
1955
1956    if (!ret && rsp->result != SD_RES_SUCCESS &&
1957        rsp->result != SD_RES_VDI_NOT_LOCKED) {
1958        error_report("%s, %s", sd_strerror(rsp->result), s->name);
1959    }
1960
1961    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1962                       false, NULL, NULL, NULL);
1963    closesocket(s->fd);
1964    g_free(s->host_spec);
1965}
1966
1967static int64_t sd_getlength(BlockDriverState *bs)
1968{
1969    BDRVSheepdogState *s = bs->opaque;
1970
1971    return s->inode.vdi_size;
1972}
1973
1974static int sd_truncate(BlockDriverState *bs, int64_t offset)
1975{
1976    Error *local_err = NULL;
1977    BDRVSheepdogState *s = bs->opaque;
1978    int ret, fd;
1979    unsigned int datalen;
1980    uint64_t max_vdi_size;
1981
1982    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1983    if (offset < s->inode.vdi_size) {
1984        error_report("shrinking is not supported");
1985        return -EINVAL;
1986    } else if (offset > max_vdi_size) {
1987        error_report("too big image size");
1988        return -EINVAL;
1989    }
1990
1991    fd = connect_to_sdog(s, &local_err);
1992    if (fd < 0) {
1993        error_report_err(local_err);
1994        return fd;
1995    }
1996
1997    /* we don't need to update entire object */
1998    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1999    s->inode.vdi_size = offset;
2000    ret = write_object(fd, s->aio_context, (char *)&s->inode,
2001                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2002                       datalen, 0, false, s->cache_flags);
2003    close(fd);
2004
2005    if (ret < 0) {
2006        error_report("failed to update an inode.");
2007    }
2008
2009    return ret;
2010}
2011
2012/*
2013 * This function is called after writing data objects.  If we need to
2014 * update metadata, this sends a write request to the vdi object.
2015 * Otherwise, this switches back to sd_co_readv/writev.
2016 */
2017static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2018{
2019    BDRVSheepdogState *s = acb->common.bs->opaque;
2020    struct iovec iov;
2021    AIOReq *aio_req;
2022    uint32_t offset, data_len, mn, mx;
2023
2024    mn = acb->min_dirty_data_idx;
2025    mx = acb->max_dirty_data_idx;
2026    if (mn <= mx) {
2027        /* we need to update the vdi object. */
2028        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2029            mn * sizeof(s->inode.data_vdi_id[0]);
2030        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2031
2032        acb->min_dirty_data_idx = UINT32_MAX;
2033        acb->max_dirty_data_idx = 0;
2034
2035        iov.iov_base = &s->inode;
2036        iov.iov_len = sizeof(s->inode);
2037        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2038                                data_len, offset, 0, false, 0, offset);
2039        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2040        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2041
2042        acb->aio_done_func = sd_finish_aiocb;
2043        acb->aiocb_type = AIOCB_WRITE_UDATA;
2044        return;
2045    }
2046
2047    sd_finish_aiocb(acb);
2048}
2049
2050/* Delete current working VDI on the snapshot chain */
2051static bool sd_delete(BDRVSheepdogState *s)
2052{
2053    Error *local_err = NULL;
2054    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2055    SheepdogVdiReq hdr = {
2056        .opcode = SD_OP_DEL_VDI,
2057        .base_vdi_id = s->inode.vdi_id,
2058        .data_length = wlen,
2059        .flags = SD_FLAG_CMD_WRITE,
2060    };
2061    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2062    int fd, ret;
2063
2064    fd = connect_to_sdog(s, &local_err);
2065    if (fd < 0) {
2066        error_report_err(local_err);
2067        return false;
2068    }
2069
2070    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
2071                 s->name, &wlen, &rlen);
2072    closesocket(fd);
2073    if (ret) {
2074        return false;
2075    }
2076    switch (rsp->result) {
2077    case SD_RES_NO_VDI:
2078        error_report("%s was already deleted", s->name);
2079        /* fall through */
2080    case SD_RES_SUCCESS:
2081        break;
2082    default:
2083        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2084        return false;
2085    }
2086
2087    return true;
2088}
2089
2090/*
2091 * Create a writable VDI from a snapshot
2092 */
2093static int sd_create_branch(BDRVSheepdogState *s)
2094{
2095    Error *local_err = NULL;
2096    int ret, fd;
2097    uint32_t vid;
2098    char *buf;
2099    bool deleted;
2100
2101    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2102
2103    buf = g_malloc(SD_INODE_SIZE);
2104
2105    /*
2106     * Even If deletion fails, we will just create extra snapshot based on
2107     * the working VDI which was supposed to be deleted. So no need to
2108     * false bail out.
2109     */
2110    deleted = sd_delete(s);
2111    ret = do_sd_create(s, &vid, !deleted, &local_err);
2112    if (ret) {
2113        error_report_err(local_err);
2114        goto out;
2115    }
2116
2117    DPRINTF("%" PRIx32 " is created.\n", vid);
2118
2119    fd = connect_to_sdog(s, &local_err);
2120    if (fd < 0) {
2121        error_report_err(local_err);
2122        ret = fd;
2123        goto out;
2124    }
2125
2126    ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
2127                      s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2128
2129    closesocket(fd);
2130
2131    if (ret < 0) {
2132        goto out;
2133    }
2134
2135    memcpy(&s->inode, buf, sizeof(s->inode));
2136
2137    s->is_snapshot = false;
2138    ret = 0;
2139    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2140
2141out:
2142    g_free(buf);
2143
2144    return ret;
2145}
2146
2147/*
2148 * Send I/O requests to the server.
2149 *
2150 * This function sends requests to the server, links the requests to
2151 * the inflight_list in BDRVSheepdogState, and exits without
2152 * waiting the response.  The responses are received in the
2153 * `aio_read_response' function which is called from the main loop as
2154 * a fd handler.
2155 *
2156 * Returns 1 when we need to wait a response, 0 when there is no sent
2157 * request and -errno in error cases.
2158 */
2159static int coroutine_fn sd_co_rw_vector(void *p)
2160{
2161    SheepdogAIOCB *acb = p;
2162    int ret = 0;
2163    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2164    unsigned long idx;
2165    uint32_t object_size;
2166    uint64_t oid;
2167    uint64_t offset;
2168    BDRVSheepdogState *s = acb->common.bs->opaque;
2169    SheepdogInode *inode = &s->inode;
2170    AIOReq *aio_req;
2171
2172    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2173        /*
2174         * In the case we open the snapshot VDI, Sheepdog creates the
2175         * writable VDI when we do a write operation first.
2176         */
2177        ret = sd_create_branch(s);
2178        if (ret) {
2179            acb->ret = -EIO;
2180            goto out;
2181        }
2182    }
2183
2184    object_size = (UINT32_C(1) << inode->block_size_shift);
2185    idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2186    offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2187
2188    /*
2189     * Make sure we don't free the aiocb before we are done with all requests.
2190     * This additional reference is dropped at the end of this function.
2191     */
2192    acb->nr_pending++;
2193
2194    while (done != total) {
2195        uint8_t flags = 0;
2196        uint64_t old_oid = 0;
2197        bool create = false;
2198
2199        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2200
2201        len = MIN(total - done, object_size - offset);
2202
2203        switch (acb->aiocb_type) {
2204        case AIOCB_READ_UDATA:
2205            if (!inode->data_vdi_id[idx]) {
2206                qemu_iovec_memset(acb->qiov, done, 0, len);
2207                goto done;
2208            }
2209            break;
2210        case AIOCB_WRITE_UDATA:
2211            if (!inode->data_vdi_id[idx]) {
2212                create = true;
2213            } else if (!is_data_obj_writable(inode, idx)) {
2214                /* Copy-On-Write */
2215                create = true;
2216                old_oid = oid;
2217                flags = SD_FLAG_CMD_COW;
2218            }
2219            break;
2220        case AIOCB_DISCARD_OBJ:
2221            /*
2222             * We discard the object only when the whole object is
2223             * 1) allocated 2) trimmed. Otherwise, simply skip it.
2224             */
2225            if (len != object_size || inode->data_vdi_id[idx] == 0) {
2226                goto done;
2227            }
2228            break;
2229        default:
2230            break;
2231        }
2232
2233        if (create) {
2234            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2235                    inode->vdi_id, oid,
2236                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2237            oid = vid_to_data_oid(inode->vdi_id, idx);
2238            DPRINTF("new oid %" PRIx64 "\n", oid);
2239        }
2240
2241        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2242                                old_oid,
2243                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2244                                0 : done);
2245        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2246
2247        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2248                        acb->aiocb_type);
2249    done:
2250        offset = 0;
2251        idx++;
2252        done += len;
2253    }
2254out:
2255    if (!--acb->nr_pending) {
2256        return acb->ret;
2257    }
2258    return 1;
2259}
2260
2261static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
2262{
2263    SheepdogAIOCB *cb;
2264
2265    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
2266        if (AIOCBOverlapping(aiocb, cb)) {
2267            return true;
2268        }
2269    }
2270
2271    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
2272    return false;
2273}
2274
2275static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2276                        int nb_sectors, QEMUIOVector *qiov)
2277{
2278    SheepdogAIOCB *acb;
2279    int ret;
2280    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2281    BDRVSheepdogState *s = bs->opaque;
2282
2283    if (offset > s->inode.vdi_size) {
2284        ret = sd_truncate(bs, offset);
2285        if (ret < 0) {
2286            return ret;
2287        }
2288    }
2289
2290    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2291    acb->aio_done_func = sd_write_done;
2292    acb->aiocb_type = AIOCB_WRITE_UDATA;
2293
2294retry:
2295    if (check_overlapping_aiocb(s, acb)) {
2296        qemu_co_queue_wait(&s->overlapping_queue);
2297        goto retry;
2298    }
2299
2300    ret = sd_co_rw_vector(acb);
2301    if (ret <= 0) {
2302        QLIST_REMOVE(acb, aiocb_siblings);
2303        qemu_co_queue_restart_all(&s->overlapping_queue);
2304        qemu_aio_unref(acb);
2305        return ret;
2306    }
2307
2308    qemu_coroutine_yield();
2309
2310    QLIST_REMOVE(acb, aiocb_siblings);
2311    qemu_co_queue_restart_all(&s->overlapping_queue);
2312
2313    return acb->ret;
2314}
2315
2316static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2317                       int nb_sectors, QEMUIOVector *qiov)
2318{
2319    SheepdogAIOCB *acb;
2320    int ret;
2321    BDRVSheepdogState *s = bs->opaque;
2322
2323    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2324    acb->aiocb_type = AIOCB_READ_UDATA;
2325    acb->aio_done_func = sd_finish_aiocb;
2326
2327retry:
2328    if (check_overlapping_aiocb(s, acb)) {
2329        qemu_co_queue_wait(&s->overlapping_queue);
2330        goto retry;
2331    }
2332
2333    ret = sd_co_rw_vector(acb);
2334    if (ret <= 0) {
2335        QLIST_REMOVE(acb, aiocb_siblings);
2336        qemu_co_queue_restart_all(&s->overlapping_queue);
2337        qemu_aio_unref(acb);
2338        return ret;
2339    }
2340
2341    qemu_coroutine_yield();
2342
2343    QLIST_REMOVE(acb, aiocb_siblings);
2344    qemu_co_queue_restart_all(&s->overlapping_queue);
2345    return acb->ret;
2346}
2347
2348static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2349{
2350    BDRVSheepdogState *s = bs->opaque;
2351    SheepdogAIOCB *acb;
2352    AIOReq *aio_req;
2353
2354    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2355        return 0;
2356    }
2357
2358    acb = sd_aio_setup(bs, NULL, 0, 0);
2359    acb->aiocb_type = AIOCB_FLUSH_CACHE;
2360    acb->aio_done_func = sd_finish_aiocb;
2361
2362    aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2363                            0, 0, 0, false, 0, 0);
2364    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2365    add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
2366
2367    qemu_coroutine_yield();
2368    return acb->ret;
2369}
2370
2371static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2372{
2373    Error *local_err = NULL;
2374    BDRVSheepdogState *s = bs->opaque;
2375    int ret, fd;
2376    uint32_t new_vid;
2377    SheepdogInode *inode;
2378    unsigned int datalen;
2379
2380    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2381            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2382            s->name, sn_info->vm_state_size, s->is_snapshot);
2383
2384    if (s->is_snapshot) {
2385        error_report("You can't create a snapshot of a snapshot VDI, "
2386                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2387
2388        return -EINVAL;
2389    }
2390
2391    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2392
2393    s->inode.vm_state_size = sn_info->vm_state_size;
2394    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2395    /* It appears that inode.tag does not require a NUL terminator,
2396     * which means this use of strncpy is ok.
2397     */
2398    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2399    /* we don't need to update entire object */
2400    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2401    inode = g_malloc(datalen);
2402
2403    /* refresh inode. */
2404    fd = connect_to_sdog(s, &local_err);
2405    if (fd < 0) {
2406        error_report_err(local_err);
2407        ret = fd;
2408        goto cleanup;
2409    }
2410
2411    ret = write_object(fd, s->aio_context, (char *)&s->inode,
2412                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2413                       datalen, 0, false, s->cache_flags);
2414    if (ret < 0) {
2415        error_report("failed to write snapshot's inode.");
2416        goto cleanup;
2417    }
2418
2419    ret = do_sd_create(s, &new_vid, 1, &local_err);
2420    if (ret < 0) {
2421        error_reportf_err(local_err,
2422                          "failed to create inode for snapshot: ");
2423        goto cleanup;
2424    }
2425
2426    ret = read_object(fd, s->aio_context, (char *)inode,
2427                      vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2428                      s->cache_flags);
2429
2430    if (ret < 0) {
2431        error_report("failed to read new inode info. %s", strerror(errno));
2432        goto cleanup;
2433    }
2434
2435    memcpy(&s->inode, inode, datalen);
2436    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2437            s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2438
2439cleanup:
2440    g_free(inode);
2441    closesocket(fd);
2442    return ret;
2443}
2444
2445/*
2446 * We implement rollback(loadvm) operation to the specified snapshot by
2447 * 1) switch to the snapshot
2448 * 2) rely on sd_create_branch to delete working VDI and
2449 * 3) create a new working VDI based on the specified snapshot
2450 */
2451static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2452{
2453    BDRVSheepdogState *s = bs->opaque;
2454    BDRVSheepdogState *old_s;
2455    char tag[SD_MAX_VDI_TAG_LEN];
2456    uint32_t snapid = 0;
2457    int ret = 0;
2458
2459    old_s = g_new(BDRVSheepdogState, 1);
2460
2461    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2462
2463    snapid = strtoul(snapshot_id, NULL, 10);
2464    if (snapid) {
2465        tag[0] = 0;
2466    } else {
2467        pstrcpy(tag, sizeof(tag), snapshot_id);
2468    }
2469
2470    ret = reload_inode(s, snapid, tag);
2471    if (ret) {
2472        goto out;
2473    }
2474
2475    ret = sd_create_branch(s);
2476    if (ret) {
2477        goto out;
2478    }
2479
2480    g_free(old_s);
2481
2482    return 0;
2483out:
2484    /* recover bdrv_sd_state */
2485    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2486    g_free(old_s);
2487
2488    error_report("failed to open. recover old bdrv_sd_state.");
2489
2490    return ret;
2491}
2492
2493#define NR_BATCHED_DISCARD 128
2494
2495static bool remove_objects(BDRVSheepdogState *s)
2496{
2497    int fd, i = 0, nr_objs = 0;
2498    Error *local_err = NULL;
2499    int ret = 0;
2500    bool result = true;
2501    SheepdogInode *inode = &s->inode;
2502
2503    fd = connect_to_sdog(s, &local_err);
2504    if (fd < 0) {
2505        error_report_err(local_err);
2506        return false;
2507    }
2508
2509    nr_objs = count_data_objs(inode);
2510    while (i < nr_objs) {
2511        int start_idx, nr_filled_idx;
2512
2513        while (i < nr_objs && !inode->data_vdi_id[i]) {
2514            i++;
2515        }
2516        start_idx = i;
2517
2518        nr_filled_idx = 0;
2519        while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2520            if (inode->data_vdi_id[i]) {
2521                inode->data_vdi_id[i] = 0;
2522                nr_filled_idx++;
2523            }
2524
2525            i++;
2526        }
2527
2528        ret = write_object(fd, s->aio_context,
2529                           (char *)&inode->data_vdi_id[start_idx],
2530                           vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2531                           (i - start_idx) * sizeof(uint32_t),
2532                           offsetof(struct SheepdogInode,
2533                                    data_vdi_id[start_idx]),
2534                           false, s->cache_flags);
2535        if (ret < 0) {
2536            error_report("failed to discard snapshot inode.");
2537            result = false;
2538            goto out;
2539        }
2540    }
2541
2542out:
2543    closesocket(fd);
2544    return result;
2545}
2546
2547static int sd_snapshot_delete(BlockDriverState *bs,
2548                              const char *snapshot_id,
2549                              const char *name,
2550                              Error **errp)
2551{
2552    unsigned long snap_id = 0;
2553    char snap_tag[SD_MAX_VDI_TAG_LEN];
2554    Error *local_err = NULL;
2555    int fd, ret;
2556    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2557    BDRVSheepdogState *s = bs->opaque;
2558    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2559    uint32_t vid;
2560    SheepdogVdiReq hdr = {
2561        .opcode = SD_OP_DEL_VDI,
2562        .data_length = wlen,
2563        .flags = SD_FLAG_CMD_WRITE,
2564    };
2565    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2566
2567    if (!remove_objects(s)) {
2568        return -1;
2569    }
2570
2571    memset(buf, 0, sizeof(buf));
2572    memset(snap_tag, 0, sizeof(snap_tag));
2573    pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2574    ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2575    if (ret || snap_id > UINT32_MAX) {
2576        error_setg(errp, "Invalid snapshot ID: %s",
2577                         snapshot_id ? snapshot_id : "<null>");
2578        return -EINVAL;
2579    }
2580
2581    if (snap_id) {
2582        hdr.snapid = (uint32_t) snap_id;
2583    } else {
2584        pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2585        pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2586    }
2587
2588    ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true,
2589                        &local_err);
2590    if (ret) {
2591        return ret;
2592    }
2593
2594    fd = connect_to_sdog(s, &local_err);
2595    if (fd < 0) {
2596        error_report_err(local_err);
2597        return -1;
2598    }
2599
2600    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
2601                 buf, &wlen, &rlen);
2602    closesocket(fd);
2603    if (ret) {
2604        return ret;
2605    }
2606
2607    switch (rsp->result) {
2608    case SD_RES_NO_VDI:
2609        error_report("%s was already deleted", s->name);
2610    case SD_RES_SUCCESS:
2611        break;
2612    default:
2613        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2614        return -1;
2615    }
2616
2617    return ret;
2618}
2619
2620static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2621{
2622    Error *local_err = NULL;
2623    BDRVSheepdogState *s = bs->opaque;
2624    SheepdogReq req;
2625    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2626    QEMUSnapshotInfo *sn_tab = NULL;
2627    unsigned wlen, rlen;
2628    int found = 0;
2629    static SheepdogInode inode;
2630    unsigned long *vdi_inuse;
2631    unsigned int start_nr;
2632    uint64_t hval;
2633    uint32_t vid;
2634
2635    vdi_inuse = g_malloc(max);
2636
2637    fd = connect_to_sdog(s, &local_err);
2638    if (fd < 0) {
2639        error_report_err(local_err);
2640        ret = fd;
2641        goto out;
2642    }
2643
2644    rlen = max;
2645    wlen = 0;
2646
2647    memset(&req, 0, sizeof(req));
2648
2649    req.opcode = SD_OP_READ_VDIS;
2650    req.data_length = max;
2651
2652    ret = do_req(fd, s->aio_context, (SheepdogReq *)&req,
2653                 vdi_inuse, &wlen, &rlen);
2654
2655    closesocket(fd);
2656    if (ret) {
2657        goto out;
2658    }
2659
2660    sn_tab = g_new0(QEMUSnapshotInfo, nr);
2661
2662    /* calculate a vdi id with hash function */
2663    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2664    start_nr = hval & (SD_NR_VDIS - 1);
2665
2666    fd = connect_to_sdog(s, &local_err);
2667    if (fd < 0) {
2668        error_report_err(local_err);
2669        ret = fd;
2670        goto out;
2671    }
2672
2673    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2674        if (!test_bit(vid, vdi_inuse)) {
2675            break;
2676        }
2677
2678        /* we don't need to read entire object */
2679        ret = read_object(fd, s->aio_context, (char *)&inode,
2680                          vid_to_vdi_oid(vid),
2681                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2682                          s->cache_flags);
2683
2684        if (ret) {
2685            continue;
2686        }
2687
2688        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2689            sn_tab[found].date_sec = inode.snap_ctime >> 32;
2690            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2691            sn_tab[found].vm_state_size = inode.vm_state_size;
2692            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2693
2694            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2695                     "%" PRIu32, inode.snap_id);
2696            pstrcpy(sn_tab[found].name,
2697                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2698                    inode.tag);
2699            found++;
2700        }
2701    }
2702
2703    closesocket(fd);
2704out:
2705    *psn_tab = sn_tab;
2706
2707    g_free(vdi_inuse);
2708
2709    if (ret < 0) {
2710        return ret;
2711    }
2712
2713    return found;
2714}
2715
2716static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2717                                int64_t pos, int size, int load)
2718{
2719    Error *local_err = NULL;
2720    bool create;
2721    int fd, ret = 0, remaining = size;
2722    unsigned int data_len;
2723    uint64_t vmstate_oid;
2724    uint64_t offset;
2725    uint32_t vdi_index;
2726    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2727    uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
2728
2729    fd = connect_to_sdog(s, &local_err);
2730    if (fd < 0) {
2731        error_report_err(local_err);
2732        return fd;
2733    }
2734
2735    while (remaining) {
2736        vdi_index = pos / object_size;
2737        offset = pos % object_size;
2738
2739        data_len = MIN(remaining, object_size - offset);
2740
2741        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2742
2743        create = (offset == 0);
2744        if (load) {
2745            ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
2746                              s->inode.nr_copies, data_len, offset,
2747                              s->cache_flags);
2748        } else {
2749            ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
2750                               s->inode.nr_copies, data_len, offset, create,
2751                               s->cache_flags);
2752        }
2753
2754        if (ret < 0) {
2755            error_report("failed to save vmstate %s", strerror(errno));
2756            goto cleanup;
2757        }
2758
2759        pos += data_len;
2760        data += data_len;
2761        remaining -= data_len;
2762    }
2763    ret = size;
2764cleanup:
2765    closesocket(fd);
2766    return ret;
2767}
2768
2769static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2770                           int64_t pos)
2771{
2772    BDRVSheepdogState *s = bs->opaque;
2773    void *buf;
2774    int ret;
2775
2776    buf = qemu_blockalign(bs, qiov->size);
2777    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2778    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2779    qemu_vfree(buf);
2780
2781    return ret;
2782}
2783
2784static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
2785                           int64_t pos, int size)
2786{
2787    BDRVSheepdogState *s = bs->opaque;
2788
2789    return do_load_save_vmstate(s, data, pos, size, 1);
2790}
2791
2792
2793static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
2794                                      int nb_sectors)
2795{
2796    SheepdogAIOCB *acb;
2797    BDRVSheepdogState *s = bs->opaque;
2798    int ret;
2799    QEMUIOVector discard_iov;
2800    struct iovec iov;
2801    uint32_t zero = 0;
2802
2803    if (!s->discard_supported) {
2804            return 0;
2805    }
2806
2807    memset(&discard_iov, 0, sizeof(discard_iov));
2808    memset(&iov, 0, sizeof(iov));
2809    iov.iov_base = &zero;
2810    iov.iov_len = sizeof(zero);
2811    discard_iov.iov = &iov;
2812    discard_iov.niov = 1;
2813    acb = sd_aio_setup(bs, &discard_iov, sector_num, nb_sectors);
2814    acb->aiocb_type = AIOCB_DISCARD_OBJ;
2815    acb->aio_done_func = sd_finish_aiocb;
2816
2817retry:
2818    if (check_overlapping_aiocb(s, acb)) {
2819        qemu_co_queue_wait(&s->overlapping_queue);
2820        goto retry;
2821    }
2822
2823    ret = sd_co_rw_vector(acb);
2824    if (ret <= 0) {
2825        QLIST_REMOVE(acb, aiocb_siblings);
2826        qemu_co_queue_restart_all(&s->overlapping_queue);
2827        qemu_aio_unref(acb);
2828        return ret;
2829    }
2830
2831    qemu_coroutine_yield();
2832
2833    QLIST_REMOVE(acb, aiocb_siblings);
2834    qemu_co_queue_restart_all(&s->overlapping_queue);
2835
2836    return acb->ret;
2837}
2838
2839static coroutine_fn int64_t
2840sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2841                       int *pnum, BlockDriverState **file)
2842{
2843    BDRVSheepdogState *s = bs->opaque;
2844    SheepdogInode *inode = &s->inode;
2845    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2846    uint64_t offset = sector_num * BDRV_SECTOR_SIZE;
2847    unsigned long start = offset / object_size,
2848                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
2849                                     BDRV_SECTOR_SIZE, object_size);
2850    unsigned long idx;
2851    int64_t ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | offset;
2852
2853    for (idx = start; idx < end; idx++) {
2854        if (inode->data_vdi_id[idx] == 0) {
2855            break;
2856        }
2857    }
2858    if (idx == start) {
2859        /* Get the longest length of unallocated sectors */
2860        ret = 0;
2861        for (idx = start + 1; idx < end; idx++) {
2862            if (inode->data_vdi_id[idx] != 0) {
2863                break;
2864            }
2865        }
2866    }
2867
2868    *pnum = (idx - start) * object_size / BDRV_SECTOR_SIZE;
2869    if (*pnum > nb_sectors) {
2870        *pnum = nb_sectors;
2871    }
2872    if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
2873        *file = bs;
2874    }
2875    return ret;
2876}
2877
2878static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
2879{
2880    BDRVSheepdogState *s = bs->opaque;
2881    SheepdogInode *inode = &s->inode;
2882    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2883    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
2884    uint64_t size = 0;
2885
2886    for (i = 0; i < last; i++) {
2887        if (inode->data_vdi_id[i] == 0) {
2888            continue;
2889        }
2890        size += object_size;
2891    }
2892    return size;
2893}
2894
2895static QemuOptsList sd_create_opts = {
2896    .name = "sheepdog-create-opts",
2897    .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
2898    .desc = {
2899        {
2900            .name = BLOCK_OPT_SIZE,
2901            .type = QEMU_OPT_SIZE,
2902            .help = "Virtual disk size"
2903        },
2904        {
2905            .name = BLOCK_OPT_BACKING_FILE,
2906            .type = QEMU_OPT_STRING,
2907            .help = "File name of a base image"
2908        },
2909        {
2910            .name = BLOCK_OPT_PREALLOC,
2911            .type = QEMU_OPT_STRING,
2912            .help = "Preallocation mode (allowed values: off, full)"
2913        },
2914        {
2915            .name = BLOCK_OPT_REDUNDANCY,
2916            .type = QEMU_OPT_STRING,
2917            .help = "Redundancy of the image"
2918        },
2919        {
2920            .name = BLOCK_OPT_OBJECT_SIZE,
2921            .type = QEMU_OPT_SIZE,
2922            .help = "Object size of the image"
2923        },
2924        { /* end of list */ }
2925    }
2926};
2927
2928static BlockDriver bdrv_sheepdog = {
2929    .format_name    = "sheepdog",
2930    .protocol_name  = "sheepdog",
2931    .instance_size  = sizeof(BDRVSheepdogState),
2932    .bdrv_needs_filename = true,
2933    .bdrv_file_open = sd_open,
2934    .bdrv_reopen_prepare    = sd_reopen_prepare,
2935    .bdrv_reopen_commit     = sd_reopen_commit,
2936    .bdrv_reopen_abort      = sd_reopen_abort,
2937    .bdrv_close     = sd_close,
2938    .bdrv_create    = sd_create,
2939    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2940    .bdrv_getlength = sd_getlength,
2941    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2942    .bdrv_truncate  = sd_truncate,
2943
2944    .bdrv_co_readv  = sd_co_readv,
2945    .bdrv_co_writev = sd_co_writev,
2946    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2947    .bdrv_co_discard = sd_co_discard,
2948    .bdrv_co_get_block_status = sd_co_get_block_status,
2949
2950    .bdrv_snapshot_create   = sd_snapshot_create,
2951    .bdrv_snapshot_goto     = sd_snapshot_goto,
2952    .bdrv_snapshot_delete   = sd_snapshot_delete,
2953    .bdrv_snapshot_list     = sd_snapshot_list,
2954
2955    .bdrv_save_vmstate  = sd_save_vmstate,
2956    .bdrv_load_vmstate  = sd_load_vmstate,
2957
2958    .bdrv_detach_aio_context = sd_detach_aio_context,
2959    .bdrv_attach_aio_context = sd_attach_aio_context,
2960
2961    .create_opts    = &sd_create_opts,
2962};
2963
2964static BlockDriver bdrv_sheepdog_tcp = {
2965    .format_name    = "sheepdog",
2966    .protocol_name  = "sheepdog+tcp",
2967    .instance_size  = sizeof(BDRVSheepdogState),
2968    .bdrv_needs_filename = true,
2969    .bdrv_file_open = sd_open,
2970    .bdrv_reopen_prepare    = sd_reopen_prepare,
2971    .bdrv_reopen_commit     = sd_reopen_commit,
2972    .bdrv_reopen_abort      = sd_reopen_abort,
2973    .bdrv_close     = sd_close,
2974    .bdrv_create    = sd_create,
2975    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2976    .bdrv_getlength = sd_getlength,
2977    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2978    .bdrv_truncate  = sd_truncate,
2979
2980    .bdrv_co_readv  = sd_co_readv,
2981    .bdrv_co_writev = sd_co_writev,
2982    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2983    .bdrv_co_discard = sd_co_discard,
2984    .bdrv_co_get_block_status = sd_co_get_block_status,
2985
2986    .bdrv_snapshot_create   = sd_snapshot_create,
2987    .bdrv_snapshot_goto     = sd_snapshot_goto,
2988    .bdrv_snapshot_delete   = sd_snapshot_delete,
2989    .bdrv_snapshot_list     = sd_snapshot_list,
2990
2991    .bdrv_save_vmstate  = sd_save_vmstate,
2992    .bdrv_load_vmstate  = sd_load_vmstate,
2993
2994    .bdrv_detach_aio_context = sd_detach_aio_context,
2995    .bdrv_attach_aio_context = sd_attach_aio_context,
2996
2997    .create_opts    = &sd_create_opts,
2998};
2999
3000static BlockDriver bdrv_sheepdog_unix = {
3001    .format_name    = "sheepdog",
3002    .protocol_name  = "sheepdog+unix",
3003    .instance_size  = sizeof(BDRVSheepdogState),
3004    .bdrv_needs_filename = true,
3005    .bdrv_file_open = sd_open,
3006    .bdrv_reopen_prepare    = sd_reopen_prepare,
3007    .bdrv_reopen_commit     = sd_reopen_commit,
3008    .bdrv_reopen_abort      = sd_reopen_abort,
3009    .bdrv_close     = sd_close,
3010    .bdrv_create    = sd_create,
3011    .bdrv_has_zero_init = bdrv_has_zero_init_1,
3012    .bdrv_getlength = sd_getlength,
3013    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3014    .bdrv_truncate  = sd_truncate,
3015
3016    .bdrv_co_readv  = sd_co_readv,
3017    .bdrv_co_writev = sd_co_writev,
3018    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
3019    .bdrv_co_discard = sd_co_discard,
3020    .bdrv_co_get_block_status = sd_co_get_block_status,
3021
3022    .bdrv_snapshot_create   = sd_snapshot_create,
3023    .bdrv_snapshot_goto     = sd_snapshot_goto,
3024    .bdrv_snapshot_delete   = sd_snapshot_delete,
3025    .bdrv_snapshot_list     = sd_snapshot_list,
3026
3027    .bdrv_save_vmstate  = sd_save_vmstate,
3028    .bdrv_load_vmstate  = sd_load_vmstate,
3029
3030    .bdrv_detach_aio_context = sd_detach_aio_context,
3031    .bdrv_attach_aio_context = sd_attach_aio_context,
3032
3033    .create_opts    = &sd_create_opts,
3034};
3035
3036static void bdrv_sheepdog_init(void)
3037{
3038    bdrv_register(&bdrv_sheepdog);
3039    bdrv_register(&bdrv_sheepdog_tcp);
3040    bdrv_register(&bdrv_sheepdog_unix);
3041}
3042block_init(bdrv_sheepdog_init);
3043