linux/fs/nfs/pnfs_nfs.c
<<
>>
Prefs
   1/*
   2 * Common NFS I/O  operations for the pnfs file based
   3 * layout drivers.
   4 *
   5 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
   6 *
   7 * Tom Haynes <loghyr@primarydata.com>
   8 */
   9
  10#include <linux/nfs_fs.h>
  11#include <linux/nfs_page.h>
  12#include <linux/sunrpc/addr.h>
  13#include <linux/module.h>
  14
  15#include "nfs4session.h"
  16#include "internal.h"
  17#include "pnfs.h"
  18
  19#define NFSDBG_FACILITY         NFSDBG_PNFS
  20
  21void pnfs_generic_rw_release(void *data)
  22{
  23        struct nfs_pgio_header *hdr = data;
  24
  25        nfs_put_client(hdr->ds_clp);
  26        hdr->mds_ops->rpc_release(data);
  27}
  28EXPORT_SYMBOL_GPL(pnfs_generic_rw_release);
  29
  30/* Fake up some data that will cause nfs_commit_release to retry the writes. */
  31void pnfs_generic_prepare_to_resend_writes(struct nfs_commit_data *data)
  32{
  33        struct nfs_page *first = nfs_list_entry(data->pages.next);
  34
  35        data->task.tk_status = 0;
  36        memcpy(&data->verf.verifier, &first->wb_verf,
  37               sizeof(data->verf.verifier));
  38        data->verf.verifier.data[0]++; /* ensure verifier mismatch */
  39}
  40EXPORT_SYMBOL_GPL(pnfs_generic_prepare_to_resend_writes);
  41
  42void pnfs_generic_write_commit_done(struct rpc_task *task, void *data)
  43{
  44        struct nfs_commit_data *wdata = data;
  45
  46        /* Note this may cause RPC to be resent */
  47        wdata->mds_ops->rpc_call_done(task, data);
  48}
  49EXPORT_SYMBOL_GPL(pnfs_generic_write_commit_done);
  50
  51void pnfs_generic_commit_release(void *calldata)
  52{
  53        struct nfs_commit_data *data = calldata;
  54
  55        data->completion_ops->completion(data);
  56        pnfs_put_lseg(data->lseg);
  57        nfs_put_client(data->ds_clp);
  58        nfs_commitdata_release(data);
  59}
  60EXPORT_SYMBOL_GPL(pnfs_generic_commit_release);
  61
  62/* The generic layer is about to remove the req from the commit list.
  63 * If this will make the bucket empty, it will need to put the lseg reference.
  64 * Note this must be called holding the inode (/cinfo) lock
  65 */
  66void
  67pnfs_generic_clear_request_commit(struct nfs_page *req,
  68                                  struct nfs_commit_info *cinfo)
  69{
  70        struct pnfs_layout_segment *freeme = NULL;
  71
  72        if (!test_and_clear_bit(PG_COMMIT_TO_DS, &req->wb_flags))
  73                goto out;
  74        cinfo->ds->nwritten--;
  75        if (list_is_singular(&req->wb_list)) {
  76                struct pnfs_commit_bucket *bucket;
  77
  78                bucket = list_first_entry(&req->wb_list,
  79                                          struct pnfs_commit_bucket,
  80                                          written);
  81                freeme = bucket->wlseg;
  82                bucket->wlseg = NULL;
  83        }
  84out:
  85        nfs_request_remove_commit_list(req, cinfo);
  86        pnfs_put_lseg_locked(freeme);
  87}
  88EXPORT_SYMBOL_GPL(pnfs_generic_clear_request_commit);
  89
/*
 * Move lockable requests from @src to @dst and return how many were moved.
 *
 * Requests for which nfs_lock_request() fails are left on @src.  Every
 * request that is moved gains an extra wb_kref reference and has its
 * PG_COMMIT_TO_DS flag cleared.  The walk stops once @max requests have
 * been moved unless cinfo->dreq is set, in which case the whole list is
 * scanned.  The counter is only compared after it has been incremented,
 * so a @max of 0 never matches and also means "no limit".
 */
static int
pnfs_generic_transfer_commit_list(struct list_head *src, struct list_head *dst,
                                  struct nfs_commit_info *cinfo, int max)
{
        struct nfs_page *req, *tmp;
        int ret = 0;

