linux/net/sunrpc/clnt.c
<<
>>
Prefs
   1/*
   2 *  linux/net/sunrpc/clnt.c
   3 *
   4 *  This file contains the high-level RPC interface.
   5 *  It is modeled as a finite state machine to support both synchronous
   6 *  and asynchronous requests.
   7 *
   8 *  -   RPC header generation and argument serialization.
   9 *  -   Credential refresh.
  10 *  -   TCP connect handling.
  11 *  -   Retry of operation when it is suspected the operation failed because
  12 *      of uid squashing on the server, or when the credentials were stale
  13 *      and need to be refreshed, or when a packet was damaged in transit.
  14 *      This may have to be moved to the VFS layer.
  15 *
  16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  18 */
  19
  20
  21#include <linux/module.h>
  22#include <linux/types.h>
  23#include <linux/kallsyms.h>
  24#include <linux/mm.h>
  25#include <linux/namei.h>
  26#include <linux/mount.h>
  27#include <linux/slab.h>
  28#include <linux/rcupdate.h>
  29#include <linux/utsname.h>
  30#include <linux/workqueue.h>
  31#include <linux/in.h>
  32#include <linux/in6.h>
  33#include <linux/un.h>
  34
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/addr.h>
  37#include <linux/sunrpc/rpc_pipe_fs.h>
  38#include <linux/sunrpc/metrics.h>
  39#include <linux/sunrpc/bc_xprt.h>
  40#include <trace/events/sunrpc.h>
  41
  42#include "sunrpc.h"
  43#include "netns.h"
  44
  45#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  46# define RPCDBG_FACILITY        RPCDBG_CALL
  47#endif
  48
  49#define dprint_status(t)                                        \
  50        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
  51                        __func__, t->tk_status)
  52
  53/*
  54 * RPC clients themselves are linked into the per-net all_clients list
  55 * (see rpc_register_client()).
 *
 * destroy_wait is the wait queue on which rpc_shutdown_client() sleeps
 * until a client's cl_tasks list is empty; rpc_release_client() wakes it
 * whenever it observes an empty task list.
 */
  56
  57static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  58
  59
  60static void     call_start(struct rpc_task *task);
  61static void     call_reserve(struct rpc_task *task);
  62static void     call_reserveresult(struct rpc_task *task);
  63static void     call_allocate(struct rpc_task *task);
  64static void     call_decode(struct rpc_task *task);
  65static void     call_bind(struct rpc_task *task);
  66static void     call_bind_status(struct rpc_task *task);
  67static void     call_transmit(struct rpc_task *task);
  68#if defined(CONFIG_SUNRPC_BACKCHANNEL)
  69static void     call_bc_transmit(struct rpc_task *task);
  70#endif /* CONFIG_SUNRPC_BACKCHANNEL */
  71static void     call_status(struct rpc_task *task);
  72static void     call_transmit_status(struct rpc_task *task);
  73static void     call_refresh(struct rpc_task *task);
  74static void     call_refreshresult(struct rpc_task *task);
  75static void     call_timeout(struct rpc_task *task);
  76static void     call_connect(struct rpc_task *task);
  77static void     call_connect_status(struct rpc_task *task);
  78
  79static __be32   *rpc_encode_header(struct rpc_task *task);
  80static __be32   *rpc_verify_header(struct rpc_task *task);
  81static int      rpc_ping(struct rpc_clnt *clnt);
  82
  83static void rpc_register_client(struct rpc_clnt *clnt)
  84{
  85        struct net *net = rpc_net_ns(clnt);
  86        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  87
  88        spin_lock(&sn->rpc_client_lock);
  89        list_add(&clnt->cl_clients, &sn->all_clients);
  90        spin_unlock(&sn->rpc_client_lock);
  91}
  92
  93static void rpc_unregister_client(struct rpc_clnt *clnt)
  94{
  95        struct net *net = rpc_net_ns(clnt);
  96        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  97
  98        spin_lock(&sn->rpc_client_lock);
  99        list_del(&clnt->cl_clients);
 100        spin_unlock(&sn->rpc_client_lock);
 101}
 102
 /*
  * Remove the rpc_pipefs directory of @clnt by calling
  * rpc_remove_client_dir().  This helper does no locking of its own.
  */
 103static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 104{
 105        rpc_remove_client_dir(clnt);
 106}
 107
 108static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 109{
 110        struct net *net = rpc_net_ns(clnt);
 111        struct super_block *pipefs_sb;
 112
 113        pipefs_sb = rpc_get_sb_net(net);
 114        if (pipefs_sb) {
 115                __rpc_clnt_remove_pipedir(clnt);
 116                rpc_put_sb_net(net);
 117        }
 118}
 119
 120static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
 121                                    struct rpc_clnt *clnt)
 122{
 123        static uint32_t clntid;
 124        const char *dir_name = clnt->cl_program->pipe_dir_name;
 125        char name[15];
 126        struct dentry *dir, *dentry;
 127
 128        dir = rpc_d_lookup_sb(sb, dir_name);
 129        if (dir == NULL) {
 130                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
 131                return dir;
 132        }
 133        for (;;) {
 134                snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 135                name[sizeof(name) - 1] = '\0';
 136                dentry = rpc_create_client_dir(dir, name, clnt);
 137                if (!IS_ERR(dentry))
 138                        break;
 139                if (dentry == ERR_PTR(-EEXIST))
 140                        continue;
 141                printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 142                                " %s/%s, error %ld\n",
 143                                dir_name, name, PTR_ERR(dentry));
 144                break;
 145        }
 146        dput(dir);
 147        return dentry;
 148}
 149
 150static int
 151rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
 152{
 153        struct dentry *dentry;
 154
 155        if (clnt->cl_program->pipe_dir_name != NULL) {
 156                dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
 157                if (IS_ERR(dentry))
 158                        return PTR_ERR(dentry);
 159        }
 160        return 0;
 161}
 162
 163static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
 164{
 165        if (clnt->cl_program->pipe_dir_name == NULL)
 166                return 1;
 167
 168        switch (event) {
 169        case RPC_PIPEFS_MOUNT:
 170                if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
 171                        return 1;
 172                if (atomic_read(&clnt->cl_count) == 0)
 173                        return 1;
 174                break;
 175        case RPC_PIPEFS_UMOUNT:
 176                if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
 177                        return 1;
 178                break;
 179        }
 180        return 0;
 181}
 182
 183static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 184                                   struct super_block *sb)
 185{
 186        struct dentry *dentry;
 187
 188        switch (event) {
 189        case RPC_PIPEFS_MOUNT:
 190                dentry = rpc_setup_pipedir_sb(sb, clnt);
 191                if (!dentry)
 192                        return -ENOENT;
 193                if (IS_ERR(dentry))
 194                        return PTR_ERR(dentry);
 195                break;
 196        case RPC_PIPEFS_UMOUNT:
 197                __rpc_clnt_remove_pipedir(clnt);
 198                break;
 199        default:
 200                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 201                return -ENOTSUPP;
 202        }
 203        return 0;
 204}
 205
 206static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
 207                                struct super_block *sb)
 208{
 209        int error = 0;
 210
 211        for (;; clnt = clnt->cl_parent) {
 212                if (!rpc_clnt_skip_event(clnt, event))
 213                        error = __rpc_clnt_handle_event(clnt, event, sb);
 214                if (error || clnt == clnt->cl_parent)
 215                        break;
 216        }
 217        return error;
 218}
 219
 220static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
 221{
 222        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 223        struct rpc_clnt *clnt;
 224
 225        spin_lock(&sn->rpc_client_lock);
 226        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
 227                if (rpc_clnt_skip_event(clnt, event))
 228                        continue;
 229                spin_unlock(&sn->rpc_client_lock);
 230                return clnt;
 231        }
 232        spin_unlock(&sn->rpc_client_lock);
 233        return NULL;
 234}
 235
 236static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
 237                            void *ptr)
 238{
 239        struct super_block *sb = ptr;
 240        struct rpc_clnt *clnt;
 241        int error = 0;
 242
 243        while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
 244                error = __rpc_pipefs_event(clnt, event, sb);
 245                if (error)
 246                        break;
 247        }
 248        return error;
 249}
 250
 /*
  * Notifier block whose callback is rpc_pipefs_event().  It is handed to
  * the rpc_pipefs notifier chain by rpc_clients_notifier_register() and
  * removed again by rpc_clients_notifier_unregister().
  */
 251static struct notifier_block rpc_clients_block = {
 252        .notifier_call  = rpc_pipefs_event,
 253        .priority       = SUNRPC_PIPEFS_RPC_PRIO,
 254};
 255
 /*
  * Register rpc_clients_block with the rpc_pipefs notifier chain.
  * Returns whatever rpc_pipefs_notifier_register() returns.
  */
 256int rpc_clients_notifier_register(void)
 257{
 258        return rpc_pipefs_notifier_register(&rpc_clients_block);
 259}
 260
 261void rpc_clients_notifier_unregister(void)
 262{
 263        return rpc_pipefs_notifier_unregister(&rpc_clients_block);
 264}
 265
 266static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
 267                struct rpc_xprt *xprt,
 268                const struct rpc_timeout *timeout)
 269{
 270        struct rpc_xprt *old;
 271
 272        spin_lock(&clnt->cl_lock);
 273        old = rcu_dereference_protected(clnt->cl_xprt,
 274                        lockdep_is_held(&clnt->cl_lock));
 275
 276        if (!xprt_bound(xprt))
 277                clnt->cl_autobind = 1;
 278
 279        clnt->cl_timeout = timeout;
 280        rcu_assign_pointer(clnt->cl_xprt, xprt);
 281        spin_unlock(&clnt->cl_lock);
 282
 283        return old;
 284}
 285
 286static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
 287{
 288        clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
 289                        nodename, sizeof(clnt->cl_nodename));
 290}
 291
 292static int rpc_client_register(struct rpc_clnt *clnt,
 293                               rpc_authflavor_t pseudoflavor,
 294                               const char *client_name)
 295{
 296        struct rpc_auth_create_args auth_args = {
 297                .pseudoflavor = pseudoflavor,
 298                .target_name = client_name,
 299        };
 300        struct rpc_auth *auth;
 301        struct net *net = rpc_net_ns(clnt);
 302        struct super_block *pipefs_sb;
 303        int err;
 304
 305        rpc_clnt_debugfs_register(clnt);
 306
 307        pipefs_sb = rpc_get_sb_net(net);
 308        if (pipefs_sb) {
 309                err = rpc_setup_pipedir(pipefs_sb, clnt);
 310                if (err)
 311                        goto out;
 312        }
 313
 314        rpc_register_client(clnt);
 315        if (pipefs_sb)
 316                rpc_put_sb_net(net);
 317
 318        auth = rpcauth_create(&auth_args, clnt);
 319        if (IS_ERR(auth)) {
 320                dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
 321                                pseudoflavor);
 322                err = PTR_ERR(auth);
 323                goto err_auth;
 324        }
 325        return 0;
 326err_auth:
 327        pipefs_sb = rpc_get_sb_net(net);
 328        rpc_unregister_client(clnt);
 329        __rpc_clnt_remove_pipedir(clnt);
 330out:
 331        if (pipefs_sb)
 332                rpc_put_sb_net(net);
 333        rpc_clnt_debugfs_unregister(clnt);
 334        return err;
 335}
 336
 /*
  * ID allocator for rpc_clnt cl_clid values: rpc_alloc_clid() takes an id
  * from it, rpc_free_clid() gives one back, rpc_cleanup_clids() destroys it.
  */
 337static DEFINE_IDA(rpc_clids);
 338
 /* Destroy the rpc_clids IDA via ida_destroy(). */
 339void rpc_cleanup_clids(void)
 340{
 341        ida_destroy(&rpc_clids);
 342}
 343
 344static int rpc_alloc_clid(struct rpc_clnt *clnt)
 345{
 346        int clid;
 347
 348        clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
 349        if (clid < 0)
 350                return clid;
 351        clnt->cl_clid = clid;
 352        return 0;
 353}
 354
 /* Return clnt->cl_clid to the rpc_clids IDA. */
 355static void rpc_free_clid(struct rpc_clnt *clnt)
 356{
 357        ida_simple_remove(&rpc_clids, clnt->cl_clid);
 358}
 359
 360static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
 361                struct rpc_xprt_switch *xps,
 362                struct rpc_xprt *xprt,
 363                struct rpc_clnt *parent)
 364{
 365        const struct rpc_program *program = args->program;
 366        const struct rpc_version *version;
 367        struct rpc_clnt *clnt = NULL;
 368        const struct rpc_timeout *timeout;
 369        const char *nodename = args->nodename;
 370        int err;
 371
 372        /* sanity check the name before trying to print it */
 373        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 374                        program->name, args->servername, xprt);
 375
 376        err = rpciod_up();
 377        if (err)
 378                goto out_no_rpciod;
 379
 380        err = -EINVAL;
 381        if (args->version >= program->nrvers)
 382                goto out_err;
 383        version = program->version[args->version];
 384        if (version == NULL)
 385                goto out_err;
 386
 387        err = -ENOMEM;
 388        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 389        if (!clnt)
 390                goto out_err;
 391        clnt->cl_parent = parent ? : clnt;
 392
 393        err = rpc_alloc_clid(clnt);
 394        if (err)
 395                goto out_no_clid;
 396
 397        clnt->cl_procinfo = version->procs;
 398        clnt->cl_maxproc  = version->nrprocs;
 399        clnt->cl_prog     = args->prognumber ? : program->number;
 400        clnt->cl_vers     = version->number;
 401        clnt->cl_stats    = program->stats;
 402        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 403        rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
 404        err = -ENOMEM;
 405        if (clnt->cl_metrics == NULL)
 406                goto out_no_stats;
 407        clnt->cl_program  = program;
 408        INIT_LIST_HEAD(&clnt->cl_tasks);
 409        spin_lock_init(&clnt->cl_lock);
 410
 411        timeout = xprt->timeout;
 412        if (args->timeout != NULL) {
 413                memcpy(&clnt->cl_timeout_default, args->timeout,
 414                                sizeof(clnt->cl_timeout_default));
 415                timeout = &clnt->cl_timeout_default;
 416        }
 417
 418        rpc_clnt_set_transport(clnt, xprt, timeout);
 419        xprt_iter_init(&clnt->cl_xpi, xps);
 420        xprt_switch_put(xps);
 421
 422        clnt->cl_rtt = &clnt->cl_rtt_default;
 423        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 424
 425        atomic_set(&clnt->cl_count, 1);
 426
 427        if (nodename == NULL)
 428                nodename = utsname()->nodename;
 429        /* save the nodename */
 430        rpc_clnt_set_nodename(clnt, nodename);
 431
 432        err = rpc_client_register(clnt, args->authflavor, args->client_name);
 433        if (err)
 434                goto out_no_path;
 435        if (parent)
 436                atomic_inc(&parent->cl_count);
 437        return clnt;
 438
 439out_no_path:
 440        rpc_free_iostats(clnt->cl_metrics);
 441out_no_stats:
 442        rpc_free_clid(clnt);
 443out_no_clid:
 444        kfree(clnt);
 445out_err:
 446        rpciod_down();
 447out_no_rpciod:
 448        xprt_switch_put(xps);
 449        xprt_put(xprt);
 450        return ERR_PTR(err);
 451}
 452
 453static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
 454                                        struct rpc_xprt *xprt)
 455{
 456        struct rpc_clnt *clnt = NULL;
 457        struct rpc_xprt_switch *xps;
 458
 459        if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
 460                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 461                xps = args->bc_xprt->xpt_bc_xps;
 462                xprt_switch_get(xps);
 463        } else {
 464                xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 465                if (xps == NULL) {
 466                        xprt_put(xprt);
 467                        return ERR_PTR(-ENOMEM);
 468                }
 469                if (xprt->bc_xprt) {
 470                        xprt_switch_get(xps);
 471                        xprt->bc_xprt->xpt_bc_xps = xps;
 472                }
 473        }
 474        clnt = rpc_new_client(args, xps, xprt, NULL);
 475        if (IS_ERR(clnt))
 476                return clnt;
 477
 478        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 479                int err = rpc_ping(clnt);
 480                if (err != 0) {
 481                        rpc_shutdown_client(clnt);
 482                        return ERR_PTR(err);
 483                }
 484        }
 485
 486        clnt->cl_softrtry = 1;
 487        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
 488                clnt->cl_softrtry = 0;
 489
 490        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 491                clnt->cl_autobind = 1;
 492        if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
 493                clnt->cl_noretranstimeo = 1;
 494        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 495                clnt->cl_discrtry = 1;
 496        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 497                clnt->cl_chatty = 1;
 498
 499        return clnt;
 500}
 501
 502/**
 503 * rpc_create - create an RPC client and transport with one call
 504 * @args: rpc_clnt create argument structure
 505 *
 506 * Creates and initializes an RPC transport and an RPC client.
 507 *
 508 * It can ping the server in order to determine if it is up, and to see if
 509 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 510 * this behavior so asynchronous tasks can also use rpc_create.
 511 */
 512struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 513{
 514        struct rpc_xprt *xprt;
 515        struct xprt_create xprtargs = {
 516                .net = args->net,
 517                .ident = args->protocol,
 518                .srcaddr = args->saddress,
 519                .dstaddr = args->address,
 520                .addrlen = args->addrsize,
 521                .servername = args->servername,
 522                .bc_xprt = args->bc_xprt,
 523        };
 524        char servername[48];
 525
 526        if (args->bc_xprt) {
 527                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 528                xprt = args->bc_xprt->xpt_bc_xprt;
 529                if (xprt) {
 530                        xprt_get(xprt);
 531                        return rpc_create_xprt(args, xprt);
 532                }
 533        }
 534
 535        if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
 536                xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
 537        if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
 538                xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
 539        /*
 540         * If the caller chooses not to specify a hostname, whip
 541         * up a string representation of the passed-in address.
 542         */
 543        if (xprtargs.servername == NULL) {
 544                struct sockaddr_un *sun =
 545                                (struct sockaddr_un *)args->address;
 546                struct sockaddr_in *sin =
 547                                (struct sockaddr_in *)args->address;
 548                struct sockaddr_in6 *sin6 =
 549                                (struct sockaddr_in6 *)args->address;
 550
 551                servername[0] = '\0';
 552                switch (args->address->sa_family) {
 553                case AF_LOCAL:
 554                        snprintf(servername, sizeof(servername), "%s",
 555                                 sun->sun_path);
 556                        break;
 557                case AF_INET:
 558                        snprintf(servername, sizeof(servername), "%pI4",
 559                                 &sin->sin_addr.s_addr);
 560                        break;
 561                case AF_INET6:
 562                        snprintf(servername, sizeof(servername), "%pI6",
 563                                 &sin6->sin6_addr);
 564                        break;
 565                default:
 566                        /* caller wants default server name, but
 567                         * address family isn't recognized. */
 568                        return ERR_PTR(-EINVAL);
 569                }
 570                xprtargs.servername = servername;
 571        }
 572
 573        xprt = xprt_create_transport(&xprtargs);
 574        if (IS_ERR(xprt))
 575                return (struct rpc_clnt *)xprt;
 576
 577        /*
 578         * By default, kernel RPC client connects from a reserved port.
 579         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 580         * but it is always enabled for rpciod, which handles the connect
 581         * operation.
 582         */
 583        xprt->resvport = 1;
 584        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 585                xprt->resvport = 0;
 586
 587        return rpc_create_xprt(args, xprt);
 588}
 589EXPORT_SYMBOL_GPL(rpc_create);
 590
 591/*
 592 * This function clones the RPC client structure. It allows us to share the
 593 * same transport while varying parameters such as the authentication
 594 * flavour.
 595 */
 596static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 597                                           struct rpc_clnt *clnt)
 598{
 599        struct rpc_xprt_switch *xps;
 600        struct rpc_xprt *xprt;
 601        struct rpc_clnt *new;
 602        int err;
 603
 604        err = -ENOMEM;
 605        rcu_read_lock();
 606        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
 607        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 608        rcu_read_unlock();
 609        if (xprt == NULL || xps == NULL) {
 610                xprt_put(xprt);
 611                xprt_switch_put(xps);
 612                goto out_err;
 613        }
 614        args->servername = xprt->servername;
 615        args->nodename = clnt->cl_nodename;
 616
 617        new = rpc_new_client(args, xps, xprt, clnt);
 618        if (IS_ERR(new)) {
 619                err = PTR_ERR(new);
 620                goto out_err;
 621        }
 622
 623        /* Turn off autobind on clones */
 624        new->cl_autobind = 0;
 625        new->cl_softrtry = clnt->cl_softrtry;
 626        new->cl_noretranstimeo = clnt->cl_noretranstimeo;
 627        new->cl_discrtry = clnt->cl_discrtry;
 628        new->cl_chatty = clnt->cl_chatty;
 629        return new;
 630
 631out_err:
 632        dprintk("RPC:       %s: returned error %d\n", __func__, err);
 633        return ERR_PTR(err);
 634}
 635
 636/**
 637 * rpc_clone_client - Clone an RPC client structure
 638 *
 639 * @clnt: RPC client whose parameters are copied
 640 *
 641 * Returns a fresh RPC client or an ERR_PTR.
 642 */
 643struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
 644{
 645        struct rpc_create_args args = {
 646                .program        = clnt->cl_program,
 647                .prognumber     = clnt->cl_prog,
 648                .version        = clnt->cl_vers,
 649                .authflavor     = clnt->cl_auth->au_flavor,
 650        };
 651        return __rpc_clone_client(&args, clnt);
 652}
 653EXPORT_SYMBOL_GPL(rpc_clone_client);
 654
 655/**
 656 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 657 *
 658 * @clnt: RPC client whose parameters are copied
 659 * @flavor: security flavor for new client
 660 *
 661 * Returns a fresh RPC client or an ERR_PTR.
 662 */
 663struct rpc_clnt *
 664rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 665{
 666        struct rpc_create_args args = {
 667                .program        = clnt->cl_program,
 668                .prognumber     = clnt->cl_prog,
 669                .version        = clnt->cl_vers,
 670                .authflavor     = flavor,
 671        };
 672        return __rpc_clone_client(&args, clnt);
 673}
 674EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
 675
 676/**
 677 * rpc_switch_client_transport: switch the RPC transport on the fly
 678 * @clnt: pointer to a struct rpc_clnt
 679 * @args: pointer to the new transport arguments
 680 * @timeout: pointer to the new timeout parameters
 681 *
 682 * This function allows the caller to switch the RPC transport for the
 683 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 684 * server, for instance.  It assumes that the caller has ensured that
 685 * there are no active RPC tasks by using some form of locking.
 686 *
 687 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 688 * negative errno is returned, and "clnt" continues to use the old
 689 * xprt.
 690 */
 691int rpc_switch_client_transport(struct rpc_clnt *clnt,
 692                struct xprt_create *args,
 693                const struct rpc_timeout *timeout)
 694{
 695        const struct rpc_timeout *old_timeo;
 696        rpc_authflavor_t pseudoflavor;
 697        struct rpc_xprt_switch *xps, *oldxps;
 698        struct rpc_xprt *xprt, *old;
 699        struct rpc_clnt *parent;
 700        int err;
 701
 702        xprt = xprt_create_transport(args);
 703        if (IS_ERR(xprt)) {
 704                dprintk("RPC:       failed to create new xprt for clnt %p\n",
 705                        clnt);
 706                return PTR_ERR(xprt);
 707        }
 708
 709        xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 710        if (xps == NULL) {
 711                xprt_put(xprt);
 712                return -ENOMEM;
 713        }
 714
 715        pseudoflavor = clnt->cl_auth->au_flavor;
 716
 717        old_timeo = clnt->cl_timeout;
 718        old = rpc_clnt_set_transport(clnt, xprt, timeout);
 719        oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
 720
 721        rpc_unregister_client(clnt);
 722        __rpc_clnt_remove_pipedir(clnt);
 723        rpc_clnt_debugfs_unregister(clnt);
 724
 725        /*
 726         * A new transport was created.  "clnt" therefore
 727         * becomes the root of a new cl_parent tree.  clnt's
 728         * children, if it has any, still point to the old xprt.
 729         */
 730        parent = clnt->cl_parent;
 731        clnt->cl_parent = clnt;
 732
 733        /*
 734         * The old rpc_auth cache cannot be re-used.  GSS
 735         * contexts in particular are between a single
 736         * client and server.
 737         */
 738        err = rpc_client_register(clnt, pseudoflavor, NULL);
 739        if (err)
 740                goto out_revert;
 741
 742        synchronize_rcu();
 743        if (parent != clnt)
 744                rpc_release_client(parent);
 745        xprt_switch_put(oldxps);
 746        xprt_put(old);
 747        dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
 748        return 0;
 749
 750out_revert:
 751        xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
 752        rpc_clnt_set_transport(clnt, old, old_timeo);
 753        clnt->cl_parent = parent;
 754        rpc_client_register(clnt, pseudoflavor, NULL);
 755        xprt_switch_put(xps);
 756        xprt_put(xprt);
 757        dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
 758        return err;
 759}
 760EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
 761
 762static
 763int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
 764{
 765        struct rpc_xprt_switch *xps;
 766
 767        rcu_read_lock();
 768        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 769        rcu_read_unlock();
 770        if (xps == NULL)
 771                return -EAGAIN;
 772        xprt_iter_init_listall(xpi, xps);
 773        xprt_switch_put(xps);
 774        return 0;
 775}
 776
 777/**
 778 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
 779 * @clnt: pointer to client
 780 * @fn: function to apply
 781 * @data: void pointer to function data
 782 *
 783 * Iterates through the list of RPC transports currently attached to the
 784 * client and applies the function fn(clnt, xprt, data).
 785 *
 786 * On error, the iteration stops, and the function returns the error value.
 787 */
 788int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
 789                int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
 790                void *data)
 791{
 792        struct rpc_xprt_iter xpi;
 793        int ret;
 794
 795        ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
 796        if (ret)
 797                return ret;
 798        for (;;) {
 799                struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
 800
 801                if (!xprt)
 802                        break;
 803                ret = fn(clnt, xprt, data);
 804                xprt_put(xprt);
 805                if (ret < 0)
 806                        break;
 807        }
 808        xprt_iter_destroy(&xpi);
 809        return ret;
 810}
 811EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
 812
 813/*
 814 * Kill all tasks for the given client.
 815 * XXX: kill their descendants as well?
 816 */
 817void rpc_killall_tasks(struct rpc_clnt *clnt)
 818{
 819        struct rpc_task *rovr;
 820
 821
 822        if (list_empty(&clnt->cl_tasks))
 823                return;
 824        dprintk("RPC:       killing all tasks for client %p\n", clnt);
 825        /*
 826         * Spin lock all_tasks to prevent changes...
 827         */
 828        spin_lock(&clnt->cl_lock);
 829        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
 830                if (!RPC_IS_ACTIVATED(rovr))
 831                        continue;
 832                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 833                        rovr->tk_flags |= RPC_TASK_KILLED;
 834                        rpc_exit(rovr, -EIO);
 835                        if (RPC_IS_QUEUED(rovr))
 836                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
 837                                                        rovr);
 838                }
 839        }
 840        spin_unlock(&clnt->cl_lock);
 841}
 842EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 843
 844/*
 845 * Properly shut down an RPC client, terminating all outstanding
 846 * requests.
 847 */
 848void rpc_shutdown_client(struct rpc_clnt *clnt)
 849{
 850        might_sleep();
 851
 852        dprintk_rcu("RPC:       shutting down %s client for %s\n",
 853                        clnt->cl_program->name,
 854                        rcu_dereference(clnt->cl_xprt)->servername);
 855
 856        while (!list_empty(&clnt->cl_tasks)) {
 857                rpc_killall_tasks(clnt);
 858                wait_event_timeout(destroy_wait,
 859                        list_empty(&clnt->cl_tasks), 1*HZ);
 860        }
 861
 862        rpc_release_client(clnt);
 863}
 864EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 865
 866/*
 867 * Free an RPC client
 868 */
 869static struct rpc_clnt *
 870rpc_free_client(struct rpc_clnt *clnt)
 871{
 872        struct rpc_clnt *parent = NULL;
 873
 874        dprintk_rcu("RPC:       destroying %s client for %s\n",
 875                        clnt->cl_program->name,
 876                        rcu_dereference(clnt->cl_xprt)->servername);
 877        if (clnt->cl_parent != clnt)
 878                parent = clnt->cl_parent;
 879        rpc_clnt_debugfs_unregister(clnt);
 880        rpc_clnt_remove_pipedir(clnt);
 881        rpc_unregister_client(clnt);
 882        rpc_free_iostats(clnt->cl_metrics);
 883        clnt->cl_metrics = NULL;
 884        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
 885        xprt_iter_destroy(&clnt->cl_xpi);
 886        rpciod_down();
 887        rpc_free_clid(clnt);
 888        kfree(clnt);
 889        return parent;
 890}
 891
 892/*
 893 * Free an RPC client
 894 */
 895static struct rpc_clnt * 
 896rpc_free_auth(struct rpc_clnt *clnt)
 897{
 898        if (clnt->cl_auth == NULL)
 899                return rpc_free_client(clnt);
 900
 901        /*
 902         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 903         *       release remaining GSS contexts. This mechanism ensures
 904         *       that it can do so safely.
 905         */
 906        atomic_inc(&clnt->cl_count);
 907        rpcauth_release(clnt->cl_auth);
 908        clnt->cl_auth = NULL;
 909        if (atomic_dec_and_test(&clnt->cl_count))
 910                return rpc_free_client(clnt);
 911        return NULL;
 912}
 913
 914/*
 915 * Release reference to the RPC client
 916 */
 917void
 918rpc_release_client(struct rpc_clnt *clnt)
 919{
 920        dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 921
 922        do {
 923                if (list_empty(&clnt->cl_tasks))
 924                        wake_up(&destroy_wait);
 925                if (!atomic_dec_and_test(&clnt->cl_count))
 926                        break;
 927                clnt = rpc_free_auth(clnt);
 928        } while (clnt != NULL);
 929}
 930EXPORT_SYMBOL_GPL(rpc_release_client);
 931
 932/**
 933 * rpc_bind_new_program - bind a new RPC program to an existing client
 934 * @old: old rpc_client
 935 * @program: rpc program to set
 936 * @vers: rpc program version
 937 *
 938 * Clones the rpc client and sets up a new RPC program. This is mainly
 939 * of use for enabling different RPC programs to share the same transport.
 940 * The Sun NFSv2/v3 ACL protocol can do this.
 941 */
 942struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 943                                      const struct rpc_program *program,
 944                                      u32 vers)
 945{
 946        struct rpc_create_args args = {
 947                .program        = program,
 948                .prognumber     = program->number,
 949                .version        = vers,
 950                .authflavor     = old->cl_auth->au_flavor,
 951        };
 952        struct rpc_clnt *clnt;
 953        int err;
 954
 955        clnt = __rpc_clone_client(&args, old);
 956        if (IS_ERR(clnt))
 957                goto out;
 958        err = rpc_ping(clnt);
 959        if (err != 0) {
 960                rpc_shutdown_client(clnt);
 961                clnt = ERR_PTR(err);
 962        }
 963out:
 964        return clnt;
 965}
 966EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 967
 968void rpc_task_release_client(struct rpc_task *task)
 969{
 970        struct rpc_clnt *clnt = task->tk_client;
 971        struct rpc_xprt *xprt = task->tk_xprt;
 972
 973        if (clnt != NULL) {
 974                /* Remove from client task list */
 975                spin_lock(&clnt->cl_lock);
 976                list_del(&task->tk_task);
 977                spin_unlock(&clnt->cl_lock);
 978                task->tk_client = NULL;
 979
 980                rpc_release_client(clnt);
 981        }
 982
 983        if (xprt != NULL) {
 984                task->tk_xprt = NULL;
 985
 986                xprt_put(xprt);
 987        }
 988}
 989
 990static
 991void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 992{
 993
 994        if (clnt != NULL) {
 995                if (task->tk_xprt == NULL)
 996                        task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
 997                task->tk_client = clnt;
 998                atomic_inc(&clnt->cl_count);
 999                if (clnt->cl_softrtry)
1000                        task->tk_flags |= RPC_TASK_SOFT;
1001                if (clnt->cl_noretranstimeo)
1002                        task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1003                if (atomic_read(&clnt->cl_swapper))
1004                        task->tk_flags |= RPC_TASK_SWAPPER;
1005                /* Add to the client's list of all tasks */
1006                spin_lock(&clnt->cl_lock);
1007                list_add_tail(&task->tk_task, &clnt->cl_tasks);
1008                spin_unlock(&clnt->cl_lock);
1009        }
1010}
1011
1012static void
1013rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1014{
1015        if (msg != NULL) {
1016                task->tk_msg.rpc_proc = msg->rpc_proc;
1017                task->tk_msg.rpc_argp = msg->rpc_argp;
1018                task->tk_msg.rpc_resp = msg->rpc_resp;
1019                if (msg->rpc_cred != NULL)
1020                        task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
1021        }
1022}
1023
1024/*
1025 * Default callback for async RPC calls
1026 */
1027static void
1028rpc_default_callback(struct rpc_task *task, void *data)
1029{
1030}
1031
1032static const struct rpc_call_ops rpc_default_ops = {
1033        .rpc_call_done = rpc_default_callback,
1034};
1035
1036/**
1037 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1038 * @task_setup_data: pointer to task initialisation data
1039 */
1040struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1041{
1042        struct rpc_task *task;
1043
1044        task = rpc_new_task(task_setup_data);
1045        if (IS_ERR(task))
1046                goto out;
1047
1048        rpc_task_set_client(task, task_setup_data->rpc_client);
1049        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1050
1051        if (task->tk_action == NULL)
1052                rpc_call_start(task);
1053
1054        atomic_inc(&task->tk_count);
1055        rpc_execute(task);
1056out:
1057        return task;
1058}
1059EXPORT_SYMBOL_GPL(rpc_run_task);
1060
1061/**
1062 * rpc_call_sync - Perform a synchronous RPC call
1063 * @clnt: pointer to RPC client
1064 * @msg: RPC call parameters
1065 * @flags: RPC call flags
1066 */
1067int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1068{
1069        struct rpc_task *task;
1070        struct rpc_task_setup task_setup_data = {
1071                .rpc_client = clnt,
1072                .rpc_message = msg,
1073                .callback_ops = &rpc_default_ops,
1074                .flags = flags,
1075        };
1076        int status;
1077
1078        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1079        if (flags & RPC_TASK_ASYNC) {
1080                rpc_release_calldata(task_setup_data.callback_ops,
1081                        task_setup_data.callback_data);
1082                return -EINVAL;
1083        }
1084
1085        task = rpc_run_task(&task_setup_data);
1086        if (IS_ERR(task))
1087                return PTR_ERR(task);
1088        status = task->tk_status;
1089        rpc_put_task(task);
1090        return status;
1091}
1092EXPORT_SYMBOL_GPL(rpc_call_sync);
1093
1094/**
1095 * rpc_call_async - Perform an asynchronous RPC call
1096 * @clnt: pointer to RPC client
1097 * @msg: RPC call parameters
1098 * @flags: RPC call flags
1099 * @tk_ops: RPC call ops
1100 * @data: user call data
1101 */
1102int
1103rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1104               const struct rpc_call_ops *tk_ops, void *data)
1105{
1106        struct rpc_task *task;
1107        struct rpc_task_setup task_setup_data = {
1108                .rpc_client = clnt,
1109                .rpc_message = msg,
1110                .callback_ops = tk_ops,
1111                .callback_data = data,
1112                .flags = flags|RPC_TASK_ASYNC,
1113        };
1114
1115        task = rpc_run_task(&task_setup_data);
1116        if (IS_ERR(task))
1117                return PTR_ERR(task);
1118        rpc_put_task(task);
1119        return 0;
1120}
1121EXPORT_SYMBOL_GPL(rpc_call_async);
1122
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 *
 * Returns the new task, or the ERR_PTR from rpc_new_task(); in the error
 * case @req is released with xprt_free_bc_request().
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
{
	struct rpc_task_setup setup = {
		.callback_ops	= &rpc_default_ops,
		.flags		= RPC_TASK_SOFTCONN,
	};
	struct xdr_buf *snd = &req->rq_snd_buf;
	struct rpc_task *task;

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);

	/* Create an rpc_task to send the data */
	task = rpc_new_task(&setup);
	if (IS_ERR(task)) {
		xprt_free_bc_request(req);
	} else {
		task->tk_rqstp = req;

		/*
		 * Set up the xdr_buf length.
		 * This also indicates that the buffer is XDR encoded already.
		 */
		snd->len = snd->head[0].iov_len + snd->page_len +
				snd->tail[0].iov_len;

		task->tk_action = call_bc_transmit;
		atomic_inc(&task->tk_count);
		WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
		rpc_execute(task);
	}

	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1166
