linux/fs/nfs/pnfs_nfs.c
<<
>>
Prefs
   1/*
   2 * Common NFS I/O  operations for the pnfs file based
   3 * layout drivers.
   4 *
   5 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
   6 *
   7 * Tom Haynes <loghyr@primarydata.com>
   8 */
   9
  10#include <linux/nfs_fs.h>
  11#include <linux/nfs_page.h>
  12#include <linux/sunrpc/addr.h>
  13#include <linux/module.h>
  14
  15#include "nfs4session.h"
  16#include "internal.h"
  17#include "pnfs.h"
  18
  19#define NFSDBG_FACILITY         NFSDBG_PNFS
  20
  21void pnfs_generic_rw_release(void *data)
  22{
  23        struct nfs_pgio_header *hdr = data;
  24
  25        nfs_put_client(hdr->ds_clp);
  26        hdr->mds_ops->rpc_release(data);
  27}
  28EXPORT_SYMBOL_GPL(pnfs_generic_rw_release);
  29
  30/* Fake up some data that will cause nfs_commit_release to retry the writes. */
  31void pnfs_generic_prepare_to_resend_writes(struct nfs_commit_data *data)
  32{
  33        struct nfs_page *first = nfs_list_entry(data->pages.next);
  34
  35        data->task.tk_status = 0;
  36        memcpy(&data->verf.verifier, &first->wb_verf,
  37               sizeof(data->verf.verifier));
  38        data->verf.verifier.data[0]++; /* ensure verifier mismatch */
  39}
  40EXPORT_SYMBOL_GPL(pnfs_generic_prepare_to_resend_writes);
  41
  42void pnfs_generic_write_commit_done(struct rpc_task *task, void *data)
  43{
  44        struct nfs_commit_data *wdata = data;
  45
  46        /* Note this may cause RPC to be resent */
  47        wdata->mds_ops->rpc_call_done(task, data);
  48}
  49EXPORT_SYMBOL_GPL(pnfs_generic_write_commit_done);
  50
  51void pnfs_generic_commit_release(void *calldata)
  52{
  53        struct nfs_commit_data *data = calldata;
  54
  55        data->completion_ops->completion(data);
  56        pnfs_put_lseg(data->lseg);
  57        nfs_put_client(data->ds_clp);
  58        nfs_commitdata_release(data);
  59}
  60EXPORT_SYMBOL_GPL(pnfs_generic_commit_release);
  61
  62/* The generic layer is about to remove the req from the commit list.
  63 * If this will make the bucket empty, it will need to put the lseg reference.
  64 * Note this must be called holding the inode (/cinfo) lock
  65 */
  66void
  67pnfs_generic_clear_request_commit(struct nfs_page *req,
  68                                  struct nfs_commit_info *cinfo)
  69{
  70        struct pnfs_layout_segment *freeme = NULL;
  71
  72        if (!test_and_clear_bit(PG_COMMIT_TO_DS, &req->wb_flags))
  73                goto out;
  74        cinfo->ds->nwritten--;
  75        if (list_is_singular(&req->wb_list)) {
  76                struct pnfs_commit_bucket *bucket;
  77
  78                bucket = list_first_entry(&req->wb_list,
  79                                          struct pnfs_commit_bucket,
  80                                          written);
  81                freeme = bucket->wlseg;
  82                bucket->wlseg = NULL;
  83        }
  84out:
  85        nfs_request_remove_commit_list(req, cinfo);
  86        pnfs_put_lseg_locked(freeme);
  87}
  88EXPORT_SYMBOL_GPL(pnfs_generic_clear_request_commit);
  89
  90static int
  91pnfs_generic_transfer_commit_list(struct list_head *src, struct list_head *dst,
  92                                  struct nfs_commit_info *cinfo, int max)
  93{
  94        struct nfs_page *req, *tmp;
  95        int ret = 0;
  96
  97        list_for_each_entry_safe(req, tmp, src, wb_list) {
  98                if (!nfs_lock_request(req))
  99                        continue;
 100                kref_get(&req->wb_kref);
 101                if (cond_resched_lock(cinfo->lock))
 102                        list_safe_reset_next(req, tmp, wb_list);
 103                nfs_request_remove_commit_list(req, cinfo);
 104                clear_bit(PG_COMMIT_TO_DS, &req->wb_flags);
 105                nfs_list_add_request(req, dst);
 106                ret++;
 107                if ((ret == max) && !cinfo->dreq)
 108                        break;
 109        }
 110        return ret;
 111}
 112
 113static int
 114pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 115                                 struct nfs_commit_info *cinfo,
 116                                 int max)
 117{
 118        struct list_head *src = &bucket->written;
 119        struct list_head *dst = &bucket->committing;
 120        int ret;
 121
 122        lockdep_assert_held(cinfo->lock);
 123        ret = pnfs_generic_transfer_commit_list(src, dst, cinfo, max);
 124        if (ret) {
 125                cinfo->ds->nwritten -= ret;
 126                cinfo->ds->ncommitting += ret;
 127                if (bucket->clseg == NULL)
 128                        bucket->clseg = pnfs_get_lseg(bucket->wlseg);
 129                if (list_empty(src)) {
 130                        pnfs_put_lseg_locked(bucket->wlseg);
 131                        bucket->wlseg = NULL;
 132                }
 133        }
 134        return ret;
 135}
 136
 137/* Move reqs from written to committing lists, returning count
 138 * of number moved.
 139 */
 140int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo,
 141                                   int max)
 142{
 143        int i, rv = 0, cnt;
 144
 145        lockdep_assert_held(cinfo->lock);
 146        for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) {
 147                cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i],
 148                                                       cinfo, max);
 149                max -= cnt;
 150                rv += cnt;
 151        }
 152        return rv;
 153}
 154EXPORT_SYMBOL_GPL(pnfs_generic_scan_commit_lists);
 155
 156/* Pull everything off the committing lists and dump into @dst.  */
 157void pnfs_generic_recover_commit_reqs(struct list_head *dst,
 158                                      struct nfs_commit_info *cinfo)
 159{
 160        struct pnfs_commit_bucket *b;
 161        struct pnfs_layout_segment *freeme;
 162        int i;
 163
 164        lockdep_assert_held(cinfo->lock);
 165restart:
 166        for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
 167                if (pnfs_generic_transfer_commit_list(&b->written, dst,
 168                                                      cinfo, 0)) {
 169                        freeme = b->wlseg;
 170                        b->wlseg = NULL;
 171                        spin_unlock(cinfo->lock);
 172                        pnfs_put_lseg(freeme);
 173                        spin_lock(cinfo->lock);
 174                        goto restart;
 175                }
 176        }
 177        cinfo->ds->nwritten = 0;
 178}
 179EXPORT_SYMBOL_GPL(pnfs_generic_recover_commit_reqs);
 180
 181static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
 182{
 183        struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
 184        struct pnfs_commit_bucket *bucket;
 185        struct pnfs_layout_segment *freeme;
 186        LIST_HEAD(pages);
 187        int i;
 188
 189        spin_lock(cinfo->lock);
 190        for (i = idx; i < fl_cinfo->nbuckets; i++) {
 191                bucket = &fl_cinfo->buckets[i];
 192                if (list_empty(&bucket->committing))
 193                        continue;
 194                freeme = bucket->clseg;
 195                bucket->clseg = NULL;
 196                list_splice_init(&bucket->committing, &pages);
 197                spin_unlock(cinfo->lock);
 198                nfs_retry_commit(&pages, freeme, cinfo, i);
 199                pnfs_put_lseg(freeme);
 200                spin_lock(cinfo->lock);
 201        }
 202        spin_unlock(cinfo->lock);
 203}
 204
 205static unsigned int
 206pnfs_generic_alloc_ds_commits(struct nfs_commit_info *cinfo,
 207                              struct list_head *list)
 208{
 209        struct pnfs_ds_commit_info *fl_cinfo;
 210        struct pnfs_commit_bucket *bucket;
 211        struct nfs_commit_data *data;
 212        int i;
 213        unsigned int nreq = 0;
 214
 215        fl_cinfo = cinfo->ds;
 216        bucket = fl_cinfo->buckets;
 217        for (i = 0; i < fl_cinfo->nbuckets; i++, bucket++) {
 218                if (list_empty(&bucket->committing))
 219                        continue;
 220                data = nfs_commitdata_alloc();
 221                if (!data)
 222                        break;
 223                data->ds_commit_index = i;
 224                list_add(&data->pages, list);
 225                nreq++;
 226        }
 227
 228        /* Clean up on error */
 229        pnfs_generic_retry_commit(cinfo, i);
 230        return nreq;
 231}
 232
 233static inline
 234void pnfs_fetch_commit_bucket_list(struct list_head *pages,
 235                struct nfs_commit_data *data,
 236                struct nfs_commit_info *cinfo)
 237{
 238        struct pnfs_commit_bucket *bucket;
 239
 240        bucket = &cinfo->ds->buckets[data->ds_commit_index];
 241        spin_lock(cinfo->lock);
 242        list_splice_init(&bucket->committing, pages);
 243        data->lseg = bucket->clseg;
 244        bucket->clseg = NULL;
 245        spin_unlock(cinfo->lock);
 246
 247}
 248
 249/* This follows nfs_commit_list pretty closely */
 250int
 251pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 252                             int how, struct nfs_commit_info *cinfo,
 253                             int (*initiate_commit)(struct nfs_commit_data *data,
 254                                                    int how))
 255{
 256        struct nfs_commit_data *data, *tmp;
 257        LIST_HEAD(list);
 258        unsigned int nreq = 0;
 259
 260        if (!list_empty(mds_pages)) {
 261                data = nfs_commitdata_alloc();
 262                if (data != NULL) {
 263                        data->ds_commit_index = -1;
 264                        list_add(&data->pages, &list);
 265                        nreq++;
 266                } else {
 267                        nfs_retry_commit(mds_pages, NULL, cinfo, 0);
 268                        pnfs_generic_retry_commit(cinfo, 0);
 269                        cinfo->completion_ops->error_cleanup(NFS_I(inode));
 270                        return -ENOMEM;
 271                }
 272        }
 273
 274        nreq += pnfs_generic_alloc_ds_commits(cinfo, &list);
 275
 276        if (nreq == 0) {
 277                cinfo->completion_ops->error_cleanup(NFS_I(inode));
 278                goto out;
 279        }
 280
 281        atomic_add(nreq, &cinfo->mds->rpcs_out);
 282
 283        list_for_each_entry_safe(data, tmp, &list, pages) {
 284                list_del_init(&data->pages);
 285                if (data->ds_commit_index < 0) {
 286                        nfs_init_commit(data, mds_pages, NULL, cinfo);
 287                        nfs_initiate_commit(NFS_CLIENT(inode), data,
 288                                            NFS_PROTO(data->inode),
 289                                            data->mds_ops, how, 0);
 290                } else {
 291                        LIST_HEAD(pages);
 292
 293                        pnfs_fetch_commit_bucket_list(&pages, data, cinfo);
 294                        nfs_init_commit(data, &pages, data->lseg, cinfo);
 295                        initiate_commit(data, how);
 296                }
 297        }
 298out:
 299        cinfo->ds->ncommitting = 0;
 300        return PNFS_ATTEMPTED;
 301}
 302EXPORT_SYMBOL_GPL(pnfs_generic_commit_pagelist);
 303
