linux/fs/io_uring.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Shared application/kernel submission and completion ring pairs, for
   4 * supporting fast/efficient IO.
   5 *
   6 * A note on the read/write ordering memory barriers that are matched between
   7 * the application and kernel side.
   8 *
   9 * After the application reads the CQ ring tail, it must use an
  10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
  11 * before writing the tail (using smp_load_acquire to read the tail will
  12 * do). It also needs a smp_mb() before updating CQ head (ordering the
  13 * entry load(s) with the head store), pairing with an implicit barrier
  14 * through a control-dependency in io_get_cqring (smp_store_release to
  15 * store head will do). Failure to do so could lead to reading invalid
  16 * CQ entries.
  17 *
  18 * Likewise, the application must use an appropriate smp_wmb() before
  19 * writing the SQ tail (ordering SQ entry stores with the tail store),
  20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
  21 * to store the tail will do). And it needs a barrier ordering the SQ
  22 * head load before writing new SQ entries (smp_load_acquire to read
  23 * head will do).
  24 *
  25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
  26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
   27 * updating the SQ tail; a full memory barrier smp_mb() is needed
   28 * between the two.
  29 *
  30 * Also see the examples in the liburing library:
  31 *
  32 *      git://git.kernel.dk/liburing
  33 *
  34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
   35 * on data shared between the kernel and application. This is done both
   36 * for ordering purposes and to ensure that once a value is loaded from
   37 * data that the application could potentially modify, it remains stable.
  38 *
  39 * Copyright (C) 2018-2019 Jens Axboe
  40 * Copyright (c) 2018-2019 Christoph Hellwig
  41 */
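/*
 * Illustrative sketch (not part of the kernel build): a minimal
 * application-side submit/reap loop showing the barrier pairing described
 * above. The names sq_tail, sq_array, sq_mask, sqe_index, cq_head, cq_ktail,
 * cq_cqes, cq_mask and handle_completion() are hypothetical application-side
 * pointers/helpers derived from the mmap()ed rings, and
 * smp_store_release()/smp_load_acquire() stand in for whatever
 * acquire/release primitives the application's toolchain provides.
 *
 *	// publish one prepared SQE index, then release-store the new tail
 *	unsigned tail = *sq_tail;
 *	sq_array[tail & *sq_mask] = sqe_index;
 *	smp_store_release(sq_tail, tail + 1);
 *
 *	// reap one CQE: acquire-load the kernel tail, consume, then bump head
 *	unsigned head = *cq_head;
 *	if (head != smp_load_acquire(cq_ktail)) {
 *		struct io_uring_cqe *cqe = &cq_cqes[head & *cq_mask];
 *		handle_completion(cqe->user_data, cqe->res);
 *		smp_store_release(cq_head, head + 1);
 *	}
 */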
  42#include <linux/kernel.h>
  43#include <linux/init.h>
  44#include <linux/errno.h>
  45#include <linux/syscalls.h>
  46#include <linux/compat.h>
  47#include <linux/refcount.h>
  48#include <linux/uio.h>
  49
  50#include <linux/sched/signal.h>
  51#include <linux/fs.h>
  52#include <linux/file.h>
  53#include <linux/fdtable.h>
  54#include <linux/mm.h>
  55#include <linux/mman.h>
  56#include <linux/mmu_context.h>
  57#include <linux/percpu.h>
  58#include <linux/slab.h>
  59#include <linux/workqueue.h>
  60#include <linux/kthread.h>
  61#include <linux/blkdev.h>
  62#include <linux/bvec.h>
  63#include <linux/net.h>
  64#include <net/sock.h>
  65#include <net/af_unix.h>
  66#include <net/scm.h>
  67#include <linux/anon_inodes.h>
  68#include <linux/sched/mm.h>
  69#include <linux/uaccess.h>
  70#include <linux/nospec.h>
  71#include <linux/sizes.h>
  72#include <linux/hugetlb.h>
  73
  74#include <uapi/linux/io_uring.h>
  75
  76#include "internal.h"
  77
  78#define IORING_MAX_ENTRIES      4096
  79#define IORING_MAX_FIXED_FILES  1024
  80
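/*
 * Producer/consumer indices for one ring direction; head and tail are kept
 * on separate cachelines so the producing and consuming sides don't falsely
 * share a cacheline when running on different CPUs.
 */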
  81struct io_uring {
  82        u32 head ____cacheline_aligned_in_smp;
  83        u32 tail ____cacheline_aligned_in_smp;
  84};
  85
  86/*
  87 * This data is shared with the application through the mmap at offset
  88 * IORING_OFF_SQ_RING.
  89 *
  90 * The offsets to the member fields are published through struct
  91 * io_sqring_offsets when calling io_uring_setup.
  92 */
  93struct io_sq_ring {
  94        /*
  95         * Head and tail offsets into the ring; the offsets need to be
  96         * masked to get valid indices.
  97         *
  98         * The kernel controls head and the application controls tail.
  99         */
 100        struct io_uring         r;
 101        /*
 102         * Bitmask to apply to head and tail offsets (constant, equals
 103         * ring_entries - 1)
 104         */
 105        u32                     ring_mask;
 106        /* Ring size (constant, power of 2) */
 107        u32                     ring_entries;
 108        /*
 109         * Number of invalid entries dropped by the kernel due to
  110         * an invalid index stored in the array
 111         *
 112         * Written by the kernel, shouldn't be modified by the
 113         * application (i.e. get number of "new events" by comparing to
 114         * cached value).
 115         *
  116         * After the application has read a new SQ head value, this
  117         * counter includes all submissions that were dropped up to
  118         * that new SQ head (and possibly more).
 119         */
 120        u32                     dropped;
 121        /*
 122         * Runtime flags
 123         *
 124         * Written by the kernel, shouldn't be modified by the
 125         * application.
 126         *
 127         * The application needs a full memory barrier before checking
 128         * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
 129         */
 130        u32                     flags;
 131        /*
 132         * Ring buffer of indices into array of io_uring_sqe, which is
 133         * mmapped by the application using the IORING_OFF_SQES offset.
 134         *
 135         * This indirection could e.g. be used to assign fixed
 136         * io_uring_sqe entries to operations and only submit them to
 137         * the queue when needed.
 138         *
 139         * The kernel modifies neither the indices array nor the entries
 140         * array.
 141         */
 142        u32                     array[];
 143};
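/*
 * Illustrative sketch (not kernel code) of the indirection described above:
 * an application can keep SQEs prepared ahead of time and only publish their
 * indices when it actually wants them submitted. Here sq_ring is a
 * hypothetical application-side view of the mapped SQ ring and
 * prep_read_sqe() a hypothetical helper.
 *
 *	// SQE slot 3 was filled out earlier, e.g. by prep_read_sqe(&sqes[3]);
 *	unsigned tail = sq_ring->r.tail;
 *	sq_ring->array[tail & sq_ring->ring_mask] = 3;
 *	// ...then publish the new tail with release semantics, as above.
 */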
 144
 145/*
 146 * This data is shared with the application through the mmap at offset
 147 * IORING_OFF_CQ_RING.
 148 *
 149 * The offsets to the member fields are published through struct
 150 * io_cqring_offsets when calling io_uring_setup.
 151 */
 152struct io_cq_ring {
 153        /*
 154         * Head and tail offsets into the ring; the offsets need to be
 155         * masked to get valid indices.
 156         *
 157         * The application controls head and the kernel tail.
 158         */
 159        struct io_uring         r;
 160        /*
 161         * Bitmask to apply to head and tail offsets (constant, equals
 162         * ring_entries - 1)
 163         */
 164        u32                     ring_mask;
 165        /* Ring size (constant, power of 2) */
 166        u32                     ring_entries;
 167        /*
 168         * Number of completion events lost because the queue was full;
 169         * this should be avoided by the application by making sure
  170         * there are not more requests pending than there is space in
 171         * the completion queue.
 172         *
 173         * Written by the kernel, shouldn't be modified by the
 174         * application (i.e. get number of "new events" by comparing to
 175         * cached value).
 176         *
 177         * As completion events come in out of order this counter is not
 178         * ordered with any other data.
 179         */
 180        u32                     overflow;
 181        /*
 182         * Ring buffer of completion events.
 183         *
 184         * The kernel writes completion events fresh every time they are
 185         * produced, so the application is allowed to modify pending
 186         * entries.
 187         */
 188        struct io_uring_cqe     cqes[];
 189};
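/*
 * Illustrative sketch (not kernel code): both the SQ ring's 'dropped' counter
 * and the 'overflow' counter above only ever increase, so the application
 * learns about new events by comparing against its own cached copy
 * (cached_overflow is a hypothetical application variable):
 *
 *	unsigned cur = cq_ring->overflow;	// volatile/READ_ONCE-style load
 *	unsigned newly_lost = cur - cached_overflow;
 *	cached_overflow = cur;
 */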
 190
 191struct io_mapped_ubuf {
 192        u64             ubuf;
 193        size_t          len;
 194        struct          bio_vec *bvec;
 195        unsigned int    nr_bvecs;
 196};
 197
 198struct async_list {
 199        spinlock_t              lock;
 200        atomic_t                cnt;
 201        struct list_head        list;
 202
 203        struct file             *file;
 204        off_t                   io_end;
 205        size_t                  io_pages;
 206};
 207
 208struct io_ring_ctx {
 209        struct {
 210                struct percpu_ref       refs;
 211        } ____cacheline_aligned_in_smp;
 212
 213        struct {
 214                unsigned int            flags;
 215                bool                    compat;
 216                bool                    account_mem;
 217
 218                /* SQ ring */
 219                struct io_sq_ring       *sq_ring;
 220                unsigned                cached_sq_head;
 221                unsigned                sq_entries;
 222                unsigned                sq_mask;
 223                unsigned                sq_thread_idle;
 224                struct io_uring_sqe     *sq_sqes;
 225        } ____cacheline_aligned_in_smp;
 226
 227        /* IO offload */
 228        struct workqueue_struct *sqo_wq;
 229        struct task_struct      *sqo_thread;    /* if using sq thread polling */
 230        struct mm_struct        *sqo_mm;
 231        wait_queue_head_t       sqo_wait;
 232        unsigned                sqo_stop;
 233
 234        struct {
 235                /* CQ ring */
 236                struct io_cq_ring       *cq_ring;
 237                unsigned                cached_cq_tail;
 238                unsigned                cq_entries;
 239                unsigned                cq_mask;
 240                struct wait_queue_head  cq_wait;
 241                struct fasync_struct    *cq_fasync;
 242        } ____cacheline_aligned_in_smp;
 243
 244        /*
 245         * If used, fixed file set. Writers must ensure that ->refs is dead,
 246         * readers must ensure that ->refs is alive as long as the file* is
 247         * used. Only updated through io_uring_register(2).
 248         */
 249        struct file             **user_files;
 250        unsigned                nr_user_files;
 251
 252        /* if used, fixed mapped user buffers */
 253        unsigned                nr_user_bufs;
 254        struct io_mapped_ubuf   *user_bufs;
 255
 256        struct user_struct      *user;
 257
 258        struct completion       ctx_done;
 259
 260        struct {
 261                struct mutex            uring_lock;
 262                wait_queue_head_t       wait;
 263        } ____cacheline_aligned_in_smp;
 264
 265        struct {
 266                spinlock_t              completion_lock;
 267                bool                    poll_multi_file;
 268                /*
 269                 * ->poll_list is protected by the ctx->uring_lock for
 270                 * io_uring instances that don't use IORING_SETUP_SQPOLL.
 271                 * For SQPOLL, only the single threaded io_sq_thread() will
 272                 * manipulate the list, hence no extra locking is needed there.
 273                 */
 274                struct list_head        poll_list;
 275                struct list_head        cancel_list;
 276        } ____cacheline_aligned_in_smp;
 277
 278        struct async_list       pending_async[2];
 279
 280#if defined(CONFIG_UNIX)
 281        struct socket           *ring_sock;
 282#endif
 283};
 284
 285struct sqe_submit {
 286        const struct io_uring_sqe       *sqe;
 287        unsigned short                  index;
 288        bool                            has_user;
 289        bool                            needs_lock;
 290        bool                            needs_fixed_file;
 291};
 292
 293/*
 294 * First field must be the file pointer in all the
 295 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
 296 */
 297struct io_poll_iocb {
 298        struct file                     *file;
 299        struct wait_queue_head          *head;
 300        __poll_t                        events;
 301        bool                            done;
 302        bool                            canceled;
 303        struct wait_queue_entry         wait;
 304};
 305
 306/*
 307 * NOTE! Each of the iocb union members has the file pointer
 308 * as the first entry in their struct definition. So you can
 309 * access the file pointer through any of the sub-structs,
 310 * or directly as just 'ki_filp' in this struct.
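 *
 * For a read/write request, for example, req->file and req->rw.ki_filp name
 * the same pointer, because 'struct kiocb' also has the file pointer as its
 * first member.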
 311 */
 312struct io_kiocb {
 313        union {
 314                struct file             *file;
 315                struct kiocb            rw;
 316                struct io_poll_iocb     poll;
 317        };
 318
 319        struct sqe_submit       submit;
 320
 321        struct io_ring_ctx      *ctx;
 322        struct list_head        list;
 323        unsigned int            flags;
 324        refcount_t              refs;
 325#define REQ_F_NOWAIT            1       /* must not punt to workers */
 326#define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
 327#define REQ_F_FIXED_FILE        4       /* ctx owns file */
 328#define REQ_F_SEQ_PREV          8       /* sequential with previous */
 329#define REQ_F_PREPPED           16      /* prep already done */
 330        u64                     user_data;
 331        u64                     error;
 332
 333        struct work_struct      work;
 334};
 335
 336#define IO_PLUG_THRESHOLD               2
 337#define IO_IOPOLL_BATCH                 8
 338
 339struct io_submit_state {
 340        struct blk_plug         plug;
 341
 342        /*
 343         * io_kiocb alloc cache
 344         */
 345        void                    *reqs[IO_IOPOLL_BATCH];
  346        unsigned int            free_reqs;
  347        unsigned int            cur_req;
 348
 349        /*
 350         * File reference cache
 351         */
 352        struct file             *file;
 353        unsigned int            fd;
 354        unsigned int            has_refs;
 355        unsigned int            used_refs;
 356        unsigned int            ios_left;
 357};
 358
 359static struct kmem_cache *req_cachep;
 360
 361static const struct file_operations io_uring_fops;
 362
 363struct sock *io_uring_get_socket(struct file *file)
 364{
 365#if defined(CONFIG_UNIX)
 366        if (file->f_op == &io_uring_fops) {
 367                struct io_ring_ctx *ctx = file->private_data;
 368
 369                return ctx->ring_sock->sk;
 370        }
 371#endif
 372        return NULL;
 373}
 374EXPORT_SYMBOL(io_uring_get_socket);
 375
 376static void io_ring_ctx_ref_free(struct percpu_ref *ref)
 377{
 378        struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
 379
 380        complete(&ctx->ctx_done);
 381}
 382
 383static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 384{
 385        struct io_ring_ctx *ctx;
 386        int i;
 387
 388        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
 389        if (!ctx)
 390                return NULL;
 391
 392        if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
 393                kfree(ctx);
 394                return NULL;
 395        }
 396
 397        ctx->flags = p->flags;
 398        init_waitqueue_head(&ctx->cq_wait);
 399        init_completion(&ctx->ctx_done);
 400        mutex_init(&ctx->uring_lock);
 401        init_waitqueue_head(&ctx->wait);
 402        for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
 403                spin_lock_init(&ctx->pending_async[i].lock);
 404                INIT_LIST_HEAD(&ctx->pending_async[i].list);
 405                atomic_set(&ctx->pending_async[i].cnt, 0);
 406        }
 407        spin_lock_init(&ctx->completion_lock);
 408        INIT_LIST_HEAD(&ctx->poll_list);
 409        INIT_LIST_HEAD(&ctx->cancel_list);
 410        return ctx;
 411}
 412
 413static void io_commit_cqring(struct io_ring_ctx *ctx)
 414{
 415        struct io_cq_ring *ring = ctx->cq_ring;
 416
 417        if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
 418                /* order cqe stores with ring update */
 419                smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
 420
 421                if (wq_has_sleeper(&ctx->cq_wait)) {
 422                        wake_up_interruptible(&ctx->cq_wait);
 423                        kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
 424                }
 425        }
 426}
 427
 428static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 429{
 430        struct io_cq_ring *ring = ctx->cq_ring;
 431        unsigned tail;
 432
 433        tail = ctx->cached_cq_tail;
 434        /*
 435         * writes to the cq entry need to come after reading head; the
 436         * control dependency is enough as we're using WRITE_ONCE to
 437         * fill the cq entry
 438         */
 439        if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
 440                return NULL;
 441
 442        ctx->cached_cq_tail++;
 443        return &ring->cqes[tail & ctx->cq_mask];
 444}
 445
 446static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 447                                 long res, unsigned ev_flags)
 448{
 449        struct io_uring_cqe *cqe;
 450
 451        /*
 452         * If we can't get a cq entry, userspace overflowed the
 453         * submission (by quite a lot). Increment the overflow count in
 454         * the ring.
 455         */
 456        cqe = io_get_cqring(ctx);
 457        if (cqe) {
 458                WRITE_ONCE(cqe->user_data, ki_user_data);
 459                WRITE_ONCE(cqe->res, res);
 460                WRITE_ONCE(cqe->flags, ev_flags);
 461        } else {
 462                unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
 463
 464                WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
 465        }
 466}
 467
 468static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 469{
 470        if (waitqueue_active(&ctx->wait))
 471                wake_up(&ctx->wait);
 472        if (waitqueue_active(&ctx->sqo_wait))
 473                wake_up(&ctx->sqo_wait);
 474}
 475
 476static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
 477                                long res, unsigned ev_flags)
 478{
 479        unsigned long flags;
 480
 481        spin_lock_irqsave(&ctx->completion_lock, flags);
 482        io_cqring_fill_event(ctx, user_data, res, ev_flags);
 483        io_commit_cqring(ctx);
 484        spin_unlock_irqrestore(&ctx->completion_lock, flags);
 485
 486        io_cqring_ev_posted(ctx);
 487}
 488
 489static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
 490{
 491        percpu_ref_put_many(&ctx->refs, refs);
 492
 493        if (waitqueue_active(&ctx->wait))
 494                wake_up(&ctx->wait);
 495}
 496
 497static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
 498                                   struct io_submit_state *state)
 499{
 500        gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
 501        struct io_kiocb *req;
 502
 503        if (!percpu_ref_tryget(&ctx->refs))
 504                return NULL;
 505
 506        if (!state) {
 507                req = kmem_cache_alloc(req_cachep, gfp);
 508                if (unlikely(!req))
 509                        goto out;
 510        } else if (!state->free_reqs) {
 511                size_t sz;
 512                int ret;
 513
 514                sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
 515                ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
 516
 517                /*
 518                 * Bulk alloc is all-or-nothing. If we fail to get a batch,
 519                 * retry single alloc to be on the safe side.
 520                 */
 521                if (unlikely(ret <= 0)) {
 522                        state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
 523                        if (!state->reqs[0])
 524                                goto out;
 525                        ret = 1;
 526                }
 527                state->free_reqs = ret - 1;
 528                state->cur_req = 1;
 529                req = state->reqs[0];
 530        } else {
 531                req = state->reqs[state->cur_req];
 532                state->free_reqs--;
 533                state->cur_req++;
 534        }
 535
 536        req->ctx = ctx;
 537        req->flags = 0;
 538        /* one is dropped after submission, the other at completion */
 539        refcount_set(&req->refs, 2);
 540        return req;
 541out:
 542        io_ring_drop_ctx_refs(ctx, 1);
 543        return NULL;
 544}
 545
 546static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
 547{
 548        if (*nr) {
 549                kmem_cache_free_bulk(req_cachep, *nr, reqs);
 550                io_ring_drop_ctx_refs(ctx, *nr);
 551                *nr = 0;
 552        }
 553}
 554
 555static void io_free_req(struct io_kiocb *req)
 556{
 557        if (req->file && !(req->flags & REQ_F_FIXED_FILE))
 558                fput(req->file);
 559        io_ring_drop_ctx_refs(req->ctx, 1);
 560        kmem_cache_free(req_cachep, req);
 561}
 562
 563static void io_put_req(struct io_kiocb *req)
 564{
 565        if (refcount_dec_and_test(&req->refs))
 566                io_free_req(req);
 567}
 568
 569/*
 570 * Find and free completed poll iocbs
 571 */
 572static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 573                               struct list_head *done)
 574{
 575        void *reqs[IO_IOPOLL_BATCH];
 576        struct io_kiocb *req;
 577        int to_free;
 578
 579        to_free = 0;
 580        while (!list_empty(done)) {
 581                req = list_first_entry(done, struct io_kiocb, list);
 582                list_del(&req->list);
 583
 584                io_cqring_fill_event(ctx, req->user_data, req->error, 0);
 585                (*nr_events)++;
 586
 587                if (refcount_dec_and_test(&req->refs)) {
 588                        /* If we're not using fixed files, we have to pair the
 589                         * completion part with the file put. Use regular
 590                         * completions for those, only batch free for fixed
 591                         * file.
 592                         */
 593                        if (req->flags & REQ_F_FIXED_FILE) {
 594                                reqs[to_free++] = req;
 595                                if (to_free == ARRAY_SIZE(reqs))
 596                                        io_free_req_many(ctx, reqs, &to_free);
 597                        } else {
 598                                io_free_req(req);
 599                        }
 600                }
 601        }
 602
 603        io_commit_cqring(ctx);
 604        io_free_req_many(ctx, reqs, &to_free);
 605}
 606
 607static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
 608                        long min)
 609{
 610        struct io_kiocb *req, *tmp;
 611        LIST_HEAD(done);
 612        bool spin;
 613        int ret;
 614
 615        /*
 616         * Only spin for completions if we don't have multiple devices hanging
 617         * off our complete list, and we're under the requested amount.
 618         */
 619        spin = !ctx->poll_multi_file && *nr_events < min;
 620
 621        ret = 0;
 622        list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
 623                struct kiocb *kiocb = &req->rw;
 624
 625                /*
 626                 * Move completed entries to our local list. If we find a
 627                 * request that requires polling, break out and complete
 628                 * the done list first, if we have entries there.
 629                 */
 630                if (req->flags & REQ_F_IOPOLL_COMPLETED) {
 631                        list_move_tail(&req->list, &done);
 632                        continue;
 633                }
 634                if (!list_empty(&done))
 635                        break;
 636
 637                ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
 638                if (ret < 0)
 639                        break;
 640
 641                if (ret && spin)
 642                        spin = false;
 643                ret = 0;
 644        }
 645
 646        if (!list_empty(&done))
 647                io_iopoll_complete(ctx, nr_events, &done);
 648
 649        return ret;
 650}
 651
 652/*
  653 * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
 654 * non-spinning poll check - we'll still enter the driver poll loop, but only
 655 * as a non-spinning completion check.
 656 */
 657static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
 658                                long min)
 659{
 660        while (!list_empty(&ctx->poll_list)) {
 661                int ret;
 662
 663                ret = io_do_iopoll(ctx, nr_events, min);
 664                if (ret < 0)
 665                        return ret;
 666                if (!min || *nr_events >= min)
 667                        return 0;
 668        }
 669
 670        return 1;
 671}
 672
 673/*
 674 * We can't just wait for polled events to come to us, we have to actively
 675 * find and complete them.
 676 */
 677static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
 678{
 679        if (!(ctx->flags & IORING_SETUP_IOPOLL))
 680                return;
 681
 682        mutex_lock(&ctx->uring_lock);
 683        while (!list_empty(&ctx->poll_list)) {
 684                unsigned int nr_events = 0;
 685
 686                io_iopoll_getevents(ctx, &nr_events, 1);
 687        }
 688        mutex_unlock(&ctx->uring_lock);
 689}
 690
 691static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 692                           long min)
 693{
 694        int ret = 0;
 695
 696        do {
 697                int tmin = 0;
 698
 699                if (*nr_events < min)
 700                        tmin = min - *nr_events;
 701
 702                ret = io_iopoll_getevents(ctx, nr_events, tmin);
 703                if (ret <= 0)
 704                        break;
 705                ret = 0;
 706        } while (min && !*nr_events && !need_resched());
 707
 708        return ret;
 709}
 710
 711static void kiocb_end_write(struct kiocb *kiocb)
 712{
 713        if (kiocb->ki_flags & IOCB_WRITE) {
 714                struct inode *inode = file_inode(kiocb->ki_filp);
 715
 716                /*
 717                 * Tell lockdep we inherited freeze protection from submission
 718                 * thread.
 719                 */
 720                if (S_ISREG(inode->i_mode))
 721                        __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
 722                file_end_write(kiocb->ki_filp);
 723        }
 724}
 725
 726static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
 727{
 728        struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
 729
 730        kiocb_end_write(kiocb);
 731
 732        io_cqring_add_event(req->ctx, req->user_data, res, 0);
 733        io_put_req(req);
 734}
 735
 736static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
 737{
 738        struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
 739
 740        kiocb_end_write(kiocb);
 741
 742        req->error = res;
 743        if (res != -EAGAIN)
 744                req->flags |= REQ_F_IOPOLL_COMPLETED;
 745}
 746
 747/*
 748 * After the iocb has been issued, it's safe to be found on the poll list.
 749 * Adding the kiocb to the list AFTER submission ensures that we don't
  750 * find it from an io_iopoll_getevents() thread before the issuer is done
 751 * accessing the kiocb cookie.
 752 */
 753static void io_iopoll_req_issued(struct io_kiocb *req)
 754{
 755        struct io_ring_ctx *ctx = req->ctx;
 756
 757        /*
 758         * Track whether we have multiple files in our lists. This will impact
 759         * how we do polling eventually, not spinning if we're on potentially
 760         * different devices.
 761         */
 762        if (list_empty(&ctx->poll_list)) {
 763                ctx->poll_multi_file = false;
 764        } else if (!ctx->poll_multi_file) {
 765                struct io_kiocb *list_req;
 766
 767                list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
 768                                                list);
 769                if (list_req->rw.ki_filp != req->rw.ki_filp)
 770                        ctx->poll_multi_file = true;
 771        }
 772
 773        /*
 774         * For fast devices, IO may have already completed. If it has, add
 775         * it to the front so we find it first.
 776         */
 777        if (req->flags & REQ_F_IOPOLL_COMPLETED)
 778                list_add(&req->list, &ctx->poll_list);
 779        else
 780                list_add_tail(&req->list, &ctx->poll_list);
 781}
 782
 783static void io_file_put(struct io_submit_state *state)
 784{
 785        if (state->file) {
 786                int diff = state->has_refs - state->used_refs;
 787
 788                if (diff)
 789                        fput_many(state->file, diff);
 790                state->file = NULL;
 791        }
 792}
 793
 794/*
 795 * Get as many references to a file as we have IOs left in this submission,
 796 * assuming most submissions are for one file, or at least that each file
 797 * has more than one submission.
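 *
 * For example, a submission batch with 8 IOs left against a single fd is
 * served by one fget_many(fd, 8); references that end up unused are dropped
 * later via io_file_put() -> fput_many().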
 798 */
 799static struct file *io_file_get(struct io_submit_state *state, int fd)
 800{
 801        if (!state)
 802                return fget(fd);
 803
 804        if (state->file) {
 805                if (state->fd == fd) {
 806                        state->used_refs++;
 807                        state->ios_left--;
 808                        return state->file;
 809                }
 810                io_file_put(state);
 811        }
 812        state->file = fget_many(fd, state->ios_left);
 813        if (!state->file)
 814                return NULL;
 815
 816        state->fd = fd;
 817        state->has_refs = state->ios_left;
 818        state->used_refs = 1;
 819        state->ios_left--;
 820        return state->file;
 821}
 822
 823/*
 824 * If we tracked the file through the SCM inflight mechanism, we could support
 825 * any file. For now, just ensure that anything potentially problematic is done
 826 * inline.
 827 */
 828static bool io_file_supports_async(struct file *file)
 829{
 830        umode_t mode = file_inode(file)->i_mode;
 831
 832        if (S_ISBLK(mode) || S_ISCHR(mode))
 833                return true;
 834        if (S_ISREG(mode) && file->f_op != &io_uring_fops)
 835                return true;
 836
 837        return false;
 838}
 839
 840static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
 841                      bool force_nonblock)
 842{
 843        const struct io_uring_sqe *sqe = s->sqe;
 844        struct io_ring_ctx *ctx = req->ctx;
 845        struct kiocb *kiocb = &req->rw;
 846        unsigned ioprio;
 847        int ret;
 848
 849        if (!req->file)
 850                return -EBADF;
 851        /* For -EAGAIN retry, everything is already prepped */
 852        if (req->flags & REQ_F_PREPPED)
 853                return 0;
 854
 855        if (force_nonblock && !io_file_supports_async(req->file))
 856                force_nonblock = false;
 857
 858        kiocb->ki_pos = READ_ONCE(sqe->off);
 859        kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
 860        kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
 861
 862        ioprio = READ_ONCE(sqe->ioprio);
 863        if (ioprio) {
 864                ret = ioprio_check_cap(ioprio);
 865                if (ret)
 866                        return ret;
 867
 868                kiocb->ki_ioprio = ioprio;
 869        } else
 870                kiocb->ki_ioprio = get_current_ioprio();
 871
 872        ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
 873        if (unlikely(ret))
 874                return ret;
 875
 876        /* don't allow async punt if RWF_NOWAIT was requested */
 877        if (kiocb->ki_flags & IOCB_NOWAIT)
 878                req->flags |= REQ_F_NOWAIT;
 879
 880        if (force_nonblock)
 881                kiocb->ki_flags |= IOCB_NOWAIT;
 882
 883        if (ctx->flags & IORING_SETUP_IOPOLL) {
 884                if (!(kiocb->ki_flags & IOCB_DIRECT) ||
 885                    !kiocb->ki_filp->f_op->iopoll)
 886                        return -EOPNOTSUPP;
 887
 888                req->error = 0;
 889                kiocb->ki_flags |= IOCB_HIPRI;
 890                kiocb->ki_complete = io_complete_rw_iopoll;
 891        } else {
 892                if (kiocb->ki_flags & IOCB_HIPRI)
 893                        return -EINVAL;
 894                kiocb->ki_complete = io_complete_rw;
 895        }
 896        req->flags |= REQ_F_PREPPED;
 897        return 0;
 898}
 899
 900static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
 901{
 902        switch (ret) {
 903        case -EIOCBQUEUED:
 904                break;
 905        case -ERESTARTSYS:
 906        case -ERESTARTNOINTR:
 907        case -ERESTARTNOHAND:
 908        case -ERESTART_RESTARTBLOCK:
 909                /*
 910                 * We can't just restart the syscall, since previously
 911                 * submitted sqes may already be in progress. Just fail this
 912                 * IO with EINTR.
 913                 */
 914                ret = -EINTR;
 915                /* fall through */
 916        default:
 917                kiocb->ki_complete(kiocb, ret, 0);
 918        }
 919}
 920
 921static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
 922                           const struct io_uring_sqe *sqe,
 923                           struct iov_iter *iter)
 924{
 925        size_t len = READ_ONCE(sqe->len);
 926        struct io_mapped_ubuf *imu;
 927        unsigned index, buf_index;
 928        size_t offset;
 929        u64 buf_addr;
 930
 931        /* attempt to use fixed buffers without having provided iovecs */
 932        if (unlikely(!ctx->user_bufs))
 933                return -EFAULT;
 934
 935        buf_index = READ_ONCE(sqe->buf_index);
 936        if (unlikely(buf_index >= ctx->nr_user_bufs))
 937                return -EFAULT;
 938
 939        index = array_index_nospec(buf_index, ctx->nr_user_bufs);
 940        imu = &ctx->user_bufs[index];
 941        buf_addr = READ_ONCE(sqe->addr);
 942
 943        /* overflow */
 944        if (buf_addr + len < buf_addr)
 945                return -EFAULT;
 946        /* not inside the mapped region */
 947        if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
 948                return -EFAULT;
 949
 950        /*
  951         * May not be the start of the buffer; set the size appropriately
  952         * and advance the iterator to the start of the requested range.
 953         */
 954        offset = buf_addr - imu->ubuf;
 955        iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
 956        if (offset)
 957                iov_iter_advance(iter, offset);
 958
 959        /* don't drop a reference to these pages */
 960        iter->type |= ITER_BVEC_FLAG_NO_REF;
 961        return 0;
 962}
 963
 964static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
 965                           const struct sqe_submit *s, struct iovec **iovec,
 966                           struct iov_iter *iter)
 967{
 968        const struct io_uring_sqe *sqe = s->sqe;
 969        void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
 970        size_t sqe_len = READ_ONCE(sqe->len);
 971        u8 opcode;
 972
 973        /*
 974         * We're reading ->opcode for the second time, but the first read
 975         * doesn't care whether it's _FIXED or not, so it doesn't matter
 976         * whether ->opcode changes concurrently. The first read does care
 977         * about whether it is a READ or a WRITE, so we don't trust this read
 978         * for that purpose and instead let the caller pass in the read/write
 979         * flag.
 980         */
 981        opcode = READ_ONCE(sqe->opcode);
 982        if (opcode == IORING_OP_READ_FIXED ||
 983            opcode == IORING_OP_WRITE_FIXED) {
 984                int ret = io_import_fixed(ctx, rw, sqe, iter);
 985                *iovec = NULL;
 986                return ret;
 987        }
 988
 989        if (!s->has_user)
 990                return -EFAULT;
 991
 992#ifdef CONFIG_COMPAT
 993        if (ctx->compat)
 994                return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
 995                                                iovec, iter);
 996#endif
 997
 998        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 999}
1000
1001/*
1002 * Make a note of the last file/offset/direction we punted to async
1003 * context. We'll use this information to see if we can piggy back a
 1004 * sequential request onto the previous one, if it still hasn't been
1005 * completed by the async worker.
1006 */
1007static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1008{
1009        struct async_list *async_list = &req->ctx->pending_async[rw];
1010        struct kiocb *kiocb = &req->rw;
1011        struct file *filp = kiocb->ki_filp;
1012        off_t io_end = kiocb->ki_pos + len;
1013
1014        if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1015                unsigned long max_pages;
1016
1017                /* Use 8x RA size as a decent limiter for both reads/writes */
1018                max_pages = filp->f_ra.ra_pages;
1019                if (!max_pages)
1020                        max_pages = VM_READAHEAD_PAGES;
1021                max_pages *= 8;
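                /*
                 * e.g. with 4K pages and the default 128K readahead window
                 * (VM_READAHEAD_PAGES == 32), this allows roughly 256 pages,
                 * i.e. 1MB, of sequential IO before the state is reset below.
                 */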
1022
1023                /* If max pages are exceeded, reset the state */
1024                len >>= PAGE_SHIFT;
1025                if (async_list->io_pages + len <= max_pages) {
1026                        req->flags |= REQ_F_SEQ_PREV;
1027                        async_list->io_pages += len;
1028                } else {
1029                        io_end = 0;
1030                        async_list->io_pages = 0;
1031                }
1032        }
1033
1034        /* New file? Reset state. */
1035        if (async_list->file != filp) {
1036                async_list->io_pages = 0;
1037                async_list->file = filp;
1038        }
1039        async_list->io_end = io_end;
1040}
1041
1042static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1043                   bool force_nonblock)
1044{
1045        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1046        struct kiocb *kiocb = &req->rw;
1047        struct iov_iter iter;
1048        struct file *file;
1049        size_t iov_count;
1050        int ret;
1051
1052        ret = io_prep_rw(req, s, force_nonblock);
1053        if (ret)
1054                return ret;
1055        file = kiocb->ki_filp;
1056
1057        if (unlikely(!(file->f_mode & FMODE_READ)))
1058                return -EBADF;
1059        if (unlikely(!file->f_op->read_iter))
1060                return -EINVAL;
1061
1062        ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1063        if (ret)
1064                return ret;
1065
1066        iov_count = iov_iter_count(&iter);
1067        ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1068        if (!ret) {
1069                ssize_t ret2;
1070
1071                /* Catch -EAGAIN return for forced non-blocking submission */
1072                ret2 = call_read_iter(file, kiocb, &iter);
1073                if (!force_nonblock || ret2 != -EAGAIN) {
1074                        io_rw_done(kiocb, ret2);
1075                } else {
1076                        /*
1077                         * If ->needs_lock is true, we're already in async
1078                         * context.
1079                         */
1080                        if (!s->needs_lock)
1081                                io_async_list_note(READ, req, iov_count);
1082                        ret = -EAGAIN;
1083                }
1084        }
1085        kfree(iovec);
1086        return ret;
1087}
1088
1089static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1090                    bool force_nonblock)
1091{
1092        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1093        struct kiocb *kiocb = &req->rw;
1094        struct iov_iter iter;
1095        struct file *file;
1096        size_t iov_count;
1097        int ret;
1098
1099        ret = io_prep_rw(req, s, force_nonblock);
1100        if (ret)
1101                return ret;
1102
1103        file = kiocb->ki_filp;
1104        if (unlikely(!(file->f_mode & FMODE_WRITE)))
1105                return -EBADF;
1106        if (unlikely(!file->f_op->write_iter))
1107                return -EINVAL;
1108
1109        ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1110        if (ret)
1111                return ret;
1112
1113        iov_count = iov_iter_count(&iter);
1114
1115        ret = -EAGAIN;
1116        if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1117                /* If ->needs_lock is true, we're already in async context. */
1118                if (!s->needs_lock)
1119                        io_async_list_note(WRITE, req, iov_count);
1120                goto out_free;
1121        }
1122
1123        ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1124        if (!ret) {
1125                ssize_t ret2;
1126
1127                /*
1128                 * Open-code file_start_write here to grab freeze protection,
1129                 * which will be released by another thread in
1130                 * io_complete_rw().  Fool lockdep by telling it the lock got
1131                 * released so that it doesn't complain about the held lock when
1132                 * we return to userspace.
1133                 */
1134                if (S_ISREG(file_inode(file)->i_mode)) {
1135                        __sb_start_write(file_inode(file)->i_sb,
1136                                                SB_FREEZE_WRITE, true);
1137                        __sb_writers_release(file_inode(file)->i_sb,
1138                                                SB_FREEZE_WRITE);
1139                }
1140                kiocb->ki_flags |= IOCB_WRITE;
1141
1142                ret2 = call_write_iter(file, kiocb, &iter);
1143                if (!force_nonblock || ret2 != -EAGAIN) {
1144                        io_rw_done(kiocb, ret2);
1145                } else {
1146                        /*
1147                         * If ->needs_lock is true, we're already in async
1148                         * context.
1149                         */
1150                        if (!s->needs_lock)
1151                                io_async_list_note(WRITE, req, iov_count);
1152                        ret = -EAGAIN;
1153                }
1154        }
1155out_free:
1156        kfree(iovec);
1157        return ret;
1158}
1159
1160/*
1161 * IORING_OP_NOP just posts a completion event, nothing else.
1162 */
1163static int io_nop(struct io_kiocb *req, u64 user_data)
1164{
1165        struct io_ring_ctx *ctx = req->ctx;
1166        long err = 0;
1167
1168        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1169                return -EINVAL;
1170
1171        io_cqring_add_event(ctx, user_data, err, 0);
1172        io_put_req(req);
1173        return 0;
1174}
1175
1176static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1177{
1178        struct io_ring_ctx *ctx = req->ctx;
1179
1180        if (!req->file)
1181                return -EBADF;
1182        /* Prep already done (EAGAIN retry) */
1183        if (req->flags & REQ_F_PREPPED)
1184                return 0;
1185
1186        if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1187                return -EINVAL;
1188        if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1189                return -EINVAL;
1190
1191        req->flags |= REQ_F_PREPPED;
1192        return 0;
1193}
1194
1195static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1196                    bool force_nonblock)
1197{
1198        loff_t sqe_off = READ_ONCE(sqe->off);
1199        loff_t sqe_len = READ_ONCE(sqe->len);
1200        loff_t end = sqe_off + sqe_len;
1201        unsigned fsync_flags;
1202        int ret;
1203
1204        fsync_flags = READ_ONCE(sqe->fsync_flags);
1205        if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1206                return -EINVAL;
1207
1208        ret = io_prep_fsync(req, sqe);
1209        if (ret)
1210                return ret;
1211
1212        /* fsync always requires a blocking context */
1213        if (force_nonblock)
1214                return -EAGAIN;
1215
1216        ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1217                                end > 0 ? end : LLONG_MAX,
1218                                fsync_flags & IORING_FSYNC_DATASYNC);
1219
1220        io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1221        io_put_req(req);
1222        return 0;
1223}
1224
1225static void io_poll_remove_one(struct io_kiocb *req)
1226{
1227        struct io_poll_iocb *poll = &req->poll;
1228
1229        spin_lock(&poll->head->lock);
1230        WRITE_ONCE(poll->canceled, true);
1231        if (!list_empty(&poll->wait.entry)) {
1232                list_del_init(&poll->wait.entry);
1233                queue_work(req->ctx->sqo_wq, &req->work);
1234        }
1235        spin_unlock(&poll->head->lock);
1236
1237        list_del_init(&req->list);
1238}
1239
1240static void io_poll_remove_all(struct io_ring_ctx *ctx)
1241{
1242        struct io_kiocb *req;
1243
1244        spin_lock_irq(&ctx->completion_lock);
1245        while (!list_empty(&ctx->cancel_list)) {
1246                req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1247                io_poll_remove_one(req);
1248        }
1249        spin_unlock_irq(&ctx->completion_lock);
1250}
1251
1252/*
1253 * Find a running poll command that matches one specified in sqe->addr,
1254 * and remove it if found.
1255 */
1256static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1257{
1258        struct io_ring_ctx *ctx = req->ctx;
1259        struct io_kiocb *poll_req, *next;
1260        int ret = -ENOENT;
1261
1262        if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1263                return -EINVAL;
1264        if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1265            sqe->poll_events)
1266                return -EINVAL;
1267
1268        spin_lock_irq(&ctx->completion_lock);
1269        list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1270                if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1271                        io_poll_remove_one(poll_req);
1272                        ret = 0;
1273                        break;
1274                }
1275        }
1276        spin_unlock_irq(&ctx->completion_lock);
1277
1278        io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1279        io_put_req(req);
1280        return 0;
1281}
1282
1283static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1284                             __poll_t mask)
1285{
1286        req->poll.done = true;
1287        io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask), 0);
1288        io_commit_cqring(ctx);
1289}
1290
1291static void io_poll_complete_work(struct work_struct *work)
1292{
1293        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1294        struct io_poll_iocb *poll = &req->poll;
1295        struct poll_table_struct pt = { ._key = poll->events };
1296        struct io_ring_ctx *ctx = req->ctx;
1297        __poll_t mask = 0;
1298
1299        if (!READ_ONCE(poll->canceled))
1300                mask = vfs_poll(poll->file, &pt) & poll->events;
1301
1302        /*
1303         * Note that ->ki_cancel callers also delete iocb from active_reqs after
1304         * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1305         * synchronize with them.  In the cancellation case the list_del_init
1306         * itself is not actually needed, but harmless so we keep it in to
1307         * avoid further branches in the fast path.
1308         */
1309        spin_lock_irq(&ctx->completion_lock);
1310        if (!mask && !READ_ONCE(poll->canceled)) {
1311                add_wait_queue(poll->head, &poll->wait);
1312                spin_unlock_irq(&ctx->completion_lock);
1313                return;
1314        }
1315        list_del_init(&req->list);
1316        io_poll_complete(ctx, req, mask);
1317        spin_unlock_irq(&ctx->completion_lock);
1318
1319        io_cqring_ev_posted(ctx);
1320        io_put_req(req);
1321}
1322
1323static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1324                        void *key)
1325{
1326        struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1327                                                        wait);
1328        struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1329        struct io_ring_ctx *ctx = req->ctx;
1330        __poll_t mask = key_to_poll(key);
1331        unsigned long flags;
1332
 1333        /* for instances that support it, check for an event match first: */
1334        if (mask && !(mask & poll->events))
1335                return 0;
1336
1337        list_del_init(&poll->wait.entry);
1338
1339        if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1340                list_del(&req->list);
1341                io_poll_complete(ctx, req, mask);
1342                spin_unlock_irqrestore(&ctx->completion_lock, flags);
1343
1344                io_cqring_ev_posted(ctx);
1345                io_put_req(req);
1346        } else {
1347                queue_work(ctx->sqo_wq, &req->work);
1348        }
1349
1350        return 1;
1351}
1352
1353struct io_poll_table {
1354        struct poll_table_struct pt;
1355        struct io_kiocb *req;
1356        int error;
1357};
1358
1359static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1360                               struct poll_table_struct *p)
1361{
1362        struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1363
1364        if (unlikely(pt->req->poll.head)) {
1365                pt->error = -EINVAL;
1366                return;
1367        }
1368
1369        pt->error = 0;
1370        pt->req->poll.head = head;
1371        add_wait_queue(head, &pt->req->poll.wait);
1372}
1373
1374static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1375{
1376        struct io_poll_iocb *poll = &req->poll;
1377        struct io_ring_ctx *ctx = req->ctx;
1378        struct io_poll_table ipt;
1379        bool cancel = false;
1380        __poll_t mask;
1381        u16 events;
1382
1383        if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1384                return -EINVAL;
1385        if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1386                return -EINVAL;
1387        if (!poll->file)
1388                return -EBADF;
1389
1390        INIT_WORK(&req->work, io_poll_complete_work);
1391        events = READ_ONCE(sqe->poll_events);
1392        poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1393
1394        poll->head = NULL;
1395        poll->done = false;
1396        poll->canceled = false;
1397
1398        ipt.pt._qproc = io_poll_queue_proc;
1399        ipt.pt._key = poll->events;
1400        ipt.req = req;
1401        ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1402
 1403        /* initialize the list so that we can do list_empty checks */
1404        INIT_LIST_HEAD(&poll->wait.entry);
1405        init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1406
1407        mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1408
1409        spin_lock_irq(&ctx->completion_lock);
1410        if (likely(poll->head)) {
1411                spin_lock(&poll->head->lock);
1412                if (unlikely(list_empty(&poll->wait.entry))) {
1413                        if (ipt.error)
1414                                cancel = true;
1415                        ipt.error = 0;
1416                        mask = 0;
1417                }
1418                if (mask || ipt.error)
1419                        list_del_init(&poll->wait.entry);
1420                else if (cancel)
1421                        WRITE_ONCE(poll->canceled, true);
1422                else if (!poll->done) /* actually waiting for an event */
1423                        list_add_tail(&req->list, &ctx->cancel_list);
1424                spin_unlock(&poll->head->lock);
1425        }
1426        if (mask) { /* no async, we'd stolen it */
1427                req->error = mangle_poll(mask);
1428                ipt.error = 0;
1429                io_poll_complete(ctx, req, mask);
1430        }
1431        spin_unlock_irq(&ctx->completion_lock);
1432
1433        if (mask) {
1434                io_cqring_ev_posted(ctx);
1435                io_put_req(req);
1436        }
1437        return ipt.error;
1438}
1439
1440static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1441                           const struct sqe_submit *s, bool force_nonblock)
1442{
1443        int ret, opcode;
1444
1445        if (unlikely(s->index >= ctx->sq_entries))
1446                return -EINVAL;
1447        req->user_data = READ_ONCE(s->sqe->user_data);
1448
1449        opcode = READ_ONCE(s->sqe->opcode);
1450        switch (opcode) {
1451        case IORING_OP_NOP:
1452                ret = io_nop(req, req->user_data);
1453                break;
1454        case IORING_OP_READV:
1455                if (unlikely(s->sqe->buf_index))
1456                        return -EINVAL;
1457                ret = io_read(req, s, force_nonblock);
1458                break;
1459        case IORING_OP_WRITEV:
1460                if (unlikely(s->sqe->buf_index))
1461                        return -EINVAL;
1462                ret = io_write(req, s, force_nonblock);
1463                break;
1464        case IORING_OP_READ_FIXED:
1465                ret = io_read(req, s, force_nonblock);
1466                break;
1467        case IORING_OP_WRITE_FIXED:
1468                ret = io_write(req, s, force_nonblock);
1469                break;
1470        case IORING_OP_FSYNC:
1471                ret = io_fsync(req, s->sqe, force_nonblock);
1472                break;
1473        case IORING_OP_POLL_ADD:
1474                ret = io_poll_add(req, s->sqe);
1475                break;
1476        case IORING_OP_POLL_REMOVE:
1477                ret = io_poll_remove(req, s->sqe);
1478                break;
1479        default:
1480                ret = -EINVAL;
1481                break;
1482        }
1483
1484        if (ret)
1485                return ret;
1486
1487        if (ctx->flags & IORING_SETUP_IOPOLL) {
1488                if (req->error == -EAGAIN)
1489                        return -EAGAIN;
1490
1491                /* workqueue context doesn't hold uring_lock, grab it now */
1492                if (s->needs_lock)
1493                        mutex_lock(&ctx->uring_lock);
1494                io_iopoll_req_issued(req);
1495                if (s->needs_lock)
1496                        mutex_unlock(&ctx->uring_lock);
1497        }
1498
1499        return 0;
1500}
1501
1502static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1503                                                 const struct io_uring_sqe *sqe)
1504{
1505        switch (sqe->opcode) {
1506        case IORING_OP_READV:
1507        case IORING_OP_READ_FIXED:
1508                return &ctx->pending_async[READ];
1509        case IORING_OP_WRITEV:
1510        case IORING_OP_WRITE_FIXED:
1511                return &ctx->pending_async[WRITE];
1512        default:
1513                return NULL;
1514        }
1515}
1516
1517static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1518{
1519        u8 opcode = READ_ONCE(sqe->opcode);
1520
1521        return !(opcode == IORING_OP_READ_FIXED ||
1522                 opcode == IORING_OP_WRITE_FIXED);
1523}
1524
1525static void io_sq_wq_submit_work(struct work_struct *work)
1526{
1527        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1528        struct io_ring_ctx *ctx = req->ctx;
1529        struct mm_struct *cur_mm = NULL;
1530        struct async_list *async_list;
1531        LIST_HEAD(req_list);
1532        mm_segment_t old_fs;
1533        int ret;
1534
1535        async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1536restart:
1537        do {
1538                struct sqe_submit *s = &req->submit;
1539                const struct io_uring_sqe *sqe = s->sqe;
1540
1541                /* Ensure we clear previously set non-block flag */
1542                req->rw.ki_flags &= ~IOCB_NOWAIT;
1543
1544                ret = 0;
1545                if (io_sqe_needs_user(sqe) && !cur_mm) {
1546                        if (!mmget_not_zero(ctx->sqo_mm)) {
1547                                ret = -EFAULT;
1548                        } else {
1549                                cur_mm = ctx->sqo_mm;
1550                                use_mm(cur_mm);
1551                                old_fs = get_fs();
1552                                set_fs(USER_DS);
1553                        }
1554                }
1555
1556                if (!ret) {
1557                        s->has_user = cur_mm != NULL;
1558                        s->needs_lock = true;
1559                        do {
1560                                ret = __io_submit_sqe(ctx, req, s, false);
1561                                /*
1562                                 * We can get EAGAIN for polled IO even though
1563                                 * we're forcing a sync submission from here,
1564                                 * since we can't wait for request slots on the
1565                                 * block side.
1566                                 */
1567                                if (ret != -EAGAIN)
1568                                        break;
1569                                cond_resched();
1570                        } while (1);
1571                }
1572
1573                /* drop submission reference */
1574                io_put_req(req);
1575
1576                if (ret) {
1577                        io_cqring_add_event(ctx, sqe->user_data, ret, 0);
1578                        io_put_req(req);
1579                }
1580
1581                /* async context always uses a copy of the sqe */
1582                kfree(sqe);
1583
1584                if (!async_list)
1585                        break;
1586                if (!list_empty(&req_list)) {
1587                        req = list_first_entry(&req_list, struct io_kiocb,
1588                                                list);
1589                        list_del(&req->list);
1590                        continue;
1591                }
1592                if (list_empty(&async_list->list))
1593                        break;
1594
1595                req = NULL;
1596                spin_lock(&async_list->lock);
1597                if (list_empty(&async_list->list)) {
1598                        spin_unlock(&async_list->lock);
1599                        break;
1600                }
1601                list_splice_init(&async_list->list, &req_list);
1602                spin_unlock(&async_list->lock);
1603
1604                req = list_first_entry(&req_list, struct io_kiocb, list);
1605                list_del(&req->list);
1606        } while (req);
1607
1608        /*
1609         * Rare case of racing with a submitter. If we find the count has
1610         * dropped to zero AND we have pending work items, then restart
1611         * the processing. This is a tiny race window.
1612         */
1613        if (async_list) {
1614                ret = atomic_dec_return(&async_list->cnt);
1615                while (!ret && !list_empty(&async_list->list)) {
1616                        spin_lock(&async_list->lock);
1617                        atomic_inc(&async_list->cnt);
1618                        list_splice_init(&async_list->list, &req_list);
1619                        spin_unlock(&async_list->lock);
1620
1621                        if (!list_empty(&req_list)) {
1622                                req = list_first_entry(&req_list,
1623                                                        struct io_kiocb, list);
1624                                list_del(&req->list);
1625                                goto restart;
1626                        }
1627                        ret = atomic_dec_return(&async_list->cnt);
1628                }
1629        }
1630
1631        if (cur_mm) {
1632                set_fs(old_fs);
1633                unuse_mm(cur_mm);
1634                mmput(cur_mm);
1635        }
1636}
1637
1638/*
1639 * See if we can piggyback onto previously submitted work that is still
1640 * running. We currently only allow this if the new request is sequential
1641 * to the previous one we punted.
1642 */
1643static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1644{
1645        bool ret = false;
1646
1647        if (!list)
1648                return false;
1649        if (!(req->flags & REQ_F_SEQ_PREV))
1650                return false;
1651        if (!atomic_read(&list->cnt))
1652                return false;
1653
1654        ret = true;
1655        spin_lock(&list->lock);
1656        list_add_tail(&req->list, &list->list);
1657        if (!atomic_read(&list->cnt)) {
1658                list_del_init(&req->list);
1659                ret = false;
1660        }
1661        spin_unlock(&list->lock);
1662        return ret;
1663}
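
/*
 * The re-check of list->cnt under list->lock closes the race with
 * io_sq_wq_submit_work(): the worker drops cnt to zero only once it is
 * about to stop looking at the list. If we observe cnt == 0 after
 * linking the request, the worker may already have done its final
 * empty-list check, so the request is unlinked again and the caller
 * falls back to bumping cnt and queueing a fresh work item instead.
 */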
1664
1665static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1666{
1667        int op = READ_ONCE(sqe->opcode);
1668
1669        switch (op) {
1670        case IORING_OP_NOP:
1671        case IORING_OP_POLL_REMOVE:
1672                return false;
1673        default:
1674                return true;
1675        }
1676}
1677
1678static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1679                           struct io_submit_state *state, struct io_kiocb *req)
1680{
1681        unsigned flags;
1682        int fd;
1683
1684        flags = READ_ONCE(s->sqe->flags);
1685        fd = READ_ONCE(s->sqe->fd);
1686
1687        if (!io_op_needs_file(s->sqe)) {
1688                req->file = NULL;
1689                return 0;
1690        }
1691
1692        if (flags & IOSQE_FIXED_FILE) {
1693                if (unlikely(!ctx->user_files ||
1694                    (unsigned) fd >= ctx->nr_user_files))
1695                        return -EBADF;
1696                req->file = ctx->user_files[fd];
1697                req->flags |= REQ_F_FIXED_FILE;
1698        } else {
1699                if (s->needs_fixed_file)
1700                        return -EBADF;
1701                req->file = io_file_get(state, fd);
1702                if (unlikely(!req->file))
1703                        return -EBADF;
1704        }
1705
1706        return 0;
1707}
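
/*
 * With IOSQE_FIXED_FILE set, sqe->fd is not a process file descriptor
 * but an index into the table installed via IORING_REGISTER_FILES.
 * Userspace sketch:
 *
 *      sqe->flags      |= IOSQE_FIXED_FILE;
 *      sqe->fd          = 3;   // slot 3 of the registered table,
 *                              // not file descriptor 3
 */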
1708
1709static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1710                         struct io_submit_state *state)
1711{
1712        struct io_kiocb *req;
1713        int ret;
1714
1715        /* enforce forwards compatibility on users */
1716        if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE))
1717                return -EINVAL;
1718
1719        req = io_get_req(ctx, state);
1720        if (unlikely(!req))
1721                return -EAGAIN;
1722
1723        ret = io_req_set_file(ctx, s, state, req);
1724        if (unlikely(ret))
1725                goto out;
1726
1727        ret = __io_submit_sqe(ctx, req, s, true);
1728        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1729                struct io_uring_sqe *sqe_copy;
1730
1731                sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1732                if (sqe_copy) {
1733                        struct async_list *list;
1734
1735                        memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1736                        s->sqe = sqe_copy;
1737
1738                        memcpy(&req->submit, s, sizeof(*s));
1739                        list = io_async_list_from_sqe(ctx, s->sqe);
1740                        if (!io_add_to_prev_work(list, req)) {
1741                                if (list)
1742                                        atomic_inc(&list->cnt);
1743                                INIT_WORK(&req->work, io_sq_wq_submit_work);
1744                                queue_work(ctx->sqo_wq, &req->work);
1745                        }
1746
1747                        /*
1748                         * Queued up for async execution, worker will release
1749                         * submit reference when the iocb is actually
1750                         * submitted.
1751                         */
1752                        return 0;
1753                }
1754        }
1755
1756out:
1757        /* drop submission reference */
1758        io_put_req(req);
1759
1760        /* and drop final reference, if we failed */
1761        if (ret)
1762                io_put_req(req);
1763
1764        return ret;
1765}
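
/*
 * Request life cycle on this path: a freshly allocated request carries a
 * submission reference and a completion reference. On the inline path the
 * submission reference is dropped here and the completion side drops the
 * final one; on error both are dropped here, freeing the request. When the
 * sqe is punted, the workqueue worker inherits the submission reference and
 * drops it once the iocb has actually been issued.
 */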
1766
1767/*
1768 * Batched submission is done, ensure local IO is flushed out.
1769 */
1770static void io_submit_state_end(struct io_submit_state *state)
1771{
1772        blk_finish_plug(&state->plug);
1773        io_file_put(state);
1774        if (state->free_reqs)
1775                kmem_cache_free_bulk(req_cachep, state->free_reqs,
1776                                        &state->reqs[state->cur_req]);
1777}
1778
1779/*
1780 * Start submission side cache.
1781 */
1782static void io_submit_state_start(struct io_submit_state *state,
1783                                  struct io_ring_ctx *ctx, unsigned max_ios)
1784{
1785        blk_start_plug(&state->plug);
1786        state->free_reqs = 0;
1787        state->file = NULL;
1788        state->ios_left = max_ios;
1789}
1790
1791static void io_commit_sqring(struct io_ring_ctx *ctx)
1792{
1793        struct io_sq_ring *ring = ctx->sq_ring;
1794
1795        if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1796                /*
1797                 * Ensure any loads from the SQEs are done at this point,
1798                 * since once we write the new head, the application could
1799                 * write new data to them.
1800                 */
1801                smp_store_release(&ring->r.head, ctx->cached_sq_head);
1802        }
1803}
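
/*
 * The release store above is the signal to the application that the sqes
 * up to the new head have been consumed and those slots may be reused. A
 * sketch of the corresponding userspace bookkeeping (GCC/Clang atomic
 * builtins assumed; sq_head, local_tail and ring_entries are the
 * application's view of the mmap'ed SQ ring):
 *
 *      unsigned head = __atomic_load_n(sq_head, __ATOMIC_ACQUIRE);
 *      unsigned free_slots = ring_entries - (local_tail - head);
 */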
1804
1805/*
1806 * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1807 * that is mapped by userspace. This means that care needs to be taken to
1808 * ensure that reads are stable, as we cannot rely on userspace always
1809 * being a good citizen. If members of the sqe are validated and then later
1810 * used, it's important that those reads are done through READ_ONCE() to
1811 * prevent a re-load down the line.
1812 */
1813static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1814{
1815        struct io_sq_ring *ring = ctx->sq_ring;
1816        unsigned head;
1817
1818        /*
1819         * The cached sq head (or cq tail) serves two purposes:
1820         *
1821         * 1) allows us to batch the cost of updating the user visible
1822         *    head.
1823         * 2) allows the kernel side to track the head on its own, even
1824         *    though the application is the one updating it.
1825         */
1826        head = ctx->cached_sq_head;
1827        /* make sure SQ entry isn't read before tail */
1828        if (head == smp_load_acquire(&ring->r.tail))
1829                return false;
1830
1831        head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1832        if (head < ctx->sq_entries) {
1833                s->index = head;
1834                s->sqe = &ctx->sq_sqes[head];
1835                ctx->cached_sq_head++;
1836                return true;
1837        }
1838
1839        /* drop invalid entries */
1840        ctx->cached_sq_head++;
1841        ring->dropped++;
1842        return false;
1843}
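
/*
 * Illustration of the double-fetch hazard the comment above describes,
 * with a hypothetical MAX_LEN check and do_io() consumer: since the sqe
 * sits in application-writable memory, re-reading a field after
 * validating it is unsafe:
 *
 *      if (sqe->len <= MAX_LEN)        // first load, validated
 *              do_io(sqe->len);        // second load, may have changed
 *
 * Reading the field once through READ_ONCE() into a local variable (or
 * working from a private copy of the sqe, as the async punt path does)
 * keeps the validated value stable.
 */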
1844
1845static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1846                          unsigned int nr, bool has_user, bool mm_fault)
1847{
1848        struct io_submit_state state, *statep = NULL;
1849        int ret, i, submitted = 0;
1850
1851        if (nr > IO_PLUG_THRESHOLD) {
1852                io_submit_state_start(&state, ctx, nr);
1853                statep = &state;
1854        }
1855
1856        for (i = 0; i < nr; i++) {
1857                if (unlikely(mm_fault)) {
1858                        ret = -EFAULT;
1859                } else {
1860                        sqes[i].has_user = has_user;
1861                        sqes[i].needs_lock = true;
1862                        sqes[i].needs_fixed_file = true;
1863                        ret = io_submit_sqe(ctx, &sqes[i], statep);
1864                }
1865                if (!ret) {
1866                        submitted++;
1867                        continue;
1868                }
1869
1870                io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
1871        }
1872
1873        if (statep)
1874                io_submit_state_end(&state);
1875
1876        return submitted;
1877}
1878
1879static int io_sq_thread(void *data)
1880{
1881        struct sqe_submit sqes[IO_IOPOLL_BATCH];
1882        struct io_ring_ctx *ctx = data;
1883        struct mm_struct *cur_mm = NULL;
1884        mm_segment_t old_fs;
1885        DEFINE_WAIT(wait);
1886        unsigned inflight;
1887        unsigned long timeout;
1888
1889        old_fs = get_fs();
1890        set_fs(USER_DS);
1891
1892        timeout = inflight = 0;
1893        while (!kthread_should_stop() && !ctx->sqo_stop) {
1894                bool all_fixed, mm_fault = false;
1895                int i;
1896
1897                if (inflight) {
1898                        unsigned nr_events = 0;
1899
1900                        if (ctx->flags & IORING_SETUP_IOPOLL) {
1901                                /*
1902                                 * We disallow the app entering submit/complete
1903                                 * with polling, but we still need to lock the
1904                                 * ring to prevent racing with polled issue
1905                                 * that got punted to a workqueue.
1906                                 */
1907                                mutex_lock(&ctx->uring_lock);
1908                                io_iopoll_check(ctx, &nr_events, 0);
1909                                mutex_unlock(&ctx->uring_lock);
1910                        } else {
1911                                /*
1912                                 * Normal IO, just pretend everything completed.
1913                                 * We don't have to poll completions for that.
1914                                 */
1915                                nr_events = inflight;
1916                        }
1917
1918                        inflight -= nr_events;
1919                        if (!inflight)
1920                                timeout = jiffies + ctx->sq_thread_idle;
1921                }
1922
1923                if (!io_get_sqring(ctx, &sqes[0])) {
1924                        /*
1925                         * We're polling. If we're within the defined idle
1926                         * period, then let us spin without work before going
1927                         * to sleep.
1928                         */
1929                        if (inflight || !time_after(jiffies, timeout)) {
1930                                cpu_relax();
1931                                continue;
1932                        }
1933
1934                        /*
1935                         * Drop cur_mm before scheduling, we can't hold it for
1936                         * long periods (or over schedule()). Do this before
1937                         * adding ourselves to the waitqueue, as the unuse/drop
1938                         * may sleep.
1939                         */
1940                        if (cur_mm) {
1941                                unuse_mm(cur_mm);
1942                                mmput(cur_mm);
1943                                cur_mm = NULL;
1944                        }
1945
1946                        prepare_to_wait(&ctx->sqo_wait, &wait,
1947                                                TASK_INTERRUPTIBLE);
1948
1949                        /* Tell userspace we may need a wakeup call */
1950                        ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
1951                        /* make sure to read SQ tail after writing flags */
1952                        smp_mb();
1953
1954                        if (!io_get_sqring(ctx, &sqes[0])) {
1955                                if (kthread_should_stop()) {
1956                                        finish_wait(&ctx->sqo_wait, &wait);
1957                                        break;
1958                                }
1959                                if (signal_pending(current))
1960                                        flush_signals(current);
1961                                schedule();
1962                                finish_wait(&ctx->sqo_wait, &wait);
1963
1964                                ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1965                                continue;
1966                        }
1967                        finish_wait(&ctx->sqo_wait, &wait);
1968
1969                        ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1970                }
1971
1972                i = 0;
1973                all_fixed = true;
1974                do {
1975                        if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
1976                                all_fixed = false;
1977
1978                        i++;
1979                        if (i == ARRAY_SIZE(sqes))
1980                                break;
1981                } while (io_get_sqring(ctx, &sqes[i]));
1982
1983                /* Unless all new commands are FIXED regions, grab mm */
1984                if (!all_fixed && !cur_mm) {
1985                        mm_fault = !mmget_not_zero(ctx->sqo_mm);
1986                        if (!mm_fault) {
1987                                use_mm(ctx->sqo_mm);
1988                                cur_mm = ctx->sqo_mm;
1989                        }
1990                }
1991
1992                inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
1993                                                mm_fault);
1994
1995                /* Commit SQ ring head once we've consumed all SQEs */
1996                io_commit_sqring(ctx);
1997        }
1998
1999        set_fs(old_fs);
2000        if (cur_mm) {
2001                unuse_mm(cur_mm);
2002                mmput(cur_mm);
2003        }
2004
2005        if (kthread_should_park())
2006                kthread_parkme();
2007
2008        return 0;
2009}
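
/*
 * Userspace side of the IORING_SQ_NEED_WAKEUP handshake above (sketch;
 * a raw syscall wrapper for io_uring_enter and GCC/Clang atomic builtins
 * are assumed). After publishing new sqes by advancing the SQ tail, the
 * application needs a full barrier before checking the flag, mirroring
 * the smp_mb() the thread issues between setting the flag and re-reading
 * the tail:
 *
 *      __atomic_store_n(sq_tail, local_tail, __ATOMIC_RELEASE);
 *      __atomic_thread_fence(__ATOMIC_SEQ_CST);
 *      if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *              io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP,
 *                             NULL, 0);
 */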
2010
2011static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2012{
2013        struct io_submit_state state, *statep = NULL;
2014        int i, submit = 0;
2015
2016        if (to_submit > IO_PLUG_THRESHOLD) {
2017                io_submit_state_start(&state, ctx, to_submit);
2018                statep = &state;
2019        }
2020
2021        for (i = 0; i < to_submit; i++) {
2022                struct sqe_submit s;
2023                int ret;
2024
2025                if (!io_get_sqring(ctx, &s))
2026                        break;
2027
2028                s.has_user = true;
2029                s.needs_lock = false;
2030                s.needs_fixed_file = false;
2031                submit++;
2032
2033                ret = io_submit_sqe(ctx, &s, statep);
2034                if (ret)
2035                        io_cqring_add_event(ctx, s.sqe->user_data, ret, 0);
2036        }
2037        io_commit_sqring(ctx);
2038
2039        if (statep)
2040                io_submit_state_end(statep);
2041
2042        return submit;
2043}
2044
2045static unsigned io_cqring_events(struct io_cq_ring *ring)
2046{
2047        return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2048}
2049
2050/*
2051 * Wait until events become available, if we don't already have some. The
2052 * application must reap them itself, as they reside on the shared cq ring.
2053 */
2054static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2055                          const sigset_t __user *sig, size_t sigsz)
2056{
2057        struct io_cq_ring *ring = ctx->cq_ring;
2058        sigset_t ksigmask, sigsaved;
2059        DEFINE_WAIT(wait);
2060        int ret;
2061
2062        /* See comment at the top of this file */
2063        smp_rmb();
2064        if (io_cqring_events(ring) >= min_events)
2065                return 0;
2066
2067        if (sig) {
2068#ifdef CONFIG_COMPAT
2069                if (in_compat_syscall())
2070                        ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2071                                                      &ksigmask, &sigsaved, sigsz);
2072                else
2073#endif
2074                        ret = set_user_sigmask(sig, &ksigmask,
2075                                               &sigsaved, sigsz);
2076
2077                if (ret)
2078                        return ret;
2079        }
2080
2081        do {
2082                prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
2083
2084                ret = 0;
2085                /* See comment at the top of this file */
2086                smp_rmb();
2087                if (io_cqring_events(ring) >= min_events)
2088                        break;
2089
2090                schedule();
2091
2092                ret = -EINTR;
2093                if (signal_pending(current))
2094                        break;
2095        } while (1);
2096
2097        finish_wait(&ctx->wait, &wait);
2098
2099        if (sig)
2100                restore_user_sigmask(sig, &sigsaved);
2101
2102        return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2103}
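
/*
 * The application reaps completions straight from the mmap'ed CQ ring; a
 * minimal sketch (atomic builtins assumed; cq_head, cq_tail, cq_mask and
 * cqes are pointers derived from struct io_cqring_offsets, and
 * handle_completion() is a hypothetical callback):
 *
 *      unsigned head = *cq_head;
 *      unsigned tail = __atomic_load_n(cq_tail, __ATOMIC_ACQUIRE);
 *
 *      while (head != tail) {
 *              struct io_uring_cqe *cqe = &cqes[head & *cq_mask];
 *
 *              handle_completion(cqe->user_data, cqe->res);
 *              head++;
 *      }
 *      __atomic_store_n(cq_head, head, __ATOMIC_RELEASE);
 */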
2104
2105static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2106{
2107#if defined(CONFIG_UNIX)
2108        if (ctx->ring_sock) {
2109                struct sock *sock = ctx->ring_sock->sk;
2110                struct sk_buff *skb;
2111
2112                while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2113                        kfree_skb(skb);
2114        }
2115#else
2116        int i;
2117
2118        for (i = 0; i < ctx->nr_user_files; i++)
2119                fput(ctx->user_files[i]);
2120#endif
2121}
2122
2123static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2124{
2125        if (!ctx->user_files)
2126                return -ENXIO;
2127
2128        __io_sqe_files_unregister(ctx);
2129        kfree(ctx->user_files);
2130        ctx->user_files = NULL;
2131        ctx->nr_user_files = 0;
2132        return 0;
2133}
2134
2135static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2136{
2137        if (ctx->sqo_thread) {
2138                ctx->sqo_stop = 1;
2139                mb();
2140                kthread_park(ctx->sqo_thread);
2141                kthread_stop(ctx->sqo_thread);
2142                ctx->sqo_thread = NULL;
2143        }
2144}
2145
2146static void io_finish_async(struct io_ring_ctx *ctx)
2147{
2148        io_sq_thread_stop(ctx);
2149
2150        if (ctx->sqo_wq) {
2151                destroy_workqueue(ctx->sqo_wq);
2152                ctx->sqo_wq = NULL;
2153        }
2154}
2155
2156#if defined(CONFIG_UNIX)
2157static void io_destruct_skb(struct sk_buff *skb)
2158{
2159        struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2160
2161        io_finish_async(ctx);
2162        unix_destruct_scm(skb);
2163}
2164
2165/*
2166 * Ensure the UNIX gc is aware of our file set, so we are certain that
2167 * the io_uring can be safely unregistered on process exit, even if we have
2168 * loops in the file referencing.
2169 */
2170static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2171{
2172        struct sock *sk = ctx->ring_sock->sk;
2173        struct scm_fp_list *fpl;
2174        struct sk_buff *skb;
2175        int i;
2176
2177        if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2178                unsigned long inflight = ctx->user->unix_inflight + nr;
2179
2180                if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2181                        return -EMFILE;
2182        }
2183
2184        fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2185        if (!fpl)
2186                return -ENOMEM;
2187
2188        skb = alloc_skb(0, GFP_KERNEL);
2189        if (!skb) {
2190                kfree(fpl);
2191                return -ENOMEM;
2192        }
2193
2194        skb->sk = sk;
2195        skb->destructor = io_destruct_skb;
2196
2197        fpl->user = get_uid(ctx->user);
2198        for (i = 0; i < nr; i++) {
2199                fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2200                unix_inflight(fpl->user, fpl->fp[i]);
2201        }
2202
2203        fpl->max = fpl->count = nr;
2204        UNIXCB(skb).fp = fpl;
2205        refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2206        skb_queue_head(&sk->sk_receive_queue, skb);
2207
2208        for (i = 0; i < nr; i++)
2209                fput(fpl->fp[i]);
2210
2211        return 0;
2212}
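
/*
 * Note on the fput() loop above: get_file() pinned each file while the
 * scm_fp_list was being assembled, and unix_inflight() recorded it as in
 * flight for the UNIX garbage collector. Once the skb sits on the ring
 * socket's receive queue it keeps the files pinned (they are released,
 * and marked not-inflight, when the skb is destructed at unregister
 * time), so the temporary reference taken here can be dropped.
 */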
2213
2214/*
2215 * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2216 * causes regular reference counting to break down. We rely on the UNIX
2217 * garbage collection to take care of this problem for us.
2218 */
2219static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2220{
2221        unsigned left, total;
2222        int ret = 0;
2223
2224        total = 0;
2225        left = ctx->nr_user_files;
2226        while (left) {
2227                unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2229
2230                ret = __io_sqe_files_scm(ctx, this_files, total);
2231                if (ret)
2232                        break;
2233                left -= this_files;
2234                total += this_files;
2235        }
2236
2237        if (!ret)
2238                return 0;
2239
2240        while (total < ctx->nr_user_files) {
2241                fput(ctx->user_files[total]);
2242                total++;
2243        }
2244
2245        return ret;
2246}
2247#else
2248static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2249{
2250        return 0;
2251}
2252#endif
2253
2254static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2255                                 unsigned nr_args)
2256{
2257        __s32 __user *fds = (__s32 __user *) arg;
2258        int fd, ret = 0;
2259        unsigned i;
2260
2261        if (ctx->user_files)
2262                return -EBUSY;
2263        if (!nr_args)
2264                return -EINVAL;
2265        if (nr_args > IORING_MAX_FIXED_FILES)
2266                return -EMFILE;
2267
2268        ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2269        if (!ctx->user_files)
2270                return -ENOMEM;
2271
2272        for (i = 0; i < nr_args; i++) {
2273                ret = -EFAULT;
2274                if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2275                        break;
2276
2277                ctx->user_files[i] = fget(fd);
2278
2279                ret = -EBADF;
2280                if (!ctx->user_files[i])
2281                        break;
2282                /*
2283                 * Don't allow io_uring instances to be registered. If UNIX
2284                 * isn't enabled, then this causes a reference cycle and this
2285                 * instance can never get freed. If UNIX is enabled we'll
2286                 * handle it just fine, but there's still no point in allowing
2287                 * a ring fd as it doesn't support regular read/write anyway.
2288                 */
2289                if (ctx->user_files[i]->f_op == &io_uring_fops) {
2290                        fput(ctx->user_files[i]);
2291                        break;
2292                }
2293                ctx->nr_user_files++;
2294                ret = 0;
2295        }
2296
2297        if (ret) {
2298                for (i = 0; i < ctx->nr_user_files; i++)
2299                        fput(ctx->user_files[i]);
2300
2301                kfree(ctx->user_files);
2302                ctx->user_files = NULL;
2303                ctx->nr_user_files = 0;
2304                return ret;
2305        }
2306
2307        ret = io_sqe_files_scm(ctx);
2308        if (ret)
2309                io_sqe_files_unregister(ctx);
2310
2311        return ret;
2312}
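
/*
 * Userspace sketch of registering a fixed file set (raw syscall wrapper
 * for io_uring_register assumed; fds[] is an array of open descriptors):
 *
 *      int fds[8] = { ... };
 *
 *      ret = io_uring_register(ring_fd, IORING_REGISTER_FILES, fds, 8);
 *
 * Afterwards an sqe with IOSQE_FIXED_FILE set refers to these files by
 * array index rather than by descriptor number, and the descriptors
 * themselves may be close(2)d by the application without affecting the
 * registered set.
 */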
2313
2314static int io_sq_offload_start(struct io_ring_ctx *ctx,
2315                               struct io_uring_params *p)
2316{
2317        int ret;
2318
2319        init_waitqueue_head(&ctx->sqo_wait);
2320        mmgrab(current->mm);
2321        ctx->sqo_mm = current->mm;
2322
2323        if (ctx->flags & IORING_SETUP_SQPOLL) {
2324                ret = -EPERM;
2325                if (!capable(CAP_SYS_ADMIN))
2326                        goto err;
2327
2328                ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2329                if (!ctx->sq_thread_idle)
2330                        ctx->sq_thread_idle = HZ;
2331
2332                if (p->flags & IORING_SETUP_SQ_AFF) {
2333                        int cpu = array_index_nospec(p->sq_thread_cpu,
2334                                                        nr_cpu_ids);
2335
2336                        ret = -EINVAL;
2337                        if (!cpu_possible(cpu))
2338                                goto err;
2339
2340                        ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2341                                                        ctx, cpu,
2342                                                        "io_uring-sq");
2343                } else {
2344                        ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2345                                                        "io_uring-sq");
2346                }
2347                if (IS_ERR(ctx->sqo_thread)) {
2348                        ret = PTR_ERR(ctx->sqo_thread);
2349                        ctx->sqo_thread = NULL;
2350                        goto err;
2351                }
2352                wake_up_process(ctx->sqo_thread);
2353        } else if (p->flags & IORING_SETUP_SQ_AFF) {
2354                /* Can't have SQ_AFF without SQPOLL */
2355                ret = -EINVAL;
2356                goto err;
2357        }
2358
2359        /* Do QD, or 2 * CPUS, whichever is smaller */
2360        ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2361                        min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2362        if (!ctx->sqo_wq) {
2363                ret = -ENOMEM;
2364                goto err;
2365        }
2366
2367        return 0;
2368err:
2369        io_sq_thread_stop(ctx);
2370        mmdrop(ctx->sqo_mm);
2371        ctx->sqo_mm = NULL;
2372        return ret;
2373}
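
/*
 * Setup-time knobs consumed above, as seen from userspace (sketch;
 * QUEUE_DEPTH and the raw io_uring_setup() wrapper are placeholders):
 *
 *      struct io_uring_params p = { 0 };
 *
 *      p.flags          = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
 *      p.sq_thread_cpu  = 2;           // pin io_uring-sq to CPU 2
 *      p.sq_thread_idle = 1000;        // spin up to 1000 msec without work
 *
 *      ring_fd = io_uring_setup(QUEUE_DEPTH, &p);
 *
 * SQPOLL requires CAP_SYS_ADMIN here, and SQ_AFF is only valid together
 * with SQPOLL.
 */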
2374
2375static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2376{
2377        atomic_long_sub(nr_pages, &user->locked_vm);
2378}
2379
2380static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2381{
2382        unsigned long page_limit, cur_pages, new_pages;
2383
2384        /* Don't allow more pages than we can safely lock */
2385        page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2386
2387        do {
2388                cur_pages = atomic_long_read(&user->locked_vm);
2389                new_pages = cur_pages + nr_pages;
2390                if (new_pages > page_limit)
2391                        return -ENOMEM;
2392        } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2393                                        new_pages) != cur_pages);
2394
2395        return 0;
2396}
2397
2398static void io_mem_free(void *ptr)
2399{
2400        struct page *page;
2401
2402        if (!ptr)
2403                return;
2404
2405        page = virt_to_head_page(ptr);
2406        if (put_page_testzero(page))
2407                free_compound_page(page);
2408}
2409
2410static void *io_mem_alloc(size_t size)
2411{
2412        gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2413                                __GFP_NORETRY;
2414
2415        return (void *) __get_free_pages(gfp_flags, get_order(size));
2416}
2417
2418static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2419{
2420        struct io_sq_ring *sq_ring;
2421        struct io_cq_ring *cq_ring;
2422        size_t bytes;
2423
2424        bytes = struct_size(sq_ring, array, sq_entries);
2425        bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2426        bytes += struct_size(cq_ring, cqes, cq_entries);
2427
2428        return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2429}
2430
2431static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2432{
2433        int i, j;
2434
2435        if (!ctx->user_bufs)
2436                return -ENXIO;
2437
2438        for (i = 0; i < ctx->nr_user_bufs; i++) {
2439                struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2440
2441                for (j = 0; j < imu->nr_bvecs; j++)
2442                        put_page(imu->bvec[j].bv_page);
2443
2444                if (ctx->account_mem)
2445                        io_unaccount_mem(ctx->user, imu->nr_bvecs);
2446                kvfree(imu->bvec);
2447                imu->nr_bvecs = 0;
2448        }
2449
2450        kfree(ctx->user_bufs);
2451        ctx->user_bufs = NULL;
2452        ctx->nr_user_bufs = 0;
2453        return 0;
2454}
2455
2456static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2457                       void __user *arg, unsigned index)
2458{
2459        struct iovec __user *src;
2460
2461#ifdef CONFIG_COMPAT
2462        if (ctx->compat) {
2463                struct compat_iovec __user *ciovs;
2464                struct compat_iovec ciov;
2465
2466                ciovs = (struct compat_iovec __user *) arg;
2467                if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2468                        return -EFAULT;
2469
2470                dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2471                dst->iov_len = ciov.iov_len;
2472                return 0;
2473        }
2474#endif
2475        src = (struct iovec __user *) arg;
2476        if (copy_from_user(dst, &src[index], sizeof(*dst)))
2477                return -EFAULT;
2478        return 0;
2479}
2480
2481static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2482                                  unsigned nr_args)
2483{
2484        struct vm_area_struct **vmas = NULL;
2485        struct page **pages = NULL;
2486        int i, j, got_pages = 0;
2487        int ret = -EINVAL;
2488
2489        if (ctx->user_bufs)
2490                return -EBUSY;
2491        if (!nr_args || nr_args > UIO_MAXIOV)
2492                return -EINVAL;
2493
2494        ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2495                                        GFP_KERNEL);
2496        if (!ctx->user_bufs)
2497                return -ENOMEM;
2498
2499        for (i = 0; i < nr_args; i++) {
2500                struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2501                unsigned long off, start, end, ubuf;
2502                int pret, nr_pages;
2503                struct iovec iov;
2504                size_t size;
2505
2506                ret = io_copy_iov(ctx, &iov, arg, i);
2507                if (ret)
2508                        break;
2509
2510                /*
2511                 * Don't impose further limits on the size and buffer
2512                 * constraints here; we'll -EINVAL later when IO is
2513                 * submitted if they are wrong.
2514                 */
2515                ret = -EFAULT;
2516                if (!iov.iov_base || !iov.iov_len)
2517                        goto err;
2518
2519                /* arbitrary limit, but we need something */
2520                if (iov.iov_len > SZ_1G)
2521                        goto err;
2522
2523                ubuf = (unsigned long) iov.iov_base;
2524                end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2525                start = ubuf >> PAGE_SHIFT;
2526                nr_pages = end - start;
2527
2528                if (ctx->account_mem) {
2529                        ret = io_account_mem(ctx->user, nr_pages);
2530                        if (ret)
2531                                goto err;
2532                }
2533
2534                ret = 0;
2535                if (!pages || nr_pages > got_pages) {
2536                        kfree(vmas);
2537                        kfree(pages);
2538                        pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2539                                                GFP_KERNEL);
2540                        vmas = kvmalloc_array(nr_pages,
2541                                        sizeof(struct vm_area_struct *),
2542                                        GFP_KERNEL);
2543                        if (!pages || !vmas) {
2544                                ret = -ENOMEM;
2545                                if (ctx->account_mem)
2546                                        io_unaccount_mem(ctx->user, nr_pages);
2547                                goto err;
2548                        }
2549                        got_pages = nr_pages;
2550                }
2551
2552                imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2553                                                GFP_KERNEL);
2554                ret = -ENOMEM;
2555                if (!imu->bvec) {
2556                        if (ctx->account_mem)
2557                                io_unaccount_mem(ctx->user, nr_pages);
2558                        goto err;
2559                }
2560
2561                ret = 0;
2562                down_read(&current->mm->mmap_sem);
2563                pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
2564                                                pages, vmas);
2565                if (pret == nr_pages) {
2566                        /* don't support file backed memory */
2567                        for (j = 0; j < nr_pages; j++) {
2568                                struct vm_area_struct *vma = vmas[j];
2569
2570                                if (vma->vm_file &&
2571                                    !is_file_hugepages(vma->vm_file)) {
2572                                        ret = -EOPNOTSUPP;
2573                                        break;
2574                                }
2575                        }
2576                } else {
2577                        ret = pret < 0 ? pret : -EFAULT;
2578                }
2579                up_read(&current->mm->mmap_sem);
2580                if (ret) {
2581                        /*
2582                         * if we did partial map, or found file backed vmas,
2583                         * release any pages we did get
2584                         */
2585                        if (pret > 0) {
2586                                for (j = 0; j < pret; j++)
2587                                        put_page(pages[j]);
2588                        }
2589                        if (ctx->account_mem)
2590                                io_unaccount_mem(ctx->user, nr_pages);
2591                        kvfree(imu->bvec);
2592                        goto err;
2593                }
2594
2595                off = ubuf & ~PAGE_MASK;
2596                size = iov.iov_len;
2597                for (j = 0; j < nr_pages; j++) {
2598                        size_t vec_len;
2599
2600                        vec_len = min_t(size_t, size, PAGE_SIZE - off);
2601                        imu->bvec[j].bv_page = pages[j];
2602                        imu->bvec[j].bv_len = vec_len;
2603                        imu->bvec[j].bv_offset = off;
2604                        off = 0;
2605                        size -= vec_len;
2606                }
2607                /* store original address for later verification */
2608                imu->ubuf = ubuf;
2609                imu->len = iov.iov_len;
2610                imu->nr_bvecs = nr_pages;
2611
2612                ctx->nr_user_bufs++;
2613        }
2614        kvfree(pages);
2615        kvfree(vmas);
2616        return 0;
2617err:
2618        kvfree(pages);
2619        kvfree(vmas);
2620        io_sqe_buffer_unregister(ctx);
2621        return ret;
2622}
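
/*
 * Userspace sketch of registering fixed buffers for the *_FIXED opcodes
 * (raw syscall wrapper assumed; buf/buf_len are placeholders):
 *
 *      struct iovec iov = {
 *              .iov_base = buf,        // anonymous (or hugetlb) memory,
 *              .iov_len  = buf_len,    // at most 1GB per buffer
 *      };
 *
 *      ret = io_uring_register(ring_fd, IORING_REGISTER_BUFFERS, &iov, 1);
 *
 * The pages are pinned with get_user_pages_longterm() and charged against
 * RLIMIT_MEMLOCK unless the caller has CAP_IPC_LOCK, so registration can
 * fail with -ENOMEM on small memlock limits.
 */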
2623
2624static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2625{
2626        io_finish_async(ctx);
2627        if (ctx->sqo_mm)
2628                mmdrop(ctx->sqo_mm);
2629
2630        io_iopoll_reap_events(ctx);
2631        io_sqe_buffer_unregister(ctx);
2632        io_sqe_files_unregister(ctx);
2633
2634#if defined(CONFIG_UNIX)
2635        if (ctx->ring_sock)
2636                sock_release(ctx->ring_sock);
2637#endif
2638
2639        io_mem_free(ctx->sq_ring);
2640        io_mem_free(ctx->sq_sqes);
2641        io_mem_free(ctx->cq_ring);
2642
2643        percpu_ref_exit(&ctx->refs);
2644        if (ctx->account_mem)
2645                io_unaccount_mem(ctx->user,
2646                                ring_pages(ctx->sq_entries, ctx->cq_entries));
2647        free_uid(ctx->user);
2648        kfree(ctx);
2649}
2650
2651static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2652{
2653        struct io_ring_ctx *ctx = file->private_data;
2654        __poll_t mask = 0;
2655
2656        poll_wait(file, &ctx->cq_wait, wait);
2657        /*
2658         * synchronizes with barrier from wq_has_sleeper call in
2659         * io_commit_cqring
2660         */
2661        smp_rmb();
2662        if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2663            ctx->sq_ring->ring_entries)
2664                mask |= EPOLLOUT | EPOLLWRNORM;
2665        if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2666                mask |= EPOLLIN | EPOLLRDNORM;
2667
2668        return mask;
2669}
2670
2671static int io_uring_fasync(int fd, struct file *file, int on)
2672{
2673        struct io_ring_ctx *ctx = file->private_data;
2674
2675        return fasync_helper(fd, file, on, &ctx->cq_fasync);
2676}
2677
2678static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2679{
2680        mutex_lock(&ctx->uring_lock);
2681        percpu_ref_kill(&ctx->refs);
2682        mutex_unlock(&ctx->uring_lock);
2683
2684        io_poll_remove_all(ctx);
2685        io_iopoll_reap_events(ctx);
2686        wait_for_completion(&ctx->ctx_done);
2687        io_ring_ctx_free(ctx);
2688}
2689
2690static int io_uring_release(struct inode *inode, struct file *file)
2691{
2692        struct io_ring_ctx *ctx = file->private_data;
2693
2694        file->private_data = NULL;
2695        io_ring_ctx_wait_and_kill(ctx);
2696        return 0;
2697}
2698
2699static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2700{
2701        loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2702        unsigned long sz = vma->vm_end - vma->vm_start;
2703        struct io_ring_ctx *ctx = file->private_data;
2704        unsigned long pfn;
2705        struct page *page;
2706        void *ptr;
2707
2708        switch (offset) {
2709        case IORING_OFF_SQ_RING:
2710                ptr = ctx->sq_ring;
2711                break;
2712        case IORING_OFF_SQES:
2713                ptr = ctx->sq_sqes;
2714                break;
2715        case IORING_OFF_CQ_RING:
2716                ptr = ctx->cq_ring;
2717                break;
2718        default:
2719                return -EINVAL;
2720        }
2721
2722        page = virt_to_head_page(ptr);
2723        if (sz > (PAGE_SIZE << compound_order(page)))
2724                return -EINVAL;
2725
2726        pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2727        return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2728}
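
/*
 * The three mmap'able regions, as mapped by the application using the
 * offsets and sizes published by io_uring_setup() (sketch):
 *
 *      sq_ptr = mmap(NULL, p.sq_off.array + p.sq_entries * sizeof(__u32),
 *                    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *                    ring_fd, IORING_OFF_SQ_RING);
 *      sqes   = mmap(NULL, p.sq_entries * sizeof(struct io_uring_sqe),
 *                    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *                    ring_fd, IORING_OFF_SQES);
 *      cq_ptr = mmap(NULL, p.cq_off.cqes +
 *                          p.cq_entries * sizeof(struct io_uring_cqe),
 *                    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *                    ring_fd, IORING_OFF_CQ_RING);
 *
 * Individual ring fields are then reached by adding the sq_off/cq_off
 * members to the respective base pointer.
 */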
2729
2730SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2731                u32, min_complete, u32, flags, const sigset_t __user *, sig,
2732                size_t, sigsz)
2733{
2734        struct io_ring_ctx *ctx;
2735        long ret = -EBADF;
2736        int submitted = 0;
2737        struct fd f;
2738
2739        if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2740                return -EINVAL;
2741
2742        f = fdget(fd);
2743        if (!f.file)
2744                return -EBADF;
2745
2746        ret = -EOPNOTSUPP;
2747        if (f.file->f_op != &io_uring_fops)
2748                goto out_fput;
2749
2750        ret = -ENXIO;
2751        ctx = f.file->private_data;
2752        if (!percpu_ref_tryget(&ctx->refs))
2753                goto out_fput;
2754
2755        /*
2756         * For SQ polling, the thread will do all submissions and completions.
2757         * Just return the requested submit count, and wake the thread if
2758         * we were asked to.
2759         */
2760        if (ctx->flags & IORING_SETUP_SQPOLL) {
2761                if (flags & IORING_ENTER_SQ_WAKEUP)
2762                        wake_up(&ctx->sqo_wait);
2763                submitted = to_submit;
2764                goto out_ctx;
2765        }
2766
2767        ret = 0;
2768        if (to_submit) {
2769                to_submit = min(to_submit, ctx->sq_entries);
2770
2771                mutex_lock(&ctx->uring_lock);
2772                submitted = io_ring_submit(ctx, to_submit);
2773                mutex_unlock(&ctx->uring_lock);
2774        }
2775        if (flags & IORING_ENTER_GETEVENTS) {
2776                unsigned nr_events = 0;
2777
2778                min_complete = min(min_complete, ctx->cq_entries);
2779
2780                if (ctx->flags & IORING_SETUP_IOPOLL) {
2781                        mutex_lock(&ctx->uring_lock);
2782                        ret = io_iopoll_check(ctx, &nr_events, min_complete);
2783                        mutex_unlock(&ctx->uring_lock);
2784                } else {
2785                        ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2786                }
2787        }
2788
2789out_ctx:
2790        io_ring_drop_ctx_refs(ctx, 1);
2791out_fput:
2792        fdput(f);
2793        return submitted ? submitted : ret;
2794}
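
/*
 * Typical non-SQPOLL usage of the syscall above: submit what has been
 * queued in the SQ ring and, in the same call, wait for at least one
 * completion (sketch, raw syscall wrapper assumed):
 *
 *      ret = io_uring_enter(ring_fd, to_submit, 1,
 *                           IORING_ENTER_GETEVENTS, NULL, 0);
 *
 * On success the return value is the number of sqes submitted; the
 * completions themselves are then reaped from the CQ ring by the
 * application.
 */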
2795
2796static const struct file_operations io_uring_fops = {
2797        .release        = io_uring_release,
2798        .mmap           = io_uring_mmap,
2799        .poll           = io_uring_poll,
2800        .fasync         = io_uring_fasync,
2801};
2802
2803static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2804                                  struct io_uring_params *p)
2805{
2806        struct io_sq_ring *sq_ring;
2807        struct io_cq_ring *cq_ring;
2808        size_t size;
2809
2810        sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2811        if (!sq_ring)
2812                return -ENOMEM;
2813
2814        ctx->sq_ring = sq_ring;
2815        sq_ring->ring_mask = p->sq_entries - 1;
2816        sq_ring->ring_entries = p->sq_entries;
2817        ctx->sq_mask = sq_ring->ring_mask;
2818        ctx->sq_entries = sq_ring->ring_entries;
2819
2820        size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2821        if (size == SIZE_MAX)
2822                return -EOVERFLOW;
2823
2824        ctx->sq_sqes = io_mem_alloc(size);
2825        if (!ctx->sq_sqes)
2826                return -ENOMEM;
2827
2828        cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2829        if (!cq_ring)
2830                return -ENOMEM;
2831
2832        ctx->cq_ring = cq_ring;
2833        cq_ring->ring_mask = p->cq_entries - 1;
2834        cq_ring->ring_entries = p->cq_entries;
2835        ctx->cq_mask = cq_ring->ring_mask;
2836        ctx->cq_entries = cq_ring->ring_entries;
2837        return 0;
2838}
2839
2840/*
2841 * Allocate an anonymous fd; this is what constitutes the application
2842 * visible backing of an io_uring instance. The application mmaps this
2843 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2844 * we have to tie this fd to a socket for file garbage collection purposes.
2845 */
2846static int io_uring_get_fd(struct io_ring_ctx *ctx)
2847{
2848        struct file *file;
2849        int ret;
2850
2851#if defined(CONFIG_UNIX)
2852        ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
2853                                &ctx->ring_sock);
2854        if (ret)
2855                return ret;
2856#endif
2857
2858        ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
2859        if (ret < 0)
2860                goto err;
2861
2862        file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
2863                                        O_RDWR | O_CLOEXEC);
2864        if (IS_ERR(file)) {
2865                put_unused_fd(ret);
2866                ret = PTR_ERR(file);
2867                goto err;
2868        }
2869
2870#if defined(CONFIG_UNIX)
2871        ctx->ring_sock->file = file;
2872        ctx->ring_sock->sk->sk_user_data = ctx;
2873#endif
2874        fd_install(ret, file);
2875        return ret;
2876err:
2877#if defined(CONFIG_UNIX)
2878        sock_release(ctx->ring_sock);
2879        ctx->ring_sock = NULL;
2880#endif
2881        return ret;
2882}
2883
2884static int io_uring_create(unsigned entries, struct io_uring_params *p)
2885{
2886        struct user_struct *user = NULL;
2887        struct io_ring_ctx *ctx;
2888        bool account_mem;
2889        int ret;
2890
2891        if (!entries || entries > IORING_MAX_ENTRIES)
2892                return -EINVAL;
2893
2894        /*
2895         * Use twice as many entries for the CQ ring. It's possible for the
2896         * application to drive a higher depth than the size of the SQ ring,
2897         * since the sqes are only used at submission time. This allows for
2898         * some flexibility in overcommitting a bit.
2899         */
2900        p->sq_entries = roundup_pow_of_two(entries);
2901        p->cq_entries = 2 * p->sq_entries;
2902
2903        user = get_uid(current_user());
2904        account_mem = !capable(CAP_IPC_LOCK);
2905
2906        if (account_mem) {
2907                ret = io_account_mem(user,
2908                                ring_pages(p->sq_entries, p->cq_entries));
2909                if (ret) {
2910                        free_uid(user);
2911                        return ret;
2912                }
2913        }
2914
2915        ctx = io_ring_ctx_alloc(p);
2916        if (!ctx) {
2917                if (account_mem)
2918                        io_unaccount_mem(user, ring_pages(p->sq_entries,
2919                                                                p->cq_entries));
2920                free_uid(user);
2921                return -ENOMEM;
2922        }
2923        ctx->compat = in_compat_syscall();
2924        ctx->account_mem = account_mem;
2925        ctx->user = user;
2926
2927        ret = io_allocate_scq_urings(ctx, p);
2928        if (ret)
2929                goto err;
2930
2931        ret = io_sq_offload_start(ctx, p);
2932        if (ret)
2933                goto err;
2934
2935        ret = io_uring_get_fd(ctx);
2936        if (ret < 0)
2937                goto err;
2938
2939        memset(&p->sq_off, 0, sizeof(p->sq_off));
2940        p->sq_off.head = offsetof(struct io_sq_ring, r.head);
2941        p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
2942        p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
2943        p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
2944        p->sq_off.flags = offsetof(struct io_sq_ring, flags);
2945        p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
2946        p->sq_off.array = offsetof(struct io_sq_ring, array);
2947
2948        memset(&p->cq_off, 0, sizeof(p->cq_off));
2949        p->cq_off.head = offsetof(struct io_cq_ring, r.head);
2950        p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
2951        p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
2952        p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
2953        p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
2954        p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
2955        return ret;
2956err:
2957        io_ring_ctx_wait_and_kill(ctx);
2958        return ret;
2959}
2960
2961/*
2962 * Sets up an io_uring context and returns the fd. The application asks for a
2963 * ring size; we return the actual sq/cq ring sizes (among other things) in the
2964 * params structure passed in.
2965 */
2966static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
2967{
2968        struct io_uring_params p;
2969        long ret;
2970        int i;
2971
2972        if (copy_from_user(&p, params, sizeof(p)))
2973                return -EFAULT;
2974        for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
2975                if (p.resv[i])
2976                        return -EINVAL;
2977        }
2978
2979        if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
2980                        IORING_SETUP_SQ_AFF))
2981                return -EINVAL;
2982
2983        ret = io_uring_create(entries, &p);
2984        if (ret < 0)
2985                return ret;
2986
2987        if (copy_to_user(params, &p, sizeof(p)))
2988                return -EFAULT;
2989
2990        return ret;
2991}
2992
2993SYSCALL_DEFINE2(io_uring_setup, u32, entries,
2994                struct io_uring_params __user *, params)
2995{
2996        return io_uring_setup(entries, params);
2997}
2998
2999static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3000                               void __user *arg, unsigned nr_args)
3001        __releases(ctx->uring_lock)
3002        __acquires(ctx->uring_lock)
3003{
3004        int ret;
3005
3006        /*
3007         * We're inside the ring mutex, if the ref is already dying, then
3008         * someone else killed the ctx or is already going through
3009         * io_uring_register().
3010         */
3011        if (percpu_ref_is_dying(&ctx->refs))
3012                return -ENXIO;
3013
3014        percpu_ref_kill(&ctx->refs);
3015
3016        /*
3017         * Drop uring mutex before waiting for references to exit. If another
3018         * thread is currently inside io_uring_enter() it might need to grab
3019         * the uring_lock to make progress. If we hold it here across the drain
3020         * wait, then we can deadlock. It's safe to drop the mutex here, since
3021         * no new references will come in after we've killed the percpu ref.
3022         */
3023        mutex_unlock(&ctx->uring_lock);
3024        wait_for_completion(&ctx->ctx_done);
3025        mutex_lock(&ctx->uring_lock);
3026
3027        switch (opcode) {
3028        case IORING_REGISTER_BUFFERS:
3029                ret = io_sqe_buffer_register(ctx, arg, nr_args);
3030                break;
3031        case IORING_UNREGISTER_BUFFERS:
3032                ret = -EINVAL;
3033                if (arg || nr_args)
3034                        break;
3035                ret = io_sqe_buffer_unregister(ctx);
3036                break;
3037        case IORING_REGISTER_FILES:
3038                ret = io_sqe_files_register(ctx, arg, nr_args);
3039                break;
3040        case IORING_UNREGISTER_FILES:
3041                ret = -EINVAL;
3042                if (arg || nr_args)
3043                        break;
3044                ret = io_sqe_files_unregister(ctx);
3045                break;
3046        default:
3047                ret = -EINVAL;
3048                break;
3049        }
3050
3051        /* bring the ctx back to life */
3052        reinit_completion(&ctx->ctx_done);
3053        percpu_ref_reinit(&ctx->refs);
3054        return ret;
3055}
3056
3057SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3058                void __user *, arg, unsigned int, nr_args)
3059{
3060        struct io_ring_ctx *ctx;
3061        long ret = -EBADF;
3062        struct fd f;
3063
3064        f = fdget(fd);
3065        if (!f.file)
3066                return -EBADF;
3067
3068        ret = -EOPNOTSUPP;
3069        if (f.file->f_op != &io_uring_fops)
3070                goto out_fput;
3071
3072        ctx = f.file->private_data;
3073
3074        mutex_lock(&ctx->uring_lock);
3075        ret = __io_uring_register(ctx, opcode, arg, nr_args);
3076        mutex_unlock(&ctx->uring_lock);
3077out_fput:
3078        fdput(f);
3079        return ret;
3080}
3081
3082static int __init io_uring_init(void)
3083{
3084        req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3085        return 0;
3086};
3087__initcall(io_uring_init);
3088