1167void
1168rpc_call_start(struct rpc_task *task)
1169{
1170        task->tk_action = call_start;
1171}
1172EXPORT_SYMBOL_GPL(rpc_call_start);
1173
1174/**
1175 * rpc_peeraddr - extract remote peer address from clnt's xprt
1176 * @clnt: RPC client structure
1177 * @buf: target buffer
1178 * @bufsize: length of target buffer
1179 *
1180 * Returns the number of bytes that are actually in the stored address.
1181 */
1182size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1183{
1184        size_t bytes;
1185        struct rpc_xprt *xprt;
1186
1187        rcu_read_lock();
1188        xprt = rcu_dereference(clnt->cl_xprt);
1189
1190        bytes = xprt->addrlen;
1191        if (bytes > bufsize)
1192                bytes = bufsize;
1193        memcpy(buf, &xprt->addr, bytes);
1194        rcu_read_unlock();
1195
1196        return bytes;
1197}
1198EXPORT_SYMBOL_GPL(rpc_peeraddr);
1199
1200/**
1201 * rpc_peeraddr2str - return remote peer address in printable format
1202 * @clnt: RPC client structure
1203 * @format: address format
1204 *
1205 * NB: the lifetime of the memory referenced by the returned pointer is
1206 * the same as the rpc_xprt itself.  As long as the caller uses this
1207 * pointer, it must hold the RCU read lock.
1208 */
1209const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1210                             enum rpc_display_format_t format)
1211{
1212        struct rpc_xprt *xprt;
1213
1214        xprt = rcu_dereference(clnt->cl_xprt);
1215
1216        if (xprt->address_strings[format] != NULL)
1217                return xprt->address_strings[format];
1218        else
1219                return "unprintable";
1220}
1221EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1222
1223static const struct sockaddr_in rpc_inaddr_loopback = {
1224        .sin_family             = AF_INET,
1225        .sin_addr.s_addr        = htonl(INADDR_ANY),
1226};
1227
1228static const struct sockaddr_in6 rpc_in6addr_loopback = {
1229        .sin6_family            = AF_INET6,
1230        .sin6_addr              = IN6ADDR_ANY_INIT,
1231};
1232
1233/*
1234 * Try a getsockname() on a connected datagram socket.  Using a
1235 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1236 * This conserves the ephemeral port number space.
1237 *
1238 * Returns zero and fills in "buf" if successful; otherwise, a
1239 * negative errno is returned.
1240 */
1241static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1242                        struct sockaddr *buf, int buflen)
1243{
1244        struct socket *sock;
1245        int err;
1246
1247        err = __sock_create(net, sap->sa_family,
1248                                SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1249        if (err < 0) {
1250                dprintk("RPC:       can't create UDP socket (%d)\n", err);
1251                goto out;
1252        }
1253
1254        switch (sap->sa_family) {
1255        case AF_INET:
1256                err = kernel_bind(sock,
1257                                (struct sockaddr *)&rpc_inaddr_loopback,
1258                                sizeof(rpc_inaddr_loopback));
1259                break;
1260        case AF_INET6:
1261                err = kernel_bind(sock,
1262                                (struct sockaddr *)&rpc_in6addr_loopback,
1263                                sizeof(rpc_in6addr_loopback));
1264                break;
1265        default:
1266                err = -EAFNOSUPPORT;
1267                goto out;
1268        }
1269        if (err < 0) {
1270                dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1271                goto out_release;
1272        }
1273
1274        err = kernel_connect(sock, sap, salen, 0);
1275        if (err < 0) {
1276                dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1277                goto out_release;
1278        }
1279
1280        err = kernel_getsockname(sock, buf, &buflen);
1281        if (err < 0) {
1282                dprintk("RPC:       getsockname failed (%d)\n", err);
1283                goto out_release;
1284        }
1285
1286        err = 0;
1287        if (buf->sa_family == AF_INET6) {
1288                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1289                sin6->sin6_scope_id = 0;
1290        }
1291        dprintk("RPC:       %s succeeded\n", __func__);
1292
1293out_release:
1294        sock_release(sock);
1295out:
1296        return err;
1297}
1298
1299/*
1300 * Scraping a connected socket failed, so we don't have a useable
1301 * local address.  Fallback: generate an address that will prevent
1302 * the server from calling us back.
1303 *
1304 * Returns zero and fills in "buf" if successful; otherwise, a
1305 * negative errno is returned.
1306 */
1307static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1308{
1309        switch (family) {
1310        case AF_INET:
1311                if (buflen < sizeof(rpc_inaddr_loopback))
1312                        return -EINVAL;
1313                memcpy(buf, &rpc_inaddr_loopback,
1314                                sizeof(rpc_inaddr_loopback));
1315                break;
1316        case AF_INET6:
1317                if (buflen < sizeof(rpc_in6addr_loopback))
1318                        return -EINVAL;
1319                memcpy(buf, &rpc_in6addr_loopback,
1320                                sizeof(rpc_in6addr_loopback));
1321                break;
1322        default:
1323                dprintk("RPC:       %s: address family not supported\n",
1324                        __func__);
1325                return -EAFNOSUPPORT;
1326        }
1327        dprintk("RPC:       %s: succeeded\n", __func__);
1328        return 0;
1329}
1330
1331/**
1332 * rpc_localaddr - discover local endpoint address for an RPC client
1333 * @clnt: RPC client structure
1334 * @buf: target buffer
1335 * @buflen: size of target buffer, in bytes
1336 *
1337 * Returns zero and fills in "buf" and "buflen" if successful;
1338 * otherwise, a negative errno is returned.
1339 *
1340 * This works even if the underlying transport is not currently connected,
1341 * or if the upper layer never previously provided a source address.
1342 *
1343 * The result of this function call is transient: multiple calls in
1344 * succession may give different results, depending on how local
1345 * networking configuration changes over time.
1346 */
1347int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1348{
1349        struct sockaddr_storage address;
1350        struct sockaddr *sap = (struct sockaddr *)&address;
1351        struct rpc_xprt *xprt;
1352        struct net *net;
1353        size_t salen;
1354        int err;
1355
1356        rcu_read_lock();
1357        xprt = rcu_dereference(clnt->cl_xprt);
1358        salen = xprt->addrlen;
1359        memcpy(sap, &xprt->addr, salen);
1360        net = get_net(xprt->xprt_net);
1361        rcu_read_unlock();
1362
1363        rpc_set_port(sap, 0);
1364        err = rpc_sockname(net, sap, salen, buf, buflen);
1365        put_net(net);
1366        if (err != 0)
1367                /* Couldn't discover local address, return ANYADDR */
1368                return rpc_anyaddr(sap->sa_family, buf, buflen);
1369        return 0;
1370}
1371EXPORT_SYMBOL_GPL(rpc_localaddr);
1372
1373void
1374rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1375{
1376        struct rpc_xprt *xprt;
1377
1378        rcu_read_lock();
1379        xprt = rcu_dereference(clnt->cl_xprt);
1380        if (xprt->ops->set_buffer_size)
1381                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1382        rcu_read_unlock();
1383}
1384EXPORT_SYMBOL_GPL(rpc_setbufsize);
1385
1386/**
1387 * rpc_protocol - Get transport protocol number for an RPC client
1388 * @clnt: RPC client to query
1389 *
1390 */
1391int rpc_protocol(struct rpc_clnt *clnt)
1392{
1393        int protocol;
1394
1395        rcu_read_lock();
1396        protocol = rcu_dereference(clnt->cl_xprt)->prot;
1397        rcu_read_unlock();
1398        return protocol;
1399}
1400EXPORT_SYMBOL_GPL(rpc_protocol);
1401
1402/**
1403 * rpc_net_ns - Get the network namespace for this RPC client
1404 * @clnt: RPC client to query
1405 *
1406 */
1407struct net *rpc_net_ns(struct rpc_clnt *clnt)
1408{
1409        struct net *ret;
1410
1411        rcu_read_lock();
1412        ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1413        rcu_read_unlock();
1414        return ret;
1415}
1416EXPORT_SYMBOL_GPL(rpc_net_ns);
1417
1418/**
1419 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1420 * @clnt: RPC client to query
1421 *
1422 * For stream transports, this is one RPC record fragment (see RFC
1423 * 1831), as we don't support multi-record requests yet.  For datagram
1424 * transports, this is the size of an IP packet minus the IP, UDP, and
1425 * RPC header sizes.
1426 */
1427size_t rpc_max_payload(struct rpc_clnt *clnt)
1428{
1429        size_t ret;
1430
1431        rcu_read_lock();
1432        ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1433        rcu_read_unlock();
1434        return ret;
1435}
1436EXPORT_SYMBOL_GPL(rpc_max_payload);
1437
1438/**
1439 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1440 * @clnt: RPC client to query
1441 */
1442size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1443{
1444        struct rpc_xprt *xprt;
1445        size_t ret;
1446
1447        rcu_read_lock();
1448        xprt = rcu_dereference(clnt->cl_xprt);
1449        ret = xprt->ops->bc_maxpayload(xprt);
1450        rcu_read_unlock();
1451        return ret;
1452}
1453EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1454
1455/**
1456 * rpc_force_rebind - force transport to check that remote port is unchanged
1457 * @clnt: client to rebind
1458 *
1459 */
1460void rpc_force_rebind(struct rpc_clnt *clnt)
1461{
1462        if (clnt->cl_autobind) {
1463                rcu_read_lock();
1464                xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1465                rcu_read_unlock();
1466        }
1467}
1468EXPORT_SYMBOL_GPL(rpc_force_rebind);
1469
1470/*
1471 * Restart an (async) RPC call from the call_prepare state.
1472 * Usually called from within the exit handler.
1473 */
1474int
1475rpc_restart_call_prepare(struct rpc_task *task)
1476{
1477        if (RPC_ASSASSINATED(task))
1478                return 0;
1479        task->tk_action = call_start;
1480        task->tk_status = 0;
1481        if (task->tk_ops->rpc_call_prepare != NULL)
1482                task->tk_action = rpc_prepare_task;
1483        return 1;
1484}
1485EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1486
1487/*
1488 * Restart an (async) RPC call. Usually called from within the
1489 * exit handler.
1490 */
1491int
1492rpc_restart_call(struct rpc_task *task)
1493{
1494        if (RPC_ASSASSINATED(task))
1495                return 0;
1496        task->tk_action = call_start;
1497        task->tk_status = 0;
1498        return 1;
1499}
1500EXPORT_SYMBOL_GPL(rpc_restart_call);
1501
1502#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
1503const char
1504*rpc_proc_name(const struct rpc_task *task)
1505{
1506        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1507
1508        if (proc) {
1509                if (proc->p_name)
1510                        return proc->p_name;
1511                else
1512                        return "NULL";
1513        } else
1514                return "no proc";
1515}
1516#endif
1517
1518/*
1519 * 0.  Initial state
1520 *
1521 *     Other FSM states can be visited zero or more times, but
1522 *     this state is visited exactly once for each RPC.
1523 */
1524static void
1525call_start(struct rpc_task *task)
1526{
1527        struct rpc_clnt *clnt = task->tk_client;
1528
1529        dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1530                        clnt->cl_program->name, clnt->cl_vers,
1531                        rpc_proc_name(task),
1532                        (RPC_IS_ASYNC(task) ? "async" : "sync"));
1533
1534        /* Increment call count */
1535        task->tk_msg.rpc_proc->p_count++;
1536        clnt->cl_stats->rpccnt++;
1537        task->tk_action = call_reserve;
1538}
1539
1540/*
1541 * 1.   Reserve an RPC call slot
1542 */
1543static void
1544call_reserve(struct rpc_task *task)
1545{
1546        dprint_status(task);
1547
1548        task->tk_status  = 0;
1549        task->tk_action  = call_reserveresult;
1550        xprt_reserve(task);
1551}
1552
1553static void call_retry_reserve(struct rpc_task *task);
1554
1555/*
1556 * 1b.  Grok the result of xprt_reserve()
1557 */
1558static void
1559call_reserveresult(struct rpc_task *task)
1560{
1561        int status = task->tk_status;
1562
1563        dprint_status(task);
1564
1565        /*
1566         * After a call to xprt_reserve(), we must have either
1567         * a request slot or else an error status.
1568         */
1569        task->tk_status = 0;
1570        if (status >= 0) {
1571                if (task->tk_rqstp) {
1572                        task->tk_action = call_refresh;
1573                        return;
1574                }
1575
1576                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1577                                __func__, status);
1578                rpc_exit(task, -EIO);
1579                return;
1580        }
1581
1582        /*
1583         * Even though there was an error, we may have acquired
1584         * a request slot somehow.  Make sure not to leak it.
1585         */
1586        if (task->tk_rqstp) {
1587                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1588                                __func__, status);
1589                xprt_release(task);
1590        }
1591
1592        switch (status) {
1593        case -ENOMEM:
1594                rpc_delay(task, HZ >> 2);
1595        case -EAGAIN:   /* woken up; retry */
1596                task->tk_action = call_retry_reserve;
1597                return;
1598        case -EIO:      /* probably a shutdown */
1599                break;
1600        default:
1601                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1602                                __func__, status);
1603                break;
1604        }
1605        rpc_exit(task, status);
1606}
1607
1608/*
1609 * 1c.  Retry reserving an RPC call slot
1610 */
1611static void
1612call_retry_reserve(struct rpc_task *task)
1613{
1614        dprint_status(task);
1615
1616        task->tk_status  = 0;
1617        task->tk_action  = call_reserveresult;
1618        xprt_retry_reserve(task);
1619}
1620
1621/*
1622 * 2.   Bind and/or refresh the credentials
1623 */
1624static void
1625call_refresh(struct rpc_task *task)
1626{
1627        dprint_status(task);
1628
1629        task->tk_action = call_refreshresult;
1630        task->tk_status = 0;
1631        task->tk_client->cl_stats->rpcauthrefresh++;
1632        rpcauth_refreshcred(task);
1633}
1634
1635/*
1636 * 2a.  Process the results of a credential refresh
1637 */
1638static void
1639call_refreshresult(struct rpc_task *task)
1640{
1641        int status = task->tk_status;
1642
1643        dprint_status(task);
1644
1645        task->tk_status = 0;
1646        task->tk_action = call_refresh;
1647        switch (status) {
1648        case 0:
1649                if (rpcauth_uptodatecred(task)) {
1650                        task->tk_action = call_allocate;
1651                        return;
1652                }
1653                /* Use rate-limiting and a max number of retries if refresh
1654                 * had status 0 but failed to update the cred.
1655                 */
1656        case -ETIMEDOUT:
1657                rpc_delay(task, 3*HZ);
1658        case -EAGAIN:
1659                status = -EACCES;
1660        case -EKEYEXPIRED:
1661                if (!task->tk_cred_retry)
1662                        break;
1663                task->tk_cred_retry--;
1664                dprintk("RPC: %5u %s: retry refresh creds\n",
1665                                task->tk_pid, __func__);
1666                return;
1667        }
1668        dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1669                                task->tk_pid, __func__, status);
1670        rpc_exit(task, status);
1671}
1672
1673/*
1674 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1675 *      (Note: buffer memory is freed in xprt_release).
1676 */
1677static void
1678call_allocate(struct rpc_task *task)
1679{
1680        unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1681        struct rpc_rqst *req = task->tk_rqstp;
1682        struct rpc_xprt *xprt = req->rq_xprt;
1683        struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1684        int status;
1685
1686        dprint_status(task);
1687
1688        task->tk_status = 0;
1689        task->tk_action = call_bind;
1690
1691        if (req->rq_buffer)
1692                return;
1693
1694        if (proc->p_proc != 0) {
1695                BUG_ON(proc->p_arglen == 0);
1696                if (proc->p_decode != NULL)
1697                        BUG_ON(proc->p_replen == 0);
1698        }
1699
1700        /*
1701         * Calculate the size (in quads) of the RPC call
1702         * and reply headers, and convert both values
1703         * to byte sizes.
1704         */
1705        req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1706        req->rq_callsize <<= 2;
1707        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1708        req->rq_rcvsize <<= 2;
1709
1710        status = xprt->ops->buf_alloc(task);
1711        xprt_inject_disconnect(xprt);
1712        if (status == 0)
1713                return;
1714        if (status != -ENOMEM) {
1715                rpc_exit(task, status);
1716                return;
1717        }
1718
1719        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1720
1721        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1722                task->tk_action = call_allocate;
1723                rpc_delay(task, HZ>>4);
1724                return;
1725        }
1726
1727        rpc_exit(task, -ERESTARTSYS);
1728}
1729
1730static inline int
1731rpc_task_need_encode(struct rpc_task *task)
1732{
1733        return task->tk_rqstp->rq_snd_buf.len == 0;
1734}
1735
1736static inline void
1737rpc_task_force_reencode(struct rpc_task *task)
1738{
1739        task->tk_rqstp->rq_snd_buf.len = 0;
1740        task->tk_rqstp->rq_bytes_sent = 0;
1741}
1742
1743/*
1744 * 3.   Encode arguments of an RPC call
1745 */
1746static void
1747rpc_xdr_encode(struct rpc_task *task)
1748{
1749        struct rpc_rqst *req = task->tk_rqstp;
1750        kxdreproc_t     encode;
1751        __be32          *p;
1752
1753        dprint_status(task);
1754
1755        xdr_buf_init(&req->rq_snd_buf,
1756                     req->rq_buffer,
1757                     req->rq_callsize);
1758        xdr_buf_init(&req->rq_rcv_buf,
1759                     req->rq_rbuffer,
1760                     req->rq_rcvsize);
1761
1762        p = rpc_encode_header(task);
1763        if (p == NULL) {
1764                printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1765                rpc_exit(task, -EIO);
1766                return;
1767        }
1768
1769        encode = task->tk_msg.rpc_proc->p_encode;
1770        if (encode == NULL)
1771                return;
1772
1773        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1774                        task->tk_msg.rpc_argp);
1775}
1776
1777/*
1778 * 4.   Get the server port number if not yet set
1779 */
1780static void
1781call_bind(struct rpc_task *task)
1782{
1783        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1784
1785        dprint_status(task);
1786
1787        task->tk_action = call_connect;
1788        if (!xprt_bound(xprt)) {
1789                task->tk_action = call_bind_status;
1790                task->tk_timeout = xprt->bind_timeout;
1791                xprt->ops->rpcbind(task);
1792        }
1793}
1794
/*
 * 4a.  Sort out bind result
 *
 *      On success the task proceeds to call_connect.  On failure the
 *      error either sends the task to call_timeout for a retry (in some
 *      cases after a delay) or terminates it.  The exit status is -EIO
 *      unless a case below picks something more specific.
 */