/*
 * Data server cache
 *
 * Data servers can be mapped to different device ids.
 * nfs4_pnfs_ds reference counting
 *   - set to 1 on allocation
 *   - incremented when a device id maps a data server already in the cache.
 *   - decremented when deviceid is removed from the cache.
 */
/* Held while the cache list below is searched or modified. */
static DEFINE_SPINLOCK(nfs4_ds_cache_lock);
/* Every cached data server, linked through nfs4_pnfs_ds.ds_node. */
static LIST_HEAD(nfs4_data_server_cache);
 315
 316/* Debug routines */
 317static void
 318print_ds(struct nfs4_pnfs_ds *ds)
 319{
 320        if (ds == NULL) {
 321                printk(KERN_WARNING "%s NULL device\n", __func__);
 322                return;
 323        }
 324        printk(KERN_WARNING "        ds %s\n"
 325                "        ref count %d\n"
 326                "        client %p\n"
 327                "        cl_exchange_flags %x\n",
 328                ds->ds_remotestr,
 329                atomic_read(&ds->ds_count), ds->ds_clp,
 330                ds->ds_clp ? ds->ds_clp->cl_exchange_flags : 0);
 331}
 332
 333static bool
 334same_sockaddr(struct sockaddr *addr1, struct sockaddr *addr2)
 335{
 336        struct sockaddr_in *a, *b;
 337        struct sockaddr_in6 *a6, *b6;
 338
 339        if (addr1->sa_family != addr2->sa_family)
 340                return false;
 341
 342        switch (addr1->sa_family) {
 343        case AF_INET:
 344                a = (struct sockaddr_in *)addr1;
 345                b = (struct sockaddr_in *)addr2;
 346
 347                if (a->sin_addr.s_addr == b->sin_addr.s_addr &&
 348                    a->sin_port == b->sin_port)
 349                        return true;
 350                break;
 351
 352        case AF_INET6:
 353                a6 = (struct sockaddr_in6 *)addr1;
 354                b6 = (struct sockaddr_in6 *)addr2;
 355
 356                /* LINKLOCAL addresses must have matching scope_id */
 357                if (ipv6_addr_src_scope(&a6->sin6_addr) ==
 358                    IPV6_ADDR_SCOPE_LINKLOCAL &&
 359                    a6->sin6_scope_id != b6->sin6_scope_id)
 360                        return false;
 361
 362                if (ipv6_addr_equal(&a6->sin6_addr, &b6->sin6_addr) &&
 363                    a6->sin6_port == b6->sin6_port)
 364                        return true;
 365                break;
 366
 367        default:
 368                dprintk("%s: unhandled address family: %u\n",
 369                        __func__, addr1->sa_family);
 370                return false;
 371        }
 372
 373        return false;
 374}
 375
 376/*
 377 * Checks if 'dsaddrs1' contains a subset of 'dsaddrs2'. If it does,
 378 * declare a match.
 379 */
 380static bool
 381_same_data_server_addrs_locked(const struct list_head *dsaddrs1,
 382                               const struct list_head *dsaddrs2)
 383{
 384        struct nfs4_pnfs_ds_addr *da1, *da2;
 385        struct sockaddr *sa1, *sa2;
 386        bool match = false;
 387
 388        list_for_each_entry(da1, dsaddrs1, da_node) {
 389                sa1 = (struct sockaddr *)&da1->da_addr;
 390                match = false;
 391                list_for_each_entry(da2, dsaddrs2, da_node) {
 392                        sa2 = (struct sockaddr *)&da2->da_addr;
 393                        match = same_sockaddr(sa1, sa2);
 394                        if (match)
 395                                break;
 396                }
 397                if (!match)
 398                        break;
 399        }
 400        return match;
 401}
 402
 403/*
 404 * Lookup DS by addresses.  nfs4_ds_cache_lock is held
 405 */
 406static struct nfs4_pnfs_ds *
 407_data_server_lookup_locked(const struct list_head *dsaddrs)
 408{
 409        struct nfs4_pnfs_ds *ds;
 410
 411        list_for_each_entry(ds, &nfs4_data_server_cache, ds_node)
 412                if (_same_data_server_addrs_locked(&ds->ds_addrs, dsaddrs))
 413                        return ds;
 414        return NULL;
 415}
 416
 417static void destroy_ds(struct nfs4_pnfs_ds *ds)
 418{
 419        struct nfs4_pnfs_ds_addr *da;
 420
 421        dprintk("--> %s\n", __func__);
 422        ifdebug(FACILITY)
 423                print_ds(ds);
 424
 425        nfs_put_client(ds->ds_clp);
 426
 427        while (!list_empty(&ds->ds_addrs)) {
 428                da = list_first_entry(&ds->ds_addrs,
 429                                      struct nfs4_pnfs_ds_addr,
 430                                      da_node);
 431                list_del_init(&da->da_node);
 432                kfree(da->da_remotestr);
 433                kfree(da);
 434        }
 435
 436        kfree(ds->ds_remotestr);
 437        kfree(ds);
 438}
 439
 440void nfs4_pnfs_ds_put(struct nfs4_pnfs_ds *ds)
 441{
 442        if (atomic_dec_and_lock(&ds->ds_count,
 443                                &nfs4_ds_cache_lock)) {
 444                list_del_init(&ds->ds_node);
 445                spin_unlock(&nfs4_ds_cache_lock);
 446                destroy_ds(ds);
 447        }
 448}
 449EXPORT_SYMBOL_GPL(nfs4_pnfs_ds_put);
 450
 451/*
 452 * Create a string with a human readable address and port to avoid
 453 * complicated setup around many dprinks.
 454 */
 455static char *
 456nfs4_pnfs_remotestr(struct list_head *dsaddrs, gfp_t gfp_flags)
 457{
 458        struct nfs4_pnfs_ds_addr *da;
 459        char *remotestr;
 460        size_t len;
 461        char *p;
 462
 463        len = 3;        /* '{', '}' and eol */
 464        list_for_each_entry(da, dsaddrs, da_node) {
 465                len += strlen(da->da_remotestr) + 1;    /* string plus comma */
 466        }
 467
 468        remotestr = kzalloc(len, gfp_flags);
 469        if (!remotestr)
 470                return NULL;
 471
 472        p = remotestr;
 473        *(p++) = '{';
 474        len--;
 475        list_for_each_entry(da, dsaddrs, da_node) {
 476                size_t ll = strlen(da->da_remotestr);
 477
 478                if (ll > len)
 479                        goto out_err;
 480
 481                memcpy(p, da->da_remotestr, ll);
 482                p += ll;
 483                len -= ll;
 484
 485                if (len < 1)
 486                        goto out_err;
 487                (*p++) = ',';
 488                len--;
 489        }
 490        if (len < 2)
 491                goto out_err;
 492        *(p++) = '}';
 493        *p = '\0';
 494        return remotestr;
 495out_err:
 496        kfree(remotestr);
 497        return NULL;
 498}
 499
 500/*
 501 * Given a list of multipath struct nfs4_pnfs_ds_addr, add it to ds cache if
 502 * uncached and return cached struct nfs4_pnfs_ds.
 503 */
 504struct nfs4_pnfs_ds *
 505nfs4_pnfs_ds_add(struct list_head *dsaddrs, gfp_t gfp_flags)
 506{
 507        struct nfs4_pnfs_ds *tmp_ds, *ds = NULL;
 508        char *remotestr;
 509
 510        if (list_empty(dsaddrs)) {
 511                dprintk("%s: no addresses defined\n", __func__);
 512                goto out;
 513        }
 514
 515        ds = kzalloc(sizeof(*ds), gfp_flags);
 516        if (!ds)
 517                goto out;
 518
 519        /* this is only used for debugging, so it's ok if its NULL */
 520        remotestr = nfs4_pnfs_remotestr(dsaddrs, gfp_flags);
 521
 522        spin_lock(&nfs4_ds_cache_lock);
 523        tmp_ds = _data_server_lookup_locked(dsaddrs);
 524        if (tmp_ds == NULL) {
 525                INIT_LIST_HEAD(&ds->ds_addrs);
 526                list_splice_init(dsaddrs, &ds->ds_addrs);
 527                ds->ds_remotestr = remotestr;
 528                atomic_set(&ds->ds_count, 1);
 529                INIT_LIST_HEAD(&ds->ds_node);
 530                ds->ds_clp = NULL;
 531                list_add(&ds->ds_node, &nfs4_data_server_cache);
 532                dprintk("%s add new data server %s\n", __func__,
 533                        ds->ds_remotestr);
 534        } else {
 535                kfree(remotestr);
 536                kfree(ds);
 537                atomic_inc(&tmp_ds->ds_count);
 538                dprintk("%s data server %s found, inc'ed ds_count to %d\n",
 539                        __func__, tmp_ds->ds_remotestr,
 540                        atomic_read(&tmp_ds->ds_count));
 541                ds = tmp_ds;
 542        }
 543        spin_unlock(&nfs4_ds_cache_lock);
 544out:
 545        return ds;
 546}
 547EXPORT_SYMBOL_GPL(nfs4_pnfs_ds_add);
 548
