linux/fs/nfs/flexfilelayout/flexfilelayoutdev.c
<<
>>
Prefs
   1/*
   2 * Device operations for the pnfs nfs4 file layout driver.
   3 *
   4 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
   5 *
   6 * Tao Peng <bergwolf@primarydata.com>
   7 */
   8
   9#include <linux/nfs_fs.h>
  10#include <linux/vmalloc.h>
  11#include <linux/module.h>
  12#include <linux/sunrpc/addr.h>
  13
  14#include "../internal.h"
  15#include "../nfs4session.h"
  16#include "flexfilelayout.h"
  17
  18#define NFSDBG_FACILITY         NFSDBG_PNFS_LD
  19
/*
 * Timeout handed to nfs4_pnfs_ds_connect() when connecting to a data
 * server; exposed as the "dataserver_timeo" module parameter (tenths of a
 * second, per its MODULE_PARM_DESC).
 * NOTE(review): the default is taken from a *_RETRANS constant even though
 * this is a timeout - confirm that NFS_DEF_TCP_RETRANS is really intended.
 */
static unsigned int dataserver_timeo = NFS_DEF_TCP_RETRANS;
/*
 * Retransmit count handed to nfs4_pnfs_ds_connect(); exposed as the
 * "dataserver_retrans" module parameter. No initializer, so it starts at 0.
 */