static void
call_bind_status(struct rpc_task *task)
{
	int exit_status = -EIO;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		goto out_retry;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version unavailable\n",
				task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			exit_status = -EOPNOTSUPP;
			break;
		}
		/* Out of rebind attempts: give up with the default status */
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto out_retry;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto out_retry;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		goto out_retry;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ENOBUFS:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		/* Soft-connect tasks fail at once with the transport error */
		if (RPC_IS_SOFTCONN(task)) {
			exit_status = task->tk_status;
			break;
		}
		rpc_delay(task, 5*HZ);
		goto out_retry;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
	}

	rpc_exit(task, exit_status);
	return;

out_retry:
	task->tk_status = 0;
	task->tk_action = call_timeout;
}
1871
1872/*
1873 * 4b.  Connect to the RPC server
1874 */
1875static void
1876call_connect(struct rpc_task *task)
1877{
1878        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1879
1880        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1881                        task->tk_pid, xprt,
1882                        (xprt_connected(xprt) ? "is" : "is not"));
1883
1884        task->tk_action = call_transmit;
1885        if (!xprt_connected(xprt)) {
1886                task->tk_action = call_connect_status;
1887                if (task->tk_status < 0)
1888                        return;
1889                if (task->tk_flags & RPC_TASK_NOCONNECT) {
1890                        rpc_exit(task, -ENOTCONN);
1891                        return;
1892                }
1893                xprt_connect(task);
1894        }
1895}
1896
/*
 * 4c.  Sort out connect result
 *
 * Consumes task->tk_status (resetting it to 0) and selects the next state:
 * call_transmit on success, call_timeout when the connect should be
 * retried, or rpc_exit() with the original status for everything else.
 */
