linux/fs/nfs/flexfilelayout/flexfilelayout.c
/*
 * Module for pnfs flexfile layout driver.
 *
 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
 *
 * Tao Peng <bergwolf@primarydata.com>
 */

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/module.h>

#include <linux/sunrpc/metrics.h>

#include "flexfilelayout.h"
#include "../nfs4session.h"
#include "../nfs4idmap.h"
#include "../internal.h"
#include "../delegation.h"
#include "../nfs4trace.h"
#include "../iostat.h"
#include "../nfs.h"
#include "../nfs42.h"

#define NFSDBG_FACILITY         NFSDBG_PNFS_LD

#define FF_LAYOUT_POLL_RETRY_MAX     (15*HZ)
#define FF_LAYOUTRETURN_MAXERR 20

static struct group_info        *ff_zero_group;

static void ff_layout_read_record_layoutstats_done(struct rpc_task *task,
                struct nfs_pgio_header *hdr);
static int ff_layout_mirror_prepare_stats(struct pnfs_layout_hdr *lo,
                               struct nfs42_layoutstat_devinfo *devinfo,
                               int dev_limit);
static void ff_layout_encode_ff_layoutupdate(struct xdr_stream *xdr,
                              const struct nfs42_layoutstat_devinfo *devinfo,
                              struct nfs4_ff_layout_mirror *mirror);

static struct pnfs_layout_hdr *
ff_layout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
{
        struct nfs4_flexfile_layout *ffl;

        ffl = kzalloc(sizeof(*ffl), gfp_flags);
        if (!ffl)
                return NULL;
        INIT_LIST_HEAD(&ffl->error_list);
        INIT_LIST_HEAD(&ffl->mirrors);
        ffl->last_report_time = ktime_get();
        return &ffl->generic_hdr;
}

static void
ff_layout_free_layout_hdr(struct pnfs_layout_hdr *lo)
{
        struct nfs4_ff_layout_ds_err *err, *n;

        list_for_each_entry_safe(err, n, &FF_LAYOUT_FROM_HDR(lo)->error_list,
                                 list) {
                list_del(&err->list);
                kfree(err);
        }
        kfree(FF_LAYOUT_FROM_HDR(lo));
}

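/*
 * XDR decode helpers for the flexfiles layout body. Each helper consumes
 * one field from the xdr_stream and returns -ENOBUFS if the stream runs
 * short, so that ff_layout_alloc_lseg() can unwind cleanly.
 */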
static int decode_pnfs_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
{
        __be32 *p;

        p = xdr_inline_decode(xdr, NFS4_STATEID_SIZE);
        if (unlikely(p == NULL))
                return -ENOBUFS;
        stateid->type = NFS4_PNFS_DS_STATEID_TYPE;
        memcpy(stateid->data, p, NFS4_STATEID_SIZE);
        dprintk("%s: stateid id= [%x%x%x%x]\n", __func__,
                p[0], p[1], p[2], p[3]);
        return 0;
}

static int decode_deviceid(struct xdr_stream *xdr, struct nfs4_deviceid *devid)
{
        __be32 *p;

        p = xdr_inline_decode(xdr, NFS4_DEVICEID4_SIZE);
        if (unlikely(!p))
                return -ENOBUFS;
        memcpy(devid, p, NFS4_DEVICEID4_SIZE);
        nfs4_print_deviceid(devid);
        return 0;
}

static int decode_nfs_fh(struct xdr_stream *xdr, struct nfs_fh *fh)
{
        __be32 *p;

        p = xdr_inline_decode(xdr, 4);
        if (unlikely(!p))
                return -ENOBUFS;
        fh->size = be32_to_cpup(p++);
        if (fh->size > NFS_MAXFHSIZE) {
                printk(KERN_ERR "NFS flexfiles: Too big fh received %d\n",
                       fh->size);
                return -EOVERFLOW;
        }
        /* fh.data */
        p = xdr_inline_decode(xdr, fh->size);
        if (unlikely(!p))
                return -ENOBUFS;
        memcpy(&fh->data, p, fh->size);
        dprintk("%s: fh len %d\n", __func__, fh->size);

        return 0;
}

/*
 * Currently only stringified uids and gids are accepted, i.e. Kerberos
 * is not supported to the DSes, so no principals.
 *
 * That means that one common function will suffice, but when
 * principals are added, this should be split to accommodate
 * calls to both nfs_map_name_to_uid() and nfs_map_group_to_gid().
 */
static int
decode_name(struct xdr_stream *xdr, u32 *id)
{
        __be32 *p;
        int len;

        /* opaque_length(4) */
        p = xdr_inline_decode(xdr, 4);
        if (unlikely(!p))
                return -ENOBUFS;
        len = be32_to_cpup(p++);
        if (len < 0)
                return -EINVAL;

        dprintk("%s: len %u\n", __func__, len);

        /* opaque body */
        p = xdr_inline_decode(xdr, len);
        if (unlikely(!p))
                return -ENOBUFS;

        if (!nfs_map_string_to_numeric((char *)p, len, id))
                return -EINVAL;

        return 0;
}

static bool ff_mirror_match_fh(const struct nfs4_ff_layout_mirror *m1,
                const struct nfs4_ff_layout_mirror *m2)
{
        int i, j;

        if (m1->fh_versions_cnt != m2->fh_versions_cnt)
                return false;
        for (i = 0; i < m1->fh_versions_cnt; i++) {
                bool found_fh = false;
                for (j = 0; j < m2->fh_versions_cnt; j++) {
                        if (nfs_compare_fh(&m1->fh_versions[i],
                                        &m2->fh_versions[j]) == 0) {
                                found_fh = true;
                                break;
                        }
                }
                if (!found_fh)
                        return false;
        }
        return true;
}

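/*
 * Mirrors that share a deviceid and filehandle list are shared between
 * layout segments of the same file: reuse an existing entry from the
 * per-layout mirror list if we can still take a reference on it,
 * otherwise insert the new mirror.
 */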
static struct nfs4_ff_layout_mirror *
ff_layout_add_mirror(struct pnfs_layout_hdr *lo,
                struct nfs4_ff_layout_mirror *mirror)
{
        struct nfs4_flexfile_layout *ff_layout = FF_LAYOUT_FROM_HDR(lo);
        struct nfs4_ff_layout_mirror *pos;
        struct inode *inode = lo->plh_inode;

        spin_lock(&inode->i_lock);
        list_for_each_entry(pos, &ff_layout->mirrors, mirrors) {
                if (memcmp(&mirror->devid, &pos->devid, sizeof(pos->devid)) != 0)
                        continue;
                if (!ff_mirror_match_fh(mirror, pos))
                        continue;
                if (atomic_inc_not_zero(&pos->ref)) {
                        spin_unlock(&inode->i_lock);
                        return pos;
                }
        }
        list_add(&mirror->mirrors, &ff_layout->mirrors);
        mirror->layout = lo;
        spin_unlock(&inode->i_lock);
        return mirror;
}

static void
ff_layout_remove_mirror(struct nfs4_ff_layout_mirror *mirror)
{
        struct inode *inode;

        if (mirror->layout == NULL)
                return;
        inode = mirror->layout->plh_inode;
        spin_lock(&inode->i_lock);
        list_del(&mirror->mirrors);
        spin_unlock(&inode->i_lock);
        mirror->layout = NULL;
}

static struct nfs4_ff_layout_mirror *ff_layout_alloc_mirror(gfp_t gfp_flags)
{
        struct nfs4_ff_layout_mirror *mirror;

        mirror = kzalloc(sizeof(*mirror), gfp_flags);
        if (mirror != NULL) {
                spin_lock_init(&mirror->lock);
                atomic_set(&mirror->ref, 1);
                INIT_LIST_HEAD(&mirror->mirrors);
        }
        return mirror;
}

static void ff_layout_free_mirror(struct nfs4_ff_layout_mirror *mirror)
{
        struct rpc_cred *cred;

        ff_layout_remove_mirror(mirror);
        kfree(mirror->fh_versions);
        cred = rcu_access_pointer(mirror->ro_cred);
        if (cred)
                put_rpccred(cred);
        cred = rcu_access_pointer(mirror->rw_cred);
        if (cred)
                put_rpccred(cred);
        nfs4_ff_layout_put_deviceid(mirror->mirror_ds);
        kfree(mirror);
}

static void ff_layout_put_mirror(struct nfs4_ff_layout_mirror *mirror)
{
        if (mirror != NULL && atomic_dec_and_test(&mirror->ref))
                ff_layout_free_mirror(mirror);
}

static void ff_layout_free_mirror_array(struct nfs4_ff_layout_segment *fls)
{
        int i;

        if (fls->mirror_array) {
                for (i = 0; i < fls->mirror_array_cnt; i++) {
                        /* normally mirror_ds is freed in
                         * .free_deviceid_node, but we still do it here
                         * for the .alloc_lseg error path */
                        ff_layout_put_mirror(fls->mirror_array[i]);
                }
                kfree(fls->mirror_array);
                fls->mirror_array = NULL;
        }
}

static int ff_layout_check_layout(struct nfs4_layoutget_res *lgr)
{
        int ret = 0;

        dprintk("--> %s\n", __func__);

        /* FIXME: remove this check when layout segment support is added */
        if (lgr->range.offset != 0 ||
            lgr->range.length != NFS4_MAX_UINT64) {
                dprintk("%s Only whole file layouts supported. Use MDS i/o\n",
                        __func__);
                ret = -EINVAL;
        }

        dprintk("--> %s returns %d\n", __func__, ret);
        return ret;
}

static void _ff_layout_free_lseg(struct nfs4_ff_layout_segment *fls)
{
        if (fls) {
                ff_layout_free_mirror_array(fls);
                kfree(fls);
        }
}

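/*
 * Ordering predicate for inserting layout segments: RW segments sort
 * after READ segments, and overlapping or adjacent ranges of the same
 * iomode are ordered by offset.
 */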
static bool
ff_lseg_range_is_after(const struct pnfs_layout_range *l1,
                const struct pnfs_layout_range *l2)
{
        u64 end1, end2;

        if (l1->iomode != l2->iomode)
                return l1->iomode != IOMODE_READ;
        end1 = pnfs_calc_offset_end(l1->offset, l1->length);
        end2 = pnfs_calc_offset_end(l2->offset, l2->length);
        if (end1 < l2->offset)
                return false;
        if (end2 < l1->offset)
                return true;
        return l2->offset <= l1->offset;
}

static bool
ff_lseg_merge(struct pnfs_layout_segment *new,
                struct pnfs_layout_segment *old)
{
        u64 new_end, old_end;

        if (test_bit(NFS_LSEG_LAYOUTRETURN, &old->pls_flags))
                return false;
        if (new->pls_range.iomode != old->pls_range.iomode)
                return false;
        old_end = pnfs_calc_offset_end(old->pls_range.offset,
                        old->pls_range.length);
        if (old_end < new->pls_range.offset)
                return false;
        new_end = pnfs_calc_offset_end(new->pls_range.offset,
                        new->pls_range.length);
        if (new_end < old->pls_range.offset)
                return false;

        /* Mergeable: copy info from 'old' to 'new' */
        if (new_end < old_end)
                new_end = old_end;
        if (new->pls_range.offset < old->pls_range.offset)
                new->pls_range.offset = old->pls_range.offset;
        new->pls_range.length = pnfs_calc_offset_length(new->pls_range.offset,
                        new_end);
        if (test_bit(NFS_LSEG_ROC, &old->pls_flags))
                set_bit(NFS_LSEG_ROC, &new->pls_flags);
        return true;
}

static void
ff_layout_add_lseg(struct pnfs_layout_hdr *lo,
                struct pnfs_layout_segment *lseg,
                struct list_head *free_me)
{
        pnfs_generic_layout_insert_lseg(lo, lseg,
                        ff_lseg_range_is_after,
                        ff_lseg_merge,
                        free_me);
}

static void ff_layout_sort_mirrors(struct nfs4_ff_layout_segment *fls)
{
        int i, j;

        for (i = 0; i < fls->mirror_array_cnt - 1; i++) {
                for (j = i + 1; j < fls->mirror_array_cnt; j++)
                        if (fls->mirror_array[i]->efficiency <
                            fls->mirror_array[j]->efficiency)
                                swap(fls->mirror_array[i],
                                     fls->mirror_array[j]);
        }
}

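/*
 * Decode an ff_layout4 body: the stripe unit and mirror count, then for
 * each mirror a data server count, deviceid, efficiency, stateid,
 * filehandle array and stringified uid/gid, optionally followed by the
 * layout flags and a layoutstats report interval.
 */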
static struct pnfs_layout_segment *
ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
                     struct nfs4_layoutget_res *lgr,
                     gfp_t gfp_flags)
{
        struct pnfs_layout_segment *ret;
        struct nfs4_ff_layout_segment *fls = NULL;
        struct xdr_stream stream;
        struct xdr_buf buf;
        struct page *scratch;
        u64 stripe_unit;
        u32 mirror_array_cnt;
        __be32 *p;
        int i, rc;

        dprintk("--> %s\n", __func__);
        scratch = alloc_page(gfp_flags);
        if (!scratch)
                return ERR_PTR(-ENOMEM);

        xdr_init_decode_pages(&stream, &buf, lgr->layoutp->pages,
                              lgr->layoutp->len);
        xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);

        /* stripe unit and mirror_array_cnt */
        rc = -EIO;
        p = xdr_inline_decode(&stream, 8 + 4);
        if (!p)
                goto out_err_free;

        p = xdr_decode_hyper(p, &stripe_unit);
        mirror_array_cnt = be32_to_cpup(p++);
        dprintk("%s: stripe_unit=%llu mirror_array_cnt=%u\n", __func__,
                stripe_unit, mirror_array_cnt);

        if (mirror_array_cnt > NFS4_FLEXFILE_LAYOUT_MAX_MIRROR_CNT ||
            mirror_array_cnt == 0)
                goto out_err_free;

        rc = -ENOMEM;
        fls = kzalloc(sizeof(*fls), gfp_flags);
        if (!fls)
                goto out_err_free;

        fls->mirror_array_cnt = mirror_array_cnt;
        fls->stripe_unit = stripe_unit;
        fls->mirror_array = kcalloc(fls->mirror_array_cnt,
                                    sizeof(fls->mirror_array[0]), gfp_flags);
        if (fls->mirror_array == NULL)
                goto out_err_free;

        for (i = 0; i < fls->mirror_array_cnt; i++) {
                struct nfs4_ff_layout_mirror *mirror;
                struct auth_cred acred = { .group_info = ff_zero_group };
                struct rpc_cred __rcu *cred;
                u32 ds_count, fh_count, id;
                int j;

                rc = -EIO;
                p = xdr_inline_decode(&stream, 4);
                if (!p)
                        goto out_err_free;
                ds_count = be32_to_cpup(p);

                /* FIXME: allow for striping? */
                if (ds_count != 1)
                        goto out_err_free;

                fls->mirror_array[i] = ff_layout_alloc_mirror(gfp_flags);
                if (fls->mirror_array[i] == NULL) {
                        rc = -ENOMEM;
                        goto out_err_free;
                }

                fls->mirror_array[i]->ds_count = ds_count;

                /* deviceid */
                rc = decode_deviceid(&stream, &fls->mirror_array[i]->devid);
                if (rc)
                        goto out_err_free;

                /* efficiency */
                rc = -EIO;
                p = xdr_inline_decode(&stream, 4);
                if (!p)
                        goto out_err_free;
                fls->mirror_array[i]->efficiency = be32_to_cpup(p);

                /* stateid */
                rc = decode_pnfs_stateid(&stream, &fls->mirror_array[i]->stateid);
                if (rc)
                        goto out_err_free;

                /* fh */
                rc = -EIO;
                p = xdr_inline_decode(&stream, 4);
                if (!p)
                        goto out_err_free;
                fh_count = be32_to_cpup(p);

                fls->mirror_array[i]->fh_versions =
                        kcalloc(fh_count, sizeof(struct nfs_fh),
                                gfp_flags);
                if (fls->mirror_array[i]->fh_versions == NULL) {
                        rc = -ENOMEM;
                        goto out_err_free;
                }

                for (j = 0; j < fh_count; j++) {
                        rc = decode_nfs_fh(&stream,
                                           &fls->mirror_array[i]->fh_versions[j]);
                        if (rc)
                                goto out_err_free;
                }

                fls->mirror_array[i]->fh_versions_cnt = fh_count;

                /* user */
                rc = decode_name(&stream, &id);
                if (rc)
                        goto out_err_free;

                acred.uid = make_kuid(&init_user_ns, id);

                /* group */
                rc = decode_name(&stream, &id);
                if (rc)
                        goto out_err_free;

                acred.gid = make_kgid(&init_user_ns, id);

                /* find the cred for it */
                rcu_assign_pointer(cred, rpc_lookup_generic_cred(&acred, 0, gfp_flags));
                if (IS_ERR(cred)) {
                        rc = PTR_ERR(cred);
                        goto out_err_free;
                }

                if (lgr->range.iomode == IOMODE_READ)
                        rcu_assign_pointer(fls->mirror_array[i]->ro_cred, cred);
                else
                        rcu_assign_pointer(fls->mirror_array[i]->rw_cred, cred);

                mirror = ff_layout_add_mirror(lh, fls->mirror_array[i]);
                if (mirror != fls->mirror_array[i]) {
                        /* swap cred ptrs so free_mirror will clean up old */
                        if (lgr->range.iomode == IOMODE_READ) {
                                cred = xchg(&mirror->ro_cred, cred);
                                rcu_assign_pointer(fls->mirror_array[i]->ro_cred, cred);
                        } else {
                                cred = xchg(&mirror->rw_cred, cred);
                                rcu_assign_pointer(fls->mirror_array[i]->rw_cred, cred);
                        }
                        ff_layout_free_mirror(fls->mirror_array[i]);
                        fls->mirror_array[i] = mirror;
                }

                dprintk("%s: iomode %s uid %u gid %u\n", __func__,
                        lgr->range.iomode == IOMODE_READ ? "READ" : "RW",
                        from_kuid(&init_user_ns, acred.uid),
                        from_kgid(&init_user_ns, acred.gid));
        }

        p = xdr_inline_decode(&stream, 4);
        if (!p)
                goto out_sort_mirrors;
        fls->flags = be32_to_cpup(p);

        p = xdr_inline_decode(&stream, 4);
        if (!p)
                goto out_sort_mirrors;
        for (i = 0; i < fls->mirror_array_cnt; i++)
                fls->mirror_array[i]->report_interval = be32_to_cpup(p);

out_sort_mirrors:
        ff_layout_sort_mirrors(fls);
        rc = ff_layout_check_layout(lgr);
        if (rc)
                goto out_err_free;
        ret = &fls->generic_hdr;
        dprintk("<-- %s (success)\n", __func__);
out_free_page:
        __free_page(scratch);
        return ret;
out_err_free:
        _ff_layout_free_lseg(fls);
        ret = ERR_PTR(rc);
        dprintk("<-- %s (%d)\n", __func__, rc);
        goto out_free_page;
}

static bool ff_layout_has_rw_segments(struct pnfs_layout_hdr *layout)
{
        struct pnfs_layout_segment *lseg;

        list_for_each_entry(lseg, &layout->plh_segs, pls_list)
                if (lseg->pls_range.iomode == IOMODE_RW)
                        return true;

        return false;
}

static void
ff_layout_free_lseg(struct pnfs_layout_segment *lseg)
{
        struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);

        dprintk("--> %s\n", __func__);

        if (lseg->pls_range.iomode == IOMODE_RW) {
                struct nfs4_flexfile_layout *ffl;
                struct inode *inode;

                ffl = FF_LAYOUT_FROM_HDR(lseg->pls_layout);
                inode = ffl->generic_hdr.plh_inode;
                spin_lock(&inode->i_lock);
                if (!ff_layout_has_rw_segments(lseg->pls_layout)) {
                        ffl->commit_info.nbuckets = 0;
                        kfree(ffl->commit_info.buckets);
                        ffl->commit_info.buckets = NULL;
                }
                spin_unlock(&inode->i_lock);
        }
        _ff_layout_free_lseg(fls);
}

/* Return 1 until we support multiple layout segments per file */
static int
ff_layout_get_lseg_count(struct nfs4_ff_layout_segment *fls)
{
        return 1;
}

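/*
 * Layoutstats accounting: a per-mirror busy timer tracks how long at
 * least one I/O request has been outstanding, and start_io decides
 * whether enough time has passed since the last LAYOUTSTATS report to
 * schedule a new one.
 */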
static void
nfs4_ff_start_busy_timer(struct nfs4_ff_busy_timer *timer, ktime_t now)
{
        /* first IO request? */
        if (atomic_inc_return(&timer->n_ops) == 1) {
                timer->start_time = now;
        }
}

static ktime_t
nfs4_ff_end_busy_timer(struct nfs4_ff_busy_timer *timer, ktime_t now)
{
        ktime_t start;

        if (atomic_dec_return(&timer->n_ops) < 0)
                WARN_ON_ONCE(1);

        start = timer->start_time;
        timer->start_time = now;
        return ktime_sub(now, start);
}

static bool
nfs4_ff_layoutstat_start_io(struct nfs4_ff_layout_mirror *mirror,
                            struct nfs4_ff_layoutstat *layoutstat,
                            ktime_t now)
{
        s64 report_interval = FF_LAYOUTSTATS_REPORT_INTERVAL;
        struct nfs4_flexfile_layout *ffl = FF_LAYOUT_FROM_HDR(mirror->layout);

        nfs4_ff_start_busy_timer(&layoutstat->busy_timer, now);
        if (!mirror->start_time)
                mirror->start_time = now;
        if (mirror->report_interval != 0)
                report_interval = (s64)mirror->report_interval * 1000LL;
        else if (layoutstats_timer != 0)
                report_interval = (s64)layoutstats_timer * 1000LL;
        if (ktime_to_ms(ktime_sub(now, ffl->last_report_time)) >=
                        report_interval) {
                ffl->last_report_time = now;
                return true;
        }

        return false;
}

static void
nfs4_ff_layout_stat_io_update_requested(struct nfs4_ff_layoutstat *layoutstat,
                __u64 requested)
{
        struct nfs4_ff_io_stat *iostat = &layoutstat->io_stat;

        iostat->ops_requested++;
        iostat->bytes_requested += requested;
}

static void
nfs4_ff_layout_stat_io_update_completed(struct nfs4_ff_layoutstat *layoutstat,
                __u64 requested,
                __u64 completed,
                ktime_t time_completed,
                ktime_t time_started)
{
        struct nfs4_ff_io_stat *iostat = &layoutstat->io_stat;
        ktime_t completion_time = ktime_sub(time_completed, time_started);
        ktime_t timer;

        iostat->ops_completed++;
        iostat->bytes_completed += completed;
        iostat->bytes_not_delivered += requested - completed;

        timer = nfs4_ff_end_busy_timer(&layoutstat->busy_timer, time_completed);
        iostat->total_busy_time =
                        ktime_add(iostat->total_busy_time, timer);
        iostat->aggregate_completion_time =
                        ktime_add(iostat->aggregate_completion_time,
                                        completion_time);
}

static void
nfs4_ff_layout_stat_io_start_read(struct inode *inode,
                struct nfs4_ff_layout_mirror *mirror,
                __u64 requested, ktime_t now)
{
        bool report;

        spin_lock(&mirror->lock);
        report = nfs4_ff_layoutstat_start_io(mirror, &mirror->read_stat, now);
        nfs4_ff_layout_stat_io_update_requested(&mirror->read_stat, requested);
        set_bit(NFS4_FF_MIRROR_STAT_AVAIL, &mirror->flags);
        spin_unlock(&mirror->lock);

        if (report)
                pnfs_report_layoutstat(inode, GFP_KERNEL);
}

static void
nfs4_ff_layout_stat_io_end_read(struct rpc_task *task,
                struct nfs4_ff_layout_mirror *mirror,
                __u64 requested,
                __u64 completed)
{
        spin_lock(&mirror->lock);
        nfs4_ff_layout_stat_io_update_completed(&mirror->read_stat,
                        requested, completed,
                        ktime_get(), task->tk_start);
        set_bit(NFS4_FF_MIRROR_STAT_AVAIL, &mirror->flags);
        spin_unlock(&mirror->lock);
}

static void
nfs4_ff_layout_stat_io_start_write(struct inode *inode,
                struct nfs4_ff_layout_mirror *mirror,
                __u64 requested, ktime_t now)
{
        bool report;

        spin_lock(&mirror->lock);
        report = nfs4_ff_layoutstat_start_io(mirror, &mirror->write_stat, now);
        nfs4_ff_layout_stat_io_update_requested(&mirror->write_stat, requested);
        set_bit(NFS4_FF_MIRROR_STAT_AVAIL, &mirror->flags);
        spin_unlock(&mirror->lock);

        if (report)
                pnfs_report_layoutstat(inode, GFP_NOIO);
}

static void
nfs4_ff_layout_stat_io_end_write(struct rpc_task *task,
                struct nfs4_ff_layout_mirror *mirror,
                __u64 requested,
                __u64 completed,
                enum nfs3_stable_how committed)
{
        if (committed == NFS_UNSTABLE)
                requested = completed = 0;

        spin_lock(&mirror->lock);
        nfs4_ff_layout_stat_io_update_completed(&mirror->write_stat,
                        requested, completed, ktime_get(), task->tk_start);
        set_bit(NFS4_FF_MIRROR_STAT_AVAIL, &mirror->flags);
        spin_unlock(&mirror->lock);
}

static int
ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
                            struct nfs_commit_info *cinfo,
                            gfp_t gfp_flags)
{
        struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
        struct pnfs_commit_bucket *buckets;
        int size, i;

        if (cinfo->ds->nbuckets != 0) {
                /* This assumes there is only one RW lseg per file.
                 * To support multiple lsegs per file, we need to
                 * change struct pnfs_commit_bucket to allow dynamically
                 * increasing nbuckets.
                 */
                return 0;
        }

        size = ff_layout_get_lseg_count(fls) * FF_LAYOUT_MIRROR_COUNT(lseg);

        buckets = kcalloc(size, sizeof(struct pnfs_commit_bucket),
                          gfp_flags);
        if (!buckets)
                return -ENOMEM;

        spin_lock(&cinfo->inode->i_lock);
        if (cinfo->ds->nbuckets != 0)
                kfree(buckets);
        else {
                cinfo->ds->buckets = buckets;
                cinfo->ds->nbuckets = size;
                for (i = 0; i < size; i++) {
                        INIT_LIST_HEAD(&buckets[i].written);
                        INIT_LIST_HEAD(&buckets[i].committing);
                        /* mark direct verifier as unset */
                        buckets[i].direct_verf.committed =
                                NFS_INVALID_STABLE_HOW;
                }
        }
        spin_unlock(&cinfo->inode->i_lock);
        return 0;
}

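/*
 * Mirrors are sorted by efficiency, so scan them in order starting at
 * start_idx and return the first data server that can be set up; only
 * the final candidate is tried with fail_return set.
 */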
static struct nfs4_pnfs_ds *
ff_layout_choose_best_ds_for_read(struct pnfs_layout_segment *lseg,
                                  int start_idx,
                                  int *best_idx)
{
        struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
        struct nfs4_pnfs_ds *ds;
        bool fail_return = false;
        int idx;

        /* mirrors are sorted by efficiency */
        for (idx = start_idx; idx < fls->mirror_array_cnt; idx++) {
                if (idx + 1 == fls->mirror_array_cnt)
                        fail_return = true;
                ds = nfs4_ff_layout_prepare_ds(lseg, idx, fail_return);
                if (ds) {
                        *best_idx = idx;
                        return ds;
                }
        }

        return NULL;
}

static void
ff_layout_pg_get_read(struct nfs_pageio_descriptor *pgio,
                      struct nfs_page *req,
                      bool strict_iomode)
{
retry_strict:
        pnfs_put_lseg(pgio->pg_lseg);
        pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
                                           req->wb_context,
                                           0,
                                           NFS4_MAX_UINT64,
                                           IOMODE_READ,
                                           strict_iomode,
                                           GFP_KERNEL);
        if (IS_ERR(pgio->pg_lseg)) {
                pgio->pg_error = PTR_ERR(pgio->pg_lseg);
                pgio->pg_lseg = NULL;
        }

        /* If we weren't checking the iomode, got back an IOMODE_RW
         * segment, and the server wants to avoid READs on it, then
         * retry with strict_iomode set.
         */
        if (pgio->pg_lseg && !strict_iomode &&
            ff_layout_avoid_read_on_rw(pgio->pg_lseg)) {
                strict_iomode = true;
                goto retry_strict;
        }
}

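/*
 * pg_init for reads: pick the most efficient mirror with a usable data
 * server. Reads always use a single pgio mirror (index 0), so only its
 * block size is updated; if no DS can be reached and fallback is
 * allowed, resend the I/O through the MDS.
 */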
static void
ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
                        struct nfs_page *req)
{
        struct nfs_pgio_mirror *pgm;
        struct nfs4_ff_layout_mirror *mirror;
        struct nfs4_pnfs_ds *ds;
        int ds_idx;

retry:
        pnfs_generic_pg_check_layout(pgio);
        /* Use full layout for now */
        if (!pgio->pg_lseg)
                ff_layout_pg_get_read(pgio, req, false);
        else if (ff_layout_avoid_read_on_rw(pgio->pg_lseg))
                ff_layout_pg_get_read(pgio, req, true);

        /* If no lseg, fall back to read through mds */
        if (pgio->pg_lseg == NULL)
                goto out_mds;

        ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx);
        if (!ds) {
                if (!ff_layout_no_fallback_to_mds(pgio->pg_lseg))
                        goto out_mds;
                pnfs_put_lseg(pgio->pg_lseg);
                pgio->pg_lseg = NULL;
                /* Sleep for 1 second before retrying */
                ssleep(1);
                goto retry;
        }

        mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx);

        pgio->pg_mirror_idx = ds_idx;

        /* read always uses only one mirror - idx 0 for pgio layer */
        pgm = &pgio->pg_mirrors[0];
        pgm->pg_bsize = mirror->mirror_ds->ds_versions[0].rsize;

        return;
out_mds:
        pnfs_put_lseg(pgio->pg_lseg);
        pgio->pg_lseg = NULL;
        nfs_pageio_reset_read_mds(pgio);
}

static void
ff_layout_pg_init_write(struct nfs_pageio_descriptor *pgio,
                        struct nfs_page *req)
{
        struct nfs4_ff_layout_mirror *mirror;
        struct nfs_pgio_mirror *pgm;
        struct nfs_commit_info cinfo;
        struct nfs4_pnfs_ds *ds;
        int i;
        int status;

retry:
        pnfs_generic_pg_check_layout(pgio);
        if (!pgio->pg_lseg) {
                pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
                                                   req->wb_context,
                                                   0,
                                                   NFS4_MAX_UINT64,
                                                   IOMODE_RW,
                                                   false,
                                                   GFP_NOFS);
                if (IS_ERR(pgio->pg_lseg)) {
                        pgio->pg_error = PTR_ERR(pgio->pg_lseg);
                        pgio->pg_lseg = NULL;
                        return;
                }
        }
        /* If no lseg, fall back to write through mds */
        if (pgio->pg_lseg == NULL)
                goto out_mds;

        nfs_init_cinfo(&cinfo, pgio->pg_inode, pgio->pg_dreq);
        status = ff_layout_alloc_commit_info(pgio->pg_lseg, &cinfo, GFP_NOFS);
        if (status < 0)
                goto out_mds;

        /* Use a direct mapping of ds_idx to pgio mirror_idx */
        if (WARN_ON_ONCE(pgio->pg_mirror_count !=
            FF_LAYOUT_MIRROR_COUNT(pgio->pg_lseg)))
                goto out_mds;

        for (i = 0; i < pgio->pg_mirror_count; i++) {
                ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, i, true);
                if (!ds) {
                        if (!ff_layout_no_fallback_to_mds(pgio->pg_lseg))
                                goto out_mds;
                        pnfs_put_lseg(pgio->pg_lseg);
                        pgio->pg_lseg = NULL;
                        /* Sleep for 1 second before retrying */
                        ssleep(1);
                        goto retry;
                }
                pgm = &pgio->pg_mirrors[i];
                mirror = FF_LAYOUT_COMP(pgio->pg_lseg, i);
                pgm->pg_bsize = mirror->mirror_ds->ds_versions[0].wsize;
        }

        return;

out_mds:
        pnfs_put_lseg(pgio->pg_lseg);
        pgio->pg_lseg = NULL;
        nfs_pageio_reset_write_mds(pgio);
}

static unsigned int
ff_layout_pg_get_mirror_count_write(struct nfs_pageio_descriptor *pgio,
                                    struct nfs_page *req)
{
        if (!pgio->pg_lseg) {
                pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
                                                   req->wb_context,
                                                   0,
                                                   NFS4_MAX_UINT64,
                                                   IOMODE_RW,
                                                   false,
                                                   GFP_NOFS);
                if (IS_ERR(pgio->pg_lseg)) {
                        pgio->pg_error = PTR_ERR(pgio->pg_lseg);
                        pgio->pg_lseg = NULL;
                        goto out;
                }
        }
        if (pgio->pg_lseg)
                return FF_LAYOUT_MIRROR_COUNT(pgio->pg_lseg);

        /* no lseg means that pnfs is not in use, so no mirroring here */
        nfs_pageio_reset_write_mds(pgio);
out:
        return 1;
}

static const struct nfs_pageio_ops ff_layout_pg_read_ops = {
        .pg_init = ff_layout_pg_init_read,
        .pg_test = pnfs_generic_pg_test,
        .pg_doio = pnfs_generic_pg_readpages,
        .pg_cleanup = pnfs_generic_pg_cleanup,
};

static const struct nfs_pageio_ops ff_layout_pg_write_ops = {
        .pg_init = ff_layout_pg_init_write,
        .pg_test = pnfs_generic_pg_test,
        .pg_doio = pnfs_generic_pg_writepages,
        .pg_get_mirror_count = ff_layout_pg_get_mirror_count_write,
        .pg_cleanup = pnfs_generic_pg_cleanup,
};

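/*
 * Redirect failed I/O: either reschedule it through pNFS or resend it
 * through the MDS, after issuing any pending layoutcommit.
 */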
static void ff_layout_reset_write(struct nfs_pgio_header *hdr, bool retry_pnfs)
{
        struct rpc_task *task = &hdr->task;

        pnfs_layoutcommit_inode(hdr->inode, false);

        if (retry_pnfs) {
                dprintk("%s Reset task %5u for i/o through pNFS "
                        "(req %s/%llu, %u bytes @ offset %llu)\n", __func__,
                        hdr->task.tk_pid,
                        hdr->inode->i_sb->s_id,
                        (unsigned long long)NFS_FILEID(hdr->inode),
                        hdr->args.count,
                        (unsigned long long)hdr->args.offset);

                hdr->completion_ops->reschedule_io(hdr);
                return;
        }

        if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
                dprintk("%s Reset task %5u for i/o through MDS "
                        "(req %s/%llu, %u bytes @ offset %llu)\n", __func__,
                        hdr->task.tk_pid,
                        hdr->inode->i_sb->s_id,
                        (unsigned long long)NFS_FILEID(hdr->inode),
                        hdr->args.count,
                        (unsigned long long)hdr->args.offset);

                task->tk_status = pnfs_write_done_resend_to_mds(hdr);
        }
}

static void ff_layout_reset_read(struct nfs_pgio_header *hdr)
{
        struct rpc_task *task = &hdr->task;

        pnfs_layoutcommit_inode(hdr->inode, false);

        if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
                dprintk("%s Reset task %5u for i/o through MDS "
                        "(req %s/%llu, %u bytes @ offset %llu)\n", __func__,
                        hdr->task.tk_pid,
                        hdr->inode->i_sb->s_id,
                        (unsigned long long)NFS_FILEID(hdr->inode),
                        hdr->args.count,
                        (unsigned long long)hdr->args.offset);

                task->tk_status = pnfs_read_done_resend_to_mds(hdr);
        }
}

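/*
 * Map an RPC error on a DS operation to a recovery action:
 * -NFS4ERR_RESET_TO_PNFS to retry via another mirror,
 * -NFS4ERR_RESET_TO_MDS to resend through the MDS, or -EAGAIN after
 * scheduling any necessary session or state recovery.
 */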
static int ff_layout_async_handle_error_v4(struct rpc_task *task,
                                           struct nfs4_state *state,
                                           struct nfs_client *clp,
                                           struct pnfs_layout_segment *lseg,
                                           int idx)
{
        struct pnfs_layout_hdr *lo = lseg->pls_layout;
        struct inode *inode = lo->plh_inode;
        struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);
        struct nfs4_slot_table *tbl = &clp->cl_session->fc_slot_table;

        switch (task->tk_status) {
        case -NFS4ERR_BADSESSION:
        case -NFS4ERR_BADSLOT:
        case -NFS4ERR_BAD_HIGH_SLOT:
        case -NFS4ERR_DEADSESSION:
        case -NFS4ERR_CONN_NOT_BOUND_TO_SESSION:
        case -NFS4ERR_SEQ_FALSE_RETRY:
        case -NFS4ERR_SEQ_MISORDERED:
                dprintk("%s ERROR %d, Reset session. Exchangeid "
                        "flags 0x%x\n", __func__, task->tk_status,
                        clp->cl_exchange_flags);
                nfs4_schedule_session_recovery(clp->cl_session, task->tk_status);
                break;
        case -NFS4ERR_DELAY:
        case -NFS4ERR_GRACE:
                rpc_delay(task, FF_LAYOUT_POLL_RETRY_MAX);
                break;
        case -NFS4ERR_RETRY_UNCACHED_REP:
                break;
        /* Invalidate Layout errors */
        case -NFS4ERR_PNFS_NO_LAYOUT:
        case -ESTALE:           /* mapped NFS4ERR_STALE */
        case -EBADHANDLE:       /* mapped NFS4ERR_BADHANDLE */
        case -EISDIR:           /* mapped NFS4ERR_ISDIR */
        case -NFS4ERR_FHEXPIRED:
        case -NFS4ERR_WRONG_TYPE:
                dprintk("%s Invalid layout error %d\n", __func__,
                        task->tk_status);
                /*
                 * Destroy layout so new i/o will get a new layout.
                 * Layout will not be destroyed until all current lseg
                 * references are put. Mark layout as invalid to resend failed
                 * i/o and all i/o waiting on the slot table to the MDS until
                 * layout is destroyed and a new valid layout is obtained.
                 */
                pnfs_destroy_layout(NFS_I(inode));
                rpc_wake_up(&tbl->slot_tbl_waitq);
                goto reset;
        /* RPC connection errors */
        case -ECONNREFUSED:
        case -EHOSTDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -EIO:
        case -ETIMEDOUT:
        case -EPIPE:
                dprintk("%s DS connection error %d\n", __func__,
                        task->tk_status);
                nfs4_delete_deviceid(devid->ld, devid->nfs_client,
                                &devid->deviceid);
                rpc_wake_up(&tbl->slot_tbl_waitq);
                /* fall through */
        default:
                if (ff_layout_avoid_mds_available_ds(lseg))
                        return -NFS4ERR_RESET_TO_PNFS;
reset:
                dprintk("%s Retry through MDS. Error %d\n", __func__,
                        task->tk_status);
                return -NFS4ERR_RESET_TO_MDS;
        }
        task->tk_status = 0;
        return -EAGAIN;
}

/* Retry all errors through either pNFS or MDS except for -EJUKEBOX */
static int ff_layout_async_handle_error_v3(struct rpc_task *task,
                                           struct pnfs_layout_segment *lseg,
                                           int idx)
{
        struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);

        switch (task->tk_status) {
        /* File access problems. Don't mark the device as unavailable */
        case -EACCES:
        case -ESTALE:
        case -EISDIR:
        case -EBADHANDLE:
        case -ELOOP:
        case -ENOSPC:
                break;
        case -EJUKEBOX:
                nfs_inc_stats(lseg->pls_layout->plh_inode, NFSIOS_DELAY);
                goto out_retry;
        default:
                dprintk("%s DS connection error %d\n", __func__,
                        task->tk_status);
                nfs4_delete_deviceid(devid->ld, devid->nfs_client,
                                &devid->deviceid);
        }
        /* FIXME: Need to prevent infinite looping here. */
        return -NFS4ERR_RESET_TO_PNFS;
out_retry:
        task->tk_status = 0;
        rpc_restart_call_prepare(task);
        rpc_delay(task, NFS_JUKEBOX_RETRY_TIME);
        return -EAGAIN;
}

static int ff_layout_async_handle_error(struct rpc_task *task,
                                        struct nfs4_state *state,
                                        struct nfs_client *clp,
                                        struct pnfs_layout_segment *lseg,
                                        int idx)
{
        int vers = clp->cl_nfs_mod->rpc_vers->number;

        if (task->tk_status >= 0)
                return 0;

        /* Handle the case of an invalid layout segment */
        if (!pnfs_is_valid_lseg(lseg))
                return -NFS4ERR_RESET_TO_PNFS;

        switch (vers) {
        case 3:
                return ff_layout_async_handle_error_v3(task, lseg, idx);
        case 4:
                return ff_layout_async_handle_error_v4(task, state, clp,
                                                       lseg, idx);
        default:
                /* should never happen */
                WARN_ON_ONCE(1);
                return 0;
        }
}

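/*
 * Record a DS I/O error so that it can be reported back to the server
 * in a LAYOUTRETURN. Local errnos with no protocol status are first
 * mapped onto an NFS4ERR code; NFS4ERR_DELAY and NFS4ERR_GRACE are
 * transient and not tracked.
 */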
static void ff_layout_io_track_ds_error(struct pnfs_layout_segment *lseg,
                                        int idx, u64 offset, u64 length,
                                        u32 status, int opnum, int error)
{
        struct nfs4_ff_layout_mirror *mirror;
        int err;

        if (status == 0) {
                switch (error) {
                case -ETIMEDOUT:
                case -EPFNOSUPPORT:
                case -EPROTONOSUPPORT:
                case -EOPNOTSUPP:
                case -ECONNREFUSED:
                case -ECONNRESET:
                case -EHOSTDOWN:
                case -EHOSTUNREACH:
                case -ENETUNREACH:
                case -EADDRINUSE:
                case -ENOBUFS:
                case -EPIPE:
                case -EPERM:
                        status = NFS4ERR_NXIO;
                        break;
                case -EACCES:
                        status = NFS4ERR_ACCESS;
                        break;
                default:
                        return;
                }
        }

        switch (status) {
        case NFS4ERR_DELAY:
        case NFS4ERR_GRACE:
                return;
        default:
                break;
        }

        mirror = FF_LAYOUT_COMP(lseg, idx);
        err = ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
                                       mirror, offset, length, status, opnum,
                                       GFP_NOIO);
        pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode, lseg);
        dprintk("%s: err %d op %d status %u\n", __func__, err, opnum, status);
}

/* NFS_PROTO call done callback routines */
static int ff_layout_read_done_cb(struct rpc_task *task,
                                struct nfs_pgio_header *hdr)
{
        int err;

        trace_nfs4_pnfs_read(hdr, task->tk_status);
        if (task->tk_status < 0)
                ff_layout_io_track_ds_error(hdr->lseg, hdr->pgio_mirror_idx,
                                            hdr->args.offset, hdr->args.count,
                                            hdr->res.op_status, OP_READ,
                                            task->tk_status);
        err = ff_layout_async_handle_error(task, hdr->args.context->state,
                                           hdr->ds_clp, hdr->lseg,
                                           hdr->pgio_mirror_idx);

        switch (err) {
        case -NFS4ERR_RESET_TO_PNFS:
                if (ff_layout_choose_best_ds_for_read(hdr->lseg,
                                        hdr->pgio_mirror_idx + 1,
                                        &hdr->pgio_mirror_idx))
                        goto out_eagain;
                ff_layout_read_record_layoutstats_done(task, hdr);
                pnfs_read_resend_pnfs(hdr);
                return task->tk_status;
        case -NFS4ERR_RESET_TO_MDS:
                ff_layout_reset_read(hdr);
                return task->tk_status;
        case -EAGAIN:
                goto out_eagain;
        }

        return 0;
out_eagain:
        rpc_restart_call_prepare(task);
        return -EAGAIN;
}

static bool
ff_layout_need_layoutcommit(struct pnfs_layout_segment *lseg)
{
        return !(FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_LAYOUTCOMMIT);
}

/*
 * We reference the rpc_cred of the first WRITE that triggers the need for
 * a LAYOUTCOMMIT, and use it to send the layoutcommit compound.
 * rfc5661 is not clear about which credential should be used.
 *
 * The flexfiles client should treat a DS-returned FILE_SYNC as DATA_SYNC,
 * so to follow http://www.rfc-editor.org/errata_search.php?rfc=5661&eid=2751
 * we always send a layoutcommit after DS writes.
 */
static void
ff_layout_set_layoutcommit(struct inode *inode,
                struct pnfs_layout_segment *lseg,
                loff_t end_offset)
{
        if (!ff_layout_need_layoutcommit(lseg))
                return;

        pnfs_set_layoutcommit(inode, lseg, end_offset);
        dprintk("%s inode %lu pls_end_pos %llu\n", __func__, inode->i_ino,
                (unsigned long long) NFS_I(inode)->layout->plh_lwb);
}

static bool
ff_layout_device_unavailable(struct pnfs_layout_segment *lseg, int idx)
{
        /* No mirroring for now */
        struct nfs4_deviceid_node *node = FF_LAYOUT_DEVID_NODE(lseg, idx);

        return ff_layout_test_devid_unavailable(node);
}

static void ff_layout_read_record_layoutstats_start(struct rpc_task *task,
                struct nfs_pgio_header *hdr)
{
        if (test_and_set_bit(NFS_IOHDR_STAT, &hdr->flags))
                return;
        nfs4_ff_layout_stat_io_start_read(hdr->inode,
                        FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
                        hdr->args.count,
                        task->tk_start);
}

static void ff_layout_read_record_layoutstats_done(struct rpc_task *task,
                struct nfs_pgio_header *hdr)
{
        if (!test_and_clear_bit(NFS_IOHDR_STAT, &hdr->flags))
                return;
        nfs4_ff_layout_stat_io_end_read(task,
                        FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
                        hdr->args.count,
                        hdr->res.count);
}

static int ff_layout_read_prepare_common(struct rpc_task *task,
                                         struct nfs_pgio_header *hdr)
{
        if (unlikely(test_bit(NFS_CONTEXT_BAD, &hdr->args.context->flags))) {
                rpc_exit(task, -EIO);
                return -EIO;
        }
        if (ff_layout_device_unavailable(hdr->lseg, hdr->pgio_mirror_idx)) {
                rpc_exit(task, -EHOSTDOWN);
                return -EAGAIN;
        }

        ff_layout_read_record_layoutstats_start(task, hdr);
        return 0;
}

/*
 * Call ops for the async read/write cases
 * In the case of dense layouts, the offset needs to be reset to its
 * original value.
 */
static void ff_layout_read_prepare_v3(struct rpc_task *task, void *data)
{
        struct nfs_pgio_header *hdr = data;

        if (ff_layout_read_prepare_common(task, hdr))
                return;

        rpc_call_start(task);
}

static void ff_layout_read_prepare_v4(struct rpc_task *task, void *data)
{
        struct nfs_pgio_header *hdr = data;

        if (nfs4_setup_sequence(hdr->ds_clp,
                                &hdr->args.seq_args,
                                &hdr->res.seq_res,
                                task))
                return;

        if (ff_layout_read_prepare_common(task, hdr))
                return;

        if (nfs4_set_rw_stateid(&hdr->args.stateid, hdr->args.context,
                        hdr->args.lock_context, FMODE_READ) == -EIO)
                rpc_exit(task, -EIO); /* lost lock, terminate I/O */
}

static void ff_layout_read_call_done(struct rpc_task *task, void *data)
{
        struct nfs_pgio_header *hdr = data;

        dprintk("--> %s task->tk_status %d\n", __func__, task->tk_status);

        if (test_bit(NFS_IOHDR_REDO, &hdr->flags) &&
            task->tk_status == 0) {
                nfs4_sequence_done(task, &hdr->res.seq_res);
                return;
        }

        /* Note this may cause RPC to be resent */
        hdr->mds_ops->rpc_call_done(task, hdr);
}

static void ff_layout_read_count_stats(struct rpc_task *task, void *data)
{
        struct nfs_pgio_header *hdr = data;

        ff_layout_read_record_layoutstats_done(task, hdr);
        rpc_count_iostats_metrics(task,
            &NFS_CLIENT(hdr->inode)->cl_metrics[NFSPROC4_CLNT_READ]);
}

static void ff_layout_read_release(void *data)
{
        struct nfs_pgio_header *hdr = data;

        ff_layout_read_record_layoutstats_done(&hdr->task, hdr);
        pnfs_generic_rw_release(data);
}

static int ff_layout_write_done_cb(struct rpc_task *task,
                                struct nfs_pgio_header *hdr)
{
        loff_t end_offs = 0;
        int err;

        trace_nfs4_pnfs_write(hdr, task->tk_status);
        if (task->tk_status < 0)
                ff_layout_io_track_ds_error(hdr->lseg, hdr->pgio_mirror_idx,
                                            hdr->args.offset, hdr->args.count,
                                            hdr->res.op_status, OP_WRITE,
                                            task->tk_status);
        err = ff_layout_async_handle_error(task, hdr->args.context->state,
                                           hdr->ds_clp, hdr->lseg,
                                           hdr->pgio_mirror_idx);

        switch (err) {
        case -NFS4ERR_RESET_TO_PNFS:
                ff_layout_reset_write(hdr, true);
                return task->tk_status;
        case -NFS4ERR_RESET_TO_MDS:
                ff_layout_reset_write(hdr, false);
                return task->tk_status;
        case -EAGAIN:
                return -EAGAIN;
        }

        if (hdr->res.verf->committed == NFS_FILE_SYNC ||
            hdr->res.verf->committed == NFS_DATA_SYNC)
                end_offs = hdr->mds_offset + (loff_t)hdr->res.count;

        /* Note: if the write is unstable, don't set end_offs until commit */
        ff_layout_set_layoutcommit(hdr->inode, hdr->lseg, end_offs);

        /* zero out fattr since we don't care about DS attrs at all */
        hdr->fattr.valid = 0;
        if (task->tk_status >= 0)
                nfs_writeback_update_inode(hdr);

        return 0;
}

static int ff_layout_commit_done_cb(struct rpc_task *task,
                                     struct nfs_commit_data *data)
{
        int err;

        trace_nfs4_pnfs_commit_ds(data, task->tk_status);
        if (task->tk_status < 0)
                ff_layout_io_track_ds_error(data->lseg, data->ds_commit_index,
                                            data->args.offset, data->args.count,
                                            data->res.op_status, OP_COMMIT,
                                            task->tk_status);
        err = ff_layout_async_handle_error(task, NULL, data->ds_clp,
                                           data->lseg, data->ds_commit_index);

        switch (err) {
        case -NFS4ERR_RESET_TO_PNFS:
        case -NFS4ERR_RESET_TO_MDS:
                pnfs_generic_prepare_to_resend_writes(data);
                return -EAGAIN;
        case -EAGAIN:
                rpc_restart_call_prepare(task);
                return -EAGAIN;
        }

        ff_layout_set_layoutcommit(data->inode, data->lseg, data->lwb);

        return 0;
}

1483static void ff_layout_write_record_layoutstats_start(struct rpc_task *task,
1484                struct nfs_pgio_header *hdr)
1485{
1486        if (test_and_set_bit(NFS_IOHDR_STAT, &hdr->flags))
1487                return;
1488        nfs4_ff_layout_stat_io_start_write(hdr->inode,
1489                        FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
1490                        hdr->args.count,
1491                        task->tk_start);
1492}
1493
1494static void ff_layout_write_record_layoutstats_done(struct rpc_task *task,
1495                struct nfs_pgio_header *hdr)
1496{
1497        if (!test_and_clear_bit(NFS_IOHDR_STAT, &hdr->flags))
1498                return;
1499        nfs4_ff_layout_stat_io_end_write(task,
1500                        FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
1501                        hdr->args.count, hdr->res.count,
1502                        hdr->res.verf->committed);
1503}
1504
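    /*
     * Common pre-RPC checks for DS writes: exit with -EIO if the open
     * context has been marked bad, and with -EHOSTDOWN if the target
     * mirror's device is known to be unavailable, rather than issuing
     * an RPC that is doomed to fail.
     */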
1505static int ff_layout_write_prepare_common(struct rpc_task *task,
1506                                          struct nfs_pgio_header *hdr)
1507{
1508        if (unlikely(test_bit(NFS_CONTEXT_BAD, &hdr->args.context->flags))) {
1509                rpc_exit(task, -EIO);
1510                return -EIO;
1511        }
1512
1513        if (ff_layout_device_unavailable(hdr->lseg, hdr->pgio_mirror_idx)) {
1514                rpc_exit(task, -EHOSTDOWN);
1515                return -EAGAIN;
1516        }
1517
1518        ff_layout_write_record_layoutstats_start(task, hdr);
1519        return 0;
1520}
1521
1522static void ff_layout_write_prepare_v3(struct rpc_task *task, void *data)
1523{
1524        struct nfs_pgio_header *hdr = data;
1525
1526        if (ff_layout_write_prepare_common(task, hdr))
1527                return;
1528
1529        rpc_call_start(task);
1530}
1531
1532static void ff_layout_write_prepare_v4(struct rpc_task *task, void *data)
1533{
1534        struct nfs_pgio_header *hdr = data;
1535
1536        if (nfs4_setup_sequence(hdr->ds_clp,
1537                                &hdr->args.seq_args,
1538                                &hdr->res.seq_res,
1539                                task))
1540                return;
1541
1542        if (ff_layout_write_prepare_common(task, hdr))
1543                return;
1544
1545        if (nfs4_set_rw_stateid(&hdr->args.stateid, hdr->args.context,
1546                        hdr->args.lock_context, FMODE_WRITE) == -EIO)
1547                rpc_exit(task, -EIO); /* lost lock, terminate I/O */
1548}
1549
1550static void ff_layout_write_call_done(struct rpc_task *task, void *data)
1551{
1552        struct nfs_pgio_header *hdr = data;
1553
1554        if (test_bit(NFS_IOHDR_REDO, &hdr->flags) &&
1555            task->tk_status == 0) {
1556                nfs4_sequence_done(task, &hdr->res.seq_res);
1557                return;
1558        }
1559
1560        /* Note: this may cause the RPC to be resent */
1561        hdr->mds_ops->rpc_call_done(task, hdr);
1562}
1563
1564static void ff_layout_write_count_stats(struct rpc_task *task, void *data)
1565{
1566        struct nfs_pgio_header *hdr = data;
1567
1568        ff_layout_write_record_layoutstats_done(task, hdr);
1569        rpc_count_iostats_metrics(task,
1570            &NFS_CLIENT(hdr->inode)->cl_metrics[NFSPROC4_CLNT_WRITE]);
1571}
1572
1573static void ff_layout_write_release(void *data)
1574{
1575        struct nfs_pgio_header *hdr = data;
1576
1577        ff_layout_write_record_layoutstats_done(&hdr->task, hdr);
1578        pnfs_generic_rw_release(data);
1579}
1580
1581static void ff_layout_commit_record_layoutstats_start(struct rpc_task *task,
1582                struct nfs_commit_data *cdata)
1583{
1584        if (test_and_set_bit(NFS_IOHDR_STAT, &cdata->flags))
1585                return;
1586        nfs4_ff_layout_stat_io_start_write(cdata->inode,
1587                        FF_LAYOUT_COMP(cdata->lseg, cdata->ds_commit_index),
1588                        0, task->tk_start);
1589}
1590
1591static void ff_layout_commit_record_layoutstats_done(struct rpc_task *task,
1592                struct nfs_commit_data *cdata)
1593{
1594        struct nfs_page *req;
1595        __u64 count = 0;
1596
1597        if (!test_and_clear_bit(NFS_IOHDR_STAT, &cdata->flags))
1598                return;
1599
1600        if (task->tk_status == 0) {
1601                list_for_each_entry(req, &cdata->pages, wb_list)
1602                        count += req->wb_bytes;
1603        }
1604        nfs4_ff_layout_stat_io_end_write(task,
1605                        FF_LAYOUT_COMP(cdata->lseg, cdata->ds_commit_index),
1606                        count, count, NFS_FILE_SYNC);
1607}
1608
1609static void ff_layout_commit_prepare_common(struct rpc_task *task,
1610                struct nfs_commit_data *cdata)
1611{
1612        ff_layout_commit_record_layoutstats_start(task, cdata);
1613}
1614
1615static void ff_layout_commit_prepare_v3(struct rpc_task *task, void *data)
1616{
1617        ff_layout_commit_prepare_common(task, data);
1618        rpc_call_start(task);
1619}
1620
1621static void ff_layout_commit_prepare_v4(struct rpc_task *task, void *data)
1622{
1623        struct nfs_commit_data *wdata = data;
1624
1625        if (nfs4_setup_sequence(wdata->ds_clp,
1626                                &wdata->args.seq_args,
1627                                &wdata->res.seq_res,
1628                                task))
1629                return;
1630        ff_layout_commit_prepare_common(task, data);
1631}
1632
1633static void ff_layout_commit_done(struct rpc_task *task, void *data)
1634{
1635        pnfs_generic_write_commit_done(task, data);
1636}
1637
1638static void ff_layout_commit_count_stats(struct rpc_task *task, void *data)
1639{
1640        struct nfs_commit_data *cdata = data;
1641
1642        ff_layout_commit_record_layoutstats_done(task, cdata);
1643        rpc_count_iostats_metrics(task,
1644            &NFS_CLIENT(cdata->inode)->cl_metrics[NFSPROC4_CLNT_COMMIT]);
1645}
1646
1647static void ff_layout_commit_release(void *data)
1648{
1649        struct nfs_commit_data *cdata = data;
1650
1651        ff_layout_commit_record_layoutstats_done(&cdata->task, cdata);
1652        pnfs_generic_commit_release(data);
1653}
1654
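    /*
     * Per-version RPC callback tables. The v3 prepare methods may start
     * the call immediately; the v4 ones must first claim a session slot
     * via nfs4_setup_sequence() (and, for I/O, revalidate the stateid)
     * before the request can be transmitted.
     */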
1655static const struct rpc_call_ops ff_layout_read_call_ops_v3 = {
1656        .rpc_call_prepare = ff_layout_read_prepare_v3,
1657        .rpc_call_done = ff_layout_read_call_done,
1658        .rpc_count_stats = ff_layout_read_count_stats,
1659        .rpc_release = ff_layout_read_release,
1660};
1661
1662static const struct rpc_call_ops ff_layout_read_call_ops_v4 = {
1663        .rpc_call_prepare = ff_layout_read_prepare_v4,
1664        .rpc_call_done = ff_layout_read_call_done,
1665        .rpc_count_stats = ff_layout_read_count_stats,
1666        .rpc_release = ff_layout_read_release,
1667};
1668
1669static const struct rpc_call_ops ff_layout_write_call_ops_v3 = {
1670        .rpc_call_prepare = ff_layout_write_prepare_v3,
1671        .rpc_call_done = ff_layout_write_call_done,
1672        .rpc_count_stats = ff_layout_write_count_stats,
1673        .rpc_release = ff_layout_write_release,
1674};
1675
1676static const struct rpc_call_ops ff_layout_write_call_ops_v4 = {
1677        .rpc_call_prepare = ff_layout_write_prepare_v4,
1678        .rpc_call_done = ff_layout_write_call_done,
1679        .rpc_count_stats = ff_layout_write_count_stats,
1680        .rpc_release = ff_layout_write_release,
1681};
1682
1683static const struct rpc_call_ops ff_layout_commit_call_ops_v3 = {
1684        .rpc_call_prepare = ff_layout_commit_prepare_v3,
1685        .rpc_call_done = ff_layout_commit_done,
1686        .rpc_count_stats = ff_layout_commit_count_stats,
1687        .rpc_release = ff_layout_commit_release,
1688};
1689
1690static const struct rpc_call_ops ff_layout_commit_call_ops_v4 = {
1691        .rpc_call_prepare = ff_layout_commit_prepare_v4,
1692        .rpc_call_done = ff_layout_commit_done,
1693        .rpc_count_stats = ff_layout_commit_count_stats,
1694        .rpc_release = ff_layout_commit_release,
1695};
1696
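    /*
     * Issue the read directly to a DS. If setup fails, return
     * PNFS_TRY_AGAIN when the layout wants MDS I/O avoided and a usable
     * DS remains, otherwise PNFS_NOT_ATTEMPTED so the read falls back
     * to the MDS.
     */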
1697static enum pnfs_try_status
1698ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
1699{
1700        struct pnfs_layout_segment *lseg = hdr->lseg;
1701        struct nfs4_pnfs_ds *ds;
1702        struct rpc_clnt *ds_clnt;
1703        struct rpc_cred *ds_cred;
1704        loff_t offset = hdr->args.offset;
1705        u32 idx = hdr->pgio_mirror_idx;
1706        int vers;
1707        struct nfs_fh *fh;
1708
1709        dprintk("--> %s ino %lu pgbase %u req %zu@%llu\n",
1710                __func__, hdr->inode->i_ino,
1711                hdr->args.pgbase, (size_t)hdr->args.count, offset);
1712
1713        ds = nfs4_ff_layout_prepare_ds(lseg, idx, false);
1714        if (!ds)
1715                goto out_failed;
1716
1717        ds_clnt = nfs4_ff_find_or_create_ds_client(lseg, idx, ds->ds_clp,
1718                                                   hdr->inode);
1719        if (IS_ERR(ds_clnt))
1720                goto out_failed;
1721
1722        ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred);
1723        if (!ds_cred)
1724                goto out_failed;
1725
1726        vers = nfs4_ff_layout_ds_version(lseg, idx);
1727
1728        dprintk("%s USE DS: %s cl_count %d vers %d\n", __func__,
1729                ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count), vers);
1730
1731        hdr->pgio_done_cb = ff_layout_read_done_cb;
1732        atomic_inc(&ds->ds_clp->cl_count);
1733        hdr->ds_clp = ds->ds_clp;
1734        fh = nfs4_ff_layout_select_ds_fh(lseg, idx);
1735        if (fh)
1736                hdr->args.fh = fh;
1737        /*
1738         * Note that if we ever decide to split across DSes,
1739         * then we may need to handle dense-like offsets.
1740         */
1741        hdr->args.offset = offset;
1742        hdr->mds_offset = offset;
1743
1744        /* Perform an asynchronous read to the DS */
1745        nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
1746                          vers == 3 ? &ff_layout_read_call_ops_v3 :
1747                                      &ff_layout_read_call_ops_v4,
1748                          0, RPC_TASK_SOFTCONN);
1749        put_rpccred(ds_cred);
1750        return PNFS_ATTEMPTED;
1751
1752out_failed:
1753        if (ff_layout_avoid_mds_available_ds(lseg))
1754                return PNFS_TRY_AGAIN;
1755        return PNFS_NOT_ATTEMPTED;
1756}
1757
1758/* Perform async writes. */
1759static enum pnfs_try_status
1760ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
1761{
1762        struct pnfs_layout_segment *lseg = hdr->lseg;
1763        struct nfs4_pnfs_ds *ds;
1764        struct rpc_clnt *ds_clnt;
1765        struct rpc_cred *ds_cred;
1766        loff_t offset = hdr->args.offset;
1767        int vers;
1768        struct nfs_fh *fh;
1769        int idx = hdr->pgio_mirror_idx;
1770
1771        ds = nfs4_ff_layout_prepare_ds(lseg, idx, true);
1772        if (!ds)
1773                goto out_failed;
1774
1775        ds_clnt = nfs4_ff_find_or_create_ds_client(lseg, idx, ds->ds_clp,
1776                                                   hdr->inode);
1777        if (IS_ERR(ds_clnt))
1778                goto out_failed;
1779
1780        ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred);
1781        if (!ds_cred)
1782                goto out_failed;
1783
1784        vers = nfs4_ff_layout_ds_version(lseg, idx);
1785
1786        dprintk("%s ino %lu sync %d req %zu@%llu DS: %s cl_count %d vers %d\n",
1787                __func__, hdr->inode->i_ino, sync, (size_t) hdr->args.count,
1788                offset, ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count),
1789                vers);
1790
1791        hdr->pgio_done_cb = ff_layout_write_done_cb;
1792        atomic_inc(&ds->ds_clp->cl_count);
1793        hdr->ds_clp = ds->ds_clp;
1794        hdr->ds_commit_idx = idx;
1795        fh = nfs4_ff_layout_select_ds_fh(lseg, idx);
1796        if (fh)
1797                hdr->args.fh = fh;
1798
1799        /*
1800         * Note that if we ever decide to split across DSes,
1801         * then we may need to handle dense-like offsets.
1802         */
1803        hdr->args.offset = offset;
1804
1805        /* Perform an asynchronous write */
1806        nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
1807                          vers == 3 ? &ff_layout_write_call_ops_v3 :
1808                                      &ff_layout_write_call_ops_v4,
1809                          sync, RPC_TASK_SOFTCONN);
1810        put_rpccred(ds_cred);
1811        return PNFS_ATTEMPTED;
1812
1813out_failed:
1814        if (ff_layout_avoid_mds_available_ds(lseg))
1815                return PNFS_TRY_AGAIN;
1816        return PNFS_NOT_ATTEMPTED;
1817}
1818
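    /*
     * Flexfiles gives each mirror its own commit bucket, so the commit
     * index maps 1:1 onto the DS/mirror index.
     */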
1819static u32 calc_ds_index_from_commit(struct pnfs_layout_segment *lseg, u32 i)
1820{
1821        return i;
1822}
1823
1824static struct nfs_fh *
1825select_ds_fh_from_commit(struct pnfs_layout_segment *lseg, u32 i)
1826{
1827        struct nfs4_ff_layout_segment *flseg = FF_LAYOUT_LSEG(lseg);
1828
1829        /* FIXME: this assumes that only one NFS version is
1830         * available for the DS.
1831         */
1832        return &flseg->mirror_array[i]->fh_versions[0];
1833}
1834
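    /*
     * Send the COMMIT to the DS backing the given commit bucket. Any
     * setup failure takes the out_err path, which re-queues the writes
     * and releases the commit data so the commit can be redriven.
     */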
1835static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
1836{
1837        struct pnfs_layout_segment *lseg = data->lseg;
1838        struct nfs4_pnfs_ds *ds;
1839        struct rpc_clnt *ds_clnt;
1840        struct rpc_cred *ds_cred;
1841        u32 idx;
1842        int vers, ret;
1843        struct nfs_fh *fh;
1844
1845        if (!lseg || !(pnfs_is_valid_lseg(lseg) ||
1846            test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags)))
1847                goto out_err;
1848
1849        idx = calc_ds_index_from_commit(lseg, data->ds_commit_index);
1850        ds = nfs4_ff_layout_prepare_ds(lseg, idx, true);
1851        if (!ds)
1852                goto out_err;
1853
1854        ds_clnt = nfs4_ff_find_or_create_ds_client(lseg, idx, ds->ds_clp,
1855                                                   data->inode);
1856        if (IS_ERR(ds_clnt))
1857                goto out_err;
1858
1859        ds_cred = ff_layout_get_ds_cred(lseg, idx, data->cred);
1860        if (!ds_cred)
1861                goto out_err;
1862
1863        vers = nfs4_ff_layout_ds_version(lseg, idx);
1864
1865        dprintk("%s ino %lu, how %d cl_count %d vers %d\n", __func__,
1866                data->inode->i_ino, how, atomic_read(&ds->ds_clp->cl_count),
1867                vers);
1868        data->commit_done_cb = ff_layout_commit_done_cb;
1869        data->cred = ds_cred;
1870        atomic_inc(&ds->ds_clp->cl_count);
1871        data->ds_clp = ds->ds_clp;
1872        fh = select_ds_fh_from_commit(lseg, data->ds_commit_index);
1873        if (fh)
1874                data->args.fh = fh;
1875
1876        ret = nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops,
1877                                   vers == 3 ? &ff_layout_commit_call_ops_v3 :
1878                                               &ff_layout_commit_call_ops_v4,
1879                                   how, RPC_TASK_SOFTCONN);
1880        put_rpccred(ds_cred);
1881        return ret;
1882out_err:
1883        pnfs_generic_prepare_to_resend_writes(data);
1884        pnfs_generic_commit_release(data);
1885        return -EAGAIN;
1886}
1887
1888static int
1889ff_layout_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
1890                           int how, struct nfs_commit_info *cinfo)
1891{
1892        return pnfs_generic_commit_pagelist(inode, mds_pages, how, cinfo,
1893                                            ff_layout_initiate_commit);
1894}
1895
1896static struct pnfs_ds_commit_info *
1897ff_layout_get_ds_info(struct inode *inode)
1898{
1899        struct pnfs_layout_hdr *layout = NFS_I(inode)->layout;
1900
1901        if (layout == NULL)
1902                return NULL;
1903
1904        return &FF_LAYOUT_FROM_HDR(layout)->commit_info;
1905}
1906
1907static void
1908ff_layout_free_deviceid_node(struct nfs4_deviceid_node *d)
1909{
1910        nfs4_ff_layout_free_deviceid(container_of(d, struct nfs4_ff_layout_ds,
1911                                                  id_node));
1912}
1913
1914static int ff_layout_encode_ioerr(struct xdr_stream *xdr,
1915                                  const struct nfs4_layoutreturn_args *args,
1916                                  const struct nfs4_flexfile_layoutreturn_args *ff_args)
1917{
1918        __be32 *start;
1919
1920        start = xdr_reserve_space(xdr, 4);
1921        if (unlikely(!start))
1922                return -E2BIG;
1923
1924        *start = cpu_to_be32(ff_args->num_errors);
1925        /* This assumes we always return _ALL_ layouts */
1926        return ff_layout_encode_ds_ioerr(xdr, &ff_args->errors);
1927}
1928
1929static void
1930encode_opaque_fixed(struct xdr_stream *xdr, const void *buf, size_t len)
1931{
1932        WARN_ON_ONCE(xdr_stream_encode_opaque_fixed(xdr, buf, len) < 0);
1933}
1934
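    /*
     * Encode the fixed-size head of one iostats entry: offset, length,
     * stateid, the read and write io_info4 counters, and the deviceid,
     * matching the ff_iostats4 layout in the flexfiles spec. The
     * variable-length ff_layoutupdate4 body follows separately.
     */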
1935static void
1936ff_layout_encode_ff_iostat_head(struct xdr_stream *xdr,
1937                            const nfs4_stateid *stateid,
1938                            const struct nfs42_layoutstat_devinfo *devinfo)
1939{
1940        __be32 *p;
1941
1942        p = xdr_reserve_space(xdr, 8 + 8);
1943        p = xdr_encode_hyper(p, devinfo->offset);
1944        p = xdr_encode_hyper(p, devinfo->length);
1945        encode_opaque_fixed(xdr, stateid->data, NFS4_STATEID_SIZE);
1946        p = xdr_reserve_space(xdr, 4*8);
1947        p = xdr_encode_hyper(p, devinfo->read_count);
1948        p = xdr_encode_hyper(p, devinfo->read_bytes);
1949        p = xdr_encode_hyper(p, devinfo->write_count);
1950        p = xdr_encode_hyper(p, devinfo->write_bytes);
1951        encode_opaque_fixed(xdr, devinfo->dev_id.data, NFS4_DEVICEID4_SIZE);
1952}
1953
1954static void
1955ff_layout_encode_ff_iostat(struct xdr_stream *xdr,
1956                            const nfs4_stateid *stateid,
1957                            const struct nfs42_layoutstat_devinfo *devinfo)
1958{
1959        ff_layout_encode_ff_iostat_head(xdr, stateid, devinfo);
1960        ff_layout_encode_ff_layoutupdate(xdr, devinfo,
1961                        devinfo->ld_private.data);
1962}
1963
1964/* Encode the per-device iostats gathered for this layoutreturn */
1965static void ff_layout_encode_iostats_array(struct xdr_stream *xdr,
1966                const struct nfs4_layoutreturn_args *args,
1967                struct nfs4_flexfile_layoutreturn_args *ff_args)
1968{
1969        __be32 *p;
1970        int i;
1971
1972        p = xdr_reserve_space(xdr, 4);
1973        *p = cpu_to_be32(ff_args->num_dev);
1974        for (i = 0; i < ff_args->num_dev; i++)
1975                ff_layout_encode_ff_iostat(xdr,
1976                                &args->layout->plh_stateid,
1977                                &ff_args->devinfo[i]);
1978}
1979
1980static void
1981ff_layout_free_iostats_array(struct nfs42_layoutstat_devinfo *devinfo,
1982                unsigned int num_entries)
1983{
1984        unsigned int i;
1985
1986        for (i = 0; i < num_entries; i++) {
1987                if (!devinfo[i].ld_private.ops)
1988                        continue;
1989                if (!devinfo[i].ld_private.ops->free)
1990                        continue;
1991                devinfo[i].ld_private.ops->free(&devinfo[i].ld_private);
1992        }
1993}
1994
1995static struct nfs4_deviceid_node *
1996ff_layout_alloc_deviceid_node(struct nfs_server *server,
1997                              struct pnfs_device *pdev, gfp_t gfp_flags)
1998{
1999        struct nfs4_ff_layout_ds *dsaddr;
2000
2001        dsaddr = nfs4_ff_alloc_deviceid_node(server, pdev, gfp_flags);
2002        if (!dsaddr)
2003                return NULL;
2004        return &dsaddr->id_node;
2005}
2006
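    /*
     * The layoutreturn body is staged into a scratch page through a
     * temporary XDR stream, so its total length is known before the
     * 4-byte opaque length prefix is written to the real stream; the
     * staged page is then attached with xdr_write_pages().
     */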
2007static void
2008ff_layout_encode_layoutreturn(struct xdr_stream *xdr,
2009                const void *voidargs,
2010                const struct nfs4_xdr_opaque_data *ff_opaque)
2011{
2012        const struct nfs4_layoutreturn_args *args = voidargs;
2013        struct nfs4_flexfile_layoutreturn_args *ff_args = ff_opaque->data;
2014        struct xdr_buf tmp_buf = {
2015                .head = {
2016                        [0] = {
2017                                .iov_base = page_address(ff_args->pages[0]),
2018                        },
2019                },
2020                .buflen = PAGE_SIZE,
2021        };
2022        struct xdr_stream tmp_xdr;
2023        __be32 *start;
2024
2025        dprintk("%s: Begin\n", __func__);
2026
2027        xdr_init_encode(&tmp_xdr, &tmp_buf, NULL);
2028
2029        ff_layout_encode_ioerr(&tmp_xdr, args, ff_args);
2030        ff_layout_encode_iostats_array(&tmp_xdr, args, ff_args);
2031
2032        start = xdr_reserve_space(xdr, 4);
2033        *start = cpu_to_be32(tmp_buf.len);
2034        xdr_write_pages(xdr, ff_args->pages, 0, tmp_buf.len);
2035
2036        dprintk("%s: Return\n", __func__);
2037}
2038
2039static void
2040ff_layout_free_layoutreturn(struct nfs4_xdr_opaque_data *args)
2041{
2042        struct nfs4_flexfile_layoutreturn_args *ff_args;
2043
2044        if (!args->data)
2045                return;
2046        ff_args = args->data;
2047        args->data = NULL;
2048
2049        ff_layout_free_ds_ioerr(&ff_args->errors);
2050        ff_layout_free_iostats_array(ff_args->devinfo, ff_args->num_dev);
2051
2052        put_page(ff_args->pages[0]);
2053        kfree(ff_args);
2054}
2055
2056static const struct nfs4_xdr_opaque_ops layoutreturn_ops = {
2057        .encode = ff_layout_encode_layoutreturn,
2058        .free = ff_layout_free_layoutreturn,
2059};
2060
2061static int
2062ff_layout_prepare_layoutreturn(struct nfs4_layoutreturn_args *args)
2063{
2064        struct nfs4_flexfile_layoutreturn_args *ff_args;
2065        struct nfs4_flexfile_layout *ff_layout = FF_LAYOUT_FROM_HDR(args->layout);
2066
2067        ff_args = kmalloc(sizeof(*ff_args), GFP_KERNEL);
2068        if (!ff_args)
2069                goto out_nomem;
2070        ff_args->pages[0] = alloc_page(GFP_KERNEL);
2071        if (!ff_args->pages[0])
2072                goto out_nomem_free;
2073
2074        INIT_LIST_HEAD(&ff_args->errors);
2075        ff_args->num_errors = ff_layout_fetch_ds_ioerr(args->layout,
2076                        &args->range, &ff_args->errors,
2077                        FF_LAYOUTRETURN_MAXERR);
2078
2079        spin_lock(&args->inode->i_lock);
2080        ff_args->num_dev = ff_layout_mirror_prepare_stats(&ff_layout->generic_hdr,
2081                        &ff_args->devinfo[0], ARRAY_SIZE(ff_args->devinfo));
2082        spin_unlock(&args->inode->i_lock);
2083
2084        args->ld_private->ops = &layoutreturn_ops;
2085        args->ld_private->data = ff_args;
2086        return 0;
2087out_nomem_free:
2088        kfree(ff_args);
2089out_nomem:
2090        return -ENOMEM;
2091}
2092
2093static int
2094ff_layout_ntop4(const struct sockaddr *sap, char *buf, const size_t buflen)
2095{
2096        const struct sockaddr_in *sin = (struct sockaddr_in *)sap;
2097
2098        return snprintf(buf, buflen, "%pI4", &sin->sin_addr);
2099}
2100
2101static size_t
2102ff_layout_ntop6_noscopeid(const struct sockaddr *sap, char *buf,
2103                          const int buflen)
2104{
2105        const struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
2106        const struct in6_addr *addr = &sin6->sin6_addr;
2107
2108        /*
2109         * RFC 4291, Section 2.2.2
2110         *
2111         * Shorthanded ANY address
2112         */
2113        if (ipv6_addr_any(addr))
2114                return snprintf(buf, buflen, "::");
2115
2116        /*
2117         * RFC 4291, Section 2.2.2
2118         *
2119         * Shorthanded loopback address
2120         */
2121        if (ipv6_addr_loopback(addr))
2122                return snprintf(buf, buflen, "::1");
2123
2124        /*
2125         * RFC 4291, Section 2.2.3
2126         *
2127         * Special presentation address format for mapped v4
2128         * addresses.
2129         */
2130        if (ipv6_addr_v4mapped(addr))
2131                return snprintf(buf, buflen, "::ffff:%pI4",
2132                                        &addr->s6_addr32[3]);
2133
2134        /*
2135         * RFC 4291, Section 2.2.1
2136         */
2137        return snprintf(buf, buflen, "%pI6c", addr);
2138}
2139
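    /*
     * The DS address is encoded as an RPC universal address: the
     * presentation form of the IP address with the port appended as two
     * extra dotted octets, e.g. 192.0.2.1 port 2049 (0x0801) becomes
     * "192.0.2.1.8.1".
     */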
2140/* Derived from rpc_sockaddr2uaddr */
2141static void
2142ff_layout_encode_netaddr(struct xdr_stream *xdr, struct nfs4_pnfs_ds_addr *da)
2143{
2144        struct sockaddr *sap = (struct sockaddr *)&da->da_addr;
2145        char portbuf[RPCBIND_MAXUADDRPLEN];
2146        char addrbuf[RPCBIND_MAXUADDRLEN];
2147        char *netid;
2148        unsigned short port;
2149        int len, netid_len;
2150        __be32 *p;
2151
2152        switch (sap->sa_family) {
2153        case AF_INET:
2154                if (ff_layout_ntop4(sap, addrbuf, sizeof(addrbuf)) == 0)
2155                        return;
2156                port = ntohs(((struct sockaddr_in *)sap)->sin_port);
2157                netid = "tcp";
2158                netid_len = 3;
2159                break;
2160        case AF_INET6:
2161                if (ff_layout_ntop6_noscopeid(sap, addrbuf, sizeof(addrbuf)) == 0)
2162                        return;
2163                port = ntohs(((struct sockaddr_in6 *)sap)->sin6_port);
2164                netid = "tcp6";
2165                netid_len = 4;
2166                break;
2167        default:
2168                /* we only support tcp and tcp6 */
2169                WARN_ON_ONCE(1);
2170                return;
2171        }
2172
2173        snprintf(portbuf, sizeof(portbuf), ".%u.%u", port >> 8, port & 0xff);
2174        len = strlcat(addrbuf, portbuf, sizeof(addrbuf));
2175
2176        p = xdr_reserve_space(xdr, 4 + netid_len);
2177        xdr_encode_opaque(p, netid, netid_len);
2178
2179        p = xdr_reserve_space(xdr, 4 + len);
2180        xdr_encode_opaque(p, addrbuf, len);
2181}
2182
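    /*
     * nfstime4 is 12 bytes on the wire: a 64-bit seconds field followed
     * by a 32-bit nseconds field.
     */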
2183static void
2184ff_layout_encode_nfstime(struct xdr_stream *xdr,
2185                         ktime_t t)
2186{
2187        struct timespec64 ts;
2188        __be32 *p;
2189
2190        p = xdr_reserve_space(xdr, 12);
2191        ts = ktime_to_timespec64(t);
2192        p = xdr_encode_hyper(p, ts.tv_sec);
2193        *p++ = cpu_to_be32(ts.tv_nsec);
2194}
2195
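    /*
     * ff_io_latency4: five 64-bit counters (ops/bytes requested,
     * ops/bytes completed, bytes not delivered) followed by two
     * nfstime4 values for the busy and aggregate completion times.
     */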
2196static void
2197ff_layout_encode_io_latency(struct xdr_stream *xdr,
2198                            struct nfs4_ff_io_stat *stat)
2199{
2200        __be32 *p;
2201
2202        p = xdr_reserve_space(xdr, 5 * 8);
2203        p = xdr_encode_hyper(p, stat->ops_requested);
2204        p = xdr_encode_hyper(p, stat->bytes_requested);
2205        p = xdr_encode_hyper(p, stat->ops_completed);
2206        p = xdr_encode_hyper(p, stat->bytes_completed);
2207        p = xdr_encode_hyper(p, stat->bytes_not_delivered);
2208        ff_layout_encode_nfstime(xdr, stat->total_busy_time);
2209        ff_layout_encode_nfstime(xdr, stat->aggregate_completion_time);
2210}
2211
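    /*
     * Encode one ff_layoutupdate4; roughly, per the flexfiles spec:
     *
     *   struct ff_layoutupdate4 {
     *           netaddr4         ffl_addr;
     *           nfs_fh4          ffl_fhandle;
     *           ff_io_latency4   ffl_read;
     *           ff_io_latency4   ffl_write;
     *           nfstime4         ffl_duration;
     *           bool             ffl_local;
     *   };
     *
     * Only the mirror's first DS address and filehandle are reported,
     * and ffl_local is always encoded as false.
     */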
2212static void
2213ff_layout_encode_ff_layoutupdate(struct xdr_stream *xdr,
2214                              const struct nfs42_layoutstat_devinfo *devinfo,
2215                              struct nfs4_ff_layout_mirror *mirror)
2216{
2217        struct nfs4_pnfs_ds_addr *da;
2218        struct nfs4_pnfs_ds *ds = mirror->mirror_ds->ds;
2219        struct nfs_fh *fh = &mirror->fh_versions[0];
2220        __be32 *p;
2221
2222        da = list_first_entry(&ds->ds_addrs, struct nfs4_pnfs_ds_addr, da_node);
2223        dprintk("%s: DS %s: encoding address %s\n",
2224                __func__, ds->ds_remotestr, da->da_remotestr);
2225        /* netaddr4 */
2226        ff_layout_encode_netaddr(xdr, da);
2227        /* nfs_fh4 */
2228        p = xdr_reserve_space(xdr, 4 + fh->size);
2229        xdr_encode_opaque(p, fh->data, fh->size);
2230        /* ff_io_latency4 read */
2231        spin_lock(&mirror->lock);
2232        ff_layout_encode_io_latency(xdr, &mirror->read_stat.io_stat);
2233        /* ff_io_latency4 write */
2234        ff_layout_encode_io_latency(xdr, &mirror->write_stat.io_stat);
2235        spin_unlock(&mirror->lock);
2236        /* nfstime4 */
2237        ff_layout_encode_nfstime(xdr, ktime_sub(ktime_get(), mirror->start_time));
2238        /* bool */
2239        p = xdr_reserve_space(xdr, 4);
2240        *p = cpu_to_be32(false);
2241}
2242
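    /*
     * The 4-byte layoutupdate length is reserved first and back-patched
     * after encoding: (xdr->p - start - 1) counts the 32-bit words
     * written after the length word, and * 4 converts that to bytes.
     */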
2243static void
2244ff_layout_encode_layoutstats(struct xdr_stream *xdr, const void *args,
2245                             const struct nfs4_xdr_opaque_data *opaque)
2246{
2247        struct nfs42_layoutstat_devinfo *devinfo = container_of(opaque,
2248                        struct nfs42_layoutstat_devinfo, ld_private);
2249        __be32 *start;
2250
2251        /* layoutupdate length */
2252        start = xdr_reserve_space(xdr, 4);
2253        ff_layout_encode_ff_layoutupdate(xdr, devinfo, opaque->data);
2254
2255        *start = cpu_to_be32((xdr->p - start - 1) * 4);
2256}
2257
2258static void
2259ff_layout_free_layoutstats(struct nfs4_xdr_opaque_data *opaque)
2260{
2261        struct nfs4_ff_layout_mirror *mirror = opaque->data;
2262
2263        ff_layout_put_mirror(mirror);
2264}
2265
2266static const struct nfs4_xdr_opaque_ops layoutstat_ops = {
2267        .encode = ff_layout_encode_layoutstats,
2268        .free   = ff_layout_free_layoutstats,
2269};
2270
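    /*
     * Snapshot the per-mirror counters into the devinfo array; callers
     * hold inode->i_lock. Each reported mirror has its refcount raised
     * with atomic_inc_not_zero(), and that reference is dropped again
     * in ff_layout_free_layoutstats() via ld_private.ops->free.
     */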
2271static int
2272ff_layout_mirror_prepare_stats(struct pnfs_layout_hdr *lo,
2273                               struct nfs42_layoutstat_devinfo *devinfo,
2274                               int dev_limit)
2275{
2276        struct nfs4_flexfile_layout *ff_layout = FF_LAYOUT_FROM_HDR(lo);
2277        struct nfs4_ff_layout_mirror *mirror;
2278        struct nfs4_deviceid_node *dev;
2279        int i = 0;
2280
2281        list_for_each_entry(mirror, &ff_layout->mirrors, mirrors) {
2282                if (i >= dev_limit)
2283                        break;
2284                if (IS_ERR_OR_NULL(mirror->mirror_ds))
2285                        continue;
2286                if (!test_and_clear_bit(NFS4_FF_MIRROR_STAT_AVAIL, &mirror->flags))
2287                        continue;
2288                /* mirror refcount put in ff_layout_free_layoutstats */
2289                if (!atomic_inc_not_zero(&mirror->ref))
2290                        continue;
2291                dev = &mirror->mirror_ds->id_node;
2292                memcpy(&devinfo->dev_id, &dev->deviceid, NFS4_DEVICEID4_SIZE);
2293                devinfo->offset = 0;
2294                devinfo->length = NFS4_MAX_UINT64;
2295                spin_lock(&mirror->lock);
2296                devinfo->read_count = mirror->read_stat.io_stat.ops_completed;
2297                devinfo->read_bytes = mirror->read_stat.io_stat.bytes_completed;
2298                devinfo->write_count = mirror->write_stat.io_stat.ops_completed;
2299                devinfo->write_bytes = mirror->write_stat.io_stat.bytes_completed;
2300                spin_unlock(&mirror->lock);
2301                devinfo->layout_type = LAYOUT_FLEX_FILES;
2302                devinfo->ld_private.ops = &layoutstat_ops;
2303                devinfo->ld_private.data = mirror;
2304
2305                devinfo++;
2306                i++;
2307        }
2308        return i;
2309}
2310
2311static int
2312ff_layout_prepare_layoutstats(struct nfs42_layoutstat_args *args)
2313{
2314        struct nfs4_flexfile_layout *ff_layout;
2315        const int dev_count = PNFS_LAYOUTSTATS_MAXDEV;
2316
2317        /* For now, send at most PNFS_LAYOUTSTATS_MAXDEV statistics */
2318        args->devinfo = kmalloc_array(dev_count, sizeof(*args->devinfo), GFP_NOIO);
2319        if (!args->devinfo)
2320                return -ENOMEM;
2321
2322        spin_lock(&args->inode->i_lock);
2323        ff_layout = FF_LAYOUT_FROM_HDR(NFS_I(args->inode)->layout);
2324        args->num_dev = ff_layout_mirror_prepare_stats(&ff_layout->generic_hdr,
2325                        &args->devinfo[0], dev_count);
2326        spin_unlock(&args->inode->i_lock);
2327        if (!args->num_dev) {
2328                kfree(args->devinfo);
2329                args->devinfo = NULL;
2330                return -ENOENT;
2331        }
2332
2333        return 0;
2334}
2335
2336static int
2337ff_layout_set_layoutdriver(struct nfs_server *server,
2338                const struct nfs_fh *dummy)
2339{
2340#if IS_ENABLED(CONFIG_NFS_V4_2)
2341        server->caps |= NFS_CAP_LAYOUTSTATS;
2342#endif
2343        return 0;
2344}
2345
2346static struct pnfs_layoutdriver_type flexfilelayout_type = {
2347        .id                     = LAYOUT_FLEX_FILES,
2348        .name                   = "LAYOUT_FLEX_FILES",
2349        .owner                  = THIS_MODULE,
2350        .set_layoutdriver       = ff_layout_set_layoutdriver,
2351        .alloc_layout_hdr       = ff_layout_alloc_layout_hdr,
2352        .free_layout_hdr        = ff_layout_free_layout_hdr,
2353        .alloc_lseg             = ff_layout_alloc_lseg,
2354        .free_lseg              = ff_layout_free_lseg,
2355        .add_lseg               = ff_layout_add_lseg,
2356        .pg_read_ops            = &ff_layout_pg_read_ops,
2357        .pg_write_ops           = &ff_layout_pg_write_ops,
2358        .get_ds_info            = ff_layout_get_ds_info,
2359        .free_deviceid_node     = ff_layout_free_deviceid_node,
2360        .mark_request_commit    = pnfs_layout_mark_request_commit,
2361        .clear_request_commit   = pnfs_generic_clear_request_commit,
2362        .scan_commit_lists      = pnfs_generic_scan_commit_lists,
2363        .recover_commit_reqs    = pnfs_generic_recover_commit_reqs,
2364        .commit_pagelist        = ff_layout_commit_pagelist,
2365        .read_pagelist          = ff_layout_read_pagelist,
2366        .write_pagelist         = ff_layout_write_pagelist,
2367        .alloc_deviceid_node    = ff_layout_alloc_deviceid_node,
2368        .prepare_layoutreturn   = ff_layout_prepare_layoutreturn,
2369        .sync                   = pnfs_nfs_generic_sync,
2370        .prepare_layoutstats    = ff_layout_prepare_layoutstats,
2371};
2372
2373static int __init nfs4flexfilelayout_init(void)
2374{
2375        printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Registering...\n",
2376               __func__);
2377        if (!ff_zero_group) {
2378                ff_zero_group = groups_alloc(0);
2379                if (!ff_zero_group)
2380                        return -ENOMEM;
2381        }
2382        return pnfs_register_layoutdriver(&flexfilelayout_type);
2383}
2384
2385static void __exit nfs4flexfilelayout_exit(void)
2386{
2387        printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Unregistering...\n",
2388               __func__);
2389        pnfs_unregister_layoutdriver(&flexfilelayout_type);
2390        if (ff_zero_group) {
2391                put_group_info(ff_zero_group);
2392                ff_zero_group = NULL;
2393        }
2394}
2395
2396MODULE_ALIAS("nfs-layouttype4-4");
2397
2398MODULE_LICENSE("GPL");
2399MODULE_DESCRIPTION("The NFSv4 flexfile layout driver");
2400
2401module_init(nfs4flexfilelayout_init);
2402module_exit(nfs4flexfilelayout_exit);
2403