static unsigned int dataserver_retrans;
  22
  23void nfs4_ff_layout_put_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
  24{
  25        if (mirror_ds)
  26                nfs4_put_deviceid_node(&mirror_ds->id_node);
  27}
  28
  29void nfs4_ff_layout_free_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
  30{
  31        nfs4_print_deviceid(&mirror_ds->id_node.deviceid);
  32        nfs4_pnfs_ds_put(mirror_ds->ds);
  33        kfree_rcu(mirror_ds, id_node.rcu);
  34}
  35
  36/* Decode opaque device data and construct new_ds using it */
  37struct nfs4_ff_layout_ds *
  38nfs4_ff_alloc_deviceid_node(struct nfs_server *server, struct pnfs_device *pdev,
  39                            gfp_t gfp_flags)
  40{
  41        struct xdr_stream stream;
  42        struct xdr_buf buf;
  43        struct page *scratch;
  44        struct list_head dsaddrs;
  45        struct nfs4_pnfs_ds_addr *da;
  46        struct nfs4_ff_layout_ds *new_ds = NULL;
  47        struct nfs4_ff_ds_version *ds_versions = NULL;
  48        u32 mp_count;
  49        u32 version_count;
  50        __be32 *p;
  51        int i, ret = -ENOMEM;
  52
  53        /* set up xdr stream */
  54        scratch = alloc_page(gfp_flags);
  55        if (!scratch)
  56                goto out_err;
  57
  58        new_ds = kzalloc(sizeof(struct nfs4_ff_layout_ds), gfp_flags);
  59        if (!new_ds)
  60                goto out_scratch;
  61
  62        nfs4_init_deviceid_node(&new_ds->id_node,
  63                                server,
  64                                &pdev->dev_id);
  65        INIT_LIST_HEAD(&dsaddrs);
  66
  67        xdr_init_decode_pages(&stream, &buf, pdev->pages, pdev->pglen);
  68        xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);
  69
  70        /* multipath count */
  71        p = xdr_inline_decode(&stream, 4);
  72        if (unlikely(!p))
  73                goto out_err_drain_dsaddrs;
  74        mp_count = be32_to_cpup(p);
  75        dprintk("%s: multipath ds count %d\n", __func__, mp_count);
  76
  77        for (i = 0; i < mp_count; i++) {
  78                /* multipath ds */
  79                da = nfs4_decode_mp_ds_addr(server->nfs_client->cl_net,
  80                                            &stream, gfp_flags);
  81                if (da)
  82                        list_add_tail(&da->da_node, &dsaddrs);
  83        }
  84        if (list_empty(&dsaddrs)) {
  85                dprintk("%s: no suitable DS addresses found\n",
  86                        __func__);
  87                ret = -ENOMEDIUM;
  88                goto out_err_drain_dsaddrs;
  89        }
  90
  91        /* version count */
  92        p = xdr_inline_decode(&stream, 4);
  93        if (unlikely(!p))
  94                goto out_err_drain_dsaddrs;
  95        version_count = be32_to_cpup(p);
  96        dprintk("%s: version count %d\n", __func__, version_count);
  97
  98        ds_versions = kzalloc(version_count * sizeof(struct nfs4_ff_ds_version),
  99                              gfp_flags);
 100        if (!ds_versions)
 101                goto out_scratch;
 102
 103        for (i = 0; i < version_count; i++) {
 104                /* 20 = version(4) + minor_version(4) + rsize(4) + wsize(4) +
 105                 * tightly_coupled(4) */
 106                p = xdr_inline_decode(&stream, 20);
 107                if (unlikely(!p))
 108                        goto out_err_drain_dsaddrs;
 109                ds_versions[i].version = be32_to_cpup(p++);
 110                ds_versions[i].minor_version = be32_to_cpup(p++);
 111                ds_versions[i].rsize = nfs_block_size(be32_to_cpup(p++), NULL);
 112                ds_versions[i].wsize = nfs_block_size(be32_to_cpup(p++), NULL);
 113                ds_versions[i].tightly_coupled = be32_to_cpup(p);
 114
 115                if (ds_versions[i].rsize > NFS_MAX_FILE_IO_SIZE)
 116                        ds_versions[i].rsize = NFS_MAX_FILE_IO_SIZE;
 117                if (ds_versions[i].wsize > NFS_MAX_FILE_IO_SIZE)
 118                        ds_versions[i].wsize = NFS_MAX_FILE_IO_SIZE;
 119
 120                if (ds_versions[i].version != 3 || ds_versions[i].minor_version != 0) {
 121                        dprintk("%s: [%d] unsupported ds version %d-%d\n", __func__,
 122                                i, ds_versions[i].version,
 123                                ds_versions[i].minor_version);
 124                        ret = -EPROTONOSUPPORT;
 125                        goto out_err_drain_dsaddrs;
 126                }
 127
 128                dprintk("%s: [%d] vers %u minor_ver %u rsize %u wsize %u coupled %d\n",
 129                        __func__, i, ds_versions[i].version,
 130                        ds_versions[i].minor_version,
 131                        ds_versions[i].rsize,
 132                        ds_versions[i].wsize,
 133                        ds_versions[i].tightly_coupled);
 134        }
 135
 136        new_ds->ds_versions = ds_versions;
 137        new_ds->ds_versions_cnt = version_count;
 138
 139        new_ds->ds = nfs4_pnfs_ds_add(&dsaddrs, gfp_flags);
 140        if (!new_ds->ds)
 141                goto out_err_drain_dsaddrs;
 142
 143        /* If DS was already in cache, free ds addrs */
 144        while (!list_empty(&dsaddrs)) {
 145                da = list_first_entry(&dsaddrs,
 146                                      struct nfs4_pnfs_ds_addr,
 147                                      da_node);
 148                list_del_init(&da->da_node);
 149                kfree(da->da_remotestr);
 150                kfree(da);
 151        }
 152
 153        __free_page(scratch);
 154        return new_ds;
 155
 156out_err_drain_dsaddrs:
 157        while (!list_empty(&dsaddrs)) {
 158                da = list_first_entry(&dsaddrs, struct nfs4_pnfs_ds_addr,
 159                                      da_node);
 160                list_del_init(&da->da_node);
 161                kfree(da->da_remotestr);
 162                kfree(da);
 163        }
 164
 165        kfree(ds_versions);
 166out_scratch:
 167        __free_page(scratch);
 168out_err:
 169        kfree(new_ds);
 170
 171        dprintk("%s ERROR: returning %d\n", __func__, ret);
 172        return NULL;
 173}
 174
 175static void ff_layout_mark_devid_invalid(struct pnfs_layout_segment *lseg,
 176                struct nfs4_deviceid_node *devid)
 177{
 178        nfs4_mark_deviceid_unavailable(devid);
 179        if (!ff_layout_has_available_ds(lseg))
 180                pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode,
 181                                lseg);
 182}
 183
 184static bool ff_layout_mirror_valid(struct pnfs_layout_segment *lseg,
 185                struct nfs4_ff_layout_mirror *mirror)
 186{
 187        if (mirror == NULL || mirror->mirror_ds == NULL) {
 188                pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode,
 189                                        lseg);
 190                return false;
 191        }
 192        if (mirror->mirror_ds->ds == NULL) {
 193                struct nfs4_deviceid_node *devid;
 194                devid = &mirror->mirror_ds->id_node;
 195                ff_layout_mark_devid_invalid(lseg, devid);
 196                return false;
 197        }
 198        return true;
 199}
 200
 201static u64
 202end_offset(u64 start, u64 len)
 203{
 204        u64 end;
 205
 206        end = start + len;
 207        return end >= start ? end : NFS4_MAX_UINT64;
 208}
 209
 210static void extend_ds_error(struct nfs4_ff_layout_ds_err *err,
 211                            u64 offset, u64 length)
 212{
 213        u64 end;
 214
 215        end = max_t(u64, end_offset(err->offset, err->length),
 216                    end_offset(offset, length));
 217        err->offset = min_t(u64, err->offset, offset);
 218        err->length = end - err->offset;
 219}
 220
 221static int
 222ff_ds_error_match(const struct nfs4_ff_layout_ds_err *e1,
 223                const struct nfs4_ff_layout_ds_err *e2)
 224{
 225        int ret;
 226
 227        if (e1->opnum != e2->opnum)
 228                return e1->opnum < e2->opnum ? -1 : 1;
 229        if (e1->status != e2->status)
 230                return e1->status < e2->status ? -1 : 1;
 231        ret = memcmp(e1->stateid.data, e2->stateid.data,
 232                        sizeof(e1->stateid.data));
 233        if (ret != 0)
 234                return ret;
 235        ret = memcmp(&e1->deviceid, &e2->deviceid, sizeof(e1->deviceid));
 236        if (ret != 0)
 237                return ret;
 238        if (end_offset(e1->offset, e1->length) < e2->offset)
 239                return -1;
 240        if (e1->offset > end_offset(e2->offset, e2->length))
 241                return 1;
 242        /* If ranges overlap or are contiguous, they are the same */
 243        return 0;
 244}
 245
 246static void
 247ff_layout_add_ds_error_locked(struct nfs4_flexfile_layout *flo,
 248                              struct nfs4_ff_layout_ds_err *dserr)
 249{
 250        struct nfs4_ff_layout_ds_err *err, *tmp;
 251        struct list_head *head = &flo->error_list;
 252        int match;
 253
 254        /* Do insertion sort w/ merges */
 255        list_for_each_entry_safe(err, tmp, &flo->error_list, list) {
 256                match = ff_ds_error_match(err, dserr);
 257                if (match < 0)
 258                        continue;
 259                if (match > 0) {
 260                        /* Add entry "dserr" _before_ entry "err" */
 261                        head = &err->list;
 262                        break;
 263                }
 264                /* Entries match, so merge "err" into "dserr" */
 265                extend_ds_error(dserr, err->offset, err->length);
 266                list_del(&err->list);
 267                kfree(err);
 268        }
 269
 270        list_add_tail(&dserr->list, head);
 271}
 272
 273int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo,
 274                             struct nfs4_ff_layout_mirror *mirror, u64 offset,
 275                             u64 length, int status, enum nfs_opnum4 opnum,
 276                             gfp_t gfp_flags)
 277{
 278        struct nfs4_ff_layout_ds_err *dserr;
 279
 280        if (status == 0)
 281                return 0;
 282
 283        if (mirror->mirror_ds == NULL)
 284                return -EINVAL;
 285
 286        dserr = kmalloc(sizeof(*dserr), gfp_flags);
 287        if (!dserr)
 288                return -ENOMEM;
 289
 290        INIT_LIST_HEAD(&dserr->list);
 291        dserr->offset = offset;
 292        dserr->length = length;
 293        dserr->status = status;
 294        dserr->opnum = opnum;
 295        nfs4_stateid_copy(&dserr->stateid, &mirror->stateid);
 296        memcpy(&dserr->deviceid, &mirror->mirror_ds->id_node.deviceid,
 297               NFS4_DEVICEID4_SIZE);
 298
 299        spin_lock(&flo->generic_hdr.plh_inode->i_lock);
 300        ff_layout_add_ds_error_locked(flo, dserr);
 301        spin_unlock(&flo->generic_hdr.plh_inode->i_lock);
 302
 303        return 0;
 304}
 305
 306static struct rpc_cred *
 307ff_layout_get_mirror_cred(struct nfs4_ff_layout_mirror *mirror, u32 iomode)
 308{
 309        struct rpc_cred *cred, __rcu **pcred;
 310
 311        if (iomode == IOMODE_READ)
 312                pcred = &mirror->ro_cred;
 313        else
 314                pcred = &mirror->rw_cred;
 315
 316        rcu_read_lock();
 317        do {
 318                cred = rcu_dereference(*pcred);
 319                if (!cred)
 320                        break;
 321
 322                cred = get_rpccred_rcu(cred);
 323        } while(!cred);
 324        rcu_read_unlock();
 325        return cred;
 326}
 327
 328struct nfs_fh *
 329nfs4_ff_layout_select_ds_fh(struct pnfs_layout_segment *lseg, u32 mirror_idx)
 330{
 331        struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, mirror_idx);
 332        struct nfs_fh *fh = NULL;
 333
 334        if (!ff_layout_mirror_valid(lseg, mirror)) {
 335                pr_err_ratelimited("NFS: %s: No data server for mirror offset index %d\n",
 336                        __func__, mirror_idx);
 337                goto out;
 338        }
 339
 340        /* FIXME: For now assume there is only 1 version available for the DS */
 341        fh = &mirror->fh_versions[0];
 342out:
 343        return fh;
 344}
 345
