qemu/block/archipelago.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for Archipelago
   3 *
   4 * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
   5 *
   6 * This work is licensed under the terms of the GNU GPL, version 2 or later.
   7 * See the COPYING file in the top-level directory.
   8 *
   9 */
  10
  11/*
  12 * VM Image on Archipelago volume is specified like this:
  13 *
  14 * file.driver=archipelago,file.volume=<volumename>
  15 * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
  16 * [,file.segment=<segment_name>]]
  17 *
  18 * or
  19 *
  20 * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
  21 * segment=<segment_name>]]
  22 *
  23 * 'archipelago' is the protocol.
  24 *
 * 'mport' is the port number on which mapperd is listening. This is optional;
 * if not specified, QEMU makes Archipelago use the default port (1001).
 *
 * 'vport' is the port number on which vlmcd is listening. This is optional;
 * if not specified, QEMU makes Archipelago use the default port (501).
 *
 * 'segment' is the name of the shared memory segment the Archipelago stack
 * is using. This is optional; if not specified, QEMU makes Archipelago
 * use the default value, 'archipelago'.
  34 *
  35 * Examples:
  36 *
  37 * file.driver=archipelago,file.volume=my_vm_volume
  38 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
  39 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
  40 *  file.vport=1234
  41 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
  42 *  file.vport=1234,file.segment=my_segment
  43 *
  44 * or
  45 *
  46 * file=archipelago:my_vm_volume
  47 * file=archipelago:my_vm_volume/mport=123
  48 * file=archipelago:my_vm_volume/mport=123:vport=1234
  49 * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
  50 *
  51 */
  52
  53#include "qemu/osdep.h"
  54#include "qemu/cutils.h"
  55#include "block/block_int.h"
  56#include "qemu/error-report.h"
  57#include "qemu/thread.h"
  58#include "qapi/qmp/qint.h"
  59#include "qapi/qmp/qstring.h"
  60#include "qapi/qmp/qjson.h"
  61#include "qemu/atomic.h"
  62
  63#include <xseg/xseg.h>
  64#include <xseg/protocol.h>
  65
  66#define MAX_REQUEST_SIZE    524288
  67
  68#define ARCHIPELAGO_OPT_VOLUME      "volume"
  69#define ARCHIPELAGO_OPT_SEGMENT     "segment"
  70#define ARCHIPELAGO_OPT_MPORT       "mport"
  71#define ARCHIPELAGO_OPT_VPORT       "vport"
  72#define ARCHIPELAGO_DFL_MPORT       1001
  73#define ARCHIPELAGO_DFL_VPORT       501
  74
  75#define archipelagolog(fmt, ...) \
  76    do {                         \
  77        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
  78    } while (0)
  79
  80typedef enum {
  81    ARCHIP_OP_READ,
  82    ARCHIP_OP_WRITE,
  83    ARCHIP_OP_FLUSH,
  84    ARCHIP_OP_VOLINFO,
  85    ARCHIP_OP_TRUNCATE,
  86} ARCHIPCmd;
  87
/*
 * AIO control block for one guest read/write request; allocated with
 * qemu_aio_get(&archipelago_aiocb_info, ...).
 */
typedef struct ArchipelagoAIOCB {
    /* Generic part. NOTE(review): presumably must stay the first member so
     * qemu_aio_get()/qemu_aio_unref() can treat this as a BlockAIOCB. */
    BlockAIOCB common;
    struct BDRVArchipelagoState *s;   /* driver state that issued it */
    QEMUIOVector *qiov;               /* guest buffer for data transfer */
    ARCHIPCmd cmd;                    /* requested operation */
    int status;                       /* -EINPROGRESS in flight, 0 when done */
    int64_t size;                     /* request length in bytes */
    /* Bytes serviced, then rewritten to 0 / -EIO for the completion cb. */
    int64_t ret;
} ArchipelagoAIOCB;
  97
