linux/fs/io_uring.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Shared application/kernel submission and completion ring pairs, for
   4 * supporting fast/efficient IO.
   5 *
   6 * A note on the read/write ordering memory barriers that are matched between
   7 * the application and kernel side.
   8 *
   9 * After the application reads the CQ ring tail, it must use an
  10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
  11 * before writing the tail (using smp_load_acquire to read the tail will
  12 * do). It also needs a smp_mb() before updating CQ head (ordering the
  13 * entry load(s) with the head store), pairing with an implicit barrier
  14 * through a control-dependency in io_get_cqring (smp_store_release to
  15 * store head will do). Failure to do so could lead to reading invalid
  16 * CQ entries.
  17 *
  18 * Likewise, the application must use an appropriate smp_wmb() before
  19 * writing the SQ tail (ordering SQ entry stores with the tail store),
  20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
  21 * to store the tail will do). And it needs a barrier ordering the SQ
  22 * head load before writing new SQ entries (smp_load_acquire to read
  23 * head will do).
  24 *
  25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
  26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
  27 * updating the SQ tail; a full memory barrier smp_mb() is needed
  28 * between.
  29 *
  30 * Also see the examples in the liburing library:
  31 *
  32 *      git://git.kernel.dk/liburing
  33 *
  34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
  35 * from data shared between the kernel and application. This is done both
  36 * for ordering purposes, but also to ensure that once a value is loaded from
  37 * data that the application could potentially modify, it remains stable.
  38 *
  39 * Copyright (C) 2018-2019 Jens Axboe
  40 * Copyright (c) 2018-2019 Christoph Hellwig
  41 */
  42#include <linux/kernel.h>
  43#include <linux/init.h>
  44#include <linux/errno.h>
  45#include <linux/syscalls.h>
  46#include <linux/compat.h>
  47#include <linux/refcount.h>
  48#include <linux/uio.h>
  49
  50#include <linux/sched/signal.h>
  51#include <linux/fs.h>
  52#include <linux/file.h>
  53#include <linux/fdtable.h>
  54#include <linux/mm.h>
  55#include <linux/mman.h>
  56#include <linux/mmu_context.h>
  57#include <linux/percpu.h>
  58#include <linux/slab.h>
  59#include <linux/workqueue.h>
  60#include <linux/kthread.h>
  61#include <linux/blkdev.h>
  62#include <linux/bvec.h>
  63#include <linux/net.h>
  64#include <net/sock.h>
  65#include <net/af_unix.h>
  66#include <net/scm.h>
  67#include <linux/anon_inodes.h>
  68#include <linux/sched/mm.h>
  69#include <linux/uaccess.h>
  70#include <linux/nospec.h>
  71#include <linux/sizes.h>
  72#include <linux/hugetlb.h>
  73
  74#include <uapi/linux/io_uring.h>
  75
  76#include "internal.h"
  77
  78#define IORING_MAX_ENTRIES      32768
  79#define IORING_MAX_FIXED_FILES  1024
  80
  81struct io_uring {
  82        u32 head ____cacheline_aligned_in_smp;
  83        u32 tail ____cacheline_aligned_in_smp;
  84};
  85
  86/*
  87 * This data is shared with the application through the mmap at offsets
  88 * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
  89 *
  90 * The offsets to the member fields are published through struct
  91 * io_sqring_offsets when calling io_uring_setup.
  92 */
  93struct io_rings {
  94        /*
  95         * Head and tail offsets into the ring; the offsets need to be
  96         * masked to get valid indices.
  97         *
  98         * The kernel controls head of the sq ring and the tail of the cq ring,
  99         * and the application controls tail of the sq ring and the head of the
 100         * cq ring.
 101         */
 102        struct io_uring         sq, cq;
 103        /*
 104         * Bitmasks to apply to head and tail offsets (constant, equals
 105         * ring_entries - 1)
 106         */
 107        u32                     sq_ring_mask, cq_ring_mask;
 108        /* Ring sizes (constant, power of 2) */
 109        u32                     sq_ring_entries, cq_ring_entries;
 110        /*
 111         * Number of invalid entries dropped by the kernel due to
 112         * invalid index stored in array
 113         *
 114         * Written by the kernel, shouldn't be modified by the
 115         * application (i.e. get number of "new events" by comparing to
 116         * cached value).
 117         *
 118         * After a new SQ head value was read by the application this
 119         * counter includes all submissions that were dropped reaching
 120         * the new SQ head (and possibly more).
 121         */
 122        u32                     sq_dropped;
 123        /*
 124         * Runtime flags
 125         *
 126         * Written by the kernel, shouldn't be modified by the
 127         * application.
 128         *
 129         * The application needs a full memory barrier before checking
 130         * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
 131         */
 132        u32                     sq_flags;
 133        /*
 134         * Number of completion events lost because the queue was full;
 135         * this should be avoided by the application by making sure
 136         * there are not more requests pending thatn there is space in
 137         * the completion queue.
 138         *
 139         * Written by the kernel, shouldn't be modified by the
 140         * application (i.e. get number of "new events" by comparing to
 141         * cached value).
 142         *
 143         * As completion events come in out of order this counter is not
 144         * ordered with any other data.
 145         */
 146        u32                     cq_overflow;
 147        /*
 148         * Ring buffer of completion events.
 149         *
 150         * The kernel writes completion events fresh every time they are
 151         * produced, so the application is allowed to modify pending
 152         * entries.
 153         */
 154        struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
 155};
 156
 157struct io_mapped_ubuf {
 158        u64             ubuf;
 159        size_t          len;
 160        struct          bio_vec *bvec;
 161        unsigned int    nr_bvecs;
 162};
 163
 164struct async_list {
 165        spinlock_t              lock;
 166        atomic_t                cnt;
 167        struct list_head        list;
 168
 169        struct file             *file;
 170        off_t                   io_start;
 171        size_t                  io_len;
 172};
 173
 174struct io_ring_ctx {
 175        struct {
 176                struct percpu_ref       refs;
 177        } ____cacheline_aligned_in_smp;
 178
 179        struct {
 180                unsigned int            flags;
 181                bool                    compat;
 182                bool                    account_mem;
 183
 184                /*
 185                 * Ring buffer of indices into array of io_uring_sqe, which is
 186                 * mmapped by the application using the IORING_OFF_SQES offset.
 187                 *
 188                 * This indirection could e.g. be used to assign fixed
 189                 * io_uring_sqe entries to operations and only submit them to
 190                 * the queue when needed.
 191                 *
 192                 * The kernel modifies neither the indices array nor the entries
 193                 * array.
 194                 */
 195                u32                     *sq_array;
 196                unsigned                cached_sq_head;
 197                unsigned                sq_entries;
 198                unsigned                sq_mask;
 199                unsigned                sq_thread_idle;
 200                unsigned                cached_sq_dropped;
 201                struct io_uring_sqe     *sq_sqes;
 202
 203                struct list_head        defer_list;
 204                struct list_head        timeout_list;
 205        } ____cacheline_aligned_in_smp;
 206
 207        /* IO offload */
 208        struct workqueue_struct *sqo_wq[2];
 209        struct task_struct      *sqo_thread;    /* if using sq thread polling */
 210        struct mm_struct        *sqo_mm;
 211        wait_queue_head_t       sqo_wait;
 212        struct completion       sqo_thread_started;
 213
 214        struct {
 215                unsigned                cached_cq_tail;
 216                atomic_t                cached_cq_overflow;
 217                unsigned                cq_entries;
 218                unsigned                cq_mask;
 219                struct wait_queue_head  cq_wait;
 220                struct fasync_struct    *cq_fasync;
 221                struct eventfd_ctx      *cq_ev_fd;
 222                atomic_t                cq_timeouts;
 223        } ____cacheline_aligned_in_smp;
 224
 225        struct io_rings *rings;
 226
 227        /*
 228         * If used, fixed file set. Writers must ensure that ->refs is dead,
 229         * readers must ensure that ->refs is alive as long as the file* is
 230         * used. Only updated through io_uring_register(2).
 231         */
 232        struct file             **user_files;
 233        unsigned                nr_user_files;
 234
 235        /* if used, fixed mapped user buffers */
 236        unsigned                nr_user_bufs;
 237        struct io_mapped_ubuf   *user_bufs;
 238
 239        struct user_struct      *user;
 240
 241        struct completion       ctx_done;
 242
 243        struct {
 244                struct mutex            uring_lock;
 245                wait_queue_head_t       wait;
 246        } ____cacheline_aligned_in_smp;
 247
 248        struct {
 249                spinlock_t              completion_lock;
 250                bool                    poll_multi_file;
 251                /*
 252                 * ->poll_list is protected by the ctx->uring_lock for
 253                 * io_uring instances that don't use IORING_SETUP_SQPOLL.
 254                 * For SQPOLL, only the single threaded io_sq_thread() will
 255                 * manipulate the list, hence no extra locking is needed there.
 256                 */
 257                struct list_head        poll_list;
 258                struct list_head        cancel_list;
 259        } ____cacheline_aligned_in_smp;
 260
 261        struct async_list       pending_async[2];
 262
 263#if defined(CONFIG_UNIX)
 264        struct socket           *ring_sock;
 265#endif
 266};
 267
 268struct sqe_submit {
 269        const struct io_uring_sqe       *sqe;
 270        unsigned short                  index;
 271        u32                             sequence;
 272        bool                            has_user;
 273        bool                            needs_lock;
 274        bool                            needs_fixed_file;
 275};
 276
 277/*
 278 * First field must be the file pointer in all the
 279 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
 280 */
 281struct io_poll_iocb {
 282        struct file                     *file;
 283        struct wait_queue_head          *head;
 284        __poll_t                        events;
 285        bool                            done;
 286        bool                            canceled;
 287        struct wait_queue_entry         wait;
 288};
 289
 290struct io_timeout {
 291        struct file                     *file;
 292        struct hrtimer                  timer;
 293};
 294
 295/*
 296 * NOTE! Each of the iocb union members has the file pointer
 297 * as the first entry in their struct definition. So you can
 298 * access the file pointer through any of the sub-structs,
 299 * or directly as just 'ki_filp' in this struct.
 300 */
 301struct io_kiocb {
 302        union {
 303                struct file             *file;
 304                struct kiocb            rw;
 305                struct io_poll_iocb     poll;
 306                struct io_timeout       timeout;
 307        };
 308
 309        struct sqe_submit       submit;
 310
 311        struct io_ring_ctx      *ctx;
 312        struct list_head        list;
 313        struct list_head        link_list;
 314        unsigned int            flags;
 315        refcount_t              refs;
 316#define REQ_F_NOWAIT            1       /* must not punt to workers */
 317#define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
 318#define REQ_F_FIXED_FILE        4       /* ctx owns file */
 319#define REQ_F_SEQ_PREV          8       /* sequential with previous */
 320#define REQ_F_IO_DRAIN          16      /* drain existing IO first */
 321#define REQ_F_IO_DRAINED        32      /* drain done */
 322#define REQ_F_LINK              64      /* linked sqes */
 323#define REQ_F_LINK_DONE         128     /* linked sqes done */
 324#define REQ_F_FAIL_LINK         256     /* fail rest of links */
 325#define REQ_F_SHADOW_DRAIN      512     /* link-drain shadow req */
 326#define REQ_F_TIMEOUT           1024    /* timeout request */
 327#define REQ_F_ISREG             2048    /* regular file */
 328#define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
 329#define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
 330        u64                     user_data;
 331        u32                     result;
 332        u32                     sequence;
 333
 334        struct work_struct      work;
 335};
 336
 337#define IO_PLUG_THRESHOLD               2
 338#define IO_IOPOLL_BATCH                 8
 339
 340struct io_submit_state {
 341        struct blk_plug         plug;
 342
 343        /*
 344         * io_kiocb alloc cache
 345         */
 346        void                    *reqs[IO_IOPOLL_BATCH];
 347        unsigned                int free_reqs;
 348        unsigned                int cur_req;
 349
 350        /*
 351         * File reference cache
 352         */
 353        struct file             *file;
 354        unsigned int            fd;
 355        unsigned int            has_refs;
 356        unsigned int            used_refs;
 357        unsigned int            ios_left;
 358};
 359
 360static void io_sq_wq_submit_work(struct work_struct *work);
 361static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 362                                 long res);
 363static void __io_free_req(struct io_kiocb *req);
 364
 365static struct kmem_cache *req_cachep;
 366
 367static const struct file_operations io_uring_fops;
 368
 369struct sock *io_uring_get_socket(struct file *file)
 370{
 371#if defined(CONFIG_UNIX)
 372        if (file->f_op == &io_uring_fops) {
 373                struct io_ring_ctx *ctx = file->private_data;
 374
 375                return ctx->ring_sock->sk;
 376        }
 377#endif
 378        return NULL;
 379}
 380EXPORT_SYMBOL(io_uring_get_socket);
 381
 382static void io_ring_ctx_ref_free(struct percpu_ref *ref)
 383{
 384        struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
 385
 386        complete(&ctx->ctx_done);
 387}
 388
 389static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 390{
 391        struct io_ring_ctx *ctx;
 392        int i;
 393
 394        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
 395        if (!ctx)
 396                return NULL;
 397
 398        if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
 399                            PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
 400                kfree(ctx);
 401                return NULL;
 402        }
 403
 404        ctx->flags = p->flags;
 405        init_waitqueue_head(&ctx->cq_wait);
 406        init_completion(&ctx->ctx_done);
 407        init_completion(&ctx->sqo_thread_started);
 408        mutex_init(&ctx->uring_lock);
 409        init_waitqueue_head(&ctx->wait);
 410        for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
 411                spin_lock_init(&ctx->pending_async[i].lock);
 412                INIT_LIST_HEAD(&ctx->pending_async[i].list);
 413                atomic_set(&ctx->pending_async[i].cnt, 0);
 414        }
 415        spin_lock_init(&ctx->completion_lock);
 416        INIT_LIST_HEAD(&ctx->poll_list);
 417        INIT_LIST_HEAD(&ctx->cancel_list);
 418        INIT_LIST_HEAD(&ctx->defer_list);
 419        INIT_LIST_HEAD(&ctx->timeout_list);
 420        return ctx;
 421}
 422
 423static inline bool __io_sequence_defer(struct io_ring_ctx *ctx,
 424                                       struct io_kiocb *req)
 425{
 426        return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
 427                                        + atomic_read(&ctx->cached_cq_overflow);
 428}
 429
 430static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
 431                                     struct io_kiocb *req)
 432{
 433        if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
 434                return false;
 435
 436        return __io_sequence_defer(ctx, req);
 437}
 438
 439static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
 440{
 441        struct io_kiocb *req;
 442
 443        req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
 444        if (req && !io_sequence_defer(ctx, req)) {
 445                list_del_init(&req->list);
 446                return req;
 447        }
 448
 449        return NULL;
 450}
 451
 452static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
 453{
 454        struct io_kiocb *req;
 455
 456        req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
 457        if (req) {
 458                if (req->flags & REQ_F_TIMEOUT_NOSEQ)
 459                        return NULL;
 460                if (!__io_sequence_defer(ctx, req)) {
 461                        list_del_init(&req->list);
 462                        return req;
 463                }
 464        }
 465
 466        return NULL;
 467}
 468
 469static void __io_commit_cqring(struct io_ring_ctx *ctx)
 470{
 471        struct io_rings *rings = ctx->rings;
 472
 473        if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
 474                /* order cqe stores with ring update */
 475                smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
 476
 477                if (wq_has_sleeper(&ctx->cq_wait)) {
 478                        wake_up_interruptible(&ctx->cq_wait);
 479                        kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
 480                }
 481        }
 482}
 483
 484static inline void io_queue_async_work(struct io_ring_ctx *ctx,
 485                                       struct io_kiocb *req)
 486{
 487        int rw = 0;
 488
 489        if (req->submit.sqe) {
 490                switch (req->submit.sqe->opcode) {
 491                case IORING_OP_WRITEV:
 492                case IORING_OP_WRITE_FIXED:
 493                        rw = !(req->rw.ki_flags & IOCB_DIRECT);
 494                        break;
 495                }
 496        }
 497
 498        queue_work(ctx->sqo_wq[rw], &req->work);
 499}
 500
 501static void io_kill_timeout(struct io_kiocb *req)
 502{
 503        int ret;
 504
 505        ret = hrtimer_try_to_cancel(&req->timeout.timer);
 506        if (ret != -1) {
 507                atomic_inc(&req->ctx->cq_timeouts);
 508                list_del(&req->list);
 509                io_cqring_fill_event(req->ctx, req->user_data, 0);
 510                __io_free_req(req);
 511        }
 512}
 513
 514static void io_kill_timeouts(struct io_ring_ctx *ctx)
 515{
 516        struct io_kiocb *req, *tmp;
 517
 518        spin_lock_irq(&ctx->completion_lock);
 519        list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
 520                io_kill_timeout(req);
 521        spin_unlock_irq(&ctx->completion_lock);
 522}
 523
 524static void io_commit_cqring(struct io_ring_ctx *ctx)
 525{
 526        struct io_kiocb *req;
 527
 528        while ((req = io_get_timeout_req(ctx)) != NULL)
 529                io_kill_timeout(req);
 530
 531        __io_commit_cqring(ctx);
 532
 533        while ((req = io_get_deferred_req(ctx)) != NULL) {
 534                if (req->flags & REQ_F_SHADOW_DRAIN) {
 535                        /* Just for drain, free it. */
 536                        __io_free_req(req);
 537                        continue;
 538                }
 539                req->flags |= REQ_F_IO_DRAINED;
 540                io_queue_async_work(ctx, req);
 541        }
 542}
 543
 544static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 545{
 546        struct io_rings *rings = ctx->rings;
 547        unsigned tail;
 548
 549        tail = ctx->cached_cq_tail;
 550        /*
 551         * writes to the cq entry need to come after reading head; the
 552         * control dependency is enough as we're using WRITE_ONCE to
 553         * fill the cq entry
 554         */
 555        if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
 556                return NULL;
 557
 558        ctx->cached_cq_tail++;
 559        return &rings->cqes[tail & ctx->cq_mask];
 560}
 561
 562static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 563                                 long res)
 564{
 565        struct io_uring_cqe *cqe;
 566
 567        /*
 568         * If we can't get a cq entry, userspace overflowed the
 569         * submission (by quite a lot). Increment the overflow count in
 570         * the ring.
 571         */
 572        cqe = io_get_cqring(ctx);
 573        if (cqe) {
 574                WRITE_ONCE(cqe->user_data, ki_user_data);
 575                WRITE_ONCE(cqe->res, res);
 576                WRITE_ONCE(cqe->flags, 0);
 577        } else {
 578                WRITE_ONCE(ctx->rings->cq_overflow,
 579                                atomic_inc_return(&ctx->cached_cq_overflow));
 580        }
 581}
 582
 583static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 584{
 585        if (waitqueue_active(&ctx->wait))
 586                wake_up(&ctx->wait);
 587        if (waitqueue_active(&ctx->sqo_wait))
 588                wake_up(&ctx->sqo_wait);
 589        if (ctx->cq_ev_fd)
 590                eventfd_signal(ctx->cq_ev_fd, 1);
 591}
 592
 593static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
 594                                long res)
 595{
 596        unsigned long flags;
 597
 598        spin_lock_irqsave(&ctx->completion_lock, flags);
 599        io_cqring_fill_event(ctx, user_data, res);
 600        io_commit_cqring(ctx);
 601        spin_unlock_irqrestore(&ctx->completion_lock, flags);
 602
 603        io_cqring_ev_posted(ctx);
 604}
 605
 606static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
 607                                   struct io_submit_state *state)
 608{
 609        gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
 610        struct io_kiocb *req;
 611
 612        if (!percpu_ref_tryget(&ctx->refs))
 613                return NULL;
 614
 615        if (!state) {
 616                req = kmem_cache_alloc(req_cachep, gfp);
 617                if (unlikely(!req))
 618                        goto out;
 619        } else if (!state->free_reqs) {
 620                size_t sz;
 621                int ret;
 622
 623                sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
 624                ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
 625
 626                /*
 627                 * Bulk alloc is all-or-nothing. If we fail to get a batch,
 628                 * retry single alloc to be on the safe side.
 629                 */
 630                if (unlikely(ret <= 0)) {
 631                        state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
 632                        if (!state->reqs[0])
 633                                goto out;
 634                        ret = 1;
 635                }
 636                state->free_reqs = ret - 1;
 637                state->cur_req = 1;
 638                req = state->reqs[0];
 639        } else {
 640                req = state->reqs[state->cur_req];
 641                state->free_reqs--;
 642                state->cur_req++;
 643        }
 644
 645        req->file = NULL;
 646        req->ctx = ctx;
 647        req->flags = 0;
 648        /* one is dropped after submission, the other at completion */
 649        refcount_set(&req->refs, 2);
 650        req->result = 0;
 651        return req;
 652out:
 653        percpu_ref_put(&ctx->refs);
 654        return NULL;
 655}
 656
 657static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
 658{
 659        if (*nr) {
 660                kmem_cache_free_bulk(req_cachep, *nr, reqs);
 661                percpu_ref_put_many(&ctx->refs, *nr);
 662                *nr = 0;
 663        }
 664}
 665
 666static void __io_free_req(struct io_kiocb *req)
 667{
 668        if (req->file && !(req->flags & REQ_F_FIXED_FILE))
 669                fput(req->file);
 670        percpu_ref_put(&req->ctx->refs);
 671        kmem_cache_free(req_cachep, req);
 672}
 673
 674static void io_req_link_next(struct io_kiocb *req)
 675{
 676        struct io_kiocb *nxt;
 677
 678        /*
 679         * The list should never be empty when we are called here. But could
 680         * potentially happen if the chain is messed up, check to be on the
 681         * safe side.
 682         */
 683        nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
 684        if (nxt) {
 685                list_del(&nxt->list);
 686                if (!list_empty(&req->link_list)) {
 687                        INIT_LIST_HEAD(&nxt->link_list);
 688                        list_splice(&req->link_list, &nxt->link_list);
 689                        nxt->flags |= REQ_F_LINK;
 690                }
 691
 692                nxt->flags |= REQ_F_LINK_DONE;
 693                INIT_WORK(&nxt->work, io_sq_wq_submit_work);
 694                io_queue_async_work(req->ctx, nxt);
 695        }
 696}
 697
 698/*
 699 * Called if REQ_F_LINK is set, and we fail the head request
 700 */
 701static void io_fail_links(struct io_kiocb *req)
 702{
 703        struct io_kiocb *link;
 704
 705        while (!list_empty(&req->link_list)) {
 706                link = list_first_entry(&req->link_list, struct io_kiocb, list);
 707                list_del(&link->list);
 708
 709                io_cqring_add_event(req->ctx, link->user_data, -ECANCELED);
 710                __io_free_req(link);
 711        }
 712}
 713
 714static void io_free_req(struct io_kiocb *req)
 715{
 716        /*
 717         * If LINK is set, we have dependent requests in this chain. If we
 718         * didn't fail this request, queue the first one up, moving any other
 719         * dependencies to the next request. In case of failure, fail the rest
 720         * of the chain.
 721         */
 722        if (req->flags & REQ_F_LINK) {
 723                if (req->flags & REQ_F_FAIL_LINK)
 724                        io_fail_links(req);
 725                else
 726                        io_req_link_next(req);
 727        }
 728
 729        __io_free_req(req);
 730}
 731
 732static void io_put_req(struct io_kiocb *req)
 733{
 734        if (refcount_dec_and_test(&req->refs))
 735                io_free_req(req);
 736}
 737
 738static unsigned io_cqring_events(struct io_rings *rings)
 739{
 740        /* See comment at the top of this file */
 741        smp_rmb();
 742        return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
 743}
 744
 745static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
 746{
 747        struct io_rings *rings = ctx->rings;
 748
 749        /* make sure SQ entry isn't read before tail */
 750        return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
 751}
 752
 753/*
 754 * Find and free completed poll iocbs
 755 */
 756static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 757                               struct list_head *done)
 758{
 759        void *reqs[IO_IOPOLL_BATCH];
 760        struct io_kiocb *req;
 761        int to_free;
 762
 763        to_free = 0;
 764        while (!list_empty(done)) {
 765                req = list_first_entry(done, struct io_kiocb, list);
 766                list_del(&req->list);
 767
 768                io_cqring_fill_event(ctx, req->user_data, req->result);
 769                (*nr_events)++;
 770
 771                if (refcount_dec_and_test(&req->refs)) {
 772                        /* If we're not using fixed files, we have to pair the
 773                         * completion part with the file put. Use regular
 774                         * completions for those, only batch free for fixed
 775                         * file and non-linked commands.
 776                         */
 777                        if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
 778                            REQ_F_FIXED_FILE) {
 779                                reqs[to_free++] = req;
 780                                if (to_free == ARRAY_SIZE(reqs))
 781                                        io_free_req_many(ctx, reqs, &to_free);
 782                        } else {
 783                                io_free_req(req);
 784                        }
 785                }
 786        }
 787
 788        io_commit_cqring(ctx);
 789        io_free_req_many(ctx, reqs, &to_free);
 790}
 791
 792static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
 793                        long min)
 794{
 795        struct io_kiocb *req, *tmp;
 796        LIST_HEAD(done);
 797        bool spin;
 798        int ret;
 799
 800        /*
 801         * Only spin for completions if we don't have multiple devices hanging
 802         * off our complete list, and we're under the requested amount.
 803         */
 804        spin = !ctx->poll_multi_file && *nr_events < min;
 805
 806        ret = 0;
 807        list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
 808                struct kiocb *kiocb = &req->rw;
 809
 810                /*
 811                 * Move completed entries to our local list. If we find a
 812                 * request that requires polling, break out and complete
 813                 * the done list first, if we have entries there.
 814                 */
 815                if (req->flags & REQ_F_IOPOLL_COMPLETED) {
 816                        list_move_tail(&req->list, &done);
 817                        continue;
 818                }
 819                if (!list_empty(&done))
 820                        break;
 821
 822                ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
 823                if (ret < 0)
 824                        break;
 825
 826                if (ret && spin)
 827                        spin = false;
 828                ret = 0;
 829        }
 830
 831        if (!list_empty(&done))
 832                io_iopoll_complete(ctx, nr_events, &done);
 833
 834        return ret;
 835}
 836
 837/*
 838 * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
 839 * non-spinning poll check - we'll still enter the driver poll loop, but only
 840 * as a non-spinning completion check.
 841 */
 842static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
 843                                long min)
 844{
 845        while (!list_empty(&ctx->poll_list) && !need_resched()) {
 846                int ret;
 847
 848                ret = io_do_iopoll(ctx, nr_events, min);
 849                if (ret < 0)
 850                        return ret;
 851                if (!min || *nr_events >= min)
 852                        return 0;
 853        }
 854
 855        return 1;
 856}
 857
 858/*
 859 * We can't just wait for polled events to come to us, we have to actively
 860 * find and complete them.
 861 */
 862static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
 863{
 864        if (!(ctx->flags & IORING_SETUP_IOPOLL))
 865                return;
 866
 867        mutex_lock(&ctx->uring_lock);
 868        while (!list_empty(&ctx->poll_list)) {
 869                unsigned int nr_events = 0;
 870
 871                io_iopoll_getevents(ctx, &nr_events, 1);
 872
 873                /*
 874                 * Ensure we allow local-to-the-cpu processing to take place,
 875                 * in this case we need to ensure that we reap all events.
 876                 */
 877                cond_resched();
 878        }
 879        mutex_unlock(&ctx->uring_lock);
 880}
 881
 882static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 883                            long min)
 884{
 885        int iters = 0, ret = 0;
 886
 887        do {
 888                int tmin = 0;
 889
 890                /*
 891                 * Don't enter poll loop if we already have events pending.
 892                 * If we do, we can potentially be spinning for commands that
 893                 * already triggered a CQE (eg in error).
 894                 */
 895                if (io_cqring_events(ctx->rings))
 896                        break;
 897
 898                /*
 899                 * If a submit got punted to a workqueue, we can have the
 900                 * application entering polling for a command before it gets
 901                 * issued. That app will hold the uring_lock for the duration
 902                 * of the poll right here, so we need to take a breather every
 903                 * now and then to ensure that the issue has a chance to add
 904                 * the poll to the issued list. Otherwise we can spin here
 905                 * forever, while the workqueue is stuck trying to acquire the
 906                 * very same mutex.
 907                 */
 908                if (!(++iters & 7)) {
 909                        mutex_unlock(&ctx->uring_lock);
 910                        mutex_lock(&ctx->uring_lock);
 911                }
 912
 913                if (*nr_events < min)
 914                        tmin = min - *nr_events;
 915
 916                ret = io_iopoll_getevents(ctx, nr_events, tmin);
 917                if (ret <= 0)
 918                        break;
 919                ret = 0;
 920        } while (min && !*nr_events && !need_resched());
 921
 922        return ret;
 923}
 924
 925static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 926                           long min)
 927{
 928        int ret;
 929
 930        /*
 931         * We disallow the app entering submit/complete with polling, but we
 932         * still need to lock the ring to prevent racing with polled issue
 933         * that got punted to a workqueue.
 934         */
 935        mutex_lock(&ctx->uring_lock);
 936        ret = __io_iopoll_check(ctx, nr_events, min);
 937        mutex_unlock(&ctx->uring_lock);
 938        return ret;
 939}
 940
 941static void kiocb_end_write(struct io_kiocb *req)
 942{
 943        /*
 944         * Tell lockdep we inherited freeze protection from submission
 945         * thread.
 946         */
 947        if (req->flags & REQ_F_ISREG) {
 948                struct inode *inode = file_inode(req->file);
 949
 950                __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
 951        }
 952        file_end_write(req->file);
 953}
 954
 955static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
 956{
 957        struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
 958
 959        if (kiocb->ki_flags & IOCB_WRITE)
 960                kiocb_end_write(req);
 961
 962        if ((req->flags & REQ_F_LINK) && res != req->result)
 963                req->flags |= REQ_F_FAIL_LINK;
 964        io_cqring_add_event(req->ctx, req->user_data, res);
 965        io_put_req(req);
 966}
 967
 968static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
 969{
 970        struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
 971
 972        if (kiocb->ki_flags & IOCB_WRITE)
 973                kiocb_end_write(req);
 974
 975        if ((req->flags & REQ_F_LINK) && res != req->result)
 976                req->flags |= REQ_F_FAIL_LINK;
 977        req->result = res;
 978        if (res != -EAGAIN)
 979                req->flags |= REQ_F_IOPOLL_COMPLETED;
 980}
 981
 982/*
 983 * After the iocb has been issued, it's safe to be found on the poll list.
 984 * Adding the kiocb to the list AFTER submission ensures that we don't
 985 * find it from a io_iopoll_getevents() thread before the issuer is done
 986 * accessing the kiocb cookie.
 987 */
 988static void io_iopoll_req_issued(struct io_kiocb *req)
 989{
 990        struct io_ring_ctx *ctx = req->ctx;
 991
 992        /*
 993         * Track whether we have multiple files in our lists. This will impact
 994         * how we do polling eventually, not spinning if we're on potentially
 995         * different devices.
 996         */
 997        if (list_empty(&ctx->poll_list)) {
 998                ctx->poll_multi_file = false;
 999        } else if (!ctx->poll_multi_file) {
1000                struct io_kiocb *list_req;
1001
1002                list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1003                                                list);
1004                if (list_req->rw.ki_filp != req->rw.ki_filp)
1005                        ctx->poll_multi_file = true;
1006        }
1007
1008        /*
1009         * For fast devices, IO may have already completed. If it has, add
1010         * it to the front so we find it first.
1011         */
1012        if (req->flags & REQ_F_IOPOLL_COMPLETED)
1013                list_add(&req->list, &ctx->poll_list);
1014        else
1015                list_add_tail(&req->list, &ctx->poll_list);
1016}
1017
1018static void io_file_put(struct io_submit_state *state)
1019{
1020        if (state->file) {
1021                int diff = state->has_refs - state->used_refs;
1022
1023                if (diff)
1024                        fput_many(state->file, diff);
1025                state->file = NULL;
1026        }
1027}
1028
1029/*
1030 * Get as many references to a file as we have IOs left in this submission,
1031 * assuming most submissions are for one file, or at least that each file
1032 * has more than one submission.
1033 */
1034static struct file *io_file_get(struct io_submit_state *state, int fd)
1035{
1036        if (!state)
1037                return fget(fd);
1038
1039        if (state->file) {
1040                if (state->fd == fd) {
1041                        state->used_refs++;
1042                        state->ios_left--;
1043                        return state->file;
1044                }
1045                io_file_put(state);
1046        }
1047        state->file = fget_many(fd, state->ios_left);
1048        if (!state->file)
1049                return NULL;
1050
1051        state->fd = fd;
1052        state->has_refs = state->ios_left;
1053        state->used_refs = 1;
1054        state->ios_left--;
1055        return state->file;
1056}
1057
1058/*
1059 * If we tracked the file through the SCM inflight mechanism, we could support
1060 * any file. For now, just ensure that anything potentially problematic is done
1061 * inline.
1062 */
1063static bool io_file_supports_async(struct file *file)
1064{
1065        umode_t mode = file_inode(file)->i_mode;
1066
1067        if (S_ISBLK(mode) || S_ISCHR(mode))
1068                return true;
1069        if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1070                return true;
1071
1072        return false;
1073}
1074
1075static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
1076                      bool force_nonblock)
1077{
1078        const struct io_uring_sqe *sqe = s->sqe;
1079        struct io_ring_ctx *ctx = req->ctx;
1080        struct kiocb *kiocb = &req->rw;
1081        unsigned ioprio;
1082        int ret;
1083
1084        if (!req->file)
1085                return -EBADF;
1086
1087        if (S_ISREG(file_inode(req->file)->i_mode))
1088                req->flags |= REQ_F_ISREG;
1089
1090        /*
1091         * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1092         * we know to async punt it even if it was opened O_NONBLOCK
1093         */
1094        if (force_nonblock && !io_file_supports_async(req->file)) {
1095                req->flags |= REQ_F_MUST_PUNT;
1096                return -EAGAIN;
1097        }
1098
1099        kiocb->ki_pos = READ_ONCE(sqe->off);
1100        kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1101        kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1102
1103        ioprio = READ_ONCE(sqe->ioprio);
1104        if (ioprio) {
1105                ret = ioprio_check_cap(ioprio);
1106                if (ret)
1107                        return ret;
1108
1109                kiocb->ki_ioprio = ioprio;
1110        } else
1111                kiocb->ki_ioprio = get_current_ioprio();
1112
1113        ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1114        if (unlikely(ret))
1115                return ret;
1116
1117        /* don't allow async punt if RWF_NOWAIT was requested */
1118        if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1119            (req->file->f_flags & O_NONBLOCK))
1120                req->flags |= REQ_F_NOWAIT;
1121
1122        if (force_nonblock)
1123                kiocb->ki_flags |= IOCB_NOWAIT;
1124
1125        if (ctx->flags & IORING_SETUP_IOPOLL) {
1126                if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1127                    !kiocb->ki_filp->f_op->iopoll)
1128                        return -EOPNOTSUPP;
1129
1130                kiocb->ki_flags |= IOCB_HIPRI;
1131                kiocb->ki_complete = io_complete_rw_iopoll;
1132                req->result = 0;
1133        } else {
1134                if (kiocb->ki_flags & IOCB_HIPRI)
1135                        return -EINVAL;
1136                kiocb->ki_complete = io_complete_rw;
1137        }
1138        return 0;
1139}
1140
1141static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1142{
1143        switch (ret) {
1144        case -EIOCBQUEUED:
1145                break;
1146        case -ERESTARTSYS:
1147        case -ERESTARTNOINTR:
1148        case -ERESTARTNOHAND:
1149        case -ERESTART_RESTARTBLOCK:
1150                /*
1151                 * We can't just restart the syscall, since previously
1152                 * submitted sqes may already be in progress. Just fail this
1153                 * IO with EINTR.
1154                 */
1155                ret = -EINTR;
1156                /* fall through */
1157        default:
1158                kiocb->ki_complete(kiocb, ret, 0);
1159        }
1160}
1161
1162static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
1163                           const struct io_uring_sqe *sqe,
1164                           struct iov_iter *iter)
1165{
1166        size_t len = READ_ONCE(sqe->len);
1167        struct io_mapped_ubuf *imu;
1168        unsigned index, buf_index;
1169        size_t offset;
1170        u64 buf_addr;
1171
1172        /* attempt to use fixed buffers without having provided iovecs */
1173        if (unlikely(!ctx->user_bufs))
1174                return -EFAULT;
1175
1176        buf_index = READ_ONCE(sqe->buf_index);
1177        if (unlikely(buf_index >= ctx->nr_user_bufs))
1178                return -EFAULT;
1179
1180        index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1181        imu = &ctx->user_bufs[index];
1182        buf_addr = READ_ONCE(sqe->addr);
1183
1184        /* overflow */
1185        if (buf_addr + len < buf_addr)
1186                return -EFAULT;
1187        /* not inside the mapped region */
1188        if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1189                return -EFAULT;
1190
1191        /*
1192         * May not be a start of buffer, set size appropriately
1193         * and advance us to the beginning.
1194         */
1195        offset = buf_addr - imu->ubuf;
1196        iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1197
1198        if (offset) {
1199                /*
1200                 * Don't use iov_iter_advance() here, as it's really slow for
1201                 * using the latter parts of a big fixed buffer - it iterates
1202                 * over each segment manually. We can cheat a bit here, because
1203                 * we know that:
1204                 *
1205                 * 1) it's a BVEC iter, we set it up
1206                 * 2) all bvecs are PAGE_SIZE in size, except potentially the
1207                 *    first and last bvec
1208                 *
1209                 * So just find our index, and adjust the iterator afterwards.
1210                 * If the offset is within the first bvec (or the whole first
1211                 * bvec, just use iov_iter_advance(). This makes it easier
1212                 * since we can just skip the first segment, which may not
1213                 * be PAGE_SIZE aligned.
1214                 */
1215                const struct bio_vec *bvec = imu->bvec;
1216
1217                if (offset <= bvec->bv_len) {
1218                        iov_iter_advance(iter, offset);
1219                } else {
1220                        unsigned long seg_skip;
1221
1222                        /* skip first vec */
1223                        offset -= bvec->bv_len;
1224                        seg_skip = 1 + (offset >> PAGE_SHIFT);
1225
1226                        iter->bvec = bvec + seg_skip;
1227                        iter->nr_segs -= seg_skip;
1228                        iter->count -= bvec->bv_len + offset;
1229                        iter->iov_offset = offset & ~PAGE_MASK;
1230                }
1231        }
1232
1233        return len;
1234}
1235
1236static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
1237                               const struct sqe_submit *s, struct iovec **iovec,
1238                               struct iov_iter *iter)
1239{
1240        const struct io_uring_sqe *sqe = s->sqe;
1241        void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1242        size_t sqe_len = READ_ONCE(sqe->len);
1243        u8 opcode;
1244
1245        /*
1246         * We're reading ->opcode for the second time, but the first read
1247         * doesn't care whether it's _FIXED or not, so it doesn't matter
1248         * whether ->opcode changes concurrently. The first read does care
1249         * about whether it is a READ or a WRITE, so we don't trust this read
1250         * for that purpose and instead let the caller pass in the read/write
1251         * flag.
1252         */
1253        opcode = READ_ONCE(sqe->opcode);
1254        if (opcode == IORING_OP_READ_FIXED ||
1255            opcode == IORING_OP_WRITE_FIXED) {
1256                ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
1257                *iovec = NULL;
1258                return ret;
1259        }
1260
1261        if (!s->has_user)
1262                return -EFAULT;
1263
1264#ifdef CONFIG_COMPAT
1265        if (ctx->compat)
1266                return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1267                                                iovec, iter);
1268#endif
1269
1270        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1271}
1272
1273static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
1274{
1275        if (al->file == kiocb->ki_filp) {
1276                off_t start, end;
1277
1278                /*
1279                 * Allow merging if we're anywhere in the range of the same
1280                 * page. Generally this happens for sub-page reads or writes,
1281                 * and it's beneficial to allow the first worker to bring the
1282                 * page in and the piggy backed work can then work on the
1283                 * cached page.
1284                 */
1285                start = al->io_start & PAGE_MASK;
1286                end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
1287                if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
1288                        return true;
1289        }
1290
1291        al->file = NULL;
1292        return false;
1293}
1294
1295/*
1296 * Make a note of the last file/offset/direction we punted to async
1297 * context. We'll use this information to see if we can piggy back a
1298 * sequential request onto the previous one, if it's still hasn't been
1299 * completed by the async worker.
1300 */
1301static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1302{
1303        struct async_list *async_list = &req->ctx->pending_async[rw];
1304        struct kiocb *kiocb = &req->rw;
1305        struct file *filp = kiocb->ki_filp;
1306
1307        if (io_should_merge(async_list, kiocb)) {
1308                unsigned long max_bytes;
1309
1310                /* Use 8x RA size as a decent limiter for both reads/writes */
1311                max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
1312                if (!max_bytes)
1313                        max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);
1314
1315                /* If max len are exceeded, reset the state */
1316                if (async_list->io_len + len <= max_bytes) {
1317                        req->flags |= REQ_F_SEQ_PREV;
1318                        async_list->io_len += len;
1319                } else {
1320                        async_list->file = NULL;
1321                }
1322        }
1323
1324        /* New file? Reset state. */
1325        if (async_list->file != filp) {
1326                async_list->io_start = kiocb->ki_pos;
1327                async_list->io_len = len;
1328                async_list->file = filp;
1329        }
1330}
1331
1332/*
1333 * For files that don't have ->read_iter() and ->write_iter(), handle them
1334 * by looping over ->read() or ->write() manually.
1335 */
1336static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1337                           struct iov_iter *iter)
1338{
1339        ssize_t ret = 0;
1340
1341        /*
1342         * Don't support polled IO through this interface, and we can't
1343         * support non-blocking either. For the latter, this just causes
1344         * the kiocb to be handled from an async context.
1345         */
1346        if (kiocb->ki_flags & IOCB_HIPRI)
1347                return -EOPNOTSUPP;
1348        if (kiocb->ki_flags & IOCB_NOWAIT)
1349                return -EAGAIN;
1350
1351        while (iov_iter_count(iter)) {
1352                struct iovec iovec = iov_iter_iovec(iter);
1353                ssize_t nr;
1354
1355                if (rw == READ) {
1356                        nr = file->f_op->read(file, iovec.iov_base,
1357                                              iovec.iov_len, &kiocb->ki_pos);
1358                } else {
1359                        nr = file->f_op->write(file, iovec.iov_base,
1360                                               iovec.iov_len, &kiocb->ki_pos);
1361                }
1362
1363                if (nr < 0) {
1364                        if (!ret)
1365                                ret = nr;
1366                        break;
1367                }
1368                ret += nr;
1369                if (nr != iovec.iov_len)
1370                        break;
1371                iov_iter_advance(iter, nr);
1372        }
1373
1374        return ret;
1375}
1376
1377static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1378                   bool force_nonblock)
1379{
1380        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1381        struct kiocb *kiocb = &req->rw;
1382        struct iov_iter iter;
1383        struct file *file;
1384        size_t iov_count;
1385        ssize_t read_size, ret;
1386
1387        ret = io_prep_rw(req, s, force_nonblock);
1388        if (ret)
1389                return ret;
1390        file = kiocb->ki_filp;
1391
1392        if (unlikely(!(file->f_mode & FMODE_READ)))
1393                return -EBADF;
1394
1395        ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1396        if (ret < 0)
1397                return ret;
1398
1399        read_size = ret;
1400        if (req->flags & REQ_F_LINK)
1401                req->result = read_size;
1402
1403        iov_count = iov_iter_count(&iter);
1404        ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1405        if (!ret) {
1406                ssize_t ret2;
1407
1408                if (file->f_op->read_iter)
1409                        ret2 = call_read_iter(file, kiocb, &iter);
1410                else
1411                        ret2 = loop_rw_iter(READ, file, kiocb, &iter);
1412
1413                /*
1414                 * In case of a short read, punt to async. This can happen
1415                 * if we have data partially cached. Alternatively we can
1416                 * return the short read, in which case the application will
1417                 * need to issue another SQE and wait for it. That SQE will
1418                 * need async punt anyway, so it's more efficient to do it
1419                 * here.
1420                 */
1421                if (force_nonblock && !(req->flags & REQ_F_NOWAIT) &&
1422                    (req->flags & REQ_F_ISREG) &&
1423                    ret2 > 0 && ret2 < read_size)
1424                        ret2 = -EAGAIN;
1425                /* Catch -EAGAIN return for forced non-blocking submission */
1426                if (!force_nonblock || ret2 != -EAGAIN) {
1427                        io_rw_done(kiocb, ret2);
1428                } else {
1429                        /*
1430                         * If ->needs_lock is true, we're already in async
1431                         * context.
1432                         */
1433                        if (!s->needs_lock)
1434                                io_async_list_note(READ, req, iov_count);
1435                        ret = -EAGAIN;
1436                }
1437        }
1438        kfree(iovec);
1439        return ret;
1440}
1441
1442static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1443                    bool force_nonblock)
1444{
1445        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1446        struct kiocb *kiocb = &req->rw;
1447        struct iov_iter iter;
1448        struct file *file;
1449        size_t iov_count;
1450        ssize_t ret;
1451
1452        ret = io_prep_rw(req, s, force_nonblock);
1453        if (ret)
1454                return ret;
1455
1456        file = kiocb->ki_filp;
1457        if (unlikely(!(file->f_mode & FMODE_WRITE)))
1458                return -EBADF;
1459
1460        ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1461        if (ret < 0)
1462                return ret;
1463
1464        if (req->flags & REQ_F_LINK)
1465                req->result = ret;
1466
1467        iov_count = iov_iter_count(&iter);
1468
1469        ret = -EAGAIN;
1470        if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1471                /* If ->needs_lock is true, we're already in async context. */
1472                if (!s->needs_lock)
1473                        io_async_list_note(WRITE, req, iov_count);
1474                goto out_free;
1475        }
1476
1477        ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1478        if (!ret) {
1479                ssize_t ret2;
1480
1481                /*
1482                 * Open-code file_start_write here to grab freeze protection,
1483                 * which will be released by another thread in
1484                 * io_complete_rw().  Fool lockdep by telling it the lock got
1485                 * released so that it doesn't complain about the held lock when
1486                 * we return to userspace.
1487                 */
1488                if (req->flags & REQ_F_ISREG) {
1489                        __sb_start_write(file_inode(file)->i_sb,
1490                                                SB_FREEZE_WRITE, true);
1491                        __sb_writers_release(file_inode(file)->i_sb,
1492                                                SB_FREEZE_WRITE);
1493                }
1494                kiocb->ki_flags |= IOCB_WRITE;
1495
1496                if (file->f_op->write_iter)
1497                        ret2 = call_write_iter(file, kiocb, &iter);
1498                else
1499                        ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
1500                if (!force_nonblock || ret2 != -EAGAIN) {
1501                        io_rw_done(kiocb, ret2);
1502                } else {
1503                        /*
1504                         * If ->needs_lock is true, we're already in async
1505                         * context.
1506                         */
1507                        if (!s->needs_lock)
1508                                io_async_list_note(WRITE, req, iov_count);
1509                        ret = -EAGAIN;
1510                }
1511        }
1512out_free:
1513        kfree(iovec);
1514        return ret;
1515}
1516
1517/*
1518 * IORING_OP_NOP just posts a completion event, nothing else.
1519 */
1520static int io_nop(struct io_kiocb *req, u64 user_data)
1521{
1522        struct io_ring_ctx *ctx = req->ctx;
1523        long err = 0;
1524
1525        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1526                return -EINVAL;
1527
1528        io_cqring_add_event(ctx, user_data, err);
1529        io_put_req(req);
1530        return 0;
1531}
1532
1533static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1534{
1535        struct io_ring_ctx *ctx = req->ctx;
1536
1537        if (!req->file)
1538                return -EBADF;
1539
1540        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1541                return -EINVAL;
1542        if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1543                return -EINVAL;
1544
1545        return 0;
1546}
1547
1548static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1549                    bool force_nonblock)
1550{
1551        loff_t sqe_off = READ_ONCE(sqe->off);
1552        loff_t sqe_len = READ_ONCE(sqe->len);
1553        loff_t end = sqe_off + sqe_len;
1554        unsigned fsync_flags;
1555        int ret;
1556
1557        fsync_flags = READ_ONCE(sqe->fsync_flags);
1558        if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1559                return -EINVAL;
1560
1561        ret = io_prep_fsync(req, sqe);
1562        if (ret)
1563                return ret;
1564
1565        /* fsync always requires a blocking context */
1566        if (force_nonblock)
1567                return -EAGAIN;
1568
1569        ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1570                                end > 0 ? end : LLONG_MAX,
1571                                fsync_flags & IORING_FSYNC_DATASYNC);
1572
1573        if (ret < 0 && (req->flags & REQ_F_LINK))
1574                req->flags |= REQ_F_FAIL_LINK;
1575        io_cqring_add_event(req->ctx, sqe->user_data, ret);
1576        io_put_req(req);
1577        return 0;
1578}
1579
1580static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1581{
1582        struct io_ring_ctx *ctx = req->ctx;
1583        int ret = 0;
1584
1585        if (!req->file)
1586                return -EBADF;
1587
1588        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1589                return -EINVAL;
1590        if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1591                return -EINVAL;
1592
1593        return ret;
1594}
1595
1596static int io_sync_file_range(struct io_kiocb *req,
1597                              const struct io_uring_sqe *sqe,
1598                              bool force_nonblock)
1599{
1600        loff_t sqe_off;
1601        loff_t sqe_len;
1602        unsigned flags;
1603        int ret;
1604
1605        ret = io_prep_sfr(req, sqe);
1606        if (ret)
1607                return ret;
1608
1609        /* sync_file_range always requires a blocking context */
1610        if (force_nonblock)
1611                return -EAGAIN;
1612
1613        sqe_off = READ_ONCE(sqe->off);
1614        sqe_len = READ_ONCE(sqe->len);
1615        flags = READ_ONCE(sqe->sync_range_flags);
1616
1617        ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1618
1619        if (ret < 0 && (req->flags & REQ_F_LINK))
1620                req->flags |= REQ_F_FAIL_LINK;
1621        io_cqring_add_event(req->ctx, sqe->user_data, ret);
1622        io_put_req(req);
1623        return 0;
1624}
1625
1626#if defined(CONFIG_NET)
1627static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1628                           bool force_nonblock,
1629                   long (*fn)(struct socket *, struct user_msghdr __user *,
1630                                unsigned int))
1631{
1632        struct socket *sock;
1633        int ret;
1634
1635        if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1636                return -EINVAL;
1637
1638        sock = sock_from_file(req->file, &ret);
1639        if (sock) {
1640                struct user_msghdr __user *msg;
1641                unsigned flags;
1642
1643                flags = READ_ONCE(sqe->msg_flags);
1644                if (flags & MSG_DONTWAIT)
1645                        req->flags |= REQ_F_NOWAIT;
1646                else if (force_nonblock)
1647                        flags |= MSG_DONTWAIT;
1648
1649                msg = (struct user_msghdr __user *) (unsigned long)
1650                        READ_ONCE(sqe->addr);
1651
1652                ret = fn(sock, msg, flags);
1653                if (force_nonblock && ret == -EAGAIN)
1654                        return ret;
1655        }
1656
1657        io_cqring_add_event(req->ctx, sqe->user_data, ret);
1658        io_put_req(req);
1659        return 0;
1660}
1661#endif
1662
1663static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1664                      bool force_nonblock)
1665{
1666#if defined(CONFIG_NET)
1667        return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock);
1668#else
1669        return -EOPNOTSUPP;
1670#endif
1671}
1672
1673static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1674                      bool force_nonblock)
1675{
1676#if defined(CONFIG_NET)
1677        return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock);
1678#else
1679        return -EOPNOTSUPP;
1680#endif
1681}
1682
1683static void io_poll_remove_one(struct io_kiocb *req)
1684{
1685        struct io_poll_iocb *poll = &req->poll;
1686
1687        spin_lock(&poll->head->lock);
1688        WRITE_ONCE(poll->canceled, true);
1689        if (!list_empty(&poll->wait.entry)) {
1690                list_del_init(&poll->wait.entry);
1691                io_queue_async_work(req->ctx, req);
1692        }
1693        spin_unlock(&poll->head->lock);
1694
1695        list_del_init(&req->list);
1696}
1697
1698static void io_poll_remove_all(struct io_ring_ctx *ctx)
1699{
1700        struct io_kiocb *req;
1701
1702        spin_lock_irq(&ctx->completion_lock);
1703        while (!list_empty(&ctx->cancel_list)) {
1704                req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1705                io_poll_remove_one(req);
1706        }
1707        spin_unlock_irq(&ctx->completion_lock);
1708}
1709
1710/*
1711 * Find a running poll command that matches one specified in sqe->addr,
1712 * and remove it if found.
1713 */
1714static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1715{
1716        struct io_ring_ctx *ctx = req->ctx;
1717        struct io_kiocb *poll_req, *next;
1718        int ret = -ENOENT;
1719
1720        if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1721                return -EINVAL;
1722        if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1723            sqe->poll_events)
1724                return -EINVAL;
1725
1726        spin_lock_irq(&ctx->completion_lock);
1727        list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1728                if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1729                        io_poll_remove_one(poll_req);
1730                        ret = 0;
1731                        break;
1732                }
1733        }
1734        spin_unlock_irq(&ctx->completion_lock);
1735
1736        io_cqring_add_event(req->ctx, sqe->user_data, ret);
1737        io_put_req(req);
1738        return 0;
1739}
1740
1741static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1742                             __poll_t mask)
1743{
1744        req->poll.done = true;
1745        io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1746        io_commit_cqring(ctx);
1747}
1748
1749static void io_poll_complete_work(struct work_struct *work)
1750{
1751        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1752        struct io_poll_iocb *poll = &req->poll;
1753        struct poll_table_struct pt = { ._key = poll->events };
1754        struct io_ring_ctx *ctx = req->ctx;
1755        __poll_t mask = 0;
1756
1757        if (!READ_ONCE(poll->canceled))
1758                mask = vfs_poll(poll->file, &pt) & poll->events;
1759
1760        /*
1761         * Note that ->ki_cancel callers also delete iocb from active_reqs after
1762         * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1763         * synchronize with them.  In the cancellation case the list_del_init
1764         * itself is not actually needed, but harmless so we keep it in to
1765         * avoid further branches in the fast path.
1766         */
1767        spin_lock_irq(&ctx->completion_lock);
1768        if (!mask && !READ_ONCE(poll->canceled)) {
1769                add_wait_queue(poll->head, &poll->wait);
1770                spin_unlock_irq(&ctx->completion_lock);
1771                return;
1772        }
1773        list_del_init(&req->list);
1774        io_poll_complete(ctx, req, mask);
1775        spin_unlock_irq(&ctx->completion_lock);
1776
1777        io_cqring_ev_posted(ctx);
1778        io_put_req(req);
1779}
1780
1781static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1782                        void *key)
1783{
1784        struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1785                                                        wait);
1786        struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1787        struct io_ring_ctx *ctx = req->ctx;
1788        __poll_t mask = key_to_poll(key);
1789        unsigned long flags;
1790
1791        /* for instances that support it check for an event match first: */
1792        if (mask && !(mask & poll->events))
1793                return 0;
1794
1795        list_del_init(&poll->wait.entry);
1796
1797        if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1798                list_del(&req->list);
1799                io_poll_complete(ctx, req, mask);
1800                spin_unlock_irqrestore(&ctx->completion_lock, flags);
1801
1802                io_cqring_ev_posted(ctx);
1803                io_put_req(req);
1804        } else {
1805                io_queue_async_work(ctx, req);
1806        }
1807
1808        return 1;
1809}
1810
1811struct io_poll_table {
1812        struct poll_table_struct pt;
1813        struct io_kiocb *req;
1814        int error;
1815};
1816
1817static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1818                               struct poll_table_struct *p)
1819{
1820        struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1821
1822        if (unlikely(pt->req->poll.head)) {
1823                pt->error = -EINVAL;
1824                return;
1825        }
1826
1827        pt->error = 0;
1828        pt->req->poll.head = head;
1829        add_wait_queue(head, &pt->req->poll.wait);
1830}
1831
1832static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1833{
1834        struct io_poll_iocb *poll = &req->poll;
1835        struct io_ring_ctx *ctx = req->ctx;
1836        struct io_poll_table ipt;
1837        bool cancel = false;
1838        __poll_t mask;
1839        u16 events;
1840
1841        if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1842                return -EINVAL;
1843        if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1844                return -EINVAL;
1845        if (!poll->file)
1846                return -EBADF;
1847
1848        req->submit.sqe = NULL;
1849        INIT_WORK(&req->work, io_poll_complete_work);
1850        events = READ_ONCE(sqe->poll_events);
1851        poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1852
1853        poll->head = NULL;
1854        poll->done = false;
1855        poll->canceled = false;
1856
1857        ipt.pt._qproc = io_poll_queue_proc;
1858        ipt.pt._key = poll->events;
1859        ipt.req = req;
1860        ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1861
1862        /* initialized the list so that we can do list_empty checks */
1863        INIT_LIST_HEAD(&poll->wait.entry);
1864        init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1865
1866        INIT_LIST_HEAD(&req->list);
1867
1868        mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1869
1870        spin_lock_irq(&ctx->completion_lock);
1871        if (likely(poll->head)) {
1872                spin_lock(&poll->head->lock);
1873                if (unlikely(list_empty(&poll->wait.entry))) {
1874                        if (ipt.error)
1875                                cancel = true;
1876                        ipt.error = 0;
1877                        mask = 0;
1878                }
1879                if (mask || ipt.error)
1880                        list_del_init(&poll->wait.entry);
1881                else if (cancel)
1882                        WRITE_ONCE(poll->canceled, true);
1883                else if (!poll->done) /* actually waiting for an event */
1884                        list_add_tail(&req->list, &ctx->cancel_list);
1885                spin_unlock(&poll->head->lock);
1886        }
1887        if (mask) { /* no async, we'd stolen it */
1888                ipt.error = 0;
1889                io_poll_complete(ctx, req, mask);
1890        }
1891        spin_unlock_irq(&ctx->completion_lock);
1892
1893        if (mask) {
1894                io_cqring_ev_posted(ctx);
1895                io_put_req(req);
1896        }
1897        return ipt.error;
1898}
1899
1900static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
1901{
1902        struct io_ring_ctx *ctx;
1903        struct io_kiocb *req, *prev;
1904        unsigned long flags;
1905
1906        req = container_of(timer, struct io_kiocb, timeout.timer);
1907        ctx = req->ctx;
1908        atomic_inc(&ctx->cq_timeouts);
1909
1910        spin_lock_irqsave(&ctx->completion_lock, flags);
1911        /*
1912         * Adjust the reqs sequence before the current one because it
1913         * will consume a slot in the cq_ring and the the cq_tail pointer
1914         * will be increased, otherwise other timeout reqs may return in
1915         * advance without waiting for enough wait_nr.
1916         */
1917        prev = req;
1918        list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
1919                prev->sequence++;
1920        list_del(&req->list);
1921
1922        io_cqring_fill_event(ctx, req->user_data, -ETIME);
1923        io_commit_cqring(ctx);
1924        spin_unlock_irqrestore(&ctx->completion_lock, flags);
1925
1926        io_cqring_ev_posted(ctx);
1927
1928        io_put_req(req);
1929        return HRTIMER_NORESTART;
1930}
1931
1932static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1933{
1934        unsigned count;
1935        struct io_ring_ctx *ctx = req->ctx;
1936        struct list_head *entry;
1937        struct timespec64 ts;
1938        unsigned span = 0;
1939
1940        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1941                return -EINVAL;
1942        if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->timeout_flags ||
1943            sqe->len != 1)
1944                return -EINVAL;
1945
1946        if (get_timespec64(&ts, u64_to_user_ptr(sqe->addr)))
1947                return -EFAULT;
1948
1949        req->flags |= REQ_F_TIMEOUT;
1950
1951        /*
1952         * sqe->off holds how many events that need to occur for this
1953         * timeout event to be satisfied. If it isn't set, then this is
1954         * a pure timeout request, sequence isn't used.
1955         */
1956        count = READ_ONCE(sqe->off);
1957        if (!count) {
1958                req->flags |= REQ_F_TIMEOUT_NOSEQ;
1959                spin_lock_irq(&ctx->completion_lock);
1960                entry = ctx->timeout_list.prev;
1961                goto add;
1962        }
1963
1964        req->sequence = ctx->cached_sq_head + count - 1;
1965        /* reuse it to store the count */
1966        req->submit.sequence = count;
1967
1968        /*
1969         * Insertion sort, ensuring the first entry in the list is always
1970         * the one we need first.
1971         */
1972        spin_lock_irq(&ctx->completion_lock);
1973        list_for_each_prev(entry, &ctx->timeout_list) {
1974                struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
1975                unsigned nxt_sq_head;
1976                long long tmp, tmp_nxt;
1977
1978                if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
1979                        continue;
1980
1981                /*
1982                 * Since cached_sq_head + count - 1 can overflow, use type long
1983                 * long to store it.
1984                 */
1985                tmp = (long long)ctx->cached_sq_head + count - 1;
1986                nxt_sq_head = nxt->sequence - nxt->submit.sequence + 1;
1987                tmp_nxt = (long long)nxt_sq_head + nxt->submit.sequence - 1;
1988
1989                /*
1990                 * cached_sq_head may overflow, and it will never overflow twice
1991                 * once there is some timeout req still be valid.
1992                 */
1993                if (ctx->cached_sq_head < nxt_sq_head)
1994                        tmp += UINT_MAX;
1995
1996                if (tmp > tmp_nxt)
1997                        break;
1998
1999                /*
2000                 * Sequence of reqs after the insert one and itself should
2001                 * be adjusted because each timeout req consumes a slot.
2002                 */
2003                span++;
2004                nxt->sequence++;
2005        }
2006        req->sequence -= span;
2007add:
2008        list_add(&req->list, entry);
2009        spin_unlock_irq(&ctx->completion_lock);
2010
2011        hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
2012        req->timeout.timer.function = io_timeout_fn;
2013        hrtimer_start(&req->timeout.timer, timespec64_to_ktime(ts),
2014                        HRTIMER_MODE_REL);
2015        return 0;
2016}
2017
2018static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
2019                        const struct io_uring_sqe *sqe)
2020{
2021        struct io_uring_sqe *sqe_copy;
2022
2023        if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
2024                return 0;
2025
2026        sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
2027        if (!sqe_copy)
2028                return -EAGAIN;
2029
2030        spin_lock_irq(&ctx->completion_lock);
2031        if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
2032                spin_unlock_irq(&ctx->completion_lock);
2033                kfree(sqe_copy);
2034                return 0;
2035        }
2036
2037        memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
2038        req->submit.sqe = sqe_copy;
2039
2040        INIT_WORK(&req->work, io_sq_wq_submit_work);
2041        list_add_tail(&req->list, &ctx->defer_list);
2042        spin_unlock_irq(&ctx->completion_lock);
2043        return -EIOCBQUEUED;
2044}
2045
2046static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2047                           const struct sqe_submit *s, bool force_nonblock)
2048{
2049        int ret, opcode;
2050
2051        req->user_data = READ_ONCE(s->sqe->user_data);
2052
2053        if (unlikely(s->index >= ctx->sq_entries))
2054                return -EINVAL;
2055
2056        opcode = READ_ONCE(s->sqe->opcode);
2057        switch (opcode) {
2058        case IORING_OP_NOP:
2059                ret = io_nop(req, req->user_data);
2060                break;
2061        case IORING_OP_READV:
2062                if (unlikely(s->sqe->buf_index))
2063                        return -EINVAL;
2064                ret = io_read(req, s, force_nonblock);
2065                break;
2066        case IORING_OP_WRITEV:
2067                if (unlikely(s->sqe->buf_index))
2068                        return -EINVAL;
2069                ret = io_write(req, s, force_nonblock);
2070                break;
2071        case IORING_OP_READ_FIXED:
2072                ret = io_read(req, s, force_nonblock);
2073                break;
2074        case IORING_OP_WRITE_FIXED:
2075                ret = io_write(req, s, force_nonblock);
2076                break;
2077        case IORING_OP_FSYNC:
2078                ret = io_fsync(req, s->sqe, force_nonblock);
2079                break;
2080        case IORING_OP_POLL_ADD:
2081                ret = io_poll_add(req, s->sqe);
2082                break;
2083        case IORING_OP_POLL_REMOVE:
2084                ret = io_poll_remove(req, s->sqe);
2085                break;
2086        case IORING_OP_SYNC_FILE_RANGE:
2087                ret = io_sync_file_range(req, s->sqe, force_nonblock);
2088                break;
2089        case IORING_OP_SENDMSG:
2090                ret = io_sendmsg(req, s->sqe, force_nonblock);
2091                break;
2092        case IORING_OP_RECVMSG:
2093                ret = io_recvmsg(req, s->sqe, force_nonblock);
2094                break;
2095        case IORING_OP_TIMEOUT:
2096                ret = io_timeout(req, s->sqe);
2097                break;
2098        default:
2099                ret = -EINVAL;
2100                break;
2101        }
2102
2103        if (ret)
2104                return ret;
2105
2106        if (ctx->flags & IORING_SETUP_IOPOLL) {
2107                if (req->result == -EAGAIN)
2108                        return -EAGAIN;
2109
2110                /* workqueue context doesn't hold uring_lock, grab it now */
2111                if (s->needs_lock)
2112                        mutex_lock(&ctx->uring_lock);
2113                io_iopoll_req_issued(req);
2114                if (s->needs_lock)
2115                        mutex_unlock(&ctx->uring_lock);
2116        }
2117
2118        return 0;
2119}
2120
2121static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
2122                                                 const struct io_uring_sqe *sqe)
2123{
2124        switch (sqe->opcode) {
2125        case IORING_OP_READV:
2126        case IORING_OP_READ_FIXED:
2127                return &ctx->pending_async[READ];
2128        case IORING_OP_WRITEV:
2129        case IORING_OP_WRITE_FIXED:
2130                return &ctx->pending_async[WRITE];
2131        default:
2132                return NULL;
2133        }
2134}
2135
2136static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
2137{
2138        u8 opcode = READ_ONCE(sqe->opcode);
2139
2140        return !(opcode == IORING_OP_READ_FIXED ||
2141                 opcode == IORING_OP_WRITE_FIXED);
2142}
2143
2144static void io_sq_wq_submit_work(struct work_struct *work)
2145{
2146        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2147        struct io_ring_ctx *ctx = req->ctx;
2148        struct mm_struct *cur_mm = NULL;
2149        struct async_list *async_list;
2150        LIST_HEAD(req_list);
2151        mm_segment_t old_fs;
2152        int ret;
2153
2154        async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
2155restart:
2156        do {
2157                struct sqe_submit *s = &req->submit;
2158                const struct io_uring_sqe *sqe = s->sqe;
2159                unsigned int flags = req->flags;
2160
2161                /* Ensure we clear previously set non-block flag */
2162                req->rw.ki_flags &= ~IOCB_NOWAIT;
2163
2164                ret = 0;
2165                if (io_sqe_needs_user(sqe) && !cur_mm) {
2166                        if (!mmget_not_zero(ctx->sqo_mm)) {
2167                                ret = -EFAULT;
2168                        } else {
2169                                cur_mm = ctx->sqo_mm;
2170                                use_mm(cur_mm);
2171                                old_fs = get_fs();
2172                                set_fs(USER_DS);
2173                        }
2174                }
2175
2176                if (!ret) {
2177                        s->has_user = cur_mm != NULL;
2178                        s->needs_lock = true;
2179                        do {
2180                                ret = __io_submit_sqe(ctx, req, s, false);
2181                                /*
2182                                 * We can get EAGAIN for polled IO even though
2183                                 * we're forcing a sync submission from here,
2184                                 * since we can't wait for request slots on the
2185                                 * block side.
2186                                 */
2187                                if (ret != -EAGAIN)
2188                                        break;
2189                                cond_resched();
2190                        } while (1);
2191                }
2192
2193                /* drop submission reference */
2194                io_put_req(req);
2195
2196                if (ret) {
2197                        io_cqring_add_event(ctx, sqe->user_data, ret);
2198                        io_put_req(req);
2199                }
2200
2201                /* async context always use a copy of the sqe */
2202                kfree(sqe);
2203
2204                /* req from defer and link list needn't decrease async cnt */
2205                if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
2206                        goto out;
2207
2208                if (!async_list)
2209                        break;
2210                if (!list_empty(&req_list)) {
2211                        req = list_first_entry(&req_list, struct io_kiocb,
2212                                                list);
2213                        list_del(&req->list);
2214                        continue;
2215                }
2216                if (list_empty(&async_list->list))
2217                        break;
2218
2219                req = NULL;
2220                spin_lock(&async_list->lock);
2221                if (list_empty(&async_list->list)) {
2222                        spin_unlock(&async_list->lock);
2223                        break;
2224                }
2225                list_splice_init(&async_list->list, &req_list);
2226                spin_unlock(&async_list->lock);
2227
2228                req = list_first_entry(&req_list, struct io_kiocb, list);
2229                list_del(&req->list);
2230        } while (req);
2231
2232        /*
2233         * Rare case of racing with a submitter. If we find the count has
2234         * dropped to zero AND we have pending work items, then restart
2235         * the processing. This is a tiny race window.
2236         */
2237        if (async_list) {
2238                ret = atomic_dec_return(&async_list->cnt);
2239                while (!ret && !list_empty(&async_list->list)) {
2240                        spin_lock(&async_list->lock);
2241                        atomic_inc(&async_list->cnt);
2242                        list_splice_init(&async_list->list, &req_list);
2243                        spin_unlock(&async_list->lock);
2244
2245                        if (!list_empty(&req_list)) {
2246                                req = list_first_entry(&req_list,
2247                                                        struct io_kiocb, list);
2248                                list_del(&req->list);
2249                                goto restart;
2250                        }
2251                        ret = atomic_dec_return(&async_list->cnt);
2252                }
2253        }
2254
2255out:
2256        if (cur_mm) {
2257                set_fs(old_fs);
2258                unuse_mm(cur_mm);
2259                mmput(cur_mm);
2260        }
2261}
2262
2263/*
2264 * See if we can piggy back onto previously submitted work, that is still
2265 * running. We currently only allow this if the new request is sequential
2266 * to the previous one we punted.
2267 */
2268static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
2269{
2270        bool ret;
2271
2272        if (!list)
2273                return false;
2274        if (!(req->flags & REQ_F_SEQ_PREV))
2275                return false;
2276        if (!atomic_read(&list->cnt))
2277                return false;
2278
2279        ret = true;
2280        spin_lock(&list->lock);
2281        list_add_tail(&req->list, &list->list);
2282        /*
2283         * Ensure we see a simultaneous modification from io_sq_wq_submit_work()
2284         */
2285        smp_mb();
2286        if (!atomic_read(&list->cnt)) {
2287                list_del_init(&req->list);
2288                ret = false;
2289        }
2290        spin_unlock(&list->lock);
2291        return ret;
2292}
2293
2294static bool io_op_needs_file(const struct io_uring_sqe *sqe)
2295{
2296        int op = READ_ONCE(sqe->opcode);
2297
2298        switch (op) {
2299        case IORING_OP_NOP:
2300        case IORING_OP_POLL_REMOVE:
2301        case IORING_OP_TIMEOUT:
2302                return false;
2303        default:
2304                return true;
2305        }
2306}
2307
2308static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
2309                           struct io_submit_state *state, struct io_kiocb *req)
2310{
2311        unsigned flags;
2312        int fd;
2313
2314        flags = READ_ONCE(s->sqe->flags);
2315        fd = READ_ONCE(s->sqe->fd);
2316
2317        if (flags & IOSQE_IO_DRAIN)
2318                req->flags |= REQ_F_IO_DRAIN;
2319        /*
2320         * All io need record the previous position, if LINK vs DARIN,
2321         * it can be used to mark the position of the first IO in the
2322         * link list.
2323         */
2324        req->sequence = s->sequence;
2325
2326        if (!io_op_needs_file(s->sqe))
2327                return 0;
2328
2329        if (flags & IOSQE_FIXED_FILE) {
2330                if (unlikely(!ctx->user_files ||
2331                    (unsigned) fd >= ctx->nr_user_files))
2332                        return -EBADF;
2333                req->file = ctx->user_files[fd];
2334                req->flags |= REQ_F_FIXED_FILE;
2335        } else {
2336                if (s->needs_fixed_file)
2337                        return -EBADF;
2338                req->file = io_file_get(state, fd);
2339                if (unlikely(!req->file))
2340                        return -EBADF;
2341        }
2342
2343        return 0;
2344}
2345
2346static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2347                        struct sqe_submit *s)
2348{
2349        int ret;
2350
2351        ret = __io_submit_sqe(ctx, req, s, true);
2352
2353        /*
2354         * We async punt it if the file wasn't marked NOWAIT, or if the file
2355         * doesn't support non-blocking read/write attempts
2356         */
2357        if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
2358            (req->flags & REQ_F_MUST_PUNT))) {
2359                struct io_uring_sqe *sqe_copy;
2360
2361                sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
2362                if (sqe_copy) {
2363                        struct async_list *list;
2364
2365                        s->sqe = sqe_copy;
2366                        memcpy(&req->submit, s, sizeof(*s));
2367                        list = io_async_list_from_sqe(ctx, s->sqe);
2368                        if (!io_add_to_prev_work(list, req)) {
2369                                if (list)
2370                                        atomic_inc(&list->cnt);
2371                                INIT_WORK(&req->work, io_sq_wq_submit_work);
2372                                io_queue_async_work(ctx, req);
2373                        }
2374
2375                        /*
2376                         * Queued up for async execution, worker will release
2377                         * submit reference when the iocb is actually submitted.
2378                         */
2379                        return 0;
2380                }
2381        }
2382
2383        /* drop submission reference */
2384        io_put_req(req);
2385
2386        /* and drop final reference, if we failed */
2387        if (ret) {
2388                io_cqring_add_event(ctx, req->user_data, ret);
2389                if (req->flags & REQ_F_LINK)
2390                        req->flags |= REQ_F_FAIL_LINK;
2391                io_put_req(req);
2392        }
2393
2394        return ret;
2395}
2396
2397static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2398                        struct sqe_submit *s)
2399{
2400        int ret;
2401
2402        ret = io_req_defer(ctx, req, s->sqe);
2403        if (ret) {
2404                if (ret != -EIOCBQUEUED) {
2405                        io_free_req(req);
2406                        io_cqring_add_event(ctx, s->sqe->user_data, ret);
2407                }
2408                return 0;
2409        }
2410
2411        return __io_queue_sqe(ctx, req, s);
2412}
2413
2414static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
2415                              struct sqe_submit *s, struct io_kiocb *shadow)
2416{
2417        int ret;
2418        int need_submit = false;
2419
2420        if (!shadow)
2421                return io_queue_sqe(ctx, req, s);
2422
2423        /*
2424         * Mark the first IO in link list as DRAIN, let all the following
2425         * IOs enter the defer list. all IO needs to be completed before link
2426         * list.
2427         */
2428        req->flags |= REQ_F_IO_DRAIN;
2429        ret = io_req_defer(ctx, req, s->sqe);
2430        if (ret) {
2431                if (ret != -EIOCBQUEUED) {
2432                        io_free_req(req);
2433                        __io_free_req(shadow);
2434                        io_cqring_add_event(ctx, s->sqe->user_data, ret);
2435                        return 0;
2436                }
2437        } else {
2438                /*
2439                 * If ret == 0 means that all IOs in front of link io are
2440                 * running done. let's queue link head.
2441                 */
2442                need_submit = true;
2443        }
2444
2445        /* Insert shadow req to defer_list, blocking next IOs */
2446        spin_lock_irq(&ctx->completion_lock);
2447        list_add_tail(&shadow->list, &ctx->defer_list);
2448        spin_unlock_irq(&ctx->completion_lock);
2449
2450        if (need_submit)
2451                return __io_queue_sqe(ctx, req, s);
2452
2453        return 0;
2454}
2455
2456#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
2457
2458static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
2459                          struct io_submit_state *state, struct io_kiocb **link)
2460{
2461        struct io_uring_sqe *sqe_copy;
2462        struct io_kiocb *req;
2463        int ret;
2464
2465        /* enforce forwards compatibility on users */
2466        if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
2467                ret = -EINVAL;
2468                goto err;
2469        }
2470
2471        req = io_get_req(ctx, state);
2472        if (unlikely(!req)) {
2473                ret = -EAGAIN;
2474                goto err;
2475        }
2476
2477        ret = io_req_set_file(ctx, s, state, req);
2478        if (unlikely(ret)) {
2479err_req:
2480                io_free_req(req);
2481err:
2482                io_cqring_add_event(ctx, s->sqe->user_data, ret);
2483                return;
2484        }
2485
2486        req->user_data = s->sqe->user_data;
2487
2488        /*
2489         * If we already have a head request, queue this one for async
2490         * submittal once the head completes. If we don't have a head but
2491         * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
2492         * submitted sync once the chain is complete. If none of those
2493         * conditions are true (normal request), then just queue it.
2494         */
2495        if (*link) {
2496                struct io_kiocb *prev = *link;
2497
2498                sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
2499                if (!sqe_copy) {
2500                        ret = -EAGAIN;
2501                        goto err_req;
2502                }
2503
2504                s->sqe = sqe_copy;
2505                memcpy(&req->submit, s, sizeof(*s));
2506                list_add_tail(&req->list, &prev->link_list);
2507        } else if (s->sqe->flags & IOSQE_IO_LINK) {
2508                req->flags |= REQ_F_LINK;
2509
2510                memcpy(&req->submit, s, sizeof(*s));
2511                INIT_LIST_HEAD(&req->link_list);
2512                *link = req;
2513        } else {
2514                io_queue_sqe(ctx, req, s);
2515        }
2516}
2517
2518/*
2519 * Batched submission is done, ensure local IO is flushed out.
2520 */
2521static void io_submit_state_end(struct io_submit_state *state)
2522{
2523        blk_finish_plug(&state->plug);
2524        io_file_put(state);
2525        if (state->free_reqs)
2526                kmem_cache_free_bulk(req_cachep, state->free_reqs,
2527                                        &state->reqs[state->cur_req]);
2528}
2529
2530/*
2531 * Start submission side cache.
2532 */
2533static void io_submit_state_start(struct io_submit_state *state,
2534                                  struct io_ring_ctx *ctx, unsigned max_ios)
2535{
2536        blk_start_plug(&state->plug);
2537        state->free_reqs = 0;
2538        state->file = NULL;
2539        state->ios_left = max_ios;
2540}
2541
2542static void io_commit_sqring(struct io_ring_ctx *ctx)
2543{
2544        struct io_rings *rings = ctx->rings;
2545
2546        if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
2547                /*
2548                 * Ensure any loads from the SQEs are done at this point,
2549                 * since once we write the new head, the application could
2550                 * write new data to them.
2551                 */
2552                smp_store_release(&rings->sq.head, ctx->cached_sq_head);
2553        }
2554}
2555
2556/*
2557 * Fetch an sqe, if one is available. Note that s->sqe will point to memory
2558 * that is mapped by userspace. This means that care needs to be taken to
2559 * ensure that reads are stable, as we cannot rely on userspace always
2560 * being a good citizen. If members of the sqe are validated and then later
2561 * used, it's important that those reads are done through READ_ONCE() to
2562 * prevent a re-load down the line.
2563 */
2564static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
2565{
2566        struct io_rings *rings = ctx->rings;
2567        u32 *sq_array = ctx->sq_array;
2568        unsigned head;
2569
2570        /*
2571         * The cached sq head (or cq tail) serves two purposes:
2572         *
2573         * 1) allows us to batch the cost of updating the user visible
2574         *    head updates.
2575         * 2) allows the kernel side to track the head on its own, even
2576         *    though the application is the one updating it.
2577         */
2578        head = ctx->cached_sq_head;
2579        /* make sure SQ entry isn't read before tail */
2580        if (head == smp_load_acquire(&rings->sq.tail))
2581                return false;
2582
2583        head = READ_ONCE(sq_array[head & ctx->sq_mask]);
2584        if (head < ctx->sq_entries) {
2585                s->index = head;
2586                s->sqe = &ctx->sq_sqes[head];
2587                s->sequence = ctx->cached_sq_head;
2588                ctx->cached_sq_head++;
2589                return true;
2590        }
2591
2592        /* drop invalid entries */
2593        ctx->cached_sq_head++;
2594        ctx->cached_sq_dropped++;
2595        WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
2596        return false;
2597}
2598
2599static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
2600                          bool has_user, bool mm_fault)
2601{
2602        struct io_submit_state state, *statep = NULL;
2603        struct io_kiocb *link = NULL;
2604        struct io_kiocb *shadow_req = NULL;
2605        bool prev_was_link = false;
2606        int i, submitted = 0;
2607
2608        if (nr > IO_PLUG_THRESHOLD) {
2609                io_submit_state_start(&state, ctx, nr);
2610                statep = &state;
2611        }
2612
2613        for (i = 0; i < nr; i++) {
2614                struct sqe_submit s;
2615
2616                if (!io_get_sqring(ctx, &s))
2617                        break;
2618
2619                /*
2620                 * If previous wasn't linked and we have a linked command,
2621                 * that's the end of the chain. Submit the previous link.
2622                 */
2623                if (!prev_was_link && link) {
2624                        io_queue_link_head(ctx, link, &link->submit, shadow_req);
2625                        link = NULL;
2626                        shadow_req = NULL;
2627                }
2628                prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
2629
2630                if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
2631                        if (!shadow_req) {
2632                                shadow_req = io_get_req(ctx, NULL);
2633                                if (unlikely(!shadow_req))
2634                                        goto out;
2635                                shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
2636                                refcount_dec(&shadow_req->refs);
2637                        }
2638                        shadow_req->sequence = s.sequence;
2639                }
2640
2641out:
2642                if (unlikely(mm_fault)) {
2643                        io_cqring_add_event(ctx, s.sqe->user_data,
2644                                                -EFAULT);
2645                } else {
2646                        s.has_user = has_user;
2647                        s.needs_lock = true;
2648                        s.needs_fixed_file = true;
2649                        io_submit_sqe(ctx, &s, statep, &link);
2650                        submitted++;
2651                }
2652        }
2653
2654        if (link)
2655                io_queue_link_head(ctx, link, &link->submit, shadow_req);
2656        if (statep)
2657                io_submit_state_end(&state);
2658
2659        return submitted;
2660}
2661
2662static int io_sq_thread(void *data)
2663{
2664        struct io_ring_ctx *ctx = data;
2665        struct mm_struct *cur_mm = NULL;
2666        mm_segment_t old_fs;
2667        DEFINE_WAIT(wait);
2668        unsigned inflight;
2669        unsigned long timeout;
2670
2671        complete(&ctx->sqo_thread_started);
2672
2673        old_fs = get_fs();
2674        set_fs(USER_DS);
2675
2676        timeout = inflight = 0;
2677        while (!kthread_should_park()) {
2678                bool mm_fault = false;
2679                unsigned int to_submit;
2680
2681                if (inflight) {
2682                        unsigned nr_events = 0;
2683
2684                        if (ctx->flags & IORING_SETUP_IOPOLL) {
2685                                /*
2686                                 * inflight is the count of the maximum possible
2687                                 * entries we submitted, but it can be smaller
2688                                 * if we dropped some of them. If we don't have
2689                                 * poll entries available, then we know that we
2690                                 * have nothing left to poll for. Reset the
2691                                 * inflight count to zero in that case.
2692                                 */
2693                                mutex_lock(&ctx->uring_lock);
2694                                if (!list_empty(&ctx->poll_list))
2695                                        __io_iopoll_check(ctx, &nr_events, 0);
2696                                else
2697                                        inflight = 0;
2698                                mutex_unlock(&ctx->uring_lock);
2699                        } else {
2700                                /*
2701                                 * Normal IO, just pretend everything completed.
2702                                 * We don't have to poll completions for that.
2703                                 */
2704                                nr_events = inflight;
2705                        }
2706
2707                        inflight -= nr_events;
2708                        if (!inflight)
2709                                timeout = jiffies + ctx->sq_thread_idle;
2710                }
2711
2712                to_submit = io_sqring_entries(ctx);
2713                if (!to_submit) {
2714                        /*
2715                         * We're polling. If we're within the defined idle
2716                         * period, then let us spin without work before going
2717                         * to sleep.
2718                         */
2719                        if (inflight || !time_after(jiffies, timeout)) {
2720                                cond_resched();
2721                                continue;
2722                        }
2723
2724                        /*
2725                         * Drop cur_mm before scheduling, we can't hold it for
2726                         * long periods (or over schedule()). Do this before
2727                         * adding ourselves to the waitqueue, as the unuse/drop
2728                         * may sleep.
2729                         */
2730                        if (cur_mm) {
2731                                unuse_mm(cur_mm);
2732                                mmput(cur_mm);
2733                                cur_mm = NULL;
2734                        }
2735
2736                        prepare_to_wait(&ctx->sqo_wait, &wait,
2737                                                TASK_INTERRUPTIBLE);
2738
2739                        /* Tell userspace we may need a wakeup call */
2740                        ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
2741                        /* make sure to read SQ tail after writing flags */
2742                        smp_mb();
2743
2744                        to_submit = io_sqring_entries(ctx);
2745                        if (!to_submit) {
2746                                if (kthread_should_park()) {
2747                                        finish_wait(&ctx->sqo_wait, &wait);
2748                                        break;
2749                                }
2750                                if (signal_pending(current))
2751                                        flush_signals(current);
2752                                schedule();
2753                                finish_wait(&ctx->sqo_wait, &wait);
2754
2755                                ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
2756                                continue;
2757                        }
2758                        finish_wait(&ctx->sqo_wait, &wait);
2759
2760                        ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
2761                }
2762
2763                /* Unless all new commands are FIXED regions, grab mm */
2764                if (!cur_mm) {
2765                        mm_fault = !mmget_not_zero(ctx->sqo_mm);
2766                        if (!mm_fault) {
2767                                use_mm(ctx->sqo_mm);
2768                                cur_mm = ctx->sqo_mm;
2769                        }
2770                }
2771
2772                to_submit = min(to_submit, ctx->sq_entries);
2773                inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL,
2774                                           mm_fault);
2775
2776                /* Commit SQ ring head once we've consumed all SQEs */
2777                io_commit_sqring(ctx);
2778        }
2779
2780        set_fs(old_fs);
2781        if (cur_mm) {
2782                unuse_mm(cur_mm);
2783                mmput(cur_mm);
2784        }
2785
2786        kthread_parkme();
2787
2788        return 0;
2789}
2790
2791static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2792{
2793        struct io_submit_state state, *statep = NULL;
2794        struct io_kiocb *link = NULL;
2795        struct io_kiocb *shadow_req = NULL;
2796        bool prev_was_link = false;
2797        int i, submit = 0;
2798
2799        if (to_submit > IO_PLUG_THRESHOLD) {
2800                io_submit_state_start(&state, ctx, to_submit);
2801                statep = &state;
2802        }
2803
2804        for (i = 0; i < to_submit; i++) {
2805                struct sqe_submit s;
2806
2807                if (!io_get_sqring(ctx, &s))
2808                        break;
2809
2810                /*
2811                 * If previous wasn't linked and we have a linked command,
2812                 * that's the end of the chain. Submit the previous link.
2813                 */
2814                if (!prev_was_link && link) {
2815                        io_queue_link_head(ctx, link, &link->submit, shadow_req);
2816                        link = NULL;
2817                        shadow_req = NULL;
2818                }
2819                prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
2820
2821                if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
2822                        if (!shadow_req) {
2823                                shadow_req = io_get_req(ctx, NULL);
2824                                if (unlikely(!shadow_req))
2825                                        goto out;
2826                                shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
2827                                refcount_dec(&shadow_req->refs);
2828                        }
2829                        shadow_req->sequence = s.sequence;
2830                }
2831
2832out:
2833                s.has_user = true;
2834                s.needs_lock = false;
2835                s.needs_fixed_file = false;
2836                submit++;
2837                io_submit_sqe(ctx, &s, statep, &link);
2838        }
2839
2840        if (link)
2841                io_queue_link_head(ctx, link, &link->submit, shadow_req);
2842        if (statep)
2843                io_submit_state_end(statep);
2844
2845        io_commit_sqring(ctx);
2846
2847        return submit;
2848}
2849
2850struct io_wait_queue {
2851        struct wait_queue_entry wq;
2852        struct io_ring_ctx *ctx;
2853        unsigned to_wait;
2854        unsigned nr_timeouts;
2855};
2856
2857static inline bool io_should_wake(struct io_wait_queue *iowq)
2858{
2859        struct io_ring_ctx *ctx = iowq->ctx;
2860
2861        /*
2862         * Wake up if we have enough events, or if a timeout occured since we
2863         * started waiting. For timeouts, we always want to return to userspace,
2864         * regardless of event count.
2865         */
2866        return io_cqring_events(ctx->rings) >= iowq->to_wait ||
2867                        atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
2868}
2869
2870static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
2871                            int wake_flags, void *key)
2872{
2873        struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
2874                                                        wq);
2875
2876        if (!io_should_wake(iowq))
2877                return -1;
2878
2879        return autoremove_wake_function(curr, mode, wake_flags, key);
2880}
2881
2882/*
2883 * Wait until events become available, if we don't already have some. The
2884 * application must reap them itself, as they reside on the shared cq ring.
2885 */
2886static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2887                          const sigset_t __user *sig, size_t sigsz)
2888{
2889        struct io_wait_queue iowq = {
2890                .wq = {
2891                        .private        = current,
2892                        .func           = io_wake_function,
2893                        .entry          = LIST_HEAD_INIT(iowq.wq.entry),
2894                },
2895                .ctx            = ctx,
2896                .to_wait        = min_events,
2897        };
2898        struct io_rings *rings = ctx->rings;
2899        int ret;
2900
2901        if (io_cqring_events(rings) >= min_events)
2902                return 0;
2903
2904        if (sig) {
2905#ifdef CONFIG_COMPAT
2906                if (in_compat_syscall())
2907                        ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2908                                                      sigsz);
2909                else
2910#endif
2911                        ret = set_user_sigmask(sig, sigsz);
2912
2913                if (ret)
2914                        return ret;
2915        }
2916
2917        ret = 0;
2918        iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
2919        do {
2920                prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
2921                                                TASK_INTERRUPTIBLE);
2922                if (io_should_wake(&iowq))
2923                        break;
2924                schedule();
2925                if (signal_pending(current)) {
2926                        ret = -ERESTARTSYS;
2927                        break;
2928                }
2929        } while (1);
2930        finish_wait(&ctx->wait, &iowq.wq);
2931
2932        restore_saved_sigmask_unless(ret == -ERESTARTSYS);
2933        if (ret == -ERESTARTSYS)
2934                ret = -EINTR;
2935
2936        return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
2937}
2938
2939static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2940{
2941#if defined(CONFIG_UNIX)
2942        if (ctx->ring_sock) {
2943                struct sock *sock = ctx->ring_sock->sk;
2944                struct sk_buff *skb;
2945
2946                while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2947                        kfree_skb(skb);
2948        }
2949#else
2950        int i;
2951
2952        for (i = 0; i < ctx->nr_user_files; i++)
2953                fput(ctx->user_files[i]);
2954#endif
2955}
2956
2957static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2958{
2959        if (!ctx->user_files)
2960                return -ENXIO;
2961
2962        __io_sqe_files_unregister(ctx);
2963        kfree(ctx->user_files);
2964        ctx->user_files = NULL;
2965        ctx->nr_user_files = 0;
2966        return 0;
2967}
2968
2969static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2970{
2971        if (ctx->sqo_thread) {
2972                wait_for_completion(&ctx->sqo_thread_started);
2973                /*
2974                 * The park is a bit of a work-around, without it we get
2975                 * warning spews on shutdown with SQPOLL set and affinity
2976                 * set to a single CPU.
2977                 */
2978                kthread_park(ctx->sqo_thread);
2979                kthread_stop(ctx->sqo_thread);
2980                ctx->sqo_thread = NULL;
2981        }
2982}
2983
2984static void io_finish_async(struct io_ring_ctx *ctx)
2985{
2986        int i;
2987
2988        io_sq_thread_stop(ctx);
2989
2990        for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
2991                if (ctx->sqo_wq[i]) {
2992                        destroy_workqueue(ctx->sqo_wq[i]);
2993                        ctx->sqo_wq[i] = NULL;
2994                }
2995        }
2996}
2997
2998#if defined(CONFIG_UNIX)
2999static void io_destruct_skb(struct sk_buff *skb)
3000{
3001        struct io_ring_ctx *ctx = skb->sk->sk_user_data;
3002        int i;
3003
3004        for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++)
3005                if (ctx->sqo_wq[i])
3006                        flush_workqueue(ctx->sqo_wq[i]);
3007
3008        unix_destruct_scm(skb);
3009}
3010
3011/*
3012 * Ensure the UNIX gc is aware of our file set, so we are certain that
3013 * the io_uring can be safely unregistered on process exit, even if we have
3014 * loops in the file referencing.
3015 */
3016static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
3017{
3018        struct sock *sk = ctx->ring_sock->sk;
3019        struct scm_fp_list *fpl;
3020        struct sk_buff *skb;
3021        int i;
3022
3023        if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
3024                unsigned long inflight = ctx->user->unix_inflight + nr;
3025
3026                if (inflight > task_rlimit(current, RLIMIT_NOFILE))
3027                        return -EMFILE;
3028        }
3029
3030        fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
3031        if (!fpl)
3032                return -ENOMEM;
3033
3034        skb = alloc_skb(0, GFP_KERNEL);
3035        if (!skb) {
3036                kfree(fpl);
3037                return -ENOMEM;
3038        }
3039
3040        skb->sk = sk;
3041        skb->destructor = io_destruct_skb;
3042
3043        fpl->user = get_uid(ctx->user);
3044        for (i = 0; i < nr; i++) {
3045                fpl->fp[i] = get_file(ctx->user_files[i + offset]);
3046                unix_inflight(fpl->user, fpl->fp[i]);
3047        }
3048
3049        fpl->max = fpl->count = nr;
3050        UNIXCB(skb).fp = fpl;
3051        refcount_add(skb->truesize, &sk->sk_wmem_alloc);
3052        skb_queue_head(&sk->sk_receive_queue, skb);
3053
3054        for (i = 0; i < nr; i++)
3055                fput(fpl->fp[i]);
3056
3057        return 0;
3058}
3059
3060/*
3061 * If UNIX sockets are enabled, fd passing can cause a reference cycle which
3062 * causes regular reference counting to break down. We rely on the UNIX
3063 * garbage collection to take care of this problem for us.
3064 */
3065static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3066{
3067        unsigned left, total;
3068        int ret = 0;
3069
3070        total = 0;
3071        left = ctx->nr_user_files;
3072        while (left) {
3073                unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
3074
3075                ret = __io_sqe_files_scm(ctx, this_files, total);
3076                if (ret)
3077                        break;
3078                left -= this_files;
3079                total += this_files;
3080        }
3081
3082        if (!ret)
3083                return 0;
3084
3085        while (total < ctx->nr_user_files) {
3086                fput(ctx->user_files[total]);
3087                total++;
3088        }
3089
3090        return ret;
3091}
3092#else
3093static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3094{
3095        return 0;
3096}
3097#endif
3098
3099static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
3100                                 unsigned nr_args)
3101{
3102        __s32 __user *fds = (__s32 __user *) arg;
3103        int fd, ret = 0;
3104        unsigned i;
3105
3106        if (ctx->user_files)
3107                return -EBUSY;
3108        if (!nr_args)
3109                return -EINVAL;
3110        if (nr_args > IORING_MAX_FIXED_FILES)
3111                return -EMFILE;
3112
3113        ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
3114        if (!ctx->user_files)
3115                return -ENOMEM;
3116
3117        for (i = 0; i < nr_args; i++) {
3118                ret = -EFAULT;
3119                if (copy_from_user(&fd, &fds[i], sizeof(fd)))
3120                        break;
3121
3122                ctx->user_files[i] = fget(fd);
3123
3124                ret = -EBADF;
3125                if (!ctx->user_files[i])
3126                        break;
3127                /*
3128                 * Don't allow io_uring instances to be registered. If UNIX
3129                 * isn't enabled, then this causes a reference cycle and this
3130                 * instance can never get freed. If UNIX is enabled we'll
3131                 * handle it just fine, but there's still no point in allowing
3132                 * a ring fd as it doesn't support regular read/write anyway.
3133                 */
3134                if (ctx->user_files[i]->f_op == &io_uring_fops) {
3135                        fput(ctx->user_files[i]);
3136                        break;
3137                }
3138                ctx->nr_user_files++;
3139                ret = 0;
3140        }
3141
3142        if (ret) {
3143                for (i = 0; i < ctx->nr_user_files; i++)
3144                        fput(ctx->user_files[i]);
3145
3146                kfree(ctx->user_files);
3147                ctx->user_files = NULL;
3148                ctx->nr_user_files = 0;
3149                return ret;
3150        }
3151
3152        ret = io_sqe_files_scm(ctx);
3153        if (ret)
3154                io_sqe_files_unregister(ctx);
3155
3156        return ret;
3157}
3158
3159static int io_sq_offload_start(struct io_ring_ctx *ctx,
3160                               struct io_uring_params *p)
3161{
3162        int ret;
3163
3164        init_waitqueue_head(&ctx->sqo_wait);
3165        mmgrab(current->mm);
3166        ctx->sqo_mm = current->mm;
3167
3168        if (ctx->flags & IORING_SETUP_SQPOLL) {
3169                ret = -EPERM;
3170                if (!capable(CAP_SYS_ADMIN))
3171                        goto err;
3172
3173                ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
3174                if (!ctx->sq_thread_idle)
3175                        ctx->sq_thread_idle = HZ;
3176
3177                if (p->flags & IORING_SETUP_SQ_AFF) {
3178                        int cpu = p->sq_thread_cpu;
3179
3180                        ret = -EINVAL;
3181                        if (cpu >= nr_cpu_ids)
3182                                goto err;
3183                        if (!cpu_online(cpu))
3184                                goto err;
3185
3186                        ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
3187                                                        ctx, cpu,
3188                                                        "io_uring-sq");
3189                } else {
3190                        ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
3191                                                        "io_uring-sq");
3192                }
3193                if (IS_ERR(ctx->sqo_thread)) {
3194                        ret = PTR_ERR(ctx->sqo_thread);
3195                        ctx->sqo_thread = NULL;
3196                        goto err;
3197                }
3198                wake_up_process(ctx->sqo_thread);
3199        } else if (p->flags & IORING_SETUP_SQ_AFF) {
3200                /* Can't have SQ_AFF without SQPOLL */
3201                ret = -EINVAL;
3202                goto err;
3203        }
3204
3205        /* Do QD, or 2 * CPUS, whatever is smallest */
3206        ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
3207                        WQ_UNBOUND | WQ_FREEZABLE,
3208                        min(ctx->sq_entries - 1, 2 * num_online_cpus()));
3209        if (!ctx->sqo_wq[0]) {
3210                ret = -ENOMEM;
3211                goto err;
3212        }
3213
3214        /*
3215         * This is for buffered writes, where we want to limit the parallelism
3216         * due to file locking in file systems. As "normal" buffered writes
3217         * should parellelize on writeout quite nicely, limit us to having 2
3218         * pending. This avoids massive contention on the inode when doing
3219         * buffered async writes.
3220         */
3221        ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
3222                                                WQ_UNBOUND | WQ_FREEZABLE, 2);
3223        if (!ctx->sqo_wq[1]) {
3224                ret = -ENOMEM;
3225                goto err;
3226        }
3227
3228        return 0;
3229err:
3230        io_finish_async(ctx);
3231        mmdrop(ctx->sqo_mm);
3232        ctx->sqo_mm = NULL;
3233        return ret;
3234}
3235
3236static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
3237{
3238        atomic_long_sub(nr_pages, &user->locked_vm);
3239}
3240
3241static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
3242{
3243        unsigned long page_limit, cur_pages, new_pages;
3244
3245        /* Don't allow more pages than we can safely lock */
3246        page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
3247
3248        do {
3249                cur_pages = atomic_long_read(&user->locked_vm);
3250                new_pages = cur_pages + nr_pages;
3251                if (new_pages > page_limit)
3252                        return -ENOMEM;
3253        } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
3254                                        new_pages) != cur_pages);
3255
3256        return 0;
3257}
3258
3259static void io_mem_free(void *ptr)
3260{
3261        struct page *page;
3262
3263        if (!ptr)
3264                return;
3265
3266        page = virt_to_head_page(ptr);
3267        if (put_page_testzero(page))
3268                free_compound_page(page);
3269}
3270
3271static void *io_mem_alloc(size_t size)
3272{
3273        gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
3274                                __GFP_NORETRY;
3275
3276        return (void *) __get_free_pages(gfp_flags, get_order(size));
3277}
3278
3279static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
3280                                size_t *sq_offset)
3281{
3282        struct io_rings *rings;
3283        size_t off, sq_array_size;
3284
3285        off = struct_size(rings, cqes, cq_entries);
3286        if (off == SIZE_MAX)
3287                return SIZE_MAX;
3288
3289#ifdef CONFIG_SMP
3290        off = ALIGN(off, SMP_CACHE_BYTES);
3291        if (off == 0)
3292                return SIZE_MAX;
3293#endif
3294
3295        sq_array_size = array_size(sizeof(u32), sq_entries);
3296        if (sq_array_size == SIZE_MAX)
3297                return SIZE_MAX;
3298
3299        if (check_add_overflow(off, sq_array_size, &off))
3300                return SIZE_MAX;
3301
3302        if (sq_offset)
3303                *sq_offset = off;
3304
3305        return off;
3306}
3307
3308static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
3309{
3310        size_t pages;
3311
3312        pages = (size_t)1 << get_order(
3313                rings_size(sq_entries, cq_entries, NULL));
3314        pages += (size_t)1 << get_order(
3315                array_size(sizeof(struct io_uring_sqe), sq_entries));
3316
3317        return pages;
3318}
3319
3320static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
3321{
3322        int i, j;
3323
3324        if (!ctx->user_bufs)
3325                return -ENXIO;
3326
3327        for (i = 0; i < ctx->nr_user_bufs; i++) {
3328                struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
3329
3330                for (j = 0; j < imu->nr_bvecs; j++)
3331                        put_user_page(imu->bvec[j].bv_page);
3332
3333                if (ctx->account_mem)
3334                        io_unaccount_mem(ctx->user, imu->nr_bvecs);
3335                kvfree(imu->bvec);
3336                imu->nr_bvecs = 0;
3337        }
3338
3339        kfree(ctx->user_bufs);
3340        ctx->user_bufs = NULL;
3341        ctx->nr_user_bufs = 0;
3342        return 0;
3343}
3344
3345static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
3346                       void __user *arg, unsigned index)
3347{
3348        struct iovec __user *src;
3349
3350#ifdef CONFIG_COMPAT
3351        if (ctx->compat) {
3352                struct compat_iovec __user *ciovs;
3353                struct compat_iovec ciov;
3354
3355                ciovs = (struct compat_iovec __user *) arg;
3356                if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
3357                        return -EFAULT;
3358
3359                dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
3360                dst->iov_len = ciov.iov_len;
3361                return 0;
3362        }
3363#endif
3364        src = (struct iovec __user *) arg;
3365        if (copy_from_user(dst, &src[index], sizeof(*dst)))
3366                return -EFAULT;
3367        return 0;
3368}
3369
3370static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
3371                                  unsigned nr_args)
3372{
3373        struct vm_area_struct **vmas = NULL;
3374        struct page **pages = NULL;
3375        int i, j, got_pages = 0;
3376        int ret = -EINVAL;
3377
3378        if (ctx->user_bufs)
3379                return -EBUSY;
3380        if (!nr_args || nr_args > UIO_MAXIOV)
3381                return -EINVAL;
3382
3383        ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
3384                                        GFP_KERNEL);
3385        if (!ctx->user_bufs)
3386                return -ENOMEM;
3387
3388        for (i = 0; i < nr_args; i++) {
3389                struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
3390                unsigned long off, start, end, ubuf;
3391                int pret, nr_pages;
3392                struct iovec iov;
3393                size_t size;
3394
3395                ret = io_copy_iov(ctx, &iov, arg, i);
3396                if (ret)
3397                        goto err;
3398
3399                /*
3400                 * Don't impose further limits on the size and buffer
3401                 * constraints here, we'll -EINVAL later when IO is
3402                 * submitted if they are wrong.
3403                 */
3404                ret = -EFAULT;
3405                if (!iov.iov_base || !iov.iov_len)
3406                        goto err;
3407
3408                /* arbitrary limit, but we need something */
3409                if (iov.iov_len > SZ_1G)
3410                        goto err;
3411
3412                ubuf = (unsigned long) iov.iov_base;
3413                end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
3414                start = ubuf >> PAGE_SHIFT;
3415                nr_pages = end - start;
3416
3417                if (ctx->account_mem) {
3418                        ret = io_account_mem(ctx->user, nr_pages);
3419                        if (ret)
3420                                goto err;
3421                }
3422
3423                ret = 0;
3424                if (!pages || nr_pages > got_pages) {
3425                        kfree(vmas);
3426                        kfree(pages);
3427                        pages = kvmalloc_array(nr_pages, sizeof(struct page *),
3428                                                GFP_KERNEL);
3429                        vmas = kvmalloc_array(nr_pages,
3430                                        sizeof(struct vm_area_struct *),
3431                                        GFP_KERNEL);
3432                        if (!pages || !vmas) {
3433                                ret = -ENOMEM;
3434                                if (ctx->account_mem)
3435                                        io_unaccount_mem(ctx->user, nr_pages);
3436                                goto err;
3437                        }
3438                        got_pages = nr_pages;
3439                }
3440
3441                imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
3442                                                GFP_KERNEL);
3443                ret = -ENOMEM;
3444                if (!imu->bvec) {
3445                        if (ctx->account_mem)
3446                                io_unaccount_mem(ctx->user, nr_pages);
3447                        goto err;
3448                }
3449
3450                ret = 0;
3451                down_read(&current->mm->mmap_sem);
3452                pret = get_user_pages(ubuf, nr_pages,
3453                                      FOLL_WRITE | FOLL_LONGTERM,
3454                                      pages, vmas);
3455                if (pret == nr_pages) {
3456                        /* don't support file backed memory */
3457                        for (j = 0; j < nr_pages; j++) {
3458                                struct vm_area_struct *vma = vmas[j];
3459
3460                                if (vma->vm_file &&
3461                                    !is_file_hugepages(vma->vm_file)) {
3462                                        ret = -EOPNOTSUPP;
3463                                        break;
3464                                }
3465                        }
3466                } else {
3467                        ret = pret < 0 ? pret : -EFAULT;
3468                }
3469                up_read(&current->mm->mmap_sem);
3470                if (ret) {
3471                        /*
3472                         * if we did partial map, or found file backed vmas,
3473                         * release any pages we did get
3474                         */
3475                        if (pret > 0)
3476                                put_user_pages(pages, pret);
3477                        if (ctx->account_mem)
3478                                io_unaccount_mem(ctx->user, nr_pages);
3479                        kvfree(imu->bvec);
3480                        goto err;
3481                }
3482
3483                off = ubuf & ~PAGE_MASK;
3484                size = iov.iov_len;
3485                for (j = 0; j < nr_pages; j++) {
3486                        size_t vec_len;
3487
3488                        vec_len = min_t(size_t, size, PAGE_SIZE - off);
3489                        imu->bvec[j].bv_page = pages[j];
3490                        imu->bvec[j].bv_len = vec_len;
3491                        imu->bvec[j].bv_offset = off;
3492                        off = 0;
3493                        size -= vec_len;
3494                }
3495                /* store original address for later verification */
3496                imu->ubuf = ubuf;
3497                imu->len = iov.iov_len;
3498                imu->nr_bvecs = nr_pages;
3499
3500                ctx->nr_user_bufs++;
3501        }
3502        kvfree(pages);
3503        kvfree(vmas);
3504        return 0;
3505err:
3506        kvfree(pages);
3507        kvfree(vmas);
3508        io_sqe_buffer_unregister(ctx);
3509        return ret;
3510}
3511
3512static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
3513{
3514        __s32 __user *fds = arg;
3515        int fd;
3516
3517        if (ctx->cq_ev_fd)
3518                return -EBUSY;
3519
3520        if (copy_from_user(&fd, fds, sizeof(*fds)))
3521                return -EFAULT;
3522
3523        ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
3524        if (IS_ERR(ctx->cq_ev_fd)) {
3525                int ret = PTR_ERR(ctx->cq_ev_fd);
3526                ctx->cq_ev_fd = NULL;
3527                return ret;
3528        }
3529
3530        return 0;
3531}
3532
3533static int io_eventfd_unregister(struct io_ring_ctx *ctx)
3534{
3535        if (ctx->cq_ev_fd) {
3536                eventfd_ctx_put(ctx->cq_ev_fd);
3537                ctx->cq_ev_fd = NULL;
3538                return 0;
3539        }
3540
3541        return -ENXIO;
3542}
3543
3544static void io_ring_ctx_free(struct io_ring_ctx *ctx)
3545{
3546        io_finish_async(ctx);
3547        if (ctx->sqo_mm)
3548                mmdrop(ctx->sqo_mm);
3549
3550        io_iopoll_reap_events(ctx);
3551        io_sqe_buffer_unregister(ctx);
3552        io_sqe_files_unregister(ctx);
3553        io_eventfd_unregister(ctx);
3554
3555#if defined(CONFIG_UNIX)
3556        if (ctx->ring_sock) {
3557                ctx->ring_sock->file = NULL; /* so that iput() is called */
3558                sock_release(ctx->ring_sock);
3559        }
3560#endif
3561
3562        io_mem_free(ctx->rings);
3563        io_mem_free(ctx->sq_sqes);
3564
3565        percpu_ref_exit(&ctx->refs);
3566        if (ctx->account_mem)
3567                io_unaccount_mem(ctx->user,
3568                                ring_pages(ctx->sq_entries, ctx->cq_entries));
3569        free_uid(ctx->user);
3570        kfree(ctx);
3571}
3572
3573static __poll_t io_uring_poll(struct file *file, poll_table *wait)
3574{
3575        struct io_ring_ctx *ctx = file->private_data;
3576        __poll_t mask = 0;
3577
3578        poll_wait(file, &ctx->cq_wait, wait);
3579        /*
3580         * synchronizes with barrier from wq_has_sleeper call in
3581         * io_commit_cqring
3582         */
3583        smp_rmb();
3584        if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
3585            ctx->rings->sq_ring_entries)
3586                mask |= EPOLLOUT | EPOLLWRNORM;
3587        if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
3588                mask |= EPOLLIN | EPOLLRDNORM;
3589
3590        return mask;
3591}
3592
3593static int io_uring_fasync(int fd, struct file *file, int on)
3594{
3595        struct io_ring_ctx *ctx = file->private_data;
3596
3597        return fasync_helper(fd, file, on, &ctx->cq_fasync);
3598}
3599
3600static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
3601{
3602        mutex_lock(&ctx->uring_lock);
3603        percpu_ref_kill(&ctx->refs);
3604        mutex_unlock(&ctx->uring_lock);
3605
3606        io_kill_timeouts(ctx);
3607        io_poll_remove_all(ctx);
3608        io_iopoll_reap_events(ctx);
3609        wait_for_completion(&ctx->ctx_done);
3610        io_ring_ctx_free(ctx);
3611}
3612
3613static int io_uring_release(struct inode *inode, struct file *file)
3614{
3615        struct io_ring_ctx *ctx = file->private_data;
3616
3617        file->private_data = NULL;
3618        io_ring_ctx_wait_and_kill(ctx);
3619        return 0;
3620}
3621
3622static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
3623{
3624        loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
3625        unsigned long sz = vma->vm_end - vma->vm_start;
3626        struct io_ring_ctx *ctx = file->private_data;
3627        unsigned long pfn;
3628        struct page *page;
3629        void *ptr;
3630
3631        switch (offset) {
3632        case IORING_OFF_SQ_RING:
3633        case IORING_OFF_CQ_RING:
3634                ptr = ctx->rings;
3635                break;
3636        case IORING_OFF_SQES:
3637                ptr = ctx->sq_sqes;
3638                break;
3639        default:
3640                return -EINVAL;
3641        }
3642
3643        page = virt_to_head_page(ptr);
3644        if (sz > page_size(page))
3645                return -EINVAL;
3646
3647        pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
3648        return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
3649}
3650
3651SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
3652                u32, min_complete, u32, flags, const sigset_t __user *, sig,
3653                size_t, sigsz)
3654{
3655        struct io_ring_ctx *ctx;
3656        long ret = -EBADF;
3657        int submitted = 0;
3658        struct fd f;
3659
3660        if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
3661                return -EINVAL;
3662
3663        f = fdget(fd);
3664        if (!f.file)
3665                return -EBADF;
3666
3667        ret = -EOPNOTSUPP;
3668        if (f.file->f_op != &io_uring_fops)
3669                goto out_fput;
3670
3671        ret = -ENXIO;
3672        ctx = f.file->private_data;
3673        if (!percpu_ref_tryget(&ctx->refs))
3674                goto out_fput;
3675
3676        /*
3677         * For SQ polling, the thread will do all submissions and completions.
3678         * Just return the requested submit count, and wake the thread if
3679         * we were asked to.
3680         */
3681        ret = 0;
3682        if (ctx->flags & IORING_SETUP_SQPOLL) {
3683                if (flags & IORING_ENTER_SQ_WAKEUP)
3684                        wake_up(&ctx->sqo_wait);
3685                submitted = to_submit;
3686        } else if (to_submit) {
3687                to_submit = min(to_submit, ctx->sq_entries);
3688
3689                mutex_lock(&ctx->uring_lock);
3690                submitted = io_ring_submit(ctx, to_submit);
3691                mutex_unlock(&ctx->uring_lock);
3692        }
3693        if (flags & IORING_ENTER_GETEVENTS) {
3694                unsigned nr_events = 0;
3695
3696                min_complete = min(min_complete, ctx->cq_entries);
3697
3698                if (ctx->flags & IORING_SETUP_IOPOLL) {
3699                        ret = io_iopoll_check(ctx, &nr_events, min_complete);
3700                } else {
3701                        ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
3702                }
3703        }
3704
3705        percpu_ref_put(&ctx->refs);
3706out_fput:
3707        fdput(f);
3708        return submitted ? submitted : ret;
3709}
3710
3711static const struct file_operations io_uring_fops = {
3712        .release        = io_uring_release,
3713        .mmap           = io_uring_mmap,
3714        .poll           = io_uring_poll,
3715        .fasync         = io_uring_fasync,
3716};
3717
3718static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
3719                                  struct io_uring_params *p)
3720{
3721        struct io_rings *rings;
3722        size_t size, sq_array_offset;
3723
3724        size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
3725        if (size == SIZE_MAX)
3726                return -EOVERFLOW;
3727
3728        rings = io_mem_alloc(size);
3729        if (!rings)
3730                return -ENOMEM;
3731
3732        ctx->rings = rings;
3733        ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
3734        rings->sq_ring_mask = p->sq_entries - 1;
3735        rings->cq_ring_mask = p->cq_entries - 1;
3736        rings->sq_ring_entries = p->sq_entries;
3737        rings->cq_ring_entries = p->cq_entries;
3738        ctx->sq_mask = rings->sq_ring_mask;
3739        ctx->cq_mask = rings->cq_ring_mask;
3740        ctx->sq_entries = rings->sq_ring_entries;
3741        ctx->cq_entries = rings->cq_ring_entries;
3742
3743        size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
3744        if (size == SIZE_MAX)
3745                return -EOVERFLOW;
3746
3747        ctx->sq_sqes = io_mem_alloc(size);
3748        if (!ctx->sq_sqes)
3749                return -ENOMEM;
3750
3751        return 0;
3752}
3753
3754/*
3755 * Allocate an anonymous fd, this is what constitutes the application
3756 * visible backing of an io_uring instance. The application mmaps this
3757 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3758 * we have to tie this fd to a socket for file garbage collection purposes.
3759 */
3760static int io_uring_get_fd(struct io_ring_ctx *ctx)
3761{
3762        struct file *file;
3763        int ret;
3764
3765#if defined(CONFIG_UNIX)
3766        ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3767                                &ctx->ring_sock);
3768        if (ret)
3769                return ret;
3770#endif
3771
3772        ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3773        if (ret < 0)
3774                goto err;
3775
3776        file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3777                                        O_RDWR | O_CLOEXEC);
3778        if (IS_ERR(file)) {
3779                put_unused_fd(ret);
3780                ret = PTR_ERR(file);
3781                goto err;
3782        }
3783
3784#if defined(CONFIG_UNIX)
3785        ctx->ring_sock->file = file;
3786        ctx->ring_sock->sk->sk_user_data = ctx;
3787#endif
3788        fd_install(ret, file);
3789        return ret;
3790err:
3791#if defined(CONFIG_UNIX)
3792        sock_release(ctx->ring_sock);
3793        ctx->ring_sock = NULL;
3794#endif
3795        return ret;
3796}
3797
3798static int io_uring_create(unsigned entries, struct io_uring_params *p)
3799{
3800        struct user_struct *user = NULL;
3801        struct io_ring_ctx *ctx;
3802        bool account_mem;
3803        int ret;
3804
3805        if (!entries || entries > IORING_MAX_ENTRIES)
3806                return -EINVAL;
3807
3808        /*
3809         * Use twice as many entries for the CQ ring. It's possible for the
3810         * application to drive a higher depth than the size of the SQ ring,
3811         * since the sqes are only used at submission time. This allows for
3812         * some flexibility in overcommitting a bit.
3813         */
3814        p->sq_entries = roundup_pow_of_two(entries);
3815        p->cq_entries = 2 * p->sq_entries;
3816
3817        user = get_uid(current_user());
3818        account_mem = !capable(CAP_IPC_LOCK);
3819
3820        if (account_mem) {
3821                ret = io_account_mem(user,
3822                                ring_pages(p->sq_entries, p->cq_entries));
3823                if (ret) {
3824                        free_uid(user);
3825                        return ret;
3826                }
3827        }
3828
3829        ctx = io_ring_ctx_alloc(p);
3830        if (!ctx) {
3831                if (account_mem)
3832                        io_unaccount_mem(user, ring_pages(p->sq_entries,
3833                                                                p->cq_entries));
3834                free_uid(user);
3835                return -ENOMEM;
3836        }
3837        ctx->compat = in_compat_syscall();
3838        ctx->account_mem = account_mem;
3839        ctx->user = user;
3840
3841        ret = io_allocate_scq_urings(ctx, p);
3842        if (ret)
3843                goto err;
3844
3845        ret = io_sq_offload_start(ctx, p);
3846        if (ret)
3847                goto err;
3848
3849        memset(&p->sq_off, 0, sizeof(p->sq_off));
3850        p->sq_off.head = offsetof(struct io_rings, sq.head);
3851        p->sq_off.tail = offsetof(struct io_rings, sq.tail);
3852        p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
3853        p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
3854        p->sq_off.flags = offsetof(struct io_rings, sq_flags);
3855        p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
3856        p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
3857
3858        memset(&p->cq_off, 0, sizeof(p->cq_off));
3859        p->cq_off.head = offsetof(struct io_rings, cq.head);
3860        p->cq_off.tail = offsetof(struct io_rings, cq.tail);
3861        p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
3862        p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
3863        p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
3864        p->cq_off.cqes = offsetof(struct io_rings, cqes);
3865
3866        /*
3867         * Install ring fd as the very last thing, so we don't risk someone
3868         * having closed it before we finish setup
3869         */
3870        ret = io_uring_get_fd(ctx);
3871        if (ret < 0)
3872                goto err;
3873
3874        p->features = IORING_FEAT_SINGLE_MMAP;
3875        return ret;
3876err:
3877        io_ring_ctx_wait_and_kill(ctx);
3878        return ret;
3879}
3880
3881/*
3882 * Sets up an aio uring context, and returns the fd. Applications asks for a
3883 * ring size, we return the actual sq/cq ring sizes (among other things) in the
3884 * params structure passed in.
3885 */
3886static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3887{
3888        struct io_uring_params p;
3889        long ret;
3890        int i;
3891
3892        if (copy_from_user(&p, params, sizeof(p)))
3893                return -EFAULT;
3894        for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3895                if (p.resv[i])
3896                        return -EINVAL;
3897        }
3898
3899        if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3900                        IORING_SETUP_SQ_AFF))
3901                return -EINVAL;
3902
3903        ret = io_uring_create(entries, &p);
3904        if (ret < 0)
3905                return ret;
3906
3907        if (copy_to_user(params, &p, sizeof(p)))
3908                return -EFAULT;
3909
3910        return ret;
3911}
3912
3913SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3914                struct io_uring_params __user *, params)
3915{
3916        return io_uring_setup(entries, params);
3917}
3918
3919static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3920                               void __user *arg, unsigned nr_args)
3921        __releases(ctx->uring_lock)
3922        __acquires(ctx->uring_lock)
3923{
3924        int ret;
3925
3926        /*
3927         * We're inside the ring mutex, if the ref is already dying, then
3928         * someone else killed the ctx or is already going through
3929         * io_uring_register().
3930         */
3931        if (percpu_ref_is_dying(&ctx->refs))
3932                return -ENXIO;
3933
3934        percpu_ref_kill(&ctx->refs);
3935
3936        /*
3937         * Drop uring mutex before waiting for references to exit. If another
3938         * thread is currently inside io_uring_enter() it might need to grab
3939         * the uring_lock to make progress. If we hold it here across the drain
3940         * wait, then we can deadlock. It's safe to drop the mutex here, since
3941         * no new references will come in after we've killed the percpu ref.
3942         */
3943        mutex_unlock(&ctx->uring_lock);
3944        wait_for_completion(&ctx->ctx_done);
3945        mutex_lock(&ctx->uring_lock);
3946
3947        switch (opcode) {
3948        case IORING_REGISTER_BUFFERS:
3949                ret = io_sqe_buffer_register(ctx, arg, nr_args);
3950                break;
3951        case IORING_UNREGISTER_BUFFERS:
3952                ret = -EINVAL;
3953                if (arg || nr_args)
3954                        break;
3955                ret = io_sqe_buffer_unregister(ctx);
3956                break;
3957        case IORING_REGISTER_FILES:
3958                ret = io_sqe_files_register(ctx, arg, nr_args);
3959                break;
3960        case IORING_UNREGISTER_FILES:
3961                ret = -EINVAL;
3962                if (arg || nr_args)
3963                        break;
3964                ret = io_sqe_files_unregister(ctx);
3965                break;
3966        case IORING_REGISTER_EVENTFD:
3967                ret = -EINVAL;
3968                if (nr_args != 1)
3969                        break;
3970                ret = io_eventfd_register(ctx, arg);
3971                break;
3972        case IORING_UNREGISTER_EVENTFD:
3973                ret = -EINVAL;
3974                if (arg || nr_args)
3975                        break;
3976                ret = io_eventfd_unregister(ctx);
3977                break;
3978        default:
3979                ret = -EINVAL;
3980                break;
3981        }
3982
3983        /* bring the ctx back to life */
3984        reinit_completion(&ctx->ctx_done);
3985        percpu_ref_reinit(&ctx->refs);
3986        return ret;
3987}
3988
3989SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3990                void __user *, arg, unsigned int, nr_args)
3991{
3992        struct io_ring_ctx *ctx;
3993        long ret = -EBADF;
3994        struct fd f;
3995
3996        f = fdget(fd);
3997        if (!f.file)
3998                return -EBADF;
3999
4000        ret = -EOPNOTSUPP;
4001        if (f.file->f_op != &io_uring_fops)
4002                goto out_fput;
4003
4004        ctx = f.file->private_data;
4005
4006        mutex_lock(&ctx->uring_lock);
4007        ret = __io_uring_register(ctx, opcode, arg, nr_args);
4008        mutex_unlock(&ctx->uring_lock);
4009out_fput:
4010        fdput(f);
4011        return ret;
4012}
4013
4014static int __init io_uring_init(void)
4015{
4016        req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
4017        return 0;
4018};
4019__initcall(io_uring_init);
4020