        list_for_each_entry_safe(req, tmp, src, wb_list) {
                if (!nfs_lock_request(req))
                        continue;
                kref_get(&req->wb_kref);
                /*
                 * cond_resched_lock() may release cinfo->lock; when it
                 * reports having done so, the cached "next" entry is
                 * refreshed before the walk continues.
                 */
                if (cond_resched_lock(cinfo->lock))
                        list_safe_reset_next(req, tmp, wb_list);
                nfs_request_remove_commit_list(req, cinfo);
                clear_bit(PG_COMMIT_TO_DS, &req->wb_flags);
                nfs_list_add_request(req, dst);
                ret++;
                /* direct I/O (cinfo->dreq set) ignores the limit */
                if ((ret == max) && !cinfo->dreq)
                        break;
        }
        return ret;
}
 112
 113static int
 114pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 115                                 struct nfs_commit_info *cinfo,
 116                                 int max)
 117{
 118        struct list_head *src = &bucket->written;
 119        struct list_head *dst = &bucket->committing;
 120        int ret;
 121
 122        lockdep_assert_held(cinfo->lock);
 123        ret = pnfs_generic_transfer_commit_list(src, dst, cinfo, max);
 124        if (ret) {
 125                cinfo->ds->nwritten -= ret;
 126                cinfo->ds->ncommitting += ret;
 127                bucket->clseg = bucket->wlseg;
 128                if (list_empty(src))
 129                        bucket->wlseg = NULL;
 130                else
 131                        pnfs_get_lseg(bucket->clseg);
 132        }
 133        return ret;
 134}
 135
 136/* Move reqs from written to committing lists, returning count
 137 * of number moved.
 138 */
 139int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo,
 140                                   int max)
 141{
 142        int i, rv = 0, cnt;
 143
 144        lockdep_assert_held(cinfo->lock);
 145        for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) {
 146                cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i],
 147                                                       cinfo, max);
 148                max -= cnt;
 149                rv += cnt;
 150        }
 151        return rv;
 152}
 153EXPORT_SYMBOL_GPL(pnfs_generic_scan_commit_lists);
 154
/*
 * Pull everything off the per-bucket "written" lists and dump it into @dst.
 *
 * Must be called with cinfo->lock held (asserted below).  Whenever a
 * bucket yields at least one request, its wlseg pointer is cleared and
 * that layout segment is put with the lock temporarily released; the scan
 * then restarts from the first bucket.  On return the nwritten counter is
 * reset to zero.
 */
void pnfs_generic_recover_commit_reqs(struct list_head *dst,
                                      struct nfs_commit_info *cinfo)
{
        struct pnfs_commit_bucket *b;
        struct pnfs_layout_segment *freeme;
        int i;

