linux/fs/aio.c
   1/*
   2 *      An async IO implementation for Linux
   3 *      Written by Benjamin LaHaise <bcrl@kvack.org>
   4 *
   5 *      Implements an efficient asynchronous io interface.
   6 *
   7 *      Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
   8 *
   9 *      See ../COPYING for licensing terms.
  10 */
  11#define pr_fmt(fmt) "%s: " fmt, __func__
  12
  13#include <linux/kernel.h>
  14#include <linux/init.h>
  15#include <linux/errno.h>
  16#include <linux/time.h>
  17#include <linux/aio_abi.h>
  18#include <linux/export.h>
  19#include <linux/syscalls.h>
  20#include <linux/backing-dev.h>
  21#include <linux/uio.h>
  22
  23#include <linux/sched.h>
  24#include <linux/fs.h>
  25#include <linux/file.h>
  26#include <linux/mm.h>
  27#include <linux/mman.h>
  28#include <linux/mmu_context.h>
  29#include <linux/slab.h>
  30#include <linux/timer.h>
  31#include <linux/aio.h>
  32#include <linux/highmem.h>
  33#include <linux/workqueue.h>
  34#include <linux/security.h>
  35#include <linux/eventfd.h>
  36#include <linux/blkdev.h>
  37#include <linux/compat.h>
  38
  39#include <asm/kmap_types.h>
  40#include <asm/uaccess.h>
  41
  42#include "internal.h"
  43
  44#define AIO_RING_MAGIC                  0xa10a10a1
  45#define AIO_RING_COMPAT_FEATURES        1
  46#define AIO_RING_INCOMPAT_FEATURES      0
  47struct aio_ring {
  48        unsigned        id;     /* kernel internal index number */
  49        unsigned        nr;     /* number of io_events */
  50        unsigned        head;
  51        unsigned        tail;
  52
  53        unsigned        magic;
  54        unsigned        compat_features;
  55        unsigned        incompat_features;
  56        unsigned        header_length;  /* size of aio_ring */
  57
  58
  59        struct io_event         io_events[0];
   60}; /* 32 bytes + ring size */
  61
  62#define AIO_RING_PAGES  8
  63
  64struct kioctx {
  65        atomic_t                users;
  66        atomic_t                dead;
  67
  68        /* This needs improving */
  69        unsigned long           user_id;
  70        struct hlist_node       list;
  71
  72        /*
  73         * This is what userspace passed to io_setup(), it's not used for
  74         * anything but counting against the global max_reqs quota.
  75         *
  76         * The real limit is nr_events - 1, which will be larger (see
  77         * aio_setup_ring())
  78         */
  79        unsigned                max_reqs;
  80
  81        /* Size of ringbuffer, in units of struct io_event */
  82        unsigned                nr_events;
  83
  84        unsigned long           mmap_base;
  85        unsigned long           mmap_size;
  86
  87        struct page             **ring_pages;
  88        long                    nr_pages;
  89
  90        struct rcu_head         rcu_head;
  91        struct work_struct      rcu_work;
  92
  93        struct {
  94                atomic_t        reqs_active;
  95        } ____cacheline_aligned_in_smp;
  96
  97        struct {
  98                spinlock_t      ctx_lock;
  99                struct list_head active_reqs;   /* used for cancellation */
 100        } ____cacheline_aligned_in_smp;
 101
 102        struct {
 103                struct mutex    ring_lock;
 104                wait_queue_head_t wait;
 105        } ____cacheline_aligned_in_smp;
 106
 107        struct {
 108                unsigned        tail;
 109                spinlock_t      completion_lock;
 110        } ____cacheline_aligned_in_smp;
 111
 112        struct page             *internal_pages[AIO_RING_PAGES];
 113};
 114
 115/*------ sysctl variables----*/
 116static DEFINE_SPINLOCK(aio_nr_lock);
 117unsigned long aio_nr;           /* current system wide number of aio requests */
 118unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
 119/*----end sysctl variables---*/
 120
 121static struct kmem_cache        *kiocb_cachep;
 122static struct kmem_cache        *kioctx_cachep;
 123
 124/* aio_setup
 125 *      Creates the slab caches used by the aio routines, panic on
 126 *      failure as this is done early during the boot sequence.
 127 */
 128static int __init aio_setup(void)
 129{
 130        kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
  131        kioctx_cachep = KMEM_CACHE(kioctx, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 132
 133        pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
 134
 135        return 0;
 136}
 137__initcall(aio_setup);
 138
 139static void aio_free_ring(struct kioctx *ctx)
 140{
 141        long i;
 142
 143        for (i = 0; i < ctx->nr_pages; i++)
 144                put_page(ctx->ring_pages[i]);
 145
 146        if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages)
 147                kfree(ctx->ring_pages);
 148}
 149
 150static int aio_setup_ring(struct kioctx *ctx)
 151{
 152        struct aio_ring *ring;
 153        unsigned nr_events = ctx->max_reqs;
 154        struct mm_struct *mm = current->mm;
 155        unsigned long size, populate;
 156        int nr_pages;
 157
 158        /* Compensate for the ring buffer's head/tail overlap entry */
 159        nr_events += 2; /* 1 is required, 2 for good luck */
 160
 161        size = sizeof(struct aio_ring);
 162        size += sizeof(struct io_event) * nr_events;
 163        nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT;
 164
 165        if (nr_pages < 0)
 166                return -EINVAL;
 167
 168        nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
 169
 170        ctx->nr_events = 0;
 171        ctx->ring_pages = ctx->internal_pages;
 172        if (nr_pages > AIO_RING_PAGES) {
 173                ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *),
 174                                          GFP_KERNEL);
 175                if (!ctx->ring_pages)
 176                        return -ENOMEM;
 177        }
 178
 179        ctx->mmap_size = nr_pages * PAGE_SIZE;
 180        pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);
 181        down_write(&mm->mmap_sem);
 182        ctx->mmap_base = do_mmap_pgoff(NULL, 0, ctx->mmap_size,
 183                                       PROT_READ|PROT_WRITE,
 184                                       MAP_ANONYMOUS|MAP_PRIVATE, 0, &populate);
 185        if (IS_ERR((void *)ctx->mmap_base)) {
 186                up_write(&mm->mmap_sem);
 187                ctx->mmap_size = 0;
 188                aio_free_ring(ctx);
 189                return -EAGAIN;
 190        }
 191
 192        pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
 193        ctx->nr_pages = get_user_pages(current, mm, ctx->mmap_base, nr_pages,
 194                                       1, 0, ctx->ring_pages, NULL);
 195        up_write(&mm->mmap_sem);
 196
 197        if (unlikely(ctx->nr_pages != nr_pages)) {
 198                aio_free_ring(ctx);
 199                return -EAGAIN;
 200        }
 201        if (populate)
 202                mm_populate(ctx->mmap_base, populate);
 203
 204        ctx->user_id = ctx->mmap_base;
 205        ctx->nr_events = nr_events; /* trusted copy */
 206
 207        ring = kmap_atomic(ctx->ring_pages[0]);
 208        ring->nr = nr_events;   /* user copy */
 209        ring->id = ctx->user_id;
 210        ring->head = ring->tail = 0;
 211        ring->magic = AIO_RING_MAGIC;
 212        ring->compat_features = AIO_RING_COMPAT_FEATURES;
 213        ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
 214        ring->header_length = sizeof(struct aio_ring);
 215        kunmap_atomic(ring);
 216        flush_dcache_page(ctx->ring_pages[0]);
 217
 218        return 0;
 219}
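
/*
 * Worked example of the sizing above (illustrative only; assumes 4 KiB pages
 * and 32-byte struct aio_ring / struct io_event layouts, as on common
 * configurations):
 *
 *      io_setup(128):  max_reqs = 128, nr_events = 130 after the "+= 2"
 *      size       = 32 + 130 * 32 = 4192 bytes   =>  nr_pages = 2
 *      nr_events  = (2 * 4096 - 32) / 32 = 255       (the trusted copy)
 *
 * so a request for 128 events yields a ring that can hold 255 completions
 * and, per the comment on struct kioctx, up to nr_events - 1 = 254 requests
 * in flight.
 */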
 220
 221#define AIO_EVENTS_PER_PAGE     (PAGE_SIZE / sizeof(struct io_event))
 222#define AIO_EVENTS_FIRST_PAGE   ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 223#define AIO_EVENTS_OFFSET       (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
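
/*
 * Sketch of how an event index is turned into a page and slot with the
 * macros above, mirroring aio_complete() and aio_read_events_ring() below
 * (numbers assume 4 KiB pages, i.e. AIO_EVENTS_PER_PAGE == 128,
 * AIO_EVENTS_FIRST_PAGE == 127, AIO_EVENTS_OFFSET == 1):
 *
 *      pos  = index + AIO_EVENTS_OFFSET;
 *      page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
 *      slot = pos % AIO_EVENTS_PER_PAGE;
 *
 * Index 0 maps to page 0, slot 1 (slot 0 of page 0 holds the struct aio_ring
 * header under the size assumption above); index 126 maps to page 0,
 * slot 127; index 127 rolls over to page 1, slot 0.
 */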
 224
 225void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
 226{
 227        struct kioctx *ctx = req->ki_ctx;
 228        unsigned long flags;
 229
 230        spin_lock_irqsave(&ctx->ctx_lock, flags);
 231
 232        if (!req->ki_list.next)
 233                list_add(&req->ki_list, &ctx->active_reqs);
 234
 235        req->ki_cancel = cancel;
 236
 237        spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 238}
 239EXPORT_SYMBOL(kiocb_set_cancel_fn);
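
/*
 * Usage sketch (hypothetical caller; the my_*() helpers are made up): an
 * ->aio_read()/->aio_write() implementation that queues the I/O and wants
 * io_cancel() to work registers a callback before returning -EIOCBQUEUED.
 * The callback receives the kiocb and the io_event that will be copied back
 * to userspace; returning 0 reports a successful cancellation.
 *
 *      static int my_cancel(struct kiocb *iocb, struct io_event *event)
 *      {
 *              my_abort_queued_op(iocb->private);
 *              return 0;
 *      }
 *
 *      static ssize_t my_aio_read(struct kiocb *iocb, const struct iovec *iov,
 *                                 unsigned long nr_segs, loff_t pos)
 *      {
 *              kiocb_set_cancel_fn(iocb, my_cancel);
 *              my_queue_op(iocb, iov, nr_segs, pos);
 *              return -EIOCBQUEUED;
 *      }
 */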
 240
 241static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 242                        struct io_event *res)
 243{
 244        kiocb_cancel_fn *old, *cancel;
 245        int ret = -EINVAL;
 246
 247        /*
 248         * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
 249         * actually has a cancel function, hence the cmpxchg()
 250         */
 251
 252        cancel = ACCESS_ONCE(kiocb->ki_cancel);
 253        do {
 254                if (!cancel || cancel == KIOCB_CANCELLED)
 255                        return ret;
 256
 257                old = cancel;
 258                cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
 259        } while (cancel != old);
 260
 261        atomic_inc(&kiocb->ki_users);
 262        spin_unlock_irq(&ctx->ctx_lock);
 263
 264        memset(res, 0, sizeof(*res));
 265        res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
 266        res->data = kiocb->ki_user_data;
 267        ret = cancel(kiocb, res);
 268
 269        spin_lock_irq(&ctx->ctx_lock);
 270
 271        return ret;
 272}
 273
 274static void free_ioctx_rcu(struct rcu_head *head)
 275{
 276        struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
 277        kmem_cache_free(kioctx_cachep, ctx);
 278}
 279
 280/*
 281 * When this function runs, the kioctx has been removed from the "hash table"
 282 * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
 283 * now it's safe to cancel any that need to be.
 284 */
 285static void free_ioctx(struct kioctx *ctx)
 286{
 287        struct aio_ring *ring;
 288        struct io_event res;
 289        struct kiocb *req;
 290        unsigned head, avail;
 291
 292        spin_lock_irq(&ctx->ctx_lock);
 293
 294        while (!list_empty(&ctx->active_reqs)) {
 295                req = list_first_entry(&ctx->active_reqs,
 296                                       struct kiocb, ki_list);
 297
 298                list_del_init(&req->ki_list);
 299                kiocb_cancel(ctx, req, &res);
 300        }
 301
 302        spin_unlock_irq(&ctx->ctx_lock);
 303
 304        ring = kmap_atomic(ctx->ring_pages[0]);
 305        head = ring->head;
 306        kunmap_atomic(ring);
 307
 308        while (atomic_read(&ctx->reqs_active) > 0) {
 309                wait_event(ctx->wait,
 310                                head != ctx->tail ||
 311                                atomic_read(&ctx->reqs_active) <= 0);
 312
 313                avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
 314
 315                atomic_sub(avail, &ctx->reqs_active);
 316                head += avail;
 317                head %= ctx->nr_events;
 318        }
 319
 320        WARN_ON(atomic_read(&ctx->reqs_active) < 0);
 321
 322        aio_free_ring(ctx);
 323
 324        pr_debug("freeing %p\n", ctx);
 325
 326        /*
 327         * Here the call_rcu() is between the wait_event() for reqs_active to
 328         * hit 0, and freeing the ioctx.
 329         *
  330         * aio_complete() decrements reqs_active, but it still has to touch the
  331         * ioctx afterwards to issue a wakeup, so we use RCU.
 332         */
 333        call_rcu(&ctx->rcu_head, free_ioctx_rcu);
 334}
 335
 336static void put_ioctx(struct kioctx *ctx)
 337{
 338        if (unlikely(atomic_dec_and_test(&ctx->users)))
 339                free_ioctx(ctx);
 340}
 341
 342/* ioctx_alloc
 343 *      Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
 344 */
 345static struct kioctx *ioctx_alloc(unsigned nr_events)
 346{
 347        struct mm_struct *mm = current->mm;
 348        struct kioctx *ctx;
 349        int err = -ENOMEM;
 350
 351        /* Prevent overflows */
 352        if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
 353            (nr_events > (0x10000000U / sizeof(struct kiocb)))) {
 354                pr_debug("ENOMEM: nr_events too high\n");
 355                return ERR_PTR(-EINVAL);
 356        }
 357
 358        if (!nr_events || (unsigned long)nr_events > aio_max_nr)
 359                return ERR_PTR(-EAGAIN);
 360
 361        ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
 362        if (!ctx)
 363                return ERR_PTR(-ENOMEM);
 364
 365        ctx->max_reqs = nr_events;
 366
 367        atomic_set(&ctx->users, 2);
 368        atomic_set(&ctx->dead, 0);
 369        spin_lock_init(&ctx->ctx_lock);
 370        spin_lock_init(&ctx->completion_lock);
 371        mutex_init(&ctx->ring_lock);
 372        init_waitqueue_head(&ctx->wait);
 373
 374        INIT_LIST_HEAD(&ctx->active_reqs);
 375
 376        if (aio_setup_ring(ctx) < 0)
 377                goto out_freectx;
 378
 379        /* limit the number of system wide aios */
 380        spin_lock(&aio_nr_lock);
 381        if (aio_nr + nr_events > aio_max_nr ||
 382            aio_nr + nr_events < aio_nr) {
 383                spin_unlock(&aio_nr_lock);
 384                goto out_cleanup;
 385        }
 386        aio_nr += ctx->max_reqs;
 387        spin_unlock(&aio_nr_lock);
 388
 389        /* now link into global list. */
 390        spin_lock(&mm->ioctx_lock);
 391        hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
 392        spin_unlock(&mm->ioctx_lock);
 393
 394        pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
 395                 ctx, ctx->user_id, mm, ctx->nr_events);
 396        return ctx;
 397
 398out_cleanup:
 399        err = -EAGAIN;
 400        aio_free_ring(ctx);
 401out_freectx:
 402        kmem_cache_free(kioctx_cachep, ctx);
 403        pr_debug("error allocating ioctx %d\n", err);
 404        return ERR_PTR(err);
 405}
 406
 407static void kill_ioctx_work(struct work_struct *work)
 408{
 409        struct kioctx *ctx = container_of(work, struct kioctx, rcu_work);
 410
 411        wake_up_all(&ctx->wait);
 412        put_ioctx(ctx);
 413}
 414
 415static void kill_ioctx_rcu(struct rcu_head *head)
 416{
 417        struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
 418
 419        INIT_WORK(&ctx->rcu_work, kill_ioctx_work);
 420        schedule_work(&ctx->rcu_work);
 421}
 422
 423/* kill_ioctx
 424 *      Cancels all outstanding aio requests on an aio context.  Used
 425 *      when the processes owning a context have all exited to encourage
 426 *      the rapid destruction of the kioctx.
 427 */
 428static void kill_ioctx(struct kioctx *ctx)
 429{
 430        if (!atomic_xchg(&ctx->dead, 1)) {
 431                hlist_del_rcu(&ctx->list);
 432
 433                /*
 434                 * It'd be more correct to do this in free_ioctx(), after all
 435                 * the outstanding kiocbs have finished - but by then io_destroy
 436                 * has already returned, so io_setup() could potentially return
 437                 * -EAGAIN with no ioctxs actually in use (as far as userspace
 438                 *  could tell).
 439                 */
 440                spin_lock(&aio_nr_lock);
 441                BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
 442                aio_nr -= ctx->max_reqs;
 443                spin_unlock(&aio_nr_lock);
 444
 445                if (ctx->mmap_size)
 446                        vm_munmap(ctx->mmap_base, ctx->mmap_size);
 447
 448                /* Between hlist_del_rcu() and dropping the initial ref */
 449                call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
 450        }
 451}
 452
 453/* wait_on_sync_kiocb:
 454 *      Waits on the given sync kiocb to complete.
 455 */
 456ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
 457{
 458        while (atomic_read(&iocb->ki_users)) {
 459                set_current_state(TASK_UNINTERRUPTIBLE);
 460                if (!atomic_read(&iocb->ki_users))
 461                        break;
 462                io_schedule();
 463        }
 464        __set_current_state(TASK_RUNNING);
 465        return iocb->ki_user_data;
 466}
 467EXPORT_SYMBOL(wait_on_sync_kiocb);
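
/*
 * Sketch of the synchronous caller this pairs with (modelled on the
 * do_sync_read() pattern; filp, iov, ppos and len are the caller's, details
 * hedged rather than copied verbatim).  The kiocb lives on the caller's
 * stack, ki_obj.tsk points at the caller, and aio_complete() below wakes it
 * through the is_sync_kiocb() special case:
 *
 *      struct kiocb kiocb;
 *      ssize_t ret;
 *
 *      init_sync_kiocb(&kiocb, filp);
 *      kiocb.ki_pos = *ppos;
 *      kiocb.ki_left = kiocb.ki_nbytes = len;
 *
 *      ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
 *      if (ret == -EIOCBQUEUED)
 *              ret = wait_on_sync_kiocb(&kiocb);
 *      *ppos = kiocb.ki_pos;
 */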
 468
 469/*
 470 * exit_aio: called when the last user of mm goes away.  At this point, there is
  471 * no way for any new requests to be submitted or any of the io_* syscalls to be
 472 * called on the context.
 473 *
 474 * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
 475 * them.
 476 */
 477void exit_aio(struct mm_struct *mm)
 478{
 479        struct kioctx *ctx;
 480        struct hlist_node *n;
 481
 482        hlist_for_each_entry_safe(ctx, n, &mm->ioctx_list, list) {
 483                if (1 != atomic_read(&ctx->users))
 484                        printk(KERN_DEBUG
 485                                "exit_aio:ioctx still alive: %d %d %d\n",
 486                                atomic_read(&ctx->users),
 487                                atomic_read(&ctx->dead),
 488                                atomic_read(&ctx->reqs_active));
 489                /*
 490                 * We don't need to bother with munmap() here -
 491                 * exit_mmap(mm) is coming and it'll unmap everything.
 492                 * Since aio_free_ring() uses non-zero ->mmap_size
 493                 * as indicator that it needs to unmap the area,
 494                 * just set it to 0; aio_free_ring() is the only
 495                 * place that uses ->mmap_size, so it's safe.
 496                 */
 497                ctx->mmap_size = 0;
 498
 499                kill_ioctx(ctx);
 500        }
 501}
 502
 503/* aio_get_req
  504 *      Allocate a slot for an aio request.  Increments the reqs_active count
  505 * of the kioctx, which keeps the kioctx from being freed until all requests
  506 * have completed.  Returns NULL if no request slots are free.
 507 *
 508 * Returns with kiocb->ki_users set to 2.  The io submit code path holds
 509 * an extra reference while submitting the i/o.
 510 * This prevents races between the aio code path referencing the
 511 * req (after submitting it) and aio_complete() freeing the req.
 512 */
 513static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 514{
 515        struct kiocb *req;
 516
 517        if (atomic_read(&ctx->reqs_active) >= ctx->nr_events)
 518                return NULL;
 519
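        /*
         * The ring keeps one event slot unused so that head == tail always
         * means "empty"; hence at most nr_events - 1 requests in flight.
         */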
 520        if (atomic_inc_return(&ctx->reqs_active) > ctx->nr_events - 1)
 521                goto out_put;
 522
 523        req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
 524        if (unlikely(!req))
 525                goto out_put;
 526
 527        atomic_set(&req->ki_users, 2);
 528        req->ki_ctx = ctx;
 529
 530        return req;
 531out_put:
 532        atomic_dec(&ctx->reqs_active);
 533        return NULL;
 534}
 535
 536static void kiocb_free(struct kiocb *req)
 537{
 538        if (req->ki_filp)
 539                fput(req->ki_filp);
 540        if (req->ki_eventfd != NULL)
 541                eventfd_ctx_put(req->ki_eventfd);
 542        if (req->ki_dtor)
 543                req->ki_dtor(req);
 544        if (req->ki_iovec != &req->ki_inline_vec)
 545                kfree(req->ki_iovec);
 546        kmem_cache_free(kiocb_cachep, req);
 547}
 548
 549void aio_put_req(struct kiocb *req)
 550{
 551        if (atomic_dec_and_test(&req->ki_users))
 552                kiocb_free(req);
 553}
 554EXPORT_SYMBOL(aio_put_req);
 555
 556static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 557{
 558        struct mm_struct *mm = current->mm;
 559        struct kioctx *ctx, *ret = NULL;
 560
 561        rcu_read_lock();
 562
 563        hlist_for_each_entry_rcu(ctx, &mm->ioctx_list, list) {
 564                if (ctx->user_id == ctx_id) {
 565                        atomic_inc(&ctx->users);
 566                        ret = ctx;
 567                        break;
 568                }
 569        }
 570
 571        rcu_read_unlock();
 572        return ret;
 573}
 574
 575/* aio_complete
 576 *      Called when the io request on the given iocb is complete.
 577 */
 578void aio_complete(struct kiocb *iocb, long res, long res2)
 579{
 580        struct kioctx   *ctx = iocb->ki_ctx;
 581        struct aio_ring *ring;
 582        struct io_event *ev_page, *event;
 583        unsigned long   flags;
 584        unsigned tail, pos;
 585
 586        /*
 587         * Special case handling for sync iocbs:
 588         *  - events go directly into the iocb for fast handling
 589         *  - the sync task with the iocb in its stack holds the single iocb
 590         *    ref, no other paths have a way to get another ref
 591         *  - the sync task helpfully left a reference to itself in the iocb
 592         */
 593        if (is_sync_kiocb(iocb)) {
 594                BUG_ON(atomic_read(&iocb->ki_users) != 1);
 595                iocb->ki_user_data = res;
 596                atomic_set(&iocb->ki_users, 0);
 597                wake_up_process(iocb->ki_obj.tsk);
 598                return;
 599        }
 600
 601        /*
 602         * Take rcu_read_lock() in case the kioctx is being destroyed, as we
 603         * need to issue a wakeup after decrementing reqs_active.
 604         */
 605        rcu_read_lock();
 606
 607        if (iocb->ki_list.next) {
 608                unsigned long flags;
 609
 610                spin_lock_irqsave(&ctx->ctx_lock, flags);
 611                list_del(&iocb->ki_list);
 612                spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 613        }
 614
 615        /*
 616         * cancelled requests don't get events, userland was given one
  617         * when the request was cancelled.
 618         */
 619        if (unlikely(xchg(&iocb->ki_cancel,
 620                          KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
 621                atomic_dec(&ctx->reqs_active);
 622                /* Still need the wake_up in case free_ioctx is waiting */
 623                goto put_rq;
 624        }
 625
 626        /*
 627         * Add a completion event to the ring buffer. Must be done holding
 628         * ctx->completion_lock to prevent other code from messing with the tail
 629         * pointer since we might be called from irq context.
 630         */
 631        spin_lock_irqsave(&ctx->completion_lock, flags);
 632
 633        tail = ctx->tail;
 634        pos = tail + AIO_EVENTS_OFFSET;
 635
 636        if (++tail >= ctx->nr_events)
 637                tail = 0;
 638
 639        ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
 640        event = ev_page + pos % AIO_EVENTS_PER_PAGE;
 641
 642        event->obj = (u64)(unsigned long)iocb->ki_obj.user;
 643        event->data = iocb->ki_user_data;
 644        event->res = res;
 645        event->res2 = res2;
 646
 647        kunmap_atomic(ev_page);
 648        flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
 649
 650        pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
 651                 ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
 652                 res, res2);
 653
 654        /* after flagging the request as done, we
 655         * must never even look at it again
 656         */
 657        smp_wmb();      /* make event visible before updating tail */
 658
 659        ctx->tail = tail;
 660
 661        ring = kmap_atomic(ctx->ring_pages[0]);
 662        ring->tail = tail;
 663        kunmap_atomic(ring);
 664        flush_dcache_page(ctx->ring_pages[0]);
 665
 666        spin_unlock_irqrestore(&ctx->completion_lock, flags);
 667
 668        pr_debug("added to ring %p at [%u]\n", iocb, tail);
 669
 670        /*
 671         * Check if the user asked us to deliver the result through an
 672         * eventfd. The eventfd_signal() function is safe to be called
 673         * from IRQ context.
 674         */
 675        if (iocb->ki_eventfd != NULL)
 676                eventfd_signal(iocb->ki_eventfd, 1);
 677
 678put_rq:
 679        /* everything turned out well, dispose of the aiocb. */
 680        aio_put_req(iocb);
 681
 682        /*
  683         * We have to order our tail store above and the test of the
  684         * wait list below outside the wait lock.  This is
 685         * like in wake_up_bit() where clearing a bit has to be
 686         * ordered with the unlocked test.
 687         */
 688        smp_mb();
 689
 690        if (waitqueue_active(&ctx->wait))
 691                wake_up(&ctx->wait);
 692
 693        rcu_read_unlock();
 694}
 695EXPORT_SYMBOL(aio_complete);
 696
  697/* aio_read_events_ring
  698 *      Pull events off of the ioctx's event ring.  Returns the number of
  699 *      events fetched.
 700 */
 701static long aio_read_events_ring(struct kioctx *ctx,
 702                                 struct io_event __user *event, long nr)
 703{
 704        struct aio_ring *ring;
 705        unsigned head, pos;
 706        long ret = 0;
 707        int copy_ret;
 708
 709        mutex_lock(&ctx->ring_lock);
 710
 711        ring = kmap_atomic(ctx->ring_pages[0]);
 712        head = ring->head;
 713        kunmap_atomic(ring);
 714
 715        pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr_events);
 716
 717        if (head == ctx->tail)
 718                goto out;
 719
 720        while (ret < nr) {
 721                long avail;
 722                struct io_event *ev;
 723                struct page *page;
 724
 725                avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
 726                if (head == ctx->tail)
 727                        break;
 728
 729                avail = min(avail, nr - ret);
 730                avail = min_t(long, avail, AIO_EVENTS_PER_PAGE -
 731                            ((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
 732
 733                pos = head + AIO_EVENTS_OFFSET;
 734                page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
 735                pos %= AIO_EVENTS_PER_PAGE;
 736
 737                ev = kmap(page);
 738                copy_ret = copy_to_user(event + ret, ev + pos,
 739                                        sizeof(*ev) * avail);
 740                kunmap(page);
 741
 742                if (unlikely(copy_ret)) {
 743                        ret = -EFAULT;
 744                        goto out;
 745                }
 746
 747                ret += avail;
 748                head += avail;
 749                head %= ctx->nr_events;
 750        }
 751
 752        ring = kmap_atomic(ctx->ring_pages[0]);
 753        ring->head = head;
 754        kunmap_atomic(ring);
 755        flush_dcache_page(ctx->ring_pages[0]);
 756
 757        pr_debug("%li  h%u t%u\n", ret, head, ctx->tail);
 758
 759        atomic_sub(ret, &ctx->reqs_active);
 760out:
 761        mutex_unlock(&ctx->ring_lock);
 762
 763        return ret;
 764}
 765
 766static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
 767                            struct io_event __user *event, long *i)
 768{
 769        long ret = aio_read_events_ring(ctx, event + *i, nr - *i);
 770
 771        if (ret > 0)
 772                *i += ret;
 773
 774        if (unlikely(atomic_read(&ctx->dead)))
 775                ret = -EINVAL;
 776
 777        if (!*i)
 778                *i = ret;
 779
 780        return ret < 0 || *i >= min_nr;
 781}
 782
 783static long read_events(struct kioctx *ctx, long min_nr, long nr,
 784                        struct io_event __user *event,
 785                        struct timespec __user *timeout)
 786{
 787        ktime_t until = { .tv64 = KTIME_MAX };
 788        long ret = 0;
 789
 790        if (timeout) {
 791                struct timespec ts;
 792
 793                if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
 794                        return -EFAULT;
 795
 796                until = timespec_to_ktime(ts);
 797        }
 798
 799        /*
 800         * Note that aio_read_events() is being called as the conditional - i.e.
 801         * we're calling it after prepare_to_wait() has set task state to
 802         * TASK_INTERRUPTIBLE.
 803         *
 804         * But aio_read_events() can block, and if it blocks it's going to flip
 805         * the task state back to TASK_RUNNING.
 806         *
 807         * This should be ok, provided it doesn't flip the state back to
 808         * TASK_RUNNING and return 0 too much - that causes us to spin. That
 809         * will only happen if the mutex_lock() call blocks, and we then find
 810         * the ringbuffer empty. So in practice we should be ok, but it's
 811         * something to be aware of when touching this code.
 812         */
 813        wait_event_interruptible_hrtimeout(ctx->wait,
 814                        aio_read_events(ctx, min_nr, nr, event, &ret), until);
 815
 816        if (!ret && signal_pending(current))
 817                ret = -EINTR;
 818
 819        return ret;
 820}
 821
 822/* sys_io_setup:
 823 *      Create an aio_context capable of receiving at least nr_events.
 824 *      ctxp must not point to an aio_context that already exists, and
 825 *      must be initialized to 0 prior to the call.  On successful
 826 *      creation of the aio_context, *ctxp is filled in with the resulting 
  827 *      handle.  May fail with -EINVAL if *ctxp is not initialized, or
  828 *      if the specified nr_events exceeds internal limits.  May fail
 829 *      with -EAGAIN if the specified nr_events exceeds the user's limit 
 830 *      of available events.  May fail with -ENOMEM if insufficient kernel
 831 *      resources are available.  May fail with -EFAULT if an invalid
 832 *      pointer is passed for ctxp.  Will fail with -ENOSYS if not
 833 *      implemented.
 834 */
 835SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 836{
 837        struct kioctx *ioctx = NULL;
 838        unsigned long ctx;
 839        long ret;
 840
 841        ret = get_user(ctx, ctxp);
 842        if (unlikely(ret))
 843                goto out;
 844
 845        ret = -EINVAL;
 846        if (unlikely(ctx || nr_events == 0)) {
 847                pr_debug("EINVAL: io_setup: ctx %lu nr_events %u\n",
 848                         ctx, nr_events);
 849                goto out;
 850        }
 851
 852        ioctx = ioctx_alloc(nr_events);
 853        ret = PTR_ERR(ioctx);
 854        if (!IS_ERR(ioctx)) {
 855                ret = put_user(ioctx->user_id, ctxp);
 856                if (ret)
 857                        kill_ioctx(ioctx);
 858                put_ioctx(ioctx);
 859        }
 860
 861out:
 862        return ret;
 863}
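
/*
 * Minimal userspace sketch of the call documented above (illustrative only;
 * it uses the raw syscall, which is what libaio's io_setup() wraps).  Note
 * that the context handle must start out as zero:
 *
 *      #include <linux/aio_abi.h>
 *      #include <stdio.h>
 *      #include <sys/syscall.h>
 *      #include <unistd.h>
 *
 *      aio_context_t ctx = 0;
 *
 *      if (syscall(__NR_io_setup, 128, &ctx) < 0)
 *              perror("io_setup");
 */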
 864
 865/* sys_io_destroy:
 866 *      Destroy the aio_context specified.  May cancel any outstanding 
 867 *      AIOs and block on completion.  Will fail with -ENOSYS if not
 868 *      implemented.  May fail with -EINVAL if the context pointed to
 869 *      is invalid.
 870 */
 871SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 872{
 873        struct kioctx *ioctx = lookup_ioctx(ctx);
 874        if (likely(NULL != ioctx)) {
 875                kill_ioctx(ioctx);
 876                put_ioctx(ioctx);
 877                return 0;
 878        }
 879        pr_debug("EINVAL: io_destroy: invalid context id\n");
 880        return -EINVAL;
 881}
 882
 883static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
 884{
 885        struct iovec *iov = &iocb->ki_iovec[iocb->ki_cur_seg];
 886
 887        BUG_ON(ret <= 0);
 888
 889        while (iocb->ki_cur_seg < iocb->ki_nr_segs && ret > 0) {
 890                ssize_t this = min((ssize_t)iov->iov_len, ret);
 891                iov->iov_base += this;
 892                iov->iov_len -= this;
 893                iocb->ki_left -= this;
 894                ret -= this;
 895                if (iov->iov_len == 0) {
 896                        iocb->ki_cur_seg++;
 897                        iov++;
 898                }
 899        }
 900
 901        /* the caller should not have done more io than what fit in
 902         * the remaining iovecs */
 903        BUG_ON(ret > 0 && iocb->ki_left == 0);
 904}
 905
 906typedef ssize_t (aio_rw_op)(struct kiocb *, const struct iovec *,
 907                            unsigned long, loff_t);
 908
 909static ssize_t aio_rw_vect_retry(struct kiocb *iocb, int rw, aio_rw_op *rw_op)
 910{
 911        struct file *file = iocb->ki_filp;
 912        struct address_space *mapping = file->f_mapping;
 913        struct inode *inode = mapping->host;
 914        ssize_t ret = 0;
 915
 916        /* This matches the pread()/pwrite() logic */
 917        if (iocb->ki_pos < 0)
 918                return -EINVAL;
 919
 920        if (rw == WRITE)
 921                file_start_write(file);
 922        do {
 923                ret = rw_op(iocb, &iocb->ki_iovec[iocb->ki_cur_seg],
 924                            iocb->ki_nr_segs - iocb->ki_cur_seg,
 925                            iocb->ki_pos);
 926                if (ret > 0)
 927                        aio_advance_iovec(iocb, ret);
 928
  929        /* retry all partial writes.  retry partial reads as long as it's a
 930         * regular file. */
 931        } while (ret > 0 && iocb->ki_left > 0 &&
 932                 (rw == WRITE ||
 933                  (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
 934        if (rw == WRITE)
 935                file_end_write(file);
 936
 937        /* This means we must have transferred all that we could */
 938        /* No need to retry anymore */
 939        if ((ret == 0) || (iocb->ki_left == 0))
 940                ret = iocb->ki_nbytes - iocb->ki_left;
 941
 942        /* If we managed to write some out we return that, rather than
 943         * the eventual error. */
 944        if (rw == WRITE
 945            && ret < 0 && ret != -EIOCBQUEUED
 946            && iocb->ki_nbytes - iocb->ki_left)
 947                ret = iocb->ki_nbytes - iocb->ki_left;
 948
 949        return ret;
 950}
 951
 952static ssize_t aio_setup_vectored_rw(int rw, struct kiocb *kiocb, bool compat)
 953{
 954        ssize_t ret;
 955
 956        kiocb->ki_nr_segs = kiocb->ki_nbytes;
 957
 958#ifdef CONFIG_COMPAT
 959        if (compat)
 960                ret = compat_rw_copy_check_uvector(rw,
 961                                (struct compat_iovec __user *)kiocb->ki_buf,
 962                                kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
 963                                &kiocb->ki_iovec);
 964        else
 965#endif
 966                ret = rw_copy_check_uvector(rw,
 967                                (struct iovec __user *)kiocb->ki_buf,
 968                                kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
 969                                &kiocb->ki_iovec);
 970        if (ret < 0)
 971                return ret;
 972
  973        /* ki_nbytes now reflects bytes instead of segs */
 974        kiocb->ki_nbytes = ret;
 975        return 0;
 976}
 977
 978static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
 979{
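        /*
         * !rw maps READ to VERIFY_WRITE and WRITE to VERIFY_READ: a read
         * request writes into the user buffer, a write request reads from it.
         */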
 980        if (unlikely(!access_ok(!rw, kiocb->ki_buf, kiocb->ki_nbytes)))
 981                return -EFAULT;
 982
 983        kiocb->ki_iovec = &kiocb->ki_inline_vec;
 984        kiocb->ki_iovec->iov_base = kiocb->ki_buf;
 985        kiocb->ki_iovec->iov_len = kiocb->ki_nbytes;
 986        kiocb->ki_nr_segs = 1;
 987        return 0;
 988}
 989
 990/*
  991 * aio_run_iocb:
  992 *      Performs the initial checks and i/o setup for the kiocb and
  993 *      dispatches the operation at the time of io submission.
 994 */
 995static ssize_t aio_run_iocb(struct kiocb *req, bool compat)
 996{
 997        struct file *file = req->ki_filp;
 998        ssize_t ret;
 999        int rw;
1000        fmode_t mode;
1001        aio_rw_op *rw_op;
1002
1003        switch (req->ki_opcode) {
1004        case IOCB_CMD_PREAD:
1005        case IOCB_CMD_PREADV:
1006                mode    = FMODE_READ;
1007                rw      = READ;
1008                rw_op   = file->f_op->aio_read;
1009                goto rw_common;
1010
1011        case IOCB_CMD_PWRITE:
1012        case IOCB_CMD_PWRITEV:
1013                mode    = FMODE_WRITE;
1014                rw      = WRITE;
1015                rw_op   = file->f_op->aio_write;
1016                goto rw_common;
1017rw_common:
1018                if (unlikely(!(file->f_mode & mode)))
1019                        return -EBADF;
1020
1021                if (!rw_op)
1022                        return -EINVAL;
1023
1024                ret = (req->ki_opcode == IOCB_CMD_PREADV ||
1025                       req->ki_opcode == IOCB_CMD_PWRITEV)
1026                        ? aio_setup_vectored_rw(rw, req, compat)
1027                        : aio_setup_single_vector(rw, req);
1028                if (ret)
1029                        return ret;
1030
1031                ret = rw_verify_area(rw, file, &req->ki_pos, req->ki_nbytes);
1032                if (ret < 0)
1033                        return ret;
1034
1035                req->ki_nbytes = ret;
1036                req->ki_left = ret;
1037
1038                ret = aio_rw_vect_retry(req, rw, rw_op);
1039                break;
1040
1041        case IOCB_CMD_FDSYNC:
1042                if (!file->f_op->aio_fsync)
1043                        return -EINVAL;
1044
1045                ret = file->f_op->aio_fsync(req, 1);
1046                break;
1047
1048        case IOCB_CMD_FSYNC:
1049                if (!file->f_op->aio_fsync)
1050                        return -EINVAL;
1051
1052                ret = file->f_op->aio_fsync(req, 0);
1053                break;
1054
1055        default:
1056                pr_debug("EINVAL: no operation provided\n");
1057                return -EINVAL;
1058        }
1059
1060        if (ret != -EIOCBQUEUED) {
1061                /*
 1062                 * There's no easy way to restart the syscall since other AIOs
 1063                 * may already be running. Just fail this IO with EINTR.
1064                 */
1065                if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
1066                             ret == -ERESTARTNOHAND ||
1067                             ret == -ERESTART_RESTARTBLOCK))
1068                        ret = -EINTR;
1069                aio_complete(req, ret, 0);
1070        }
1071
1072        return 0;
1073}
1074
1075static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
1076                         struct iocb *iocb, bool compat)
1077{
1078        struct kiocb *req;
1079        ssize_t ret;
1080
1081        /* enforce forwards compatibility on users */
1082        if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
1083                pr_debug("EINVAL: reserve field set\n");
1084                return -EINVAL;
1085        }
1086
1087        /* prevent overflows */
1088        if (unlikely(
1089            (iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
1090            (iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
1091            ((ssize_t)iocb->aio_nbytes < 0)
1092           )) {
1093                pr_debug("EINVAL: io_submit: overflow check\n");
1094                return -EINVAL;
1095        }
1096
1097        req = aio_get_req(ctx);
1098        if (unlikely(!req))
1099                return -EAGAIN;
1100
1101        req->ki_filp = fget(iocb->aio_fildes);
1102        if (unlikely(!req->ki_filp)) {
1103                ret = -EBADF;
1104                goto out_put_req;
1105        }
1106
1107        if (iocb->aio_flags & IOCB_FLAG_RESFD) {
1108                /*
 1109                 * If the IOCB_FLAG_RESFD flag of aio_flags is set, grab a
 1110                 * reference to the eventfd context now. The file descriptor must be
1111                 * an eventfd() fd, and will be signaled for each completed
1112                 * event using the eventfd_signal() function.
1113                 */
1114                req->ki_eventfd = eventfd_ctx_fdget((int) iocb->aio_resfd);
1115                if (IS_ERR(req->ki_eventfd)) {
1116                        ret = PTR_ERR(req->ki_eventfd);
1117                        req->ki_eventfd = NULL;
1118                        goto out_put_req;
1119                }
1120        }
1121
1122        ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
1123        if (unlikely(ret)) {
1124                pr_debug("EFAULT: aio_key\n");
1125                goto out_put_req;
1126        }
1127
1128        req->ki_obj.user = user_iocb;
1129        req->ki_user_data = iocb->aio_data;
1130        req->ki_pos = iocb->aio_offset;
1131
1132        req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
1133        req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
1134        req->ki_opcode = iocb->aio_lio_opcode;
1135
1136        ret = aio_run_iocb(req, compat);
1137        if (ret)
1138                goto out_put_req;
1139
1140        aio_put_req(req);       /* drop extra ref to req */
1141        return 0;
1142out_put_req:
1143        atomic_dec(&ctx->reqs_active);
1144        aio_put_req(req);       /* drop extra ref to req */
1145        aio_put_req(req);       /* drop i/o ref to req */
1146        return ret;
1147}
1148
1149long do_io_submit(aio_context_t ctx_id, long nr,
1150                  struct iocb __user *__user *iocbpp, bool compat)
1151{
1152        struct kioctx *ctx;
1153        long ret = 0;
1154        int i = 0;
1155        struct blk_plug plug;
1156
1157        if (unlikely(nr < 0))
1158                return -EINVAL;
1159
1160        if (unlikely(nr > LONG_MAX/sizeof(*iocbpp)))
1161                nr = LONG_MAX/sizeof(*iocbpp);
1162
1163        if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
1164                return -EFAULT;
1165
1166        ctx = lookup_ioctx(ctx_id);
1167        if (unlikely(!ctx)) {
1168                pr_debug("EINVAL: invalid context id\n");
1169                return -EINVAL;
1170        }
1171
1172        blk_start_plug(&plug);
1173
1174        /*
1175         * AKPM: should this return a partial result if some of the IOs were
1176         * successfully submitted?
1177         */
 1178        for (i = 0; i < nr; i++) {
1179                struct iocb __user *user_iocb;
1180                struct iocb tmp;
1181
1182                if (unlikely(__get_user(user_iocb, iocbpp + i))) {
1183                        ret = -EFAULT;
1184                        break;
1185                }
1186
1187                if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
1188                        ret = -EFAULT;
1189                        break;
1190                }
1191
1192                ret = io_submit_one(ctx, user_iocb, &tmp, compat);
1193                if (ret)
1194                        break;
1195        }
1196        blk_finish_plug(&plug);
1197
1198        put_ioctx(ctx);
1199        return i ? i : ret;
1200}
1201
1202/* sys_io_submit:
1203 *      Queue the nr iocbs pointed to by iocbpp for processing.  Returns
1204 *      the number of iocbs queued.  May return -EINVAL if the aio_context
1205 *      specified by ctx_id is invalid, if nr is < 0, if the iocb at
 1206 *      *iocbpp[0] is not properly initialized, or if the operation specified
1207 *      is invalid for the file descriptor in the iocb.  May fail with
1208 *      -EFAULT if any of the data structures point to invalid data.  May
1209 *      fail with -EBADF if the file descriptor specified in the first
1210 *      iocb is invalid.  May fail with -EAGAIN if insufficient resources
1211 *      are available to queue any iocbs.  Will return 0 if nr is 0.  Will
1212 *      fail with -ENOSYS if not implemented.
1213 */
1214SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
1215                struct iocb __user * __user *, iocbpp)
1216{
1217        return do_io_submit(ctx_id, nr, iocbpp, 0);
1218}
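
/*
 * Userspace sketch of a single submission (illustrative only; ctx comes from
 * the io_setup() sketch earlier, fd and buf are assumed to exist, and
 * aio_data is an opaque cookie handed back in io_event.data).  The reserved
 * fields must be zero, which the designated initializer guarantees:
 *
 *      struct iocb cb = {
 *              .aio_data       = 0x1234,
 *              .aio_lio_opcode = IOCB_CMD_PREAD,
 *              .aio_fildes     = fd,
 *              .aio_buf        = (__u64)(unsigned long)buf,
 *              .aio_nbytes     = 4096,
 *              .aio_offset     = 0,
 *      };
 *      struct iocb *cbs[1] = { &cb };
 *
 *      if (syscall(__NR_io_submit, ctx, 1, cbs) != 1)
 *              perror("io_submit");
 */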
1219
1220/* lookup_kiocb
1221 *      Finds a given iocb for cancellation.
1222 */
1223static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
1224                                  u32 key)
1225{
1226        struct list_head *pos;
1227
1228        assert_spin_locked(&ctx->ctx_lock);
1229
1230        if (key != KIOCB_KEY)
1231                return NULL;
1232
1233        /* TODO: use a hash or array, this sucks. */
1234        list_for_each(pos, &ctx->active_reqs) {
1235                struct kiocb *kiocb = list_kiocb(pos);
1236                if (kiocb->ki_obj.user == iocb)
1237                        return kiocb;
1238        }
1239        return NULL;
1240}
1241
1242/* sys_io_cancel:
1243 *      Attempts to cancel an iocb previously passed to io_submit.  If
1244 *      the operation is successfully cancelled, the resulting event is
1245 *      copied into the memory pointed to by result without being placed
1246 *      into the completion queue and 0 is returned.  May fail with
1247 *      -EFAULT if any of the data structures pointed to are invalid.
1248 *      May fail with -EINVAL if aio_context specified by ctx_id is
1249 *      invalid.  May fail with -EAGAIN if the iocb specified was not
1250 *      cancelled.  Will fail with -ENOSYS if not implemented.
1251 */
1252SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
1253                struct io_event __user *, result)
1254{
1255        struct io_event res;
1256        struct kioctx *ctx;
1257        struct kiocb *kiocb;
1258        u32 key;
1259        int ret;
1260
1261        ret = get_user(key, &iocb->aio_key);
1262        if (unlikely(ret))
1263                return -EFAULT;
1264
1265        ctx = lookup_ioctx(ctx_id);
1266        if (unlikely(!ctx))
1267                return -EINVAL;
1268
1269        spin_lock_irq(&ctx->ctx_lock);
1270
1271        kiocb = lookup_kiocb(ctx, iocb, key);
1272        if (kiocb)
1273                ret = kiocb_cancel(ctx, kiocb, &res);
1274        else
1275                ret = -EINVAL;
1276
1277        spin_unlock_irq(&ctx->ctx_lock);
1278
1279        if (!ret) {
1280                /* Cancellation succeeded -- copy the result
1281                 * into the user's buffer.
1282                 */
1283                if (copy_to_user(result, &res, sizeof(res)))
1284                        ret = -EFAULT;
1285        }
1286
1287        put_ioctx(ctx);
1288
1289        return ret;
1290}
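
/*
 * Userspace sketch (illustrative only; cb is the struct iocb submitted
 * earlier).  A request is only cancellable if its driver registered a
 * callback with kiocb_set_cancel_fn(); otherwise kiocb_cancel() above
 * returns -EINVAL:
 *
 *      struct io_event ev;
 *
 *      if (syscall(__NR_io_cancel, ctx, &cb, &ev) == 0)
 *              printf("cancelled, res=%lld\n", (long long)ev.res);
 */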
1291
1292/* io_getevents:
1293 *      Attempts to read at least min_nr events and up to nr events from
1294 *      the completion queue for the aio_context specified by ctx_id. If
1295 *      it succeeds, the number of read events is returned. May fail with
1296 *      -EINVAL if ctx_id is invalid, if min_nr is out of range, if nr is
 1297 *      out of range, or if timeout is out of range.  May fail with -EFAULT
1298 *      if any of the memory specified is invalid.  May return 0 or
1299 *      < min_nr if the timeout specified by timeout has elapsed
1300 *      before sufficient events are available, where timeout == NULL
1301 *      specifies an infinite timeout. Note that the timeout pointed to by
1302 *      timeout is relative.  Will fail with -ENOSYS if not implemented.
1303 */
1304SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
1305                long, min_nr,
1306                long, nr,
1307                struct io_event __user *, events,
1308                struct timespec __user *, timeout)
1309{
1310        struct kioctx *ioctx = lookup_ioctx(ctx_id);
1311        long ret = -EINVAL;
1312
1313        if (likely(ioctx)) {
1314                if (likely(min_nr <= nr && min_nr >= 0))
1315                        ret = read_events(ioctx, min_nr, nr, events, timeout);
1316                put_ioctx(ioctx);
1317        }
1318        return ret;
1319}
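
/*
 * Userspace sketch of reaping completions and tearing the context down
 * (illustrative only, continuing the sketches above; handle_completion() is
 * a stand-in for the caller's own logic and struct timespec comes from
 * <time.h>):
 *
 *      struct io_event events[8];
 *      struct timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
 *      int i, n;
 *
 *      n = syscall(__NR_io_getevents, ctx, 1, 8, events, &ts);
 *      for (i = 0; i < n; i++)
 *              handle_completion(events[i].data, events[i].res);
 *
 *      syscall(__NR_io_destroy, ctx);
 */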
1320