qemu/tools/virtiofsd/fuse_virtio.c
/*
 * virtio-fs glue for FUSE
 * Copyright (C) 2018 Red Hat, Inc. and/or its affiliates
 *
 * Authors:
 *   Dave Gilbert  <dgilbert@redhat.com>
 *
 * Implements the glue between libfuse and libvhost-user
 *
 * This program can be distributed under the terms of the GNU LGPLv2.
 * See the file COPYING.LIB
 */

#include "qemu/osdep.h"
#include "qemu/iov.h"
#include "qapi/error.h"
#include "fuse_i.h"
#include "standard-headers/linux/fuse.h"
#include "fuse_misc.h"
#include "fuse_opt.h"
#include "fuse_virtio.h"

#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <grp.h>

#include "libvhost-user.h"

struct fv_VuDev;
struct fv_QueueInfo {
    pthread_t thread;
    /*
     * This lock protects the VuVirtq preventing races between
     * fv_queue_thread() and fv_queue_worker().
     */
    pthread_mutex_t vq_lock;

    struct fv_VuDev *virtio_dev;

    /* Our queue index, corresponds to array position */
    int qidx;
    int kick_fd;
    int kill_fd; /* For killing the thread */
};

/* A FUSE request */
typedef struct {
    VuVirtqElement elem;
    struct fuse_chan ch;

    /* Used to complete requests that involve no reply */
    bool reply_sent;
} FVRequest;

/*
 * We pass the dev element into libvhost-user
 * and then use it to get back to the outer
 * container for other data.
 */
struct fv_VuDev {
    VuDev dev;
    struct fuse_session *se;

    /*
     * Either handle virtqueues or vhost-user protocol messages.  Don't do
     * both at the same time since that could lead to race conditions if
     * virtqueues or memory tables change while another thread is accessing
     * them.
     *
     * The assumptions are:
     * 1. fv_queue_thread() reads/writes to virtqueues and only reads VuDev.
     * 2. virtio_loop() reads/writes virtqueues and VuDev.
     */
    pthread_rwlock_t vu_dispatch_rwlock;

    /*
     * The following pair of fields are only accessed in the main
     * virtio_loop
     */
    size_t nqueues;
    struct fv_QueueInfo **qi;
};

/* From spec */
struct virtio_fs_config {
    char tag[36];
    uint32_t num_queues;
};

/* Callback from libvhost-user */
static uint64_t fv_get_features(VuDev *dev)
{
    return 1ULL << VIRTIO_F_VERSION_1;
}

/* Callback from libvhost-user */
static void fv_set_features(VuDev *dev, uint64_t features)
{
}

/*
 * Callback from libvhost-user if there's a new fd we're supposed to listen
 * to, typically a queue kick?
 */
static void fv_set_watch(VuDev *dev, int fd, int condition, vu_watch_cb cb,
                         void *data)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}

/*
 * Callback from libvhost-user if we're no longer supposed to listen on an fd
 */
static void fv_remove_watch(VuDev *dev, int fd)
{
    fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd);
}

/* Callback from libvhost-user to panic */
static void fv_panic(VuDev *dev, const char *err)
{
    fuse_log(FUSE_LOG_ERR, "%s: libvhost-user: %s\n", __func__, err);
    /* TODO: Allow reconnects?? */
    exit(EXIT_FAILURE);
}

/*
 * Copy from an iovec into a fuse_buf (memory only)
 * Caller must ensure there is space
 */
static void copy_from_iov(struct fuse_buf *buf, size_t out_num,
                          const struct iovec *out_sg)
{
    void *dest = buf->mem;

    while (out_num) {
        size_t onelen = out_sg->iov_len;
        memcpy(dest, out_sg->iov_base, onelen);
        dest += onelen;
        out_sg++;
        out_num--;
    }
}

/*
 * Copy from one iov to another, the given number of bytes
 * The caller must have checked sizes.
 */
static void copy_iov(struct iovec *src_iov, int src_count,
                     struct iovec *dst_iov, int dst_count, size_t to_copy)
{
    size_t dst_offset = 0;
    /* Outer loop copies 'src' elements */
    while (to_copy) {
        assert(src_count);
        size_t src_len = src_iov[0].iov_len;
        size_t src_offset = 0;

        if (src_len > to_copy) {
            src_len = to_copy;
        }
        /* Inner loop copies contents of one 'src' to maybe multiple dst. */
        while (src_len) {
            assert(dst_count);
            size_t dst_len = dst_iov[0].iov_len - dst_offset;
            if (dst_len > src_len) {
                dst_len = src_len;
            }

            memcpy(dst_iov[0].iov_base + dst_offset,
                   src_iov[0].iov_base + src_offset, dst_len);
            src_len -= dst_len;
            to_copy -= dst_len;
            src_offset += dst_len;
            dst_offset += dst_len;

            assert(dst_offset <= dst_iov[0].iov_len);
            if (dst_offset == dst_iov[0].iov_len) {
                dst_offset = 0;
                dst_iov++;
                dst_count--;
            }
        }
        src_iov++;
        src_count--;
    }
}
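
/*
 * Illustrative sketch, not part of the original file: how copy_iov()
 * scatters one source element across several destination elements.  The
 * buffers and the helper name are hypothetical; the block is kept out of
 * the build with #if 0.
 */
#if 0
static void copy_iov_example(void)
{
    char src_buf[] = "abcdef";
    char dst_a[4], dst_b[4];
    struct iovec src[] = {
        { .iov_base = src_buf, .iov_len = 6 },
    };
    struct iovec dst[] = {
        { .iov_base = dst_a, .iov_len = sizeof(dst_a) },
        { .iov_base = dst_b, .iov_len = sizeof(dst_b) },
    };

    /* dst_a receives "abcd", dst_b[0..1] receives "ef" */
    copy_iov(src, 1, dst, 2, 6);
}
#endif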

/*
 * pthread_rwlock_rdlock() and pthread_rwlock_wrlock() can fail if
 * a deadlock condition is detected or the current thread already
 * owns the lock. They can also fail, like pthread_rwlock_unlock(),
 * if the lock wasn't properly initialized. None of these are ever
 * expected to happen.
 */
static void vu_dispatch_rdlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_rdlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

static void vu_dispatch_wrlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_wrlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

static void vu_dispatch_unlock(struct fv_VuDev *vud)
{
    int ret = pthread_rwlock_unlock(&vud->vu_dispatch_rwlock);
    assert(ret == 0);
}

/*
 * Called back by the lowlevel code (ll) whenever it wants to send a
 * reply/message back.
 * The 1st element of the iov starts with the fuse_out_header
 * 'unique'==0 means it's a notify message.
 */
int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                    struct iovec *iov, int count)
{
    FVRequest *req = container_of(ch, FVRequest, ch);
    struct fv_QueueInfo *qi = ch->qi;
    VuDev *dev = &se->virtio_dev->dev;
    VuVirtq *q = vu_get_queue(dev, qi->qidx);
    VuVirtqElement *elem = &req->elem;
    int ret = 0;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t tosend_len = iov_size(iov, count);

    /* unique == 0 is notification, which we don't support */
    assert(out->unique);
    assert(!req->reply_sent);

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        ret = -E2BIG;
        goto err;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        ret = -E2BIG;
        goto err;
    }

    copy_iov(iov, count, in_sg, in_num, tosend_len);

    vu_dispatch_rdlock(qi->virtio_dev);
    pthread_mutex_lock(&qi->vq_lock);
    vu_queue_push(dev, q, elem, tosend_len);
    vu_queue_notify(dev, q);
    pthread_mutex_unlock(&qi->vq_lock);
    vu_dispatch_unlock(qi->virtio_dev);

    req->reply_sent = true;

err:
    return ret;
}
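
/*
 * Illustrative sketch, not part of the original file: the shape of the iov
 * that callers of virtio_send_msg() are expected to build, i.e. a
 * fuse_out_header followed by the payload, with out->len covering both and
 * out->unique echoing the request.  The helper name and parameters are
 * hypothetical; the block is kept out of the build with #if 0.
 */
#if 0
static int send_reply_example(struct fuse_session *se, struct fuse_chan *ch,
                              uint64_t unique, void *payload,
                              size_t payload_len)
{
    struct fuse_out_header out = {
        .unique = unique,                 /* must be non-zero (no notify) */
        .error = 0,
        .len = sizeof(out) + payload_len,
    };
    struct iovec iov[] = {
        { .iov_base = &out, .iov_len = sizeof(out) },
        { .iov_base = payload, .iov_len = payload_len },
    };

    return virtio_send_msg(se, ch, iov, 2);
}
#endif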

/*
 * Callback from fuse_send_data_iov_* when it's virtio and the buffer
 * is a single FD with FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK
 * We need to send the iov and then the buffer.
 * Return 0 on success
 */
int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                         struct iovec *iov, int count, struct fuse_bufvec *buf,
                         size_t len)
{
    FVRequest *req = container_of(ch, FVRequest, ch);
    struct fv_QueueInfo *qi = ch->qi;
    VuDev *dev = &se->virtio_dev->dev;
    VuVirtq *q = vu_get_queue(dev, qi->qidx);
    VuVirtqElement *elem = &req->elem;
    int ret = 0;

    assert(count >= 1);
    assert(iov[0].iov_len >= sizeof(struct fuse_out_header));

    struct fuse_out_header *out = iov[0].iov_base;
    /* TODO: Endianness! */

    size_t iov_len = iov_size(iov, count);
    size_t tosend_len = iov_len + len;

    out->len = tosend_len;

    fuse_log(FUSE_LOG_DEBUG, "%s: count=%d len=%zd iov_len=%zd\n", __func__,
             count, len, iov_len);

    /* unique == 0 is notification which we don't support */
    assert(out->unique);

    assert(!req->reply_sent);

    /* The 'in' part of the elem is to qemu */
    unsigned int in_num = elem->in_num;
    struct iovec *in_sg = elem->in_sg;
    size_t in_len = iov_size(in_sg, in_num);
    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
             __func__, elem->index, in_num, in_len);

    /*
     * The elem should have room for a 'fuse_out_header' (out from fuse)
     * plus the data based on the len in the header.
     */
    if (in_len < sizeof(struct fuse_out_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
                 __func__, elem->index);
        ret = E2BIG;
        goto err;
    }
    if (in_len < tosend_len) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n",
                 __func__, elem->index, tosend_len);
        ret = E2BIG;
        goto err;
    }

    /* TODO: Limit to 'len' */

    /* First copy the header data from iov->in_sg */
    copy_iov(iov, count, in_sg, in_num, iov_len);

    /*
     * Build a copy of the in_sg iov so we can skip bits in it,
     * including changing the offsets
     */
    struct iovec *in_sg_cpy = calloc(sizeof(struct iovec), in_num);
    assert(in_sg_cpy);
    memcpy(in_sg_cpy, in_sg, sizeof(struct iovec) * in_num);
    /* These get updated as we skip */
    struct iovec *in_sg_ptr = in_sg_cpy;
    int in_sg_cpy_count = in_num;

    /* skip over parts of in_sg that contained the header iov */
    size_t skip_size = iov_len;

    size_t in_sg_left = 0;
    do {
        while (skip_size != 0 && in_sg_cpy_count) {
            if (skip_size >= in_sg_ptr[0].iov_len) {
                skip_size -= in_sg_ptr[0].iov_len;
                in_sg_ptr++;
                in_sg_cpy_count--;
            } else {
                in_sg_ptr[0].iov_len -= skip_size;
                in_sg_ptr[0].iov_base += skip_size;
                break;
            }
        }

        int i;
        for (i = 0, in_sg_left = 0; i < in_sg_cpy_count; i++) {
            in_sg_left += in_sg_ptr[i].iov_len;
        }
        fuse_log(FUSE_LOG_DEBUG,
                 "%s: after skip skip_size=%zd in_sg_cpy_count=%d "
                 "in_sg_left=%zd\n",
                 __func__, skip_size, in_sg_cpy_count, in_sg_left);
        ret = preadv(buf->buf[0].fd, in_sg_ptr, in_sg_cpy_count,
                     buf->buf[0].pos);

        if (ret == -1) {
            ret = errno;
            fuse_log(FUSE_LOG_DEBUG, "%s: preadv failed (%m) len=%zd\n",
                     __func__, len);
            free(in_sg_cpy);
            goto err;
        }
        fuse_log(FUSE_LOG_DEBUG, "%s: preadv ret=%d len=%zd\n", __func__,
                 ret, len);
        if (ret < len && ret) {
            fuse_log(FUSE_LOG_DEBUG, "%s: ret < len\n", __func__);
            /* Skip over this much next time around */
            skip_size = ret;
            buf->buf[0].pos += ret;
            len -= ret;

            /* Let's do another read */
            continue;
        }
        if (!ret) {
            /* EOF case? */
            fuse_log(FUSE_LOG_DEBUG, "%s: !ret in_sg_left=%zd\n", __func__,
                     in_sg_left);
            break;
        }
        if (ret != len) {
            fuse_log(FUSE_LOG_DEBUG, "%s: ret!=len\n", __func__);
            ret = EIO;
            free(in_sg_cpy);
            goto err;
        }
        in_sg_left -= ret;
        len -= ret;
    } while (in_sg_left);
    free(in_sg_cpy);

    /* Need to fix out->len on EOF */
    if (len) {
        struct fuse_out_header *out_sg = in_sg[0].iov_base;

        tosend_len -= len;
        out_sg->len = tosend_len;
    }

    ret = 0;

    vu_dispatch_rdlock(qi->virtio_dev);
    pthread_mutex_lock(&qi->vq_lock);
    vu_queue_push(dev, q, elem, tosend_len);
    vu_queue_notify(dev, q);
    pthread_mutex_unlock(&qi->vq_lock);
    vu_dispatch_unlock(qi->virtio_dev);

err:
    if (ret == 0) {
        req->reply_sent = true;
    }

    return ret;
}

static __thread bool clone_fs_called;

/* Process one FVRequest in a thread pool */
static void fv_queue_worker(gpointer data, gpointer user_data)
{
    struct fv_QueueInfo *qi = user_data;
    struct fuse_session *se = qi->virtio_dev->se;
    struct VuDev *dev = &qi->virtio_dev->dev;
    FVRequest *req = data;
    VuVirtqElement *elem = &req->elem;
    struct fuse_buf fbuf = {};
    bool allocated_bufv = false;
    struct fuse_bufvec bufv;
    struct fuse_bufvec *pbufv;

    assert(se->bufsize > sizeof(struct fuse_in_header));

    if (!clone_fs_called) {
        int ret;

        /* unshare FS for xattr operation */
        ret = unshare(CLONE_FS);
        /* should not fail */
        assert(ret == 0);

        clone_fs_called = true;
    }

    /*
     * An element contains one request and the space to send our response
     * They're spread over multiple descriptors in a scatter/gather set
     * and we can't trust the guest to keep them still; so copy in/out.
     */
    fbuf.mem = malloc(se->bufsize);
    assert(fbuf.mem);

    fuse_mutex_init(&req->ch.lock);
    req->ch.fd = -1;
    req->ch.qi = qi;

    /* The 'out' part of the elem is from qemu */
    unsigned int out_num = elem->out_num;
    struct iovec *out_sg = elem->out_sg;
    size_t out_len = iov_size(out_sg, out_num);
    fuse_log(FUSE_LOG_DEBUG,
             "%s: elem %d: with %d out desc of length %zd\n",
             __func__, elem->index, out_num, out_len);

    /*
     * The elem should contain a 'fuse_in_header' (in to fuse)
     * plus the data based on the len in the header.
     */
    if (out_len < sizeof(struct fuse_in_header)) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
                 __func__, elem->index);
        assert(0); /* TODO */
    }
    if (out_len > se->bufsize) {
        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
                 elem->index);
        assert(0); /* TODO */
    }
    /* Copy just the first element and look at it */
    copy_from_iov(&fbuf, 1, out_sg);

    pbufv = NULL; /* Silence compiler warning about an uninitialised path */
    if (out_num > 2 &&
        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
        /*
         * For a write we don't actually need to copy the
         * data, we can just do it straight out of guest memory
         * but we must still copy the headers in case the guest
         * was nasty and changed them while we were using them.
         */
        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);

        /* copy the fuse_write_in header after the fuse_in_header */
        fbuf.mem += out_sg->iov_len;
        copy_from_iov(&fbuf, 1, out_sg + 1);
        fbuf.mem -= out_sg->iov_len;
        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;

        /* Allocate the bufv, with space for the rest of the iov */
        pbufv = malloc(sizeof(struct fuse_bufvec) +
                       sizeof(struct fuse_buf) * (out_num - 2));
        if (!pbufv) {
            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
                    __func__);
            goto out;
        }

        allocated_bufv = true;
        pbufv->count = 1;
        pbufv->buf[0] = fbuf;

        size_t iovindex, pbufvindex;
        iovindex = 2; /* 2 headers, separate iovs */
        pbufvindex = 1; /* 2 headers, 1 fusebuf */

        for (; iovindex < out_num; iovindex++, pbufvindex++) {
            pbufv->count++;
            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
            pbufv->buf[pbufvindex].flags = 0;
            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
        }
    } else {
        /* Normal (non fast write) path */

        /* Copy the rest of the buffer */
        fbuf.mem += out_sg->iov_len;
        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
        fbuf.mem -= out_sg->iov_len;
        fbuf.size = out_len;

        /* TODO! Endianness of header */

        /* TODO: Add checks for fuse_session_exited */
        bufv.buf[0] = fbuf;
        bufv.count = 1;
        pbufv = &bufv;
    }
    pbufv->idx = 0;
    pbufv->off = 0;
    fuse_session_process_buf_int(se, pbufv, &req->ch);

out:
    if (allocated_bufv) {
        free(pbufv);
    }

    /* If the request has no reply, still recycle the virtqueue element */
    if (!req->reply_sent) {
        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);

        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
                 elem->index);

        vu_dispatch_rdlock(qi->virtio_dev);
        pthread_mutex_lock(&qi->vq_lock);
        vu_queue_push(dev, q, elem, 0);
        vu_queue_notify(dev, q);
        pthread_mutex_unlock(&qi->vq_lock);
        vu_dispatch_unlock(qi->virtio_dev);
    }

    pthread_mutex_destroy(&req->ch.lock);
    free(fbuf.mem);
    free(req);
}
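
/*
 * For reference, the FUSE_WRITE fast path in fv_queue_worker() expects the
 * guest to have split the request over at least three 'out' descriptors:
 *
 *   out_sg[0]   struct fuse_in_header   (copied; the guest isn't trusted)
 *   out_sg[1]   struct fuse_write_in    (copied for the same reason)
 *   out_sg[2..] write payload           (used in place, no copy)
 *
 * Any other layout takes the normal full-copy path.
 */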

/* Thread function for individual queues, created when a queue is 'started' */
static void *fv_queue_thread(void *opaque)
{
    struct fv_QueueInfo *qi = opaque;
    struct VuDev *dev = &qi->virtio_dev->dev;
    struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
    struct fuse_session *se = qi->virtio_dev->se;
    GThreadPool *pool = NULL;
    GList *req_list = NULL;

    if (se->thread_pool_size) {
        fuse_log(FUSE_LOG_DEBUG, "%s: Creating thread pool for Queue %d\n",
                 __func__, qi->qidx);
        pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size,
                                 FALSE, NULL);
        if (!pool) {
            fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
            return NULL;
        }
    }

    fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
             qi->qidx, qi->kick_fd);
    while (1) {
        struct pollfd pf[2];

        pf[0].fd = qi->kick_fd;
        pf[0].events = POLLIN;
        pf[0].revents = 0;
        pf[1].fd = qi->kill_fd;
        pf[1].events = POLLIN;
        pf[1].revents = 0;

        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for Queue %d event\n", __func__,
                 qi->qidx);
        int poll_res = ppoll(pf, 2, NULL, NULL);

        if (poll_res == -1) {
            if (errno == EINTR) {
                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
                         __func__);
                continue;
            }
            fuse_log(FUSE_LOG_ERR, "fv_queue_thread ppoll: %m\n");
            break;
        }
        assert(poll_res >= 1);
        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x Queue %d\n",
                     __func__, pf[0].revents, qi->qidx);
            break;
        }
        if (pf[1].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR,
                     "%s: Unexpected poll revents %x Queue %d killfd\n",
                     __func__, pf[1].revents, qi->qidx);
            break;
        }
        if (pf[1].revents) {
            fuse_log(FUSE_LOG_INFO, "%s: kill event on queue %d - quitting\n",
                     __func__, qi->qidx);
            break;
        }
        assert(pf[0].revents & POLLIN);
        fuse_log(FUSE_LOG_DEBUG, "%s: Got queue event on Queue %d\n", __func__,
                 qi->qidx);

        eventfd_t evalue;
        if (eventfd_read(qi->kick_fd, &evalue)) {
            fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n");
            break;
        }
        /* Mutual exclusion with virtio_loop() */
        vu_dispatch_rdlock(qi->virtio_dev);
        pthread_mutex_lock(&qi->vq_lock);
        /* out is from guest, in is to guest */
        unsigned int in_bytes, out_bytes;
        vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);

        fuse_log(FUSE_LOG_DEBUG,
                 "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                 __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);

        while (1) {
            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
            if (!req) {
                break;
            }

            req->reply_sent = false;

            if (!se->thread_pool_size) {
                req_list = g_list_prepend(req_list, req);
            } else {
                g_thread_pool_push(pool, req, NULL);
            }
        }

        pthread_mutex_unlock(&qi->vq_lock);
        vu_dispatch_unlock(qi->virtio_dev);

        /* Process all the requests. */
        if (!se->thread_pool_size && req_list != NULL) {
            g_list_foreach(req_list, fv_queue_worker, qi);
            g_list_free(req_list);
            req_list = NULL;
        }
    }

    if (pool) {
        g_thread_pool_free(pool, FALSE, TRUE);
    }

    return NULL;
}

static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
{
    int ret;
    struct fv_QueueInfo *ourqi;

    assert(qidx < vud->nqueues);
    ourqi = vud->qi[qidx];

    /* Kill the thread */
    if (eventfd_write(ourqi->kill_fd, 1)) {
        fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n",
                 qidx, strerror(errno));
    }
    ret = pthread_join(ourqi->thread, NULL);
    if (ret) {
        fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
                 __func__, qidx, ret);
    }
    pthread_mutex_destroy(&ourqi->vq_lock);
    close(ourqi->kill_fd);
    ourqi->kick_fd = -1;
    free(vud->qi[qidx]);
    vud->qi[qidx] = NULL;
}

/* Callback from libvhost-user on start or stop of a queue */
static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
{
    struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
    struct fv_QueueInfo *ourqi;

    fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx,
             started);
    assert(qidx >= 0);

    /*
     * Ignore additional request queues for now.  passthrough_ll.c must be
     * audited for thread-safety issues first.  It was written with a
     * well-behaved client in mind and may not protect against all types of
     * races yet.
     */
    if (qidx > 1) {
        fuse_log(FUSE_LOG_ERR,
                 "%s: multiple request queues not yet implemented, please only "
                 "configure 1 request queue\n",
                 __func__);
        exit(EXIT_FAILURE);
    }

    if (started) {
        /* Fire up a thread to watch this queue */
        if (qidx >= vud->nqueues) {
            vud->qi = realloc(vud->qi, (qidx + 1) * sizeof(vud->qi[0]));
            assert(vud->qi);
            memset(vud->qi + vud->nqueues, 0,
                   sizeof(vud->qi[0]) * (1 + (qidx - vud->nqueues)));
            vud->nqueues = qidx + 1;
        }
        if (!vud->qi[qidx]) {
            vud->qi[qidx] = calloc(sizeof(struct fv_QueueInfo), 1);
            assert(vud->qi[qidx]);
            vud->qi[qidx]->virtio_dev = vud;
            vud->qi[qidx]->qidx = qidx;
        } else {
            /* Shouldn't have been started */
            assert(vud->qi[qidx]->kick_fd == -1);
        }
        ourqi = vud->qi[qidx];
        ourqi->kick_fd = dev->vq[qidx].kick_fd;

        ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
        assert(ourqi->kill_fd != -1);
        pthread_mutex_init(&ourqi->vq_lock, NULL);

        if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
            fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                     __func__, qidx);
            assert(0);
        }
    } else {
        /*
         * Temporarily drop write-lock taken in virtio_loop() so that
         * the queue thread doesn't block in virtio_send_msg().
         */
        vu_dispatch_unlock(vud);
        fv_queue_cleanup_thread(vud, qidx);
        vu_dispatch_wrlock(vud);
    }
}

static bool fv_queue_order(VuDev *dev, int qidx)
{
    return false;
}

static const VuDevIface fv_iface = {
    .get_features = fv_get_features,
    .set_features = fv_set_features,

    /* Don't need process message, we've not got any at vhost-user level */
    .queue_set_started = fv_queue_set_started,

    .queue_is_processed_in_order = fv_queue_order,
};

/*
 * Main loop; this mostly deals with events on the vhost-user
 * socket itself, and not actual fuse data.
 */
int virtio_loop(struct fuse_session *se)
{
    fuse_log(FUSE_LOG_INFO, "%s: Entry\n", __func__);

    while (!fuse_session_exited(se)) {
        struct pollfd pf[1];
        bool ok;
        pf[0].fd = se->vu_socketfd;
        pf[0].events = POLLIN;
        pf[0].revents = 0;

        fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for VU event\n", __func__);
        int poll_res = ppoll(pf, 1, NULL, NULL);

        if (poll_res == -1) {
            if (errno == EINTR) {
                fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n",
                         __func__);
                continue;
            }
            fuse_log(FUSE_LOG_ERR, "virtio_loop ppoll: %m\n");
            break;
        }
        assert(poll_res == 1);
        if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
            fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x\n", __func__,
                     pf[0].revents);
            break;
        }
        assert(pf[0].revents & POLLIN);
        fuse_log(FUSE_LOG_DEBUG, "%s: Got VU event\n", __func__);
        /* Mutual exclusion with fv_queue_thread() */
        vu_dispatch_wrlock(se->virtio_dev);

        ok = vu_dispatch(&se->virtio_dev->dev);

        vu_dispatch_unlock(se->virtio_dev);

        if (!ok) {
            fuse_log(FUSE_LOG_ERR, "%s: vu_dispatch failed\n", __func__);
            break;
        }
    }

    /*
     * Make sure all fv_queue_thread()s quit on exit, as we're about to
     * free virtio dev and fuse session, no one should access them anymore.
     */
    for (int i = 0; i < se->virtio_dev->nqueues; i++) {
        if (!se->virtio_dev->qi[i]) {
            continue;
        }

        fuse_log(FUSE_LOG_INFO, "%s: Stopping queue %d thread\n", __func__, i);
        fv_queue_cleanup_thread(se->virtio_dev, i);
    }

    fuse_log(FUSE_LOG_INFO, "%s: Exit\n", __func__);

    return 0;
}

static void strreplace(char *s, char old, char new)
{
    for (; *s; ++s) {
        if (*s == old) {
            *s = new;
        }
    }
}

static bool fv_socket_lock(struct fuse_session *se)
{
    g_autofree gchar *sk_name = NULL;
    g_autofree gchar *pidfile = NULL;
    g_autofree gchar *dir = NULL;
    Error *local_err = NULL;

    dir = qemu_get_local_state_pathname("run/virtiofsd");

    if (g_mkdir_with_parents(dir, S_IRWXU) < 0) {
        fuse_log(FUSE_LOG_ERR, "%s: Failed to create directory %s: %s",
                 __func__, dir, strerror(errno));
        return false;
    }

    sk_name = g_strdup(se->vu_socket_path);
    strreplace(sk_name, '/', '.');
    pidfile = g_strdup_printf("%s/%s.pid", dir, sk_name);

    if (!qemu_write_pidfile(pidfile, &local_err)) {
        error_report_err(local_err);
        return false;
    }

    return true;
}
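
/*
 * Worked example (paths are illustrative): for vu_socket_path
 * "/run/user/1000/vhostqemu" every '/' becomes '.', so the pid/lock file
 * ends up as "<local state dir>/run/virtiofsd/.run.user.1000.vhostqemu.pid".
 */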

static int fv_create_listen_socket(struct fuse_session *se)
{
    struct sockaddr_un un;
    mode_t old_umask;

    /* Nothing to do if fd is already initialized */
    if (se->vu_listen_fd >= 0) {
        return 0;
    }

    if (strlen(se->vu_socket_path) >= sizeof(un.sun_path)) {
        fuse_log(FUSE_LOG_ERR, "Socket path too long\n");
        return -1;
    }

    if (!strlen(se->vu_socket_path)) {
        fuse_log(FUSE_LOG_ERR, "Socket path is empty\n");
        return -1;
    }

    /* Check whether the vu_socket_path is already in use */
    if (!fv_socket_lock(se)) {
        return -1;
    }

    /*
     * Create the Unix socket to communicate with qemu
     * based on QEMU's vhost-user-bridge
     */
    unlink(se->vu_socket_path);
    strcpy(un.sun_path, se->vu_socket_path);
    size_t addr_len = sizeof(un);

    int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
    if (listen_sock == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket creation: %m\n");
        return -1;
    }
    un.sun_family = AF_UNIX;

    /*
     * Unfortunately bind doesn't let you set the mask on the socket,
     * so set umask appropriately and restore it later.
     */
    if (se->vu_socket_group) {
        old_umask = umask(S_IROTH | S_IWOTH | S_IXOTH);
    } else {
        old_umask = umask(S_IRGRP | S_IWGRP | S_IXGRP |
                          S_IROTH | S_IWOTH | S_IXOTH);
    }
    if (bind(listen_sock, (struct sockaddr *)&un, addr_len) == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket bind: %m\n");
        close(listen_sock);
        umask(old_umask);
        return -1;
    }
    if (se->vu_socket_group) {
        struct group *g = getgrnam(se->vu_socket_group);
        if (g) {
            /* Only warn when chown() actually fails */
            if (chown(se->vu_socket_path, -1, g->gr_gid) == -1) {
                fuse_log(FUSE_LOG_WARNING,
                         "vhost socket failed to set group to %s (%d)\n",
                         se->vu_socket_group, g->gr_gid);
            }
        }
    }
    umask(old_umask);

    if (listen(listen_sock, 1) == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket listen: %m\n");
        close(listen_sock);
        return -1;
    }

    se->vu_listen_fd = listen_sock;
    return 0;
}
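
/*
 * For reference: bind() creates the socket inode with mode 0777 & ~umask,
 * so the umask values above give the socket mode 0770 when
 * se->vu_socket_group is set (the group set by chown() may then connect)
 * and 0700 otherwise.
 */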

int virtio_session_mount(struct fuse_session *se)
{
    int ret;

    /*
     * Test that unshare(CLONE_FS) works. fv_queue_worker() will need it. It's
     * an unprivileged system call but some Docker/Moby versions are known to
     * reject it via seccomp when CAP_SYS_ADMIN is not given.
     *
     * Note that the program is single-threaded here so this syscall has no
     * visible effect and is safe to make.
     */
    ret = unshare(CLONE_FS);
    if (ret == -1 && errno == EPERM) {
        fuse_log(FUSE_LOG_ERR, "unshare(CLONE_FS) failed with EPERM. If "
                "running in a container please check that the container "
                "runtime seccomp policy allows unshare.\n");
        return -1;
    }

    ret = fv_create_listen_socket(se);
    if (ret < 0) {
        return ret;
    }

    se->fd = -1;

    fuse_log(FUSE_LOG_INFO, "%s: Waiting for vhost-user socket connection...\n",
             __func__);
    int data_sock = accept(se->vu_listen_fd, NULL, NULL);
    if (data_sock == -1) {
        fuse_log(FUSE_LOG_ERR, "vhost socket accept: %m\n");
        close(se->vu_listen_fd);
        return -1;
    }
    close(se->vu_listen_fd);
    se->vu_listen_fd = -1;
    fuse_log(FUSE_LOG_INFO, "%s: Received vhost-user socket connection\n",
             __func__);

    /* TODO: Some cleanup/deallocation! */
    se->virtio_dev = calloc(sizeof(struct fv_VuDev), 1);
    if (!se->virtio_dev) {
        fuse_log(FUSE_LOG_ERR, "%s: virtio_dev calloc failed\n", __func__);
        close(data_sock);
        return -1;
    }

    se->vu_socketfd = data_sock;
    se->virtio_dev->se = se;
    pthread_rwlock_init(&se->virtio_dev->vu_dispatch_rwlock, NULL);
    if (!vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, NULL,
                 fv_set_watch, fv_remove_watch, &fv_iface)) {
        fuse_log(FUSE_LOG_ERR, "%s: vu_init failed\n", __func__);
        return -1;
    }

    return 0;
}
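
/*
 * Illustrative usage, not part of the original file (paths, ids and sizes
 * are examples only): once virtio_session_mount() is waiting in accept(),
 * a typical setup connects QEMU to the same socket path, e.g.:
 *
 *   virtiofsd --socket-path=/tmp/vhostqemu -o source=/path/to/shared/dir
 *
 *   qemu-system-x86_64 ... \
 *     -chardev socket,id=char0,path=/tmp/vhostqemu \
 *     -device vhost-user-fs-pci,chardev=char0,tag=myfs \
 *     -object memory-backend-memfd,id=mem,size=4G,share=on \
 *     -numa node,memdev=mem
 *
 * and the guest mounts the exported tag with: mount -t virtiofs myfs /mnt
 */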

void virtio_session_close(struct fuse_session *se)
{
    close(se->vu_socketfd);

    if (!se->virtio_dev) {
        return;
    }

    free(se->virtio_dev->qi);
    pthread_rwlock_destroy(&se->virtio_dev->vu_dispatch_rwlock);
    free(se->virtio_dev);
    se->virtio_dev = NULL;
}
1073