        lockdep_assert_held(cinfo->lock);
restart:
        for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
                /* a max of 0 places no limit on the number transferred */
                if (pnfs_generic_transfer_commit_list(&b->written, dst,
                                                      cinfo, 0)) {
                        freeme = b->wlseg;
                        b->wlseg = NULL;
                        /* pnfs_put_lseg() is called without cinfo->lock */
                        spin_unlock(cinfo->lock);
                        pnfs_put_lseg(freeme);
                        spin_lock(cinfo->lock);
                        /* the lock was dropped: rescan from the start */
                        goto restart;
                }
        }
        cinfo->ds->nwritten = 0;
}
EXPORT_SYMBOL_GPL(pnfs_generic_recover_commit_reqs);
 179
 180static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
 181{
 182        struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
 183        struct pnfs_commit_bucket *bucket;
 184        struct pnfs_layout_segment *freeme;
 185        int i;
 186
 187        for (i = idx; i < fl_cinfo->nbuckets; i++) {
 188                bucket = &fl_cinfo->buckets[i];
 189                if (list_empty(&bucket->committing))
 190                        continue;
 191                nfs_retry_commit(&bucket->committing, bucket->clseg, cinfo, i);
 192                spin_lock(cinfo->lock);
 193                freeme = bucket->clseg;
 194                bucket->clseg = NULL;
 195                spin_unlock(cinfo->lock);
 196                pnfs_put_lseg(freeme);
 197        }
 198}
 199
 200static unsigned int
 201pnfs_generic_alloc_ds_commits(struct nfs_commit_info *cinfo,
 202                              struct list_head *list)
 203{
 204        struct pnfs_ds_commit_info *fl_cinfo;
 205        struct pnfs_commit_bucket *bucket;
 206        struct nfs_commit_data *data;
 207        int i;
 208        unsigned int nreq = 0;
 209
 210        fl_cinfo = cinfo->ds;
 211        bucket = fl_cinfo->buckets;
 212        for (i = 0; i < fl_cinfo->nbuckets; i++, bucket++) {
 213                if (list_empty(&bucket->committing))
 214                        continue;
 215                data = nfs_commitdata_alloc();
 216                if (!data)
 217                        break;
 218                data->ds_commit_index = i;
 219                spin_lock(cinfo->lock);
 220                data->lseg = bucket->clseg;
 221                bucket->clseg = NULL;
 222                spin_unlock(cinfo->lock);
 223                list_add(&data->pages, list);
 224                nreq++;
 225        }
 226
 227        /* Clean up on error */
 228        pnfs_generic_retry_commit(cinfo, i);
 229        return nreq;
 230}
 231
 232/* This follows nfs_commit_list pretty closely */
 233int
 234pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 235                             int how, struct nfs_commit_info *cinfo,
 236                             int (*initiate_commit)(struct nfs_commit_data *data,
 237                                                    int how))
 238{
 239        struct nfs_commit_data *data, *tmp;
 240        LIST_HEAD(list);
 241        unsigned int nreq = 0;
 242
 243        if (!list_empty(mds_pages)) {
 244                data = nfs_commitdata_alloc();
 245                if (data != NULL) {
 246                        data->lseg = NULL;
 247                        list_add(&data->pages, &list);
 248                        nreq++;
 249                } else {
 250                        nfs_retry_commit(mds_pages, NULL, cinfo, 0);
 251                        pnfs_generic_retry_commit(cinfo, 0);
 252                        cinfo->completion_ops->error_cleanup(NFS_I(inode));
 253                        return -ENOMEM;
 254                }
 255        }
 256
 257        nreq += pnfs_generic_alloc_ds_commits(cinfo, &list);
 258
 259        if (nreq == 0) {
 260                cinfo->completion_ops->error_cleanup(NFS_I(inode));
 261                goto out;
 262        }
 263
 264        atomic_add(nreq, &cinfo->mds->rpcs_out);
 265
 266        list_for_each_entry_safe(data, tmp, &list, pages) {
 267                list_del_init(&data->pages);
 268                if (!data->lseg) {
 269                        nfs_init_commit(data, mds_pages, NULL, cinfo);
 270                        nfs_initiate_commit(NFS_CLIENT(inode), data,
 271                                            NFS_PROTO(data->inode),
 272                                            data->mds_ops, how, 0);
 273                } else {
 274                        struct pnfs_commit_bucket *buckets;
 275
 276                        buckets = cinfo->ds->buckets;
 277                        nfs_init_commit(data,
 278                                        &buckets[data->ds_commit_index].committing,
 279                                        data->lseg,
 280                                        cinfo);
 281                        initiate_commit(data, how);
 282                }
 283        }
 284out:
 285        cinfo->ds->ncommitting = 0;
 286        return PNFS_ATTEMPTED;
 287}
 288EXPORT_SYMBOL_GPL(pnfs_generic_commit_pagelist);
 289
/*
 * Data server cache
 *
 * Data servers can be mapped to different device ids.
 * nfs4_pnfs_ds reference counting
 *   - set to 1 on allocation
 *   - incremented when a device id maps a data server already in the cache.
 *   - decremented when deviceid is removed from the cache.
 *
 * nfs4_ds_cache_lock is taken around every lookup in, insertion into and
 * removal from nfs4_data_server_cache in this file.
 */
