linux/fs/nfs/direct.c
   1/*
   2 * linux/fs/nfs/direct.c
   3 *
   4 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
   5 *
   6 * High-performance uncached I/O for the Linux NFS client
   7 *
   8 * There are important applications whose performance or correctness
   9 * depends on uncached access to file data.  Database clusters
  10 * (multiple copies of the same instance running on separate hosts)
  11 * implement their own cache coherency protocol that subsumes file
  12 * system cache protocols.  Applications that process datasets
  13 * considerably larger than the client's memory do not always benefit
  14 * from a local cache.  A streaming video server, for instance, has no
  15 * need to cache the contents of a file.
  16 *
  17 * When an application requests uncached I/O, all read and write requests
  18 * are made directly to the server; data stored or fetched via these
  19 * requests is not cached in the Linux page cache.  The client does not
  20 * correct unaligned requests from applications.  All requested bytes are
  21 * held on permanent storage before a direct write system call returns to
  22 * an application.
  23 *
  24 * Solaris implements an uncached I/O facility called directio() that
  25 * is used for backups and sequential I/O to very large files.  Solaris
  26 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
  27 * an undocumented mount option.
  28 *
  29 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
  30 * help from Andrew Morton.
  31 *
  32 * 18 Dec 2001  Initial implementation for 2.4  --cel
  33 * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
  34 * 08 Jun 2003  Port to 2.5 APIs  --cel
  35 * 31 Mar 2004  Handle direct I/O without VFS support  --cel
  36 * 15 Sep 2004  Parallel async reads  --cel
  37 * 04 May 2005  support O_DIRECT with aio  --cel
  38 *
  39 */
  40
  41#include <linux/errno.h>
  42#include <linux/sched.h>
  43#include <linux/kernel.h>
  44#include <linux/file.h>
  45#include <linux/pagemap.h>
  46#include <linux/kref.h>
  47#include <linux/slab.h>
  48#include <linux/task_io_accounting_ops.h>
  49#include <linux/module.h>
  50
  51#include <linux/nfs_fs.h>
  52#include <linux/nfs_page.h>
  53#include <linux/sunrpc/clnt.h>
  54
  55#include <asm/uaccess.h>
  56#include <linux/atomic.h>
  57
  58#include "internal.h"
  59#include "iostat.h"
  60#include "pnfs.h"
  61
  62#define NFSDBG_FACILITY         NFSDBG_VFS
  63
  64static struct kmem_cache *nfs_direct_cachep;
  65
  66/*
  67 * This represents a set of asynchronous requests that we're waiting on
  68 */
  69struct nfs_direct_req {
  70        struct kref             kref;           /* release manager */
  71
  72        /* I/O parameters */
  73        struct nfs_open_context *ctx;           /* file open context info */
  74        struct nfs_lock_context *l_ctx;         /* Lock context info */
  75        struct kiocb *          iocb;           /* controlling i/o request */
  76        struct inode *          inode;          /* target file of i/o */
  77
  78        /* completion state */
  79        atomic_t                io_count;       /* i/os we're waiting for */
  80        spinlock_t              lock;           /* protect completion state */
  81        ssize_t                 count,          /* bytes actually processed */
  82                                bytes_left,     /* bytes left to be sent */
  83                                error;          /* any reported error */
  84        struct completion       completion;     /* wait for i/o completion */
  85
  86        /* commit state */
  87        struct nfs_mds_commit_info mds_cinfo;   /* Storage for cinfo */
  88        struct pnfs_ds_commit_info ds_cinfo;    /* Storage for cinfo */
  89        struct work_struct      work;
  90        int                     flags;
  91#define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
  92#define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
  93        struct nfs_writeverf    verf;           /* unstable write verifier */
  94};
  95
  96static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
  97static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
  98static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
  99static void nfs_direct_write_schedule_work(struct work_struct *work);
 100
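/*
 * Each outstanding I/O takes a reference on the dreq via get_dreq();
 * put_dreq() drops one and returns true once the last outstanding I/O
 * has finished, at which point completion processing may run.
 */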
 101static inline void get_dreq(struct nfs_direct_req *dreq)
 102{
 103        atomic_inc(&dreq->io_count);
 104}
 105
 106static inline int put_dreq(struct nfs_direct_req *dreq)
 107{
 108        return atomic_dec_and_test(&dreq->io_count);
 109}
 110
 111/*
 112 * nfs_direct_select_verf - select the right verifier
 113 * @dreq - direct request possibly spanning multiple servers
 114 * @ds_clp - nfs_client of data server or NULL if MDS / non-pnfs
 115 * @ds_idx - index of data server in data server list, only valid if ds_clp set
 116 *
 117 * returns the correct verifier to use given the role of the server
 118 */
 119static struct nfs_writeverf *
 120nfs_direct_select_verf(struct nfs_direct_req *dreq,
 121                       struct nfs_client *ds_clp,
 122                       int ds_idx)
 123{
 124        struct nfs_writeverf *verfp = &dreq->verf;
 125
 126#ifdef CONFIG_NFS_V4_1
 127        if (ds_clp) {
 128                /* pNFS is in use, use the DS verf */
 129                if (ds_idx >= 0 && ds_idx < dreq->ds_cinfo.nbuckets)
 130                        verfp = &dreq->ds_cinfo.buckets[ds_idx].direct_verf;
 131                else
 132                        WARN_ON_ONCE(1);
 133        }
 134#endif
 135        return verfp;
 136}
 137
 138
 139/*
 140 * nfs_direct_set_hdr_verf - set the write/commit verifier
 141 * @dreq - direct request possibly spanning multiple servers
 142 * @hdr - pageio header to validate against previously seen verfs
 143 *
 144 * Set the server's (MDS or DS) "seen" verifier
 145 */
 146static void nfs_direct_set_hdr_verf(struct nfs_direct_req *dreq,
 147                                    struct nfs_pgio_header *hdr)
 148{
 149        struct nfs_writeverf *verfp;
 150
 151        verfp = nfs_direct_select_verf(dreq, hdr->data->ds_clp,
 152                                      hdr->data->ds_idx);
 153        WARN_ON_ONCE(verfp->committed >= 0);
 154        memcpy(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
 155        WARN_ON_ONCE(verfp->committed < 0);
 156}
 157
 158/*
  159 * nfs_direct_set_or_cmp_hdr_verf - set or compare verifier for pgio header
  160 * @dreq - direct request possibly spanning multiple servers
  161 * @hdr - pageio header to validate against previously seen verf
  162 *
  163 * Sets the server's "seen" verf if it has not been initialized.
  164 * Returns the result of comparing @hdr->verf with the "seen" verf
  165 * of the server used by @hdr (DS or MDS).
 166 */
 167static int nfs_direct_set_or_cmp_hdr_verf(struct nfs_direct_req *dreq,
 168                                          struct nfs_pgio_header *hdr)
 169{
 170        struct nfs_writeverf *verfp;
 171
 172        verfp = nfs_direct_select_verf(dreq, hdr->data->ds_clp,
 173                                         hdr->data->ds_idx);
 174        if (verfp->committed < 0) {
 175                nfs_direct_set_hdr_verf(dreq, hdr);
 176                return 0;
 177        }
 178        return memcmp(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
 179}
 180
 181#if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
 182/*
 183 * nfs_direct_cmp_commit_data_verf - compare verifier for commit data
 184 * @dreq - direct request possibly spanning multiple servers
 185 * @data - commit data to validate against previously seen verf
 186 *
 187 * returns result of comparison between @data->verf and the verf of
 188 * the server used by @data (DS or MDS)
 189 */
 190static int nfs_direct_cmp_commit_data_verf(struct nfs_direct_req *dreq,
 191                                           struct nfs_commit_data *data)
 192{
 193        struct nfs_writeverf *verfp;
 194
 195        verfp = nfs_direct_select_verf(dreq, data->ds_clp,
 196                                         data->ds_commit_index);
 197        WARN_ON_ONCE(verfp->committed < 0);
 198        return memcmp(verfp, &data->verf, sizeof(struct nfs_writeverf));
 199}
 200#endif
 201
 202/**
 203 * nfs_direct_IO - NFS address space operation for direct I/O
 204 * @rw: direction (read or write)
 205 * @iocb: target I/O control block
  206 * @iter: iov_iter describing the I/O buffer
  207 * @pos: offset in file to begin the operation
 209 *
 210 * The presence of this routine in the address space ops vector means
 211 * the NFS client supports direct I/O. However, for most direct IO, we
 212 * shunt off direct read and write requests before the VFS gets them,
 213 * so this method is only ever called for swap.
 214 */
 215ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, struct iov_iter *iter, loff_t pos)
 216{
 217#ifndef CONFIG_NFS_SWAP
 218        dprintk("NFS: nfs_direct_IO (%pD) off/no(%Ld/%lu) EINVAL\n",
 219                        iocb->ki_filp, (long long) pos, iter->nr_segs);
 220
 221        return -EINVAL;
 222#else
 223        VM_BUG_ON(iocb->ki_nbytes != PAGE_SIZE);
 224
 225        if (rw == READ || rw == KERNEL_READ)
 226                return nfs_file_direct_read(iocb, iter, pos,
 227                                rw == READ ? true : false);
 228        return nfs_file_direct_write(iocb, iter, pos,
 229                                rw == WRITE ? true : false);
 230#endif /* CONFIG_NFS_SWAP */
 231}
 232
 233static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
 234{
 235        unsigned int i;
 236        for (i = 0; i < npages; i++)
 237                page_cache_release(pages[i]);
 238}
 239
 240void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
 241                              struct nfs_direct_req *dreq)
 242{
 243        cinfo->lock = &dreq->lock;
 244        cinfo->mds = &dreq->mds_cinfo;
 245        cinfo->ds = &dreq->ds_cinfo;
 246        cinfo->dreq = dreq;
 247        cinfo->completion_ops = &nfs_direct_commit_completion_ops;
 248}
 249
 250static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
 251{
 252        struct nfs_direct_req *dreq;
 253
 254        dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
 255        if (!dreq)
 256                return NULL;
 257
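        /* Two references: one for the I/O scheduling path, dropped in
         * nfs_direct_complete(), and one for the caller, dropped via
         * nfs_direct_req_release() when the request is finished with. */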
 258        kref_init(&dreq->kref);
 259        kref_get(&dreq->kref);
 260        init_completion(&dreq->completion);
 261        INIT_LIST_HEAD(&dreq->mds_cinfo.list);
 262        dreq->verf.committed = NFS_INVALID_STABLE_HOW;  /* not set yet */
 263        INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
 264        spin_lock_init(&dreq->lock);
 265
 266        return dreq;
 267}
 268
 269static void nfs_direct_req_free(struct kref *kref)
 270{
 271        struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
 272
 273        if (dreq->l_ctx != NULL)
 274                nfs_put_lock_context(dreq->l_ctx);
 275        if (dreq->ctx != NULL)
 276                put_nfs_open_context(dreq->ctx);
 277        kmem_cache_free(nfs_direct_cachep, dreq);
 278}
 279
 280static void nfs_direct_req_release(struct nfs_direct_req *dreq)
 281{
 282        kref_put(&dreq->kref, nfs_direct_req_free);
 283}
 284
 285ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
 286{
 287        return dreq->bytes_left;
 288}
 289EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
 290
 291/*
 292 * Collects and returns the final error value/byte-count.
 293 */
 294static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
 295{
 296        ssize_t result = -EIOCBQUEUED;
 297
 298        /* Async requests don't wait here */
 299        if (dreq->iocb)
 300                goto out;
 301
 302        result = wait_for_completion_killable(&dreq->completion);
 303
 304        if (!result)
 305                result = dreq->error;
 306        if (!result)
 307                result = dreq->count;
 308
 309out:
 310        return (ssize_t) result;
 311}
 312
 313/*
 314 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 315 * the iocb is still valid here if this is a synchronous request.
 316 */
 317static void nfs_direct_complete(struct nfs_direct_req *dreq, bool write)
 318{
 319        struct inode *inode = dreq->inode;
 320
 321        if (dreq->iocb && write) {
 322                loff_t pos = dreq->iocb->ki_pos + dreq->count;
 323
 324                spin_lock(&inode->i_lock);
 325                if (i_size_read(inode) < pos)
 326                        i_size_write(inode, pos);
 327                spin_unlock(&inode->i_lock);
 328        }
 329
 330        if (write)
 331                nfs_zap_mapping(inode, inode->i_mapping);
 332
 333        inode_dio_done(inode);
 334
 335        if (dreq->iocb) {
 336                long res = (long) dreq->error;
 337                if (!res)
 338                        res = (long) dreq->count;
 339                aio_complete(dreq->iocb, res, 0);
 340        }
 341
 342        complete_all(&dreq->completion);
 343
 344        nfs_direct_req_release(dreq);
 345}
 346
 347static void nfs_direct_readpage_release(struct nfs_page *req)
 348{
 349        dprintk("NFS: direct read done (%s/%llu %d@%lld)\n",
 350                req->wb_context->dentry->d_inode->i_sb->s_id,
 351                (unsigned long long)NFS_FILEID(req->wb_context->dentry->d_inode),
 352                req->wb_bytes,
 353                (long long)req_offset(req));
 354        nfs_release_request(req);
 355}
 356
 357static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
 358{
 359        unsigned long bytes = 0;
 360        struct nfs_direct_req *dreq = hdr->dreq;
 361
 362        if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
 363                goto out_put;
 364
 365        spin_lock(&dreq->lock);
 366        if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
 367                dreq->error = hdr->error;
 368        else
 369                dreq->count += hdr->good_bytes;
 370        spin_unlock(&dreq->lock);
 371
 372        while (!list_empty(&hdr->pages)) {
 373                struct nfs_page *req = nfs_list_entry(hdr->pages.next);
 374                struct page *page = req->wb_page;
 375
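                /* Only dirty pages that actually received data, so the
                 * VM knows the user's buffer was modified by this read. */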
 376                if (!PageCompound(page) && bytes < hdr->good_bytes)
 377                        set_page_dirty(page);
 378                bytes += req->wb_bytes;
 379                nfs_list_remove_request(req);
 380                nfs_direct_readpage_release(req);
 381        }
 382out_put:
 383        if (put_dreq(dreq))
 384                nfs_direct_complete(dreq, false);
 385        hdr->release(hdr);
 386}
 387
 388static void nfs_read_sync_pgio_error(struct list_head *head)
 389{
 390        struct nfs_page *req;
 391
 392        while (!list_empty(head)) {
 393                req = nfs_list_entry(head->next);
 394                nfs_list_remove_request(req);
 395                nfs_release_request(req);
 396        }
 397}
 398
 399static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
 400{
 401        get_dreq(hdr->dreq);
 402}
 403
 404static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
 405        .error_cleanup = nfs_read_sync_pgio_error,
 406        .init_hdr = nfs_direct_pgio_init,
 407        .completion = nfs_direct_read_completion,
 408};
 409
 410/*
 411 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
  412 * operation.  If iov_iter_get_pages_alloc() or nfs_create_request()
  413 * fails, bail and stop sending more reads.  Read length accounting is
  414 * handled automatically by nfs_direct_read_completion().  Otherwise, if
 415 * no requests have been sent, just return an error.
 416 */
 417
 418static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
 419                                              struct iov_iter *iter,
 420                                              loff_t pos)
 421{
 422        struct nfs_pageio_descriptor desc;
 423        struct inode *inode = dreq->inode;
 424        ssize_t result = -EINVAL;
 425        size_t requested_bytes = 0;
 426        size_t rsize = max_t(size_t, NFS_SERVER(inode)->rsize, PAGE_SIZE);
 427
 428        nfs_pageio_init_read(&desc, dreq->inode, false,
 429                             &nfs_direct_read_completion_ops);
 430        get_dreq(dreq);
 431        desc.pg_dreq = dreq;
 432        atomic_inc(&inode->i_dio_count);
 433
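        /* Pin each chunk of the user buffer, wrap the pages in nfs_page
         * requests, and let the pageio descriptor coalesce them into
         * rsize-bounded READ RPCs. */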
 434        while (iov_iter_count(iter)) {
 435                struct page **pagevec;
 436                size_t bytes;
 437                size_t pgbase;
 438                unsigned npages, i;
 439
 440                result = iov_iter_get_pages_alloc(iter, &pagevec, 
 441                                                  rsize, &pgbase);
 442                if (result < 0)
 443                        break;
 444        
 445                bytes = result;
 446                iov_iter_advance(iter, bytes);
 447                npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
 448                for (i = 0; i < npages; i++) {
 449                        struct nfs_page *req;
 450                        unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 451                        /* XXX do we need to do the eof zeroing found in async_filler? */
 452                        req = nfs_create_request(dreq->ctx, pagevec[i], NULL,
 453                                                 pgbase, req_len);
 454                        if (IS_ERR(req)) {
 455                                result = PTR_ERR(req);
 456                                break;
 457                        }
 458                        req->wb_index = pos >> PAGE_SHIFT;
 459                        req->wb_offset = pos & ~PAGE_MASK;
 460                        if (!nfs_pageio_add_request(&desc, req)) {
 461                                result = desc.pg_error;
 462                                nfs_release_request(req);
 463                                break;
 464                        }
 465                        pgbase = 0;
 466                        bytes -= req_len;
 467                        requested_bytes += req_len;
 468                        pos += req_len;
 469                        dreq->bytes_left -= req_len;
 470                }
 471                nfs_direct_release_pages(pagevec, npages);
 472                kvfree(pagevec);
 473                if (result < 0)
 474                        break;
 475        }
 476
 477        nfs_pageio_complete(&desc);
 478
 479        /*
 480         * If no bytes were started, return the error, and let the
 481         * generic layer handle the completion.
 482         */
 483        if (requested_bytes == 0) {
 484                inode_dio_done(inode);
 485                nfs_direct_req_release(dreq);
 486                return result < 0 ? result : -EIO;
 487        }
 488
 489        if (put_dreq(dreq))
 490                nfs_direct_complete(dreq, false);
 491        return 0;
 492}
 493
 494/**
 495 * nfs_file_direct_read - file direct read operation for NFS files
 496 * @iocb: target I/O control block
 497 * @iter: vector of user buffers into which to read data
 498 * @pos: byte offset in file where reading starts
 499 *
 500 * We use this function for direct reads instead of calling
 501 * generic_file_aio_read() in order to avoid gfar's check to see if
 502 * the request starts before the end of the file.  For that check
 503 * to work, we must generate a GETATTR before each direct read, and
 504 * even then there is a window between the GETATTR and the subsequent
 505 * READ where the file size could change.  Our preference is simply
 506 * to do all reads the application wants, and the server will take
 507 * care of managing the end of file boundary.
 508 *
 509 * This function also eliminates unnecessarily updating the file's
 510 * atime locally, as the NFS server sets the file's atime, and this
 511 * client must read the updated atime from the server back into its
 512 * cache.
 513 */
 514ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter,
 515                                loff_t pos, bool uio)
 516{
 517        struct file *file = iocb->ki_filp;
 518        struct address_space *mapping = file->f_mapping;
 519        struct inode *inode = mapping->host;
 520        struct nfs_direct_req *dreq;
 521        struct nfs_lock_context *l_ctx;
 522        ssize_t result = -EINVAL;
 523        size_t count = iov_iter_count(iter);
 524        nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
 525
 526        dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
 527                file, count, (long long) pos);
 528
 529        result = 0;
 530        if (!count)
 531                goto out;
 532
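        /* i_mutex keeps buffered writers out while cached data is flushed
         * and the direct reads are scheduled. */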
 533        mutex_lock(&inode->i_mutex);
 534        result = nfs_sync_mapping(mapping);
 535        if (result)
 536                goto out_unlock;
 537
 538        task_io_account_read(count);
 539
 540        result = -ENOMEM;
 541        dreq = nfs_direct_req_alloc();
 542        if (dreq == NULL)
 543                goto out_unlock;
 544
 545        dreq->inode = inode;
 546        dreq->bytes_left = count;
 547        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 548        l_ctx = nfs_get_lock_context(dreq->ctx);
 549        if (IS_ERR(l_ctx)) {
 550                result = PTR_ERR(l_ctx);
 551                goto out_release;
 552        }
 553        dreq->l_ctx = l_ctx;
 554        if (!is_sync_kiocb(iocb))
 555                dreq->iocb = iocb;
 556
 557        NFS_I(inode)->read_io += count;
 558        result = nfs_direct_read_schedule_iovec(dreq, iter, pos);
 559
 560        mutex_unlock(&inode->i_mutex);
 561
 562        if (!result) {
 563                result = nfs_direct_wait(dreq);
 564                if (result > 0)
 565                        iocb->ki_pos = pos + result;
 566        }
 567
 568        nfs_direct_req_release(dreq);
 569        return result;
 570
 571out_release:
 572        nfs_direct_req_release(dreq);
 573out_unlock:
 574        mutex_unlock(&inode->i_mutex);
 575out:
 576        return result;
 577}
 578
 579#if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
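/*
 * The server's write verifier changed or a COMMIT failed, so previously
 * written data may not be stable on disk: pull the requests back off the
 * commit lists and resend them as stable writes.
 */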
 580static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
 581{
 582        struct nfs_pageio_descriptor desc;
 583        struct nfs_page *req, *tmp;
 584        LIST_HEAD(reqs);
 585        struct nfs_commit_info cinfo;
 586        LIST_HEAD(failed);
 587
 588        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 589        pnfs_recover_commit_reqs(dreq->inode, &reqs, &cinfo);
 590        spin_lock(cinfo.lock);
 591        nfs_scan_commit_list(&cinfo.mds->list, &reqs, &cinfo, 0);
 592        spin_unlock(cinfo.lock);
 593
 594        dreq->count = 0;
 595        get_dreq(dreq);
 596
 597        nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
 598                              &nfs_direct_write_completion_ops);
 599        desc.pg_dreq = dreq;
 600
 601        list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
 602                if (!nfs_pageio_add_request(&desc, req)) {
 603                        nfs_list_remove_request(req);
 604                        nfs_list_add_request(req, &failed);
 605                        spin_lock(cinfo.lock);
 606                        dreq->flags = 0;
 607                        dreq->error = -EIO;
 608                        spin_unlock(cinfo.lock);
 609                }
 610                nfs_release_request(req);
 611        }
 612        nfs_pageio_complete(&desc);
 613
 614        while (!list_empty(&failed)) {
 615                req = nfs_list_entry(failed.next);
 616                nfs_list_remove_request(req);
 617                nfs_unlock_and_release_request(req);
 618        }
 619
 620        if (put_dreq(dreq))
 621                nfs_direct_write_complete(dreq, dreq->inode);
 622}
 623
 624static void nfs_direct_commit_complete(struct nfs_commit_data *data)
 625{
 626        struct nfs_direct_req *dreq = data->dreq;
 627        struct nfs_commit_info cinfo;
 628        struct nfs_page *req;
 629        int status = data->task.tk_status;
 630
 631        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 632        if (status < 0) {
 633                dprintk("NFS: %5u commit failed with error %d.\n",
 634                        data->task.tk_pid, status);
 635                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 636        } else if (nfs_direct_cmp_commit_data_verf(dreq, data)) {
 637                dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
 638                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 639        }
 640
 641        dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
 642        while (!list_empty(&data->pages)) {
 643                req = nfs_list_entry(data->pages.next);
 644                nfs_list_remove_request(req);
 645                if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
 646                        /* Note the rewrite will go through mds */
 647                        nfs_mark_request_commit(req, NULL, &cinfo);
 648                } else
 649                        nfs_release_request(req);
 650                nfs_unlock_and_release_request(req);
 651        }
 652
 653        if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
 654                nfs_direct_write_complete(dreq, data->inode);
 655}
 656
 657static void nfs_direct_error_cleanup(struct nfs_inode *nfsi)
 658{
 659        /* There is no lock to clear */
 660}
 661
 662static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
 663        .completion = nfs_direct_commit_complete,
 664        .error_cleanup = nfs_direct_error_cleanup,
 665};
 666
 667static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
 668{
 669        int res;
 670        struct nfs_commit_info cinfo;
 671        LIST_HEAD(mds_list);
 672
 673        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 674        nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
 675        res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
 676        if (res < 0) /* res == -ENOMEM */
 677                nfs_direct_write_reschedule(dreq);
 678}
 679
 680static void nfs_direct_write_schedule_work(struct work_struct *work)
 681{
 682        struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
 683        int flags = dreq->flags;
 684
 685        dreq->flags = 0;
 686        switch (flags) {
 687                case NFS_ODIRECT_DO_COMMIT:
 688                        nfs_direct_commit_schedule(dreq);
 689                        break;
 690                case NFS_ODIRECT_RESCHED_WRITES:
 691                        nfs_direct_write_reschedule(dreq);
 692                        break;
 693                default:
 694                        nfs_direct_complete(dreq, true);
 695        }
 696}
 697
 698static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
 699{
 700        schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
 701}
 702
 703#else
 704static void nfs_direct_write_schedule_work(struct work_struct *work)
 705{
 706}
 707
 708static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
 709{
 710        nfs_direct_complete(dreq, true);
 711}
 712#endif
 713
 714static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
 715{
 716        struct nfs_direct_req *dreq = hdr->dreq;
 717        struct nfs_commit_info cinfo;
 718        int bit = -1;
 719        struct nfs_page *req = nfs_list_entry(hdr->pages.next);
 720
 721        if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
 722                goto out_put;
 723
 724        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 725
 726        spin_lock(&dreq->lock);
 727
 728        if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
 729                dreq->flags = 0;
 730                dreq->error = hdr->error;
 731        }
 732        if (dreq->error != 0)
 733                bit = NFS_IOHDR_ERROR;
 734        else {
 735                dreq->count += hdr->good_bytes;
 736                if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags)) {
 737                        dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 738                        bit = NFS_IOHDR_NEED_RESCHED;
 739                } else if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
 740                        if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
 741                                bit = NFS_IOHDR_NEED_RESCHED;
 742                        else if (dreq->flags == 0) {
 743                                nfs_direct_set_hdr_verf(dreq, hdr);
 744                                bit = NFS_IOHDR_NEED_COMMIT;
 745                                dreq->flags = NFS_ODIRECT_DO_COMMIT;
 746                        } else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
 747                                if (nfs_direct_set_or_cmp_hdr_verf(dreq, hdr)) {
 748                                        dreq->flags =
 749                                                NFS_ODIRECT_RESCHED_WRITES;
 750                                        bit = NFS_IOHDR_NEED_RESCHED;
 751                                } else
 752                                        bit = NFS_IOHDR_NEED_COMMIT;
 753                        }
 754                }
 755        }
 756        spin_unlock(&dreq->lock);
 757
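        /* Requests that need a COMMIT or a resend are put back on the
         * commit list (with an extra reference) before being unlocked
         * and released. */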
 758        while (!list_empty(&hdr->pages)) {
 759
 760                req = nfs_list_entry(hdr->pages.next);
 761                nfs_list_remove_request(req);
 762                switch (bit) {
 763                case NFS_IOHDR_NEED_RESCHED:
 764                case NFS_IOHDR_NEED_COMMIT:
 765                        kref_get(&req->wb_kref);
 766                        nfs_mark_request_commit(req, hdr->lseg, &cinfo);
 767                }
 768                nfs_unlock_and_release_request(req);
 769        }
 770
 771out_put:
 772        if (put_dreq(dreq))
 773                nfs_direct_write_complete(dreq, hdr->inode);
 774        hdr->release(hdr);
 775}
 776
 777static void nfs_write_sync_pgio_error(struct list_head *head)
 778{
 779        struct nfs_page *req;
 780
 781        while (!list_empty(head)) {
 782                req = nfs_list_entry(head->next);
 783                nfs_list_remove_request(req);
 784                nfs_unlock_and_release_request(req);
 785        }
 786}
 787
 788static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
 789        .error_cleanup = nfs_write_sync_pgio_error,
 790        .init_hdr = nfs_direct_pgio_init,
 791        .completion = nfs_direct_write_completion,
 792};
 793
 794
  795/*
  796 * NB: Return the value of the first error return code.  Subsequent
  797 *     errors after the first one are ignored.
  798 *
  799 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
  800 * operation.  If iov_iter_get_pages_alloc() or nfs_create_request()
  801 * fails, bail and stop sending more writes.  Write length accounting is
  802 * handled automatically by nfs_direct_write_completion().  Otherwise, if
  803 * no requests have been sent, just return an error.
  804 */
 806static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
 807                                               struct iov_iter *iter,
 808                                               loff_t pos)
 809{
 810        struct nfs_pageio_descriptor desc;
 811        struct inode *inode = dreq->inode;
 812        ssize_t result = 0;
 813        size_t requested_bytes = 0;
 814        size_t wsize = max_t(size_t, NFS_SERVER(inode)->wsize, PAGE_SIZE);
 815
 816        nfs_pageio_init_write(&desc, inode, FLUSH_COND_STABLE, false,
 817                              &nfs_direct_write_completion_ops);
 818        desc.pg_dreq = dreq;
 819        get_dreq(dreq);
 820        atomic_inc(&inode->i_dio_count);
 821
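        /* As on the read side: pin the user pages for each chunk, wrap
         * them in locked nfs_page requests, and let the descriptor
         * coalesce them into wsize-bounded WRITE RPCs. */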
 822        NFS_I(inode)->write_io += iov_iter_count(iter);
 823        while (iov_iter_count(iter)) {
 824                struct page **pagevec;
 825                size_t bytes;
 826                size_t pgbase;
 827                unsigned npages, i;
 828
 829                result = iov_iter_get_pages_alloc(iter, &pagevec, 
 830                                                  wsize, &pgbase);
 831                if (result < 0)
 832                        break;
 833
 834                bytes = result;
 835                iov_iter_advance(iter, bytes);
 836                npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
 837                for (i = 0; i < npages; i++) {
 838                        struct nfs_page *req;
 839                        unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 840
 841                        req = nfs_create_request(dreq->ctx, pagevec[i], NULL,
 842                                                 pgbase, req_len);
 843                        if (IS_ERR(req)) {
 844                                result = PTR_ERR(req);
 845                                break;
 846                        }
 847                        nfs_lock_request(req);
 848                        req->wb_index = pos >> PAGE_SHIFT;
 849                        req->wb_offset = pos & ~PAGE_MASK;
 850                        if (!nfs_pageio_add_request(&desc, req)) {
 851                                result = desc.pg_error;
 852                                nfs_unlock_and_release_request(req);
 853                                break;
 854                        }
 855                        pgbase = 0;
 856                        bytes -= req_len;
 857                        requested_bytes += req_len;
 858                        pos += req_len;
 859                        dreq->bytes_left -= req_len;
 860                }
 861                nfs_direct_release_pages(pagevec, npages);
 862                kvfree(pagevec);
 863                if (result < 0)
 864                        break;
 865        }
 866        nfs_pageio_complete(&desc);
 867
 868        /*
 869         * If no bytes were started, return the error, and let the
 870         * generic layer handle the completion.
 871         */
 872        if (requested_bytes == 0) {
 873                inode_dio_done(inode);
 874                nfs_direct_req_release(dreq);
 875                return result < 0 ? result : -EIO;
 876        }
 877
 878        if (put_dreq(dreq))
 879                nfs_direct_write_complete(dreq, dreq->inode);
 880        return 0;
 881}
 882
 883/**
 884 * nfs_file_direct_write - file direct write operation for NFS files
 885 * @iocb: target I/O control block
 886 * @iter: vector of user buffers from which to write data
 887 * @pos: byte offset in file where writing starts
 888 *
 889 * We use this function for direct writes instead of calling
 890 * generic_file_aio_write() in order to avoid taking the inode
 891 * semaphore and updating the i_size.  The NFS server will set
 892 * the new i_size and this client must read the updated size
 893 * back into its cache.  We let the server do generic write
 894 * parameter checking and report problems.
 895 *
 896 * We eliminate local atime updates, see direct read above.
 897 *
 898 * We avoid unnecessary page cache invalidations for normal cached
 899 * readers of this file.
 900 *
 901 * Note that O_APPEND is not supported for NFS direct writes, as there
 902 * is no atomic O_APPEND write facility in the NFS protocol.
 903 */
 904ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter,
 905                                loff_t pos, bool uio)
 906{
 907        ssize_t result = -EINVAL;
 908        struct file *file = iocb->ki_filp;
 909        struct address_space *mapping = file->f_mapping;
 910        struct inode *inode = mapping->host;
 911        struct nfs_direct_req *dreq;
 912        struct nfs_lock_context *l_ctx;
 913        loff_t end;
 914        size_t count = iov_iter_count(iter);
 915        end = (pos + count - 1) >> PAGE_CACHE_SHIFT;
 916
 917        nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
 918
 919        dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
 920                file, count, (long long) pos);
 921
 922        result = generic_write_checks(file, &pos, &count, 0);
 923        if (result)
 924                goto out;
 925
 926        result = -EINVAL;
 927        if ((ssize_t) count < 0)
 928                goto out;
 929        result = 0;
 930        if (!count)
 931                goto out;
 932
 933        mutex_lock(&inode->i_mutex);
 934
 935        result = nfs_sync_mapping(mapping);
 936        if (result)
 937                goto out_unlock;
 938
 939        if (mapping->nrpages) {
 940                result = invalidate_inode_pages2_range(mapping,
 941                                        pos >> PAGE_CACHE_SHIFT, end);
 942                if (result)
 943                        goto out_unlock;
 944        }
 945
 946        task_io_account_write(count);
 947
 948        result = -ENOMEM;
 949        dreq = nfs_direct_req_alloc();
 950        if (!dreq)
 951                goto out_unlock;
 952
 953        dreq->inode = inode;
 954        dreq->bytes_left = count;
 955        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 956        l_ctx = nfs_get_lock_context(dreq->ctx);
 957        if (IS_ERR(l_ctx)) {
 958                result = PTR_ERR(l_ctx);
 959                goto out_release;
 960        }
 961        dreq->l_ctx = l_ctx;
 962        if (!is_sync_kiocb(iocb))
 963                dreq->iocb = iocb;
 964
 965        result = nfs_direct_write_schedule_iovec(dreq, iter, pos);
 966
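        /*
         * Invalidate again now that the writes are on the wire, in case
         * concurrent readers repopulated the page cache with data this
         * direct write has made stale.
         */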
 967        if (mapping->nrpages) {
 968                invalidate_inode_pages2_range(mapping,
 969                                              pos >> PAGE_CACHE_SHIFT, end);
 970        }
 971
 972        mutex_unlock(&inode->i_mutex);
 973
 974        if (!result) {
 975                result = nfs_direct_wait(dreq);
 976                if (result > 0) {
 977                        struct inode *inode = mapping->host;
 978
 979                        iocb->ki_pos = pos + result;
 980                        spin_lock(&inode->i_lock);
 981                        if (i_size_read(inode) < iocb->ki_pos)
 982                                i_size_write(inode, iocb->ki_pos);
 983                        spin_unlock(&inode->i_lock);
 984                }
 985        }
 986        nfs_direct_req_release(dreq);
 987        return result;
 988
 989out_release:
 990        nfs_direct_req_release(dreq);
 991out_unlock:
 992        mutex_unlock(&inode->i_mutex);
 993out:
 994        return result;
 995}
 996
 997/**
 998 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 999 *
1000 */
1001int __init nfs_init_directcache(void)
1002{
1003        nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1004                                                sizeof(struct nfs_direct_req),
1005                                                0, (SLAB_RECLAIM_ACCOUNT|
1006                                                        SLAB_MEM_SPREAD),
1007                                                NULL);
1008        if (nfs_direct_cachep == NULL)
1009                return -ENOMEM;
1010
1011        return 0;
1012}
1013
1014/**
1015 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1016 *
1017 */
1018void nfs_destroy_directcache(void)
1019{
1020        kmem_cache_destroy(nfs_direct_cachep);
1021}
1022