static void
call_connect_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;

        dprint_status(task);

        trace_rpc_connect_status(task, status);
        task->tk_status = 0;

        if (status == 0) {
                clnt->cl_stats->netreconn++;
                task->tk_action = call_transmit;
                return;
        }

        switch (status) {
        case -EAGAIN:
        case -ETIMEDOUT:
                /* Check for timeouts before looping back to call_bind */
                task->tk_action = call_timeout;
                return;
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ECONNABORTED:
        case -ENETUNREACH:
        case -EHOSTUNREACH:
        case -EADDRINUSE:
        case -ENOBUFS:
        case -EPIPE:
                xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
                                            task->tk_rqstp->rq_connect_cookie);
                if (!RPC_IS_SOFTCONN(task)) {
                        /* retry with existing socket, after a delay */
                        rpc_delay(task, 3*HZ);
                        task->tk_action = call_timeout;
                        return;
                }
                /* soft-connect tasks fail with the original status */
                break;
        default:
                break;
        }

        rpc_exit(task, status);
}
1937
1938/*
1939 * 5.   Transmit the RPC request, and wait for reply
1940 */
1941static void
1942call_transmit(struct rpc_task *task)
1943{
1944        int is_retrans = RPC_WAS_SENT(task);
1945
1946        dprint_status(task);
1947
1948        task->tk_action = call_status;
1949        if (task->tk_status < 0)
1950                return;
1951        if (!xprt_prepare_transmit(task))
1952                return;
1953        task->tk_action = call_transmit_status;
1954        /* Encode here so that rpcsec_gss can use correct sequence number. */
1955        if (rpc_task_need_encode(task)) {
1956                rpc_xdr_encode(task);
1957                /* Did the encode result in an error condition? */
1958                if (task->tk_status != 0) {
1959                        /* Was the error nonfatal? */
1960                        if (task->tk_status == -EAGAIN)
1961                                rpc_delay(task, HZ >> 4);
1962                        else
1963                                rpc_exit(task, task->tk_status);
1964                        return;
1965                }
1966        }
1967        xprt_transmit(task);
1968        if (task->tk_status < 0)
1969                return;
1970        if (is_retrans)
1971                task->tk_client->cl_stats->rpcretrans++;
1972        /*
1973         * On success, ensure that we call xprt_end_transmit() before sleeping
1974         * in order to allow access to the socket to other RPC requests.
1975         */
1976        call_transmit_status(task);
1977        if (rpc_reply_expected(task))
1978                return;
1979        task->tk_action = rpc_exit_task;
1980        rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1981}
1982
/*
 * 5a.  Handle cleanup after a transmission
 *
 * The next state is always call_status; what differs per status is
 * whether xprt_end_transmit() is called and whether the request is
 * marked for re-encoding.
 */