static DEFINE_SPINLOCK(nfs4_ds_cache_lock);
static LIST_HEAD(nfs4_data_server_cache);
 301
 302/* Debug routines */
 303static void
 304print_ds(struct nfs4_pnfs_ds *ds)
 305{
 306        if (ds == NULL) {
 307                printk(KERN_WARNING "%s NULL device\n", __func__);
 308                return;
 309        }
 310        printk(KERN_WARNING "        ds %s\n"
 311                "        ref count %d\n"
 312                "        client %p\n"
 313                "        cl_exchange_flags %x\n",
 314                ds->ds_remotestr,
 315                atomic_read(&ds->ds_count), ds->ds_clp,
 316                ds->ds_clp ? ds->ds_clp->cl_exchange_flags : 0);
 317}
 318
 319static bool
 320same_sockaddr(struct sockaddr *addr1, struct sockaddr *addr2)
 321{
 322        struct sockaddr_in *a, *b;
 323        struct sockaddr_in6 *a6, *b6;
 324
 325        if (addr1->sa_family != addr2->sa_family)
 326                return false;
 327
 328        switch (addr1->sa_family) {
 329        case AF_INET:
 330                a = (struct sockaddr_in *)addr1;
 331                b = (struct sockaddr_in *)addr2;
 332
 333                if (a->sin_addr.s_addr == b->sin_addr.s_addr &&
 334                    a->sin_port == b->sin_port)
 335                        return true;
 336                break;
 337
 338        case AF_INET6:
 339                a6 = (struct sockaddr_in6 *)addr1;
 340                b6 = (struct sockaddr_in6 *)addr2;
 341
 342                /* LINKLOCAL addresses must have matching scope_id */
 343                if (ipv6_addr_src_scope(&a6->sin6_addr) ==
 344                    IPV6_ADDR_SCOPE_LINKLOCAL &&
 345                    a6->sin6_scope_id != b6->sin6_scope_id)
 346                        return false;
 347
 348                if (ipv6_addr_equal(&a6->sin6_addr, &b6->sin6_addr) &&
 349                    a6->sin6_port == b6->sin6_port)
 350                        return true;
 351                break;
 352
 353        default:
 354                dprintk("%s: unhandled address family: %u\n",
 355                        __func__, addr1->sa_family);
 356                return false;
 357        }
 358
 359        return false;
 360}
 361
 362static bool
 363_same_data_server_addrs_locked(const struct list_head *dsaddrs1,
 364                               const struct list_head *dsaddrs2)
 365{
 366        struct nfs4_pnfs_ds_addr *da1, *da2;
 367
 368        /* step through both lists, comparing as we go */
 369        for (da1 = list_first_entry(dsaddrs1, typeof(*da1), da_node),
 370             da2 = list_first_entry(dsaddrs2, typeof(*da2), da_node);
 371             da1 != NULL && da2 != NULL;
 372             da1 = list_entry(da1->da_node.next, typeof(*da1), da_node),
 373             da2 = list_entry(da2->da_node.next, typeof(*da2), da_node)) {
 374                if (!same_sockaddr((struct sockaddr *)&da1->da_addr,
 375                                   (struct sockaddr *)&da2->da_addr))
 376                        return false;
 377        }
 378        if (da1 == NULL && da2 == NULL)
 379                return true;
 380
 381        return false;
 382}
 383
 384/*
 385 * Lookup DS by addresses.  nfs4_ds_cache_lock is held
 386 */
 387static struct nfs4_pnfs_ds *
 388_data_server_lookup_locked(const struct list_head *dsaddrs)
 389{
 390        struct nfs4_pnfs_ds *ds;
 391
 392        list_for_each_entry(ds, &nfs4_data_server_cache, ds_node)
 393                if (_same_data_server_addrs_locked(&ds->ds_addrs, dsaddrs))
 394                        return ds;
 395        return NULL;
 396}
 397
 398static void destroy_ds(struct nfs4_pnfs_ds *ds)
 399{
 400        struct nfs4_pnfs_ds_addr *da;
 401
 402        dprintk("--> %s\n", __func__);
 403        ifdebug(FACILITY)
 404                print_ds(ds);
 405
 406        nfs_put_client(ds->ds_clp);
 407
 408        while (!list_empty(&ds->ds_addrs)) {
 409                da = list_first_entry(&ds->ds_addrs,
 410                                      struct nfs4_pnfs_ds_addr,
 411                                      da_node);
 412                list_del_init(&da->da_node);
 413                kfree(da->da_remotestr);
 414                kfree(da);
 415        }
 416
 417        kfree(ds->ds_remotestr);
 418        kfree(ds);
 419}
 420
 421void nfs4_pnfs_ds_put(struct nfs4_pnfs_ds *ds)
 422{
 423        if (atomic_dec_and_lock(&ds->ds_count,
 424                                &nfs4_ds_cache_lock)) {
 425                list_del_init(&ds->ds_node);
 426                spin_unlock(&nfs4_ds_cache_lock);
 427                destroy_ds(ds);
 428        }
 429}
 430EXPORT_SYMBOL_GPL(nfs4_pnfs_ds_put);
 431
 432/*
 433 * Create a string with a human readable address and port to avoid
 434 * complicated setup around many dprinks.
 435 */
 436static char *
 437nfs4_pnfs_remotestr(struct list_head *dsaddrs, gfp_t gfp_flags)
 438{
 439        struct nfs4_pnfs_ds_addr *da;
 440        char *remotestr;
 441        size_t len;
 442        char *p;
 443
 444        len = 3;        /* '{', '}' and eol */
 445        list_for_each_entry(da, dsaddrs, da_node) {
 446                len += strlen(da->da_remotestr) + 1;    /* string plus comma */
 447        }
 448
 449        remotestr = kzalloc(len, gfp_flags);
 450        if (!remotestr)
 451                return NULL;
 452
 453        p = remotestr;
 454        *(p++) = '{';
 455        len--;
 456        list_for_each_entry(da, dsaddrs, da_node) {
 457                size_t ll = strlen(da->da_remotestr);
 458
 459                if (ll > len)
 460                        goto out_err;
 461
 462                memcpy(p, da->da_remotestr, ll);
 463                p += ll;
 464                len -= ll;
 465
 466                if (len < 1)
 467                        goto out_err;
 468                (*p++) = ',';
 469                len--;
 470        }
 471        if (len < 2)
 472                goto out_err;
 473        *(p++) = '}';
 474        *p = '\0';
 475        return remotestr;
 476out_err:
 477        kfree(remotestr);
 478        return NULL;
 479}
 480
 481/*
 482 * Given a list of multipath struct nfs4_pnfs_ds_addr, add it to ds cache if
 483 * uncached and return cached struct nfs4_pnfs_ds.
 484 */
 485struct nfs4_pnfs_ds *
 486nfs4_pnfs_ds_add(struct list_head *dsaddrs, gfp_t gfp_flags)
 487{
 488        struct nfs4_pnfs_ds *tmp_ds, *ds = NULL;
 489        char *remotestr;
 490
 491        if (list_empty(dsaddrs)) {
 492                dprintk("%s: no addresses defined\n", __func__);
 493                goto out;
 494        }
 495
 496        ds = kzalloc(sizeof(*ds), gfp_flags);
 497        if (!ds)
 498                goto out;
 499
 500        /* this is only used for debugging, so it's ok if its NULL */
 501        remotestr = nfs4_pnfs_remotestr(dsaddrs, gfp_flags);
 502
 503        spin_lock(&nfs4_ds_cache_lock);
 504        tmp_ds = _data_server_lookup_locked(dsaddrs);
 505        if (tmp_ds == NULL) {
 506                INIT_LIST_HEAD(&ds->ds_addrs);
 507                list_splice_init(dsaddrs, &ds->ds_addrs);
 508                ds->ds_remotestr = remotestr;
 509                atomic_set(&ds->ds_count, 1);
 510                INIT_LIST_HEAD(&ds->ds_node);
 511                ds->ds_clp = NULL;
 512                list_add(&ds->ds_node, &nfs4_data_server_cache);
 513                dprintk("%s add new data server %s\n", __func__,
 514                        ds->ds_remotestr);
 515        } else {
 516                kfree(remotestr);
 517                kfree(ds);
 518                atomic_inc(&tmp_ds->ds_count);
 519                dprintk("%s data server %s found, inc'ed ds_count to %d\n",
 520                        __func__, tmp_ds->ds_remotestr,
 521                        atomic_read(&tmp_ds->ds_count));
 522                ds = tmp_ds;
 523        }
 524        spin_unlock(&nfs4_ds_cache_lock);
 525out:
 526        return ds;
 527}
 528EXPORT_SYMBOL_GPL(nfs4_pnfs_ds_add);
 529
 530static void nfs4_wait_ds_connect(struct nfs4_pnfs_ds *ds)
 531{
 532        might_sleep();
 533        wait_on_bit(&ds->ds_state, NFS4DS_CONNECTING,
 534                        TASK_KILLABLE);
 535}
 536