/* Per-BlockDriverState driver state (bs->opaque). */
typedef struct BDRVArchipelagoState {
    ArchipelagoAIOCB *event_acb;  /* not referenced in the code shown here */
    char *volname;                /* volume name, copied into XSEG targets */
    char *segment_name;           /* XSEG shared memory segment to join */
    uint64_t size;                /* last size returned by an X_INFO reply */
    /* Archipelago specific */
    struct xseg *xseg;            /* joined XSEG segment */
    struct xseg_port *port;       /* dynamic port bound by this driver */
    xport srcport;                /* port->portno: source of all requests */
    xport sport;                  /* compared to srcport by init_local_signal */
    xport mportno;                /* mapperd port (X_INFO, X_TRUNCATE) */
    xport vportno;                /* vlmcd port (read/write/flush, X_CLOSE) */
    /* archip_mutex/archip_cond/is_signaled: the handler thread wakes a
     * caller blocked on a VOLINFO or TRUNCATE reply. */
    QemuMutex archip_mutex;
    QemuCond archip_cond;
    bool is_signaled;
    /* Request handler specific */
    QemuThread request_th;        /* runs xseg_request_handler() */
    QemuCond request_cond;        /* signalled when the handler exits */
    QemuMutex request_mutex;      /* held by the handler while it loops */
    bool th_is_signaled;          /* set by the handler just before exit */
    bool stopping;                /* set by close to stop the handler loop */
} BDRVArchipelagoState;
 120
 121typedef struct ArchipelagoSegmentedRequest {
 122    size_t count;
 123    size_t total;
 124    int ref;
 125    int failed;
 126} ArchipelagoSegmentedRequest;
 127
/*
 * Context attached to each XSEG request with xseg_set_req_data() and read
 * back by the handler thread with xseg_get_req_data().
 */