static void
call_transmit_status(struct rpc_task *task)
{
        task->tk_action = call_status;

        /*
         * Common case: success.  Force the compiler to put this
         * test first.
         */
        if (task->tk_status == 0) {
                xprt_end_transmit(task);
                rpc_task_force_reencode(task);
                return;
        }

        switch (task->tk_status) {
        case -EAGAIN:
        case -ENOBUFS:
                /* Nothing to clean up here */
                break;
        /*
         * Special cases: if we've been waiting on the
         * socket's write_space() callback, or if the
         * socket just returned a connection error,
         * then hold onto the transport lock.
         */
        case -ECONNREFUSED:
        case -EHOSTDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -EPERM:
                if (RPC_IS_SOFTCONN(task)) {
                        xprt_end_transmit(task);
                        rpc_exit(task, task->tk_status);
                        break;
                }
                /* fall through */
        case -ECONNRESET:
        case -ECONNABORTED:
        case -EADDRINUSE:
        case -ENOTCONN:
        case -EPIPE:
                rpc_task_force_reencode(task);
                break;
        default:
                dprint_status(task);
                xprt_end_transmit(task);
                rpc_task_force_reencode(task);
                break;
        }
}
2034
2035#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
 * addition, disconnect on connectivity errors.
 *
 * When the transport cannot be prepared, or the send returns -EAGAIN,
 * tk_status is cleared and tk_action is left as it was.  Every other
 * path sets tk_action to rpc_exit_task.
 */