/*
 * Clear the NFS4DS_CONNECTING bit in ds->ds_state and wake up anybody
 * waiting on it.  The clear_bit() is bracketed by memory barriers before
 * the wake-up is issued; the statement order here must be preserved.
 */
static void nfs4_clear_ds_conn_bit(struct nfs4_pnfs_ds *ds)
{
        smp_mb__before_atomic();
        clear_bit(NFS4DS_CONNECTING, &ds->ds_state);
        smp_mb__after_atomic();
        wake_up_bit(&ds->ds_state, NFS4DS_CONNECTING);
}
 544
/*
 * Entry point used to set up an nfs_client for an NFSv3 data server.
 * load_v3_ds_connect() fills it in with the result of
 * symbol_request(nfs3_set_ds_client) on first use, and
 * nfs4_pnfs_v3_ds_connect_unload() resets it to NULL.
 */
static struct nfs_client *(*get_v3_ds_connect)(
                        struct nfs_client *mds_clp,
                        const struct sockaddr *ds_addr,
                        int ds_addrlen,
                        int ds_proto,
                        unsigned int ds_timeo,
                        unsigned int ds_retrans,
                        rpc_authflavor_t au_flavor);
 553
 554static bool load_v3_ds_connect(void)
 555{
 556        if (!get_v3_ds_connect) {
 557                get_v3_ds_connect = symbol_request(nfs3_set_ds_client);
 558                WARN_ON_ONCE(!get_v3_ds_connect);
 559        }
 560
 561        return(get_v3_ds_connect != NULL);
 562}
 563
 564void nfs4_pnfs_v3_ds_connect_unload(void)
 565{
 566        if (get_v3_ds_connect) {
 567                symbol_put(nfs3_set_ds_client);
 568                get_v3_ds_connect = NULL;
 569        }
 570}
 571EXPORT_SYMBOL_GPL(nfs4_pnfs_v3_ds_connect_unload);
 572
 573static int _nfs4_pnfs_v3_ds_connect(struct nfs_server *mds_srv,
 574                                 struct nfs4_pnfs_ds *ds,
 575                                 unsigned int timeo,
 576                                 unsigned int retrans,
 577                                 rpc_authflavor_t au_flavor)
 578{
 579        struct nfs_client *clp = ERR_PTR(-EIO);
 580        struct nfs4_pnfs_ds_addr *da;
 581        int status = 0;
 582
 583        dprintk("--> %s DS %s au_flavor %d\n", __func__,
 584                ds->ds_remotestr, au_flavor);
 585
 586        if (!load_v3_ds_connect())
 587                goto out;
 588
 589        list_for_each_entry(da, &ds->ds_addrs, da_node) {
 590                dprintk("%s: DS %s: trying address %s\n",
 591                        __func__, ds->ds_remotestr, da->da_remotestr);
 592
 593                clp = get_v3_ds_connect(mds_srv->nfs_client,
 594                                        (struct sockaddr *)&da->da_addr,
 595                                        da->da_addrlen, IPPROTO_TCP,
 596                                        timeo, retrans, au_flavor);
 597                if (!IS_ERR(clp))
 598                        break;
 599        }
 600
 601        if (IS_ERR(clp)) {
 602                status = PTR_ERR(clp);
 603                goto out;
 604        }
 605
 606        smp_wmb();
 607        ds->ds_clp = clp;
 608        dprintk("%s [new] addr: %s\n", __func__, ds->ds_remotestr);
 609out:
 610        return status;
 611}
 612
 613static int _nfs4_pnfs_v4_ds_connect(struct nfs_server *mds_srv,
 614                                 struct nfs4_pnfs_ds *ds,
 615                                 unsigned int timeo,
 616                                 unsigned int retrans,
 617                                 u32 minor_version,
 618                                 rpc_authflavor_t au_flavor)
 619{
 620        struct nfs_client *clp = ERR_PTR(-EIO);
 621        struct nfs4_pnfs_ds_addr *da;
 622        int status = 0;
 623
 624        dprintk("--> %s DS %s au_flavor %d\n", __func__, ds->ds_remotestr,
 625                au_flavor);
 626
 627        list_for_each_entry(da, &ds->ds_addrs, da_node) {
 628                dprintk("%s: DS %s: trying address %s\n",
 629                        __func__, ds->ds_remotestr, da->da_remotestr);
 630
 631                clp = nfs4_set_ds_client(mds_srv->nfs_client,
 632                                        (struct sockaddr *)&da->da_addr,
 633                                        da->da_addrlen, IPPROTO_TCP,
 634                                        timeo, retrans, minor_version,
 635                                        au_flavor);
 636                if (!IS_ERR(clp))
 637                        break;
 638        }
 639
 640        if (IS_ERR(clp)) {
 641                status = PTR_ERR(clp);
 642                goto out;
 643        }
 644
 645        status = nfs4_init_ds_session(clp, mds_srv->nfs_client->cl_lease_time);
 646        if (status)
 647                goto out_put;
 648
 649        smp_wmb();
 650        ds->ds_clp = clp;
 651        dprintk("%s [new] addr: %s\n", __func__, ds->ds_remotestr);
 652out:
 653        return status;
 654out_put:
 655        nfs_put_client(clp);
 656        goto out;
 657}
 658
 659/*
 660 * Create an rpc connection to the nfs4_pnfs_ds data server.
 661 * Currently only supports IPv4 and IPv6 addresses.
 662 * If connection fails, make devid unavailable.
 663 */
 664void nfs4_pnfs_ds_connect(struct nfs_server *mds_srv, struct nfs4_pnfs_ds *ds,
 665                          struct nfs4_deviceid_node *devid, unsigned int timeo,
 666                          unsigned int retrans, u32 version,
 667                          u32 minor_version, rpc_authflavor_t au_flavor)
 668{
 669        if (test_and_set_bit(NFS4DS_CONNECTING, &ds->ds_state) == 0) {
 670                int err = 0;
 671
 672                if (version == 3) {
 673                        err = _nfs4_pnfs_v3_ds_connect(mds_srv, ds, timeo,
 674                                                       retrans, au_flavor);
 675                } else if (version == 4) {
 676                        err = _nfs4_pnfs_v4_ds_connect(mds_srv, ds, timeo,
 677                                                       retrans, minor_version,
 678                                                       au_flavor);
 679                } else {
 680                        dprintk("%s: unsupported DS version %d\n", __func__,
 681                                version);
 682                        err = -EPROTONOSUPPORT;
 683                }
 684
 685                if (err)
 686                        nfs4_mark_deviceid_unavailable(devid);
 687                nfs4_clear_ds_conn_bit(ds);
 688        } else {
 689                nfs4_wait_ds_connect(ds);
 690        }
 691}
 692EXPORT_SYMBOL_GPL(nfs4_pnfs_ds_connect);
 693
 694/*
 695 * Currently only supports ipv4, ipv6 and one multi-path address.
 696 */
 697struct nfs4_pnfs_ds_addr *
 698nfs4_decode_mp_ds_addr(struct net *net, struct xdr_stream *xdr, gfp_t gfp_flags)
 699{
 700        struct nfs4_pnfs_ds_addr *da = NULL;
 701        char *buf, *portstr;
 702        __be16 port;
 703        int nlen, rlen;
 704        int tmp[2];
 705        __be32 *p;
 706        char *netid, *match_netid;
 707        size_t len, match_netid_len;
 708        char *startsep = "";
 709        char *endsep = "";
 710
 711
 712        /* r_netid */
 713        p = xdr_inline_decode(xdr, 4);
 714        if (unlikely(!p))
 715                goto out_err;
 716        nlen = be32_to_cpup(p++);
 717
 718        p = xdr_inline_decode(xdr, nlen);
 719        if (unlikely(!p))
 720                goto out_err;
 721
 722        netid = kmalloc(nlen+1, gfp_flags);
 723        if (unlikely(!netid))
 724                goto out_err;
 725
 726        netid[nlen] = '\0';
 727        memcpy(netid, p, nlen);
 728
 729        /* r_addr: ip/ip6addr with port in dec octets - see RFC 5665 */
 730        p = xdr_inline_decode(xdr, 4);
 731        if (unlikely(!p))
 732                goto out_free_netid;
 733        rlen = be32_to_cpup(p);
 734
 735        p = xdr_inline_decode(xdr, rlen);
 736        if (unlikely(!p))
 737                goto out_free_netid;
 738
 739        /* port is ".ABC.DEF", 8 chars max */
 740        if (rlen > INET6_ADDRSTRLEN + IPV6_SCOPE_ID_LEN + 8) {
 741                dprintk("%s: Invalid address, length %d\n", __func__,
 742                        rlen);
 743                goto out_free_netid;
 744        }
 745        buf = kmalloc(rlen + 1, gfp_flags);
 746        if (!buf) {
 747                dprintk("%s: Not enough memory\n", __func__);
 748                goto out_free_netid;
 749        }
 750        buf[rlen] = '\0';
 751        memcpy(buf, p, rlen);
 752
 753        /* replace port '.' with '-' */
 754        portstr = strrchr(buf, '.');
 755        if (!portstr) {
 756                dprintk("%s: Failed finding expected dot in port\n",
 757                        __func__);
 758                goto out_free_buf;
 759        }
 760        *portstr = '-';
 761
 762        /* find '.' between address and port */
 763        portstr = strrchr(buf, '.');
 764        if (!portstr) {
 765                dprintk("%s: Failed finding expected dot between address and "
 766                        "port\n", __func__);
 767                goto out_free_buf;
 768        }
 769        *portstr = '\0';
 770
 771        da = kzalloc(sizeof(*da), gfp_flags);
 772        if (unlikely(!da))
 773                goto out_free_buf;
 774
 775        INIT_LIST_HEAD(&da->da_node);
 776
 777        if (!rpc_pton(net, buf, portstr-buf, (struct sockaddr *)&da->da_addr,
 778                      sizeof(da->da_addr))) {
 779                dprintk("%s: error parsing address %s\n", __func__, buf);
 780                goto out_free_da;
 781        }
 782
 783        portstr++;
 784        sscanf(portstr, "%d-%d", &tmp[0], &tmp[1]);
 785        port = htons((tmp[0] << 8) | (tmp[1]));
 786
 787        switch (da->da_addr.ss_family) {
 788        case AF_INET:
 789                ((struct sockaddr_in *)&da->da_addr)->sin_port = port;
 790                da->da_addrlen = sizeof(struct sockaddr_in);
 791                match_netid = "tcp";
 792                match_netid_len = 3;
 793                break;
 794
 795        case AF_INET6:
 796                ((struct sockaddr_in6 *)&da->da_addr)->sin6_port = port;
 797                da->da_addrlen = sizeof(struct sockaddr_in6);
 798                match_netid = "tcp6";
 799                match_netid_len = 4;
 800                startsep = "[";
 801                endsep = "]";
 802                break;
 803
 804        default:
 805                dprintk("%s: unsupported address family: %u\n",
 806                        __func__, da->da_addr.ss_family);
 807                goto out_free_da;
 808        }
 809
 810        if (nlen != match_netid_len || strncmp(netid, match_netid, nlen)) {
 811                dprintk("%s: ERROR: r_netid \"%s\" != \"%s\"\n",
 812                        __func__, netid, match_netid);
 813                goto out_free_da;
 814        }
 815
 816        /* save human readable address */
 817        len = strlen(startsep) + strlen(buf) + strlen(endsep) + 7;
 818        da->da_remotestr = kzalloc(len, gfp_flags);
 819
 820        /* NULL is ok, only used for dprintk */
 821        if (da->da_remotestr)
 822                snprintf(da->da_remotestr, len, "%s%s%s:%u", startsep,
 823                         buf, endsep, ntohs(port));
 824
 825        dprintk("%s: Parsed DS addr %s\n", __func__, da->da_remotestr);
 826        kfree(buf);
 827        kfree(netid);
 828        return da;
 829
 830out_free_da:
 831        kfree(da);
 832out_free_buf:
 833        dprintk("%s: Error parsing DS addr: %s\n", __func__, buf);
 834        kfree(buf);
 835out_free_netid:
 836        kfree(netid);
 837out_err:
 838        return NULL;
 839}
 840EXPORT_SYMBOL_GPL(nfs4_decode_mp_ds_addr);
 841