typedef struct AIORequestData {
    const char *volname;          /* volume targeted by the request */
    off_t offset;                 /* byte offset of this segment */
    size_t size;                  /* byte length of this segment */
    uint64_t bufidx;              /* offset of this segment inside qiov */
    int ret;                      /* not read by the code shown here */
    int op;                       /* an ARCHIPCmd value */
    ArchipelagoAIOCB *aio_cb;     /* guest request this segment belongs to */
    ArchipelagoSegmentedRequest *segreq;  /* shared per-guest-I/O state */
} AIORequestData;
 138
 139static void qemu_archipelago_complete_aio(void *opaque);
 140
 141static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
 142{
 143    if (xseg && (sport != srcport)) {
 144        xseg_init_local_signal(xseg, srcport);
 145        sport = srcport;
 146    }
 147}
 148
 149static void archipelago_finish_aiocb(AIORequestData *reqdata)
 150{
 151    if (reqdata->aio_cb->ret != reqdata->segreq->total) {
 152        reqdata->aio_cb->ret = -EIO;
 153    } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
 154        reqdata->aio_cb->ret = 0;
 155    }
 156    aio_bh_schedule_oneshot(
 157                        bdrv_get_aio_context(reqdata->aio_cb->common.bs),
 158                        qemu_archipelago_complete_aio, reqdata
 159                        );
 160}
 161
 162static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
 163                      struct xseg_request *expected_req)
 164{
 165    struct xseg_request *req;
 166    xseg_prepare_wait(xseg, srcport);
 167    void *psd = xseg_get_signal_desc(xseg, port);
 168    while (1) {
 169        req = xseg_receive(xseg, srcport, X_NONBLOCK);
 170        if (req) {
 171            if (req != expected_req) {
 172                archipelagolog("Unknown received request\n");
 173                xseg_put_request(xseg, req, srcport);
 174            } else if (!(req->state & XS_SERVED)) {
 175                return -1;
 176            } else {
 177                break;
 178            }
 179        }
 180        xseg_wait_signal(xseg, psd, 100000UL);
 181    }
 182    xseg_cancel_wait(xseg, srcport);
 183    return 0;
 184}
 185
 186static void xseg_request_handler(void *state)
 187{
 188    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
 189    void *psd = xseg_get_signal_desc(s->xseg, s->port);
 190    qemu_mutex_lock(&s->request_mutex);
 191
 192    while (!s->stopping) {
 193        struct xseg_request *req;
 194        void *data;
 195        xseg_prepare_wait(s->xseg, s->srcport);
 196        req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
 197        if (req) {
 198            AIORequestData *reqdata;
 199            ArchipelagoSegmentedRequest *segreq;
 200            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
 201
 202            switch (reqdata->op) {
 203            case ARCHIP_OP_READ:
 204                data = xseg_get_data(s->xseg, req);
 205                segreq = reqdata->segreq;
 206                segreq->count += req->serviced;
 207
 208                qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
 209                                    data,
 210                                    req->serviced);
 211
 212                xseg_put_request(s->xseg, req, s->srcport);
 213
 214                if (atomic_fetch_dec(&segreq->ref) == 1) {
 215                    if (!segreq->failed) {
 216                        reqdata->aio_cb->ret = segreq->count;
 217                        archipelago_finish_aiocb(reqdata);
 218                        g_free(segreq);
 219                    } else {
 220                        g_free(segreq);
 221                        g_free(reqdata);
 222                    }
 223                } else {
 224                    g_free(reqdata);
 225                }
 226                break;
 227            case ARCHIP_OP_WRITE:
 228            case ARCHIP_OP_FLUSH:
 229                segreq = reqdata->segreq;
 230                segreq->count += req->serviced;
 231                xseg_put_request(s->xseg, req, s->srcport);
 232
 233                if (atomic_fetch_dec(&segreq->ref) == 1) {
 234                    if (!segreq->failed) {
 235                        reqdata->aio_cb->ret = segreq->count;
 236                        archipelago_finish_aiocb(reqdata);
 237                        g_free(segreq);
 238                    } else {
 239                        g_free(segreq);
 240                        g_free(reqdata);
 241                    }
 242                } else {
 243                    g_free(reqdata);
 244                }
 245                break;
 246            case ARCHIP_OP_VOLINFO:
 247            case ARCHIP_OP_TRUNCATE:
 248                s->is_signaled = true;
 249                qemu_cond_signal(&s->archip_cond);
 250                break;
 251            }
 252        } else {
 253            xseg_wait_signal(s->xseg, psd, 100000UL);
 254        }
 255        xseg_cancel_wait(s->xseg, s->srcport);
 256    }
 257
 258    s->th_is_signaled = true;
 259    qemu_cond_signal(&s->request_cond);
 260    qemu_mutex_unlock(&s->request_mutex);
 261    qemu_thread_exit(NULL);
 262}
 263
 264static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
 265{
 266    if (xseg_initialize()) {
 267        archipelagolog("Cannot initialize XSEG\n");
 268        goto err_exit;
 269    }
 270
 271    s->xseg = xseg_join("posix", s->segment_name,
 272                        "posixfd", NULL);
 273    if (!s->xseg) {
 274        archipelagolog("Cannot join XSEG shared memory segment\n");
 275        goto err_exit;
 276    }
 277    s->port = xseg_bind_dynport(s->xseg);
 278    s->srcport = s->port->portno;
 279    init_local_signal(s->xseg, s->sport, s->srcport);
 280    return 0;
 281
 282err_exit:
 283    return -1;
 284}
 285
 286static int qemu_archipelago_init(BDRVArchipelagoState *s)
 287{
 288    int ret;
 289
 290    ret = qemu_archipelago_xseg_init(s);
 291    if (ret < 0) {
 292        error_report("Cannot initialize XSEG. Aborting...");
 293        goto err_exit;
 294    }
 295
 296    qemu_cond_init(&s->archip_cond);
 297    qemu_mutex_init(&s->archip_mutex);
 298    qemu_cond_init(&s->request_cond);
 299    qemu_mutex_init(&s->request_mutex);
 300    s->th_is_signaled = false;
 301    qemu_thread_create(&s->request_th, "xseg_io_th",
 302                       (void *) xseg_request_handler,
 303                       (void *) s, QEMU_THREAD_JOINABLE);
 304
 305err_exit:
 306    return ret;
 307}
 308
 309static void qemu_archipelago_complete_aio(void *opaque)
 310{
 311    AIORequestData *reqdata = (AIORequestData *) opaque;
 312    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
 313
 314    aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
 315    aio_cb->status = 0;
 316
 317    qemu_aio_unref(aio_cb);
 318    g_free(reqdata);
 319}
 320
 321static void xseg_find_port(char *pstr, const char *needle, xport *aport)
 322{
 323    const char *a;
 324    char *endptr = NULL;
 325    unsigned long port;
 326    if (strstart(pstr, needle, &a)) {
 327        if (strlen(a) > 0) {
 328            port = strtoul(a, &endptr, 10);
 329            if (strlen(endptr)) {
 330                *aport = -2;
 331                return;
 332            }
 333            *aport = (xport) port;
 334        }
 335    }
 336}
 337
 338static void xseg_find_segment(char *pstr, const char *needle,
 339                              char **segment_name)
 340{
 341    const char *a;
 342    if (strstart(pstr, needle, &a)) {
 343        if (strlen(a) > 0) {
 344            *segment_name = g_strdup(a);
 345        }
 346    }
 347}
 348
 349static void parse_filename_opts(const char *filename, Error **errp,
 350                                char **volume, char **segment_name,
 351                                xport *mport, xport *vport)
 352{
 353    const char *start;
 354    char *tokens[4], *ds;
 355    int idx;
 356    xport lmport = NoPort, lvport = NoPort;
 357
 358    strstart(filename, "archipelago:", &start);
 359
 360    ds = g_strdup(start);
 361    tokens[0] = strtok(ds, "/");
 362    tokens[1] = strtok(NULL, ":");
 363    tokens[2] = strtok(NULL, ":");
 364    tokens[3] = strtok(NULL, "\0");
 365
 366    if (!strlen(tokens[0])) {
 367        error_setg(errp, "volume name must be specified first");
 368        g_free(ds);
 369        return;
 370    }
 371
 372    for (idx = 1; idx < 4; idx++) {
 373        if (tokens[idx] != NULL) {
 374            if (strstart(tokens[idx], "mport=", NULL)) {
 375                xseg_find_port(tokens[idx], "mport=", &lmport);
 376            }
 377            if (strstart(tokens[idx], "vport=", NULL)) {
 378                xseg_find_port(tokens[idx], "vport=", &lvport);
 379            }
 380            if (strstart(tokens[idx], "segment=", NULL)) {
 381                xseg_find_segment(tokens[idx], "segment=", segment_name);
 382            }
 383        }
 384    }
 385
 386    if ((lmport == -2) || (lvport == -2)) {
 387        error_setg(errp, "mport and/or vport must be set");
 388        g_free(ds);
 389        return;
 390    }
 391    *volume = g_strdup(tokens[0]);
 392    *mport = lmport;
 393    *vport = lvport;
 394    g_free(ds);
 395}
 396
 397static void archipelago_parse_filename(const char *filename, QDict *options,
 398                                       Error **errp)
 399{
 400    const char *start;
 401    char *volume = NULL, *segment_name = NULL;
 402    xport mport = NoPort, vport = NoPort;
 403
 404    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
 405            || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
 406            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
 407            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
 408        error_setg(errp, "volume/mport/vport/segment and a file name may not"
 409                         " be specified at the same time");
 410        return;
 411    }
 412
 413    if (!strstart(filename, "archipelago:", &start)) {
 414        error_setg(errp, "File name must start with 'archipelago:'");
 415        return;
 416    }
 417
 418    if (!strlen(start) || strstart(start, "/", NULL)) {
 419        error_setg(errp, "volume name must be specified");
 420        return;
 421    }
 422
 423    parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
 424
 425    if (volume) {
 426        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
 427        g_free(volume);
 428    }
 429    if (segment_name) {
 430        qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
 431                  qstring_from_str(segment_name));
 432        g_free(segment_name);
 433    }
 434    if (mport != NoPort) {
 435        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
 436    }
 437    if (vport != NoPort) {
 438        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
 439    }
 440}
 441
 442static QemuOptsList archipelago_runtime_opts = {
 443    .name = "archipelago",
 444    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
 445    .desc = {
 446        {
 447            .name = ARCHIPELAGO_OPT_VOLUME,
 448            .type = QEMU_OPT_STRING,
 449            .help = "Name of the volume image",
 450        },
 451        {
 452            .name = ARCHIPELAGO_OPT_SEGMENT,
 453            .type = QEMU_OPT_STRING,
 454            .help = "Name of the Archipelago shared memory segment",
 455        },
 456        {
 457            .name = ARCHIPELAGO_OPT_MPORT,
 458            .type = QEMU_OPT_NUMBER,
 459            .help = "Archipelago mapperd port number"
 460        },
 461        {
 462            .name = ARCHIPELAGO_OPT_VPORT,
 463            .type = QEMU_OPT_NUMBER,
 464            .help = "Archipelago vlmcd port number"
 465
 466        },
 467        { /* end of list */ }
 468    },
 469};
 470
 471static int qemu_archipelago_open(BlockDriverState *bs,
 472                                 QDict *options,
 473                                 int bdrv_flags,
 474                                 Error **errp)
 475{
 476    int ret = 0;
 477    const char *volume, *segment_name;
 478    QemuOpts *opts;
 479    Error *local_err = NULL;
 480    BDRVArchipelagoState *s = bs->opaque;
 481
 482    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
 483    qemu_opts_absorb_qdict(opts, options, &local_err);
 484    if (local_err) {
 485        error_propagate(errp, local_err);
 486        ret = -EINVAL;
 487        goto err_exit;
 488    }
 489
 490    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
 491                                     ARCHIPELAGO_DFL_MPORT);
 492    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
 493                                     ARCHIPELAGO_DFL_VPORT);
 494
 495    segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
 496    if (segment_name == NULL) {
 497        s->segment_name = g_strdup("archipelago");
 498    } else {
 499        s->segment_name = g_strdup(segment_name);
 500    }
 501
 502    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
 503    if (volume == NULL) {
 504        error_setg(errp, "archipelago block driver requires the 'volume'"
 505                   " option");
 506        ret = -EINVAL;
 507        goto err_exit;
 508    }
 509    s->volname = g_strdup(volume);
 510
 511    /* Initialize XSEG, join shared memory segment */
 512    ret = qemu_archipelago_init(s);
 513    if (ret < 0) {
 514        error_setg(errp, "cannot initialize XSEG and join shared "
 515                   "memory segment");
 516        goto err_exit;
 517    }
 518
 519    qemu_opts_del(opts);
 520    return 0;
 521
 522err_exit:
 523    g_free(s->volname);
 524    g_free(s->segment_name);
 525    qemu_opts_del(opts);
 526    return ret;
 527}
 528
 529static void qemu_archipelago_close(BlockDriverState *bs)
 530{
 531    int r, targetlen;
 532    char *target;
 533    struct xseg_request *req;
 534    BDRVArchipelagoState *s = bs->opaque;
 535
 536    s->stopping = true;
 537
 538    qemu_mutex_lock(&s->request_mutex);
 539    while (!s->th_is_signaled) {
 540        qemu_cond_wait(&s->request_cond,
 541                       &s->request_mutex);
 542    }
 543    qemu_mutex_unlock(&s->request_mutex);
 544    qemu_thread_join(&s->request_th);
 545    qemu_cond_destroy(&s->request_cond);
 546    qemu_mutex_destroy(&s->request_mutex);
 547
 548    qemu_cond_destroy(&s->archip_cond);
 549    qemu_mutex_destroy(&s->archip_mutex);
 550
 551    targetlen = strlen(s->volname);
 552    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
 553    if (!req) {
 554        archipelagolog("Cannot get XSEG request\n");
 555        goto err_exit;
 556    }
 557    r = xseg_prep_request(s->xseg, req, targetlen, 0);
 558    if (r < 0) {
 559        xseg_put_request(s->xseg, req, s->srcport);
 560        archipelagolog("Cannot prepare XSEG close request\n");
 561        goto err_exit;
 562    }
 563
 564    target = xseg_get_target(s->xseg, req);
 565    memcpy(target, s->volname, targetlen);
 566    req->size = req->datalen;
 567    req->offset = 0;
 568    req->op = X_CLOSE;
 569
 570    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
 571    if (p == NoPort) {
 572        xseg_put_request(s->xseg, req, s->srcport);
 573        archipelagolog("Cannot submit XSEG close request\n");
 574        goto err_exit;
 575    }
 576
 577    xseg_signal(s->xseg, p);
 578    wait_reply(s->xseg, s->srcport, s->port, req);
 579
 580    xseg_put_request(s->xseg, req, s->srcport);
 581
 582err_exit:
 583    g_free(s->volname);
 584    g_free(s->segment_name);
 585    xseg_quit_local_signal(s->xseg, s->srcport);
 586    xseg_leave_dynport(s->xseg, s->port);
 587    xseg_leave(s->xseg);
 588}
 589
 590static int qemu_archipelago_create_volume(Error **errp, const char *volname,
 591                                          char *segment_name,
 592                                          uint64_t size, xport mportno,
 593                                          xport vportno)
 594{
 595    int ret, targetlen;
 596    struct xseg *xseg = NULL;
 597    struct xseg_request *req;
 598    struct xseg_request_clone *xclone;
 599    struct xseg_port *port;
 600    xport srcport = NoPort, sport = NoPort;
 601    char *target;
 602
 603    /* Try default values if none has been set */
 604    if (mportno == (xport) -1) {
 605        mportno = ARCHIPELAGO_DFL_MPORT;
 606    }
 607
 608    if (vportno == (xport) -1) {
 609        vportno = ARCHIPELAGO_DFL_VPORT;
 610    }
 611
 612    if (xseg_initialize()) {
 613        error_setg(errp, "Cannot initialize XSEG");
 614        return -1;
 615    }
 616
 617    xseg = xseg_join("posix", segment_name,
 618                     "posixfd", NULL);
 619
 620    if (!xseg) {
 621        error_setg(errp, "Cannot join XSEG shared memory segment");
 622        return -1;
 623    }
 624
 625    port = xseg_bind_dynport(xseg);
 626    srcport = port->portno;
 627    init_local_signal(xseg, sport, srcport);
 628
 629    req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
 630    if (!req) {
 631        error_setg(errp, "Cannot get XSEG request");
 632        return -1;
 633    }
 634
 635    targetlen = strlen(volname);
 636    ret = xseg_prep_request(xseg, req, targetlen,
 637                            sizeof(struct xseg_request_clone));
 638    if (ret < 0) {
 639        error_setg(errp, "Cannot prepare XSEG request");
 640        goto err_exit;
 641    }
 642
 643    target = xseg_get_target(xseg, req);
 644    if (!target) {
 645        error_setg(errp, "Cannot get XSEG target.");
 646        goto err_exit;
 647    }
 648    memcpy(target, volname, targetlen);
 649    xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
 650    memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
 651    xclone->targetlen = 0;
 652    xclone->size = size;
 653    req->offset = 0;
 654    req->size = req->datalen;
 655    req->op = X_CLONE;
 656
 657    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
 658    if (p == NoPort) {
 659        error_setg(errp, "Could not submit XSEG request");
 660        goto err_exit;
 661    }
 662    xseg_signal(xseg, p);
 663
 664    ret = wait_reply(xseg, srcport, port, req);
 665    if (ret < 0) {
 666        error_setg(errp, "wait_reply() error.");
 667    }
 668
 669    xseg_put_request(xseg, req, srcport);
 670    xseg_quit_local_signal(xseg, srcport);
 671    xseg_leave_dynport(xseg, port);
 672    xseg_leave(xseg);
 673    return ret;
 674
 675err_exit:
 676    xseg_put_request(xseg, req, srcport);
 677    xseg_quit_local_signal(xseg, srcport);
 678    xseg_leave_dynport(xseg, port);
 679    xseg_leave(xseg);
 680    return -1;
 681}
 682
 683static int qemu_archipelago_create(const char *filename,
 684                                   QemuOpts *options,
 685                                   Error **errp)
 686{
 687    int ret = 0;
 688    uint64_t total_size = 0;
 689    char *volname = NULL, *segment_name = NULL;
 690    const char *start;
 691    xport mport = NoPort, vport = NoPort;
 692
 693    if (!strstart(filename, "archipelago:", &start)) {
 694        error_setg(errp, "File name must start with 'archipelago:'");
 695        return -1;
 696    }
 697
 698    if (!strlen(start) || strstart(start, "/", NULL)) {
 699        error_setg(errp, "volume name must be specified");
 700        return -1;
 701    }
 702
 703    parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
 704                        &vport);
 705    total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
 706                          BDRV_SECTOR_SIZE);
 707
 708    if (segment_name == NULL) {
 709        segment_name = g_strdup("archipelago");
 710    }
 711
 712    /* Create an Archipelago volume */
 713    ret = qemu_archipelago_create_volume(errp, volname, segment_name,
 714                                         total_size, mport,
 715                                         vport);
 716
 717    g_free(volname);
 718    g_free(segment_name);
 719    return ret;
 720}
 721
 722static const AIOCBInfo archipelago_aiocb_info = {
 723    .aiocb_size = sizeof(ArchipelagoAIOCB),
 724};
 725
 726static int archipelago_submit_request(BDRVArchipelagoState *s,
 727                                        uint64_t bufidx,
 728                                        size_t count,
 729                                        off_t offset,
 730                                        ArchipelagoAIOCB *aio_cb,
 731                                        ArchipelagoSegmentedRequest *segreq,
 732                                        int op)
 733{
 734    int ret, targetlen;
 735    char *target;
 736    void *data = NULL;
 737    struct xseg_request *req;
 738    AIORequestData *reqdata = g_new(AIORequestData, 1);
 739
 740    targetlen = strlen(s->volname);
 741    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
 742    if (!req) {
 743        archipelagolog("Cannot get XSEG request\n");
 744        goto err_exit2;
 745    }
 746    ret = xseg_prep_request(s->xseg, req, targetlen, count);
 747    if (ret < 0) {
 748        archipelagolog("Cannot prepare XSEG request\n");
 749        goto err_exit;
 750    }
 751    target = xseg_get_target(s->xseg, req);
 752    if (!target) {
 753        archipelagolog("Cannot get XSEG target\n");
 754        goto err_exit;
 755    }
 756    memcpy(target, s->volname, targetlen);
 757    req->size = count;
 758    req->offset = offset;
 759
 760    switch (op) {
 761    case ARCHIP_OP_READ:
 762        req->op = X_READ;
 763        break;
 764    case ARCHIP_OP_WRITE:
 765        req->op = X_WRITE;
 766        break;
 767    case ARCHIP_OP_FLUSH:
 768        req->op = X_FLUSH;
 769        break;
 770    }
 771    reqdata->volname = s->volname;
 772    reqdata->offset = offset;
 773    reqdata->size = count;
 774    reqdata->bufidx = bufidx;
 775    reqdata->aio_cb = aio_cb;
 776    reqdata->segreq = segreq;
 777    reqdata->op = op;
 778
 779    xseg_set_req_data(s->xseg, req, reqdata);
 780    if (op == ARCHIP_OP_WRITE) {
 781        data = xseg_get_data(s->xseg, req);
 782        if (!data) {
 783            archipelagolog("Cannot get XSEG data\n");
 784            goto err_exit;
 785        }
 786        qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
 787    }
 788
 789    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
 790    if (p == NoPort) {
 791        archipelagolog("Could not submit XSEG request\n");
 792        goto err_exit;
 793    }
 794    xseg_signal(s->xseg, p);
 795    return 0;
 796
 797err_exit:
 798    g_free(reqdata);
 799    xseg_put_request(s->xseg, req, s->srcport);
 800    return -EIO;
 801err_exit2:
 802    g_free(reqdata);
 803    return -EIO;
 804}
 805
 806static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
 807                                        size_t count,
 808                                        off_t offset,
 809                                        ArchipelagoAIOCB *aio_cb,
 810                                        int op)
 811{
 812    int ret, segments_nr;
 813    size_t pos = 0;
 814    ArchipelagoSegmentedRequest *segreq;
 815
 816    segreq = g_new0(ArchipelagoSegmentedRequest, 1);
 817
 818    if (op == ARCHIP_OP_FLUSH) {
 819        segments_nr = 1;
 820    } else {
 821        segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
 822                      ((count % MAX_REQUEST_SIZE) ? 1 : 0);
 823    }
 824    segreq->total = count;
 825    atomic_mb_set(&segreq->ref, segments_nr);
 826
 827    while (segments_nr > 1) {
 828        ret = archipelago_submit_request(s, pos,
 829                                            MAX_REQUEST_SIZE,
 830                                            offset + pos,
 831                                            aio_cb, segreq, op);
 832
 833        if (ret < 0) {
 834            goto err_exit;
 835        }
 836        count -= MAX_REQUEST_SIZE;
 837        pos += MAX_REQUEST_SIZE;
 838        segments_nr--;
 839    }
 840    ret = archipelago_submit_request(s, pos, count, offset + pos,
 841                                     aio_cb, segreq, op);
 842
 843    if (ret < 0) {
 844        goto err_exit;
 845    }
 846    return 0;
 847
 848err_exit:
 849    segreq->failed = 1;
 850    if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
 851        g_free(segreq);
 852    }
 853    return ret;
 854}
 855
 856static BlockAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
 857                                           int64_t sector_num,
 858                                           QEMUIOVector *qiov,
 859                                           int nb_sectors,
 860                                           BlockCompletionFunc *cb,
 861                                           void *opaque,
 862                                           int op)
 863{
 864    ArchipelagoAIOCB *aio_cb;
 865    BDRVArchipelagoState *s = bs->opaque;
 866    int64_t size, off;
 867    int ret;
 868
 869    aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
 870    aio_cb->cmd = op;
 871    aio_cb->qiov = qiov;
 872
 873    aio_cb->ret = 0;
 874    aio_cb->s = s;
 875    aio_cb->status = -EINPROGRESS;
 876
 877    off = sector_num * BDRV_SECTOR_SIZE;
 878    size = nb_sectors * BDRV_SECTOR_SIZE;
 879    aio_cb->size = size;
 880
 881    ret = archipelago_aio_segmented_rw(s, size, off,
 882                                       aio_cb, op);
 883    if (ret < 0) {
 884        goto err_exit;
 885    }
 886    return &aio_cb->common;
 887
 888err_exit:
 889    error_report("qemu_archipelago_aio_rw(): I/O Error");
 890    qemu_aio_unref(aio_cb);
 891    return NULL;
 892}
 893
 894static BlockAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
 895        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
 896        BlockCompletionFunc *cb, void *opaque)
 897{
 898    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
 899                                   opaque, ARCHIP_OP_READ);
 900}
 901
 902static BlockAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
 903        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
 904        BlockCompletionFunc *cb, void *opaque)
 905{
 906    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
 907                                   opaque, ARCHIP_OP_WRITE);
 908}
 909
 910static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
 911{
 912    uint64_t size;
 913    int ret, targetlen;
 914    struct xseg_request *req;
 915    struct xseg_reply_info *xinfo;
 916    AIORequestData *reqdata = g_new(AIORequestData, 1);
 917
 918    const char *volname = s->volname;
 919    targetlen = strlen(volname);
 920    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
 921    if (!req) {
 922        archipelagolog("Cannot get XSEG request\n");
 923        goto err_exit2;
 924    }
 925    ret = xseg_prep_request(s->xseg, req, targetlen,
 926                            sizeof(struct xseg_reply_info));
 927    if (ret < 0) {
 928        archipelagolog("Cannot prepare XSEG request\n");
 929        goto err_exit;
 930    }
 931    char *target = xseg_get_target(s->xseg, req);
 932    if (!target) {
 933        archipelagolog("Cannot get XSEG target\n");
 934        goto err_exit;
 935    }
 936    memcpy(target, volname, targetlen);
 937    req->size = req->datalen;
 938    req->offset = 0;
 939    req->op = X_INFO;
 940
 941    reqdata->op = ARCHIP_OP_VOLINFO;
 942    reqdata->volname = volname;
 943    xseg_set_req_data(s->xseg, req, reqdata);
 944
 945    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
 946    if (p == NoPort) {
 947        archipelagolog("Cannot submit XSEG request\n");
 948        goto err_exit;
 949    }
 950    xseg_signal(s->xseg, p);
 951    qemu_mutex_lock(&s->archip_mutex);
 952    while (!s->is_signaled) {
 953        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
 954    }
 955    s->is_signaled = false;
 956    qemu_mutex_unlock(&s->archip_mutex);
 957
 958    xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
 959    size = xinfo->size;
 960    xseg_put_request(s->xseg, req, s->srcport);
 961    g_free(reqdata);
 962    s->size = size;
 963    return size;
 964
 965err_exit:
 966    xseg_put_request(s->xseg, req, s->srcport);
 967err_exit2:
 968    g_free(reqdata);
 969    return -EIO;
 970}
 971
 972static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
 973{
 974    BDRVArchipelagoState *s = bs->opaque;
 975
 976    return archipelago_volume_info(s);
 977}
 978
 979static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
 980{
 981    int ret, targetlen;
 982    struct xseg_request *req;
 983    BDRVArchipelagoState *s = bs->opaque;
 984    AIORequestData *reqdata = g_new(AIORequestData, 1);
 985
 986    const char *volname = s->volname;
 987    targetlen = strlen(volname);
 988    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
 989    if (!req) {
 990        archipelagolog("Cannot get XSEG request\n");
 991        goto err_exit2;
 992    }
 993
 994    ret = xseg_prep_request(s->xseg, req, targetlen, 0);
 995    if (ret < 0) {
 996        archipelagolog("Cannot prepare XSEG request\n");
 997        goto err_exit;
 998    }
 999    char *target = xseg_get_target(s->xseg, req);
1000    if (!target) {
1001        archipelagolog("Cannot get XSEG target\n");
1002        goto err_exit;
1003    }
1004    memcpy(target, volname, targetlen);
1005    req->offset = offset;
1006    req->op = X_TRUNCATE;
1007
1008    reqdata->op = ARCHIP_OP_TRUNCATE;
1009    reqdata->volname = volname;
1010
1011    xseg_set_req_data(s->xseg, req, reqdata);
1012
1013    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1014    if (p == NoPort) {
1015        archipelagolog("Cannot submit XSEG request\n");
1016        goto err_exit;
1017    }
1018
1019    xseg_signal(s->xseg, p);
1020    qemu_mutex_lock(&s->archip_mutex);
1021    while (!s->is_signaled) {
1022        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1023    }
1024    s->is_signaled = false;
1025    qemu_mutex_unlock(&s->archip_mutex);
1026    xseg_put_request(s->xseg, req, s->srcport);
1027    g_free(reqdata);
1028    return 0;
1029
1030err_exit:
1031    xseg_put_request(s->xseg, req, s->srcport);
1032err_exit2:
1033    g_free(reqdata);
1034    return -EIO;
1035}
1036
1037static QemuOptsList qemu_archipelago_create_opts = {
1038    .name = "archipelago-create-opts",
1039    .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1040    .desc = {
1041        {
1042            .name = BLOCK_OPT_SIZE,
1043            .type = QEMU_OPT_SIZE,
1044            .help = "Virtual disk size"
1045        },
1046        { /* end of list */ }
1047    }
1048};
1049
1050static BlockAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1051        BlockCompletionFunc *cb, void *opaque)
1052{
1053    return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1054                                   ARCHIP_OP_FLUSH);
1055}
1056
1057static BlockDriver bdrv_archipelago = {
1058    .format_name         = "archipelago",
1059    .protocol_name       = "archipelago",
1060    .instance_size       = sizeof(BDRVArchipelagoState),
1061    .bdrv_parse_filename = archipelago_parse_filename,
1062    .bdrv_file_open      = qemu_archipelago_open,
1063    .bdrv_close          = qemu_archipelago_close,
1064    .bdrv_create         = qemu_archipelago_create,
1065    .bdrv_getlength      = qemu_archipelago_getlength,
1066    .bdrv_truncate       = qemu_archipelago_truncate,
1067    .bdrv_aio_readv      = qemu_archipelago_aio_readv,
1068    .bdrv_aio_writev     = qemu_archipelago_aio_writev,
1069    .bdrv_aio_flush      = qemu_archipelago_aio_flush,
1070    .bdrv_has_zero_init  = bdrv_has_zero_init_1,
1071    .create_opts         = &qemu_archipelago_create_opts,
1072};
1073
1074static void bdrv_archipelago_init(void)
1075{
1076    bdrv_register(&bdrv_archipelago);
1077}
1078
1079block_init(bdrv_archipelago_init);
1080