qemu/tools/virtiofsd/fuse_virtio.c
/*
 * virtio-fs glue for FUSE
 * Copyright (C) 2018 Red Hat, Inc. and/or its affiliates
 *
 * Authors:
 *   Dave Gilbert  <dgilbert@redhat.com>
 *
 * Implements the glue between libfuse and libvhost-user
 *
 * This program can be distributed under the terms of the GNU LGPLv2.
 * See the file COPYING.LIB
 */

#include "qemu/osdep.h"
#include "qemu/iov.h"
#include "qapi/error.h"
#include "fuse_i.h"
#include "standard-headers/linux/fuse.h"
#include "fuse_misc.h"
#include "fuse_opt.h"
#include "fuse_virtio.h"

#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <grp.h>

#include "libvhost-user.h"

struct fv_VuDev;
struct fv_QueueInfo {
    pthread_t thread;
    /*
     * This lock protects the VuVirtq preventing races between
     * fv_queue_thread() and fv_queue_worker().
     */
    pthread_mutex_t vq_lock;

    struct fv_VuDev *virtio_dev;

    /* Our queue index, corresponds to array position */
    int qidx;
    int kick_fd;
    int kill_fd; /* For killing the thread */
};

/* A FUSE request */
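/*
 * Note that 'elem' must remain the first member: fv_queue_thread() obtains
 * these via vu_queue_pop(dev, q, sizeof(FVRequest)), which returns an
 * allocation that begins with the VuVirtqElement, so the same pointer
 * serves as both the element and the FVRequest wrapping it.
 */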
typedef struct {
    VuVirtqElement elem;
    struct fuse_chan ch;

    /* Used to complete requests that involve no reply */
    bool reply_sent;
} FVRequest;

/*
 * We pass the dev element into libvhost-user
 * and then use it to get back to the outer
 * container for other data.
 */
struct fv_VuDev {
    VuDev dev;
    struct fuse_session *se;

    /*
     * Either handle virtqueues or vhost-user protocol messages.  Don't do
     * both at the same time since that could lead to race conditions if
     * virtqueues or memory tables change while another thread is accessing
     * them.
     *
     * The assumptions are:
     * 1. fv_queue_thread() reads/writes to virtqueues and only reads VuDev.
     * 2. virtio_loop() reads/writes virtqueues and VuDev.
     */
    pthread_rwlock_t vu_dispatch_rwlock;

    /*
     * The following pair of fields are only accessed in the main
     * virtio_loop
     */
    size_t nqueues;
    struct fv_QueueInfo **qi;
};

/* Callback from libvhost-user */
static uint64_t fv_get_features(VuDev *dev)
{
    return 1ULL << VIRTIO_F_VERSION_1;
}

/* Callback from libvhost-user */
static void fv_set_features(VuDev *dev, uint64_t features)
{
}

/*
 * Callback from libvhost-user if there's a new fd we're supposed to listen
 * to, typically a queue kick?
 */
static void fv_set_watch(VuDev *dev, int fd, int condition, vu_watch_cb cb,
                         void *data)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}

/*
 * Callback from libvhost-user if we're no longer supposed to listen on an fd
 */
static void fv_remove_watch(VuDev *dev, int fd)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}

/* Callback from libvhost-user to panic */
static void fv_panic(VuDev *dev, const char *err)
{
    fuse_log(FUSE_LOG_ERR, "%s: libvhost-user: %s\n", __func__, err);
    /* TODO: Allow reconnects?? */
    exit(EXIT_FAILURE);
}

/*
 * Copy from an iovec into a fuse_buf (memory only)
 * Caller must ensure there is space
 */
static size_t copy_from_iov(struct fuse_buf *buf, size_t out_num,
                            const struct iovec *out_sg,
                            size_t max)
{
    void *dest = buf->mem;
    size_t copied = 0;

    while (out_num && max) {
        size_t onelen = out_sg->iov_len;
        onelen = MIN(onelen, max);
        memcpy(dest, out_sg->iov_base, onelen);
        dest += onelen;
        copied += onelen;
        out_sg++;
        out_num--;
        max -= onelen;
    }

    return copied;
}

/*
 * Skip 'skip' bytes in the iov; 'sg_1stindex' is set as
 * the index for the 1st iovec to read data from, and
 * 'sg_1stskip' is the number of bytes to skip in that entry.
 *
 * Returns True if there are at least 'skip' bytes in the iovec
 *
 */
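/*
 * Example: with sg lengths {4, 8} and skip=6, entry 0 is consumed
 * entirely (skip drops to 2), entry 1 holds 8 > 2 bytes, so this sets
 * *sg_1stindex=1, *sg_1stskip=2 and returns true.
 */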
static bool skip_iov(const struct iovec *sg, size_t sg_size,
                     size_t skip,
                     size_t *sg_1stindex, size_t *sg_1stskip)
{
    size_t vec;

    for (vec = 0; vec < sg_size; vec++) {
        if (sg[vec].iov_len > skip) {
            *sg_1stskip = skip;
            *sg_1stindex = vec;

            return true;
        }

        skip -= sg[vec].iov_len;
    }

    *sg_1stindex = vec;
    *sg_1stskip = 0;
    return skip == 0;
}

/*
 * Copy from one iov to another, the given number of bytes
 * The caller must have checked sizes.
 */
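/*
 * Example: copying 6 bytes from src {6} into dst {4, 4} fills all of
 * dst[0], then the remaining 2 bytes land at the start of dst[1]; a
 * single src element may thus fan out across several dst elements,
 * and vice versa.
 */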
static void copy_iov(struct iovec *src_iov, int src_count,
                     struct iovec *dst_iov, int dst_count, size_t to_copy)
{
    size_t dst_offset = 0;
    /* Outer loop copies 'src' elements */
    while (to_copy) {
        assert(src_count);
        size_t src_len = src_iov[0].iov_len;
        size_t src_offset = 0;

        if (src_len > to_copy) {
            src_len = to_copy;
        }
        /* Inner loop copies contents of one 'src' to maybe multiple dst. */
        while (src_len) {
            assert(dst_count);
            size_t dst_len = dst_iov[0].iov_len - dst_offset;
            if (dst_len > src_len) {
                dst_len = src_len;
            }

            memcpy(dst_iov[0].iov_base + dst_offset,
                   src_iov[0].iov_base + src_offset, dst_len);
            src_len -= dst_len;
            to_copy -= dst_len;
            src_offset += dst_len;
            dst_offset += dst_len;

            assert(dst_offset <= dst_iov[0].iov_len);
            if (dst_offset == dst_iov[0].iov_len) {
                dst_offset = 0;
                dst_iov++;
                dst_count--;
            }
        }
        src_iov++;
        src_count--;
    }
}

/*
 * pthread_rwlock_rdlock() and pthread_rwlock_wrlock() can fail if
 * a deadlock condition is detected or the current thread already
 * owns the lock. They can also fail, like pthread_rwlock_unlock(),
 * if the lock wasn't properly initialized. None of these are ever
 * expected to happen.
 */
static void vu_dispatch_rdlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_rdlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

static void vu_dispatch_wrlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_wrlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

static void vu_dispatch_unlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_unlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

static void vq_send_element(struct fv_QueueInfo *qi, VuVirtqElement *elem,
                            ssize_t len)
{
    struct fuse_session *se = qi->virtio_dev->se;
    VuDev *dev = &se->virtio_dev->dev;
    VuVirtq *q = vu_get_queue(dev, qi->qidx);

    vu_dispatch_rdlock(qi->virtio_dev);
    pthread_mutex_lock(&qi->vq_lock);
    vu_queue_push(dev, q, elem, len);
    vu_queue_notify(dev, q);
    pthread_mutex_unlock(&qi->vq_lock);
    vu_dispatch_unlock(qi->virtio_dev);
}

/*
 * Called back by ll whenever it wants to send a reply/message back
 * The 1st element of the iov starts with the fuse_out_header
 * 'unique'==0 means it's a notify message.
 */
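/*
 * A note on direction: in virtio terms, "out" descriptors carry data
 * from the driver (guest) to the device (us), i.e. the FUSE request,
 * while "in" descriptors are writable by the device and carry our
 * reply back into guest memory. Hence replies below are copied into
 * elem->in_sg.
 */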
int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                    struct iovec *iov, int count)
{
    FVRequest *req = container_of(ch, FVRequest, ch);
    struct fv_QueueInfo *qi = ch->qi;
    VuVirtqElement *elem = &req->elem;
    int ret = 0;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t tosend_len = iov_size(iov, count);

    /* unique == 0 is notification, which we don't support */
    assert(out->unique);
    assert(!req->reply_sent);

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        ret = -E2BIG;
        goto err;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        ret = -E2BIG;
        goto err;
    }

    copy_iov(iov, count, in_sg, in_num, tosend_len);

    vq_send_element(qi, elem, tosend_len);
    req->reply_sent = true;

err:
    return ret;
}

/*
 * Callback from fuse_send_data_iov_* when it's virtio and the buffer
 * is a single FD with FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK
 * We need to send the iov and then the buffer.
 * Return 0 on success
 */
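/*
 * For example, a FUSE_READ reply produced here lands in guest memory
 * as [fuse_out_header][file bytes], with the file bytes preadv()ed
 * from the passthrough fd directly into the scatter-gather list, so
 * no bounce buffer is needed for the data itself.
 */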
int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                         struct iovec *iov, int count, struct fuse_bufvec *buf,
                         size_t len)
{
    FVRequest *req = container_of(ch, FVRequest, ch);
    struct fv_QueueInfo *qi = ch->qi;
    VuVirtqElement *elem = &req->elem;
    int ret = 0;
    g_autofree struct iovec *in_sg_cpy = NULL;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t iov_len = iov_size(iov, count);
    size_t tosend_len = iov_len + len;

    out->len = tosend_len;

    fuse_log(FUSE_LOG_DEBUG, "%s: count=%d len=%zd iov_len=%zd\n", __func__,
             count, len, iov_len);

    /* unique == 0 is notification which we don't support */
    assert(out->unique);

    assert(!req->reply_sent);

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        return E2BIG;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        return E2BIG;
    }

    /* TODO: Limit to 'len' */

    /* First copy the header data from iov->in_sg */
    copy_iov(iov, count, in_sg, in_num, iov_len);

    /*
     * Build a copy of the in_sg iov so we can skip bits in it,
     * including changing the offsets
     */
    in_sg_cpy = g_new(struct iovec, in_num);
    memcpy(in_sg_cpy, in_sg, sizeof(struct iovec) * in_num);
    /* These get updated as we skip */
    struct iovec *in_sg_ptr = in_sg_cpy;
    unsigned int in_sg_cpy_count = in_num;

    /* skip over parts of in_sg that contained the header iov */
    iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, iov_len);

    do {
        fuse_log(FUSE_LOG_DEBUG, "%s: in_sg_cpy_count=%d len remaining=%zd\n",
                 __func__, in_sg_cpy_count, len);

        ret = preadv(buf->buf[0].fd, in_sg_ptr, in_sg_cpy_count,
                     buf->buf[0].pos);

        if (ret == -1) {
            ret = errno;
            if (ret == EINTR) {
                continue;
            }
            fuse_log(FUSE_LOG_DEBUG, "%s: preadv failed (%m) len=%zd\n",
                     __func__, len);
            return ret;
        }

        if (!ret) {
            /* EOF case? */
            fuse_log(FUSE_LOG_DEBUG, "%s: !ret len remaining=%zd\n", __func__,
                     len);
            break;
        }
        fuse_log(FUSE_LOG_DEBUG, "%s: preadv ret=%d len=%zd\n", __func__,
                 ret, len);

        len -= ret;
        /* Short read. Retry reading remaining bytes */
        if (len) {
            fuse_log(FUSE_LOG_DEBUG, "%s: ret < len\n", __func__);
            /* Skip over this much next time around */
            iov_discard_front(&in_sg_ptr, &in_sg_cpy_count, ret);
            buf->buf[0].pos += ret;
        }
    } while (len);

    /* Need to fix out->len on EOF */
    if (len) {
        struct fuse_out_header *out_sg = in_sg[0].iov_base;

        tosend_len -= len;
        out_sg->len = tosend_len;
    }

    vq_send_element(qi, elem, tosend_len);
    req->reply_sent = true;
    return 0;
}

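/*
 * Each worker thread calls unshare(CLONE_FS) once so that its working
 * directory and umask are private to the thread; passthrough_ll.c
 * relies on this, e.g. for the fchdir() dance around xattr operations.
 * The flag below records that this thread has already done so.
 */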
static __thread bool clone_fs_called;

/* Process one FVRequest in a thread pool */
static void fv_queue_worker(gpointer data, gpointer user_data)
{
    struct fv_QueueInfo *qi = user_data;
    struct fuse_session *se = qi->virtio_dev->se;
    FVRequest *req = data;
    VuVirtqElement *elem = &req->elem;
    struct fuse_buf fbuf = {};
    bool allocated_bufv = false;
    struct fuse_bufvec bufv;
    struct fuse_bufvec *pbufv;
    struct fuse_in_header inh;

    assert(se->bufsize > sizeof(struct fuse_in_header));

    if (!clone_fs_called) {
        int ret;

        /* unshare FS for xattr operation */
        ret = unshare(CLONE_FS);
        /* should not fail */
        assert(ret == 0);

        clone_fs_called = true;
    }

    /*
     * An element contains one request and the space to send our response
     * They're spread over multiple descriptors in a scatter/gather set
     * and we can't trust the guest to keep them still; so copy in/out.
     */
    fbuf.mem = g_malloc(se->bufsize);

    fuse_mutex_init(&req->ch.lock);
    req->ch.fd = -1;
    req->ch.qi = qi;

    /* The 'out' part of the elem is from qemu */
    unsigned int out_num = elem->out_num;
    struct iovec *out_sg = elem->out_sg;
    size_t out_len = iov_size(out_sg, out_num);
    fuse_log(FUSE_LOG_DEBUG,
             "%s: elem %d: with %d out desc of length %zd\n",
             __func__, elem->index, out_num, out_len);

    /*
     * The elem should contain a 'fuse_in_header' (in to fuse)
     * plus the data based on the len in the header.
     */
    if (out_len < sizeof(struct fuse_in_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
                 __func__, elem->index);
        assert(0); /* TODO */
    }
    if (out_len > se->bufsize) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
                 elem->index);
        assert(0); /* TODO */
    }
    /* Copy just the fuse_in_header and look at it */
    copy_from_iov(&fbuf, out_num, out_sg,
                  sizeof(struct fuse_in_header));
    memcpy(&inh, fbuf.mem, sizeof(struct fuse_in_header));

    pbufv = NULL; /* Compiler thinks it's an uninitialised path otherwise */
    if (inh.opcode == FUSE_WRITE &&
        out_len >= (sizeof(struct fuse_in_header) +
                    sizeof(struct fuse_write_in))) {
        /*
         * For a write we don't actually need to copy the
         * data, we can just do it straight out of guest memory
         * but we must still copy the headers in case the guest
         * was nasty and changed them while we were using them.
         */
        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);

        fbuf.size = copy_from_iov(&fbuf, out_num, out_sg,
                                  sizeof(struct fuse_in_header) +
                                  sizeof(struct fuse_write_in));
        /* That copy re-read the in_header, make sure we use the original */
        memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));

        /* Allocate the bufv, with space for the rest of the iov */
        pbufv = g_try_malloc(sizeof(struct fuse_bufvec) +
                             sizeof(struct fuse_buf) * out_num);
        if (!pbufv) {
            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
                    __func__);
            goto out;
        }

        allocated_bufv = true;
        pbufv->count = 1;
        pbufv->buf[0] = fbuf;

        size_t iovindex, pbufvindex, iov_bytes_skip;
        pbufvindex = 1; /* 2 headers, 1 fusebuf */

        if (!skip_iov(out_sg, out_num,
                      sizeof(struct fuse_in_header) +
                      sizeof(struct fuse_write_in),
                      &iovindex, &iov_bytes_skip)) {
            fuse_log(FUSE_LOG_ERR, "%s: skip failed\n",
                    __func__);
            goto out;
        }

        for (; iovindex < out_num; iovindex++, pbufvindex++) {
            pbufv->count++;
            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
            pbufv->buf[pbufvindex].flags = 0;
            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;

            if (iov_bytes_skip) {
                pbufv->buf[pbufvindex].mem += iov_bytes_skip;
                pbufv->buf[pbufvindex].size -= iov_bytes_skip;
                iov_bytes_skip = 0;
            }
        }
    } else {
        /* Normal (non-fast-write) path */

        copy_from_iov(&fbuf, out_num, out_sg, se->bufsize);
        /* That copy re-read the in_header, make sure we use the original */
        memcpy(fbuf.mem, &inh, sizeof(struct fuse_in_header));
        fbuf.size = out_len;

        /* TODO! Endianness of header */

        /* TODO: Add checks for fuse_session_exited */
        bufv.buf[0] = fbuf;
        bufv.count = 1;
        pbufv = &bufv;
    }
    pbufv->idx = 0;
    pbufv->off = 0;
    fuse_session_process_buf_int(se, pbufv, &req->ch);

out:
    if (allocated_bufv) {
        g_free(pbufv);
    }

    /* If the request has no reply, still recycle the virtqueue element */
    if (!req->reply_sent) {
        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
                 elem->index);
        vq_send_element(qi, elem, 0);
    }

    pthread_mutex_destroy(&req->ch.lock);
    g_free(fbuf.mem);
    free(req);
}

/* Thread function for individual queues, created when a queue is 'started' */
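/*
 * The loop below ppoll()s on two eventfds: kick_fd, signalled by the
 * vhost-user master when the guest adds buffers to the queue, and
 * kill_fd, signalled by fv_queue_cleanup_thread() to make us exit.
 * Popped elements are either handed to a GThreadPool or, when
 * thread_pool_size is 0, collected and processed inline in FIFO order.
 */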
static void *fv_queue_thread(void *opaque)
{
    struct fv_QueueInfo *qi = opaque;
    struct VuDev *dev = &qi->virtio_dev->dev;
    struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
    struct fuse_session *se = qi->virtio_dev->se;
    GThreadPool *pool = NULL;
    GList *req_list = NULL;

    if (se->thread_pool_size) {
        fuse_log(FUSE_LOG_DEBUG, "%s: Creating thread pool for Queue %d\n",
                 __func__, qi->qidx);
        pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size,
                                 FALSE, NULL);
        if (!pool) {
            fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
            return NULL;
        }
    }

    fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
             qi->qidx, qi->kick_fd);
    while (1) {
        struct pollfd pf[2];

        pf[0].fd = qi->kick_fd;
        pf[0].events = POLLIN;
        pf[0].revents = 0;
        pf[1].fd = qi->kill_fd;
        pf[1].events = POLLIN;
        pf[1].revents = 0;

        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for Queue %d event\n", __func__,
                 qi->qidx);
        int poll_res = ppoll(pf, 2, NULL, NULL);

        if (poll_res == -1) {
            if (errno == EINTR) {
                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
                         __func__);
                continue;
            }
            fuse_log(FUSE_LOG_ERR, "fv_queue_thread ppoll: %m\n");
            break;
        }
        assert(poll_res >= 1);
        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x Queue %d\n",
                     __func__, pf[0].revents, qi->qidx);
            break;
        }
        if (pf[1].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR,
                     "%s: Unexpected poll revents %x Queue %d killfd\n",
                     __func__, pf[1].revents, qi->qidx);
            break;
        }
        if (pf[1].revents) {
            fuse_log(FUSE_LOG_INFO, "%s: kill event on queue %d - quitting\n",
                     __func__, qi->qidx);
            break;
        }
        assert(pf[0].revents & POLLIN);
        fuse_log(FUSE_LOG_DEBUG, "%s: Got queue event on Queue %d\n", __func__,
                 qi->qidx);

        eventfd_t evalue;
        if (eventfd_read(qi->kick_fd, &evalue)) {
            fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n");
            break;
        }
        /* Mutual exclusion with virtio_loop() */
        vu_dispatch_rdlock(qi->virtio_dev);
        pthread_mutex_lock(&qi->vq_lock);
        /* out is from guest, in is to guest */
        unsigned int in_bytes, out_bytes;
        vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);

        fuse_log(FUSE_LOG_DEBUG,
                 "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                 __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);

        while (1) {
            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
            if (!req) {
                break;
            }

            req->reply_sent = false;

            if (!se->thread_pool_size) {
                req_list = g_list_prepend(req_list, req);
            } else {
                g_thread_pool_push(pool, req, NULL);
            }
        }

        pthread_mutex_unlock(&qi->vq_lock);
        vu_dispatch_unlock(qi->virtio_dev);

        /* Process all the requests. */
        if (!se->thread_pool_size && req_list != NULL) {
            req_list = g_list_reverse(req_list);
            g_list_foreach(req_list, fv_queue_worker, qi);
            g_list_free(req_list);
            req_list = NULL;
        }
    }

    if (pool) {
        g_thread_pool_free(pool, FALSE, TRUE);
    }

    return NULL;
}

static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
{
    int ret;
    struct fv_QueueInfo *ourqi;

    assert(qidx < vud->nqueues);
    ourqi = vud->qi[qidx];

    /* Kill the thread */
    if (eventfd_write(ourqi->kill_fd, 1)) {
        fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n",
                 qidx, strerror(errno));
    }
    ret = pthread_join(ourqi->thread, NULL);
    if (ret) {
        fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
                 __func__, qidx, ret);
    }
    pthread_mutex_destroy(&ourqi->vq_lock);
    close(ourqi->kill_fd);
    ourqi->kick_fd = -1;
    g_free(vud->qi[qidx]);
    vud->qi[qidx] = NULL;
}

static void stop_all_queues(struct fv_VuDev *vud)
{
    for (int i = 0; i < vud->nqueues; i++) {
        if (!vud->qi[i]) {
            continue;
        }

        fuse_log(FUSE_LOG_INFO, "%s: Stopping queue %d thread\n", __func__, i);
        fv_queue_cleanup_thread(vud, i);
    }
}

/* Callback from libvhost-user on start or stop of a queue */
static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
{
    struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
    struct fv_QueueInfo *ourqi;

    fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx,
             started);
    assert(qidx >= 0);

    /*
     * Ignore additional request queues for now.  passthrough_ll.c must be
     * audited for thread-safety issues first.  It was written with a
     * well-behaved client in mind and may not protect against all types of
     * races yet.
     */
    if (qidx > 1) {
        fuse_log(FUSE_LOG_ERR,
                 "%s: multiple request queues not yet implemented, please only "
                 "configure 1 request queue\n",
                 __func__);
        exit(EXIT_FAILURE);
    }

    if (started) {
        /* Fire up a thread to watch this queue */
        if (qidx >= vud->nqueues) {
            vud->qi = g_realloc_n(vud->qi, qidx + 1, sizeof(vud->qi[0]));
            memset(vud->qi + vud->nqueues, 0,
                   sizeof(vud->qi[0]) * (1 + (qidx - vud->nqueues)));
            vud->nqueues = qidx + 1;
        }
        if (!vud->qi[qidx]) {
            vud->qi[qidx] = g_new0(struct fv_QueueInfo, 1);
            vud->qi[qidx]->virtio_dev = vud;
            vud->qi[qidx]->qidx = qidx;
        } else {
            /* Shouldn't have been started */
            assert(vud->qi[qidx]->kick_fd == -1);
        }
        ourqi = vud->qi[qidx];
        ourqi->kick_fd = dev->vq[qidx].kick_fd;

        ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
        assert(ourqi->kill_fd != -1);
        pthread_mutex_init(&ourqi->vq_lock, NULL);

        if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
            fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                     __func__, qidx);
            assert(0);
        }
    } else {
        /*
         * Temporarily drop write-lock taken in virtio_loop() so that
         * the queue thread doesn't block in virtio_send_msg().
         */
        vu_dispatch_unlock(vud);
        fv_queue_cleanup_thread(vud, qidx);
        vu_dispatch_wrlock(vud);
    }
}

static bool fv_queue_order(VuDev *dev, int qidx)
{
    return false;
}

static const VuDevIface fv_iface = {
    .get_features = fv_get_features,
    .set_features = fv_set_features,

    /*
     * No process_msg callback needed: there are no device-specific
     * vhost-user messages for us to handle.
     */
    .queue_set_started = fv_queue_set_started,

    .queue_is_processed_in_order = fv_queue_order,
};

/*
 * Main loop; this mostly deals with events on the vhost-user
 * socket itself, and not actual fuse data.
 */
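/*
 * Lock ordering note: virtio_loop() takes vu_dispatch_rwlock for
 * writing around vu_dispatch(), while the queue threads take it for
 * reading around virtqueue access; fv_queue_set_started() drops the
 * write lock before joining a queue thread so that a thread blocked
 * in virtio_send_msg() can finish and exit.
 */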
int virtio_loop(struct fuse_session *se)
{
    fuse_log(FUSE_LOG_INFO, "%s: Entry\n", __func__);

    while (!fuse_session_exited(se)) {
        struct pollfd pf[1];
        bool ok;
        pf[0].fd = se->vu_socketfd;
        pf[0].events = POLLIN;
        pf[0].revents = 0;

        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for VU event\n", __func__);
        int poll_res = ppoll(pf, 1, NULL, NULL);

        if (poll_res == -1) {
            if (errno == EINTR) {
                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
                         __func__);
                continue;
            }
            fuse_log(FUSE_LOG_ERR, "virtio_loop ppoll: %m\n");
            break;
        }
        assert(poll_res == 1);
        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x\n", __func__,
                     pf[0].revents);
            break;
        }
        assert(pf[0].revents & POLLIN);
        fuse_log(FUSE_LOG_DEBUG, "%s: Got VU event\n", __func__);
        /* Mutual exclusion with fv_queue_thread() */
        vu_dispatch_wrlock(se->virtio_dev);

        ok = vu_dispatch(&se->virtio_dev->dev);

        vu_dispatch_unlock(se->virtio_dev);

        if (!ok) {
            fuse_log(FUSE_LOG_ERR, "%s: vu_dispatch failed\n", __func__);
            break;
        }
    }

    /*
     * Make sure all fv_queue_thread()s quit on exit; we're about to
     * free the virtio dev and fuse session, so no one should access
     * them anymore.
     */
    stop_all_queues(se->virtio_dev);
    fuse_log(FUSE_LOG_INFO, "%s: Exit\n", __func__);

    return 0;
}

static void strreplace(char *s, char old, char new)
{
    for (; *s; ++s) {
        if (*s == old) {
            *s = new;
        }
    }
}

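/*
 * Take an exclusive pidfile lock derived from the socket path, so two
 * virtiofsd instances can't serve the same socket. For example (the
 * exact prefix depends on the configured local state directory), a
 * socket path of /tmp/vhostqemu yields a pidfile such as
 * /var/run/virtiofsd/.tmp.vhostqemu.pid.
 */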
static bool fv_socket_lock(struct fuse_session *se)
{
    g_autofree gchar *sk_name = NULL;
    g_autofree gchar *pidfile = NULL;
    g_autofree gchar *dir = NULL;
    Error *local_err = NULL;

    dir = qemu_get_local_state_pathname("run/virtiofsd");

    if (g_mkdir_with_parents(dir, S_IRWXU) < 0) {
        fuse_log(FUSE_LOG_ERR, "%s: Failed to create directory %s: %s\n",
                 __func__, dir, strerror(errno));
        return false;
    }

    sk_name = g_strdup(se->vu_socket_path);
    strreplace(sk_name, '/', '.');
    pidfile = g_strdup_printf("%s/%s.pid", dir, sk_name);

    if (!qemu_write_pidfile(pidfile, &local_err)) {
        error_report_err(local_err);
        return false;
    }

    return true;
}

static int fv_create_listen_socket(struct fuse_session *se)
{
    struct sockaddr_un un;
    mode_t old_umask;

    /* Nothing to do if fd is already initialized */
    if (se->vu_listen_fd >= 0) {
        return 0;
    }

    if (strlen(se->vu_socket_path) >= sizeof(un.sun_path)) {
        fuse_log(FUSE_LOG_ERR, "Socket path too long\n");
        return -1;
    }

    if (!strlen(se->vu_socket_path)) {
        fuse_log(FUSE_LOG_ERR, "Socket path is empty\n");
        return -1;
    }

    /* Check whether the vu_socket_path is already in use */
    if (!fv_socket_lock(se)) {
        return -1;
    }

    /*
     * Create the Unix socket to communicate with qemu,
     * based on QEMU's vhost-user-bridge
     */
    unlink(se->vu_socket_path);
    strcpy(un.sun_path, se->vu_socket_path);
    size_t addr_len = sizeof(un);

    int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
    if (listen_sock == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket creation: %m\n");
        return -1;
    }
    un.sun_family = AF_UNIX;

    /*
     * Unfortunately bind doesn't let you set the mask on the socket,
     * so set umask appropriately and restore it later.
     */
    if (se->vu_socket_group) {
        old_umask = umask(S_IROTH | S_IWOTH | S_IXOTH);
    } else {
        old_umask = umask(S_IRGRP | S_IWGRP | S_IXGRP |
                          S_IROTH | S_IWOTH | S_IXOTH);
    }
    if (bind(listen_sock, (struct sockaddr *)&un, addr_len) == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket bind: %m\n");
        close(listen_sock);
        umask(old_umask);
        return -1;
    }
    if (se->vu_socket_group) {
        struct group *g = getgrnam(se->vu_socket_group);
        if (g) {
            if (chown(se->vu_socket_path, -1, g->gr_gid) == -1) {
                fuse_log(FUSE_LOG_WARNING,
                         "vhost socket failed to set group to %s (%d): %m\n",
                         se->vu_socket_group, g->gr_gid);
            }
        } else {
            fuse_log(FUSE_LOG_ERR,
                     "vhost socket: unable to find group '%s'\n",
                     se->vu_socket_group);
            close(listen_sock);
            umask(old_umask);
            return -1;
        }
    }
    umask(old_umask);

    if (listen(listen_sock, 1) == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket listen: %m\n");
        close(listen_sock);
        return -1;
    }

    se->vu_listen_fd = listen_sock;
    return 0;
}

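/*
 * Set up the vhost-user session: create and listen on the Unix socket,
 * accept a single connection from the vhost-user master, then hand the
 * connected fd to libvhost-user. Callers are expected to follow this
 * with virtio_loop() and eventually virtio_session_close().
 */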
int virtio_session_mount(struct fuse_session *se)
{
    int ret;

    /*
     * Test that unshare(CLONE_FS) works. fv_queue_worker() will need it. It's
     * an unprivileged system call but some Docker/Moby versions are known to
     * reject it via seccomp when CAP_SYS_ADMIN is not given.
     *
     * Note that the program is single-threaded here so this syscall has no
     * visible effect and is safe to make.
     */
    ret = unshare(CLONE_FS);
    if (ret == -1 && errno == EPERM) {
        fuse_log(FUSE_LOG_ERR, "unshare(CLONE_FS) failed with EPERM. If "
                "running in a container please check that the container "
                "runtime seccomp policy allows unshare.\n");
        return -1;
    }

    ret = fv_create_listen_socket(se);
    if (ret < 0) {
        return ret;
    }

    se->fd = -1;

    fuse_log(FUSE_LOG_INFO, "%s: Waiting for vhost-user socket connection...\n",
             __func__);
    int data_sock = accept(se->vu_listen_fd, NULL, NULL);
    if (data_sock == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket accept: %m\n");
        close(se->vu_listen_fd);
        return -1;
    }
    close(se->vu_listen_fd);
    se->vu_listen_fd = -1;
    fuse_log(FUSE_LOG_INFO, "%s: Received vhost-user socket connection\n",
             __func__);

    /* TODO: Some cleanup/deallocation! */
    se->virtio_dev = g_new0(struct fv_VuDev, 1);

    se->vu_socketfd = data_sock;
    se->virtio_dev->se = se;
    pthread_rwlock_init(&se->virtio_dev->vu_dispatch_rwlock, NULL);
    if (!vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, NULL,
                 fv_set_watch, fv_remove_watch, &fv_iface)) {
        fuse_log(FUSE_LOG_ERR, "%s: vu_init failed\n", __func__);
        return -1;
    }

    return 0;
}

void virtio_session_close(struct fuse_session *se)
{
    close(se->vu_socketfd);

    if (!se->virtio_dev) {
        return;
    }

    g_free(se->virtio_dev->qi);
    pthread_rwlock_destroy(&se->virtio_dev->vu_dispatch_rwlock);
    g_free(se->virtio_dev);
    se->virtio_dev = NULL;
}