/*
 * Queue @req on the "written" list of data-server commit bucket
 * @ds_commit_idx.
 *
 * Under cinfo->lock: if the bucket's written list is empty, a reference
 * on @lseg is taken and stored in the bucket's wlseg; the request is
 * flagged PG_COMMIT_TO_DS and the nwritten counter is incremented.  The
 * request is then added to the list through nfs_request_add_commit_list()
 * after the lock has been released.
 */
void
pnfs_layout_mark_request_commit(struct nfs_page *req,
                                struct pnfs_layout_segment *lseg,
                                struct nfs_commit_info *cinfo,
                                u32 ds_commit_idx)
{
        struct list_head *list;
        struct pnfs_commit_bucket *buckets;

        spin_lock(cinfo->lock);
        buckets = cinfo->ds->buckets;
        list = &buckets[ds_commit_idx].written;
        if (list_empty(list)) {
                /* Non-empty buckets hold a reference on the lseg.  That ref
                 * is normally transferred to the COMMIT call and released
                 * there.  It could also be released if the last req is pulled
                 * off due to a rewrite, in which case it will be done in
                 * pnfs_generic_clear_request_commit
                 */
                WARN_ON_ONCE(buckets[ds_commit_idx].wlseg != NULL);
                buckets[ds_commit_idx].wlseg = pnfs_get_lseg(lseg);
        }
        set_bit(PG_COMMIT_TO_DS, &req->wb_flags);
        cinfo->ds->nwritten++;
        spin_unlock(cinfo->lock);

        nfs_request_add_commit_list(req, list, cinfo);
}
EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit);
 871
 872int
 873pnfs_nfs_generic_sync(struct inode *inode, bool datasync)
 874{
 875        if (datasync)
 876                return 0;
 877        return pnfs_layoutcommit_inode(inode, true);
 878}
 879EXPORT_SYMBOL_GPL(pnfs_nfs_generic_sync);
 880
 881