qemu/tools/virtiofsd/fuse_virtio.c
/*
 * virtio-fs glue for FUSE
 * Copyright (C) 2018 Red Hat, Inc. and/or its affiliates
 *
 * Authors:
 *   Dave Gilbert  <dgilbert@redhat.com>
 *
 * Implements the glue between libfuse and libvhost-user
 *
 * This program can be distributed under the terms of the GNU LGPLv2.
 * See the file COPYING.LIB
 */

#include "qemu/osdep.h"
#include "qemu/iov.h"
#include "qapi/error.h"
#include "fuse_i.h"
#include "standard-headers/linux/fuse.h"
#include "fuse_misc.h"
#include "fuse_opt.h"
#include "fuse_virtio.h"

#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <grp.h>

#include "libvhost-user.h"

struct fv_VuDev;
struct fv_QueueInfo {
    pthread_t thread;
    /*
     * This lock protects the VuVirtq, preventing races between
     * fv_queue_thread() and fv_queue_worker().
     */
    pthread_mutex_t vq_lock;

    struct fv_VuDev *virtio_dev;

    /* Our queue index, corresponds to array position */
    int qidx;
    int kick_fd;
    int kill_fd; /* For killing the thread */
};

/* A FUSE request */
typedef struct {
    VuVirtqElement elem;
    struct fuse_chan ch;

    /* Used to complete requests that involve no reply */
    bool reply_sent;
} FVRequest;

/*
 * We pass the dev element into libvhost-user
 * and then use it to get back to the outer
 * container for other data.
 */
struct fv_VuDev {
    VuDev dev;
    struct fuse_session *se;

    /*
     * Either handle virtqueues or vhost-user protocol messages.  Don't do
     * both at the same time since that could lead to race conditions if
     * virtqueues or memory tables change while another thread is accessing
     * them.
     *
     * The assumptions are:
     * 1. fv_queue_thread() reads/writes to virtqueues and only reads VuDev.
     * 2. virtio_loop() reads/writes virtqueues and VuDev.
     */
    pthread_rwlock_t vu_dispatch_rwlock;

    /*
     * The following pair of fields are only accessed in the main
     * virtio_loop
     */
    size_t nqueues;
    struct fv_QueueInfo **qi;
};
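
/*
 * Illustrative sketch (not part of the build): libvhost-user hands its
 * callbacks the inner VuDev pointer, and because 'dev' is the first
 * member of struct fv_VuDev the callbacks below recover the outer
 * container from it:
 *
 *   struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
 *   struct fuse_session *se = vud->se;
 */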

/* From spec */
struct virtio_fs_config {
    char tag[36];
    uint32_t num_queues;
};

/* Callback from libvhost-user */
static uint64_t fv_get_features(VuDev *dev)
{
    return 1ULL << VIRTIO_F_VERSION_1;
}

/* Callback from libvhost-user */
static void fv_set_features(VuDev *dev, uint64_t features)
{
}

/*
 * Callback from libvhost-user if there's a new fd we're supposed to listen
 * to, typically a queue kick?
 */
static void fv_set_watch(VuDev *dev, int fd, int condition, vu_watch_cb cb,
                         void *data)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}

/*
 * Callback from libvhost-user if we're no longer supposed to listen on an fd
 */
static void fv_remove_watch(VuDev *dev, int fd)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}

/* Callback from libvhost-user to panic */
static void fv_panic(VuDev *dev, const char *err)
{
    fuse_log(FUSE_LOG_ERR, "%s: libvhost-user: %s\n", __func__, err);
    /* TODO: Allow reconnects?? */
    exit(EXIT_FAILURE);
}

/*
 * Copy from an iovec into a fuse_buf (memory only)
 * Caller must ensure there is space
 */
static size_t copy_from_iov(struct fuse_buf *buf, size_t out_num,
                            const struct iovec *out_sg,
                            size_t max)
{
    void *dest = buf->mem;
    size_t copied = 0;

    while (out_num && max) {
        size_t onelen = out_sg->iov_len;
        onelen = MIN(onelen, max);
        memcpy(dest, out_sg->iov_base, onelen);
        dest += onelen;
        copied += onelen;
        out_sg++;
        out_num--;
        max -= onelen;
    }

    return copied;
}
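
/*
 * Worked example for copy_from_iov() (illustrative values): with
 * out_sg = { {a, 8 bytes}, {b, 8 bytes} } and max = 10, the loop copies
 * 8 bytes from 'a' and then 2 bytes from 'b' into buf->mem, returning 10.
 */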

/*
 * Skip 'skip' bytes in the iov; 'sg_1stindex' is set as
 * the index for the 1st iovec to read data from, and
 * 'sg_1stskip' is the number of bytes to skip in that entry.
 *
 * Returns true if there are at least 'skip' bytes in the iovec
 */
static bool skip_iov(const struct iovec *sg, size_t sg_size,
                     size_t skip,
                     size_t *sg_1stindex, size_t *sg_1stskip)
{
    size_t vec;

    for (vec = 0; vec < sg_size; vec++) {
        if (sg[vec].iov_len > skip) {
            *sg_1stskip = skip;
            *sg_1stindex = vec;

            return true;
        }

        skip -= sg[vec].iov_len;
    }

    *sg_1stindex = vec;
    *sg_1stskip = 0;
    return skip == 0;
}
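
/*
 * Worked example for skip_iov() (illustrative values): with
 * sg = { {a, 8 bytes}, {b, 8 bytes} } and skip = 10 it returns true with
 * *sg_1stindex = 1 and *sg_1stskip = 2, i.e. reading resumes two bytes
 * into sg[1]; with skip = 16 it returns true with *sg_1stindex = 2 and
 * *sg_1stskip = 0; with skip = 17 it returns false.
 */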

/*
 * Copy from one iov to another, the given number of bytes
 * The caller must have checked sizes.
 */
static void copy_iov(struct iovec *src_iov, int src_count,
                     struct iovec *dst_iov, int dst_count, size_t to_copy)
{
    size_t dst_offset = 0;
    /* Outer loop copies 'src' elements */
    while (to_copy) {
        assert(src_count);
        size_t src_len = src_iov[0].iov_len;
        size_t src_offset = 0;

        if (src_len > to_copy) {
            src_len = to_copy;
        }
        /* Inner loop copies contents of one 'src' to maybe multiple dst. */
        while (src_len) {
            assert(dst_count);
            size_t dst_len = dst_iov[0].iov_len - dst_offset;
            if (dst_len > src_len) {
                dst_len = src_len;
            }

            memcpy(dst_iov[0].iov_base + dst_offset,
                   src_iov[0].iov_base + src_offset, dst_len);
            src_len -= dst_len;
            to_copy -= dst_len;
            src_offset += dst_len;
            dst_offset += dst_len;

            assert(dst_offset <= dst_iov[0].iov_len);
            if (dst_offset == dst_iov[0].iov_len) {
                dst_offset = 0;
                dst_iov++;
                dst_count--;
            }
        }
        src_iov++;
        src_count--;
    }
}
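
/*
 * Worked example for copy_iov() (illustrative values): copying
 * to_copy = 12 bytes from src = { {s, 12 bytes} } into
 * dst = { {d0, 8 bytes}, {d1, 8 bytes} } takes two memcpy() calls:
 * 8 bytes into d0, then the remaining 4 bytes into d1.
 */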

/*
 * pthread_rwlock_rdlock() and pthread_rwlock_wrlock() can fail if
 * a deadlock condition is detected or the current thread already
 * owns the lock. They can also fail, like pthread_rwlock_unlock(),
 * if the mutex wasn't properly initialized. None of these are ever
 * expected to happen.
 */
static void vu_dispatch_rdlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_rdlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

static void vu_dispatch_wrlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_wrlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

static void vu_dispatch_unlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_unlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}
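
/*
 * Lock ordering used throughout this file when completing a request
 * (a summary of the existing pattern, not extra code): take the dispatch
 * rwlock for reading to exclude virtio_loop(), then the per-queue mutex,
 * and only then touch the virtqueue:
 *
 *   vu_dispatch_rdlock(qi->virtio_dev);
 *   pthread_mutex_lock(&qi->vq_lock);
 *   vu_queue_push(dev, q, elem, tosend_len);
 *   vu_queue_notify(dev, q);
 *   pthread_mutex_unlock(&qi->vq_lock);
 *   vu_dispatch_unlock(qi->virtio_dev);
 */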

/*
 * Called back by ll whenever it wants to send a reply/message back
 * The 1st element of the iov starts with the fuse_out_header
 * 'unique'==0 means it's a notify message.
 */
int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                    struct iovec *iov, int count)
{
    FVRequest *req = container_of(ch, FVRequest, ch);
    struct fv_QueueInfo *qi = ch->qi;
    VuDev *dev = &se->virtio_dev->dev;
    VuVirtq *q = vu_get_queue(dev, qi->qidx);
    VuVirtqElement *elem = &req->elem;
    int ret = 0;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t tosend_len = iov_size(iov, count);

    /* unique == 0 is notification, which we don't support */
    assert(out->unique);
    assert(!req->reply_sent);

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        ret = -E2BIG;
        goto err;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        ret = -E2BIG;
        goto err;
    }

    copy_iov(iov, count, in_sg, in_num, tosend_len);

    vu_dispatch_rdlock(qi->virtio_dev);
    pthread_mutex_lock(&qi->vq_lock);
    vu_queue_push(dev, q, elem, tosend_len);
    vu_queue_notify(dev, q);
    pthread_mutex_unlock(&qi->vq_lock);
    vu_dispatch_unlock(qi->virtio_dev);

    req->reply_sent = true;

err:
    return ret;
}
/*
 * Callback from fuse_send_data_iov_* when it's virtio and the buffer
 * is a single FD with FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK
 * We need to send the iov and then the buffer.
 * Return 0 on success
 */
int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                         struct iovec *iov, int count, struct fuse_bufvec *buf,
                         size_t len)
{
    FVRequest *req = container_of(ch, FVRequest, ch);
    struct fv_QueueInfo *qi = ch->qi;
    VuDev *dev = &se->virtio_dev->dev;
    VuVirtq *q = vu_get_queue(dev, qi->qidx);
    VuVirtqElement *elem = &req->elem;
    int ret = 0;
    g_autofree struct iovec *in_sg_cpy = NULL;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t iov_len = iov_size(iov, count);
    size_t tosend_len = iov_len + len;

    out->len = tosend_len;

    fuse_log(FUSE_LOG_DEBUG, "%s: count=%d len=%zd iov_len=%zd\n", __func__,
             count, len, iov_len);

    /* unique == 0 is notification, which we don't support */
    assert(out->unique);

    assert(!req->reply_sent);

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        return E2BIG;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        return E2BIG;
    }

    /* TODO: Limit to 'len' */

    /* First copy the header data from iov->in_sg */
    copy_iov(iov, count, in_sg, in_num, iov_len);

    /*
     * Build a copy of the in_sg iov so we can skip bits in it,
     * including changing the offsets
     */
    in_sg_cpy = g_new(struct iovec, in_num);
    memcpy(in_sg_cpy, in_sg, sizeof(struct iovec) * in_num);
    /* These get updated as we skip */
    struct iovec *in_sg_ptr = in_sg_cpy;
    unsigned int in_sg_cpy_count = in_num;

    /* skip over parts of in_sg that contained the header iov */
    iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, iov_len);

    do {
        fuse_log(FUSE_LOG_DEBUG, "%s: in_sg_cpy_count=%d len remaining=%zd\n",
                 __func__, in_sg_cpy_count, len);

        ret = preadv(buf->buf[0].fd, in_sg_ptr, in_sg_cpy_count,
                     buf->buf[0].pos);

        if (ret == -1) {
            ret = errno;
            if (ret == EINTR) {
                continue;
            }
            fuse_log(FUSE_LOG_DEBUG, "%s: preadv failed (%m) len=%zd\n",
                     __func__, len);
            return ret;
        }

        if (!ret) {
            /* EOF case? */
            fuse_log(FUSE_LOG_DEBUG, "%s: !ret len remaining=%zd\n", __func__,
                     len);
            break;
        }
        fuse_log(FUSE_LOG_DEBUG, "%s: preadv ret=%d len=%zd\n", __func__,
                 ret, len);

        len -= ret;
        /* Short read. Retry reading remaining bytes */
        if (len) {
            fuse_log(FUSE_LOG_DEBUG, "%s: ret < len\n", __func__);
            /* Skip over this much next time around */
            iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, ret);
            buf->buf[0].pos += ret;
        }
    } while (len);

    /* Need to fix out->len on EOF */
    if (len) {
        struct fuse_out_header *out_sg = in_sg[0].iov_base;

        tosend_len -= len;
        out_sg->len = tosend_len;
    }

    vu_dispatch_rdlock(qi->virtio_dev);
    pthread_mutex_lock(&qi->vq_lock);
    vu_queue_push(dev, q, elem, tosend_len);
    vu_queue_notify(dev, q);
    pthread_mutex_unlock(&qi->vq_lock);
    vu_dispatch_unlock(qi->virtio_dev);
    req->reply_sent = true;
    return 0;
}
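
/*
 * Worked example of the EOF fixup in virtio_send_data_iov() (numbers
 * illustrative): for a read reply with a 16-byte fuse_out_header and
 * len = 4096, if preadv() reaches end-of-file after 1000 bytes the loop
 * exits with len = 3096; tosend_len becomes 16 + 1000 and the header
 * length in the guest-visible buffer is rewritten to match before the
 * element is pushed.
 */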

static __thread bool clone_fs_called;

/* Process one FVRequest in a thread pool */
static void fv_queue_worker(gpointer data, gpointer user_data)
{
    struct fv_QueueInfo *qi = user_data;
    struct fuse_session *se = qi->virtio_dev->se;
    struct VuDev *dev = &qi->virtio_dev->dev;
    FVRequest *req = data;
    VuVirtqElement *elem = &req->elem;
    struct fuse_buf fbuf = {};
    bool allocated_bufv = false;
    struct fuse_bufvec bufv;
    struct fuse_bufvec *pbufv;
    struct fuse_in_header inh;

    assert(se->bufsize > sizeof(struct fuse_in_header));

    if (!clone_fs_called) {
        int ret;

        /* unshare FS for xattr operation */
        ret = unshare(CLONE_FS);
        /* should not fail */
        assert(ret == 0);

        clone_fs_called = true;
    }

    /*
     * An element contains one request and the space to send our response.
     * They're spread over multiple descriptors in a scatter/gather set
     * and we can't trust the guest to keep them still; so copy in/out.
     */
    fbuf.mem = g_malloc(se->bufsize);

    fuse_mutex_init(&req->ch.lock);
    req->ch.fd = -1;
    req->ch.qi = qi;

    /* The 'out' part of the elem is from qemu */
    unsigned int out_num = elem->out_num;
    struct iovec *out_sg = elem->out_sg;
    size_t out_len = iov_size(out_sg, out_num);
    fuse_log(FUSE_LOG_DEBUG,
             "%s: elem %d: with %d out desc of length %zd\n",
             __func__, elem->index, out_num, out_len);

    /*
     * The elem should contain a 'fuse_in_header' (in to fuse)
     * plus the data based on the len in the header.
     */
    if (out_len < sizeof(struct fuse_in_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
                 __func__, elem->index);
        assert(0); /* TODO */
    }
    if (out_len > se->bufsize) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
                 elem->index);
        assert(0); /* TODO */
    }
    /* Copy just the fuse_in_header and look at it */
    copy_from_iov(&fbuf, out_num, out_sg,
                  sizeof(struct fuse_in_header));
    memcpy(&inh, fbuf.mem, sizeof(struct fuse_in_header));

    pbufv = NULL; /* Compiler thinks it's an uninitialised path */
    if (inh.opcode == FUSE_WRITE &&
        out_len >= (sizeof(struct fuse_in_header) +
                    sizeof(struct fuse_write_in))) {
        /*
         * For a write we don't actually need to copy the
         * data, we can just do it straight out of guest memory
         * but we must still copy the headers in case the guest
         * was nasty and changed them while we were using them.
         */
        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);

        fbuf.size = copy_from_iov(&fbuf, out_num, out_sg,
                                  sizeof(struct fuse_in_header) +
                                  sizeof(struct fuse_write_in));
        /* That copy reread the in_header, make sure we use the original */
        memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));

        /* Allocate the bufv, with space for the rest of the iov */
        pbufv = g_try_malloc(sizeof(struct fuse_bufvec) +
                             sizeof(struct fuse_buf) * out_num);
        if (!pbufv) {
            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
                     __func__);
            goto out;
        }

        allocated_bufv = true;
        pbufv->count = 1;
        pbufv->buf[0] = fbuf;

        size_t iovindex, pbufvindex, iov_bytes_skip;
        pbufvindex = 1; /* 2 headers, 1 fusebuf */

        if (!skip_iov(out_sg, out_num,
                      sizeof(struct fuse_in_header) +
                      sizeof(struct fuse_write_in),
                      &iovindex, &iov_bytes_skip)) {
            fuse_log(FUSE_LOG_ERR, "%s: skip failed\n",
                     __func__);
            goto out;
        }

        for (; iovindex < out_num; iovindex++, pbufvindex++) {
            pbufv->count++;
            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
            pbufv->buf[pbufvindex].flags = 0;
            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;

            if (iov_bytes_skip) {
                pbufv->buf[pbufvindex].mem += iov_bytes_skip;
                pbufv->buf[pbufvindex].size -= iov_bytes_skip;
                iov_bytes_skip = 0;
            }
        }
    } else {
        /* Normal (non fast-write) path */

        copy_from_iov(&fbuf, out_num, out_sg, se->bufsize);
        /* That copy reread the in_header, make sure we use the original */
        memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));
        fbuf.size = out_len;

        /* TODO! Endianness of header */

        /* TODO: Add checks for fuse_session_exited */
        bufv.buf[0] = fbuf;
        bufv.count = 1;
        pbufv = &bufv;
    }
    pbufv->idx = 0;
    pbufv->off = 0;
    fuse_session_process_buf_int(se, pbufv, &req->ch);

out:
    if (allocated_bufv) {
        g_free(pbufv);
    }

    /* If the request has no reply, still recycle the virtqueue element */
    if (!req->reply_sent) {
        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);

        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
                 elem->index);

        vu_dispatch_rdlock(qi->virtio_dev);
        pthread_mutex_lock(&qi->vq_lock);
        vu_queue_push(dev, q, elem, 0);
        vu_queue_notify(dev, q);
        pthread_mutex_unlock(&qi->vq_lock);
        vu_dispatch_unlock(qi->virtio_dev);
    }

    pthread_mutex_destroy(&req->ch.lock);
    g_free(fbuf.mem);
    free(req);
}
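
/*
 * Layout sketch for the FUSE_WRITE special case in fv_queue_worker()
 * (illustrative): a guest element whose out_sg holds the two headers
 * followed by payload becomes
 *
 *   pbufv->buf[0]   = fbuf               (copied fuse_in_header +
 *                                         fuse_write_in)
 *   pbufv->buf[1..] = out_sg[i] slices   (payload, still in guest
 *                                         memory, header bytes skipped)
 *
 * so the write payload itself is never copied.
 */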

/* Thread function for individual queues, created when a queue is 'started' */
static void *fv_queue_thread(void *opaque)
{
    struct fv_QueueInfo *qi = opaque;
    struct VuDev *dev = &qi->virtio_dev->dev;
    struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
    struct fuse_session *se = qi->virtio_dev->se;
    GThreadPool *pool = NULL;
    GList *req_list = NULL;

    if (se->thread_pool_size) {
        fuse_log(FUSE_LOG_DEBUG, "%s: Creating thread pool for Queue %d\n",
                 __func__, qi->qidx);
        pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size,
                                 FALSE, NULL);
        if (!pool) {
            fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
            return NULL;
        }
    }

    fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
             qi->qidx, qi->kick_fd);
    while (1) {
        struct pollfd pf[2];

        pf[0].fd = qi->kick_fd;
        pf[0].events = POLLIN;
        pf[0].revents = 0;
        pf[1].fd = qi->kill_fd;
        pf[1].events = POLLIN;
        pf[1].revents = 0;

        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for Queue %d event\n", __func__,
                 qi->qidx);
        int poll_res = ppoll(pf, 2, NULL, NULL);

        if (poll_res == -1) {
            if (errno == EINTR) {
                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
                         __func__);
                continue;
            }
            fuse_log(FUSE_LOG_ERR, "fv_queue_thread ppoll: %m\n");
            break;
        }
        assert(poll_res >= 1);
        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x Queue %d\n",
                     __func__, pf[0].revents, qi->qidx);
            break;
        }
        if (pf[1].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR,
                     "%s: Unexpected poll revents %x Queue %d killfd\n",
                     __func__, pf[1].revents, qi->qidx);
            break;
        }
        if (pf[1].revents) {
            fuse_log(FUSE_LOG_INFO, "%s: kill event on queue %d - quitting\n",
                     __func__, qi->qidx);
            break;
        }
        assert(pf[0].revents & POLLIN);
        fuse_log(FUSE_LOG_DEBUG, "%s: Got queue event on Queue %d\n", __func__,
                 qi->qidx);

        eventfd_t evalue;
        if (eventfd_read(qi->kick_fd, &evalue)) {
            fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n");
            break;
        }
        /* Mutual exclusion with virtio_loop() */
        vu_dispatch_rdlock(qi->virtio_dev);
        pthread_mutex_lock(&qi->vq_lock);
        /* out is from guest, in is to guest */
        unsigned int in_bytes, out_bytes;
        vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);

        fuse_log(FUSE_LOG_DEBUG,
                 "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                 __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);

        while (1) {
            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
            if (!req) {
                break;
            }

            req->reply_sent = false;

            if (!se->thread_pool_size) {
                req_list = g_list_prepend(req_list, req);
            } else {
                g_thread_pool_push(pool, req, NULL);
            }
        }

        pthread_mutex_unlock(&qi->vq_lock);
        vu_dispatch_unlock(qi->virtio_dev);

        /* Process all the requests. */
        if (!se->thread_pool_size && req_list != NULL) {
            g_list_foreach(req_list, fv_queue_worker, qi);
            g_list_free(req_list);
            req_list = NULL;
        }
    }

    if (pool) {
        g_thread_pool_free(pool, FALSE, TRUE);
    }

    return NULL;
}

static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
{
    int ret;
    struct fv_QueueInfo *ourqi;

    assert(qidx < vud->nqueues);
    ourqi = vud->qi[qidx];

    /* Kill the thread */
    if (eventfd_write(ourqi->kill_fd, 1)) {
        fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n",
                 qidx, strerror(errno));
    }
    ret = pthread_join(ourqi->thread, NULL);
    if (ret) {
        fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
                 __func__, qidx, ret);
    }
    pthread_mutex_destroy(&ourqi->vq_lock);
    close(ourqi->kill_fd);
    ourqi->kick_fd = -1;
    g_free(vud->qi[qidx]);
    vud->qi[qidx] = NULL;
}

/* Callback from libvhost-user on start or stop of a queue */
static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
{
    struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
    struct fv_QueueInfo *ourqi;

    fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx,
             started);
    assert(qidx >= 0);

    /*
     * Ignore additional request queues for now.  passthrough_ll.c must be
     * audited for thread-safety issues first.  It was written with a
     * well-behaved client in mind and may not protect against all types of
     * races yet.
     */
    if (qidx > 1) {
        fuse_log(FUSE_LOG_ERR,
                 "%s: multiple request queues not yet implemented, please only "
                 "configure 1 request queue\n",
                 __func__);
        exit(EXIT_FAILURE);
    }

    if (started) {
        /* Fire up a thread to watch this queue */
        if (qidx >= vud->nqueues) {
            vud->qi = g_realloc_n(vud->qi, qidx + 1, sizeof(vud->qi[0]));
            memset(vud->qi + vud->nqueues, 0,
                   sizeof(vud->qi[0]) * (1 + (qidx - vud->nqueues)));
            vud->nqueues = qidx + 1;
        }
        if (!vud->qi[qidx]) {
            vud->qi[qidx] = g_new0(struct fv_QueueInfo, 1);
            vud->qi[qidx]->virtio_dev = vud;
            vud->qi[qidx]->qidx = qidx;
        } else {
            /* Shouldn't have been started */
            assert(vud->qi[qidx]->kick_fd == -1);
        }
        ourqi = vud->qi[qidx];
        ourqi->kick_fd = dev->vq[qidx].kick_fd;

        ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
        assert(ourqi->kill_fd != -1);
        pthread_mutex_init(&ourqi->vq_lock, NULL);

        if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
            fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                     __func__, qidx);
            assert(0);
        }
    } else {
        /*
         * Temporarily drop write-lock taken in virtio_loop() so that
         * the queue thread doesn't block in virtio_send_msg().
         */
        vu_dispatch_unlock(vud);
        fv_queue_cleanup_thread(vud, qidx);
        vu_dispatch_wrlock(vud);
    }
}

static bool fv_queue_order(VuDev *dev, int qidx)
{
    return false;
}

static const VuDevIface fv_iface = {
    .get_features = fv_get_features,
    .set_features = fv_set_features,

    /*
     * Don't need a process_msg handler; we've not got any device-specific
     * messages at the vhost-user level
     */
    .queue_set_started = fv_queue_set_started,

    .queue_is_processed_in_order = fv_queue_order,
};

/*
 * Main loop; this mostly deals with events on the vhost-user
 * socket itself, and not actual fuse data.
 */
int virtio_loop(struct fuse_session *se)
{
    fuse_log(FUSE_LOG_INFO, "%s: Entry\n", __func__);

    while (!fuse_session_exited(se)) {
        struct pollfd pf[1];
        bool ok;
        pf[0].fd = se->vu_socketfd;
        pf[0].events = POLLIN;
        pf[0].revents = 0;

        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for VU event\n", __func__);
        int poll_res = ppoll(pf, 1, NULL, NULL);

        if (poll_res == -1) {
            if (errno == EINTR) {
                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
                         __func__);
                continue;
            }
            fuse_log(FUSE_LOG_ERR, "virtio_loop ppoll: %m\n");
            break;
        }
        assert(poll_res == 1);
        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x\n", __func__,
                     pf[0].revents);
            break;
        }
        assert(pf[0].revents & POLLIN);
        fuse_log(FUSE_LOG_DEBUG, "%s: Got VU event\n", __func__);
        /* Mutual exclusion with fv_queue_thread() */
        vu_dispatch_wrlock(se->virtio_dev);

        ok = vu_dispatch(&se->virtio_dev->dev);

        vu_dispatch_unlock(se->virtio_dev);

        if (!ok) {
            fuse_log(FUSE_LOG_ERR, "%s: vu_dispatch failed\n", __func__);
            break;
        }
    }

    /*
     * Make sure all fv_queue_thread()s quit on exit, as we're about to
     * free virtio dev and fuse session, no one should access them anymore.
     */
    for (int i = 0; i < se->virtio_dev->nqueues; i++) {
        if (!se->virtio_dev->qi[i]) {
            continue;
        }

        fuse_log(FUSE_LOG_INFO, "%s: Stopping queue %d thread\n", __func__, i);
        fv_queue_cleanup_thread(se->virtio_dev, i);
    }

    fuse_log(FUSE_LOG_INFO, "%s: Exit\n", __func__);

    return 0;
}

static void strreplace(char *s, char old, char new)
{
    for (; *s; ++s) {
        if (*s == old) {
            *s = new;
        }
    }
}
static bool fv_socket_lock(struct fuse_session *se)
{
    g_autofree gchar *sk_name = NULL;
    g_autofree gchar *pidfile = NULL;
    g_autofree gchar *dir = NULL;
    Error *local_err = NULL;

    dir = qemu_get_local_state_pathname("run/virtiofsd");

    if (g_mkdir_with_parents(dir, S_IRWXU) < 0) {
        fuse_log(FUSE_LOG_ERR, "%s: Failed to create directory %s: %s\n",
                 __func__, dir, strerror(errno));
        return false;
    }

    sk_name = g_strdup(se->vu_socket_path);
    strreplace(sk_name, '/', '.');
    pidfile = g_strdup_printf("%s/%s.pid", dir, sk_name);

    if (!qemu_write_pidfile(pidfile, &local_err)) {
        error_report_err(local_err);
        return false;
    }

    return true;
}
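
/*
 * Worked example for fv_socket_lock() (path illustrative): for
 * vu_socket_path "/tmp/vhostqemu", strreplace() turns the '/'s into '.'s
 * giving ".tmp.vhostqemu", so the pidfile becomes
 * "<local state dir>/run/virtiofsd/.tmp.vhostqemu.pid"; a second
 * virtiofsd on the same socket path then fails in qemu_write_pidfile().
 */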

static int fv_create_listen_socket(struct fuse_session *se)
{
    struct sockaddr_un un;
    mode_t old_umask;

    /* Nothing to do if fd is already initialized */
    if (se->vu_listen_fd >= 0) {
        return 0;
    }

    if (strlen(se->vu_socket_path) >= sizeof(un.sun_path)) {
        fuse_log(FUSE_LOG_ERR, "Socket path too long\n");
        return -1;
    }

    if (!strlen(se->vu_socket_path)) {
        fuse_log(FUSE_LOG_ERR, "Socket path is empty\n");
        return -1;
    }

    /* Check whether the vu_socket_path is already in use */
    if (!fv_socket_lock(se)) {
        return -1;
    }

    /*
     * Create the Unix socket to communicate with qemu
     * based on QEMU's vhost-user-bridge
     */
    unlink(se->vu_socket_path);
    strcpy(un.sun_path, se->vu_socket_path);
    size_t addr_len = sizeof(un);

    int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
    if (listen_sock == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket creation: %m\n");
        return -1;
    }
    un.sun_family = AF_UNIX;

    /*
     * Unfortunately bind doesn't let you set the mask on the socket,
     * so set umask appropriately and restore it later.
     */
    if (se->vu_socket_group) {
        old_umask = umask(S_IROTH | S_IWOTH | S_IXOTH);
    } else {
        old_umask = umask(S_IRGRP | S_IWGRP | S_IXGRP |
                          S_IROTH | S_IWOTH | S_IXOTH);
    }
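    /*
     * Net effect, assuming the usual 0777 base mode for a socket inode:
     * umask 0007 gives mode 0770 when a group is to be granted access
     * below, and umask 0077 gives owner-only mode 0700 otherwise.
     */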
    if (bind(listen_sock, (struct sockaddr *)&un, addr_len) == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket bind: %m\n");
        close(listen_sock);
        umask(old_umask);
        return -1;
    }
    if (se->vu_socket_group) {
        struct group *g = getgrnam(se->vu_socket_group);
        if (g) {
            if (chown(se->vu_socket_path, -1, g->gr_gid) == -1) {
                fuse_log(FUSE_LOG_WARNING,
                         "vhost socket failed to set group to %s (%d): %m\n",
                         se->vu_socket_group, g->gr_gid);
            }
        }
    }
    umask(old_umask);

    if (listen(listen_sock, 1) == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket listen: %m\n");
        close(listen_sock);
        return -1;
    }

    se->vu_listen_fd = listen_sock;
    return 0;
}

int virtio_session_mount(struct fuse_session *se)
{
    int ret;

    /*
     * Test that unshare(CLONE_FS) works. fv_queue_worker() will need it. It's
     * an unprivileged system call but some Docker/Moby versions are known to
     * reject it via seccomp when CAP_SYS_ADMIN is not given.
     *
     * Note that the program is single-threaded here so this syscall has no
     * visible effect and is safe to make.
     */
    ret = unshare(CLONE_FS);
    if (ret == -1 && errno == EPERM) {
        fuse_log(FUSE_LOG_ERR, "unshare(CLONE_FS) failed with EPERM. If "
                "running in a container please check that the container "
                "runtime seccomp policy allows unshare.\n");
        return -1;
    }

    ret = fv_create_listen_socket(se);
    if (ret < 0) {
        return ret;
    }

    se->fd = -1;

    fuse_log(FUSE_LOG_INFO, "%s: Waiting for vhost-user socket connection...\n",
             __func__);
    int data_sock = accept(se->vu_listen_fd, NULL, NULL);
    if (data_sock == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket accept: %m\n");
        close(se->vu_listen_fd);
        return -1;
    }
    close(se->vu_listen_fd);
    se->vu_listen_fd = -1;
    fuse_log(FUSE_LOG_INFO, "%s: Received vhost-user socket connection\n",
             __func__);

    /* TODO: Some cleanup/deallocation! */
    se->virtio_dev = g_new0(struct fv_VuDev, 1);

    se->vu_socketfd = data_sock;
    se->virtio_dev->se = se;
    pthread_rwlock_init(&se->virtio_dev->vu_dispatch_rwlock, NULL);
    if (!vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, NULL,
                 fv_set_watch, fv_remove_watch, &fv_iface)) {
        fuse_log(FUSE_LOG_ERR, "%s: vu_init failed\n", __func__);
        return -1;
    }

    return 0;
}

void virtio_session_close(struct fuse_session *se)
{
    close(se->vu_socketfd);

    if (!se->virtio_dev) {
        return;
    }

    g_free(se->virtio_dev->qi);
    pthread_rwlock_destroy(&se->virtio_dev->vu_dispatch_rwlock);
    g_free(se->virtio_dev);
    se->virtio_dev = NULL;
}