/**
 * nfs4_ff_layout_prepare_ds - prepare a DS connection for an RPC call
 * @lseg: the layout segment we're operating on
 * @ds_idx: index of the DS to use
 * @fail_return: return layout on connect failure?
 *
 * Try to prepare a DS connection to accept an RPC call. This involves
 * selecting a mirror to use and connecting the client to it if it's not
 * already connected.
 *
 * Since we only need a single functioning mirror to satisfy a read, we don't
 * want to return the layout if there is one. For writes though, any down
 * mirror should result in a LAYOUTRETURN. @fail_return is how we distinguish
 * between the two cases.
 *
 * Returns a pointer to a connected DS object on success or NULL on failure.
 */
struct nfs4_pnfs_ds *
nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
                          bool fail_return)
{
        struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
        struct nfs4_pnfs_ds *ds = NULL;
        struct nfs4_deviceid_node *devid;
        struct inode *ino = lseg->pls_layout->plh_inode;
        struct nfs_server *s = NFS_SERVER(ino);
        unsigned int max_payload;

        /*
         * An invalid mirror is handled inside ff_layout_mirror_valid();
         * here we only log and return NULL (ds is still NULL).
         */
        if (!ff_layout_mirror_valid(lseg, mirror)) {
                pr_err_ratelimited("NFS: %s: No data server for offset index %d\n",
                        __func__, ds_idx);
                goto out;
        }

        /* Device already flagged unavailable: skip the connect attempt */
        devid = &mirror->mirror_ds->id_node;
        if (ff_layout_test_devid_unavailable(devid))
                goto out_fail;

        ds = mirror->mirror_ds->ds;
        /* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */
        smp_rmb();
        /* Already connected: return the DS as is */
        if (ds->ds_clp)
                goto out;

        /* FIXME: For now we assume the server sent only one version of NFS
         * to use for the DS.
         */
        nfs4_pnfs_ds_connect(s, ds, devid, dataserver_timeo,
                             dataserver_retrans,
                             mirror->mirror_ds->ds_versions[0].version,
                             mirror->mirror_ds->ds_versions[0].minor_version,
                             RPC_AUTH_UNIX);

        /* connect success, check rsize/wsize limit */
        if (ds->ds_clp) {
                /* Clamp the advertised sizes to what the RPC client can carry */
                max_payload =
                        nfs_block_size(rpc_max_payload(ds->ds_clp->cl_rpcclient),
                                       NULL);
                if (mirror->mirror_ds->ds_versions[0].rsize > max_payload)
                        mirror->mirror_ds->ds_versions[0].rsize = max_payload;
                if (mirror->mirror_ds->ds_versions[0].wsize > max_payload)
                        mirror->mirror_ds->ds_versions[0].wsize = max_payload;
                goto out;
        }
        /*
         * Connect failed: record an NXIO error covering the whole segment.
         * The return value is ignored, so a failure to record the error is
         * not reported to the caller.
         */
        ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
                                 mirror, lseg->pls_range.offset,
                                 lseg->pls_range.length, NFS4ERR_NXIO,
                                 OP_ILLEGAL, GFP_NOIO);