/*
 * Sleep (killable) until the NFS4DS_CONNECTING bit in ds->ds_state is
 * cleared. The return value of wait_on_bit() is discarded, so the caller
 * cannot tell from this function whether the wait ended early.
 */
static void nfs4_wait_ds_connect(struct nfs4_pnfs_ds *ds)
{
        might_sleep();
        wait_on_bit(&ds->ds_state, NFS4DS_CONNECTING,
                        TASK_KILLABLE);
}
 555
/*
 * Clear the NFS4DS_CONNECTING bit in ds->ds_state and wake up any task
 * waiting on that bit.
 */
static void nfs4_clear_ds_conn_bit(struct nfs4_pnfs_ds *ds)
{
        /*
         * Barriers on both sides of clear_bit(): presumably so that earlier
         * stores (e.g. ds->ds_clp) are visible before the bit is seen clear,
         * and the clear is visible before the wakeup -- TODO confirm.
         */
        smp_mb__before_atomic();
        clear_bit(NFS4DS_CONNECTING, &ds->ds_state);
        smp_mb__after_atomic();
        wake_up_bit(&ds->ds_state, NFS4DS_CONNECTING);
}
 563
/*
 * Pointer to the NFSv3 data server connect routine. It is NULL until
 * load_v3_ds_connect() resolves it from the nfs3_set_ds_client symbol, and
 * is reset to NULL by nfs4_pnfs_v3_ds_connect_unload().
 */