static void
call_bc_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (!xprt_prepare_transmit(task)) {
                task->tk_status = 0;
                return;
        }

        if (task->tk_status < 0) {
                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
                        "error: %d\n", task->tk_status);
                task->tk_action = rpc_exit_task;
                return;
        }

        /* The connection changed under us: start sending from byte 0 */
        if (req->rq_connect_cookie != req->rq_xprt->connect_cookie)
                req->rq_bytes_sent = 0;

        xprt_transmit(task);

        if (task->tk_status == -EAGAIN) {
                /* Remember which connection the partial send belongs to */
                req->rq_connect_cookie = req->rq_xprt->connect_cookie;
                task->tk_status = 0;
                return;
        }

        xprt_end_transmit(task);
        dprint_status(task);

        switch (task->tk_status) {
        case 0:
                /* Success */
        case -EHOSTDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -ECONNRESET:
        case -ECONNREFUSED:
        case -EADDRINUSE:
        case -ENOTCONN:
        case -EPIPE:
                break;
        case -ETIMEDOUT:
                /*
                 * Problem reaching the server.  Disconnect and let the
                 * forechannel reestablish the connection.  The server will
                 * have to retransmit the backchannel request and we'll
                 * reprocess it.  Since these ops are idempotent, there's no
                 * need to cache our reply at this time.
                 */
                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
                        "error: %d\n", task->tk_status);
                xprt_conditional_disconnect(req->rq_xprt,
                        req->rq_connect_cookie);
                break;
        default:
                /*
                 * We were unable to reply and will have to drop the
                 * request.  The server should reconnect and retransmit.
                 */
                WARN_ON_ONCE(task->tk_status == -EAGAIN);
                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
                        "error: %d\n", task->tk_status);
                break;
        }

        rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
        task->tk_action = rpc_exit_task;
}
2107#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2108
/*
 * 6.   Sort out the RPC call status
 *
 * A non-negative status moves on to call_decode.  A negative status is
 * consumed (tk_status reset to 0) and mapped to a retry state or to
 * rpc_exit().
 */
