1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53#include "qemu/osdep.h"
54#include "qemu/cutils.h"
55#include "block/block_int.h"
56#include "qemu/error-report.h"
57#include "qemu/thread.h"
58#include "qapi/qmp/qint.h"
59#include "qapi/qmp/qstring.h"
60#include "qapi/qmp/qjson.h"
61#include "qemu/atomic.h"
62
63#include <xseg/xseg.h>
64#include <xseg/protocol.h>
65
66#define MAX_REQUEST_SIZE 524288
67
68#define ARCHIPELAGO_OPT_VOLUME "volume"
69#define ARCHIPELAGO_OPT_SEGMENT "segment"
70#define ARCHIPELAGO_OPT_MPORT "mport"
71#define ARCHIPELAGO_OPT_VPORT "vport"
72#define ARCHIPELAGO_DFL_MPORT 1001
73#define ARCHIPELAGO_DFL_VPORT 501
74
75#define archipelagolog(fmt, ...) \
76 do { \
77 fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
78 } while (0)
79
80typedef enum {
81 ARCHIP_OP_READ,
82 ARCHIP_OP_WRITE,
83 ARCHIP_OP_FLUSH,
84 ARCHIP_OP_VOLINFO,
85 ARCHIP_OP_TRUNCATE,
86} ARCHIPCmd;
87
88typedef struct ArchipelagoAIOCB {
89 BlockAIOCB common;
90 QEMUBH *bh;
91 struct BDRVArchipelagoState *s;
92 QEMUIOVector *qiov;
93 ARCHIPCmd cmd;
94 int status;
95 int64_t size;
96 int64_t ret;
97} ArchipelagoAIOCB;
98
99typedef struct BDRVArchipelagoState {
100 ArchipelagoAIOCB *event_acb;
101 char *volname;
102 char *segment_name;
103 uint64_t size;
104
105 struct xseg *xseg;
106 struct xseg_port *port;
107 xport srcport;
108 xport sport;
109 xport mportno;
110 xport vportno;
111 QemuMutex archip_mutex;
112 QemuCond archip_cond;
113 bool is_signaled;
114
115 QemuThread request_th;
116 QemuCond request_cond;
117 QemuMutex request_mutex;
118 bool th_is_signaled;
119 bool stopping;
120} BDRVArchipelagoState;
121
122typedef struct ArchipelagoSegmentedRequest {
123 size_t count;
124 size_t total;
125 int ref;
126 int failed;
127} ArchipelagoSegmentedRequest;
128
129typedef struct AIORequestData {
130 const char *volname;
131 off_t offset;
132 size_t size;
133 uint64_t bufidx;
134 int ret;
135 int op;
136 ArchipelagoAIOCB *aio_cb;
137 ArchipelagoSegmentedRequest *segreq;
138} AIORequestData;
139
140static void qemu_archipelago_complete_aio(void *opaque);
141
142static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
143{
144 if (xseg && (sport != srcport)) {
145 xseg_init_local_signal(xseg, srcport);
146 sport = srcport;
147 }
148}
149
150static void archipelago_finish_aiocb(AIORequestData *reqdata)
151{
152 if (reqdata->aio_cb->ret != reqdata->segreq->total) {
153 reqdata->aio_cb->ret = -EIO;
154 } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
155 reqdata->aio_cb->ret = 0;
156 }
157 reqdata->aio_cb->bh = aio_bh_new(
158 bdrv_get_aio_context(reqdata->aio_cb->common.bs),
159 qemu_archipelago_complete_aio, reqdata
160 );
161 qemu_bh_schedule(reqdata->aio_cb->bh);
162}
163
164static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
165 struct xseg_request *expected_req)
166{
167 struct xseg_request *req;
168 xseg_prepare_wait(xseg, srcport);
169 void *psd = xseg_get_signal_desc(xseg, port);
170 while (1) {
171 req = xseg_receive(xseg, srcport, X_NONBLOCK);
172 if (req) {
173 if (req != expected_req) {
174 archipelagolog("Unknown received request\n");
175 xseg_put_request(xseg, req, srcport);
176 } else if (!(req->state & XS_SERVED)) {
177 return -1;
178 } else {
179 break;
180 }
181 }
182 xseg_wait_signal(xseg, psd, 100000UL);
183 }
184 xseg_cancel_wait(xseg, srcport);
185 return 0;
186}
187
188static void xseg_request_handler(void *state)
189{
190 BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
191 void *psd = xseg_get_signal_desc(s->xseg, s->port);
192 qemu_mutex_lock(&s->request_mutex);
193
194 while (!s->stopping) {
195 struct xseg_request *req;
196 void *data;
197 xseg_prepare_wait(s->xseg, s->srcport);
198 req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
199 if (req) {
200 AIORequestData *reqdata;
201 ArchipelagoSegmentedRequest *segreq;
202 xseg_get_req_data(s->xseg, req, (void **)&reqdata);
203
204 switch (reqdata->op) {
205 case ARCHIP_OP_READ:
206 data = xseg_get_data(s->xseg, req);
207 segreq = reqdata->segreq;
208 segreq->count += req->serviced;
209
210 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
211 data,
212 req->serviced);
213
214 xseg_put_request(s->xseg, req, s->srcport);
215
216 if (atomic_fetch_dec(&segreq->ref) == 1) {
217 if (!segreq->failed) {
218 reqdata->aio_cb->ret = segreq->count;
219 archipelago_finish_aiocb(reqdata);
220 g_free(segreq);
221 } else {
222 g_free(segreq);
223 g_free(reqdata);
224 }
225 } else {
226 g_free(reqdata);
227 }
228 break;
229 case ARCHIP_OP_WRITE:
230 case ARCHIP_OP_FLUSH:
231 segreq = reqdata->segreq;
232 segreq->count += req->serviced;
233 xseg_put_request(s->xseg, req, s->srcport);
234
235 if (atomic_fetch_dec(&segreq->ref) == 1) {
236 if (!segreq->failed) {
237 reqdata->aio_cb->ret = segreq->count;
238 archipelago_finish_aiocb(reqdata);
239 g_free(segreq);
240 } else {
241 g_free(segreq);
242 g_free(reqdata);
243 }
244 } else {
245 g_free(reqdata);
246 }
247 break;
248 case ARCHIP_OP_VOLINFO:
249 case ARCHIP_OP_TRUNCATE:
250 s->is_signaled = true;
251 qemu_cond_signal(&s->archip_cond);
252 break;
253 }
254 } else {
255 xseg_wait_signal(s->xseg, psd, 100000UL);
256 }
257 xseg_cancel_wait(s->xseg, s->srcport);
258 }
259
260 s->th_is_signaled = true;
261 qemu_cond_signal(&s->request_cond);
262 qemu_mutex_unlock(&s->request_mutex);
263 qemu_thread_exit(NULL);
264}
265
266static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
267{
268 if (xseg_initialize()) {
269 archipelagolog("Cannot initialize XSEG\n");
270 goto err_exit;
271 }
272
273 s->xseg = xseg_join("posix", s->segment_name,
274 "posixfd", NULL);
275 if (!s->xseg) {
276 archipelagolog("Cannot join XSEG shared memory segment\n");
277 goto err_exit;
278 }
279 s->port = xseg_bind_dynport(s->xseg);
280 s->srcport = s->port->portno;
281 init_local_signal(s->xseg, s->sport, s->srcport);
282 return 0;
283
284err_exit:
285 return -1;
286}
287
288static int qemu_archipelago_init(BDRVArchipelagoState *s)
289{
290 int ret;
291
292 ret = qemu_archipelago_xseg_init(s);
293 if (ret < 0) {
294 error_report("Cannot initialize XSEG. Aborting...");
295 goto err_exit;
296 }
297
298 qemu_cond_init(&s->archip_cond);
299 qemu_mutex_init(&s->archip_mutex);
300 qemu_cond_init(&s->request_cond);
301 qemu_mutex_init(&s->request_mutex);
302 s->th_is_signaled = false;
303 qemu_thread_create(&s->request_th, "xseg_io_th",
304 (void *) xseg_request_handler,
305 (void *) s, QEMU_THREAD_JOINABLE);
306
307err_exit:
308 return ret;
309}
310
311static void qemu_archipelago_complete_aio(void *opaque)
312{
313 AIORequestData *reqdata = (AIORequestData *) opaque;
314 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
315
316 qemu_bh_delete(aio_cb->bh);
317 aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
318 aio_cb->status = 0;
319
320 qemu_aio_unref(aio_cb);
321 g_free(reqdata);
322}
323
324static void xseg_find_port(char *pstr, const char *needle, xport *aport)
325{
326 const char *a;
327 char *endptr = NULL;
328 unsigned long port;
329 if (strstart(pstr, needle, &a)) {
330 if (strlen(a) > 0) {
331 port = strtoul(a, &endptr, 10);
332 if (strlen(endptr)) {
333 *aport = -2;
334 return;
335 }
336 *aport = (xport) port;
337 }
338 }
339}
340
341static void xseg_find_segment(char *pstr, const char *needle,
342 char **segment_name)
343{
344 const char *a;
345 if (strstart(pstr, needle, &a)) {
346 if (strlen(a) > 0) {
347 *segment_name = g_strdup(a);
348 }
349 }
350}
351
352static void parse_filename_opts(const char *filename, Error **errp,
353 char **volume, char **segment_name,
354 xport *mport, xport *vport)
355{
356 const char *start;
357 char *tokens[4], *ds;
358 int idx;
359 xport lmport = NoPort, lvport = NoPort;
360
361 strstart(filename, "archipelago:", &start);
362
363 ds = g_strdup(start);
364 tokens[0] = strtok(ds, "/");
365 tokens[1] = strtok(NULL, ":");
366 tokens[2] = strtok(NULL, ":");
367 tokens[3] = strtok(NULL, "\0");
368
369 if (!strlen(tokens[0])) {
370 error_setg(errp, "volume name must be specified first");
371 g_free(ds);
372 return;
373 }
374
375 for (idx = 1; idx < 4; idx++) {
376 if (tokens[idx] != NULL) {
377 if (strstart(tokens[idx], "mport=", NULL)) {
378 xseg_find_port(tokens[idx], "mport=", &lmport);
379 }
380 if (strstart(tokens[idx], "vport=", NULL)) {
381 xseg_find_port(tokens[idx], "vport=", &lvport);
382 }
383 if (strstart(tokens[idx], "segment=", NULL)) {
384 xseg_find_segment(tokens[idx], "segment=", segment_name);
385 }
386 }
387 }
388
389 if ((lmport == -2) || (lvport == -2)) {
390 error_setg(errp, "mport and/or vport must be set");
391 g_free(ds);
392 return;
393 }
394 *volume = g_strdup(tokens[0]);
395 *mport = lmport;
396 *vport = lvport;
397 g_free(ds);
398}
399
400static void archipelago_parse_filename(const char *filename, QDict *options,
401 Error **errp)
402{
403 const char *start;
404 char *volume = NULL, *segment_name = NULL;
405 xport mport = NoPort, vport = NoPort;
406
407 if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
408 || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
409 || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
410 || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
411 error_setg(errp, "volume/mport/vport/segment and a file name may not"
412 " be specified at the same time");
413 return;
414 }
415
416 if (!strstart(filename, "archipelago:", &start)) {
417 error_setg(errp, "File name must start with 'archipelago:'");
418 return;
419 }
420
421 if (!strlen(start) || strstart(start, "/", NULL)) {
422 error_setg(errp, "volume name must be specified");
423 return;
424 }
425
426 parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
427
428 if (volume) {
429 qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
430 g_free(volume);
431 }
432 if (segment_name) {
433 qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
434 qstring_from_str(segment_name));
435 g_free(segment_name);
436 }
437 if (mport != NoPort) {
438 qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
439 }
440 if (vport != NoPort) {
441 qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
442 }
443}
444
445static QemuOptsList archipelago_runtime_opts = {
446 .name = "archipelago",
447 .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
448 .desc = {
449 {
450 .name = ARCHIPELAGO_OPT_VOLUME,
451 .type = QEMU_OPT_STRING,
452 .help = "Name of the volume image",
453 },
454 {
455 .name = ARCHIPELAGO_OPT_SEGMENT,
456 .type = QEMU_OPT_STRING,
457 .help = "Name of the Archipelago shared memory segment",
458 },
459 {
460 .name = ARCHIPELAGO_OPT_MPORT,
461 .type = QEMU_OPT_NUMBER,
462 .help = "Archipelago mapperd port number"
463 },
464 {
465 .name = ARCHIPELAGO_OPT_VPORT,
466 .type = QEMU_OPT_NUMBER,
467 .help = "Archipelago vlmcd port number"
468
469 },
470 { }
471 },
472};
473
474static int qemu_archipelago_open(BlockDriverState *bs,
475 QDict *options,
476 int bdrv_flags,
477 Error **errp)
478{
479 int ret = 0;
480 const char *volume, *segment_name;
481 QemuOpts *opts;
482 Error *local_err = NULL;
483 BDRVArchipelagoState *s = bs->opaque;
484
485 opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
486 qemu_opts_absorb_qdict(opts, options, &local_err);
487 if (local_err) {
488 error_propagate(errp, local_err);
489 ret = -EINVAL;
490 goto err_exit;
491 }
492
493 s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
494 ARCHIPELAGO_DFL_MPORT);
495 s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
496 ARCHIPELAGO_DFL_VPORT);
497
498 segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
499 if (segment_name == NULL) {
500 s->segment_name = g_strdup("archipelago");
501 } else {
502 s->segment_name = g_strdup(segment_name);
503 }
504
505 volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
506 if (volume == NULL) {
507 error_setg(errp, "archipelago block driver requires the 'volume'"
508 " option");
509 ret = -EINVAL;
510 goto err_exit;
511 }
512 s->volname = g_strdup(volume);
513
514
515 ret = qemu_archipelago_init(s);
516 if (ret < 0) {
517 error_setg(errp, "cannot initialize XSEG and join shared "
518 "memory segment");
519 goto err_exit;
520 }
521
522 qemu_opts_del(opts);
523 return 0;
524
525err_exit:
526 g_free(s->volname);
527 g_free(s->segment_name);
528 qemu_opts_del(opts);
529 return ret;
530}
531
532static void qemu_archipelago_close(BlockDriverState *bs)
533{
534 int r, targetlen;
535 char *target;
536 struct xseg_request *req;
537 BDRVArchipelagoState *s = bs->opaque;
538
539 s->stopping = true;
540
541 qemu_mutex_lock(&s->request_mutex);
542 while (!s->th_is_signaled) {
543 qemu_cond_wait(&s->request_cond,
544 &s->request_mutex);
545 }
546 qemu_mutex_unlock(&s->request_mutex);
547 qemu_thread_join(&s->request_th);
548 qemu_cond_destroy(&s->request_cond);
549 qemu_mutex_destroy(&s->request_mutex);
550
551 qemu_cond_destroy(&s->archip_cond);
552 qemu_mutex_destroy(&s->archip_mutex);
553
554 targetlen = strlen(s->volname);
555 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
556 if (!req) {
557 archipelagolog("Cannot get XSEG request\n");
558 goto err_exit;
559 }
560 r = xseg_prep_request(s->xseg, req, targetlen, 0);
561 if (r < 0) {
562 xseg_put_request(s->xseg, req, s->srcport);
563 archipelagolog("Cannot prepare XSEG close request\n");
564 goto err_exit;
565 }
566
567 target = xseg_get_target(s->xseg, req);
568 memcpy(target, s->volname, targetlen);
569 req->size = req->datalen;
570 req->offset = 0;
571 req->op = X_CLOSE;
572
573 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
574 if (p == NoPort) {
575 xseg_put_request(s->xseg, req, s->srcport);
576 archipelagolog("Cannot submit XSEG close request\n");
577 goto err_exit;
578 }
579
580 xseg_signal(s->xseg, p);
581 wait_reply(s->xseg, s->srcport, s->port, req);
582
583 xseg_put_request(s->xseg, req, s->srcport);
584
585err_exit:
586 g_free(s->volname);
587 g_free(s->segment_name);
588 xseg_quit_local_signal(s->xseg, s->srcport);
589 xseg_leave_dynport(s->xseg, s->port);
590 xseg_leave(s->xseg);
591}
592
593static int qemu_archipelago_create_volume(Error **errp, const char *volname,
594 char *segment_name,
595 uint64_t size, xport mportno,
596 xport vportno)
597{
598 int ret, targetlen;
599 struct xseg *xseg = NULL;
600 struct xseg_request *req;
601 struct xseg_request_clone *xclone;
602 struct xseg_port *port;
603 xport srcport = NoPort, sport = NoPort;
604 char *target;
605
606
607 if (mportno == (xport) -1) {
608 mportno = ARCHIPELAGO_DFL_MPORT;
609 }
610
611 if (vportno == (xport) -1) {
612 vportno = ARCHIPELAGO_DFL_VPORT;
613 }
614
615 if (xseg_initialize()) {
616 error_setg(errp, "Cannot initialize XSEG");
617 return -1;
618 }
619
620 xseg = xseg_join("posix", segment_name,
621 "posixfd", NULL);
622
623 if (!xseg) {
624 error_setg(errp, "Cannot join XSEG shared memory segment");
625 return -1;
626 }
627
628 port = xseg_bind_dynport(xseg);
629 srcport = port->portno;
630 init_local_signal(xseg, sport, srcport);
631
632 req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
633 if (!req) {
634 error_setg(errp, "Cannot get XSEG request");
635 return -1;
636 }
637
638 targetlen = strlen(volname);
639 ret = xseg_prep_request(xseg, req, targetlen,
640 sizeof(struct xseg_request_clone));
641 if (ret < 0) {
642 error_setg(errp, "Cannot prepare XSEG request");
643 goto err_exit;
644 }
645
646 target = xseg_get_target(xseg, req);
647 if (!target) {
648 error_setg(errp, "Cannot get XSEG target.");
649 goto err_exit;
650 }
651 memcpy(target, volname, targetlen);
652 xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
653 memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
654 xclone->targetlen = 0;
655 xclone->size = size;
656 req->offset = 0;
657 req->size = req->datalen;
658 req->op = X_CLONE;
659
660 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
661 if (p == NoPort) {
662 error_setg(errp, "Could not submit XSEG request");
663 goto err_exit;
664 }
665 xseg_signal(xseg, p);
666
667 ret = wait_reply(xseg, srcport, port, req);
668 if (ret < 0) {
669 error_setg(errp, "wait_reply() error.");
670 }
671
672 xseg_put_request(xseg, req, srcport);
673 xseg_quit_local_signal(xseg, srcport);
674 xseg_leave_dynport(xseg, port);
675 xseg_leave(xseg);
676 return ret;
677
678err_exit:
679 xseg_put_request(xseg, req, srcport);
680 xseg_quit_local_signal(xseg, srcport);
681 xseg_leave_dynport(xseg, port);
682 xseg_leave(xseg);
683 return -1;
684}
685
686static int qemu_archipelago_create(const char *filename,
687 QemuOpts *options,
688 Error **errp)
689{
690 int ret = 0;
691 uint64_t total_size = 0;
692 char *volname = NULL, *segment_name = NULL;
693 const char *start;
694 xport mport = NoPort, vport = NoPort;
695
696 if (!strstart(filename, "archipelago:", &start)) {
697 error_setg(errp, "File name must start with 'archipelago:'");
698 return -1;
699 }
700
701 if (!strlen(start) || strstart(start, "/", NULL)) {
702 error_setg(errp, "volume name must be specified");
703 return -1;
704 }
705
706 parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
707 &vport);
708 total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
709 BDRV_SECTOR_SIZE);
710
711 if (segment_name == NULL) {
712 segment_name = g_strdup("archipelago");
713 }
714
715
716 ret = qemu_archipelago_create_volume(errp, volname, segment_name,
717 total_size, mport,
718 vport);
719
720 g_free(volname);
721 g_free(segment_name);
722 return ret;
723}
724
725static const AIOCBInfo archipelago_aiocb_info = {
726 .aiocb_size = sizeof(ArchipelagoAIOCB),
727};
728
729static int archipelago_submit_request(BDRVArchipelagoState *s,
730 uint64_t bufidx,
731 size_t count,
732 off_t offset,
733 ArchipelagoAIOCB *aio_cb,
734 ArchipelagoSegmentedRequest *segreq,
735 int op)
736{
737 int ret, targetlen;
738 char *target;
739 void *data = NULL;
740 struct xseg_request *req;
741 AIORequestData *reqdata = g_new(AIORequestData, 1);
742
743 targetlen = strlen(s->volname);
744 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
745 if (!req) {
746 archipelagolog("Cannot get XSEG request\n");
747 goto err_exit2;
748 }
749 ret = xseg_prep_request(s->xseg, req, targetlen, count);
750 if (ret < 0) {
751 archipelagolog("Cannot prepare XSEG request\n");
752 goto err_exit;
753 }
754 target = xseg_get_target(s->xseg, req);
755 if (!target) {
756 archipelagolog("Cannot get XSEG target\n");
757 goto err_exit;
758 }
759 memcpy(target, s->volname, targetlen);
760 req->size = count;
761 req->offset = offset;
762
763 switch (op) {
764 case ARCHIP_OP_READ:
765 req->op = X_READ;
766 break;
767 case ARCHIP_OP_WRITE:
768 req->op = X_WRITE;
769 break;
770 case ARCHIP_OP_FLUSH:
771 req->op = X_FLUSH;
772 break;
773 }
774 reqdata->volname = s->volname;
775 reqdata->offset = offset;
776 reqdata->size = count;
777 reqdata->bufidx = bufidx;
778 reqdata->aio_cb = aio_cb;
779 reqdata->segreq = segreq;
780 reqdata->op = op;
781
782 xseg_set_req_data(s->xseg, req, reqdata);
783 if (op == ARCHIP_OP_WRITE) {
784 data = xseg_get_data(s->xseg, req);
785 if (!data) {
786 archipelagolog("Cannot get XSEG data\n");
787 goto err_exit;
788 }
789 qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
790 }
791
792 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
793 if (p == NoPort) {
794 archipelagolog("Could not submit XSEG request\n");
795 goto err_exit;
796 }
797 xseg_signal(s->xseg, p);
798 return 0;
799
800err_exit:
801 g_free(reqdata);
802 xseg_put_request(s->xseg, req, s->srcport);
803 return -EIO;
804err_exit2:
805 g_free(reqdata);
806 return -EIO;
807}
808
809static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
810 size_t count,
811 off_t offset,
812 ArchipelagoAIOCB *aio_cb,
813 int op)
814{
815 int ret, segments_nr;
816 size_t pos = 0;
817 ArchipelagoSegmentedRequest *segreq;
818
819 segreq = g_new0(ArchipelagoSegmentedRequest, 1);
820
821 if (op == ARCHIP_OP_FLUSH) {
822 segments_nr = 1;
823 } else {
824 segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
825 ((count % MAX_REQUEST_SIZE) ? 1 : 0);
826 }
827 segreq->total = count;
828 atomic_mb_set(&segreq->ref, segments_nr);
829
830 while (segments_nr > 1) {
831 ret = archipelago_submit_request(s, pos,
832 MAX_REQUEST_SIZE,
833 offset + pos,
834 aio_cb, segreq, op);
835
836 if (ret < 0) {
837 goto err_exit;
838 }
839 count -= MAX_REQUEST_SIZE;
840 pos += MAX_REQUEST_SIZE;
841 segments_nr--;
842 }
843 ret = archipelago_submit_request(s, pos, count, offset + pos,
844 aio_cb, segreq, op);
845
846 if (ret < 0) {
847 goto err_exit;
848 }
849 return 0;
850
851err_exit:
852 segreq->failed = 1;
853 if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
854 g_free(segreq);
855 }
856 return ret;
857}
858
859static BlockAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
860 int64_t sector_num,
861 QEMUIOVector *qiov,
862 int nb_sectors,
863 BlockCompletionFunc *cb,
864 void *opaque,
865 int op)
866{
867 ArchipelagoAIOCB *aio_cb;
868 BDRVArchipelagoState *s = bs->opaque;
869 int64_t size, off;
870 int ret;
871
872 aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
873 aio_cb->cmd = op;
874 aio_cb->qiov = qiov;
875
876 aio_cb->ret = 0;
877 aio_cb->s = s;
878 aio_cb->status = -EINPROGRESS;
879
880 off = sector_num * BDRV_SECTOR_SIZE;
881 size = nb_sectors * BDRV_SECTOR_SIZE;
882 aio_cb->size = size;
883
884 ret = archipelago_aio_segmented_rw(s, size, off,
885 aio_cb, op);
886 if (ret < 0) {
887 goto err_exit;
888 }
889 return &aio_cb->common;
890
891err_exit:
892 error_report("qemu_archipelago_aio_rw(): I/O Error");
893 qemu_aio_unref(aio_cb);
894 return NULL;
895}
896
897static BlockAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
898 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
899 BlockCompletionFunc *cb, void *opaque)
900{
901 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
902 opaque, ARCHIP_OP_READ);
903}
904
905static BlockAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
906 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
907 BlockCompletionFunc *cb, void *opaque)
908{
909 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
910 opaque, ARCHIP_OP_WRITE);
911}
912
913static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
914{
915 uint64_t size;
916 int ret, targetlen;
917 struct xseg_request *req;
918 struct xseg_reply_info *xinfo;
919 AIORequestData *reqdata = g_new(AIORequestData, 1);
920
921 const char *volname = s->volname;
922 targetlen = strlen(volname);
923 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
924 if (!req) {
925 archipelagolog("Cannot get XSEG request\n");
926 goto err_exit2;
927 }
928 ret = xseg_prep_request(s->xseg, req, targetlen,
929 sizeof(struct xseg_reply_info));
930 if (ret < 0) {
931 archipelagolog("Cannot prepare XSEG request\n");
932 goto err_exit;
933 }
934 char *target = xseg_get_target(s->xseg, req);
935 if (!target) {
936 archipelagolog("Cannot get XSEG target\n");
937 goto err_exit;
938 }
939 memcpy(target, volname, targetlen);
940 req->size = req->datalen;
941 req->offset = 0;
942 req->op = X_INFO;
943
944 reqdata->op = ARCHIP_OP_VOLINFO;
945 reqdata->volname = volname;
946 xseg_set_req_data(s->xseg, req, reqdata);
947
948 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
949 if (p == NoPort) {
950 archipelagolog("Cannot submit XSEG request\n");
951 goto err_exit;
952 }
953 xseg_signal(s->xseg, p);
954 qemu_mutex_lock(&s->archip_mutex);
955 while (!s->is_signaled) {
956 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
957 }
958 s->is_signaled = false;
959 qemu_mutex_unlock(&s->archip_mutex);
960
961 xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
962 size = xinfo->size;
963 xseg_put_request(s->xseg, req, s->srcport);
964 g_free(reqdata);
965 s->size = size;
966 return size;
967
968err_exit:
969 xseg_put_request(s->xseg, req, s->srcport);
970err_exit2:
971 g_free(reqdata);
972 return -EIO;
973}
974
975static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
976{
977 int64_t ret;
978 BDRVArchipelagoState *s = bs->opaque;
979
980 ret = archipelago_volume_info(s);
981 return ret;
982}
983
984static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
985{
986 int ret, targetlen;
987 struct xseg_request *req;
988 BDRVArchipelagoState *s = bs->opaque;
989 AIORequestData *reqdata = g_new(AIORequestData, 1);
990
991 const char *volname = s->volname;
992 targetlen = strlen(volname);
993 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
994 if (!req) {
995 archipelagolog("Cannot get XSEG request\n");
996 goto err_exit2;
997 }
998
999 ret = xseg_prep_request(s->xseg, req, targetlen, 0);
1000 if (ret < 0) {
1001 archipelagolog("Cannot prepare XSEG request\n");
1002 goto err_exit;
1003 }
1004 char *target = xseg_get_target(s->xseg, req);
1005 if (!target) {
1006 archipelagolog("Cannot get XSEG target\n");
1007 goto err_exit;
1008 }
1009 memcpy(target, volname, targetlen);
1010 req->offset = offset;
1011 req->op = X_TRUNCATE;
1012
1013 reqdata->op = ARCHIP_OP_TRUNCATE;
1014 reqdata->volname = volname;
1015
1016 xseg_set_req_data(s->xseg, req, reqdata);
1017
1018 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1019 if (p == NoPort) {
1020 archipelagolog("Cannot submit XSEG request\n");
1021 goto err_exit;
1022 }
1023
1024 xseg_signal(s->xseg, p);
1025 qemu_mutex_lock(&s->archip_mutex);
1026 while (!s->is_signaled) {
1027 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1028 }
1029 s->is_signaled = false;
1030 qemu_mutex_unlock(&s->archip_mutex);
1031 xseg_put_request(s->xseg, req, s->srcport);
1032 g_free(reqdata);
1033 return 0;
1034
1035err_exit:
1036 xseg_put_request(s->xseg, req, s->srcport);
1037err_exit2:
1038 g_free(reqdata);
1039 return -EIO;
1040}
1041
1042static QemuOptsList qemu_archipelago_create_opts = {
1043 .name = "archipelago-create-opts",
1044 .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1045 .desc = {
1046 {
1047 .name = BLOCK_OPT_SIZE,
1048 .type = QEMU_OPT_SIZE,
1049 .help = "Virtual disk size"
1050 },
1051 { }
1052 }
1053};
1054
1055static BlockAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1056 BlockCompletionFunc *cb, void *opaque)
1057{
1058 return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1059 ARCHIP_OP_FLUSH);
1060}
1061
1062static BlockDriver bdrv_archipelago = {
1063 .format_name = "archipelago",
1064 .protocol_name = "archipelago",
1065 .instance_size = sizeof(BDRVArchipelagoState),
1066 .bdrv_parse_filename = archipelago_parse_filename,
1067 .bdrv_file_open = qemu_archipelago_open,
1068 .bdrv_close = qemu_archipelago_close,
1069 .bdrv_create = qemu_archipelago_create,
1070 .bdrv_getlength = qemu_archipelago_getlength,
1071 .bdrv_truncate = qemu_archipelago_truncate,
1072 .bdrv_aio_readv = qemu_archipelago_aio_readv,
1073 .bdrv_aio_writev = qemu_archipelago_aio_writev,
1074 .bdrv_aio_flush = qemu_archipelago_aio_flush,
1075 .bdrv_has_zero_init = bdrv_has_zero_init_1,
1076 .create_opts = &qemu_archipelago_create_opts,
1077};
1078
1079static void bdrv_archipelago_init(void)
1080{
1081 bdrv_register(&bdrv_archipelago);
1082}
1083
1084block_init(bdrv_archipelago_init);
1085