static struct nfs_client *(*get_v3_ds_connect)(
                        struct nfs_client *mds_clp,
                        const struct sockaddr *ds_addr,
                        int ds_addrlen,
                        int ds_proto,
                        unsigned int ds_timeo,
                        unsigned int ds_retrans,
                        rpc_authflavor_t au_flavor);
 572
 573static bool load_v3_ds_connect(void)
 574{
 575        if (!get_v3_ds_connect) {
 576                get_v3_ds_connect = symbol_request(nfs3_set_ds_client);
 577                WARN_ON_ONCE(!get_v3_ds_connect);
 578        }
 579
 580        return(get_v3_ds_connect != NULL);
 581}
 582
 583void nfs4_pnfs_v3_ds_connect_unload(void)
 584{
 585        if (get_v3_ds_connect) {
 586                symbol_put(nfs3_set_ds_client);
 587                get_v3_ds_connect = NULL;
 588        }
 589}
 590EXPORT_SYMBOL_GPL(nfs4_pnfs_v3_ds_connect_unload);
 591
 592static int _nfs4_pnfs_v3_ds_connect(struct nfs_server *mds_srv,
 593                                 struct nfs4_pnfs_ds *ds,
 594                                 unsigned int timeo,
 595                                 unsigned int retrans,
 596                                 rpc_authflavor_t au_flavor)
 597{
 598        struct nfs_client *clp = ERR_PTR(-EIO);
 599        struct nfs4_pnfs_ds_addr *da;
 600        int status = 0;
 601
 602        dprintk("--> %s DS %s au_flavor %d\n", __func__,
 603                ds->ds_remotestr, au_flavor);
 604
 605        if (!load_v3_ds_connect())
 606                goto out;
 607
 608        list_for_each_entry(da, &ds->ds_addrs, da_node) {
 609                dprintk("%s: DS %s: trying address %s\n",
 610                        __func__, ds->ds_remotestr, da->da_remotestr);
 611
 612                clp = get_v3_ds_connect(mds_srv->nfs_client,
 613                                        (struct sockaddr *)&da->da_addr,
 614                                        da->da_addrlen, IPPROTO_TCP,
 615                                        timeo, retrans, au_flavor);
 616                if (!IS_ERR(clp))
 617                        break;
 618        }
 619
 620        if (IS_ERR(clp)) {
 621                status = PTR_ERR(clp);
 622                goto out;
 623        }
 624
 625        smp_wmb();
 626        ds->ds_clp = clp;
 627        dprintk("%s [new] addr: %s\n", __func__, ds->ds_remotestr);
 628out:
 629        return status;
 630}
 631
 632static int _nfs4_pnfs_v4_ds_connect(struct nfs_server *mds_srv,
 633                                 struct nfs4_pnfs_ds *ds,
 634                                 unsigned int timeo,
 635                                 unsigned int retrans,
 636                                 u32 minor_version,
 637                                 rpc_authflavor_t au_flavor)
 638{
 639        struct nfs_client *clp = ERR_PTR(-EIO);
 640        struct nfs4_pnfs_ds_addr *da;
 641        int status = 0;
 642
 643        dprintk("--> %s DS %s au_flavor %d\n", __func__, ds->ds_remotestr,
 644                au_flavor);
 645
 646        list_for_each_entry(da, &ds->ds_addrs, da_node) {
 647                dprintk("%s: DS %s: trying address %s\n",
 648                        __func__, ds->ds_remotestr, da->da_remotestr);
 649
 650                clp = nfs4_set_ds_client(mds_srv->nfs_client,
 651                                        (struct sockaddr *)&da->da_addr,
 652                                        da->da_addrlen, IPPROTO_TCP,
 653                                        timeo, retrans, minor_version,
 654                                        au_flavor);
 655                if (!IS_ERR(clp))
 656                        break;
 657        }
 658
 659        if (IS_ERR(clp)) {
 660                status = PTR_ERR(clp);
 661                goto out;
 662        }
 663
 664        status = nfs4_init_ds_session(clp, mds_srv->nfs_client->cl_lease_time);
 665        if (status)
 666                goto out_put;
 667
 668        smp_wmb();
 669        ds->ds_clp = clp;
 670        dprintk("%s [new] addr: %s\n", __func__, ds->ds_remotestr);
 671out:
 672        return status;
 673out_put:
 674        nfs_put_client(clp);
 675        goto out;
 676}
 677
 678/*
 679 * Create an rpc connection to the nfs4_pnfs_ds data server.
 680 * Currently only supports IPv4 and IPv6 addresses.
 681 * If connection fails, make devid unavailable.
 682 */
 683void nfs4_pnfs_ds_connect(struct nfs_server *mds_srv, struct nfs4_pnfs_ds *ds,
 684                          struct nfs4_deviceid_node *devid, unsigned int timeo,
 685                          unsigned int retrans, u32 version,
 686                          u32 minor_version, rpc_authflavor_t au_flavor)
 687{
 688        if (test_and_set_bit(NFS4DS_CONNECTING, &ds->ds_state) == 0) {
 689                int err = 0;
 690
 691                if (version == 3) {
 692                        err = _nfs4_pnfs_v3_ds_connect(mds_srv, ds, timeo,
 693                                                       retrans, au_flavor);
 694                } else if (version == 4) {
 695                        err = _nfs4_pnfs_v4_ds_connect(mds_srv, ds, timeo,
 696                                                       retrans, minor_version,
 697                                                       au_flavor);
 698                } else {
 699                        dprintk("%s: unsupported DS version %d\n", __func__,
 700                                version);
 701                        err = -EPROTONOSUPPORT;
 702                }
 703
 704                if (err)
 705                        nfs4_mark_deviceid_unavailable(devid);
 706                nfs4_clear_ds_conn_bit(ds);
 707        } else {
 708                nfs4_wait_ds_connect(ds);
 709        }
 710}
 711EXPORT_SYMBOL_GPL(nfs4_pnfs_ds_connect);
 712
 713/*
 714 * Currently only supports ipv4, ipv6 and one multi-path address.
 715 */
 716struct nfs4_pnfs_ds_addr *
 717nfs4_decode_mp_ds_addr(struct net *net, struct xdr_stream *xdr, gfp_t gfp_flags)
 718{
 719        struct nfs4_pnfs_ds_addr *da = NULL;
 720        char *buf, *portstr;
 721        __be16 port;
 722        int nlen, rlen;
 723        int tmp[2];
 724        __be32 *p;
 725        char *netid, *match_netid;
 726        size_t len, match_netid_len;
 727        char *startsep = "";
 728        char *endsep = "";
 729
 730
 731        /* r_netid */
 732        p = xdr_inline_decode(xdr, 4);
 733        if (unlikely(!p))
 734                goto out_err;
 735        nlen = be32_to_cpup(p++);
 736
 737        p = xdr_inline_decode(xdr, nlen);
 738        if (unlikely(!p))
 739                goto out_err;
 740
 741        netid = kmalloc(nlen+1, gfp_flags);
 742        if (unlikely(!netid))
 743                goto out_err;
 744
 745        netid[nlen] = '\0';
 746        memcpy(netid, p, nlen);
 747
 748        /* r_addr: ip/ip6addr with port in dec octets - see RFC 5665 */
 749        p = xdr_inline_decode(xdr, 4);
 750        if (unlikely(!p))
 751                goto out_free_netid;
 752        rlen = be32_to_cpup(p);
 753
 754        p = xdr_inline_decode(xdr, rlen);
 755        if (unlikely(!p))
 756                goto out_free_netid;
 757
 758        /* port is ".ABC.DEF", 8 chars max */
 759        if (rlen > INET6_ADDRSTRLEN + IPV6_SCOPE_ID_LEN + 8) {
 760                dprintk("%s: Invalid address, length %d\n", __func__,
 761                        rlen);
 762                goto out_free_netid;
 763        }
 764        buf = kmalloc(rlen + 1, gfp_flags);
 765        if (!buf) {
 766                dprintk("%s: Not enough memory\n", __func__);
 767                goto out_free_netid;
 768        }
 769        buf[rlen] = '\0';
 770        memcpy(buf, p, rlen);
 771
 772        /* replace port '.' with '-' */
 773        portstr = strrchr(buf, '.');
 774        if (!portstr) {
 775                dprintk("%s: Failed finding expected dot in port\n",
 776                        __func__);
 777                goto out_free_buf;
 778        }
 779        *portstr = '-';
 780
 781        /* find '.' between address and port */
 782        portstr = strrchr(buf, '.');
 783        if (!portstr) {
 784                dprintk("%s: Failed finding expected dot between address and "
 785                        "port\n", __func__);
 786                goto out_free_buf;
 787        }
 788        *portstr = '\0';
 789
 790        da = kzalloc(sizeof(*da), gfp_flags);
 791        if (unlikely(!da))
 792                goto out_free_buf;
 793
 794        INIT_LIST_HEAD(&da->da_node);
 795
 796        if (!rpc_pton(net, buf, portstr-buf, (struct sockaddr *)&da->da_addr,
 797                      sizeof(da->da_addr))) {
 798                dprintk("%s: error parsing address %s\n", __func__, buf);
 799                goto out_free_da;
 800        }
 801
 802        portstr++;
 803        sscanf(portstr, "%d-%d", &tmp[0], &tmp[1]);
 804        port = htons((tmp[0] << 8) | (tmp[1]));
 805
 806        switch (da->da_addr.ss_family) {
 807        case AF_INET:
 808                ((struct sockaddr_in *)&da->da_addr)->sin_port = port;
 809                da->da_addrlen = sizeof(struct sockaddr_in);
 810                match_netid = "tcp";
 811                match_netid_len = 3;
 812                break;
 813
 814        case AF_INET6:
 815                ((struct sockaddr_in6 *)&da->da_addr)->sin6_port = port;
 816                da->da_addrlen = sizeof(struct sockaddr_in6);
 817                match_netid = "tcp6";
 818                match_netid_len = 4;
 819                startsep = "[";
 820                endsep = "]";
 821                break;
 822
 823        default:
 824                dprintk("%s: unsupported address family: %u\n",
 825                        __func__, da->da_addr.ss_family);
 826                goto out_free_da;
 827        }
 828
 829        if (nlen != match_netid_len || strncmp(netid, match_netid, nlen)) {
 830                dprintk("%s: ERROR: r_netid \"%s\" != \"%s\"\n",
 831                        __func__, netid, match_netid);
 832                goto out_free_da;
 833        }
 834
 835        /* save human readable address */
 836        len = strlen(startsep) + strlen(buf) + strlen(endsep) + 7;
 837        da->da_remotestr = kzalloc(len, gfp_flags);
 838
 839        /* NULL is ok, only used for dprintk */
 840        if (da->da_remotestr)
 841                snprintf(da->da_remotestr, len, "%s%s%s:%u", startsep,
 842                         buf, endsep, ntohs(port));
 843
 844        dprintk("%s: Parsed DS addr %s\n", __func__, da->da_remotestr);
 845        kfree(buf);
 846        kfree(netid);
 847        return da;
 848
 849out_free_da:
 850        kfree(da);
 851out_free_buf:
 852        dprintk("%s: Error parsing DS addr: %s\n", __func__, buf);
 853        kfree(buf);
 854out_free_netid:
 855        kfree(netid);
 856out_err:
 857        return NULL;
 858}
 859EXPORT_SYMBOL_GPL(nfs4_decode_mp_ds_addr);
 860
 861void
 862pnfs_layout_mark_request_commit(struct nfs_page *req,
 863                                struct pnfs_layout_segment *lseg,
 864                                struct nfs_commit_info *cinfo,
 865                                u32 ds_commit_idx)
 866{
 867        struct list_head *list;
 868        struct pnfs_commit_bucket *buckets;
 869
 870        spin_lock(cinfo->lock);
 871        buckets = cinfo->ds->buckets;
 872        list = &buckets[ds_commit_idx].written;
 873        if (list_empty(list)) {
 874                /* Non-empty buckets hold a reference on the lseg.  That ref
 875                 * is normally transferred to the COMMIT call and released
 876                 * there.  It could also be released if the last req is pulled
 877                 * off due to a rewrite, in which case it will be done in
 878                 * pnfs_common_clear_request_commit
 879                 */
 880                WARN_ON_ONCE(buckets[ds_commit_idx].wlseg != NULL);
 881                buckets[ds_commit_idx].wlseg = pnfs_get_lseg(lseg);
 882        }
 883        set_bit(PG_COMMIT_TO_DS, &req->wb_flags);
 884        cinfo->ds->nwritten++;
 885
 886        nfs_request_add_commit_list_locked(req, list, cinfo);
 887        spin_unlock(cinfo->lock);
 888        nfs_mark_page_unstable(req->wb_page, cinfo);
 889}
 890EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit);
 891
 892int
 893pnfs_nfs_generic_sync(struct inode *inode, bool datasync)
 894{
 895        if (datasync)
 896                return 0;
 897        return pnfs_layoutcommit_inode(inode, true);
 898}
 899EXPORT_SYMBOL_GPL(pnfs_nfs_generic_sync);
 900
 901