linux/fs/nfs/direct.c
   1/*
   2 * linux/fs/nfs/direct.c
   3 *
   4 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
   5 *
   6 * High-performance uncached I/O for the Linux NFS client
   7 *
   8 * There are important applications whose performance or correctness
   9 * depends on uncached access to file data.  Database clusters
  10 * (multiple copies of the same instance running on separate hosts)
  11 * implement their own cache coherency protocol that subsumes file
  12 * system cache protocols.  Applications that process datasets
  13 * considerably larger than the client's memory do not always benefit
  14 * from a local cache.  A streaming video server, for instance, has no
  15 * need to cache the contents of a file.
  16 *
  17 * When an application requests uncached I/O, all read and write requests
  18 * are made directly to the server; data stored or fetched via these
  19 * requests is not cached in the Linux page cache.  The client does not
  20 * correct unaligned requests from applications.  All requested bytes are
  21 * held on permanent storage before a direct write system call returns to
  22 * an application.
  23 *
  24 * Solaris implements an uncached I/O facility called directio() that
  25 * is used for backups and sequential I/O to very large files.  Solaris
  26 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
  27 * an undocumented mount option.
  28 *
  29 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
  30 * help from Andrew Morton.
  31 *
  32 * 18 Dec 2001  Initial implementation for 2.4  --cel
  33 * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
  34 * 08 Jun 2003  Port to 2.5 APIs  --cel
  35 * 31 Mar 2004  Handle direct I/O without VFS support  --cel
  36 * 15 Sep 2004  Parallel async reads  --cel
  37 * 04 May 2005  support O_DIRECT with aio  --cel
  38 *
  39 */
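/*
 * Illustrative only, not part of the original file: a typical consumer of
 * this path is a user-space program that opens a file on an NFS mount with
 * O_DIRECT and issues I/O from a suitably aligned buffer.  A minimal sketch
 * (error handling omitted; the path and 4096-byte alignment are assumptions):
 *
 *	char *buf;
 *	int fd = open("/mnt/nfs/data", O_RDWR | O_DIRECT);
 *	posix_memalign((void **)&buf, 4096, 4096);
 *	pread(fd, buf, 4096, 0);	bypasses the client page cache
 *	pwrite(fd, buf, 4096, 0);	returns only once the server holds
 *					the data on permanent storage
 *
 * As noted above, the client performs no alignment fix-ups on behalf of
 * the application.
 */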
  40
  41#include <linux/errno.h>
  42#include <linux/sched.h>
  43#include <linux/kernel.h>
  44#include <linux/file.h>
  45#include <linux/pagemap.h>
  46#include <linux/kref.h>
  47#include <linux/slab.h>
  48#include <linux/task_io_accounting_ops.h>
  49#include <linux/module.h>
  50
  51#include <linux/nfs_fs.h>
  52#include <linux/nfs_page.h>
  53#include <linux/sunrpc/clnt.h>
  54
  55#include <asm/uaccess.h>
  56#include <linux/atomic.h>
  57
  58#include "internal.h"
  59#include "iostat.h"
  60#include "pnfs.h"
  61
  62#define NFSDBG_FACILITY         NFSDBG_VFS
  63
  64static struct kmem_cache *nfs_direct_cachep;
  65
  66/*
  67 * This represents a set of asynchronous requests that we're waiting on
  68 */
  69struct nfs_direct_req {
  70        struct kref             kref;           /* release manager */
  71
  72        /* I/O parameters */
  73        struct nfs_open_context *ctx;           /* file open context info */
  74        struct nfs_lock_context *l_ctx;         /* Lock context info */
  75        struct kiocb *          iocb;           /* controlling i/o request */
  76        struct inode *          inode;          /* target file of i/o */
  77
  78        /* completion state */
  79        atomic_t                io_count;       /* i/os we're waiting for */
  80        spinlock_t              lock;           /* protect completion state */
  81        ssize_t                 count,          /* bytes actually processed */
  82                                bytes_left,     /* bytes left to be sent */
  83                                error;          /* any reported error */
  84        struct completion       completion;     /* wait for i/o completion */
  85
  86        /* commit state */
  87        struct nfs_mds_commit_info mds_cinfo;   /* Storage for cinfo */
  88        struct pnfs_ds_commit_info ds_cinfo;    /* Storage for cinfo */
  89        struct work_struct      work;
  90        int                     flags;
  91#define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
  92#define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
  93        struct nfs_writeverf    verf;           /* unstable write verifier */
  94};
  95
  96static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
  97static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
  98static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
  99static void nfs_direct_write_schedule_work(struct work_struct *work);
 100
 101static inline void get_dreq(struct nfs_direct_req *dreq)
 102{
 103        atomic_inc(&dreq->io_count);
 104}
 105
 106static inline int put_dreq(struct nfs_direct_req *dreq)
 107{
 108        return atomic_dec_and_test(&dreq->io_count);
 109}
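/*
 * A note on the reference counting above (see the schedule_iovec helpers
 * below): the scheduler holds one reference on io_count for the duration
 * of request submission, and every pageio header takes another through
 * nfs_direct_pgio_init().  Each completion drops one, and whichever
 * put_dreq() call observes the count reach zero finishes the request.
 * Roughly:
 *
 *	get_dreq(dreq);				scheduler's reference
 *	... dispatch I/O; each header calls get_dreq() via ->init_hdr ...
 *	if (put_dreq(dreq))			last reference gone
 *		nfs_direct_complete(dreq);
 */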
 110
 111/**
 112 * nfs_direct_IO - NFS address space operation for direct I/O
 113 * @rw: direction (read or write)
 114 * @iocb: target I/O control block
 115 * @iov: array of vectors that define I/O buffer
 116 * @pos: offset in file to begin the operation
 117 * @nr_segs: size of iovec array
 118 *
 119 * The presence of this routine in the address space ops vector means
  120 * the NFS client supports direct I/O. However, for most direct I/O, we
 121 * shunt off direct read and write requests before the VFS gets them,
 122 * so this method is only ever called for swap.
 123 */
 124ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
 125{
 126#ifndef CONFIG_NFS_SWAP
 127        dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
 128                        iocb->ki_filp->f_path.dentry->d_name.name,
 129                        (long long) pos, nr_segs);
 130
 131        return -EINVAL;
 132#else
 133        VM_BUG_ON(iocb->ki_left != PAGE_SIZE);
 134        VM_BUG_ON(iocb->ki_nbytes != PAGE_SIZE);
 135
 136        if (rw == READ || rw == KERNEL_READ)
 137                return nfs_file_direct_read(iocb, iov, nr_segs, pos,
 138                                rw == READ ? true : false);
 139        return nfs_file_direct_write(iocb, iov, nr_segs, pos,
 140                                rw == WRITE ? true : false);
 141#endif /* CONFIG_NFS_SWAP */
 142}
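/*
 * How this is wired up, roughly (the exact aops definition lives in
 * fs/nfs/file.c and may differ slightly between kernel versions):
 *
 *	const struct address_space_operations nfs_file_aops = {
 *		...
 *		.direct_IO	= nfs_direct_IO,
 *		...
 *	};
 *
 * Ordinary O_DIRECT reads and writes are intercepted earlier, in the NFS
 * file read/write paths, which call nfs_file_direct_read() and
 * nfs_file_direct_write() below without going through the VFS direct I/O
 * machinery; only the swap-over-NFS code reaches this entry point.
 */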
 143
 144static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
 145{
 146        unsigned int i;
 147        for (i = 0; i < npages; i++)
 148                page_cache_release(pages[i]);
 149}
 150
 151void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
 152                              struct nfs_direct_req *dreq)
 153{
 154        cinfo->lock = &dreq->lock;
 155        cinfo->mds = &dreq->mds_cinfo;
 156        cinfo->ds = &dreq->ds_cinfo;
 157        cinfo->dreq = dreq;
 158        cinfo->completion_ops = &nfs_direct_commit_completion_ops;
 159}
 160
 161static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
 162{
 163        struct nfs_direct_req *dreq;
 164
 165        dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
 166        if (!dreq)
 167                return NULL;
 168
 169        kref_init(&dreq->kref);
 170        kref_get(&dreq->kref);
 171        init_completion(&dreq->completion);
 172        INIT_LIST_HEAD(&dreq->mds_cinfo.list);
 173        INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
 174        spin_lock_init(&dreq->lock);
 175
 176        return dreq;
 177}
 178
 179static void nfs_direct_req_free(struct kref *kref)
 180{
 181        struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
 182
 183        if (dreq->l_ctx != NULL)
 184                nfs_put_lock_context(dreq->l_ctx);
 185        if (dreq->ctx != NULL)
 186                put_nfs_open_context(dreq->ctx);
 187        kmem_cache_free(nfs_direct_cachep, dreq);
 188}
 189
 190static void nfs_direct_req_release(struct nfs_direct_req *dreq)
 191{
 192        kref_put(&dreq->kref, nfs_direct_req_free);
 193}
 194
 195ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
 196{
 197        return dreq->bytes_left;
 198}
 199EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
 200
 201/*
 202 * Collects and returns the final error value/byte-count.
 203 */
 204static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
 205{
 206        ssize_t result = -EIOCBQUEUED;
 207
 208        /* Async requests don't wait here */
 209        if (dreq->iocb)
 210                goto out;
 211
 212        result = wait_for_completion_killable(&dreq->completion);
 213
 214        if (!result)
 215                result = dreq->error;
 216        if (!result)
 217                result = dreq->count;
 218
 219out:
 220        return (ssize_t) result;
 221}
 222
 223/*
  224 * Synchronous I/O uses a stack-allocated iocb, so we cannot assume that
  225 * the iocb is still valid here when the request is synchronous.
 226 */
 227static void nfs_direct_complete(struct nfs_direct_req *dreq)
 228{
 229        if (dreq->iocb) {
 230                long res = (long) dreq->error;
 231                if (!res)
 232                        res = (long) dreq->count;
 233                aio_complete(dreq->iocb, res, 0);
 234        }
 235        complete_all(&dreq->completion);
 236
 237        nfs_direct_req_release(dreq);
 238}
 239
 240static void nfs_direct_readpage_release(struct nfs_page *req)
 241{
 242        dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
 243                req->wb_context->dentry->d_inode->i_sb->s_id,
 244                (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
 245                req->wb_bytes,
 246                (long long)req_offset(req));
 247        nfs_release_request(req);
 248}
 249
 250static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
 251{
 252        unsigned long bytes = 0;
 253        struct nfs_direct_req *dreq = hdr->dreq;
 254
 255        if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
 256                goto out_put;
 257
 258        spin_lock(&dreq->lock);
 259        if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
 260                dreq->error = hdr->error;
 261        else
 262                dreq->count += hdr->good_bytes;
 263        spin_unlock(&dreq->lock);
 264
 265        while (!list_empty(&hdr->pages)) {
 266                struct nfs_page *req = nfs_list_entry(hdr->pages.next);
 267                struct page *page = req->wb_page;
 268
 269                if (!PageCompound(page) && bytes < hdr->good_bytes)
 270                        set_page_dirty(page);
 271                bytes += req->wb_bytes;
 272                nfs_list_remove_request(req);
 273                nfs_direct_readpage_release(req);
 274        }
 275out_put:
 276        if (put_dreq(dreq))
 277                nfs_direct_complete(dreq);
 278        hdr->release(hdr);
 279}
 280
 281static void nfs_read_sync_pgio_error(struct list_head *head)
 282{
 283        struct nfs_page *req;
 284
 285        while (!list_empty(head)) {
 286                req = nfs_list_entry(head->next);
 287                nfs_list_remove_request(req);
 288                nfs_release_request(req);
 289        }
 290}
 291
 292static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
 293{
 294        get_dreq(hdr->dreq);
 295}
 296
 297static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
 298        .error_cleanup = nfs_read_sync_pgio_error,
 299        .init_hdr = nfs_direct_pgio_init,
 300        .completion = nfs_direct_read_completion,
 301};
 302
 303/*
 304 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
  305 * operation.  If the page vector allocation or get_user_pages() fails,
  306 * bail and stop sending more reads.  Read length accounting is
  307 * handled automatically by nfs_direct_read_completion().  Otherwise, if
 308 * no requests have been sent, just return an error.
 309 */
 310static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
 311                                                const struct iovec *iov,
 312                                                loff_t pos, bool uio)
 313{
 314        struct nfs_direct_req *dreq = desc->pg_dreq;
 315        struct nfs_open_context *ctx = dreq->ctx;
 316        struct inode *inode = ctx->dentry->d_inode;
 317        unsigned long user_addr = (unsigned long)iov->iov_base;
 318        size_t count = iov->iov_len;
 319        size_t rsize = NFS_SERVER(inode)->rsize;
 320        unsigned int pgbase;
 321        int result;
 322        ssize_t started = 0;
 323        struct page **pagevec = NULL;
 324        unsigned int npages;
 325
 326        do {
 327                size_t bytes;
 328                int i;
 329
 330                pgbase = user_addr & ~PAGE_MASK;
 331                bytes = min(max_t(size_t, rsize, PAGE_SIZE), count);
 332
 333                result = -ENOMEM;
 334                npages = nfs_page_array_len(pgbase, bytes);
 335                if (!pagevec)
 336                        pagevec = kmalloc(npages * sizeof(struct page *),
 337                                          GFP_KERNEL);
 338                if (!pagevec)
 339                        break;
 340                if (uio) {
 341                        down_read(&current->mm->mmap_sem);
 342                        result = get_user_pages(current, current->mm, user_addr,
 343                                        npages, 1, 0, pagevec, NULL);
 344                        up_read(&current->mm->mmap_sem);
 345                        if (result < 0)
 346                                break;
 347                } else {
 348                        WARN_ON(npages != 1);
 349                        result = get_kernel_page(user_addr, 1, pagevec);
 350                        if (WARN_ON(result != 1))
 351                                break;
 352                }
 353
 354                if ((unsigned)result < npages) {
 355                        bytes = result * PAGE_SIZE;
 356                        if (bytes <= pgbase) {
 357                                nfs_direct_release_pages(pagevec, result);
 358                                break;
 359                        }
 360                        bytes -= pgbase;
 361                        npages = result;
 362                }
 363
 364                for (i = 0; i < npages; i++) {
 365                        struct nfs_page *req;
 366                        unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 367                        /* XXX do we need to do the eof zeroing found in async_filler? */
 368                        req = nfs_create_request(dreq->ctx, dreq->inode,
 369                                                 pagevec[i],
 370                                                 pgbase, req_len);
 371                        if (IS_ERR(req)) {
 372                                result = PTR_ERR(req);
 373                                break;
 374                        }
 375                        req->wb_index = pos >> PAGE_SHIFT;
 376                        req->wb_offset = pos & ~PAGE_MASK;
 377                        if (!nfs_pageio_add_request(desc, req)) {
 378                                result = desc->pg_error;
 379                                nfs_release_request(req);
 380                                break;
 381                        }
 382                        pgbase = 0;
 383                        bytes -= req_len;
 384                        started += req_len;
 385                        user_addr += req_len;
 386                        pos += req_len;
 387                        count -= req_len;
 388                        dreq->bytes_left -= req_len;
 389                }
  390                /* The nfs_page requests now hold references to these pages */
 391                nfs_direct_release_pages(pagevec, npages);
 392        } while (count != 0 && result >= 0);
 393
 394        kfree(pagevec);
 395
 396        if (started)
 397                return started;
 398        return result < 0 ? (ssize_t) result : -EFAULT;
 399}
 400
 401static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
 402                                              const struct iovec *iov,
 403                                              unsigned long nr_segs,
 404                                              loff_t pos, bool uio)
 405{
 406        struct nfs_pageio_descriptor desc;
 407        ssize_t result = -EINVAL;
 408        size_t requested_bytes = 0;
 409        unsigned long seg;
 410
 411        NFS_PROTO(dreq->inode)->read_pageio_init(&desc, dreq->inode,
 412                             &nfs_direct_read_completion_ops);
 413        get_dreq(dreq);
 414        desc.pg_dreq = dreq;
 415
 416        for (seg = 0; seg < nr_segs; seg++) {
 417                const struct iovec *vec = &iov[seg];
 418                result = nfs_direct_read_schedule_segment(&desc, vec, pos, uio);
 419                if (result < 0)
 420                        break;
 421                requested_bytes += result;
 422                if ((size_t)result < vec->iov_len)
 423                        break;
 424                pos += vec->iov_len;
 425        }
 426
 427        nfs_pageio_complete(&desc);
 428
 429        /*
 430         * If no bytes were started, return the error, and let the
 431         * generic layer handle the completion.
 432         */
 433        if (requested_bytes == 0) {
 434                nfs_direct_req_release(dreq);
 435                return result < 0 ? result : -EIO;
 436        }
 437
 438        if (put_dreq(dreq))
 439                nfs_direct_complete(dreq);
 440        return 0;
 441}
 442
 443static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
 444                               unsigned long nr_segs, loff_t pos, bool uio)
 445{
 446        ssize_t result = -ENOMEM;
 447        struct inode *inode = iocb->ki_filp->f_mapping->host;
 448        struct nfs_direct_req *dreq;
 449        struct nfs_lock_context *l_ctx;
 450
 451        dreq = nfs_direct_req_alloc();
 452        if (dreq == NULL)
 453                goto out;
 454
 455        dreq->inode = inode;
 456        dreq->bytes_left = iov_length(iov, nr_segs);
 457        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 458        l_ctx = nfs_get_lock_context(dreq->ctx);
 459        if (IS_ERR(l_ctx)) {
 460                result = PTR_ERR(l_ctx);
 461                goto out_release;
 462        }
 463        dreq->l_ctx = l_ctx;
 464        if (!is_sync_kiocb(iocb))
 465                dreq->iocb = iocb;
 466
 467        NFS_I(inode)->read_io += iov_length(iov, nr_segs);
 468        result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos, uio);
 469        if (!result)
 470                result = nfs_direct_wait(dreq);
 471out_release:
 472        nfs_direct_req_release(dreq);
 473out:
 474        return result;
 475}
 476
 477static void nfs_inode_dio_write_done(struct inode *inode)
 478{
 479        nfs_zap_mapping(inode, inode->i_mapping);
 480        inode_dio_done(inode);
 481}
 482
 483#if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
 484static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
 485{
 486        struct nfs_pageio_descriptor desc;
 487        struct nfs_page *req, *tmp;
 488        LIST_HEAD(reqs);
 489        struct nfs_commit_info cinfo;
 490        LIST_HEAD(failed);
 491
 492        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 493        pnfs_recover_commit_reqs(dreq->inode, &reqs, &cinfo);
 494        spin_lock(cinfo.lock);
 495        nfs_scan_commit_list(&cinfo.mds->list, &reqs, &cinfo, 0);
 496        spin_unlock(cinfo.lock);
 497
 498        dreq->count = 0;
 499        get_dreq(dreq);
 500
 501        NFS_PROTO(dreq->inode)->write_pageio_init(&desc, dreq->inode, FLUSH_STABLE,
 502                              &nfs_direct_write_completion_ops);
 503        desc.pg_dreq = dreq;
 504
 505        list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
 506                if (!nfs_pageio_add_request(&desc, req)) {
 507                        nfs_list_remove_request(req);
 508                        nfs_list_add_request(req, &failed);
 509                        spin_lock(cinfo.lock);
 510                        dreq->flags = 0;
 511                        dreq->error = -EIO;
 512                        spin_unlock(cinfo.lock);
 513                }
 514                nfs_release_request(req);
 515        }
 516        nfs_pageio_complete(&desc);
 517
 518        while (!list_empty(&failed)) {
 519                req = nfs_list_entry(failed.next);
 520                nfs_list_remove_request(req);
 521                nfs_unlock_and_release_request(req);
 522        }
 523
 524        if (put_dreq(dreq))
 525                nfs_direct_write_complete(dreq, dreq->inode);
 526}
 527
 528static void nfs_direct_commit_complete(struct nfs_commit_data *data)
 529{
 530        struct nfs_direct_req *dreq = data->dreq;
 531        struct nfs_commit_info cinfo;
 532        struct nfs_page *req;
 533        int status = data->task.tk_status;
 534
 535        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 536        if (status < 0) {
 537                dprintk("NFS: %5u commit failed with error %d.\n",
 538                        data->task.tk_pid, status);
 539                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 540        } else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
 541                dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
 542                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 543        }
 544
 545        dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
 546        while (!list_empty(&data->pages)) {
 547                req = nfs_list_entry(data->pages.next);
 548                nfs_list_remove_request(req);
 549                if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
 550                        /* Note the rewrite will go through mds */
 551                        nfs_mark_request_commit(req, NULL, &cinfo);
 552                } else
 553                        nfs_release_request(req);
 554                nfs_unlock_and_release_request(req);
 555        }
 556
 557        if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
 558                nfs_direct_write_complete(dreq, data->inode);
 559}
 560
 561static void nfs_direct_error_cleanup(struct nfs_inode *nfsi)
 562{
 563        /* There is no lock to clear */
 564}
 565
 566static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
 567        .completion = nfs_direct_commit_complete,
 568        .error_cleanup = nfs_direct_error_cleanup,
 569};
 570
 571static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
 572{
 573        int res;
 574        struct nfs_commit_info cinfo;
 575        LIST_HEAD(mds_list);
 576
 577        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 578        nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
 579        res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
 580        if (res < 0) /* res == -ENOMEM */
 581                nfs_direct_write_reschedule(dreq);
 582}
 583
 584static void nfs_direct_write_schedule_work(struct work_struct *work)
 585{
 586        struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
 587        int flags = dreq->flags;
 588
 589        dreq->flags = 0;
 590        switch (flags) {
 591                case NFS_ODIRECT_DO_COMMIT:
 592                        nfs_direct_commit_schedule(dreq);
 593                        break;
 594                case NFS_ODIRECT_RESCHED_WRITES:
 595                        nfs_direct_write_reschedule(dreq);
 596                        break;
 597                default:
 598                        nfs_inode_dio_write_done(dreq->inode);
 599                        nfs_direct_complete(dreq);
 600        }
 601}
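/*
 * Summary of the three completion paths selected above:
 *
 *	flags == 0			nothing left to commit (stable
 *					replies, or an error was recorded);
 *					zap the mapping, drop i_dio_count
 *					and complete
 *	NFS_ODIRECT_DO_COMMIT		an unstable reply was received;
 *					send a COMMIT before completing
 *	NFS_ODIRECT_RESCHED_WRITES	a verifier mismatch or failed commit;
 *					resend the writes through the MDS
 *
 * The commit and resched paths may route back through
 * nfs_direct_write_complete(), so this work item can run more than once
 * for a single dreq.
 */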
 602
 603static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
 604{
 605        schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
 606}
 607
 608#else
 609static void nfs_direct_write_schedule_work(struct work_struct *work)
 610{
 611}
 612
 613static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
 614{
 615        nfs_inode_dio_write_done(inode);
 616        nfs_direct_complete(dreq);
 617}
 618#endif
 619
 620/*
 621 * NB: Return the value of the first error return code.  Subsequent
 622 *     errors after the first one are ignored.
 623 */
 624/*
 625 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
  626 * operation.  If the page vector allocation or get_user_pages() fails,
  627 * bail and stop sending more writes.  Write length accounting is
  628 * handled automatically by nfs_direct_write_completion().  Otherwise, if
 629 * no requests have been sent, just return an error.
 630 */
 631static ssize_t nfs_direct_write_schedule_segment(struct nfs_pageio_descriptor *desc,
 632                                                 const struct iovec *iov,
 633                                                 loff_t pos, bool uio)
 634{
 635        struct nfs_direct_req *dreq = desc->pg_dreq;
 636        struct nfs_open_context *ctx = dreq->ctx;
 637        struct inode *inode = ctx->dentry->d_inode;
 638        unsigned long user_addr = (unsigned long)iov->iov_base;
 639        size_t count = iov->iov_len;
 640        size_t wsize = NFS_SERVER(inode)->wsize;
 641        unsigned int pgbase;
 642        int result;
 643        ssize_t started = 0;
 644        struct page **pagevec = NULL;
 645        unsigned int npages;
 646
 647        do {
 648                size_t bytes;
 649                int i;
 650
 651                pgbase = user_addr & ~PAGE_MASK;
 652                bytes = min(max_t(size_t, wsize, PAGE_SIZE), count);
 653
 654                result = -ENOMEM;
 655                npages = nfs_page_array_len(pgbase, bytes);
 656                if (!pagevec)
 657                        pagevec = kmalloc(npages * sizeof(struct page *), GFP_KERNEL);
 658                if (!pagevec)
 659                        break;
 660
 661                if (uio) {
 662                        down_read(&current->mm->mmap_sem);
 663                        result = get_user_pages(current, current->mm, user_addr,
 664                                                npages, 0, 0, pagevec, NULL);
 665                        up_read(&current->mm->mmap_sem);
 666                        if (result < 0)
 667                                break;
 668                } else {
 669                        WARN_ON(npages != 1);
 670                        result = get_kernel_page(user_addr, 0, pagevec);
 671                        if (WARN_ON(result != 1))
 672                                break;
 673                }
 674
 675                if ((unsigned)result < npages) {
 676                        bytes = result * PAGE_SIZE;
 677                        if (bytes <= pgbase) {
 678                                nfs_direct_release_pages(pagevec, result);
 679                                break;
 680                        }
 681                        bytes -= pgbase;
 682                        npages = result;
 683                }
 684
 685                for (i = 0; i < npages; i++) {
 686                        struct nfs_page *req;
 687                        unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 688
 689                        req = nfs_create_request(dreq->ctx, dreq->inode,
 690                                                 pagevec[i],
 691                                                 pgbase, req_len);
 692                        if (IS_ERR(req)) {
 693                                result = PTR_ERR(req);
 694                                break;
 695                        }
 696                        nfs_lock_request(req);
 697                        req->wb_index = pos >> PAGE_SHIFT;
 698                        req->wb_offset = pos & ~PAGE_MASK;
 699                        if (!nfs_pageio_add_request(desc, req)) {
 700                                result = desc->pg_error;
 701                                nfs_unlock_and_release_request(req);
 702                                break;
 703                        }
 704                        pgbase = 0;
 705                        bytes -= req_len;
 706                        started += req_len;
 707                        user_addr += req_len;
 708                        pos += req_len;
 709                        count -= req_len;
 710                        dreq->bytes_left -= req_len;
 711                }
  712                /* The nfs_page requests now hold references to these pages */
 713                nfs_direct_release_pages(pagevec, npages);
 714        } while (count != 0 && result >= 0);
 715
 716        kfree(pagevec);
 717
 718        if (started)
 719                return started;
 720        return result < 0 ? (ssize_t) result : -EFAULT;
 721}
 722
 723static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
 724{
 725        struct nfs_direct_req *dreq = hdr->dreq;
 726        struct nfs_commit_info cinfo;
 727        int bit = -1;
 728        struct nfs_page *req = nfs_list_entry(hdr->pages.next);
 729
 730        if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
 731                goto out_put;
 732
 733        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 734
 735        spin_lock(&dreq->lock);
 736
 737        if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
 738                dreq->flags = 0;
 739                dreq->error = hdr->error;
 740        }
 741        if (dreq->error != 0)
 742                bit = NFS_IOHDR_ERROR;
 743        else {
 744                dreq->count += hdr->good_bytes;
 745                if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags)) {
 746                        dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 747                        bit = NFS_IOHDR_NEED_RESCHED;
 748                } else if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
 749                        if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
 750                                bit = NFS_IOHDR_NEED_RESCHED;
 751                        else if (dreq->flags == 0) {
 752                                memcpy(&dreq->verf, hdr->verf,
 753                                       sizeof(dreq->verf));
 754                                bit = NFS_IOHDR_NEED_COMMIT;
 755                                dreq->flags = NFS_ODIRECT_DO_COMMIT;
 756                        } else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
 757                                if (memcmp(&dreq->verf, hdr->verf, sizeof(dreq->verf))) {
 758                                        dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 759                                        bit = NFS_IOHDR_NEED_RESCHED;
 760                                } else
 761                                        bit = NFS_IOHDR_NEED_COMMIT;
 762                        }
 763                }
 764        }
 765        spin_unlock(&dreq->lock);
 766
 767        while (!list_empty(&hdr->pages)) {
 768                req = nfs_list_entry(hdr->pages.next);
 769                nfs_list_remove_request(req);
 770                switch (bit) {
 771                case NFS_IOHDR_NEED_RESCHED:
 772                case NFS_IOHDR_NEED_COMMIT:
 773                        kref_get(&req->wb_kref);
 774                        nfs_mark_request_commit(req, hdr->lseg, &cinfo);
 775                }
 776                nfs_unlock_and_release_request(req);
 777        }
 778
 779out_put:
 780        if (put_dreq(dreq))
 781                nfs_direct_write_complete(dreq, hdr->inode);
 782        hdr->release(hdr);
 783}
 784
 785static void nfs_write_sync_pgio_error(struct list_head *head)
 786{
 787        struct nfs_page *req;
 788
 789        while (!list_empty(head)) {
 790                req = nfs_list_entry(head->next);
 791                nfs_list_remove_request(req);
 792                nfs_unlock_and_release_request(req);
 793        }
 794}
 795
 796static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
 797        .error_cleanup = nfs_write_sync_pgio_error,
 798        .init_hdr = nfs_direct_pgio_init,
 799        .completion = nfs_direct_write_completion,
 800};
 801
 802static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
 803                                               const struct iovec *iov,
 804                                               unsigned long nr_segs,
 805                                               loff_t pos, bool uio)
 806{
 807        struct nfs_pageio_descriptor desc;
 808        struct inode *inode = dreq->inode;
 809        ssize_t result = 0;
 810        size_t requested_bytes = 0;
 811        unsigned long seg;
 812
 813        NFS_PROTO(inode)->write_pageio_init(&desc, inode, FLUSH_COND_STABLE,
 814                              &nfs_direct_write_completion_ops);
 815        desc.pg_dreq = dreq;
 816        get_dreq(dreq);
 817        atomic_inc(&inode->i_dio_count);
 818
 819        NFS_I(dreq->inode)->write_io += iov_length(iov, nr_segs);
 820        for (seg = 0; seg < nr_segs; seg++) {
 821                const struct iovec *vec = &iov[seg];
 822                result = nfs_direct_write_schedule_segment(&desc, vec, pos, uio);
 823                if (result < 0)
 824                        break;
 825                requested_bytes += result;
 826                if ((size_t)result < vec->iov_len)
 827                        break;
 828                pos += vec->iov_len;
 829        }
 830        nfs_pageio_complete(&desc);
 831
 832        /*
 833         * If no bytes were started, return the error, and let the
 834         * generic layer handle the completion.
 835         */
 836        if (requested_bytes == 0) {
 837                inode_dio_done(inode);
 838                nfs_direct_req_release(dreq);
 839                return result < 0 ? result : -EIO;
 840        }
 841
 842        if (put_dreq(dreq))
 843                nfs_direct_write_complete(dreq, dreq->inode);
 844        return 0;
 845}
 846
 847static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 848                                unsigned long nr_segs, loff_t pos,
 849                                size_t count, bool uio)
 850{
 851        ssize_t result = -ENOMEM;
 852        struct inode *inode = iocb->ki_filp->f_mapping->host;
 853        struct nfs_direct_req *dreq;
 854        struct nfs_lock_context *l_ctx;
 855
 856        dreq = nfs_direct_req_alloc();
 857        if (!dreq)
 858                goto out;
 859
 860        dreq->inode = inode;
 861        dreq->bytes_left = count;
 862        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 863        l_ctx = nfs_get_lock_context(dreq->ctx);
 864        if (IS_ERR(l_ctx)) {
 865                result = PTR_ERR(l_ctx);
 866                goto out_release;
 867        }
 868        dreq->l_ctx = l_ctx;
 869        if (!is_sync_kiocb(iocb))
 870                dreq->iocb = iocb;
 871
 872        result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, uio);
 873        if (!result)
 874                result = nfs_direct_wait(dreq);
 875out_release:
 876        nfs_direct_req_release(dreq);
 877out:
 878        return result;
 879}
 880
 881/**
 882 * nfs_file_direct_read - file direct read operation for NFS files
 883 * @iocb: target I/O control block
 884 * @iov: vector of user buffers into which to read data
 885 * @nr_segs: size of iov vector
 886 * @pos: byte offset in file where reading starts
 887 *
 888 * We use this function for direct reads instead of calling
  889 * generic_file_aio_read() in order to avoid its check to see if
 890 * the request starts before the end of the file.  For that check
 891 * to work, we must generate a GETATTR before each direct read, and
 892 * even then there is a window between the GETATTR and the subsequent
 893 * READ where the file size could change.  Our preference is simply
 894 * to do all reads the application wants, and the server will take
 895 * care of managing the end of file boundary.
 896 *
  897 * This function also avoids unnecessarily updating the file's
 898 * atime locally, as the NFS server sets the file's atime, and this
 899 * client must read the updated atime from the server back into its
 900 * cache.
 901 */
 902ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
 903                                unsigned long nr_segs, loff_t pos, bool uio)
 904{
 905        ssize_t retval = -EINVAL;
 906        struct file *file = iocb->ki_filp;
 907        struct address_space *mapping = file->f_mapping;
 908        size_t count;
 909
 910        count = iov_length(iov, nr_segs);
 911        nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
 912
 913        dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
 914                file->f_path.dentry->d_parent->d_name.name,
 915                file->f_path.dentry->d_name.name,
 916                count, (long long) pos);
 917
 918        retval = 0;
 919        if (!count)
 920                goto out;
 921
 922        retval = nfs_sync_mapping(mapping);
 923        if (retval)
 924                goto out;
 925
 926        task_io_account_read(count);
 927
 928        retval = nfs_direct_read(iocb, iov, nr_segs, pos, uio);
 929        if (retval > 0)
 930                iocb->ki_pos = pos + retval;
 931
 932out:
 933        return retval;
 934}
 935
 936/**
 937 * nfs_file_direct_write - file direct write operation for NFS files
 938 * @iocb: target I/O control block
 939 * @iov: vector of user buffers from which to write data
 940 * @nr_segs: size of iov vector
 941 * @pos: byte offset in file where writing starts
 942 *
 943 * We use this function for direct writes instead of calling
 944 * generic_file_aio_write() in order to avoid taking the inode
 945 * semaphore and updating the i_size.  The NFS server will set
 946 * the new i_size and this client must read the updated size
 947 * back into its cache.  We let the server do generic write
 948 * parameter checking and report problems.
 949 *
 950 * We eliminate local atime updates, see direct read above.
 951 *
 952 * We avoid unnecessary page cache invalidations for normal cached
 953 * readers of this file.
 954 *
 955 * Note that O_APPEND is not supported for NFS direct writes, as there
 956 * is no atomic O_APPEND write facility in the NFS protocol.
 957 */
 958ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 959                                unsigned long nr_segs, loff_t pos, bool uio)
 960{
 961        ssize_t retval = -EINVAL;
 962        struct file *file = iocb->ki_filp;
 963        struct address_space *mapping = file->f_mapping;
 964        size_t count;
 965
 966        count = iov_length(iov, nr_segs);
 967        nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
 968
 969        dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
 970                file->f_path.dentry->d_parent->d_name.name,
 971                file->f_path.dentry->d_name.name,
 972                count, (long long) pos);
 973
 974        retval = generic_write_checks(file, &pos, &count, 0);
 975        if (retval)
 976                goto out;
 977
 978        retval = -EINVAL;
 979        if ((ssize_t) count < 0)
 980                goto out;
 981        retval = 0;
 982        if (!count)
 983                goto out;
 984
 985        retval = nfs_sync_mapping(mapping);
 986        if (retval)
 987                goto out;
 988
 989        task_io_account_write(count);
 990
 991        retval = nfs_direct_write(iocb, iov, nr_segs, pos, count, uio);
 992        if (retval > 0) {
 993                struct inode *inode = mapping->host;
 994
 995                iocb->ki_pos = pos + retval;
 996                spin_lock(&inode->i_lock);
 997                if (i_size_read(inode) < iocb->ki_pos)
 998                        i_size_write(inode, iocb->ki_pos);
 999                spin_unlock(&inode->i_lock);
1000        }
1001out:
1002        return retval;
1003}
1004
1005/**
1006 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
1007 *
1008 */
1009int __init nfs_init_directcache(void)
1010{
1011        nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1012                                                sizeof(struct nfs_direct_req),
1013                                                0, (SLAB_RECLAIM_ACCOUNT|
1014                                                        SLAB_MEM_SPREAD),
1015                                                NULL);
1016        if (nfs_direct_cachep == NULL)
1017                return -ENOMEM;
1018
1019        return 0;
1020}
1021
1022/**
1023 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1024 *
1025 */
1026void nfs_destroy_directcache(void)
1027{
1028        kmem_cache_destroy(nfs_direct_cachep);
1029}
1030