out_fail:
        /*
         * Mark the layout for return when the caller asked for it or when
         * no data server is left available for this segment.
         */
        if (fail_return || !ff_layout_has_available_ds(lseg))
                pnfs_error_mark_layout_for_return(ino, lseg);
        ds = NULL;
out:
        return ds;
}
 421
 422struct rpc_cred *
 423ff_layout_get_ds_cred(struct pnfs_layout_segment *lseg, u32 ds_idx,
 424                      struct rpc_cred *mdscred)
 425{
 426        struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
 427        struct rpc_cred *cred;
 428
 429        if (mirror) {
 430                cred = ff_layout_get_mirror_cred(mirror, lseg->pls_range.iomode);
 431                if (!cred)
 432                        cred = get_rpccred(mdscred);
 433        } else {
 434                cred = get_rpccred(mdscred);
 435        }
 436        return cred;
 437}
 438
 439/**
 440* Find or create a DS rpc client with th MDS server rpc client auth flavor
 441* in the nfs_client cl_ds_clients list.
 442*/
 443struct rpc_clnt *
 444nfs4_ff_find_or_create_ds_client(struct pnfs_layout_segment *lseg, u32 ds_idx,
 445                                 struct nfs_client *ds_clp, struct inode *inode)
 446{
 447        struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
 448
 449        switch (mirror->mirror_ds->ds_versions[0].version) {
 450        case 3:
 451                /* For NFSv3 DS, flavor is set when creating DS connections */
 452                return ds_clp->cl_rpcclient;
 453        case 4:
 454                return nfs4_find_or_create_ds_client(ds_clp, inode);
 455        default:
 456                BUG();
 457        }
 458}
 459
 460static bool is_range_intersecting(u64 offset1, u64 length1,
 461                                  u64 offset2, u64 length2)
 462{
 463        u64 end1 = end_offset(offset1, length1);
 464        u64 end2 = end_offset(offset2, length2);
 465
 466        return (end1 == NFS4_MAX_UINT64 || end1 > offset2) &&
 467               (end2 == NFS4_MAX_UINT64 || end2 > offset1);
 468}
 469
 470/* called with inode i_lock held */
 471int ff_layout_encode_ds_ioerr(struct nfs4_flexfile_layout *flo,
 472                              struct xdr_stream *xdr, int *count,
 473                              const struct pnfs_layout_range *range)
 474{
 475        struct nfs4_ff_layout_ds_err *err, *n;
 476        __be32 *p;
 477
 478        list_for_each_entry_safe(err, n, &flo->error_list, list) {
 479                if (!is_range_intersecting(err->offset, err->length,
 480                                           range->offset, range->length))
 481                        continue;
 482                /* offset(8) + length(8) + stateid(NFS4_STATEID_SIZE)
 483                 * + array length + deviceid(NFS4_DEVICEID4_SIZE)
 484                 * + status(4) + opnum(4)
 485                 */
 486                p = xdr_reserve_space(xdr,
 487                                28 + NFS4_STATEID_SIZE + NFS4_DEVICEID4_SIZE);
 488                if (unlikely(!p))
 489                        return -ENOBUFS;
 490                p = xdr_encode_hyper(p, err->offset);
 491                p = xdr_encode_hyper(p, err->length);
 492                p = xdr_encode_opaque_fixed(p, &err->stateid,
 493                                            NFS4_STATEID_SIZE);
 494                /* Encode 1 error */
 495                *p++ = cpu_to_be32(1);
 496                p = xdr_encode_opaque_fixed(p, &err->deviceid,
 497                                            NFS4_DEVICEID4_SIZE);
 498                *p++ = cpu_to_be32(err->status);
 499                *p++ = cpu_to_be32(err->opnum);
 500                *count += 1;
 501                list_del(&err->list);
 502                dprintk("%s: offset %llu length %llu status %d op %d count %d\n",
 503                        __func__, err->offset, err->length, err->status,
 504                        err->opnum, *count);
 505                kfree(err);
 506        }
 507
 508        return 0;
 509}
 510
 511static bool ff_read_layout_has_available_ds(struct pnfs_layout_segment *lseg)
 512{
 513        struct nfs4_ff_layout_mirror *mirror;
 514        struct nfs4_deviceid_node *devid;
 515        u32 idx;
 516
 517        for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
 518                mirror = FF_LAYOUT_COMP(lseg, idx);
 519                if (mirror && mirror->mirror_ds) {
 520                        devid = &mirror->mirror_ds->id_node;
 521                        if (!ff_layout_test_devid_unavailable(devid))
 522                                return true;
 523                }
 524        }
 525
 526        return false;
 527}
 528
 529static bool ff_rw_layout_has_available_ds(struct pnfs_layout_segment *lseg)
 530{
 531        struct nfs4_ff_layout_mirror *mirror;
 532        struct nfs4_deviceid_node *devid;
 533        u32 idx;
 534
 535        for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
 536                mirror = FF_LAYOUT_COMP(lseg, idx);
 537                if (!mirror || !mirror->mirror_ds)
 538                        return false;
 539                devid = &mirror->mirror_ds->id_node;
 540                if (ff_layout_test_devid_unavailable(devid))
 541                        return false;
 542        }
 543
 544        return FF_LAYOUT_MIRROR_COUNT(lseg) != 0;
 545}
 546
 547bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg)
 548{
 549        if (lseg->pls_range.iomode == IOMODE_READ)
 550                return  ff_read_layout_has_available_ds(lseg);
 551        /* Note: RW layout needs all mirrors available */
 552        return ff_rw_layout_has_available_ds(lseg);
 553}
 554
 555bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg)
 556{
 557        return ff_layout_no_fallback_to_mds(lseg) ||
 558               ff_layout_has_available_ds(lseg);
 559}
 560
 561bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg)
 562{
 563        return lseg->pls_range.iomode == IOMODE_RW &&
 564               ff_layout_no_read_on_rw(lseg);
 565}
 566
/*
 * Module parameters controlling data-server connections. Both are
 * unsigned ints registered with mode 0644 and are passed to
 * nfs4_pnfs_ds_connect() by nfs4_ff_layout_prepare_ds().
 */
module_param(dataserver_retrans, uint, 0644);
MODULE_PARM_DESC(dataserver_retrans, "The  number of times the NFSv4.1 client "
                        "retries a request before it attempts further "
                        " recovery  action.");
module_param(dataserver_timeo, uint, 0644);
MODULE_PARM_DESC(dataserver_timeo, "The time (in tenths of a second) the "
                        "NFSv4.1  client  waits for a response from a "
                        " data server before it retries an NFS request.");
 575