static void
call_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        int             err;

        /* A reply is in and nothing is left partially sent: report its size */
        if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
                task->tk_status = req->rq_reply_bytes_recvd;

        dprint_status(task);

        err = task->tk_status;
        if (err >= 0) {
                task->tk_action = call_decode;
                return;
        }

        trace_rpc_call_status(task);
        task->tk_status = 0;

        switch (err) {
        case -EHOSTDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -EPERM:
                if (RPC_IS_SOFTCONN(task)) {
                        rpc_exit(task, err);
                        break;
                }
                /*
                 * Delay any retries for 3 seconds, then handle as if it
                 * were a timeout.
                 */
                rpc_delay(task, 3*HZ);
                /* fall through */
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                if (task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
                        break;
                if (task->tk_client->cl_discrtry)
                        xprt_conditional_disconnect(req->rq_xprt,
                                        req->rq_connect_cookie);
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ECONNABORTED:
                rpc_force_rebind(clnt);
                /* fall through */
        case -EADDRINUSE:
                rpc_delay(task, 3*HZ);
                /* fall through */
        case -EPIPE:
        case -ENOTCONN:
                task->tk_action = call_bind;
                break;
        case -ENOBUFS:
                rpc_delay(task, HZ>>2);
                /* fall through */
        case -EAGAIN:
                task->tk_action = call_transmit;
                break;
        case -EIO:
                /* shutdown or soft timeout */
                rpc_exit(task, err);
                break;
        default:
                if (clnt->cl_chatty)
                        printk("%s: RPC call returned error %d\n",
                               clnt->cl_program->name, -err);
                rpc_exit(task, err);
                break;
        }
}
2179
2180/*
2181 * 6a.  Handle RPC timeout
2182 *      We do not release the request slot, so we keep using the
2183 *      same XID for all retransmits.
2184 */
2185static void
2186call_timeout(struct rpc_task *task)
2187{
2188        struct rpc_clnt *clnt = task->tk_client;
2189
2190        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
2191                dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
2192                goto retry;
2193        }
2194
2195        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
2196        task->tk_timeouts++;
2197
2198        if (RPC_IS_SOFTCONN(task)) {
2199                rpc_exit(task, -ETIMEDOUT);
2200                return;
2201        }
2202        if (RPC_IS_SOFT(task)) {
2203                if (clnt->cl_chatty) {
2204                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
2205                                clnt->cl_program->name,
2206                                task->tk_xprt->servername);
2207                }
2208                if (task->tk_flags & RPC_TASK_TIMEOUT)
2209                        rpc_exit(task, -ETIMEDOUT);
2210                else
2211                        rpc_exit(task, -EIO);
2212                return;
2213        }
2214
2215        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2216                task->tk_flags |= RPC_CALL_MAJORSEEN;
2217                if (clnt->cl_chatty) {
2218                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
2219                        clnt->cl_program->name,
2220                        task->tk_xprt->servername);
2221                }
2222        }
2223        rpc_force_rebind(clnt);
2224        /*
2225         * Did our request time out due to an RPCSEC_GSS out-of-sequence
2226         * event? RFC2203 requires the server to drop all such requests.
2227         */
2228        rpcauth_invalcred(task);
2229
2230retry:
2231        task->tk_action = call_bind;
2232        task->tk_status = 0;
2233}
2234
2235/*
2236 * 7.   Decode the RPC reply
2237 */
2238static void
2239call_decode(struct rpc_task *task)
2240{
2241        struct rpc_clnt *clnt = task->tk_client;
2242        struct rpc_rqst *req = task->tk_rqstp;
2243        kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
2244        __be32          *p;
2245
2246        dprint_status(task);
2247
2248        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2249                if (clnt->cl_chatty) {
2250                        printk(KERN_NOTICE "%s: server %s OK\n",
2251                                clnt->cl_program->name,
2252                                task->tk_xprt->servername);
2253                }
2254                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2255        }
2256
2257        /*
2258         * Ensure that we see all writes made by xprt_complete_rqst()
2259         * before it changed req->rq_reply_bytes_recvd.
2260         */
2261        smp_rmb();
2262        req->rq_rcv_buf.len = req->rq_private_buf.len;
2263
2264        /* Check that the softirq receive buffer is valid */
2265        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2266                                sizeof(req->rq_rcv_buf)) != 0);
2267
2268        if (req->rq_rcv_buf.len < 12) {
2269                if (!RPC_IS_SOFT(task)) {
2270                        task->tk_action = call_bind;
2271                        goto out_retry;
2272                }
2273                dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2274                                clnt->cl_program->name, task->tk_status);
2275                task->tk_action = call_timeout;
2276                goto out_retry;
2277        }
2278
2279        p = rpc_verify_header(task);
2280        if (IS_ERR(p)) {
2281                if (p == ERR_PTR(-EAGAIN))
2282                        goto out_retry;
2283                return;
2284        }
2285
2286        task->tk_action = rpc_exit_task;
2287
2288        if (decode) {
2289                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2290                                                      task->tk_msg.rpc_resp);
2291        }
2292        dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2293                        task->tk_status);
2294        return;
2295out_retry:
2296        task->tk_status = 0;
2297        /* Note: rpc_verify_header() may have freed the RPC slot */
2298        if (task->tk_rqstp == req) {
2299                req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2300                if (task->tk_client->cl_discrtry)
2301                        xprt_conditional_disconnect(req->rq_xprt,
2302                                        req->rq_connect_cookie);
2303        }
2304}
2305
2306static __be32 *
2307rpc_encode_header(struct rpc_task *task)
2308{
2309        struct rpc_clnt *clnt = task->tk_client;
2310        struct rpc_rqst *req = task->tk_rqstp;
2311        __be32          *p = req->rq_svec[0].iov_base;
2312
2313        /* FIXME: check buffer size? */
2314
2315        p = xprt_skip_transport_header(req->rq_xprt, p);
2316        *p++ = req->rq_xid;             /* XID */
2317        *p++ = htonl(RPC_CALL);         /* CALL */
2318        *p++ = htonl(RPC_VERSION);      /* RPC version */
2319        *p++ = htonl(clnt->cl_prog);    /* program number */
2320        *p++ = htonl(clnt->cl_vers);    /* program version */
2321        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2322        p = rpcauth_marshcred(task, p);
2323        req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2324        return p;
2325}
2326
/*
 * Check the header of the RPC reply held in the request's receive buffer.
 *
 * Returns:
 *   - a pointer to the word following the accept status when that status
 *     is RPC_SUCCESS;
 *   - ERR_PTR(-EAGAIN) when a retry has been arranged (tk_action has been
 *     set to call_bind or call_reserve);
 *   - ERR_PTR(error) after the task has been terminated with
 *     rpc_exit(task, error).
 *
 * Note that the stale-credential path calls xprt_release() on the task
 * before asking for a retry.
 */
static __be32 *
rpc_verify_header(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
        /* Reply length in 32-bit words */
        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
        __be32  *p = iov->iov_base;
        u32 n;
        /* Error reported by the out_err path unless a branch overrides it */
        int error = -EACCES;

        if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
                /* RFC-1014 says that the representation of XDR data must be a
                 * multiple of four bytes
                 * - if it isn't pointer subtraction in the NFS client may give
                 *   undefined results
                 */
                dprintk("RPC: %5u %s: XDR representation not a multiple of"
                       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
                       task->tk_rqstp->rq_rcv_buf.len);
                error = -EIO;
                goto out_err;
        }
        /* Account for the three words read next: XID, message type, reply status */
        if ((len -= 3) < 0)
                goto out_overflow;

        p += 1; /* skip XID */
        if ((n = ntohl(*p++)) != RPC_REPLY) {
                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
                        task->tk_pid, __func__, n);
                error = -EIO;
                goto out_garbage;
        }

        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                /* Call was rejected: the next word is the reject status */
                if (--len < 0)
                        goto out_overflow;
                switch ((n = ntohl(*p++))) {
                case RPC_AUTH_ERROR:
                        break;
                case RPC_MISMATCH:
                        dprintk("RPC: %5u %s: RPC call version mismatch!\n",
                                task->tk_pid, __func__);
                        error = -EPROTONOSUPPORT;
                        goto out_err;
                default:
                        dprintk("RPC: %5u %s: RPC call rejected, "
                                "unknown error: %x\n",
                                task->tk_pid, __func__, n);
                        error = -EIO;
                        goto out_err;
                }
                /* RPC_AUTH_ERROR: the next word is the auth status */
                if (--len < 0)
                        goto out_overflow;
                switch ((n = ntohl(*p++))) {
                case RPC_AUTH_REJECTEDCRED:
                case RPC_AUTH_REJECTEDVERF:
                case RPCSEC_GSS_CREDPROBLEM:
                case RPCSEC_GSS_CTXPROBLEM:
                        /* Retries exhausted: fail with the default error */
                        if (!task->tk_cred_retry)
                                break;
                        task->tk_cred_retry--;
                        dprintk("RPC: %5u %s: retry stale creds\n",
                                        task->tk_pid, __func__);
                        rpcauth_invalcred(task);
                        /* Ensure we obtain a new XID! */
                        xprt_release(task);
                        task->tk_action = call_reserve;
                        goto out_retry;
                case RPC_AUTH_BADCRED:
                case RPC_AUTH_BADVERF:
                        /* possibly garbled cred/verf? */
                        if (!task->tk_garb_retry)
                                break;
                        task->tk_garb_retry--;
                        dprintk("RPC: %5u %s: retry garbled creds\n",
                                        task->tk_pid, __func__);
                        task->tk_action = call_bind;
                        goto out_retry;
                case RPC_AUTH_TOOWEAK:
                        printk(KERN_NOTICE "RPC: server %s requires stronger "
                               "authentication.\n",
                               task->tk_xprt->servername);
                        break;
                default:
                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
                                        task->tk_pid, __func__, n);
                        error = -EIO;
                }
                dprintk("RPC: %5u %s: call rejected %d\n",
                                task->tk_pid, __func__, n);
                goto out_err;
        }
        /* Call was accepted: let the auth flavour check the verifier */
        p = rpcauth_checkverf(task, p);
        if (IS_ERR(p)) {
                error = PTR_ERR(p);
                dprintk("RPC: %5u %s: auth check failed with %d\n",
                                task->tk_pid, __func__, error);
                goto out_garbage;               /* bad verifier, retry */
        }
        /*
         * NOTE(review): this sets len to the number of words consumed so
         * far minus one, not to the number of words remaining, so the
         * check below does not look like it can detect a truncated
         * reply - confirm the intent.
         */
        len = p - (__be32 *)iov->iov_base - 1;
        if (len < 0)
                goto out_overflow;
        /* Accept status */
        switch ((n = ntohl(*p++))) {
        case RPC_SUCCESS:
                return p;
        case RPC_PROG_UNAVAIL:
                dprintk("RPC: %5u %s: program %u is unsupported "
                                "by server %s\n", task->tk_pid, __func__,
                                (unsigned int)clnt->cl_prog,
                                task->tk_xprt->servername);
                error = -EPFNOSUPPORT;
                goto out_err;
        case RPC_PROG_MISMATCH:
                dprintk("RPC: %5u %s: program %u, version %u unsupported "
                                "by server %s\n", task->tk_pid, __func__,
                                (unsigned int)clnt->cl_prog,
                                (unsigned int)clnt->cl_vers,
                                task->tk_xprt->servername);
                error = -EPROTONOSUPPORT;
                goto out_err;
        case RPC_PROC_UNAVAIL:
                dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
                                "version %u on server %s\n",
                                task->tk_pid, __func__,
                                rpc_proc_name(task),
                                clnt->cl_prog, clnt->cl_vers,
                                task->tk_xprt->servername);
                error = -EOPNOTSUPP;
                goto out_err;
        case RPC_GARBAGE_ARGS:
                dprintk("RPC: %5u %s: server saw garbage\n",
                                task->tk_pid, __func__);
                break;                  /* retry */
        default:
                dprintk("RPC: %5u %s: server accept status: %x\n",
                                task->tk_pid, __func__, n);
                /* Also retry */
        }

        /*
         * Count the garbled reply and retry from call_bind while
         * tk_garb_retry allows it; otherwise fall into out_err.
         */
out_garbage:
        clnt->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
                dprintk("RPC: %5u %s: retrying\n",
                                task->tk_pid, __func__);
                task->tk_action = call_bind;
                /* Also the target of the credential-retry paths above */
out_retry:
                return ERR_PTR(-EAGAIN);
        }
out_err:
        rpc_exit(task, error);
        dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
                        __func__, error);
        return ERR_PTR(error);
        /* A truncated reply is handled like a garbled one */
out_overflow:
        dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
                        __func__);
        goto out_garbage;
}
2486
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void
rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
        return;
}
2490
/* Result decoder for the NULL procedure: nothing to decode, always 0. */
static int
rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
        const int success = 0;

        return success;
}
2495
/*
 * Procedure descriptor used for NULL calls in this file (rpc_ping() and
 * rpc_call_null_helper()).  Only the encode/decode hooks are set; all
 * other fields, including p_proc, are left zero-initialised.
 */
static struct rpc_procinfo rpcproc_null = {
        .p_encode = rpcproc_encode_null,
        .p_decode = rpcproc_decode_null,
};
2500
2501static int rpc_ping(struct rpc_clnt *clnt)
2502{
2503        struct rpc_message msg = {
2504                .rpc_proc = &rpcproc_null,
2505        };
2506        int err;
2507        msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2508        err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2509        put_rpccred(msg.rpc_cred);
2510        return err;
2511}
2512
2513static
2514struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2515                struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2516                const struct rpc_call_ops *ops, void *data)
2517{
2518        struct rpc_message msg = {
2519                .rpc_proc = &rpcproc_null,
2520                .rpc_cred = cred,
2521        };
2522        struct rpc_task_setup task_setup_data = {
2523                .rpc_client = clnt,
2524                .rpc_xprt = xprt,
2525                .rpc_message = &msg,
2526                .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2527                .callback_data = data,
2528                .flags = flags,
2529        };
2530
2531        return rpc_run_task(&task_setup_data);
2532}
2533
2534struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2535{
2536        return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2537}
2538EXPORT_SYMBOL_GPL(rpc_call_null);
2539
/*
 * Callback data for the asynchronous NULL call issued by
 * rpc_clnt_test_and_add_xprt().  Both members hold a reference that
 * rpc_cb_add_xprt_release() drops.
 */
