linux/fs/io_uring.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Shared application/kernel submission and completion ring pairs, for
   4 * supporting fast/efficient IO.
   5 *
   6 * A note on the read/write ordering memory barriers that are matched between
   7 * the application and kernel side.
   8 *
   9 * After the application reads the CQ ring tail, it must use an
  10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
  11 * before writing the tail (using smp_load_acquire to read the tail will
  12 * do). It also needs a smp_mb() before updating CQ head (ordering the
  13 * entry load(s) with the head store), pairing with an implicit barrier
  14 * through a control-dependency in io_get_cqring (smp_store_release to
  15 * store head will do). Failure to do so could lead to reading invalid
  16 * CQ entries.
  17 *
  18 * Likewise, the application must use an appropriate smp_wmb() before
  19 * writing the SQ tail (ordering SQ entry stores with the tail store),
  20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
  21 * to store the tail will do). And it needs a barrier ordering the SQ
  22 * head load before writing new SQ entries (smp_load_acquire to read
  23 * head will do).
  24 *
  25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
  26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
  27 * updating the SQ tail; a full memory barrier smp_mb() is needed
  28 * between.
  29 *
  30 * Also see the examples in the liburing library:
  31 *
  32 *      git://git.kernel.dk/liburing
  33 *
  34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
  35 * from data shared between the kernel and application. This is done both
  36 * for ordering purposes and to ensure that once a value is loaded from
  37 * data that the application could potentially modify, it remains stable.
  38 *
  39 * Copyright (C) 2018-2019 Jens Axboe
  40 * Copyright (c) 2018-2019 Christoph Hellwig
  41 */
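/*
 * [Editor's note, not part of the kernel source] A minimal userspace-side
 * sketch of the submission ordering described above, using GCC __atomic
 * builtins in place of the kernel's smp_load_acquire()/smp_store_release().
 * The names sq_khead, sq_ktail, sq_kflags, sq_array, sq_ring_mask,
 * sq_ring_entries and sqes are hypothetical pointers/values obtained from
 * the IORING_OFF_SQ_RING and IORING_OFF_SQES mmaps via struct
 * io_sqring_offsets:
 *
 *	unsigned head = __atomic_load_n(sq_khead, __ATOMIC_ACQUIRE);
 *	unsigned tail = *sq_ktail;	// app owns the tail, plain load is fine
 *
 *	if (tail - head < sq_ring_entries) {
 *		unsigned index = tail & sq_ring_mask;
 *
 *		fill_sqe(&sqes[index]);		// hypothetical helper: write the SQE
 *		sq_array[index] = index;	// publish the SQE index
 *		// The release store orders the SQE/array stores before the
 *		// tail update becomes visible to the kernel.
 *		__atomic_store_n(sq_ktail, tail + 1, __ATOMIC_RELEASE);
 *	}
 *
 *	// With IORING_SETUP_SQPOLL, a full barrier is needed before testing
 *	// *sq_kflags for IORING_SQ_NEED_WAKEUP and deciding whether to call
 *	// io_uring_enter() to wake the SQ thread.
 */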
  42#include <linux/kernel.h>
  43#include <linux/init.h>
  44#include <linux/errno.h>
  45#include <linux/syscalls.h>
  46#include <linux/compat.h>
  47#include <linux/refcount.h>
  48#include <linux/uio.h>
  49
  50#include <linux/sched/signal.h>
  51#include <linux/fs.h>
  52#include <linux/file.h>
  53#include <linux/fdtable.h>
  54#include <linux/mm.h>
  55#include <linux/mman.h>
  56#include <linux/mmu_context.h>
  57#include <linux/percpu.h>
  58#include <linux/slab.h>
  59#include <linux/workqueue.h>
  60#include <linux/kthread.h>
  61#include <linux/blkdev.h>
  62#include <linux/bvec.h>
  63#include <linux/net.h>
  64#include <net/sock.h>
  65#include <net/af_unix.h>
  66#include <net/scm.h>
  67#include <linux/anon_inodes.h>
  68#include <linux/sched/mm.h>
  69#include <linux/uaccess.h>
  70#include <linux/nospec.h>
  71#include <linux/sizes.h>
  72#include <linux/hugetlb.h>
  73
  74#include <uapi/linux/io_uring.h>
  75
  76#include "internal.h"
  77
  78#define IORING_MAX_ENTRIES      4096
  79#define IORING_MAX_FIXED_FILES  1024
  80
  81struct io_uring {
  82        u32 head ____cacheline_aligned_in_smp;
  83        u32 tail ____cacheline_aligned_in_smp;
  84};
  85
  86/*
  87 * This data is shared with the application through the mmap at offset
  88 * IORING_OFF_SQ_RING.
  89 *
  90 * The offsets to the member fields are published through struct
  91 * io_sqring_offsets when calling io_uring_setup.
  92 */
  93struct io_sq_ring {
  94        /*
  95         * Head and tail offsets into the ring; the offsets need to be
  96         * masked to get valid indices.
  97         *
  98         * The kernel controls head and the application controls tail.
  99         */
 100        struct io_uring         r;
 101        /*
 102         * Bitmask to apply to head and tail offsets (constant, equals
 103         * ring_entries - 1)
 104         */
 105        u32                     ring_mask;
 106        /* Ring size (constant, power of 2) */
 107        u32                     ring_entries;
 108        /*
 109         * Number of invalid entries dropped by the kernel due to an
 110         * invalid index stored in the array
 111         *
 112         * Written by the kernel, shouldn't be modified by the
 113         * application (i.e. get number of "new events" by comparing to
 114         * cached value).
 115         *
 116         * After a new SQ head value has been read by the application,
 117         * this counter includes all submissions that were dropped while
 118         * reaching the new SQ head (and possibly more).
 119         */
 120        u32                     dropped;
 121        /*
 122         * Runtime flags
 123         *
 124         * Written by the kernel, shouldn't be modified by the
 125         * application.
 126         *
 127         * The application needs a full memory barrier before checking
 128         * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
 129         */
 130        u32                     flags;
 131        /*
 132         * Ring buffer of indices into array of io_uring_sqe, which is
 133         * mmapped by the application using the IORING_OFF_SQES offset.
 134         *
 135         * This indirection could e.g. be used to assign fixed
 136         * io_uring_sqe entries to operations and only submit them to
 137         * the queue when needed.
 138         *
 139         * The kernel modifies neither the indices array nor the entries
 140         * array.
 141         */
 142        u32                     array[];
 143};
 144
 145/*
 146 * This data is shared with the application through the mmap at offset
 147 * IORING_OFF_CQ_RING.
 148 *
 149 * The offsets to the member fields are published through struct
 150 * io_cqring_offsets when calling io_uring_setup.
 151 */
 152struct io_cq_ring {
 153        /*
 154         * Head and tail offsets into the ring; the offsets need to be
 155         * masked to get valid indices.
 156         *
 157         * The application controls head and the kernel tail.
 158         */
 159        struct io_uring         r;
 160        /*
 161         * Bitmask to apply to head and tail offsets (constant, equals
 162         * ring_entries - 1)
 163         */
 164        u32                     ring_mask;
 165        /* Ring size (constant, power of 2) */
 166        u32                     ring_entries;
 167        /*
 168         * Number of completion events lost because the queue was full;
 169         * this should be avoided by the application by making sure
 170         * there are not more requests pending than there is space in
 171         * the completion queue.
 172         *
 173         * Written by the kernel, shouldn't be modified by the
 174         * application (i.e. get number of "new events" by comparing to
 175         * cached value).
 176         *
 177         * As completion events come in out of order this counter is not
 178         * ordered with any other data.
 179         */
 180        u32                     overflow;
 181        /*
 182         * Ring buffer of completion events.
 183         *
 184         * The kernel writes completion events fresh every time they are
 185         * produced, so the application is allowed to modify pending
 186         * entries.
 187         */
 188        struct io_uring_cqe     cqes[];
 189};
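/*
 * [Editor's note, not part of the kernel source] A hedged sketch of how an
 * application typically maps these two rings and the SQE array, using the
 * offsets that io_uring_setup() publishes in struct io_uring_params; the
 * variable names are illustrative only:
 *
 *	struct io_uring_params p = { };
 *	int fd = syscall(__NR_io_uring_setup, entries, &p);
 *
 *	size_t sq_sz = p.sq_off.array + p.sq_entries * sizeof(__u32);
 *	void *sq = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
 *			MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
 *
 *	size_t cq_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
 *	void *cq = mmap(NULL, cq_sz, PROT_READ | PROT_WRITE,
 *			MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
 *
 *	unsigned *sq_tail = sq + p.sq_off.tail;		// kernel-visible SQ tail
 *	struct io_uring_cqe *cqes = cq + p.cq_off.cqes;	// CQE array
 *
 * The SQE array itself is a third mmap at IORING_OFF_SQES of
 * p.sq_entries * sizeof(struct io_uring_sqe) bytes.
 */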
 190
 191struct io_mapped_ubuf {
 192        u64             ubuf;
 193        size_t          len;
 194        struct          bio_vec *bvec;
 195        unsigned int    nr_bvecs;
 196};
 197
 198struct async_list {
 199        spinlock_t              lock;
 200        atomic_t                cnt;
 201        struct list_head        list;
 202
 203        struct file             *file;
 204        off_t                   io_end;
 205        size_t                  io_pages;
 206};
 207
 208struct io_ring_ctx {
 209        struct {
 210                struct percpu_ref       refs;
 211        } ____cacheline_aligned_in_smp;
 212
 213        struct {
 214                unsigned int            flags;
 215                bool                    compat;
 216                bool                    account_mem;
 217
 218                /* SQ ring */
 219                struct io_sq_ring       *sq_ring;
 220                unsigned                cached_sq_head;
 221                unsigned                sq_entries;
 222                unsigned                sq_mask;
 223                unsigned                sq_thread_idle;
 224                struct io_uring_sqe     *sq_sqes;
 225
 226                struct list_head        defer_list;
 227        } ____cacheline_aligned_in_smp;
 228
 229        /* IO offload */
 230        struct workqueue_struct *sqo_wq;
 231        struct task_struct      *sqo_thread;    /* if using sq thread polling */
 232        struct mm_struct        *sqo_mm;
 233        wait_queue_head_t       sqo_wait;
 234
 235        struct {
 236                /* CQ ring */
 237                struct io_cq_ring       *cq_ring;
 238                unsigned                cached_cq_tail;
 239                unsigned                cq_entries;
 240                unsigned                cq_mask;
 241                struct wait_queue_head  cq_wait;
 242                struct fasync_struct    *cq_fasync;
 243                struct eventfd_ctx      *cq_ev_fd;
 244        } ____cacheline_aligned_in_smp;
 245
 246        /*
 247         * If used, fixed file set. Writers must ensure that ->refs is dead,
 248         * readers must ensure that ->refs is alive as long as the file* is
 249         * used. Only updated through io_uring_register(2).
 250         */
 251        struct file             **user_files;
 252        unsigned                nr_user_files;
 253
 254        /* if used, fixed mapped user buffers */
 255        unsigned                nr_user_bufs;
 256        struct io_mapped_ubuf   *user_bufs;
 257
 258        struct user_struct      *user;
 259
 260        struct completion       ctx_done;
 261
 262        struct {
 263                struct mutex            uring_lock;
 264                wait_queue_head_t       wait;
 265        } ____cacheline_aligned_in_smp;
 266
 267        struct {
 268                spinlock_t              completion_lock;
 269                bool                    poll_multi_file;
 270                /*
 271                 * ->poll_list is protected by the ctx->uring_lock for
 272                 * io_uring instances that don't use IORING_SETUP_SQPOLL.
 273                 * For SQPOLL, only the single threaded io_sq_thread() will
 274                 * manipulate the list, hence no extra locking is needed there.
 275                 */
 276                struct list_head        poll_list;
 277                struct list_head        cancel_list;
 278        } ____cacheline_aligned_in_smp;
 279
 280        struct async_list       pending_async[2];
 281
 282#if defined(CONFIG_UNIX)
 283        struct socket           *ring_sock;
 284#endif
 285};
 286
 287struct sqe_submit {
 288        const struct io_uring_sqe       *sqe;
 289        unsigned short                  index;
 290        bool                            has_user;
 291        bool                            needs_lock;
 292        bool                            needs_fixed_file;
 293};
 294
 295/*
 296 * First field must be the file pointer in all the
 297 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
 298 */
 299struct io_poll_iocb {
 300        struct file                     *file;
 301        struct wait_queue_head          *head;
 302        __poll_t                        events;
 303        bool                            done;
 304        bool                            canceled;
 305        struct wait_queue_entry         wait;
 306};
 307
 308/*
 309 * NOTE! Each of the iocb union members has the file pointer
 310 * as the first entry in their struct definition. So you can
 311 * access the file pointer through any of the sub-structs,
 312 * or directly as just 'ki_filp' in this struct.
 313 */
 314struct io_kiocb {
 315        union {
 316                struct file             *file;
 317                struct kiocb            rw;
 318                struct io_poll_iocb     poll;
 319        };
 320
 321        struct sqe_submit       submit;
 322
 323        struct io_ring_ctx      *ctx;
 324        struct list_head        list;
 325        unsigned int            flags;
 326        refcount_t              refs;
 327#define REQ_F_NOWAIT            1       /* must not punt to workers */
 328#define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
 329#define REQ_F_FIXED_FILE        4       /* ctx owns file */
 330#define REQ_F_SEQ_PREV          8       /* sequential with previous */
 331#define REQ_F_IO_DRAIN          16      /* drain existing IO first */
 332#define REQ_F_IO_DRAINED        32      /* drain done */
 333        u64                     user_data;
 334        u32                     error;  /* iopoll result from callback */
 335        u32                     sequence;
 336
 337        struct work_struct      work;
 338};
 339
 340#define IO_PLUG_THRESHOLD               2
 341#define IO_IOPOLL_BATCH                 8
 342
 343struct io_submit_state {
 344        struct blk_plug         plug;
 345
 346        /*
 347         * io_kiocb alloc cache
 348         */
 349        void                    *reqs[IO_IOPOLL_BATCH];
 350        unsigned                int free_reqs;
 351        unsigned                int cur_req;
 352
 353        /*
 354         * File reference cache
 355         */
 356        struct file             *file;
 357        unsigned int            fd;
 358        unsigned int            has_refs;
 359        unsigned int            used_refs;
 360        unsigned int            ios_left;
 361};
 362
 363static void io_sq_wq_submit_work(struct work_struct *work);
 364
 365static struct kmem_cache *req_cachep;
 366
 367static const struct file_operations io_uring_fops;
 368
 369struct sock *io_uring_get_socket(struct file *file)
 370{
 371#if defined(CONFIG_UNIX)
 372        if (file->f_op == &io_uring_fops) {
 373                struct io_ring_ctx *ctx = file->private_data;
 374
 375                return ctx->ring_sock->sk;
 376        }
 377#endif
 378        return NULL;
 379}
 380EXPORT_SYMBOL(io_uring_get_socket);
 381
 382static void io_ring_ctx_ref_free(struct percpu_ref *ref)
 383{
 384        struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
 385
 386        complete(&ctx->ctx_done);
 387}
 388
 389static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 390{
 391        struct io_ring_ctx *ctx;
 392        int i;
 393
 394        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
 395        if (!ctx)
 396                return NULL;
 397
 398        if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
 399                kfree(ctx);
 400                return NULL;
 401        }
 402
 403        ctx->flags = p->flags;
 404        init_waitqueue_head(&ctx->cq_wait);
 405        init_completion(&ctx->ctx_done);
 406        mutex_init(&ctx->uring_lock);
 407        init_waitqueue_head(&ctx->wait);
 408        for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
 409                spin_lock_init(&ctx->pending_async[i].lock);
 410                INIT_LIST_HEAD(&ctx->pending_async[i].list);
 411                atomic_set(&ctx->pending_async[i].cnt, 0);
 412        }
 413        spin_lock_init(&ctx->completion_lock);
 414        INIT_LIST_HEAD(&ctx->poll_list);
 415        INIT_LIST_HEAD(&ctx->cancel_list);
 416        INIT_LIST_HEAD(&ctx->defer_list);
 417        return ctx;
 418}
 419
 420static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
 421                                     struct io_kiocb *req)
 422{
 423        if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
 424                return false;
 425
 426        return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
 427}
 428
 429static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
 430{
 431        struct io_kiocb *req;
 432
 433        if (list_empty(&ctx->defer_list))
 434                return NULL;
 435
 436        req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
 437        if (!io_sequence_defer(ctx, req)) {
 438                list_del_init(&req->list);
 439                return req;
 440        }
 441
 442        return NULL;
 443}
 444
 445static void __io_commit_cqring(struct io_ring_ctx *ctx)
 446{
 447        struct io_cq_ring *ring = ctx->cq_ring;
 448
 449        if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
 450                /* order cqe stores with ring update */
 451                smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
 452
 453                if (wq_has_sleeper(&ctx->cq_wait)) {
 454                        wake_up_interruptible(&ctx->cq_wait);
 455                        kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
 456                }
 457        }
 458}
 459
 460static void io_commit_cqring(struct io_ring_ctx *ctx)
 461{
 462        struct io_kiocb *req;
 463
 464        __io_commit_cqring(ctx);
 465
 466        while ((req = io_get_deferred_req(ctx)) != NULL) {
 467                req->flags |= REQ_F_IO_DRAINED;
 468                queue_work(ctx->sqo_wq, &req->work);
 469        }
 470}
 471
 472static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 473{
 474        struct io_cq_ring *ring = ctx->cq_ring;
 475        unsigned tail;
 476
 477        tail = ctx->cached_cq_tail;
 478        /*
 479         * writes to the cq entry need to come after reading head; the
 480         * control dependency is enough as we're using WRITE_ONCE to
 481         * fill the cq entry
 482         */
 483        if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
 484                return NULL;
 485
 486        ctx->cached_cq_tail++;
 487        return &ring->cqes[tail & ctx->cq_mask];
 488}
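/*
 * [Editor's note, not part of the kernel source] Userspace-side sketch of
 * CQ reaping that pairs with the smp_store_release() of the tail in
 * __io_commit_cqring() and the head read in io_get_cqring() above. The
 * names cq_khead, cq_ktail, cqes and cq_ring_mask are hypothetical
 * pointers/values obtained from the IORING_OFF_CQ_RING mmap:
 *
 *	unsigned head = *cq_khead;
 *	unsigned tail = __atomic_load_n(cq_ktail, __ATOMIC_ACQUIRE);
 *
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cqes[head & cq_ring_mask];
 *
 *		handle_completion(cqe);		// hypothetical consumer
 *		head++;
 *	}
 *	// The release store orders the CQE loads before the head update,
 *	// so the kernel never recycles an entry that is still being read.
 *	__atomic_store_n(cq_khead, head, __ATOMIC_RELEASE);
 */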
 489
 490static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 491                                 long res)
 492{
 493        struct io_uring_cqe *cqe;
 494
 495        /*
 496         * If we can't get a cq entry, userspace overflowed the
 497         * submission (by quite a lot). Increment the overflow count in
 498         * the ring.
 499         */
 500        cqe = io_get_cqring(ctx);
 501        if (cqe) {
 502                WRITE_ONCE(cqe->user_data, ki_user_data);
 503                WRITE_ONCE(cqe->res, res);
 504                WRITE_ONCE(cqe->flags, 0);
 505        } else {
 506                unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
 507
 508                WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
 509        }
 510}
 511
 512static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 513{
 514        if (waitqueue_active(&ctx->wait))
 515                wake_up(&ctx->wait);
 516        if (waitqueue_active(&ctx->sqo_wait))
 517                wake_up(&ctx->sqo_wait);
 518        if (ctx->cq_ev_fd)
 519                eventfd_signal(ctx->cq_ev_fd, 1);
 520}
 521
 522static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
 523                                long res)
 524{
 525        unsigned long flags;
 526
 527        spin_lock_irqsave(&ctx->completion_lock, flags);
 528        io_cqring_fill_event(ctx, user_data, res);
 529        io_commit_cqring(ctx);
 530        spin_unlock_irqrestore(&ctx->completion_lock, flags);
 531
 532        io_cqring_ev_posted(ctx);
 533}
 534
 535static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
 536{
 537        percpu_ref_put_many(&ctx->refs, refs);
 538
 539        if (waitqueue_active(&ctx->wait))
 540                wake_up(&ctx->wait);
 541}
 542
 543static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
 544                                   struct io_submit_state *state)
 545{
 546        gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
 547        struct io_kiocb *req;
 548
 549        if (!percpu_ref_tryget(&ctx->refs))
 550                return NULL;
 551
 552        if (!state) {
 553                req = kmem_cache_alloc(req_cachep, gfp);
 554                if (unlikely(!req))
 555                        goto out;
 556        } else if (!state->free_reqs) {
 557                size_t sz;
 558                int ret;
 559
 560                sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
 561                ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
 562
 563                /*
 564                 * Bulk alloc is all-or-nothing. If we fail to get a batch,
 565                 * retry single alloc to be on the safe side.
 566                 */
 567                if (unlikely(ret <= 0)) {
 568                        state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
 569                        if (!state->reqs[0])
 570                                goto out;
 571                        ret = 1;
 572                }
 573                state->free_reqs = ret - 1;
 574                state->cur_req = 1;
 575                req = state->reqs[0];
 576        } else {
 577                req = state->reqs[state->cur_req];
 578                state->free_reqs--;
 579                state->cur_req++;
 580        }
 581
 582        req->file = NULL;
 583        req->ctx = ctx;
 584        req->flags = 0;
 585        /* one is dropped after submission, the other at completion */
 586        refcount_set(&req->refs, 2);
 587        return req;
 588out:
 589        io_ring_drop_ctx_refs(ctx, 1);
 590        return NULL;
 591}
 592
 593static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
 594{
 595        if (*nr) {
 596                kmem_cache_free_bulk(req_cachep, *nr, reqs);
 597                io_ring_drop_ctx_refs(ctx, *nr);
 598                *nr = 0;
 599        }
 600}
 601
 602static void io_free_req(struct io_kiocb *req)
 603{
 604        if (req->file && !(req->flags & REQ_F_FIXED_FILE))
 605                fput(req->file);
 606        io_ring_drop_ctx_refs(req->ctx, 1);
 607        kmem_cache_free(req_cachep, req);
 608}
 609
 610static void io_put_req(struct io_kiocb *req)
 611{
 612        if (refcount_dec_and_test(&req->refs))
 613                io_free_req(req);
 614}
 615
 616/*
 617 * Find and free completed poll iocbs
 618 */
 619static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 620                               struct list_head *done)
 621{
 622        void *reqs[IO_IOPOLL_BATCH];
 623        struct io_kiocb *req;
 624        int to_free;
 625
 626        to_free = 0;
 627        while (!list_empty(done)) {
 628                req = list_first_entry(done, struct io_kiocb, list);
 629                list_del(&req->list);
 630
 631                io_cqring_fill_event(ctx, req->user_data, req->error);
 632                (*nr_events)++;
 633
 634                if (refcount_dec_and_test(&req->refs)) {
 635                        /* If we're not using fixed files, we have to pair the
 636                         * completion part with the file put. Use regular
 637                         * completions for those, only batch free for fixed
 638                         * file.
 639                         */
 640                        if (req->flags & REQ_F_FIXED_FILE) {
 641                                reqs[to_free++] = req;
 642                                if (to_free == ARRAY_SIZE(reqs))
 643                                        io_free_req_many(ctx, reqs, &to_free);
 644                        } else {
 645                                io_free_req(req);
 646                        }
 647                }
 648        }
 649
 650        io_commit_cqring(ctx);
 651        io_free_req_many(ctx, reqs, &to_free);
 652}
 653
 654static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
 655                        long min)
 656{
 657        struct io_kiocb *req, *tmp;
 658        LIST_HEAD(done);
 659        bool spin;
 660        int ret;
 661
 662        /*
 663         * Only spin for completions if we don't have multiple devices hanging
 664         * off our complete list, and we're under the requested amount.
 665         */
 666        spin = !ctx->poll_multi_file && *nr_events < min;
 667
 668        ret = 0;
 669        list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
 670                struct kiocb *kiocb = &req->rw;
 671
 672                /*
 673                 * Move completed entries to our local list. If we find a
 674                 * request that requires polling, break out and complete
 675                 * the done list first, if we have entries there.
 676                 */
 677                if (req->flags & REQ_F_IOPOLL_COMPLETED) {
 678                        list_move_tail(&req->list, &done);
 679                        continue;
 680                }
 681                if (!list_empty(&done))
 682                        break;
 683
 684                ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
 685                if (ret < 0)
 686                        break;
 687
 688                if (ret && spin)
 689                        spin = false;
 690                ret = 0;
 691        }
 692
 693        if (!list_empty(&done))
 694                io_iopoll_complete(ctx, nr_events, &done);
 695
 696        return ret;
 697}
 698
 699/*
 700 * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
 701 * non-spinning poll check - we'll still enter the driver poll loop, but only
 702 * as a non-spinning completion check.
 703 */
 704static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
 705                                long min)
 706{
 707        while (!list_empty(&ctx->poll_list)) {
 708                int ret;
 709
 710                ret = io_do_iopoll(ctx, nr_events, min);
 711                if (ret < 0)
 712                        return ret;
 713                if (!min || *nr_events >= min)
 714                        return 0;
 715        }
 716
 717        return 1;
 718}
 719
 720/*
 721 * We can't just wait for polled events to come to us, we have to actively
 722 * find and complete them.
 723 */
 724static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
 725{
 726        if (!(ctx->flags & IORING_SETUP_IOPOLL))
 727                return;
 728
 729        mutex_lock(&ctx->uring_lock);
 730        while (!list_empty(&ctx->poll_list)) {
 731                unsigned int nr_events = 0;
 732
 733                io_iopoll_getevents(ctx, &nr_events, 1);
 734        }
 735        mutex_unlock(&ctx->uring_lock);
 736}
 737
 738static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 739                           long min)
 740{
 741        int ret = 0;
 742
 743        do {
 744                int tmin = 0;
 745
 746                if (*nr_events < min)
 747                        tmin = min - *nr_events;
 748
 749                ret = io_iopoll_getevents(ctx, nr_events, tmin);
 750                if (ret <= 0)
 751                        break;
 752                ret = 0;
 753        } while (min && !*nr_events && !need_resched());
 754
 755        return ret;
 756}
 757
 758static void kiocb_end_write(struct kiocb *kiocb)
 759{
 760        if (kiocb->ki_flags & IOCB_WRITE) {
 761                struct inode *inode = file_inode(kiocb->ki_filp);
 762
 763                /*
 764                 * Tell lockdep we inherited freeze protection from submission
 765                 * thread.
 766                 */
 767                if (S_ISREG(inode->i_mode))
 768                        __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
 769                file_end_write(kiocb->ki_filp);
 770        }
 771}
 772
 773static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
 774{
 775        struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
 776
 777        kiocb_end_write(kiocb);
 778
 779        io_cqring_add_event(req->ctx, req->user_data, res);
 780        io_put_req(req);
 781}
 782
 783static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
 784{
 785        struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
 786
 787        kiocb_end_write(kiocb);
 788
 789        req->error = res;
 790        if (res != -EAGAIN)
 791                req->flags |= REQ_F_IOPOLL_COMPLETED;
 792}
 793
 794/*
 795 * After the iocb has been issued, it's safe to be found on the poll list.
 796 * Adding the kiocb to the list AFTER submission ensures that we don't
 797 * find it from an io_iopoll_getevents() thread before the issuer is done
 798 * accessing the kiocb cookie.
 799 */
 800static void io_iopoll_req_issued(struct io_kiocb *req)
 801{
 802        struct io_ring_ctx *ctx = req->ctx;
 803
 804        /*
 805         * Track whether we have multiple files in our lists. This will impact
 806         * how we do polling eventually, not spinning if we're on potentially
 807         * different devices.
 808         */
 809        if (list_empty(&ctx->poll_list)) {
 810                ctx->poll_multi_file = false;
 811        } else if (!ctx->poll_multi_file) {
 812                struct io_kiocb *list_req;
 813
 814                list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
 815                                                list);
 816                if (list_req->rw.ki_filp != req->rw.ki_filp)
 817                        ctx->poll_multi_file = true;
 818        }
 819
 820        /*
 821         * For fast devices, IO may have already completed. If it has, add
 822         * it to the front so we find it first.
 823         */
 824        if (req->flags & REQ_F_IOPOLL_COMPLETED)
 825                list_add(&req->list, &ctx->poll_list);
 826        else
 827                list_add_tail(&req->list, &ctx->poll_list);
 828}
 829
 830static void io_file_put(struct io_submit_state *state)
 831{
 832        if (state->file) {
 833                int diff = state->has_refs - state->used_refs;
 834
 835                if (diff)
 836                        fput_many(state->file, diff);
 837                state->file = NULL;
 838        }
 839}
 840
 841/*
 842 * Get as many references to a file as we have IOs left in this submission,
 843 * assuming most submissions are for one file, or at least that each file
 844 * has more than one submission.
 845 */
 846static struct file *io_file_get(struct io_submit_state *state, int fd)
 847{
 848        if (!state)
 849                return fget(fd);
 850
 851        if (state->file) {
 852                if (state->fd == fd) {
 853                        state->used_refs++;
 854                        state->ios_left--;
 855                        return state->file;
 856                }
 857                io_file_put(state);
 858        }
 859        state->file = fget_many(fd, state->ios_left);
 860        if (!state->file)
 861                return NULL;
 862
 863        state->fd = fd;
 864        state->has_refs = state->ios_left;
 865        state->used_refs = 1;
 866        state->ios_left--;
 867        return state->file;
 868}
 869
 870/*
 871 * If we tracked the file through the SCM inflight mechanism, we could support
 872 * any file. For now, just ensure that anything potentially problematic is done
 873 * inline.
 874 */
 875static bool io_file_supports_async(struct file *file)
 876{
 877        umode_t mode = file_inode(file)->i_mode;
 878
 879        if (S_ISBLK(mode) || S_ISCHR(mode))
 880                return true;
 881        if (S_ISREG(mode) && file->f_op != &io_uring_fops)
 882                return true;
 883
 884        return false;
 885}
 886
 887static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
 888                      bool force_nonblock)
 889{
 890        const struct io_uring_sqe *sqe = s->sqe;
 891        struct io_ring_ctx *ctx = req->ctx;
 892        struct kiocb *kiocb = &req->rw;
 893        unsigned ioprio;
 894        int ret;
 895
 896        if (!req->file)
 897                return -EBADF;
 898
 899        if (force_nonblock && !io_file_supports_async(req->file))
 900                force_nonblock = false;
 901
 902        kiocb->ki_pos = READ_ONCE(sqe->off);
 903        kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
 904        kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
 905
 906        ioprio = READ_ONCE(sqe->ioprio);
 907        if (ioprio) {
 908                ret = ioprio_check_cap(ioprio);
 909                if (ret)
 910                        return ret;
 911
 912                kiocb->ki_ioprio = ioprio;
 913        } else
 914                kiocb->ki_ioprio = get_current_ioprio();
 915
 916        ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
 917        if (unlikely(ret))
 918                return ret;
 919
 920        /* don't allow async punt if RWF_NOWAIT was requested */
 921        if (kiocb->ki_flags & IOCB_NOWAIT)
 922                req->flags |= REQ_F_NOWAIT;
 923
 924        if (force_nonblock)
 925                kiocb->ki_flags |= IOCB_NOWAIT;
 926
 927        if (ctx->flags & IORING_SETUP_IOPOLL) {
 928                if (!(kiocb->ki_flags & IOCB_DIRECT) ||
 929                    !kiocb->ki_filp->f_op->iopoll)
 930                        return -EOPNOTSUPP;
 931
 932                req->error = 0;
 933                kiocb->ki_flags |= IOCB_HIPRI;
 934                kiocb->ki_complete = io_complete_rw_iopoll;
 935        } else {
 936                if (kiocb->ki_flags & IOCB_HIPRI)
 937                        return -EINVAL;
 938                kiocb->ki_complete = io_complete_rw;
 939        }
 940        return 0;
 941}
 942
 943static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
 944{
 945        switch (ret) {
 946        case -EIOCBQUEUED:
 947                break;
 948        case -ERESTARTSYS:
 949        case -ERESTARTNOINTR:
 950        case -ERESTARTNOHAND:
 951        case -ERESTART_RESTARTBLOCK:
 952                /*
 953                 * We can't just restart the syscall, since previously
 954                 * submitted sqes may already be in progress. Just fail this
 955                 * IO with EINTR.
 956                 */
 957                ret = -EINTR;
 958                /* fall through */
 959        default:
 960                kiocb->ki_complete(kiocb, ret, 0);
 961        }
 962}
 963
 964static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
 965                           const struct io_uring_sqe *sqe,
 966                           struct iov_iter *iter)
 967{
 968        size_t len = READ_ONCE(sqe->len);
 969        struct io_mapped_ubuf *imu;
 970        unsigned index, buf_index;
 971        size_t offset;
 972        u64 buf_addr;
 973
 974        /* attempt to use fixed buffers without having provided iovecs */
 975        if (unlikely(!ctx->user_bufs))
 976                return -EFAULT;
 977
 978        buf_index = READ_ONCE(sqe->buf_index);
 979        if (unlikely(buf_index >= ctx->nr_user_bufs))
 980                return -EFAULT;
 981
 982        index = array_index_nospec(buf_index, ctx->nr_user_bufs);
 983        imu = &ctx->user_bufs[index];
 984        buf_addr = READ_ONCE(sqe->addr);
 985
 986        /* overflow */
 987        if (buf_addr + len < buf_addr)
 988                return -EFAULT;
 989        /* not inside the mapped region */
 990        if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
 991                return -EFAULT;
 992
 993        /*
 994         * May not be a start of buffer, set size appropriately
 995         * and advance us to the beginning.
 996         */
 997        offset = buf_addr - imu->ubuf;
 998        iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
 999        if (offset)
1000                iov_iter_advance(iter, offset);
1001
1002        /* don't drop a reference to these pages */
1003        iter->type |= ITER_BVEC_FLAG_NO_REF;
1004        return 0;
1005}
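/*
 * [Editor's note, not part of the kernel source] The fixed-buffer path
 * above assumes the application registered its buffers up front, roughly:
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = buf_len };
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_BUFFERS, &iov, 1);
 *
 * and then submits IORING_OP_READ_FIXED/IORING_OP_WRITE_FIXED SQEs with
 * sqe->buf_index selecting one of the registered iovecs. sqe->addr and
 * sqe->len must fall entirely inside that registered range, which is what
 * the bounds checks above enforce.
 */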
1006
1007static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
1008                           const struct sqe_submit *s, struct iovec **iovec,
1009                           struct iov_iter *iter)
1010{
1011        const struct io_uring_sqe *sqe = s->sqe;
1012        void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1013        size_t sqe_len = READ_ONCE(sqe->len);
1014        u8 opcode;
1015
1016        /*
1017         * We're reading ->opcode for the second time, but the first read
1018         * doesn't care whether it's _FIXED or not, so it doesn't matter
1019         * whether ->opcode changes concurrently. The first read does care
1020         * about whether it is a READ or a WRITE, so we don't trust this read
1021         * for that purpose and instead let the caller pass in the read/write
1022         * flag.
1023         */
1024        opcode = READ_ONCE(sqe->opcode);
1025        if (opcode == IORING_OP_READ_FIXED ||
1026            opcode == IORING_OP_WRITE_FIXED) {
1027                int ret = io_import_fixed(ctx, rw, sqe, iter);
1028                *iovec = NULL;
1029                return ret;
1030        }
1031
1032        if (!s->has_user)
1033                return -EFAULT;
1034
1035#ifdef CONFIG_COMPAT
1036        if (ctx->compat)
1037                return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1038                                                iovec, iter);
1039#endif
1040
1041        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1042}
1043
1044/*
1045 * Make a note of the last file/offset/direction we punted to async
1046 * context. We'll use this information to see if we can piggy back a
1047 * sequential request onto the previous one, if it still hasn't been
1048 * completed by the async worker.
1049 */
1050static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1051{
1052        struct async_list *async_list = &req->ctx->pending_async[rw];
1053        struct kiocb *kiocb = &req->rw;
1054        struct file *filp = kiocb->ki_filp;
1055        off_t io_end = kiocb->ki_pos + len;
1056
1057        if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1058                unsigned long max_pages;
1059
1060                /* Use 8x RA size as a decent limiter for both reads/writes */
1061                max_pages = filp->f_ra.ra_pages;
1062                if (!max_pages)
1063                        max_pages = VM_READAHEAD_PAGES;
1064                max_pages *= 8;
1065
1066                /* If max pages are exceeded, reset the state */
1067                len >>= PAGE_SHIFT;
1068                if (async_list->io_pages + len <= max_pages) {
1069                        req->flags |= REQ_F_SEQ_PREV;
1070                        async_list->io_pages += len;
1071                } else {
1072                        io_end = 0;
1073                        async_list->io_pages = 0;
1074                }
1075        }
1076
1077        /* New file? Reset state. */
1078        if (async_list->file != filp) {
1079                async_list->io_pages = 0;
1080                async_list->file = filp;
1081        }
1082        async_list->io_end = io_end;
1083}
1084
1085static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1086                   bool force_nonblock)
1087{
1088        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1089        struct kiocb *kiocb = &req->rw;
1090        struct iov_iter iter;
1091        struct file *file;
1092        size_t iov_count;
1093        int ret;
1094
1095        ret = io_prep_rw(req, s, force_nonblock);
1096        if (ret)
1097                return ret;
1098        file = kiocb->ki_filp;
1099
1100        if (unlikely(!(file->f_mode & FMODE_READ)))
1101                return -EBADF;
1102        if (unlikely(!file->f_op->read_iter))
1103                return -EINVAL;
1104
1105        ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1106        if (ret)
1107                return ret;
1108
1109        iov_count = iov_iter_count(&iter);
1110        ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1111        if (!ret) {
1112                ssize_t ret2;
1113
1114                /* Catch -EAGAIN return for forced non-blocking submission */
1115                ret2 = call_read_iter(file, kiocb, &iter);
1116                if (!force_nonblock || ret2 != -EAGAIN) {
1117                        io_rw_done(kiocb, ret2);
1118                } else {
1119                        /*
1120                         * If ->needs_lock is true, we're already in async
1121                         * context.
1122                         */
1123                        if (!s->needs_lock)
1124                                io_async_list_note(READ, req, iov_count);
1125                        ret = -EAGAIN;
1126                }
1127        }
1128        kfree(iovec);
1129        return ret;
1130}
1131
1132static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1133                    bool force_nonblock)
1134{
1135        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1136        struct kiocb *kiocb = &req->rw;
1137        struct iov_iter iter;
1138        struct file *file;
1139        size_t iov_count;
1140        int ret;
1141
1142        ret = io_prep_rw(req, s, force_nonblock);
1143        if (ret)
1144                return ret;
1145
1146        file = kiocb->ki_filp;
1147        if (unlikely(!(file->f_mode & FMODE_WRITE)))
1148                return -EBADF;
1149        if (unlikely(!file->f_op->write_iter))
1150                return -EINVAL;
1151
1152        ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1153        if (ret)
1154                return ret;
1155
1156        iov_count = iov_iter_count(&iter);
1157
1158        ret = -EAGAIN;
1159        if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1160                /* If ->needs_lock is true, we're already in async context. */
1161                if (!s->needs_lock)
1162                        io_async_list_note(WRITE, req, iov_count);
1163                goto out_free;
1164        }
1165
1166        ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1167        if (!ret) {
1168                ssize_t ret2;
1169
1170                /*
1171                 * Open-code file_start_write here to grab freeze protection,
1172                 * which will be released by another thread in
1173                 * io_complete_rw().  Fool lockdep by telling it the lock got
1174                 * released so that it doesn't complain about the held lock when
1175                 * we return to userspace.
1176                 */
1177                if (S_ISREG(file_inode(file)->i_mode)) {
1178                        __sb_start_write(file_inode(file)->i_sb,
1179                                                SB_FREEZE_WRITE, true);
1180                        __sb_writers_release(file_inode(file)->i_sb,
1181                                                SB_FREEZE_WRITE);
1182                }
1183                kiocb->ki_flags |= IOCB_WRITE;
1184
1185                ret2 = call_write_iter(file, kiocb, &iter);
1186                if (!force_nonblock || ret2 != -EAGAIN) {
1187                        io_rw_done(kiocb, ret2);
1188                } else {
1189                        /*
1190                         * If ->needs_lock is true, we're already in async
1191                         * context.
1192                         */
1193                        if (!s->needs_lock)
1194                                io_async_list_note(WRITE, req, iov_count);
1195                        ret = -EAGAIN;
1196                }
1197        }
1198out_free:
1199        kfree(iovec);
1200        return ret;
1201}
1202
1203/*
1204 * IORING_OP_NOP just posts a completion event, nothing else.
1205 */
1206static int io_nop(struct io_kiocb *req, u64 user_data)
1207{
1208        struct io_ring_ctx *ctx = req->ctx;
1209        long err = 0;
1210
1211        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1212                return -EINVAL;
1213
1214        io_cqring_add_event(ctx, user_data, err);
1215        io_put_req(req);
1216        return 0;
1217}
1218
1219static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1220{
1221        struct io_ring_ctx *ctx = req->ctx;
1222
1223        if (!req->file)
1224                return -EBADF;
1225
1226        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1227                return -EINVAL;
1228        if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1229                return -EINVAL;
1230
1231        return 0;
1232}
1233
1234static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1235                    bool force_nonblock)
1236{
1237        loff_t sqe_off = READ_ONCE(sqe->off);
1238        loff_t sqe_len = READ_ONCE(sqe->len);
1239        loff_t end = sqe_off + sqe_len;
1240        unsigned fsync_flags;
1241        int ret;
1242
1243        fsync_flags = READ_ONCE(sqe->fsync_flags);
1244        if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1245                return -EINVAL;
1246
1247        ret = io_prep_fsync(req, sqe);
1248        if (ret)
1249                return ret;
1250
1251        /* fsync always requires a blocking context */
1252        if (force_nonblock)
1253                return -EAGAIN;
1254
1255        ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1256                                end > 0 ? end : LLONG_MAX,
1257                                fsync_flags & IORING_FSYNC_DATASYNC);
1258
1259        io_cqring_add_event(req->ctx, sqe->user_data, ret);
1260        io_put_req(req);
1261        return 0;
1262}
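/*
 * [Editor's note, not part of the kernel source] sqe->off and sqe->len
 * describe the byte range handed to vfs_fsync_range() above; a whole-file
 * datasync is simply:
 *
 *	sqe->opcode = IORING_OP_FSYNC;
 *	sqe->fd = file_fd;
 *	sqe->off = 0;
 *	sqe->len = 0;			// off + len == 0 means "up to LLONG_MAX"
 *	sqe->fsync_flags = IORING_FSYNC_DATASYNC;
 */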
1263
1264static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1265{
1266        struct io_ring_ctx *ctx = req->ctx;
1267        int ret = 0;
1268
1269        if (!req->file)
1270                return -EBADF;
1271
1272        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1273                return -EINVAL;
1274        if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1275                return -EINVAL;
1276
1277        return ret;
1278}
1279
1280static int io_sync_file_range(struct io_kiocb *req,
1281                              const struct io_uring_sqe *sqe,
1282                              bool force_nonblock)
1283{
1284        loff_t sqe_off;
1285        loff_t sqe_len;
1286        unsigned flags;
1287        int ret;
1288
1289        ret = io_prep_sfr(req, sqe);
1290        if (ret)
1291                return ret;
1292
1293        /* sync_file_range always requires a blocking context */
1294        if (force_nonblock)
1295                return -EAGAIN;
1296
1297        sqe_off = READ_ONCE(sqe->off);
1298        sqe_len = READ_ONCE(sqe->len);
1299        flags = READ_ONCE(sqe->sync_range_flags);
1300
1301        ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1302
1303        io_cqring_add_event(req->ctx, sqe->user_data, ret);
1304        io_put_req(req);
1305        return 0;
1306}
1307
1308static void io_poll_remove_one(struct io_kiocb *req)
1309{
1310        struct io_poll_iocb *poll = &req->poll;
1311
1312        spin_lock(&poll->head->lock);
1313        WRITE_ONCE(poll->canceled, true);
1314        if (!list_empty(&poll->wait.entry)) {
1315                list_del_init(&poll->wait.entry);
1316                queue_work(req->ctx->sqo_wq, &req->work);
1317        }
1318        spin_unlock(&poll->head->lock);
1319
1320        list_del_init(&req->list);
1321}
1322
1323static void io_poll_remove_all(struct io_ring_ctx *ctx)
1324{
1325        struct io_kiocb *req;
1326
1327        spin_lock_irq(&ctx->completion_lock);
1328        while (!list_empty(&ctx->cancel_list)) {
1329                req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1330                io_poll_remove_one(req);
1331        }
1332        spin_unlock_irq(&ctx->completion_lock);
1333}
1334
1335/*
1336 * Find a running poll command that matches one specified in sqe->addr,
1337 * and remove it if found.
1338 */
1339static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1340{
1341        struct io_ring_ctx *ctx = req->ctx;
1342        struct io_kiocb *poll_req, *next;
1343        int ret = -ENOENT;
1344
1345        if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1346                return -EINVAL;
1347        if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1348            sqe->poll_events)
1349                return -EINVAL;
1350
1351        spin_lock_irq(&ctx->completion_lock);
1352        list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1353                if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1354                        io_poll_remove_one(poll_req);
1355                        ret = 0;
1356                        break;
1357                }
1358        }
1359        spin_unlock_irq(&ctx->completion_lock);
1360
1361        io_cqring_add_event(req->ctx, sqe->user_data, ret);
1362        io_put_req(req);
1363        return 0;
1364}
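/*
 * [Editor's note, not part of the kernel source] Illustrative sketch:
 * cancellation matches on the user_data of the original poll request, e.g.
 *
 *	sqe->opcode = IORING_OP_POLL_ADD;
 *	sqe->fd = sock_fd;
 *	sqe->poll_events = POLLIN;
 *	sqe->user_data = 0x1234;
 *	...
 *	sqe->opcode = IORING_OP_POLL_REMOVE;
 *	sqe->addr = 0x1234;	// user_data of the POLL_ADD to cancel
 */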
1365
1366static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1367                             __poll_t mask)
1368{
1369        req->poll.done = true;
1370        io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1371        io_commit_cqring(ctx);
1372}
1373
1374static void io_poll_complete_work(struct work_struct *work)
1375{
1376        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1377        struct io_poll_iocb *poll = &req->poll;
1378        struct poll_table_struct pt = { ._key = poll->events };
1379        struct io_ring_ctx *ctx = req->ctx;
1380        __poll_t mask = 0;
1381
1382        if (!READ_ONCE(poll->canceled))
1383                mask = vfs_poll(poll->file, &pt) & poll->events;
1384
1385        /*
1386         * Note that the request may be canceled concurrently (see
1387         * io_poll_remove_one()), which also removes it from ->cancel_list;
1388         * the completion_lock roundtrip here synchronizes with that.  In the
1389         * cancellation case the list_del_init itself is not needed, but it is
1390         * harmless, so we keep it to avoid further branches in the fast path.
1391         */
1392        spin_lock_irq(&ctx->completion_lock);
1393        if (!mask && !READ_ONCE(poll->canceled)) {
1394                add_wait_queue(poll->head, &poll->wait);
1395                spin_unlock_irq(&ctx->completion_lock);
1396                return;
1397        }
1398        list_del_init(&req->list);
1399        io_poll_complete(ctx, req, mask);
1400        spin_unlock_irq(&ctx->completion_lock);
1401
1402        io_cqring_ev_posted(ctx);
1403        io_put_req(req);
1404}
1405
1406static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1407                        void *key)
1408{
1409        struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1410                                                        wait);
1411        struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1412        struct io_ring_ctx *ctx = req->ctx;
1413        __poll_t mask = key_to_poll(key);
1414        unsigned long flags;
1415
1416        /* for instances that support it check for an event match first: */
1417        if (mask && !(mask & poll->events))
1418                return 0;
1419
1420        list_del_init(&poll->wait.entry);
1421
1422        if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1423                list_del(&req->list);
1424                io_poll_complete(ctx, req, mask);
1425                spin_unlock_irqrestore(&ctx->completion_lock, flags);
1426
1427                io_cqring_ev_posted(ctx);
1428                io_put_req(req);
1429        } else {
1430                queue_work(ctx->sqo_wq, &req->work);
1431        }
1432
1433        return 1;
1434}
1435
1436struct io_poll_table {
1437        struct poll_table_struct pt;
1438        struct io_kiocb *req;
1439        int error;
1440};
1441
1442static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1443                               struct poll_table_struct *p)
1444{
1445        struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1446
1447        if (unlikely(pt->req->poll.head)) {
1448                pt->error = -EINVAL;
1449                return;
1450        }
1451
1452        pt->error = 0;
1453        pt->req->poll.head = head;
1454        add_wait_queue(head, &pt->req->poll.wait);
1455}
1456
1457static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1458{
1459        struct io_poll_iocb *poll = &req->poll;
1460        struct io_ring_ctx *ctx = req->ctx;
1461        struct io_poll_table ipt;
1462        bool cancel = false;
1463        __poll_t mask;
1464        u16 events;
1465
1466        if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1467                return -EINVAL;
1468        if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1469                return -EINVAL;
1470        if (!poll->file)
1471                return -EBADF;
1472
1473        INIT_WORK(&req->work, io_poll_complete_work);
1474        events = READ_ONCE(sqe->poll_events);
1475        poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1476
1477        poll->head = NULL;
1478        poll->done = false;
1479        poll->canceled = false;
1480
1481        ipt.pt._qproc = io_poll_queue_proc;
1482        ipt.pt._key = poll->events;
1483        ipt.req = req;
1484        ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1485
1486        /* initialize the list so that we can do list_empty checks */
1487        INIT_LIST_HEAD(&poll->wait.entry);
1488        init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1489
1490        mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1491
1492        spin_lock_irq(&ctx->completion_lock);
1493        if (likely(poll->head)) {
1494                spin_lock(&poll->head->lock);
1495                if (unlikely(list_empty(&poll->wait.entry))) {
1496                        if (ipt.error)
1497                                cancel = true;
1498                        ipt.error = 0;
1499                        mask = 0;
1500                }
1501                if (mask || ipt.error)
1502                        list_del_init(&poll->wait.entry);
1503                else if (cancel)
1504                        WRITE_ONCE(poll->canceled, true);
1505                else if (!poll->done) /* actually waiting for an event */
1506                        list_add_tail(&req->list, &ctx->cancel_list);
1507                spin_unlock(&poll->head->lock);
1508        }
1509        if (mask) { /* no async, we'd stolen it */
1510                ipt.error = 0;
1511                io_poll_complete(ctx, req, mask);
1512        }
1513        spin_unlock_irq(&ctx->completion_lock);
1514
1515        if (mask) {
1516                io_cqring_ev_posted(ctx);
1517                io_put_req(req);
1518        }
1519        return ipt.error;
1520}
1521
1522static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1523                        const struct io_uring_sqe *sqe)
1524{
1525        struct io_uring_sqe *sqe_copy;
1526
1527        if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1528                return 0;
1529
1530        sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1531        if (!sqe_copy)
1532                return -EAGAIN;
1533
1534        spin_lock_irq(&ctx->completion_lock);
1535        if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1536                spin_unlock_irq(&ctx->completion_lock);
1537                kfree(sqe_copy);
1538                return 0;
1539        }
1540
1541        memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1542        req->submit.sqe = sqe_copy;
1543
1544        INIT_WORK(&req->work, io_sq_wq_submit_work);
1545        list_add_tail(&req->list, &ctx->defer_list);
1546        spin_unlock_irq(&ctx->completion_lock);
1547        return -EIOCBQUEUED;
1548}
1549
1550static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1551                           const struct sqe_submit *s, bool force_nonblock)
1552{
1553        int ret, opcode;
1554
1555        if (unlikely(s->index >= ctx->sq_entries))
1556                return -EINVAL;
1557        req->user_data = READ_ONCE(s->sqe->user_data);
1558
1559        opcode = READ_ONCE(s->sqe->opcode);
1560        switch (opcode) {
1561        case IORING_OP_NOP:
1562                ret = io_nop(req, req->user_data);
1563                break;
1564        case IORING_OP_READV:
1565                if (unlikely(s->sqe->buf_index))
1566                        return -EINVAL;
1567                ret = io_read(req, s, force_nonblock);
1568                break;
1569        case IORING_OP_WRITEV:
1570                if (unlikely(s->sqe->buf_index))
1571                        return -EINVAL;
1572                ret = io_write(req, s, force_nonblock);
1573                break;
1574        case IORING_OP_READ_FIXED:
1575                ret = io_read(req, s, force_nonblock);
1576                break;
1577        case IORING_OP_WRITE_FIXED:
1578                ret = io_write(req, s, force_nonblock);
1579                break;
1580        case IORING_OP_FSYNC:
1581                ret = io_fsync(req, s->sqe, force_nonblock);
1582                break;
1583        case IORING_OP_POLL_ADD:
1584                ret = io_poll_add(req, s->sqe);
1585                break;
1586        case IORING_OP_POLL_REMOVE:
1587                ret = io_poll_remove(req, s->sqe);
1588                break;
1589        case IORING_OP_SYNC_FILE_RANGE:
1590                ret = io_sync_file_range(req, s->sqe, force_nonblock);
1591                break;
1592        default:
1593                ret = -EINVAL;
1594                break;
1595        }
1596
1597        if (ret)
1598                return ret;
1599
1600        if (ctx->flags & IORING_SETUP_IOPOLL) {
1601                if (req->error == -EAGAIN)
1602                        return -EAGAIN;
1603
1604                /* workqueue context doesn't hold uring_lock, grab it now */
1605                if (s->needs_lock)
1606                        mutex_lock(&ctx->uring_lock);
1607                io_iopoll_req_issued(req);
1608                if (s->needs_lock)
1609                        mutex_unlock(&ctx->uring_lock);
1610        }
1611
1612        return 0;
1613}
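/*
 * Illustrative sketch only (not part of this file): the SQE an
 * application might submit for the IORING_OP_READV case dispatched
 * above. buf_index must be zero for READV/WRITEV (checked above); the
 * addr/len/off convention (iovec array, iovec count, file offset) is
 * assumed from the readv-style semantics and is not spelled out in
 * this function.
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = buf_len };
 *
 *	memset(sqe, 0, sizeof(*sqe));
 *	sqe->opcode = IORING_OP_READV;
 *	sqe->fd = fd;
 *	sqe->addr = (unsigned long) &iov;
 *	sqe->len = 1;
 *	sqe->off = 0;
 *	sqe->user_data = 0xcafe;
 */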
1614
1615static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1616                                                 const struct io_uring_sqe *sqe)
1617{
1618        switch (sqe->opcode) {
1619        case IORING_OP_READV:
1620        case IORING_OP_READ_FIXED:
1621                return &ctx->pending_async[READ];
1622        case IORING_OP_WRITEV:
1623        case IORING_OP_WRITE_FIXED:
1624                return &ctx->pending_async[WRITE];
1625        default:
1626                return NULL;
1627        }
1628}
1629
1630static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1631{
1632        u8 opcode = READ_ONCE(sqe->opcode);
1633
1634        return !(opcode == IORING_OP_READ_FIXED ||
1635                 opcode == IORING_OP_WRITE_FIXED);
1636}
1637
1638static void io_sq_wq_submit_work(struct work_struct *work)
1639{
1640        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1641        struct io_ring_ctx *ctx = req->ctx;
1642        struct mm_struct *cur_mm = NULL;
1643        struct async_list *async_list;
1644        LIST_HEAD(req_list);
1645        mm_segment_t old_fs;
1646        int ret;
1647
1648        async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1649restart:
1650        do {
1651                struct sqe_submit *s = &req->submit;
1652                const struct io_uring_sqe *sqe = s->sqe;
1653
1654                /* Ensure we clear previously set non-block flag */
1655                req->rw.ki_flags &= ~IOCB_NOWAIT;
1656
1657                ret = 0;
1658                if (io_sqe_needs_user(sqe) && !cur_mm) {
1659                        if (!mmget_not_zero(ctx->sqo_mm)) {
1660                                ret = -EFAULT;
1661                        } else {
1662                                cur_mm = ctx->sqo_mm;
1663                                use_mm(cur_mm);
1664                                old_fs = get_fs();
1665                                set_fs(USER_DS);
1666                        }
1667                }
1668
1669                if (!ret) {
1670                        s->has_user = cur_mm != NULL;
1671                        s->needs_lock = true;
1672                        do {
1673                                ret = __io_submit_sqe(ctx, req, s, false);
1674                                /*
1675                                 * We can get EAGAIN for polled IO even though
1676                                 * we're forcing a sync submission from here,
1677                                 * since we can't wait for request slots on the
1678                                 * block side.
1679                                 */
1680                                if (ret != -EAGAIN)
1681                                        break;
1682                                cond_resched();
1683                        } while (1);
1684                }
1685
1686                /* drop submission reference */
1687                io_put_req(req);
1688
1689                if (ret) {
1690                        io_cqring_add_event(ctx, sqe->user_data, ret);
1691                        io_put_req(req);
1692                }
1693
1694                /* async context always uses a copy of the sqe */
1695                kfree(sqe);
1696
1697                if (!async_list)
1698                        break;
1699                if (!list_empty(&req_list)) {
1700                        req = list_first_entry(&req_list, struct io_kiocb,
1701                                                list);
1702                        list_del(&req->list);
1703                        continue;
1704                }
1705                if (list_empty(&async_list->list))
1706                        break;
1707
1708                req = NULL;
1709                spin_lock(&async_list->lock);
1710                if (list_empty(&async_list->list)) {
1711                        spin_unlock(&async_list->lock);
1712                        break;
1713                }
1714                list_splice_init(&async_list->list, &req_list);
1715                spin_unlock(&async_list->lock);
1716
1717                req = list_first_entry(&req_list, struct io_kiocb, list);
1718                list_del(&req->list);
1719        } while (req);
1720
1721        /*
1722         * Rare case of racing with a submitter. If we find the count has
1723         * dropped to zero AND we have pending work items, then restart
1724         * the processing. This is a tiny race window.
1725         */
1726        if (async_list) {
1727                ret = atomic_dec_return(&async_list->cnt);
1728                while (!ret && !list_empty(&async_list->list)) {
1729                        spin_lock(&async_list->lock);
1730                        atomic_inc(&async_list->cnt);
1731                        list_splice_init(&async_list->list, &req_list);
1732                        spin_unlock(&async_list->lock);
1733
1734                        if (!list_empty(&req_list)) {
1735                                req = list_first_entry(&req_list,
1736                                                        struct io_kiocb, list);
1737                                list_del(&req->list);
1738                                goto restart;
1739                        }
1740                        ret = atomic_dec_return(&async_list->cnt);
1741                }
1742        }
1743
1744        if (cur_mm) {
1745                set_fs(old_fs);
1746                unuse_mm(cur_mm);
1747                mmput(cur_mm);
1748        }
1749}
1750
1751/*
1752 * See if we can piggyback onto previously submitted work that is still
1753 * running. We currently only allow this if the new request is sequential
1754 * to the previous one we punted.
1755 */
1756static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1757{
1758        bool ret = false;
1759
1760        if (!list)
1761                return false;
1762        if (!(req->flags & REQ_F_SEQ_PREV))
1763                return false;
1764        if (!atomic_read(&list->cnt))
1765                return false;
1766
1767        ret = true;
1768        spin_lock(&list->lock);
1769        list_add_tail(&req->list, &list->list);
1770        if (!atomic_read(&list->cnt)) {
1771                list_del_init(&req->list);
1772                ret = false;
1773        }
1774        spin_unlock(&list->lock);
1775        return ret;
1776}
1777
1778static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1779{
1780        int op = READ_ONCE(sqe->opcode);
1781
1782        switch (op) {
1783        case IORING_OP_NOP:
1784        case IORING_OP_POLL_REMOVE:
1785                return false;
1786        default:
1787                return true;
1788        }
1789}
1790
1791static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1792                           struct io_submit_state *state, struct io_kiocb *req)
1793{
1794        unsigned flags;
1795        int fd;
1796
1797        flags = READ_ONCE(s->sqe->flags);
1798        fd = READ_ONCE(s->sqe->fd);
1799
1800        if (flags & IOSQE_IO_DRAIN) {
1801                req->flags |= REQ_F_IO_DRAIN;
1802                req->sequence = ctx->cached_sq_head - 1;
1803        }
1804
1805        if (!io_op_needs_file(s->sqe))
1806                return 0;
1807
1808        if (flags & IOSQE_FIXED_FILE) {
1809                if (unlikely(!ctx->user_files ||
1810                    (unsigned) fd >= ctx->nr_user_files))
1811                        return -EBADF;
1812                req->file = ctx->user_files[fd];
1813                req->flags |= REQ_F_FIXED_FILE;
1814        } else {
1815                if (s->needs_fixed_file)
1816                        return -EBADF;
1817                req->file = io_file_get(state, fd);
1818                if (unlikely(!req->file))
1819                        return -EBADF;
1820        }
1821
1822        return 0;
1823}
1824
1825static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1826                         struct io_submit_state *state)
1827{
1828        struct io_kiocb *req;
1829        int ret;
1830
1831        /* enforce forwards compatibility on users */
1832        if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1833                return -EINVAL;
1834
1835        req = io_get_req(ctx, state);
1836        if (unlikely(!req))
1837                return -EAGAIN;
1838
1839        ret = io_req_set_file(ctx, s, state, req);
1840        if (unlikely(ret))
1841                goto out;
1842
1843        ret = io_req_defer(ctx, req, s->sqe);
1844        if (ret) {
1845                if (ret == -EIOCBQUEUED)
1846                        ret = 0;
1847                return ret;
1848        }
1849
1850        ret = __io_submit_sqe(ctx, req, s, true);
1851        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1852                struct io_uring_sqe *sqe_copy;
1853
1854                sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1855                if (sqe_copy) {
1856                        struct async_list *list;
1857
1858                        memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1859                        s->sqe = sqe_copy;
1860
1861                        memcpy(&req->submit, s, sizeof(*s));
1862                        list = io_async_list_from_sqe(ctx, s->sqe);
1863                        if (!io_add_to_prev_work(list, req)) {
1864                                if (list)
1865                                        atomic_inc(&list->cnt);
1866                                INIT_WORK(&req->work, io_sq_wq_submit_work);
1867                                queue_work(ctx->sqo_wq, &req->work);
1868                        }
1869
1870                        /*
1871                         * Queued up for async execution; the worker will
1872                         * release the submit reference when the iocb is
1873                         * actually submitted.
1874                         */
1875                        return 0;
1876                }
1877        }
1878
1879out:
1880        /* drop submission reference */
1881        io_put_req(req);
1882
1883        /* and drop final reference, if we failed */
1884        if (ret)
1885                io_put_req(req);
1886
1887        return ret;
1888}
1889
1890/*
1891 * Batched submission is done; ensure local IO is flushed out.
1892 */
1893static void io_submit_state_end(struct io_submit_state *state)
1894{
1895        blk_finish_plug(&state->plug);
1896        io_file_put(state);
1897        if (state->free_reqs)
1898                kmem_cache_free_bulk(req_cachep, state->free_reqs,
1899                                        &state->reqs[state->cur_req]);
1900}
1901
1902/*
1903 * Start submission side cache.
1904 */
1905static void io_submit_state_start(struct io_submit_state *state,
1906                                  struct io_ring_ctx *ctx, unsigned max_ios)
1907{
1908        blk_start_plug(&state->plug);
1909        state->free_reqs = 0;
1910        state->file = NULL;
1911        state->ios_left = max_ios;
1912}
1913
1914static void io_commit_sqring(struct io_ring_ctx *ctx)
1915{
1916        struct io_sq_ring *ring = ctx->sq_ring;
1917
1918        if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1919                /*
1920                 * Ensure any loads from the SQEs are done at this point,
1921                 * since once we write the new head, the application could
1922                 * write new data to them.
1923                 */
1924                smp_store_release(&ring->r.head, ctx->cached_sq_head);
1925        }
1926}
1927
1928/*
1929 * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1930 * that is mapped by userspace. This means that care needs to be taken to
1931 * ensure that reads are stable, as we cannot rely on userspace always
1932 * being a good citizen. If members of the sqe are validated and then later
1933 * used, it's important that those reads are done through READ_ONCE() to
1934 * prevent a re-load down the line.
1935 */
1936static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1937{
1938        struct io_sq_ring *ring = ctx->sq_ring;
1939        unsigned head;
1940
1941        /*
1942         * The cached sq head (or cq tail) serves two purposes:
1943         *
1944         * 1) allows us to batch the cost of the user visible head
1945         *    updates.
1946         * 2) allows the kernel side to track the head on its own, even
1947         *    though the application is the one updating it.
1948         */
1949        head = ctx->cached_sq_head;
1950        /* make sure SQ entry isn't read before tail */
1951        if (head == smp_load_acquire(&ring->r.tail))
1952                return false;
1953
1954        head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1955        if (head < ctx->sq_entries) {
1956                s->index = head;
1957                s->sqe = &ctx->sq_sqes[head];
1958                ctx->cached_sq_head++;
1959                return true;
1960        }
1961
1962        /* drop invalid entries */
1963        ctx->cached_sq_head++;
1964        ring->dropped++;
1965        return false;
1966}
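/*
 * Illustrative sketch only (not part of this file): the application
 * side of the handshake that io_get_sqring() and io_commit_sqring()
 * consume. The app fills an entry in the mmap'ed SQE array, stores its
 * index into the ring array[] slot, and then publishes the new tail
 * with release semantics (see the ordering notes at the top of this
 * file). sq_tail, sq_mask, sq_array and sqes are assumed to be the
 * app's pointers derived from struct io_sqring_offsets.
 *
 *	unsigned tail = *sq_tail;
 *	unsigned idx = tail & *sq_mask;
 *
 *	fill_sqe(&sqes[idx]);
 *	sq_array[idx] = idx;
 *	__atomic_store_n(sq_tail, tail + 1, __ATOMIC_RELEASE);
 */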
1967
1968static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1969                          unsigned int nr, bool has_user, bool mm_fault)
1970{
1971        struct io_submit_state state, *statep = NULL;
1972        int ret, i, submitted = 0;
1973
1974        if (nr > IO_PLUG_THRESHOLD) {
1975                io_submit_state_start(&state, ctx, nr);
1976                statep = &state;
1977        }
1978
1979        for (i = 0; i < nr; i++) {
1980                if (unlikely(mm_fault)) {
1981                        ret = -EFAULT;
1982                } else {
1983                        sqes[i].has_user = has_user;
1984                        sqes[i].needs_lock = true;
1985                        sqes[i].needs_fixed_file = true;
1986                        ret = io_submit_sqe(ctx, &sqes[i], statep);
1987                }
1988                if (!ret) {
1989                        submitted++;
1990                        continue;
1991                }
1992
1993                io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret);
1994        }
1995
1996        if (statep)
1997                io_submit_state_end(&state);
1998
1999        return submitted;
2000}
2001
2002static int io_sq_thread(void *data)
2003{
2004        struct sqe_submit sqes[IO_IOPOLL_BATCH];
2005        struct io_ring_ctx *ctx = data;
2006        struct mm_struct *cur_mm = NULL;
2007        mm_segment_t old_fs;
2008        DEFINE_WAIT(wait);
2009        unsigned inflight;
2010        unsigned long timeout;
2011
2012        old_fs = get_fs();
2013        set_fs(USER_DS);
2014
2015        timeout = inflight = 0;
2016        while (!kthread_should_park()) {
2017                bool all_fixed, mm_fault = false;
2018                int i;
2019
2020                if (inflight) {
2021                        unsigned nr_events = 0;
2022
2023                        if (ctx->flags & IORING_SETUP_IOPOLL) {
2024                                /*
2025                                 * We disallow the app entering submit/complete
2026                                 * with polling, but we still need to lock the
2027                                 * ring to prevent racing with polled issue
2028                                 * that got punted to a workqueue.
2029                                 */
2030                                mutex_lock(&ctx->uring_lock);
2031                                io_iopoll_check(ctx, &nr_events, 0);
2032                                mutex_unlock(&ctx->uring_lock);
2033                        } else {
2034                                /*
2035                                 * Normal IO, just pretend everything completed.
2036                                 * We don't have to poll completions for that.
2037                                 */
2038                                nr_events = inflight;
2039                        }
2040
2041                        inflight -= nr_events;
2042                        if (!inflight)
2043                                timeout = jiffies + ctx->sq_thread_idle;
2044                }
2045
2046                if (!io_get_sqring(ctx, &sqes[0])) {
2047                        /*
2048                         * We're polling. If we're within the defined idle
2049                         * period, then let us spin without work before going
2050                         * to sleep.
2051                         */
2052                        if (inflight || !time_after(jiffies, timeout)) {
2053                                cpu_relax();
2054                                continue;
2055                        }
2056
2057                        /*
2058                         * Drop cur_mm before scheduling; we can't hold it for
2059                         * long periods (or over schedule()). Do this before
2060                         * adding ourselves to the waitqueue, as the unuse/drop
2061                         * may sleep.
2062                         */
2063                        if (cur_mm) {
2064                                unuse_mm(cur_mm);
2065                                mmput(cur_mm);
2066                                cur_mm = NULL;
2067                        }
2068
2069                        prepare_to_wait(&ctx->sqo_wait, &wait,
2070                                                TASK_INTERRUPTIBLE);
2071
2072                        /* Tell userspace we may need a wakeup call */
2073                        ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2074                        /* make sure to read SQ tail after writing flags */
2075                        smp_mb();
2076
2077                        if (!io_get_sqring(ctx, &sqes[0])) {
2078                                if (kthread_should_park()) {
2079                                        finish_wait(&ctx->sqo_wait, &wait);
2080                                        break;
2081                                }
2082                                if (signal_pending(current))
2083                                        flush_signals(current);
2084                                schedule();
2085                                finish_wait(&ctx->sqo_wait, &wait);
2086
2087                                ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2088                                continue;
2089                        }
2090                        finish_wait(&ctx->sqo_wait, &wait);
2091
2092                        ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2093                }
2094
2095                i = 0;
2096                all_fixed = true;
2097                do {
2098                        if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2099                                all_fixed = false;
2100
2101                        i++;
2102                        if (i == ARRAY_SIZE(sqes))
2103                                break;
2104                } while (io_get_sqring(ctx, &sqes[i]));
2105
2106                /* Unless all new commands are FIXED regions, grab mm */
2107                if (!all_fixed && !cur_mm) {
2108                        mm_fault = !mmget_not_zero(ctx->sqo_mm);
2109                        if (!mm_fault) {
2110                                use_mm(ctx->sqo_mm);
2111                                cur_mm = ctx->sqo_mm;
2112                        }
2113                }
2114
2115                inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2116                                                mm_fault);
2117
2118                /* Commit SQ ring head once we've consumed all SQEs */
2119                io_commit_sqring(ctx);
2120        }
2121
2122        set_fs(old_fs);
2123        if (cur_mm) {
2124                unuse_mm(cur_mm);
2125                mmput(cur_mm);
2126        }
2127
2128        kthread_parkme();
2129
2130        return 0;
2131}
2132
2133static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2134{
2135        struct io_submit_state state, *statep = NULL;
2136        int i, submit = 0;
2137
2138        if (to_submit > IO_PLUG_THRESHOLD) {
2139                io_submit_state_start(&state, ctx, to_submit);
2140                statep = &state;
2141        }
2142
2143        for (i = 0; i < to_submit; i++) {
2144                struct sqe_submit s;
2145                int ret;
2146
2147                if (!io_get_sqring(ctx, &s))
2148                        break;
2149
2150                s.has_user = true;
2151                s.needs_lock = false;
2152                s.needs_fixed_file = false;
2153                submit++;
2154
2155                ret = io_submit_sqe(ctx, &s, statep);
2156                if (ret)
2157                        io_cqring_add_event(ctx, s.sqe->user_data, ret);
2158        }
2159        io_commit_sqring(ctx);
2160
2161        if (statep)
2162                io_submit_state_end(statep);
2163
2164        return submit;
2165}
2166
2167static unsigned io_cqring_events(struct io_cq_ring *ring)
2168{
2169        /* See comment at the top of this file */
2170        smp_rmb();
2171        return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2172}
2173
2174/*
2175 * Wait until events become available, if we don't already have some. The
2176 * application must reap them itself, as they reside on the shared cq ring.
2177 */
2178static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2179                          const sigset_t __user *sig, size_t sigsz)
2180{
2181        struct io_cq_ring *ring = ctx->cq_ring;
2182        sigset_t ksigmask, sigsaved;
2183        int ret;
2184
2185        if (io_cqring_events(ring) >= min_events)
2186                return 0;
2187
2188        if (sig) {
2189#ifdef CONFIG_COMPAT
2190                if (in_compat_syscall())
2191                        ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2192                                                      &ksigmask, &sigsaved, sigsz);
2193                else
2194#endif
2195                        ret = set_user_sigmask(sig, &ksigmask,
2196                                               &sigsaved, sigsz);
2197
2198                if (ret)
2199                        return ret;
2200        }
2201
2202        ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
2203
2204        if (sig)
2205                restore_user_sigmask(sig, &sigsaved, ret == -ERESTARTSYS);
2206
2207        if (ret == -ERESTARTSYS)
2208                ret = -EINTR;
2209
2210        return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2211}
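/*
 * Illustrative sketch only (not part of this file): how the
 * application might reap completions after io_cqring_wait() returns.
 * cq_head, cq_tail, cq_mask and cqes are assumed to be pointers the
 * app derived from struct io_cqring_offsets; the acquire load of the
 * tail and release store of the head follow the ordering notes at the
 * top of this file.
 *
 *	unsigned head = *cq_head;
 *	unsigned tail = __atomic_load_n(cq_tail, __ATOMIC_ACQUIRE);
 *
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cqes[head & *cq_mask];
 *
 *		handle_completion(cqe->user_data, cqe->res);
 *		head++;
 *	}
 *	__atomic_store_n(cq_head, head, __ATOMIC_RELEASE);
 */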
2212
2213static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2214{
2215#if defined(CONFIG_UNIX)
2216        if (ctx->ring_sock) {
2217                struct sock *sock = ctx->ring_sock->sk;
2218                struct sk_buff *skb;
2219
2220                while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2221                        kfree_skb(skb);
2222        }
2223#else
2224        int i;
2225
2226        for (i = 0; i < ctx->nr_user_files; i++)
2227                fput(ctx->user_files[i]);
2228#endif
2229}
2230
2231static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2232{
2233        if (!ctx->user_files)
2234                return -ENXIO;
2235
2236        __io_sqe_files_unregister(ctx);
2237        kfree(ctx->user_files);
2238        ctx->user_files = NULL;
2239        ctx->nr_user_files = 0;
2240        return 0;
2241}
2242
2243static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2244{
2245        if (ctx->sqo_thread) {
2246                /*
2247                 * The park is a bit of a work-around; without it we get
2248                 * warning spews on shutdown with SQPOLL set and affinity
2249                 * set to a single CPU.
2250                 */
2251                kthread_park(ctx->sqo_thread);
2252                kthread_stop(ctx->sqo_thread);
2253                ctx->sqo_thread = NULL;
2254        }
2255}
2256
2257static void io_finish_async(struct io_ring_ctx *ctx)
2258{
2259        io_sq_thread_stop(ctx);
2260
2261        if (ctx->sqo_wq) {
2262                destroy_workqueue(ctx->sqo_wq);
2263                ctx->sqo_wq = NULL;
2264        }
2265}
2266
2267#if defined(CONFIG_UNIX)
2268static void io_destruct_skb(struct sk_buff *skb)
2269{
2270        struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2271
2272        io_finish_async(ctx);
2273        unix_destruct_scm(skb);
2274}
2275
2276/*
2277 * Ensure the UNIX gc is aware of our file set, so we are certain that
2278 * the io_uring can be safely unregistered on process exit, even if we have
2279 * reference loops among the files.
2280 */
2281static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2282{
2283        struct sock *sk = ctx->ring_sock->sk;
2284        struct scm_fp_list *fpl;
2285        struct sk_buff *skb;
2286        int i;
2287
2288        if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2289                unsigned long inflight = ctx->user->unix_inflight + nr;
2290
2291                if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2292                        return -EMFILE;
2293        }
2294
2295        fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2296        if (!fpl)
2297                return -ENOMEM;
2298
2299        skb = alloc_skb(0, GFP_KERNEL);
2300        if (!skb) {
2301                kfree(fpl);
2302                return -ENOMEM;
2303        }
2304
2305        skb->sk = sk;
2306        skb->destructor = io_destruct_skb;
2307
2308        fpl->user = get_uid(ctx->user);
2309        for (i = 0; i < nr; i++) {
2310                fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2311                unix_inflight(fpl->user, fpl->fp[i]);
2312        }
2313
2314        fpl->max = fpl->count = nr;
2315        UNIXCB(skb).fp = fpl;
2316        refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2317        skb_queue_head(&sk->sk_receive_queue, skb);
2318
2319        for (i = 0; i < nr; i++)
2320                fput(fpl->fp[i]);
2321
2322        return 0;
2323}
2324
2325/*
2326 * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2327 * causes regular reference counting to break down. We rely on the UNIX
2328 * garbage collection to take care of this problem for us.
2329 */
2330static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2331{
2332        unsigned left, total;
2333        int ret = 0;
2334
2335        total = 0;
2336        left = ctx->nr_user_files;
2337        while (left) {
2338                unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2339
2340                ret = __io_sqe_files_scm(ctx, this_files, total);
2341                if (ret)
2342                        break;
2343                left -= this_files;
2344                total += this_files;
2345        }
2346
2347        if (!ret)
2348                return 0;
2349
2350        while (total < ctx->nr_user_files) {
2351                fput(ctx->user_files[total]);
2352                total++;
2353        }
2354
2355        return ret;
2356}
2357#else
2358static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2359{
2360        return 0;
2361}
2362#endif
2363
2364static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2365                                 unsigned nr_args)
2366{
2367        __s32 __user *fds = (__s32 __user *) arg;
2368        int fd, ret = 0;
2369        unsigned i;
2370
2371        if (ctx->user_files)
2372                return -EBUSY;
2373        if (!nr_args)
2374                return -EINVAL;
2375        if (nr_args > IORING_MAX_FIXED_FILES)
2376                return -EMFILE;
2377
2378        ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2379        if (!ctx->user_files)
2380                return -ENOMEM;
2381
2382        for (i = 0; i < nr_args; i++) {
2383                ret = -EFAULT;
2384                if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2385                        break;
2386
2387                ctx->user_files[i] = fget(fd);
2388
2389                ret = -EBADF;
2390                if (!ctx->user_files[i])
2391                        break;
2392                /*
2393                 * Don't allow io_uring instances to be registered. If UNIX
2394                 * isn't enabled, then this causes a reference cycle and this
2395                 * instance can never get freed. If UNIX is enabled we'll
2396                 * handle it just fine, but there's still no point in allowing
2397                 * a ring fd as it doesn't support regular read/write anyway.
2398                 */
2399                if (ctx->user_files[i]->f_op == &io_uring_fops) {
2400                        fput(ctx->user_files[i]);
2401                        break;
2402                }
2403                ctx->nr_user_files++;
2404                ret = 0;
2405        }
2406
2407        if (ret) {
2408                for (i = 0; i < ctx->nr_user_files; i++)
2409                        fput(ctx->user_files[i]);
2410
2411                kfree(ctx->user_files);
2412                ctx->user_files = NULL;
2413                ctx->nr_user_files = 0;
2414                return ret;
2415        }
2416
2417        ret = io_sqe_files_scm(ctx);
2418        if (ret)
2419                io_sqe_files_unregister(ctx);
2420
2421        return ret;
2422}
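/*
 * Illustrative sketch only (not part of this file): registering a
 * fixed file set and then referencing it from an SQE. The register
 * call takes an array of __s32 fds; afterwards IOSQE_FIXED_FILE makes
 * sqe->fd an index into that set (see io_req_set_file() above). The
 * raw __NR_io_uring_register syscall number is assumed to come from
 * the installed uapi headers.
 *
 *	__s32 fds[2] = { fd0, fd1 };
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_FILES, fds, 2);
 *
 *	sqe->flags |= IOSQE_FIXED_FILE;
 *	sqe->fd = 1;	// index into the registered set, not a real fd
 */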
2423
2424static int io_sq_offload_start(struct io_ring_ctx *ctx,
2425                               struct io_uring_params *p)
2426{
2427        int ret;
2428
2429        init_waitqueue_head(&ctx->sqo_wait);
2430        mmgrab(current->mm);
2431        ctx->sqo_mm = current->mm;
2432
2433        if (ctx->flags & IORING_SETUP_SQPOLL) {
2434                ret = -EPERM;
2435                if (!capable(CAP_SYS_ADMIN))
2436                        goto err;
2437
2438                ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2439                if (!ctx->sq_thread_idle)
2440                        ctx->sq_thread_idle = HZ;
2441
2442                if (p->flags & IORING_SETUP_SQ_AFF) {
2443                        int cpu = p->sq_thread_cpu;
2444
2445                        ret = -EINVAL;
2446                        if (cpu >= nr_cpu_ids)
2447                                goto err;
2448                        if (!cpu_online(cpu))
2449                                goto err;
2450
2451                        ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2452                                                        ctx, cpu,
2453                                                        "io_uring-sq");
2454                } else {
2455                        ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2456                                                        "io_uring-sq");
2457                }
2458                if (IS_ERR(ctx->sqo_thread)) {
2459                        ret = PTR_ERR(ctx->sqo_thread);
2460                        ctx->sqo_thread = NULL;
2461                        goto err;
2462                }
2463                wake_up_process(ctx->sqo_thread);
2464        } else if (p->flags & IORING_SETUP_SQ_AFF) {
2465                /* Can't have SQ_AFF without SQPOLL */
2466                ret = -EINVAL;
2467                goto err;
2468        }
2469
2470        /* Do QD, or 2 * CPUS, whichever is smaller */
2471        ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2472                        min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2473        if (!ctx->sqo_wq) {
2474                ret = -ENOMEM;
2475                goto err;
2476        }
2477
2478        return 0;
2479err:
2480        io_sq_thread_stop(ctx);
2481        mmdrop(ctx->sqo_mm);
2482        ctx->sqo_mm = NULL;
2483        return ret;
2484}
2485
2486static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2487{
2488        atomic_long_sub(nr_pages, &user->locked_vm);
2489}
2490
2491static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2492{
2493        unsigned long page_limit, cur_pages, new_pages;
2494
2495        /* Don't allow more pages than we can safely lock */
2496        page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2497
2498        do {
2499                cur_pages = atomic_long_read(&user->locked_vm);
2500                new_pages = cur_pages + nr_pages;
2501                if (new_pages > page_limit)
2502                        return -ENOMEM;
2503        } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2504                                        new_pages) != cur_pages);
2505
2506        return 0;
2507}
2508
2509static void io_mem_free(void *ptr)
2510{
2511        struct page *page;
2512
2513        if (!ptr)
2514                return;
2515
2516        page = virt_to_head_page(ptr);
2517        if (put_page_testzero(page))
2518                free_compound_page(page);
2519}
2520
2521static void *io_mem_alloc(size_t size)
2522{
2523        gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2524                                __GFP_NORETRY;
2525
2526        return (void *) __get_free_pages(gfp_flags, get_order(size));
2527}
2528
2529static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2530{
2531        struct io_sq_ring *sq_ring;
2532        struct io_cq_ring *cq_ring;
2533        size_t bytes;
2534
2535        bytes = struct_size(sq_ring, array, sq_entries);
2536        bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2537        bytes += struct_size(cq_ring, cqes, cq_entries);
2538
2539        return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2540}
2541
2542static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2543{
2544        int i, j;
2545
2546        if (!ctx->user_bufs)
2547                return -ENXIO;
2548
2549        for (i = 0; i < ctx->nr_user_bufs; i++) {
2550                struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2551
2552                for (j = 0; j < imu->nr_bvecs; j++)
2553                        put_page(imu->bvec[j].bv_page);
2554
2555                if (ctx->account_mem)
2556                        io_unaccount_mem(ctx->user, imu->nr_bvecs);
2557                kvfree(imu->bvec);
2558                imu->nr_bvecs = 0;
2559        }
2560
2561        kfree(ctx->user_bufs);
2562        ctx->user_bufs = NULL;
2563        ctx->nr_user_bufs = 0;
2564        return 0;
2565}
2566
2567static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2568                       void __user *arg, unsigned index)
2569{
2570        struct iovec __user *src;
2571
2572#ifdef CONFIG_COMPAT
2573        if (ctx->compat) {
2574                struct compat_iovec __user *ciovs;
2575                struct compat_iovec ciov;
2576
2577                ciovs = (struct compat_iovec __user *) arg;
2578                if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2579                        return -EFAULT;
2580
2581                dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2582                dst->iov_len = ciov.iov_len;
2583                return 0;
2584        }
2585#endif
2586        src = (struct iovec __user *) arg;
2587        if (copy_from_user(dst, &src[index], sizeof(*dst)))
2588                return -EFAULT;
2589        return 0;
2590}
2591
2592static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2593                                  unsigned nr_args)
2594{
2595        struct vm_area_struct **vmas = NULL;
2596        struct page **pages = NULL;
2597        int i, j, got_pages = 0;
2598        int ret = -EINVAL;
2599
2600        if (ctx->user_bufs)
2601                return -EBUSY;
2602        if (!nr_args || nr_args > UIO_MAXIOV)
2603                return -EINVAL;
2604
2605        ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2606                                        GFP_KERNEL);
2607        if (!ctx->user_bufs)
2608                return -ENOMEM;
2609
2610        for (i = 0; i < nr_args; i++) {
2611                struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2612                unsigned long off, start, end, ubuf;
2613                int pret, nr_pages;
2614                struct iovec iov;
2615                size_t size;
2616
2617                ret = io_copy_iov(ctx, &iov, arg, i);
2618                if (ret)
2619                        goto err;
2620
2621                /*
2622                 * Don't impose further limits on the size and buffer
2623                 * constraints here, we'll -EINVAL later when IO is
2624                 * submitted if they are wrong.
2625                 */
2626                ret = -EFAULT;
2627                if (!iov.iov_base || !iov.iov_len)
2628                        goto err;
2629
2630                /* arbitrary limit, but we need something */
2631                if (iov.iov_len > SZ_1G)
2632                        goto err;
2633
2634                ubuf = (unsigned long) iov.iov_base;
2635                end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2636                start = ubuf >> PAGE_SHIFT;
2637                nr_pages = end - start;
2638
2639                if (ctx->account_mem) {
2640                        ret = io_account_mem(ctx->user, nr_pages);
2641                        if (ret)
2642                                goto err;
2643                }
2644
2645                ret = 0;
2646                if (!pages || nr_pages > got_pages) {
2647                        kfree(vmas);
2648                        kfree(pages);
2649                        pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2650                                                GFP_KERNEL);
2651                        vmas = kvmalloc_array(nr_pages,
2652                                        sizeof(struct vm_area_struct *),
2653                                        GFP_KERNEL);
2654                        if (!pages || !vmas) {
2655                                ret = -ENOMEM;
2656                                if (ctx->account_mem)
2657                                        io_unaccount_mem(ctx->user, nr_pages);
2658                                goto err;
2659                        }
2660                        got_pages = nr_pages;
2661                }
2662
2663                imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2664                                                GFP_KERNEL);
2665                ret = -ENOMEM;
2666                if (!imu->bvec) {
2667                        if (ctx->account_mem)
2668                                io_unaccount_mem(ctx->user, nr_pages);
2669                        goto err;
2670                }
2671
2672                ret = 0;
2673                down_read(&current->mm->mmap_sem);
2674                pret = get_user_pages(ubuf, nr_pages,
2675                                      FOLL_WRITE | FOLL_LONGTERM,
2676                                      pages, vmas);
2677                if (pret == nr_pages) {
2678                        /* don't support file-backed memory */
2679                        for (j = 0; j < nr_pages; j++) {
2680                                struct vm_area_struct *vma = vmas[j];
2681
2682                                if (vma->vm_file &&
2683                                    !is_file_hugepages(vma->vm_file)) {
2684                                        ret = -EOPNOTSUPP;
2685                                        break;
2686                                }
2687                        }
2688                } else {
2689                        ret = pret < 0 ? pret : -EFAULT;
2690                }
2691                up_read(&current->mm->mmap_sem);
2692                if (ret) {
2693                        /*
2694                         * if we did a partial map, or found file-backed vmas,
2695                         * release any pages we did get
2696                         */
2697                        if (pret > 0) {
2698                                for (j = 0; j < pret; j++)
2699                                        put_page(pages[j]);
2700                        }
2701                        if (ctx->account_mem)
2702                                io_unaccount_mem(ctx->user, nr_pages);
2703                        kvfree(imu->bvec);
2704                        goto err;
2705                }
2706
2707                off = ubuf & ~PAGE_MASK;
2708                size = iov.iov_len;
2709                for (j = 0; j < nr_pages; j++) {
2710                        size_t vec_len;
2711
2712                        vec_len = min_t(size_t, size, PAGE_SIZE - off);
2713                        imu->bvec[j].bv_page = pages[j];
2714                        imu->bvec[j].bv_len = vec_len;
2715                        imu->bvec[j].bv_offset = off;
2716                        off = 0;
2717                        size -= vec_len;
2718                }
2719                /* store original address for later verification */
2720                imu->ubuf = ubuf;
2721                imu->len = iov.iov_len;
2722                imu->nr_bvecs = nr_pages;
2723
2724                ctx->nr_user_bufs++;
2725        }
2726        kvfree(pages);
2727        kvfree(vmas);
2728        return 0;
2729err:
2730        kvfree(pages);
2731        kvfree(vmas);
2732        io_sqe_buffer_unregister(ctx);
2733        return ret;
2734}
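/*
 * Illustrative sketch only (not part of this file): registering fixed
 * buffers for the READ_FIXED/WRITE_FIXED opcodes. Each iovec must have
 * a non-zero base and length and be at most 1GB (see the checks
 * above). The convention that buf_index selects the registered buffer
 * for the FIXED opcodes is assumed here rather than shown in this
 * function.
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = buf_len };
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_BUFFERS, &iov, 1);
 *
 *	sqe->opcode = IORING_OP_READ_FIXED;
 *	sqe->addr = (unsigned long) buf;
 *	sqe->len = buf_len;
 *	sqe->buf_index = 0;	// which registered buffer to use
 */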
2735
2736static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2737{
2738        __s32 __user *fds = arg;
2739        int fd;
2740
2741        if (ctx->cq_ev_fd)
2742                return -EBUSY;
2743
2744        if (copy_from_user(&fd, fds, sizeof(*fds)))
2745                return -EFAULT;
2746
2747        ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2748        if (IS_ERR(ctx->cq_ev_fd)) {
2749                int ret = PTR_ERR(ctx->cq_ev_fd);
2750                ctx->cq_ev_fd = NULL;
2751                return ret;
2752        }
2753
2754        return 0;
2755}
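/*
 * Illustrative sketch only (not part of this file): hooking an eventfd
 * up to ctx->cq_ev_fd so completions can be signalled to the
 * application. The register opcode takes a pointer to a single __s32
 * fd, and nr_args must be 1 (enforced in __io_uring_register() below).
 *
 *	int efd = eventfd(0, EFD_CLOEXEC);
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_EVENTFD, &efd, 1);
 */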
2756
2757static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2758{
2759        if (ctx->cq_ev_fd) {
2760                eventfd_ctx_put(ctx->cq_ev_fd);
2761                ctx->cq_ev_fd = NULL;
2762                return 0;
2763        }
2764
2765        return -ENXIO;
2766}
2767
2768static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2769{
2770        io_finish_async(ctx);
2771        if (ctx->sqo_mm)
2772                mmdrop(ctx->sqo_mm);
2773
2774        io_iopoll_reap_events(ctx);
2775        io_sqe_buffer_unregister(ctx);
2776        io_sqe_files_unregister(ctx);
2777        io_eventfd_unregister(ctx);
2778
2779#if defined(CONFIG_UNIX)
2780        if (ctx->ring_sock) {
2781                ctx->ring_sock->file = NULL; /* so that iput() is called */
2782                sock_release(ctx->ring_sock);
2783        }
2784#endif
2785
2786        io_mem_free(ctx->sq_ring);
2787        io_mem_free(ctx->sq_sqes);
2788        io_mem_free(ctx->cq_ring);
2789
2790        percpu_ref_exit(&ctx->refs);
2791        if (ctx->account_mem)
2792                io_unaccount_mem(ctx->user,
2793                                ring_pages(ctx->sq_entries, ctx->cq_entries));
2794        free_uid(ctx->user);
2795        kfree(ctx);
2796}
2797
2798static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2799{
2800        struct io_ring_ctx *ctx = file->private_data;
2801        __poll_t mask = 0;
2802
2803        poll_wait(file, &ctx->cq_wait, wait);
2804        /*
2805         * synchronizes with barrier from wq_has_sleeper call in
2806         * io_commit_cqring
2807         */
2808        smp_rmb();
2809        if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2810            ctx->sq_ring->ring_entries)
2811                mask |= EPOLLOUT | EPOLLWRNORM;
2812        if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2813                mask |= EPOLLIN | EPOLLRDNORM;
2814
2815        return mask;
2816}
2817
2818static int io_uring_fasync(int fd, struct file *file, int on)
2819{
2820        struct io_ring_ctx *ctx = file->private_data;
2821
2822        return fasync_helper(fd, file, on, &ctx->cq_fasync);
2823}
2824
2825static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2826{
2827        mutex_lock(&ctx->uring_lock);
2828        percpu_ref_kill(&ctx->refs);
2829        mutex_unlock(&ctx->uring_lock);
2830
2831        io_poll_remove_all(ctx);
2832        io_iopoll_reap_events(ctx);
2833        wait_for_completion(&ctx->ctx_done);
2834        io_ring_ctx_free(ctx);
2835}
2836
2837static int io_uring_release(struct inode *inode, struct file *file)
2838{
2839        struct io_ring_ctx *ctx = file->private_data;
2840
2841        file->private_data = NULL;
2842        io_ring_ctx_wait_and_kill(ctx);
2843        return 0;
2844}
2845
2846static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2847{
2848        loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2849        unsigned long sz = vma->vm_end - vma->vm_start;
2850        struct io_ring_ctx *ctx = file->private_data;
2851        unsigned long pfn;
2852        struct page *page;
2853        void *ptr;
2854
2855        switch (offset) {
2856        case IORING_OFF_SQ_RING:
2857                ptr = ctx->sq_ring;
2858                break;
2859        case IORING_OFF_SQES:
2860                ptr = ctx->sq_sqes;
2861                break;
2862        case IORING_OFF_CQ_RING:
2863                ptr = ctx->cq_ring;
2864                break;
2865        default:
2866                return -EINVAL;
2867        }
2868
2869        page = virt_to_head_page(ptr);
2870        if (sz > (PAGE_SIZE << compound_order(page)))
2871                return -EINVAL;
2872
2873        pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2874        return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2875}
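/*
 * Illustrative sketch only (not part of this file): how the
 * application maps the three regions handled above, using the offsets
 * and entry counts returned by io_uring_setup() in struct
 * io_uring_params (p). The size calculations assume the ring layouts
 * allocated in io_allocate_scq_urings() below: a __u32 index array at
 * sq_off.array and a CQE array at cq_off.cqes.
 *
 *	sq_ring = mmap(NULL, p.sq_off.array + p.sq_entries * sizeof(__u32),
 *		       PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *		       ring_fd, IORING_OFF_SQ_RING);
 *	sqes = mmap(NULL, p.sq_entries * sizeof(struct io_uring_sqe),
 *		    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *		    ring_fd, IORING_OFF_SQES);
 *	cq_ring = mmap(NULL, p.cq_off.cqes +
 *			     p.cq_entries * sizeof(struct io_uring_cqe),
 *		       PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *		       ring_fd, IORING_OFF_CQ_RING);
 */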
2876
2877SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2878                u32, min_complete, u32, flags, const sigset_t __user *, sig,
2879                size_t, sigsz)
2880{
2881        struct io_ring_ctx *ctx;
2882        long ret = -EBADF;
2883        int submitted = 0;
2884        struct fd f;
2885
2886        if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2887                return -EINVAL;
2888
2889        f = fdget(fd);
2890        if (!f.file)
2891                return -EBADF;
2892
2893        ret = -EOPNOTSUPP;
2894        if (f.file->f_op != &io_uring_fops)
2895                goto out_fput;
2896
2897        ret = -ENXIO;
2898        ctx = f.file->private_data;
2899        if (!percpu_ref_tryget(&ctx->refs))
2900                goto out_fput;
2901
2902        /*
2903         * For SQ polling, the thread will do all submissions and completions.
2904         * Just return the requested submit count, and wake the thread if
2905         * we were asked to.
2906         */
2907        if (ctx->flags & IORING_SETUP_SQPOLL) {
2908                if (flags & IORING_ENTER_SQ_WAKEUP)
2909                        wake_up(&ctx->sqo_wait);
2910                submitted = to_submit;
2911                goto out_ctx;
2912        }
2913
2914        ret = 0;
2915        if (to_submit) {
2916                to_submit = min(to_submit, ctx->sq_entries);
2917
2918                mutex_lock(&ctx->uring_lock);
2919                submitted = io_ring_submit(ctx, to_submit);
2920                mutex_unlock(&ctx->uring_lock);
2921        }
2922        if (flags & IORING_ENTER_GETEVENTS) {
2923                unsigned nr_events = 0;
2924
2925                min_complete = min(min_complete, ctx->cq_entries);
2926
2927                if (ctx->flags & IORING_SETUP_IOPOLL) {
2928                        mutex_lock(&ctx->uring_lock);
2929                        ret = io_iopoll_check(ctx, &nr_events, min_complete);
2930                        mutex_unlock(&ctx->uring_lock);
2931                } else {
2932                        ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2933                }
2934        }
2935
2936out_ctx:
2937        io_ring_drop_ctx_refs(ctx, 1);
2938out_fput:
2939        fdput(f);
2940        return submitted ? submitted : ret;
2941}
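/*
 * Illustrative sketch only (not part of this file): the submit side of
 * io_uring_enter() as seen from the application. With
 * IORING_SETUP_SQPOLL the app only needs to enter once io_sq_thread()
 * has set IORING_SQ_NEED_WAKEUP in the SQ ring flags (sq_flags below
 * is assumed to be the app's pointer derived from sq_off.flags);
 * otherwise it enters to submit and, optionally, wait for completions.
 *
 *	if (sqpoll) {
 *		if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *			syscall(__NR_io_uring_enter, ring_fd, 0, 0,
 *				IORING_ENTER_SQ_WAKEUP, NULL, 0);
 *	} else {
 *		syscall(__NR_io_uring_enter, ring_fd, to_submit, 1,
 *			IORING_ENTER_GETEVENTS, NULL, 0);
 *	}
 */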
2942
2943static const struct file_operations io_uring_fops = {
2944        .release        = io_uring_release,
2945        .mmap           = io_uring_mmap,
2946        .poll           = io_uring_poll,
2947        .fasync         = io_uring_fasync,
2948};
2949
2950static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2951                                  struct io_uring_params *p)
2952{
2953        struct io_sq_ring *sq_ring;
2954        struct io_cq_ring *cq_ring;
2955        size_t size;
2956
2957        sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2958        if (!sq_ring)
2959                return -ENOMEM;
2960
2961        ctx->sq_ring = sq_ring;
2962        sq_ring->ring_mask = p->sq_entries - 1;
2963        sq_ring->ring_entries = p->sq_entries;
2964        ctx->sq_mask = sq_ring->ring_mask;
2965        ctx->sq_entries = sq_ring->ring_entries;
2966
2967        size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2968        if (size == SIZE_MAX)
2969                return -EOVERFLOW;
2970
2971        ctx->sq_sqes = io_mem_alloc(size);
2972        if (!ctx->sq_sqes)
2973                return -ENOMEM;
2974
2975        cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2976        if (!cq_ring)
2977                return -ENOMEM;
2978
2979        ctx->cq_ring = cq_ring;
2980        cq_ring->ring_mask = p->cq_entries - 1;
2981        cq_ring->ring_entries = p->cq_entries;
2982        ctx->cq_mask = cq_ring->ring_mask;
2983        ctx->cq_entries = cq_ring->ring_entries;
2984        return 0;
2985}
2986
2987/*
2988 * Allocate an anonymous fd, this is what constitutes the application
2989 * visible backing of an io_uring instance. The application mmaps this
2990 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2991 * we have to tie this fd to a socket for file garbage collection purposes.
2992 */
2993static int io_uring_get_fd(struct io_ring_ctx *ctx)
2994{
2995        struct file *file;
2996        int ret;
2997
2998#if defined(CONFIG_UNIX)
2999        ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3000                                &ctx->ring_sock);
3001        if (ret)
3002                return ret;
3003#endif
3004
3005        ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3006        if (ret < 0)
3007                goto err;
3008
3009        file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3010                                        O_RDWR | O_CLOEXEC);
3011        if (IS_ERR(file)) {
3012                put_unused_fd(ret);
3013                ret = PTR_ERR(file);
3014                goto err;
3015        }
3016
3017#if defined(CONFIG_UNIX)
3018        ctx->ring_sock->file = file;
3019        ctx->ring_sock->sk->sk_user_data = ctx;
3020#endif
3021        fd_install(ret, file);
3022        return ret;
3023err:
3024#if defined(CONFIG_UNIX)
3025        sock_release(ctx->ring_sock);
3026        ctx->ring_sock = NULL;
3027#endif
3028        return ret;
3029}
3030
3031static int io_uring_create(unsigned entries, struct io_uring_params *p)
3032{
3033        struct user_struct *user = NULL;
3034        struct io_ring_ctx *ctx;
3035        bool account_mem;
3036        int ret;
3037
3038        if (!entries || entries > IORING_MAX_ENTRIES)
3039                return -EINVAL;
3040
3041        /*
3042         * Use twice as many entries for the CQ ring. It's possible for the
3043         * application to drive a higher depth than the size of the SQ ring,
3044         * since the sqes are only used at submission time. This allows for
3045         * some flexibility in overcommitting a bit.
3046         */
3047        p->sq_entries = roundup_pow_of_two(entries);
3048        p->cq_entries = 2 * p->sq_entries;
3049
3050        user = get_uid(current_user());
3051        account_mem = !capable(CAP_IPC_LOCK);
3052
3053        if (account_mem) {
3054                ret = io_account_mem(user,
3055                                ring_pages(p->sq_entries, p->cq_entries));
3056                if (ret) {
3057                        free_uid(user);
3058                        return ret;
3059                }
3060        }
3061
3062        ctx = io_ring_ctx_alloc(p);
3063        if (!ctx) {
3064                if (account_mem)
3065                        io_unaccount_mem(user, ring_pages(p->sq_entries,
3066                                                                p->cq_entries));
3067                free_uid(user);
3068                return -ENOMEM;
3069        }
3070        ctx->compat = in_compat_syscall();
3071        ctx->account_mem = account_mem;
3072        ctx->user = user;
3073
3074        ret = io_allocate_scq_urings(ctx, p);
3075        if (ret)
3076                goto err;
3077
3078        ret = io_sq_offload_start(ctx, p);
3079        if (ret)
3080                goto err;
3081
3082        ret = io_uring_get_fd(ctx);
3083        if (ret < 0)
3084                goto err;
3085
3086        memset(&p->sq_off, 0, sizeof(p->sq_off));
3087        p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3088        p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3089        p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3090        p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3091        p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3092        p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3093        p->sq_off.array = offsetof(struct io_sq_ring, array);
3094
3095        memset(&p->cq_off, 0, sizeof(p->cq_off));
3096        p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3097        p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3098        p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3099        p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3100        p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3101        p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3102        return ret;
3103err:
3104        io_ring_ctx_wait_and_kill(ctx);
3105        return ret;
3106}
3107
3108/*
3109 * Sets up an io_uring context and returns the fd. The application asks for
3110 * a ring size; we return the actual sq/cq ring sizes (among other things) in
3111 * the params structure passed in.
3112 */
3113static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3114{
3115        struct io_uring_params p;
3116        long ret;
3117        int i;
3118
3119        if (copy_from_user(&p, params, sizeof(p)))
3120                return -EFAULT;
3121        for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3122                if (p.resv[i])
3123                        return -EINVAL;
3124        }
3125
3126        if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3127                        IORING_SETUP_SQ_AFF))
3128                return -EINVAL;
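        /*
         * Illustrative example (not from this code): to request a kernel
         * submission thread pinned to CPU 0, userspace would set
         * p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF and
         * p.sq_thread_cpu = 0 before calling io_uring_setup().
         */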
3129
3130        ret = io_uring_create(entries, &p);
3131        if (ret < 0)
3132                return ret;
3133
3134        if (copy_to_user(params, &p, sizeof(p)))
3135                return -EFAULT;
3136
3137        return ret;
3138}
3139
3140SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3141                struct io_uring_params __user *, params)
3142{
3143        return io_uring_setup(entries, params);
3144}
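/*
 * Illustrative userspace sketch (not part of this file): with no wrapper
 * library, the syscall can be invoked directly, assuming the toolchain
 * headers provide __NR_io_uring_setup. Names are examples only.
 *
 *      struct io_uring_params p;
 *      int ring_fd;
 *
 *      memset(&p, 0, sizeof(p));
 *      ring_fd = syscall(__NR_io_uring_setup, 128, &p);
 *      if (ring_fd < 0)
 *              return -1;
 *
 * On success, p.sq_entries and p.cq_entries hold the real ring sizes and
 * p.sq_off/p.cq_off hold the mmap offsets shown earlier.
 */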
3145
3146static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3147                               void __user *arg, unsigned nr_args)
3148        __releases(ctx->uring_lock)
3149        __acquires(ctx->uring_lock)
3150{
3151        int ret;
3152
3153        /*
3154         * We're inside the ring mutex, if the ref is already dying, then
3155         * someone else killed the ctx or is already going through
3156         * io_uring_register().
3157         */
3158        if (percpu_ref_is_dying(&ctx->refs))
3159                return -ENXIO;
3160
3161        percpu_ref_kill(&ctx->refs);
3162
3163        /*
3164         * Drop uring mutex before waiting for references to exit. If another
3165         * thread is currently inside io_uring_enter() it might need to grab
3166         * the uring_lock to make progress. If we hold it here across the drain
3167         * wait, then we can deadlock. It's safe to drop the mutex here, since
3168         * no new references will come in after we've killed the percpu ref.
3169         */
3170        mutex_unlock(&ctx->uring_lock);
3171        wait_for_completion(&ctx->ctx_done);
3172        mutex_lock(&ctx->uring_lock);
3173
3174        switch (opcode) {
3175        case IORING_REGISTER_BUFFERS:
3176                ret = io_sqe_buffer_register(ctx, arg, nr_args);
3177                break;
3178        case IORING_UNREGISTER_BUFFERS:
3179                ret = -EINVAL;
3180                if (arg || nr_args)
3181                        break;
3182                ret = io_sqe_buffer_unregister(ctx);
3183                break;
3184        case IORING_REGISTER_FILES:
3185                ret = io_sqe_files_register(ctx, arg, nr_args);
3186                break;
3187        case IORING_UNREGISTER_FILES:
3188                ret = -EINVAL;
3189                if (arg || nr_args)
3190                        break;
3191                ret = io_sqe_files_unregister(ctx);
3192                break;
3193        case IORING_REGISTER_EVENTFD:
3194                ret = -EINVAL;
3195                if (nr_args != 1)
3196                        break;
3197                ret = io_eventfd_register(ctx, arg);
3198                break;
3199        case IORING_UNREGISTER_EVENTFD:
3200                ret = -EINVAL;
3201                if (arg || nr_args)
3202                        break;
3203                ret = io_eventfd_unregister(ctx);
3204                break;
3205        default:
3206                ret = -EINVAL;
3207                break;
3208        }
3209
3210        /* bring the ctx back to life */
3211        reinit_completion(&ctx->ctx_done);
3212        percpu_ref_reinit(&ctx->refs);
3213        return ret;
3214}
3215
3216SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3217                void __user *, arg, unsigned int, nr_args)
3218{
3219        struct io_ring_ctx *ctx;
3220        long ret = -EBADF;
3221        struct fd f;
3222
3223        f = fdget(fd);
3224        if (!f.file)
3225                return -EBADF;
3226
3227        ret = -EOPNOTSUPP;
3228        if (f.file->f_op != &io_uring_fops)
3229                goto out_fput;
3230
3231        ctx = f.file->private_data;
3232
3233        mutex_lock(&ctx->uring_lock);
3234        ret = __io_uring_register(ctx, opcode, arg, nr_args);
3235        mutex_unlock(&ctx->uring_lock);
3236out_fput:
3237        fdput(f);
3238        return ret;
3239}
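/*
 * Illustrative userspace sketch (not part of this file): registering a
 * single fixed buffer with IORING_REGISTER_BUFFERS. Names are examples
 * only, and error handling is omitted.
 *
 *      struct iovec iov = {
 *              .iov_base = buf,
 *              .iov_len  = buf_len,
 *      };
 *
 *      ret = syscall(__NR_io_uring_register, ring_fd,
 *                    IORING_REGISTER_BUFFERS, &iov, 1);
 *
 * Subsequent sqes can then use IORING_OP_READ_FIXED/IORING_OP_WRITE_FIXED
 * with sqe->buf_index = 0 to reference the registered buffer.
 */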
3240
3241static int __init io_uring_init(void)
3242{
3243        req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3244        return 0;
3245}
3246__initcall(io_uring_init);
3247