struct rpc_cb_add_xprt_calldata {
        struct rpc_xprt_switch *xps;    /* switch the transport is added to */
        struct rpc_xprt *xprt;          /* transport being tested */
};
2544
2545static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2546{
2547        struct rpc_cb_add_xprt_calldata *data = calldata;
2548
2549        if (task->tk_status == 0)
2550                rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2551}
2552
2553static void rpc_cb_add_xprt_release(void *calldata)
2554{
2555        struct rpc_cb_add_xprt_calldata *data = calldata;
2556
2557        xprt_put(data->xprt);
2558        xprt_switch_put(data->xps);
2559        kfree(data);
2560}
2561
/*
 * Callbacks for the NULL call started by rpc_clnt_test_and_add_xprt():
 * the done hook adds the transport when the call succeeded, the release
 * hook drops the references and frees the callback data.
 */
static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
        .rpc_call_done = rpc_cb_add_xprt_done,
        .rpc_release = rpc_cb_add_xprt_release,
};
2566
2567/**
2568 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2569 * @clnt: pointer to struct rpc_clnt
2570 * @xps: pointer to struct rpc_xprt_switch,
2571 * @xprt: pointer struct rpc_xprt
2572 * @dummy: unused
2573 */
2574int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2575                struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2576                void *dummy)
2577{
2578        struct rpc_cb_add_xprt_calldata *data;
2579        struct rpc_cred *cred;
2580        struct rpc_task *task;
2581
2582        data = kmalloc(sizeof(*data), GFP_NOFS);
2583        if (!data)
2584                return -ENOMEM;
2585        data->xps = xprt_switch_get(xps);
2586        data->xprt = xprt_get(xprt);
2587
2588        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2589        task = rpc_call_null_helper(clnt, xprt, cred,
2590                        RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
2591                        &rpc_cb_add_xprt_call_ops, data);
2592        put_rpccred(cred);
2593        if (IS_ERR(task))
2594                return PTR_ERR(task);
2595        rpc_put_task(task);
2596        return 1;
2597}
2598EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2599
2600/**
2601 * rpc_clnt_setup_test_and_add_xprt()
2602 *
2603 * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2604 *   1) caller of the test function must dereference the rpc_xprt_switch
2605 *   and the rpc_xprt.
2606 *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2607 *   the rpc_call_done routine.
2608 *
2609 * Upon success (return of 1), the test function adds the new
2610 * transport to the rpc_clnt xprt switch
2611 *
2612 * @clnt: struct rpc_clnt to get the new transport
2613 * @xps:  the rpc_xprt_switch to hold the new transport
2614 * @xprt: the rpc_xprt to test
2615 * @data: a struct rpc_add_xprt_test pointer that holds the test function
2616 *        and test function call data
2617 */
2618int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2619                                     struct rpc_xprt_switch *xps,
2620                                     struct rpc_xprt *xprt,
2621                                     void *data)
2622{
2623        struct rpc_cred *cred;
2624        struct rpc_task *task;
2625        struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2626        int status = -EADDRINUSE;
2627
2628        xprt = xprt_get(xprt);
2629        xprt_switch_get(xps);
2630
2631        if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2632                goto out_err;
2633
2634        /* Test the connection */
2635        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2636        task = rpc_call_null_helper(clnt, xprt, cred,
2637                                    RPC_TASK_SOFT | RPC_TASK_SOFTCONN,
2638                                    NULL, NULL);
2639        put_rpccred(cred);
2640        if (IS_ERR(task)) {
2641                status = PTR_ERR(task);
2642                goto out_err;
2643        }
2644        status = task->tk_status;
2645        rpc_put_task(task);
2646
2647        if (status < 0)
2648                goto out_err;
2649
2650        /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2651        xtest->add_xprt_test(clnt, xprt, xtest->data);
2652
2653        /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2654        return 1;
2655out_err:
2656        xprt_put(xprt);
2657        xprt_switch_put(xps);
2658        pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2659                status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2660        return status;
2661}
2662EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2663
2664/**
2665 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2666 * @clnt: pointer to struct rpc_clnt
2667 * @xprtargs: pointer to struct xprt_create
2668 * @setup: callback to test and/or set up the connection
2669 * @data: pointer to setup function data
2670 *
2671 * Creates a new transport using the parameters set in args and
2672 * adds it to clnt.
2673 * If ping is set, then test that connectivity succeeds before
2674 * adding the new transport.
2675 *
2676 */
2677int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2678                struct xprt_create *xprtargs,
2679                int (*setup)(struct rpc_clnt *,
2680                        struct rpc_xprt_switch *,
2681                        struct rpc_xprt *,
2682                        void *),
2683                void *data)
2684{
2685        struct rpc_xprt_switch *xps;
2686        struct rpc_xprt *xprt;
2687        unsigned long connect_timeout;
2688        unsigned long reconnect_timeout;
2689        unsigned char resvport;
2690        int ret = 0;
2691
2692        rcu_read_lock();
2693        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2694        xprt = xprt_iter_xprt(&clnt->cl_xpi);
2695        if (xps == NULL || xprt == NULL) {
2696                rcu_read_unlock();
2697                return -EAGAIN;
2698        }
2699        resvport = xprt->resvport;
2700        connect_timeout = xprt->connect_timeout;
2701        reconnect_timeout = xprt->max_reconnect_timeout;
2702        rcu_read_unlock();
2703
2704        xprt = xprt_create_transport(xprtargs);
2705        if (IS_ERR(xprt)) {
2706                ret = PTR_ERR(xprt);
2707                goto out_put_switch;
2708        }
2709        xprt->resvport = resvport;
2710        if (xprt->ops->set_connect_timeout != NULL)
2711                xprt->ops->set_connect_timeout(xprt,
2712                                connect_timeout,
2713                                reconnect_timeout);
2714
2715        rpc_xprt_switch_set_roundrobin(xps);
2716        if (setup) {
2717                ret = setup(clnt, xps, xprt, data);
2718                if (ret != 0)
2719                        goto out_put_xprt;
2720        }
2721        rpc_xprt_switch_add_xprt(xps, xprt);
2722out_put_xprt:
2723        xprt_put(xprt);
2724out_put_switch:
2725        xprt_switch_put(xps);
2726        return ret;
2727}
2728EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2729
/*
 * Argument bundle handed to rpc_xprt_set_connect_timeout() through the
 * opaque "data" pointer of the per-transport iterator.  Both values are
 * forwarded untouched to the transport's ->set_connect_timeout() op.
 */
struct connect_timeout_data {
	unsigned long connect_timeout;		/* forwarded as the connect timeout */
	unsigned long reconnect_timeout;	/* forwarded as the reconnect timeout */
};
2734
2735static int
2736rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2737                struct rpc_xprt *xprt,
2738                void *data)
2739{
2740        struct connect_timeout_data *timeo = data;
2741
2742        if (xprt->ops->set_connect_timeout)
2743                xprt->ops->set_connect_timeout(xprt,
2744                                timeo->connect_timeout,
2745                                timeo->reconnect_timeout);
2746        return 0;
2747}
2748
2749void
2750rpc_set_connect_timeout(struct rpc_clnt *clnt,
2751                unsigned long connect_timeout,
2752                unsigned long reconnect_timeout)
2753{
2754        struct connect_timeout_data timeout = {
2755                .connect_timeout = connect_timeout,
2756                .reconnect_timeout = reconnect_timeout,
2757        };
2758        rpc_clnt_iterate_for_each_xprt(clnt,
2759                        rpc_xprt_set_connect_timeout,
2760                        &timeout);
2761}
2762EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2763
2764void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2765{
2766        rcu_read_lock();
2767        xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2768        rcu_read_unlock();
2769}
2770EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2771
2772void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2773{
2774        rcu_read_lock();
2775        rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2776                                 xprt);
2777        rcu_read_unlock();
2778}
2779EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2780
2781bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2782                                   const struct sockaddr *sap)
2783{
2784        struct rpc_xprt_switch *xps;
2785        bool ret;
2786
2787        rcu_read_lock();
2788        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2789        ret = rpc_xprt_switch_has_addr(xps, sap);
2790        rcu_read_unlock();
2791        return ret;
2792}
2793EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2794
2795#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2796static void rpc_show_header(void)
2797{
2798        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2799                "-timeout ---ops--\n");
2800}
2801
2802static void rpc_show_task(const struct rpc_clnt *clnt,
2803                          const struct rpc_task *task)
2804{
2805        const char *rpc_waitq = "none";
2806
2807        if (RPC_IS_QUEUED(task))
2808                rpc_waitq = rpc_qname(task->tk_waitqueue);
2809
2810        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2811                task->tk_pid, task->tk_flags, task->tk_status,
2812                clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2813                clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2814                task->tk_action, rpc_waitq);
2815}
2816
2817void rpc_show_tasks(struct net *net)
2818{
2819        struct rpc_clnt *clnt;
2820        struct rpc_task *task;
2821        int header = 0;
2822        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2823
2824        spin_lock(&sn->rpc_client_lock);
2825        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2826                spin_lock(&clnt->cl_lock);
2827                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2828                        if (!header) {
2829                                rpc_show_header();
2830                                header++;
2831                        }
2832                        rpc_show_task(clnt, task);
2833                }
2834                spin_unlock(&clnt->cl_lock);
2835        }
2836        spin_unlock(&sn->rpc_client_lock);
2837}
2838#endif
2839
2840#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback for rpc_clnt_swap_activate(): call
 * xprt_enable_swap() on @xprt and hand its status back to the
 * iterator.  @clnt and @dummy are unused.
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	int status = xprt_enable_swap(xprt);

	return status;
}
2848
2849int
2850rpc_clnt_swap_activate(struct rpc_clnt *clnt)
2851{
2852        if (atomic_inc_return(&clnt->cl_swapper) == 1)
2853                return rpc_clnt_iterate_for_each_xprt(clnt,
2854                                rpc_clnt_swap_activate_callback, NULL);
2855        return 0;
2856}
2857EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
2858
/*
 * Per-transport callback for rpc_clnt_swap_deactivate(): call
 * xprt_disable_swap() on @xprt.  Always returns 0; @clnt and @dummy
 * are unused.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	xprt_disable_swap(xprt);

	return 0;
}
2867
2868void
2869rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
2870{
2871        if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
2872                rpc_clnt_iterate_for_each_xprt(clnt,
2873                                rpc_clnt_swap_deactivate_callback, NULL);
2874}
2875EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
2876#endif /* CONFIG_SUNRPC_SWAP */
2877