linux/net/sunrpc/clnt.c
   1/*
   2 *  linux/net/sunrpc/clnt.c
   3 *
   4 *  This file contains the high-level RPC interface.
   5 *  It is modeled as a finite state machine to support both synchronous
   6 *  and asynchronous requests.
   7 *
   8 *  -   RPC header generation and argument serialization.
   9 *  -   Credential refresh.
  10 *  -   TCP connect handling.
   11 *  -   Retry of an operation when it is suspected that it failed because
   12 *      of uid squashing on the server, when the credentials were stale
   13 *      and needed to be refreshed, or when a packet was damaged in
   14 *      transit.  This may have to be moved to the VFS layer.
  15 *
  16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  18 */
  19
  20
  21#include <linux/module.h>
  22#include <linux/types.h>
  23#include <linux/kallsyms.h>
  24#include <linux/mm.h>
  25#include <linux/namei.h>
  26#include <linux/mount.h>
  27#include <linux/slab.h>
  28#include <linux/rcupdate.h>
  29#include <linux/utsname.h>
  30#include <linux/workqueue.h>
  31#include <linux/in.h>
  32#include <linux/in6.h>
  33#include <linux/un.h>
  34
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/addr.h>
  37#include <linux/sunrpc/rpc_pipe_fs.h>
  38#include <linux/sunrpc/metrics.h>
  39#include <linux/sunrpc/bc_xprt.h>
  40#include <trace/events/sunrpc.h>
  41
  42#include "sunrpc.h"
  43#include "netns.h"
  44
  45#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  46# define RPCDBG_FACILITY        RPCDBG_CALL
  47#endif
  48
  49#define dprint_status(t)                                        \
  50        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
  51                        __func__, t->tk_status)
  52
   53/*
   54 * Wait queue used by rpc_shutdown_client() to wait for a client's tasks to exit
   55 */
  56
  57static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  58
  59
  60static void     call_start(struct rpc_task *task);
  61static void     call_reserve(struct rpc_task *task);
  62static void     call_reserveresult(struct rpc_task *task);
  63static void     call_allocate(struct rpc_task *task);
  64static void     call_decode(struct rpc_task *task);
  65static void     call_bind(struct rpc_task *task);
  66static void     call_bind_status(struct rpc_task *task);
  67static void     call_transmit(struct rpc_task *task);
  68#if defined(CONFIG_SUNRPC_BACKCHANNEL)
  69static void     call_bc_transmit(struct rpc_task *task);
  70#endif /* CONFIG_SUNRPC_BACKCHANNEL */
  71static void     call_status(struct rpc_task *task);
  72static void     call_transmit_status(struct rpc_task *task);
  73static void     call_refresh(struct rpc_task *task);
  74static void     call_refreshresult(struct rpc_task *task);
  75static void     call_timeout(struct rpc_task *task);
  76static void     call_connect(struct rpc_task *task);
  77static void     call_connect_status(struct rpc_task *task);
  78
  79static __be32   *rpc_encode_header(struct rpc_task *task);
  80static __be32   *rpc_verify_header(struct rpc_task *task);
  81static int      rpc_ping(struct rpc_clnt *clnt);
  82
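/* Add a client to the per-network-namespace list of all RPC clients. */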
  83static void rpc_register_client(struct rpc_clnt *clnt)
  84{
  85        struct net *net = rpc_net_ns(clnt);
  86        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  87
  88        spin_lock(&sn->rpc_client_lock);
  89        list_add(&clnt->cl_clients, &sn->all_clients);
  90        spin_unlock(&sn->rpc_client_lock);
  91}
  92
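/* Remove a client from the per-network-namespace list of all RPC clients. */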
  93static void rpc_unregister_client(struct rpc_clnt *clnt)
  94{
  95        struct net *net = rpc_net_ns(clnt);
  96        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  97
  98        spin_lock(&sn->rpc_client_lock);
  99        list_del(&clnt->cl_clients);
 100        spin_unlock(&sn->rpc_client_lock);
 101}
 102
 103static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 104{
 105        rpc_remove_client_dir(clnt);
 106}
 107
 108static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 109{
 110        struct net *net = rpc_net_ns(clnt);
 111        struct super_block *pipefs_sb;
 112
 113        pipefs_sb = rpc_get_sb_net(net);
 114        if (pipefs_sb) {
 115                __rpc_clnt_remove_pipedir(clnt);
 116                rpc_put_sb_net(net);
 117        }
 118}
 119
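/*
 * Create a uniquely named "clnt%x" directory for this client under the
 * program's pipefs directory, retrying until an unused name is found.
 * Returns the new dentry, NULL if the parent directory does not exist,
 * or an ERR_PTR on any other failure.
 */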
 120static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
 121                                    struct rpc_clnt *clnt)
 122{
 123        static uint32_t clntid;
 124        const char *dir_name = clnt->cl_program->pipe_dir_name;
 125        char name[15];
 126        struct dentry *dir, *dentry;
 127
 128        dir = rpc_d_lookup_sb(sb, dir_name);
 129        if (dir == NULL) {
 130                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
 131                return dir;
 132        }
 133        for (;;) {
 134                snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 135                name[sizeof(name) - 1] = '\0';
 136                dentry = rpc_create_client_dir(dir, name, clnt);
 137                if (!IS_ERR(dentry))
 138                        break;
 139                if (dentry == ERR_PTR(-EEXIST))
 140                        continue;
 141                printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 142                                " %s/%s, error %ld\n",
 143                                dir_name, name, PTR_ERR(dentry));
 144                break;
 145        }
 146        dput(dir);
 147        return dentry;
 148}
 149
 150static int
 151rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
 152{
 153        struct dentry *dentry;
 154
 155        if (clnt->cl_program->pipe_dir_name != NULL) {
 156                dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
 157                if (IS_ERR(dentry))
 158                        return PTR_ERR(dentry);
 159        }
 160        return 0;
 161}
 162
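/*
 * Return 1 if a pipefs mount/umount event can be ignored for this client:
 * the client has no pipefs directory name, its dentry is already in the
 * state the event would produce, or the client is being destroyed.
 */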
 163static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
 164{
 165        if (clnt->cl_program->pipe_dir_name == NULL)
 166                return 1;
 167
 168        switch (event) {
 169        case RPC_PIPEFS_MOUNT:
 170                if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
 171                        return 1;
 172                if (atomic_read(&clnt->cl_count) == 0)
 173                        return 1;
 174                break;
 175        case RPC_PIPEFS_UMOUNT:
 176                if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
 177                        return 1;
 178                break;
 179        }
 180        return 0;
 181}
 182
 183static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 184                                   struct super_block *sb)
 185{
 186        struct dentry *dentry;
 187
 188        switch (event) {
 189        case RPC_PIPEFS_MOUNT:
 190                dentry = rpc_setup_pipedir_sb(sb, clnt);
 191                if (!dentry)
 192                        return -ENOENT;
 193                if (IS_ERR(dentry))
 194                        return PTR_ERR(dentry);
 195                break;
 196        case RPC_PIPEFS_UMOUNT:
 197                __rpc_clnt_remove_pipedir(clnt);
 198                break;
 199        default:
 200                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 201                return -ENOTSUPP;
 202        }
 203        return 0;
 204}
 205
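/*
 * Apply a pipefs mount/umount event to a client and then to each of its
 * ancestors in the cl_parent chain, stopping on the first error.
 */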
 206static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
 207                                struct super_block *sb)
 208{
 209        int error = 0;
 210
 211        for (;; clnt = clnt->cl_parent) {
 212                if (!rpc_clnt_skip_event(clnt, event))
 213                        error = __rpc_clnt_handle_event(clnt, event, sb);
 214                if (error || clnt == clnt->cl_parent)
 215                        break;
 216        }
 217        return error;
 218}
 219
 220static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
 221{
 222        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 223        struct rpc_clnt *clnt;
 224
 225        spin_lock(&sn->rpc_client_lock);
 226        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
 227                if (rpc_clnt_skip_event(clnt, event))
 228                        continue;
 229                spin_unlock(&sn->rpc_client_lock);
 230                return clnt;
 231        }
 232        spin_unlock(&sn->rpc_client_lock);
 233        return NULL;
 234}
 235
 236static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
 237                            void *ptr)
 238{
 239        struct super_block *sb = ptr;
 240        struct rpc_clnt *clnt;
 241        int error = 0;
 242
 243        while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
 244                error = __rpc_pipefs_event(clnt, event, sb);
 245                if (error)
 246                        break;
 247        }
 248        return error;
 249}
 250
 251static struct notifier_block rpc_clients_block = {
 252        .notifier_call  = rpc_pipefs_event,
 253        .priority       = SUNRPC_PIPEFS_RPC_PRIO,
 254};
 255
 256int rpc_clients_notifier_register(void)
 257{
 258        return rpc_pipefs_notifier_register(&rpc_clients_block);
 259}
 260
 261void rpc_clients_notifier_unregister(void)
 262{
 263        return rpc_pipefs_notifier_unregister(&rpc_clients_block);
 264}
 265
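/*
 * Swap in a new transport and timeout under cl_lock, returning the old
 * transport so the caller can release it once it is safe to do so.
 */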
 266static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
 267                struct rpc_xprt *xprt,
 268                const struct rpc_timeout *timeout)
 269{
 270        struct rpc_xprt *old;
 271
 272        spin_lock(&clnt->cl_lock);
 273        old = rcu_dereference_protected(clnt->cl_xprt,
 274                        lockdep_is_held(&clnt->cl_lock));
 275
 276        if (!xprt_bound(xprt))
 277                clnt->cl_autobind = 1;
 278
 279        clnt->cl_timeout = timeout;
 280        rcu_assign_pointer(clnt->cl_xprt, xprt);
 281        spin_unlock(&clnt->cl_lock);
 282
 283        return old;
 284}
 285
 286static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
 287{
 288        clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
 289                        nodename, sizeof(clnt->cl_nodename));
 290}
 291
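/*
 * Make a new client visible to the rest of the system: create its debugfs
 * and pipefs entries, add it to the per-netns client list, and create its
 * RPC authentication handle.
 */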
 292static int rpc_client_register(struct rpc_clnt *clnt,
 293                               rpc_authflavor_t pseudoflavor,
 294                               const char *client_name)
 295{
 296        struct rpc_auth_create_args auth_args = {
 297                .pseudoflavor = pseudoflavor,
 298                .target_name = client_name,
 299        };
 300        struct rpc_auth *auth;
 301        struct net *net = rpc_net_ns(clnt);
 302        struct super_block *pipefs_sb;
 303        int err;
 304
 305        rpc_clnt_debugfs_register(clnt);
 306
 307        pipefs_sb = rpc_get_sb_net(net);
 308        if (pipefs_sb) {
 309                err = rpc_setup_pipedir(pipefs_sb, clnt);
 310                if (err)
 311                        goto out;
 312        }
 313
 314        rpc_register_client(clnt);
 315        if (pipefs_sb)
 316                rpc_put_sb_net(net);
 317
 318        auth = rpcauth_create(&auth_args, clnt);
 319        if (IS_ERR(auth)) {
 320                dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
 321                                pseudoflavor);
 322                err = PTR_ERR(auth);
 323                goto err_auth;
 324        }
 325        return 0;
 326err_auth:
 327        pipefs_sb = rpc_get_sb_net(net);
 328        rpc_unregister_client(clnt);
 329        __rpc_clnt_remove_pipedir(clnt);
 330out:
 331        if (pipefs_sb)
 332                rpc_put_sb_net(net);
 333        rpc_clnt_debugfs_unregister(clnt);
 334        return err;
 335}
 336
 337static DEFINE_IDA(rpc_clids);
 338
 339void rpc_cleanup_clids(void)
 340{
 341        ida_destroy(&rpc_clids);
 342}
 343
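/* Allocate a unique client identifier from the rpc_clids IDA. */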
 344static int rpc_alloc_clid(struct rpc_clnt *clnt)
 345{
 346        int clid;
 347
 348        clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
 349        if (clid < 0)
 350                return clid;
 351        clnt->cl_clid = clid;
 352        return 0;
 353}
 354
 355static void rpc_free_clid(struct rpc_clnt *clnt)
 356{
 357        ida_simple_remove(&rpc_clids, clnt->cl_clid);
 358}
 359
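/*
 * Allocate and initialize a new rpc_clnt for the given program and version,
 * attach it to the transport and transport switch, and register it.  The
 * caller's references to @xps and @xprt are consumed in all cases.
 */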
 360static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
 361                struct rpc_xprt_switch *xps,
 362                struct rpc_xprt *xprt,
 363                struct rpc_clnt *parent)
 364{
 365        const struct rpc_program *program = args->program;
 366        const struct rpc_version *version;
 367        struct rpc_clnt *clnt = NULL;
 368        const struct rpc_timeout *timeout;
 369        const char *nodename = args->nodename;
 370        int err;
 371
 372        /* sanity check the name before trying to print it */
 373        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 374                        program->name, args->servername, xprt);
 375
 376        err = rpciod_up();
 377        if (err)
 378                goto out_no_rpciod;
 379
 380        err = -EINVAL;
 381        if (args->version >= program->nrvers)
 382                goto out_err;
 383        version = program->version[args->version];
 384        if (version == NULL)
 385                goto out_err;
 386
 387        err = -ENOMEM;
 388        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 389        if (!clnt)
 390                goto out_err;
 391        clnt->cl_parent = parent ? : clnt;
 392
 393        err = rpc_alloc_clid(clnt);
 394        if (err)
 395                goto out_no_clid;
 396
 397        clnt->cl_procinfo = version->procs;
 398        clnt->cl_maxproc  = version->nrprocs;
 399        clnt->cl_prog     = args->prognumber ? : program->number;
 400        clnt->cl_vers     = version->number;
 401        clnt->cl_stats    = program->stats;
 402        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 403        rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
 404        err = -ENOMEM;
 405        if (clnt->cl_metrics == NULL)
 406                goto out_no_stats;
 407        clnt->cl_program  = program;
 408        INIT_LIST_HEAD(&clnt->cl_tasks);
 409        spin_lock_init(&clnt->cl_lock);
 410
 411        timeout = xprt->timeout;
 412        if (args->timeout != NULL) {
 413                memcpy(&clnt->cl_timeout_default, args->timeout,
 414                                sizeof(clnt->cl_timeout_default));
 415                timeout = &clnt->cl_timeout_default;
 416        }
 417
 418        rpc_clnt_set_transport(clnt, xprt, timeout);
 419        xprt_iter_init(&clnt->cl_xpi, xps);
 420        xprt_switch_put(xps);
 421
 422        clnt->cl_rtt = &clnt->cl_rtt_default;
 423        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 424
 425        atomic_set(&clnt->cl_count, 1);
 426
 427        if (nodename == NULL)
 428                nodename = utsname()->nodename;
 429        /* save the nodename */
 430        rpc_clnt_set_nodename(clnt, nodename);
 431
 432        err = rpc_client_register(clnt, args->authflavor, args->client_name);
 433        if (err)
 434                goto out_no_path;
 435        if (parent)
 436                atomic_inc(&parent->cl_count);
 437        return clnt;
 438
 439out_no_path:
 440        rpc_free_iostats(clnt->cl_metrics);
 441out_no_stats:
 442        rpc_free_clid(clnt);
 443out_no_clid:
 444        kfree(clnt);
 445out_err:
 446        rpciod_down();
 447out_no_rpciod:
 448        xprt_switch_put(xps);
 449        xprt_put(xprt);
 450        return ERR_PTR(err);
 451}
 452
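/*
 * Set up a transport switch for @xprt (or reuse the backchannel's), create
 * the client, and ping the server unless RPC_CLNT_CREATE_NOPING was set.
 */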
 453static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
 454                                        struct rpc_xprt *xprt)
 455{
 456        struct rpc_clnt *clnt = NULL;
 457        struct rpc_xprt_switch *xps;
 458
 459        if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
 460                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 461                xps = args->bc_xprt->xpt_bc_xps;
 462                xprt_switch_get(xps);
 463        } else {
 464                xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 465                if (xps == NULL) {
 466                        xprt_put(xprt);
 467                        return ERR_PTR(-ENOMEM);
 468                }
 469                if (xprt->bc_xprt) {
 470                        xprt_switch_get(xps);
 471                        xprt->bc_xprt->xpt_bc_xps = xps;
 472                }
 473        }
 474        clnt = rpc_new_client(args, xps, xprt, NULL);
 475        if (IS_ERR(clnt))
 476                return clnt;
 477
 478        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 479                int err = rpc_ping(clnt);
 480                if (err != 0) {
 481                        rpc_shutdown_client(clnt);
 482                        return ERR_PTR(err);
 483                }
 484        }
 485
 486        clnt->cl_softrtry = 1;
 487        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
 488                clnt->cl_softrtry = 0;
 489
 490        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 491                clnt->cl_autobind = 1;
 492        if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
 493                clnt->cl_noretranstimeo = 1;
 494        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 495                clnt->cl_discrtry = 1;
 496        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 497                clnt->cl_chatty = 1;
 498
 499        return clnt;
 500}
 501
 502/**
 503 * rpc_create - create an RPC client and transport with one call
 504 * @args: rpc_clnt create argument structure
 505 *
 506 * Creates and initializes an RPC transport and an RPC client.
 507 *
 508 * It can ping the server in order to determine if it is up, and to see if
 509 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 510 * this behavior so asynchronous tasks can also use rpc_create.
 511 */
 512struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 513{
 514        struct rpc_xprt *xprt;
 515        struct xprt_create xprtargs = {
 516                .net = args->net,
 517                .ident = args->protocol,
 518                .srcaddr = args->saddress,
 519                .dstaddr = args->address,
 520                .addrlen = args->addrsize,
 521                .servername = args->servername,
 522                .bc_xprt = args->bc_xprt,
 523        };
 524        char servername[48];
 525
 526        if (args->bc_xprt) {
 527                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 528                xprt = args->bc_xprt->xpt_bc_xprt;
 529                if (xprt) {
 530                        xprt_get(xprt);
 531                        return rpc_create_xprt(args, xprt);
 532                }
 533        }
 534
 535        if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
 536                xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
 537        if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
 538                xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
 539        /*
 540         * If the caller chooses not to specify a hostname, whip
 541         * up a string representation of the passed-in address.
 542         */
 543        if (xprtargs.servername == NULL) {
 544                struct sockaddr_un *sun =
 545                                (struct sockaddr_un *)args->address;
 546                struct sockaddr_in *sin =
 547                                (struct sockaddr_in *)args->address;
 548                struct sockaddr_in6 *sin6 =
 549                                (struct sockaddr_in6 *)args->address;
 550
 551                servername[0] = '\0';
 552                switch (args->address->sa_family) {
 553                case AF_LOCAL:
 554                        snprintf(servername, sizeof(servername), "%s",
 555                                 sun->sun_path);
 556                        break;
 557                case AF_INET:
 558                        snprintf(servername, sizeof(servername), "%pI4",
 559                                 &sin->sin_addr.s_addr);
 560                        break;
 561                case AF_INET6:
 562                        snprintf(servername, sizeof(servername), "%pI6",
 563                                 &sin6->sin6_addr);
 564                        break;
 565                default:
 566                        /* caller wants default server name, but
 567                         * address family isn't recognized. */
 568                        return ERR_PTR(-EINVAL);
 569                }
 570                xprtargs.servername = servername;
 571        }
 572
 573        xprt = xprt_create_transport(&xprtargs);
 574        if (IS_ERR(xprt))
 575                return (struct rpc_clnt *)xprt;
 576
 577        /*
 578         * By default, kernel RPC client connects from a reserved port.
 579         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 580         * but it is always enabled for rpciod, which handles the connect
 581         * operation.
 582         */
 583        xprt->resvport = 1;
 584        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 585                xprt->resvport = 0;
 586
 587        return rpc_create_xprt(args, xprt);
 588}
 589EXPORT_SYMBOL_GPL(rpc_create);
 590
 591/*
 592 * This function clones the RPC client structure. It allows us to share the
 593 * same transport while varying parameters such as the authentication
 594 * flavour.
 595 */
 596static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 597                                           struct rpc_clnt *clnt)
 598{
 599        struct rpc_xprt_switch *xps;
 600        struct rpc_xprt *xprt;
 601        struct rpc_clnt *new;
 602        int err;
 603
 604        err = -ENOMEM;
 605        rcu_read_lock();
 606        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
 607        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 608        rcu_read_unlock();
 609        if (xprt == NULL || xps == NULL) {
 610                xprt_put(xprt);
 611                xprt_switch_put(xps);
 612                goto out_err;
 613        }
 614        args->servername = xprt->servername;
 615        args->nodename = clnt->cl_nodename;
 616
 617        new = rpc_new_client(args, xps, xprt, clnt);
 618        if (IS_ERR(new)) {
 619                err = PTR_ERR(new);
 620                goto out_err;
 621        }
 622
 623        /* Turn off autobind on clones */
 624        new->cl_autobind = 0;
 625        new->cl_softrtry = clnt->cl_softrtry;
 626        new->cl_noretranstimeo = clnt->cl_noretranstimeo;
 627        new->cl_discrtry = clnt->cl_discrtry;
 628        new->cl_chatty = clnt->cl_chatty;
 629        return new;
 630
 631out_err:
 632        dprintk("RPC:       %s: returned error %d\n", __func__, err);
 633        return ERR_PTR(err);
 634}
 635
 636/**
 637 * rpc_clone_client - Clone an RPC client structure
 638 *
 639 * @clnt: RPC client whose parameters are copied
 640 *
 641 * Returns a fresh RPC client or an ERR_PTR.
 642 */
 643struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
 644{
 645        struct rpc_create_args args = {
 646                .program        = clnt->cl_program,
 647                .prognumber     = clnt->cl_prog,
 648                .version        = clnt->cl_vers,
 649                .authflavor     = clnt->cl_auth->au_flavor,
 650        };
 651        return __rpc_clone_client(&args, clnt);
 652}
 653EXPORT_SYMBOL_GPL(rpc_clone_client);
 654
 655/**
 656 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 657 *
 658 * @clnt: RPC client whose parameters are copied
 659 * @flavor: security flavor for new client
 660 *
 661 * Returns a fresh RPC client or an ERR_PTR.
 662 */
 663struct rpc_clnt *
 664rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 665{
 666        struct rpc_create_args args = {
 667                .program        = clnt->cl_program,
 668                .prognumber     = clnt->cl_prog,
 669                .version        = clnt->cl_vers,
 670                .authflavor     = flavor,
 671        };
 672        return __rpc_clone_client(&args, clnt);
 673}
 674EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
 675
 676/**
 677 * rpc_switch_client_transport: switch the RPC transport on the fly
 678 * @clnt: pointer to a struct rpc_clnt
 679 * @args: pointer to the new transport arguments
 680 * @timeout: pointer to the new timeout parameters
 681 *
 682 * This function allows the caller to switch the RPC transport for the
 683 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 684 * server, for instance.  It assumes that the caller has ensured that
 685 * there are no active RPC tasks by using some form of locking.
 686 *
 687 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 688 * negative errno is returned, and "clnt" continues to use the old
 689 * xprt.
 690 */
 691int rpc_switch_client_transport(struct rpc_clnt *clnt,
 692                struct xprt_create *args,
 693                const struct rpc_timeout *timeout)
 694{
 695        const struct rpc_timeout *old_timeo;
 696        rpc_authflavor_t pseudoflavor;
 697        struct rpc_xprt_switch *xps, *oldxps;
 698        struct rpc_xprt *xprt, *old;
 699        struct rpc_clnt *parent;
 700        int err;
 701
 702        xprt = xprt_create_transport(args);
 703        if (IS_ERR(xprt)) {
 704                dprintk("RPC:       failed to create new xprt for clnt %p\n",
 705                        clnt);
 706                return PTR_ERR(xprt);
 707        }
 708
 709        xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 710        if (xps == NULL) {
 711                xprt_put(xprt);
 712                return -ENOMEM;
 713        }
 714
 715        pseudoflavor = clnt->cl_auth->au_flavor;
 716
 717        old_timeo = clnt->cl_timeout;
 718        old = rpc_clnt_set_transport(clnt, xprt, timeout);
 719        oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
 720
 721        rpc_unregister_client(clnt);
 722        __rpc_clnt_remove_pipedir(clnt);
 723        rpc_clnt_debugfs_unregister(clnt);
 724
 725        /*
 726         * A new transport was created.  "clnt" therefore
 727         * becomes the root of a new cl_parent tree.  clnt's
 728         * children, if it has any, still point to the old xprt.
 729         */
 730        parent = clnt->cl_parent;
 731        clnt->cl_parent = clnt;
 732
 733        /*
 734         * The old rpc_auth cache cannot be re-used.  GSS
 735         * contexts in particular are between a single
 736         * client and server.
 737         */
 738        err = rpc_client_register(clnt, pseudoflavor, NULL);
 739        if (err)
 740                goto out_revert;
 741
 742        synchronize_rcu();
 743        if (parent != clnt)
 744                rpc_release_client(parent);
 745        xprt_switch_put(oldxps);
 746        xprt_put(old);
 747        dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
 748        return 0;
 749
 750out_revert:
 751        xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
 752        rpc_clnt_set_transport(clnt, old, old_timeo);
 753        clnt->cl_parent = parent;
 754        rpc_client_register(clnt, pseudoflavor, NULL);
 755        xprt_switch_put(xps);
 756        xprt_put(xprt);
 757        dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
 758        return err;
 759}
 760EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
 761
 762static
 763int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
 764{
 765        struct rpc_xprt_switch *xps;
 766
 767        rcu_read_lock();
 768        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 769        rcu_read_unlock();
 770        if (xps == NULL)
 771                return -EAGAIN;
 772        xprt_iter_init_listall(xpi, xps);
 773        xprt_switch_put(xps);
 774        return 0;
 775}
 776
 777/**
 778 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
 779 * @clnt: pointer to client
 780 * @fn: function to apply
 781 * @data: void pointer to function data
 782 *
 783 * Iterates through the list of RPC transports currently attached to the
 784 * client and applies the function fn(clnt, xprt, data).
 785 *
 786 * On error, the iteration stops, and the function returns the error value.
 787 */
 788int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
 789                int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
 790                void *data)
 791{
 792        struct rpc_xprt_iter xpi;
 793        int ret;
 794
 795        ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
 796        if (ret)
 797                return ret;
 798        for (;;) {
 799                struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
 800
 801                if (!xprt)
 802                        break;
 803                ret = fn(clnt, xprt, data);
 804                xprt_put(xprt);
 805                if (ret < 0)
 806                        break;
 807        }
 808        xprt_iter_destroy(&xpi);
 809        return ret;
 810}
 811EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
 812
 813/*
 814 * Kill all tasks for the given client.
 815 * XXX: kill their descendants as well?
 816 */
 817void rpc_killall_tasks(struct rpc_clnt *clnt)
 818{
 819        struct rpc_task *rovr;
  820
 822        if (list_empty(&clnt->cl_tasks))
 823                return;
 824        dprintk("RPC:       killing all tasks for client %p\n", clnt);
 825        /*
 826         * Spin lock all_tasks to prevent changes...
 827         */
 828        spin_lock(&clnt->cl_lock);
 829        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
 830                if (!RPC_IS_ACTIVATED(rovr))
 831                        continue;
 832                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 833                        rovr->tk_flags |= RPC_TASK_KILLED;
 834                        rpc_exit(rovr, -EIO);
 835                        if (RPC_IS_QUEUED(rovr))
 836                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
 837                                                        rovr);
 838                }
 839        }
 840        spin_unlock(&clnt->cl_lock);
 841}
 842EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 843
 844/*
 845 * Properly shut down an RPC client, terminating all outstanding
 846 * requests.
 847 */
 848void rpc_shutdown_client(struct rpc_clnt *clnt)
 849{
 850        might_sleep();
 851
 852        dprintk_rcu("RPC:       shutting down %s client for %s\n",
 853                        clnt->cl_program->name,
 854                        rcu_dereference(clnt->cl_xprt)->servername);
 855
 856        while (!list_empty(&clnt->cl_tasks)) {
 857                rpc_killall_tasks(clnt);
 858                wait_event_timeout(destroy_wait,
 859                        list_empty(&clnt->cl_tasks), 1*HZ);
 860        }
 861
 862        rpc_release_client(clnt);
 863}
 864EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 865
 866/*
 867 * Free an RPC client
 868 */
 869static struct rpc_clnt *
 870rpc_free_client(struct rpc_clnt *clnt)
 871{
 872        struct rpc_clnt *parent = NULL;
 873
 874        dprintk_rcu("RPC:       destroying %s client for %s\n",
 875                        clnt->cl_program->name,
 876                        rcu_dereference(clnt->cl_xprt)->servername);
 877        if (clnt->cl_parent != clnt)
 878                parent = clnt->cl_parent;
 879        rpc_clnt_debugfs_unregister(clnt);
 880        rpc_clnt_remove_pipedir(clnt);
 881        rpc_unregister_client(clnt);
 882        rpc_free_iostats(clnt->cl_metrics);
 883        clnt->cl_metrics = NULL;
 884        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
 885        xprt_iter_destroy(&clnt->cl_xpi);
 886        rpciod_down();
 887        rpc_free_clid(clnt);
 888        kfree(clnt);
 889        return parent;
 890}
 891
  892/*
  893 * Release the client's auth handle, then free the client itself
  894 */
  895static struct rpc_clnt *
 896rpc_free_auth(struct rpc_clnt *clnt)
 897{
 898        if (clnt->cl_auth == NULL)
 899                return rpc_free_client(clnt);
 900
 901        /*
 902         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 903         *       release remaining GSS contexts. This mechanism ensures
 904         *       that it can do so safely.
 905         */
 906        atomic_inc(&clnt->cl_count);
 907        rpcauth_release(clnt->cl_auth);
 908        clnt->cl_auth = NULL;
 909        if (atomic_dec_and_test(&clnt->cl_count))
 910                return rpc_free_client(clnt);
 911        return NULL;
 912}
 913
 914/*
 915 * Release reference to the RPC client
 916 */
 917void
 918rpc_release_client(struct rpc_clnt *clnt)
 919{
 920        dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 921
 922        do {
 923                if (list_empty(&clnt->cl_tasks))
 924                        wake_up(&destroy_wait);
 925                if (!atomic_dec_and_test(&clnt->cl_count))
 926                        break;
 927                clnt = rpc_free_auth(clnt);
 928        } while (clnt != NULL);
 929}
 930EXPORT_SYMBOL_GPL(rpc_release_client);
 931
 932/**
 933 * rpc_bind_new_program - bind a new RPC program to an existing client
 934 * @old: old rpc_client
 935 * @program: rpc program to set
 936 * @vers: rpc program version
 937 *
 938 * Clones the rpc client and sets up a new RPC program. This is mainly
 939 * of use for enabling different RPC programs to share the same transport.
 940 * The Sun NFSv2/v3 ACL protocol can do this.
 941 */
 942struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 943                                      const struct rpc_program *program,
 944                                      u32 vers)
 945{
 946        struct rpc_create_args args = {
 947                .program        = program,
 948                .prognumber     = program->number,
 949                .version        = vers,
 950                .authflavor     = old->cl_auth->au_flavor,
 951        };
 952        struct rpc_clnt *clnt;
 953        int err;
 954
 955        clnt = __rpc_clone_client(&args, old);
 956        if (IS_ERR(clnt))
 957                goto out;
 958        err = rpc_ping(clnt);
 959        if (err != 0) {
 960                rpc_shutdown_client(clnt);
 961                clnt = ERR_PTR(err);
 962        }
 963out:
 964        return clnt;
 965}
 966EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 967
 968void rpc_task_release_client(struct rpc_task *task)
 969{
 970        struct rpc_clnt *clnt = task->tk_client;
 971        struct rpc_xprt *xprt = task->tk_xprt;
 972
 973        if (clnt != NULL) {
 974                /* Remove from client task list */
 975                spin_lock(&clnt->cl_lock);
 976                list_del(&task->tk_task);
 977                spin_unlock(&clnt->cl_lock);
 978                task->tk_client = NULL;
 979
 980                rpc_release_client(clnt);
 981        }
 982
 983        if (xprt != NULL) {
 984                task->tk_xprt = NULL;
 985
 986                xprt_put(xprt);
 987        }
 988}
 989
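/*
 * Bind a task to a client: pick the next transport from the client's
 * transport switch if none is set, propagate the client's soft, retransmit
 * and swapper flags to the task, and add the task to the cl_tasks list.
 */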
 990static
 991void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 992{
 993
 994        if (clnt != NULL) {
 995                if (task->tk_xprt == NULL)
 996                        task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
 997                task->tk_client = clnt;
 998                atomic_inc(&clnt->cl_count);
 999                if (clnt->cl_softrtry)
1000                        task->tk_flags |= RPC_TASK_SOFT;
1001                if (clnt->cl_noretranstimeo)
1002                        task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1003                if (atomic_read(&clnt->cl_swapper))
1004                        task->tk_flags |= RPC_TASK_SWAPPER;
1005                /* Add to the client's list of all tasks */
1006                spin_lock(&clnt->cl_lock);
1007                list_add_tail(&task->tk_task, &clnt->cl_tasks);
1008                spin_unlock(&clnt->cl_lock);
1009        }
1010}
1011
1012static void
1013rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1014{
1015        if (msg != NULL) {
1016                task->tk_msg.rpc_proc = msg->rpc_proc;
1017                task->tk_msg.rpc_argp = msg->rpc_argp;
1018                task->tk_msg.rpc_resp = msg->rpc_resp;
1019                if (msg->rpc_cred != NULL)
1020                        task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
1021        }
1022}
1023
1024/*
1025 * Default callback for async RPC calls
1026 */
1027static void
1028rpc_default_callback(struct rpc_task *task, void *data)
1029{
1030}
1031
1032static const struct rpc_call_ops rpc_default_ops = {
1033        .rpc_call_done = rpc_default_callback,
1034};
1035
1036/**
1037 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1038 * @task_setup_data: pointer to task initialisation data
1039 */
1040struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1041{
1042        struct rpc_task *task;
1043
1044        task = rpc_new_task(task_setup_data);
1045        if (IS_ERR(task))
1046                goto out;
1047
1048        rpc_task_set_client(task, task_setup_data->rpc_client);
1049        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1050
1051        if (task->tk_action == NULL)
1052                rpc_call_start(task);
1053
1054        atomic_inc(&task->tk_count);
1055        rpc_execute(task);
1056out:
1057        return task;
1058}
1059EXPORT_SYMBOL_GPL(rpc_run_task);
1060
1061/**
1062 * rpc_call_sync - Perform a synchronous RPC call
1063 * @clnt: pointer to RPC client
1064 * @msg: RPC call parameters
1065 * @flags: RPC call flags
1066 */
1067int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1068{
1069        struct rpc_task *task;
1070        struct rpc_task_setup task_setup_data = {
1071                .rpc_client = clnt,
1072                .rpc_message = msg,
1073                .callback_ops = &rpc_default_ops,
1074                .flags = flags,
1075        };
1076        int status;
1077
1078        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1079        if (flags & RPC_TASK_ASYNC) {
1080                rpc_release_calldata(task_setup_data.callback_ops,
1081                        task_setup_data.callback_data);
1082                return -EINVAL;
1083        }
1084
1085        task = rpc_run_task(&task_setup_data);
1086        if (IS_ERR(task))
1087                return PTR_ERR(task);
1088        status = task->tk_status;
1089        rpc_put_task(task);
1090        return status;
1091}
1092EXPORT_SYMBOL_GPL(rpc_call_sync);
1093
1094/**
1095 * rpc_call_async - Perform an asynchronous RPC call
1096 * @clnt: pointer to RPC client
1097 * @msg: RPC call parameters
1098 * @flags: RPC call flags
1099 * @tk_ops: RPC call ops
1100 * @data: user call data
1101 */
1102int
1103rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1104               const struct rpc_call_ops *tk_ops, void *data)
1105{
1106        struct rpc_task *task;
1107        struct rpc_task_setup task_setup_data = {
1108                .rpc_client = clnt,
1109                .rpc_message = msg,
1110                .callback_ops = tk_ops,
1111                .callback_data = data,
1112                .flags = flags|RPC_TASK_ASYNC,
1113        };
1114
1115        task = rpc_run_task(&task_setup_data);
1116        if (IS_ERR(task))
1117                return PTR_ERR(task);
1118        rpc_put_task(task);
1119        return 0;
1120}
1121EXPORT_SYMBOL_GPL(rpc_call_async);
1122
1123#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1124/**
1125 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1126 * rpc_execute against it
1127 * @req: RPC request
1128 */
1129struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1130{
1131        struct rpc_task *task;
1132        struct xdr_buf *xbufp = &req->rq_snd_buf;
1133        struct rpc_task_setup task_setup_data = {
1134                .callback_ops = &rpc_default_ops,
1135                .flags = RPC_TASK_SOFTCONN,
1136        };
1137
1138        dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1139        /*
1140         * Create an rpc_task to send the data
1141         */
1142        task = rpc_new_task(&task_setup_data);
1143        if (IS_ERR(task)) {
1144                xprt_free_bc_request(req);
1145                goto out;
1146        }
1147        task->tk_rqstp = req;
1148
1149        /*
1150         * Set up the xdr_buf length.
1151         * This also indicates that the buffer is XDR encoded already.
1152         */
1153        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1154                        xbufp->tail[0].iov_len;
1155
1156        task->tk_action = call_bc_transmit;
1157        atomic_inc(&task->tk_count);
1158        WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1159        rpc_execute(task);
1160
1161out:
1162        dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1163        return task;
1164}
1165#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1166
1167void
1168rpc_call_start(struct rpc_task *task)
1169{
1170        task->tk_action = call_start;
1171}
1172EXPORT_SYMBOL_GPL(rpc_call_start);
1173
1174/**
1175 * rpc_peeraddr - extract remote peer address from clnt's xprt
1176 * @clnt: RPC client structure
1177 * @buf: target buffer
1178 * @bufsize: length of target buffer
1179 *
1180 * Returns the number of bytes that are actually in the stored address.
1181 */
1182size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1183{
1184        size_t bytes;
1185        struct rpc_xprt *xprt;
1186
1187        rcu_read_lock();
1188        xprt = rcu_dereference(clnt->cl_xprt);
1189
1190        bytes = xprt->addrlen;
1191        if (bytes > bufsize)
1192                bytes = bufsize;
1193        memcpy(buf, &xprt->addr, bytes);
1194        rcu_read_unlock();
1195
1196        return bytes;
1197}
1198EXPORT_SYMBOL_GPL(rpc_peeraddr);
1199
1200/**
1201 * rpc_peeraddr2str - return remote peer address in printable format
1202 * @clnt: RPC client structure
1203 * @format: address format
1204 *
1205 * NB: the lifetime of the memory referenced by the returned pointer is
1206 * the same as the rpc_xprt itself.  As long as the caller uses this
1207 * pointer, it must hold the RCU read lock.
1208 */
1209const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1210                             enum rpc_display_format_t format)
1211{
1212        struct rpc_xprt *xprt;
1213
1214        xprt = rcu_dereference(clnt->cl_xprt);
1215
1216        if (xprt->address_strings[format] != NULL)
1217                return xprt->address_strings[format];
1218        else
1219                return "unprintable";
1220}
1221EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1222
1223static const struct sockaddr_in rpc_inaddr_loopback = {
1224        .sin_family             = AF_INET,
1225        .sin_addr.s_addr        = htonl(INADDR_ANY),
1226};
1227
1228static const struct sockaddr_in6 rpc_in6addr_loopback = {
1229        .sin6_family            = AF_INET6,
1230        .sin6_addr              = IN6ADDR_ANY_INIT,
1231};
1232
1233/*
1234 * Try a getsockname() on a connected datagram socket.  Using a
1235 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1236 * This conserves the ephemeral port number space.
1237 *
1238 * Returns zero and fills in "buf" if successful; otherwise, a
1239 * negative errno is returned.
1240 */
1241static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1242                        struct sockaddr *buf, int buflen)
1243{
1244        struct socket *sock;
1245        int err;
1246
1247        err = __sock_create(net, sap->sa_family,
1248                                SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1249        if (err < 0) {
1250                dprintk("RPC:       can't create UDP socket (%d)\n", err);
1251                goto out;
1252        }
1253
1254        switch (sap->sa_family) {
1255        case AF_INET:
1256                err = kernel_bind(sock,
1257                                (struct sockaddr *)&rpc_inaddr_loopback,
1258                                sizeof(rpc_inaddr_loopback));
1259                break;
1260        case AF_INET6:
1261                err = kernel_bind(sock,
1262                                (struct sockaddr *)&rpc_in6addr_loopback,
1263                                sizeof(rpc_in6addr_loopback));
1264                break;
1265        default:
1266                err = -EAFNOSUPPORT;
1267                goto out;
1268        }
1269        if (err < 0) {
1270                dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1271                goto out_release;
1272        }
1273
1274        err = kernel_connect(sock, sap, salen, 0);
1275        if (err < 0) {
1276                dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1277                goto out_release;
1278        }
1279
1280        err = kernel_getsockname(sock, buf, &buflen);
1281        if (err < 0) {
1282                dprintk("RPC:       getsockname failed (%d)\n", err);
1283                goto out_release;
1284        }
1285
1286        err = 0;
1287        if (buf->sa_family == AF_INET6) {
1288                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1289                sin6->sin6_scope_id = 0;
1290        }
1291        dprintk("RPC:       %s succeeded\n", __func__);
1292
1293out_release:
1294        sock_release(sock);
1295out:
1296        return err;
1297}
1298
1299/*
 1300 * Scraping a connected socket failed, so we don't have a usable
1301 * local address.  Fallback: generate an address that will prevent
1302 * the server from calling us back.
1303 *
1304 * Returns zero and fills in "buf" if successful; otherwise, a
1305 * negative errno is returned.
1306 */
1307static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1308{
1309        switch (family) {
1310        case AF_INET:
1311                if (buflen < sizeof(rpc_inaddr_loopback))
1312                        return -EINVAL;
1313                memcpy(buf, &rpc_inaddr_loopback,
1314                                sizeof(rpc_inaddr_loopback));
1315                break;
1316        case AF_INET6:
1317                if (buflen < sizeof(rpc_in6addr_loopback))
1318                        return -EINVAL;
1319                memcpy(buf, &rpc_in6addr_loopback,
1320                                sizeof(rpc_in6addr_loopback));
1321                break;
1322        default:
1323                dprintk("RPC:       %s: address family not supported\n",
1324                        __func__);
1325                return -EAFNOSUPPORT;
1326        }
1327        dprintk("RPC:       %s: succeeded\n", __func__);
1328        return 0;
1329}
1330
1331/**
1332 * rpc_localaddr - discover local endpoint address for an RPC client
1333 * @clnt: RPC client structure
1334 * @buf: target buffer
1335 * @buflen: size of target buffer, in bytes
1336 *
1337 * Returns zero and fills in "buf" and "buflen" if successful;
1338 * otherwise, a negative errno is returned.
1339 *
1340 * This works even if the underlying transport is not currently connected,
1341 * or if the upper layer never previously provided a source address.
1342 *
1343 * The result of this function call is transient: multiple calls in
1344 * succession may give different results, depending on how local
1345 * networking configuration changes over time.
1346 */
1347int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1348{
1349        struct sockaddr_storage address;
1350        struct sockaddr *sap = (struct sockaddr *)&address;
1351        struct rpc_xprt *xprt;
1352        struct net *net;
1353        size_t salen;
1354        int err;
1355
1356        rcu_read_lock();
1357        xprt = rcu_dereference(clnt->cl_xprt);
1358        salen = xprt->addrlen;
1359        memcpy(sap, &xprt->addr, salen);
1360        net = get_net(xprt->xprt_net);
1361        rcu_read_unlock();
1362
1363        rpc_set_port(sap, 0);
1364        err = rpc_sockname(net, sap, salen, buf, buflen);
1365        put_net(net);
1366        if (err != 0)
1367                /* Couldn't discover local address, return ANYADDR */
1368                return rpc_anyaddr(sap->sa_family, buf, buflen);
1369        return 0;
1370}
1371EXPORT_SYMBOL_GPL(rpc_localaddr);
1372
1373void
1374rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1375{
1376        struct rpc_xprt *xprt;
1377
1378        rcu_read_lock();
1379        xprt = rcu_dereference(clnt->cl_xprt);
1380        if (xprt->ops->set_buffer_size)
1381                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1382        rcu_read_unlock();
1383}
1384EXPORT_SYMBOL_GPL(rpc_setbufsize);
1385
1386/**
1387 * rpc_protocol - Get transport protocol number for an RPC client
1388 * @clnt: RPC client to query
1389 *
1390 */
1391int rpc_protocol(struct rpc_clnt *clnt)
1392{
1393        int protocol;
1394
1395        rcu_read_lock();
1396        protocol = rcu_dereference(clnt->cl_xprt)->prot;
1397        rcu_read_unlock();
1398        return protocol;
1399}
1400EXPORT_SYMBOL_GPL(rpc_protocol);
1401
1402/**
1403 * rpc_net_ns - Get the network namespace for this RPC client
1404 * @clnt: RPC client to query
1405 *
1406 */
1407struct net *rpc_net_ns(struct rpc_clnt *clnt)
1408{
1409        struct net *ret;
1410
1411        rcu_read_lock();
1412        ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1413        rcu_read_unlock();
1414        return ret;
1415}
1416EXPORT_SYMBOL_GPL(rpc_net_ns);
1417
1418/**
1419 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1420 * @clnt: RPC client to query
1421 *
1422 * For stream transports, this is one RPC record fragment (see RFC
1423 * 1831), as we don't support multi-record requests yet.  For datagram
1424 * transports, this is the size of an IP packet minus the IP, UDP, and
1425 * RPC header sizes.
1426 */
1427size_t rpc_max_payload(struct rpc_clnt *clnt)
1428{
1429        size_t ret;
1430
1431        rcu_read_lock();
1432        ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1433        rcu_read_unlock();
1434        return ret;
1435}
1436EXPORT_SYMBOL_GPL(rpc_max_payload);
1437
1438/**
1439 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1440 * @clnt: RPC client to query
1441 */
1442size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1443{
1444        struct rpc_xprt *xprt;
1445        size_t ret;
1446
1447        rcu_read_lock();
1448        xprt = rcu_dereference(clnt->cl_xprt);
1449        ret = xprt->ops->bc_maxpayload(xprt);
1450        rcu_read_unlock();
1451        return ret;
1452}
1453EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1454
1455/**
1456 * rpc_get_timeout - Get timeout for transport in units of HZ
1457 * @clnt: RPC client to query
1458 */
1459unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1460{
1461        unsigned long ret;
1462
1463        rcu_read_lock();
1464        ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1465        rcu_read_unlock();
1466        return ret;
1467}
1468EXPORT_SYMBOL_GPL(rpc_get_timeout);
1469
1470/**
1471 * rpc_force_rebind - force transport to check that remote port is unchanged
1472 * @clnt: client to rebind
1473 *
1474 */
1475void rpc_force_rebind(struct rpc_clnt *clnt)
1476{
1477        if (clnt->cl_autobind) {
1478                rcu_read_lock();
1479                xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1480                rcu_read_unlock();
1481        }
1482}
1483EXPORT_SYMBOL_GPL(rpc_force_rebind);
1484
1485/*
1486 * Restart an (async) RPC call from the call_prepare state.
1487 * Usually called from within the exit handler.
1488 */
1489int
1490rpc_restart_call_prepare(struct rpc_task *task)
1491{
1492        if (RPC_ASSASSINATED(task))
1493                return 0;
1494        task->tk_action = call_start;
1495        task->tk_status = 0;
1496        if (task->tk_ops->rpc_call_prepare != NULL)
1497                task->tk_action = rpc_prepare_task;
1498        return 1;
1499}
1500EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1501
1502/*
1503 * Restart an (async) RPC call. Usually called from within the
1504 * exit handler.
1505 */
1506int
1507rpc_restart_call(struct rpc_task *task)
1508{
1509        if (RPC_ASSASSINATED(task))
1510                return 0;
1511        task->tk_action = call_start;
1512        task->tk_status = 0;
1513        return 1;
1514}
1515EXPORT_SYMBOL_GPL(rpc_restart_call);
1516
1517#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
1518const char
1519*rpc_proc_name(const struct rpc_task *task)
1520{
1521        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1522
1523        if (proc) {
1524                if (proc->p_name)
1525                        return proc->p_name;
1526                else
1527                        return "NULL";
1528        } else
1529                return "no proc";
1530}
1531#endif
1532
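/*
 * The remainder of this file implements the RPC call state machine.  Each
 * call_*() routine performs one step and sets task->tk_action to the next
 * one; the normal forward path begins:
 *
 *   call_start -> call_reserve -> call_reserveresult -> call_refresh ->
 *   call_refreshresult -> call_allocate -> call_bind -> ...
 */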
1533/*
1534 * 0.  Initial state
1535 *
1536 *     Other FSM states can be visited zero or more times, but
1537 *     this state is visited exactly once for each RPC.
1538 */
1539static void
1540call_start(struct rpc_task *task)
1541{
1542        struct rpc_clnt *clnt = task->tk_client;
1543
1544        dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1545                        clnt->cl_program->name, clnt->cl_vers,
1546                        rpc_proc_name(task),
1547                        (RPC_IS_ASYNC(task) ? "async" : "sync"));
1548
1549        /* Increment call count */
1550        task->tk_msg.rpc_proc->p_count++;
1551        clnt->cl_stats->rpccnt++;
1552        task->tk_action = call_reserve;
1553}
1554
1555/*
1556 * 1.   Reserve an RPC call slot
1557 */
1558static void
1559call_reserve(struct rpc_task *task)
1560{
1561        dprint_status(task);
1562
1563        task->tk_status  = 0;
1564        task->tk_action  = call_reserveresult;
1565        xprt_reserve(task);
1566}
1567
1568static void call_retry_reserve(struct rpc_task *task);
1569
1570/*
1571 * 1b.  Grok the result of xprt_reserve()
1572 */
1573static void
1574call_reserveresult(struct rpc_task *task)
1575{
1576        int status = task->tk_status;
1577
1578        dprint_status(task);
1579
1580        /*
1581         * After a call to xprt_reserve(), we must have either
1582         * a request slot or else an error status.
1583         */
1584        task->tk_status = 0;
1585        if (status >= 0) {
1586                if (task->tk_rqstp) {
1587                        task->tk_action = call_refresh;
1588                        return;
1589                }
1590
1591                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1592                                __func__, status);
1593                rpc_exit(task, -EIO);
1594                return;
1595        }
1596
1597        /*
1598         * Even though there was an error, we may have acquired
1599         * a request slot somehow.  Make sure not to leak it.
1600         */
1601        if (task->tk_rqstp) {
1602                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1603                                __func__, status);
1604                xprt_release(task);
1605        }
1606
1607        switch (status) {
1608        case -ENOMEM:
1609                rpc_delay(task, HZ >> 2);
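                     /* fall through - retry reserving a slot after the delay */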
1610        case -EAGAIN:   /* woken up; retry */
1611                task->tk_action = call_retry_reserve;
1612                return;
1613        case -EIO:      /* probably a shutdown */
1614                break;
1615        default:
1616                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1617                                __func__, status);
1618                break;
1619        }
1620        rpc_exit(task, status);
1621}
1622
1623/*
1624 * 1c.  Retry reserving an RPC call slot
1625 */
1626static void
1627call_retry_reserve(struct rpc_task *task)
1628{
1629        dprint_status(task);
1630
1631        task->tk_status  = 0;
1632        task->tk_action  = call_reserveresult;
1633        xprt_retry_reserve(task);
1634}
1635
1636/*
1637 * 2.   Bind and/or refresh the credentials
1638 */
1639static void
1640call_refresh(struct rpc_task *task)
1641{
1642        dprint_status(task);
1643
1644        task->tk_action = call_refreshresult;
1645        task->tk_status = 0;
1646        task->tk_client->cl_stats->rpcauthrefresh++;
1647        rpcauth_refreshcred(task);
1648}
1649
1650/*
1651 * 2a.  Process the results of a credential refresh
1652 */
1653static void
1654call_refreshresult(struct rpc_task *task)
1655{
1656        int status = task->tk_status;
1657
1658        dprint_status(task);
1659
1660        task->tk_status = 0;
1661        task->tk_action = call_refresh;
1662        switch (status) {
1663        case 0:
1664                if (rpcauth_uptodatecred(task)) {
1665                        task->tk_action = call_allocate;
1666                        return;
1667                }
1668                /* Use rate-limiting and a max number of retries if refresh
1669                 * had status 0 but failed to update the cred.
1670                 */
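                /* fall through */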
1671        case -ETIMEDOUT:
1672                rpc_delay(task, 3*HZ);
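                /* fall through */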
1673        case -EAGAIN:
1674                status = -EACCES;
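                /* fall through */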
1675        case -EKEYEXPIRED:
1676                if (!task->tk_cred_retry)
1677                        break;
1678                task->tk_cred_retry--;
1679                dprintk("RPC: %5u %s: retry refresh creds\n",
1680                                task->tk_pid, __func__);
1681                return;
1682        }
1683        dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1684                                task->tk_pid, __func__, status);
1685        rpc_exit(task, status);
1686}
1687
1688/*
1689 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1690 *      (Note: buffer memory is freed in xprt_release).
1691 */
1692static void
1693call_allocate(struct rpc_task *task)
1694{
1695        unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1696        struct rpc_rqst *req = task->tk_rqstp;
1697        struct rpc_xprt *xprt = req->rq_xprt;
1698        struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1699        int status;
1700
1701        dprint_status(task);
1702
1703        task->tk_status = 0;
1704        task->tk_action = call_bind;
1705
1706        if (req->rq_buffer)
1707                return;
1708
1709        if (proc->p_proc != 0) {
1710                BUG_ON(proc->p_arglen == 0);
1711                if (proc->p_decode != NULL)
1712                        BUG_ON(proc->p_replen == 0);
1713        }
1714
1715        /*
1716         * Calculate the size (in quads) of the RPC call
1717         * and reply headers, and convert both values
1718         * to byte sizes.
1719         */
1720        req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1721        req->rq_callsize <<= 2;
1722        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1723        req->rq_rcvsize <<= 2;
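        /*
         * Illustrative arithmetic (not from the original source): the sizes
         * above are counted in XDR quads (32-bit words).  With, say, an auth
         * slack of 4 quads and p_arglen of 10 quads, rq_callsize is
         * (RPC_CALLHDRSIZE + 8 + 10) quads, and the "<<= 2" then converts
         * the quad count into bytes.
         */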
1724
1725        status = xprt->ops->buf_alloc(task);
1726        xprt_inject_disconnect(xprt);
1727        if (status == 0)
1728                return;
1729        if (status != -ENOMEM) {
1730                rpc_exit(task, status);
1731                return;
1732        }
1733
1734        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1735
1736        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1737                task->tk_action = call_allocate;
1738                rpc_delay(task, HZ>>4);
1739                return;
1740        }
1741
1742        rpc_exit(task, -ERESTARTSYS);
1743}
1744
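/*
 * A zero-length send buffer marks a request that has not yet been
 * XDR-encoded; rpc_task_force_reencode() resets it so that the next pass
 * through call_transmit() re-runs rpc_xdr_encode().
 */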
1745static inline int
1746rpc_task_need_encode(struct rpc_task *task)
1747{
1748        return task->tk_rqstp->rq_snd_buf.len == 0;
1749}
1750
1751static inline void
1752rpc_task_force_reencode(struct rpc_task *task)
1753{
1754        task->tk_rqstp->rq_snd_buf.len = 0;
1755        task->tk_rqstp->rq_bytes_sent = 0;
1756}
1757
1758/*
1759 * 3.   Encode arguments of an RPC call
1760 */
1761static void
1762rpc_xdr_encode(struct rpc_task *task)
1763{
1764        struct rpc_rqst *req = task->tk_rqstp;
1765        kxdreproc_t     encode;
1766        __be32          *p;
1767
1768        dprint_status(task);
1769
1770        xdr_buf_init(&req->rq_snd_buf,
1771                     req->rq_buffer,
1772                     req->rq_callsize);
1773        xdr_buf_init(&req->rq_rcv_buf,
1774                     req->rq_rbuffer,
1775                     req->rq_rcvsize);
1776
1777        p = rpc_encode_header(task);
1778        if (p == NULL) {
1779                printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1780                rpc_exit(task, -EIO);
1781                return;
1782        }
1783
1784        encode = task->tk_msg.rpc_proc->p_encode;
1785        if (encode == NULL)
1786                return;
1787
1788        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1789                        task->tk_msg.rpc_argp);
1790}
1791
1792/*
1793 * 4.   Get the server port number if not yet set
1794 */
1795static void
1796call_bind(struct rpc_task *task)
1797{
1798        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1799
1800        dprint_status(task);
1801
1802        task->tk_action = call_connect;
1803        if (!xprt_bound(xprt)) {
1804                task->tk_action = call_bind_status;
1805                task->tk_timeout = xprt->bind_timeout;
1806                xprt->ops->rpcbind(task);
1807        }
1808}
1809
1810/*
1811 * 4a.  Sort out bind result
1812 */
1813static void
1814call_bind_status(struct rpc_task *task)
1815{
1816        int status = -EIO;
1817
1818        if (task->tk_status >= 0) {
1819                dprint_status(task);
1820                task->tk_status = 0;
1821                task->tk_action = call_connect;
1822                return;
1823        }
1824
1825        trace_rpc_bind_status(task);
1826        switch (task->tk_status) {
1827        case -ENOMEM:
1828                dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1829                rpc_delay(task, HZ >> 2);
1830                goto retry_timeout;
1831        case -EACCES:
1832                dprintk("RPC: %5u remote rpcbind: RPC program/version "
1833                                "unavailable\n", task->tk_pid);
1834                /* fail immediately if this is an RPC ping */
1835                if (task->tk_msg.rpc_proc->p_proc == 0) {
1836                        status = -EOPNOTSUPP;
1837                        break;
1838                }
1839                if (task->tk_rebind_retry == 0)
1840                        break;
1841                task->tk_rebind_retry--;
1842                rpc_delay(task, 3*HZ);
1843                goto retry_timeout;
1844        case -ETIMEDOUT:
1845                dprintk("RPC: %5u rpcbind request timed out\n",
1846                                task->tk_pid);
1847                goto retry_timeout;
1848        case -EPFNOSUPPORT:
1849                /* server doesn't support any rpcbind version we know of */
1850                dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1851                                task->tk_pid);
1852                break;
1853        case -EPROTONOSUPPORT:
1854                dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1855                                task->tk_pid);
1856                goto retry_timeout;
1857        case -ECONNREFUSED:             /* connection problems */
1858        case -ECONNRESET:
1859        case -ECONNABORTED:
1860        case -ENOTCONN:
1861        case -EHOSTDOWN:
1862        case -EHOSTUNREACH:
1863        case -ENETUNREACH:
1864        case -ENOBUFS:
1865        case -EPIPE:
1866                dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1867                                task->tk_pid, task->tk_status);
1868                if (!RPC_IS_SOFTCONN(task)) {
1869                        rpc_delay(task, 5*HZ);
1870                        goto retry_timeout;
1871                }
1872                status = task->tk_status;
1873                break;
1874        default:
1875                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1876                                task->tk_pid, -task->tk_status);
1877        }
1878
1879        rpc_exit(task, status);
1880        return;
1881
1882retry_timeout:
1883        task->tk_status = 0;
1884        task->tk_action = call_timeout;
1885}
1886
1887/*
1888 * 4b.  Connect to the RPC server
1889 */
1890static void
1891call_connect(struct rpc_task *task)
1892{
1893        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1894
1895        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1896                        task->tk_pid, xprt,
1897                        (xprt_connected(xprt) ? "is" : "is not"));
1898
1899        task->tk_action = call_transmit;
1900        if (!xprt_connected(xprt)) {
1901                task->tk_action = call_connect_status;
1902                if (task->tk_status < 0)
1903                        return;
1904                if (task->tk_flags & RPC_TASK_NOCONNECT) {
1905                        rpc_exit(task, -ENOTCONN);
1906                        return;
1907                }
1908                xprt_connect(task);
1909        }
1910}
1911
1912/*
1913 * 4c.  Sort out connect result
1914 */
1915static void
1916call_connect_status(struct rpc_task *task)
1917{
1918        struct rpc_clnt *clnt = task->tk_client;
1919        int status = task->tk_status;
1920
1921        dprint_status(task);
1922
1923        trace_rpc_connect_status(task, status);
1924        task->tk_status = 0;
1925        switch (status) {
1926        case -ECONNREFUSED:
1927        case -ECONNRESET:
1928        case -ECONNABORTED:
1929        case -ENETUNREACH:
1930        case -EHOSTUNREACH:
1931        case -EADDRINUSE:
1932        case -ENOBUFS:
1933        case -EPIPE:
1934                xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
1935                                            task->tk_rqstp->rq_connect_cookie);
1936                if (RPC_IS_SOFTCONN(task))
1937                        break;
1938                /* retry with existing socket, after a delay */
1939                rpc_delay(task, 3*HZ);
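                /* fall through */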
1940        case -EAGAIN:
1941                /* Check for timeouts before looping back to call_bind */
1942        case -ETIMEDOUT:
1943                task->tk_action = call_timeout;
1944                return;
1945        case 0:
1946                clnt->cl_stats->netreconn++;
1947                task->tk_action = call_transmit;
1948                return;
1949        }
1950        rpc_exit(task, status);
1951}
1952
1953/*
1954 * 5.   Transmit the RPC request, and wait for reply
1955 */
1956static void
1957call_transmit(struct rpc_task *task)
1958{
1959        int is_retrans = RPC_WAS_SENT(task);
1960
1961        dprint_status(task);
1962
1963        task->tk_action = call_status;
1964        if (task->tk_status < 0)
1965                return;
1966        if (!xprt_prepare_transmit(task))
1967                return;
1968        task->tk_action = call_transmit_status;
1969        /* Encode here so that rpcsec_gss can use correct sequence number. */
1970        if (rpc_task_need_encode(task)) {
1971                rpc_xdr_encode(task);
1972                /* Did the encode result in an error condition? */
1973                if (task->tk_status != 0) {
1974                        /* Was the error nonfatal? */
1975                        if (task->tk_status == -EAGAIN)
1976                                rpc_delay(task, HZ >> 4);
1977                        else
1978                                rpc_exit(task, task->tk_status);
1979                        return;
1980                }
1981        }
1982        xprt_transmit(task);
1983        if (task->tk_status < 0)
1984                return;
1985        if (is_retrans)
1986                task->tk_client->cl_stats->rpcretrans++;
1987        /*
1988         * On success, ensure that we call xprt_end_transmit() before sleeping
1989         * in order to allow access to the socket to other RPC requests.
1990         */
1991        call_transmit_status(task);
1992        if (rpc_reply_expected(task))
1993                return;
1994        task->tk_action = rpc_exit_task;
1995        rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1996}
1997
1998/*
1999 * 5a.  Handle cleanup after a transmission
2000 */
2001static void
2002call_transmit_status(struct rpc_task *task)
2003{
2004        task->tk_action = call_status;
2005
2006        /*
2007         * Common case: success.  Force the compiler to put this
2008         * test first.
2009         */
2010        if (task->tk_status == 0) {
2011                xprt_end_transmit(task);
2012                rpc_task_force_reencode(task);
2013                return;
2014        }
2015
2016        switch (task->tk_status) {
2017        case -EAGAIN:
2018        case -ENOBUFS:
2019                break;
2020        default:
2021                dprint_status(task);
2022                xprt_end_transmit(task);
2023                rpc_task_force_reencode(task);
2024                break;
2025                /*
2026                 * Special cases: if we've been waiting on the
2027                 * socket's write_space() callback, or if the
2028                 * socket just returned a connection error,
2029                 * then hold onto the transport lock.
2030                 */
2031        case -ECONNREFUSED:
2032        case -EHOSTDOWN:
2033        case -EHOSTUNREACH:
2034        case -ENETUNREACH:
2035        case -EPERM:
2036                if (RPC_IS_SOFTCONN(task)) {
2037                        xprt_end_transmit(task);
2038                        rpc_exit(task, task->tk_status);
2039                        break;
2040                }
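                /* fall through */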
2041        case -ECONNRESET:
2042        case -ECONNABORTED:
2043        case -EADDRINUSE:
2044        case -ENOTCONN:
2045        case -EPIPE:
2046                rpc_task_force_reencode(task);
2047        }
2048}
2049
2050#if defined(CONFIG_SUNRPC_BACKCHANNEL)
2051/*
2052 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2053 * addition, disconnect on connectivity errors.
2054 */
2055static void
2056call_bc_transmit(struct rpc_task *task)
2057{
2058        struct rpc_rqst *req = task->tk_rqstp;
2059
2060        if (!xprt_prepare_transmit(task))
2061                goto out_retry;
2062
2063        if (task->tk_status < 0) {
2064                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2065                        "error: %d\n", task->tk_status);
2066                goto out_done;
2067        }
2068        if (req->rq_connect_cookie != req->rq_xprt->connect_cookie)
2069                req->rq_bytes_sent = 0;
2070
2071        xprt_transmit(task);
2072
2073        if (task->tk_status == -EAGAIN)
2074                goto out_nospace;
2075
2076        xprt_end_transmit(task);
2077        dprint_status(task);
2078        switch (task->tk_status) {
2079        case 0:
2080                /* Success */
2081        case -EHOSTDOWN:
2082        case -EHOSTUNREACH:
2083        case -ENETUNREACH:
2084        case -ECONNRESET:
2085        case -ECONNREFUSED:
2086        case -EADDRINUSE:
2087        case -ENOTCONN:
2088        case -EPIPE:
2089                break;
2090        case -ETIMEDOUT:
2091                /*
2092                 * Problem reaching the server.  Disconnect and let the
2093                 * forechannel reestablish the connection.  The server will
2094                 * have to retransmit the backchannel request and we'll
2095                 * reprocess it.  Since these ops are idempotent, there's no
2096                 * need to cache our reply at this time.
2097                 */
2098                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2099                        "error: %d\n", task->tk_status);
2100                xprt_conditional_disconnect(req->rq_xprt,
2101                        req->rq_connect_cookie);
2102                break;
2103        default:
2104                /*
2105                 * We were unable to reply and will have to drop the
2106                 * request.  The server should reconnect and retransmit.
2107                 */
2108                WARN_ON_ONCE(task->tk_status == -EAGAIN);
2109                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2110                        "error: %d\n", task->tk_status);
2111                break;
2112        }
2113        rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
2114out_done:
2115        task->tk_action = rpc_exit_task;
2116        return;
2117out_nospace:
2118        req->rq_connect_cookie = req->rq_xprt->connect_cookie;
2119out_retry:
2120        task->tk_status = 0;
2121}
2122#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2123
2124/*
2125 * 6.   Sort out the RPC call status
2126 */
2127static void
2128call_status(struct rpc_task *task)
2129{
2130        struct rpc_clnt *clnt = task->tk_client;
2131        struct rpc_rqst *req = task->tk_rqstp;
2132        int             status;
2133
2134        if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
2135                task->tk_status = req->rq_reply_bytes_recvd;
2136
2137        dprint_status(task);
2138
2139        status = task->tk_status;
2140        if (status >= 0) {
2141                task->tk_action = call_decode;
2142                return;
2143        }
2144
2145        trace_rpc_call_status(task);
2146        task->tk_status = 0;
2147        switch(status) {
2148        case -EHOSTDOWN:
2149        case -EHOSTUNREACH:
2150        case -ENETUNREACH:
2151        case -EPERM:
2152                if (RPC_IS_SOFTCONN(task)) {
2153                        rpc_exit(task, status);
2154                        break;
2155                }
2156                /*
2157                 * Delay any retries for 3 seconds, then handle as if it
2158                 * were a timeout.
2159                 */
2160                rpc_delay(task, 3*HZ);
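                /* fall through */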
2161        case -ETIMEDOUT:
2162                task->tk_action = call_timeout;
2163                if (!(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
2164                    && task->tk_client->cl_discrtry)
2165                        xprt_conditional_disconnect(req->rq_xprt,
2166                                        req->rq_connect_cookie);
2167                break;
2168        case -ECONNREFUSED:
2169        case -ECONNRESET:
2170        case -ECONNABORTED:
2171                rpc_force_rebind(clnt);
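                /* fall through */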
2172        case -EADDRINUSE:
2173                rpc_delay(task, 3*HZ);
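                /* fall through */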
2174        case -EPIPE:
2175        case -ENOTCONN:
2176                task->tk_action = call_bind;
2177                break;
2178        case -ENOBUFS:
2179                rpc_delay(task, HZ>>2);
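                /* fall through */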
2180        case -EAGAIN:
2181                task->tk_action = call_transmit;
2182                break;
2183        case -EIO:
2184                /* shutdown or soft timeout */
2185                rpc_exit(task, status);
2186                break;
2187        default:
2188                if (clnt->cl_chatty)
2189                        printk("%s: RPC call returned error %d\n",
2190                               clnt->cl_program->name, -status);
2191                rpc_exit(task, status);
2192        }
2193}
2194
2195/*
2196 * 6a.  Handle RPC timeout
2197 *      We do not release the request slot, so we keep using the
2198 *      same XID for all retransmits.
2199 */
2200static void
2201call_timeout(struct rpc_task *task)
2202{
2203        struct rpc_clnt *clnt = task->tk_client;
2204
2205        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
2206                dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
2207                goto retry;
2208        }
2209
2210        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
2211        task->tk_timeouts++;
2212
2213        if (RPC_IS_SOFTCONN(task)) {
2214                rpc_exit(task, -ETIMEDOUT);
2215                return;
2216        }
2217        if (RPC_IS_SOFT(task)) {
2218                if (clnt->cl_chatty) {
2219                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
2220                                clnt->cl_program->name,
2221                                task->tk_xprt->servername);
2222                }
2223                if (task->tk_flags & RPC_TASK_TIMEOUT)
2224                        rpc_exit(task, -ETIMEDOUT);
2225                else
2226                        rpc_exit(task, -EIO);
2227                return;
2228        }
2229
2230        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2231                task->tk_flags |= RPC_CALL_MAJORSEEN;
2232                if (clnt->cl_chatty) {
2233                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
2234                        clnt->cl_program->name,
2235                        task->tk_xprt->servername);
2236                }
2237        }
2238        rpc_force_rebind(clnt);
2239        /*
2240         * Did our request time out due to an RPCSEC_GSS out-of-sequence
2241         * event? RFC2203 requires the server to drop all such requests.
2242         */
2243        rpcauth_invalcred(task);
2244
2245retry:
2246        task->tk_action = call_bind;
2247        task->tk_status = 0;
2248}
2249
2250/*
2251 * 7.   Decode the RPC reply
2252 */
2253static void
2254call_decode(struct rpc_task *task)
2255{
2256        struct rpc_clnt *clnt = task->tk_client;
2257        struct rpc_rqst *req = task->tk_rqstp;
2258        kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
2259        __be32          *p;
2260
2261        dprint_status(task);
2262
2263        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2264                if (clnt->cl_chatty) {
2265                        printk(KERN_NOTICE "%s: server %s OK\n",
2266                                clnt->cl_program->name,
2267                                task->tk_xprt->servername);
2268                }
2269                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2270        }
2271
2272        /*
2273         * Ensure that we see all writes made by xprt_complete_rqst()
2274         * before it changed req->rq_reply_bytes_recvd.
2275         */
2276        smp_rmb();
2277        req->rq_rcv_buf.len = req->rq_private_buf.len;
2278
2279        /* Check that the softirq receive buffer is valid */
2280        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2281                                sizeof(req->rq_rcv_buf)) != 0);
2282
2283        if (req->rq_rcv_buf.len < 12) {
2284                if (!RPC_IS_SOFT(task)) {
2285                        task->tk_action = call_bind;
2286                        goto out_retry;
2287                }
2288                dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2289                                clnt->cl_program->name, task->tk_status);
2290                task->tk_action = call_timeout;
2291                goto out_retry;
2292        }
2293
2294        p = rpc_verify_header(task);
2295        if (IS_ERR(p)) {
2296                if (p == ERR_PTR(-EAGAIN))
2297                        goto out_retry;
2298                return;
2299        }
2300
2301        task->tk_action = rpc_exit_task;
2302
2303        if (decode) {
2304                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2305                                                      task->tk_msg.rpc_resp);
2306        }
2307        dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2308                        task->tk_status);
2309        return;
2310out_retry:
2311        task->tk_status = 0;
2312        /* Note: rpc_verify_header() may have freed the RPC slot */
2313        if (task->tk_rqstp == req) {
2314                req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2315                if (task->tk_client->cl_discrtry)
2316                        xprt_conditional_disconnect(req->rq_xprt,
2317                                        req->rq_connect_cookie);
2318        }
2319}
2320
2321static __be32 *
2322rpc_encode_header(struct rpc_task *task)
2323{
2324        struct rpc_clnt *clnt = task->tk_client;
2325        struct rpc_rqst *req = task->tk_rqstp;
2326        __be32          *p = req->rq_svec[0].iov_base;
2327
2328        /* FIXME: check buffer size? */
2329
2330        p = xprt_skip_transport_header(req->rq_xprt, p);
2331        *p++ = req->rq_xid;             /* XID */
2332        *p++ = htonl(RPC_CALL);         /* CALL */
2333        *p++ = htonl(RPC_VERSION);      /* RPC version */
2334        *p++ = htonl(clnt->cl_prog);    /* program number */
2335        *p++ = htonl(clnt->cl_vers);    /* program version */
2336        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2337        p = rpcauth_marshcred(task, p);
2338        req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2339        return p;
2340}
2341
2342static __be32 *
2343rpc_verify_header(struct rpc_task *task)
2344{
2345        struct rpc_clnt *clnt = task->tk_client;
2346        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2347        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2348        __be32  *p = iov->iov_base;
2349        u32 n;
2350        int error = -EACCES;
2351
2352        if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2353                /* RFC-1014 says that the representation of XDR data must be a
2354                 * multiple of four bytes
2355                 * - if it isn't, pointer subtraction in the NFS client may give
2356                 *   undefined results
2357                 */
2358                dprintk("RPC: %5u %s: XDR representation not a multiple of"
2359                       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2360                       task->tk_rqstp->rq_rcv_buf.len);
2361                error = -EIO;
2362                goto out_err;
2363        }
2364        if ((len -= 3) < 0)
2365                goto out_overflow;
2366
2367        p += 1; /* skip XID */
2368        if ((n = ntohl(*p++)) != RPC_REPLY) {
2369                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2370                        task->tk_pid, __func__, n);
2371                error = -EIO;
2372                goto out_garbage;
2373        }
2374
2375        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2376                if (--len < 0)
2377                        goto out_overflow;
2378                switch ((n = ntohl(*p++))) {
2379                case RPC_AUTH_ERROR:
2380                        break;
2381                case RPC_MISMATCH:
2382                        dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2383                                task->tk_pid, __func__);
2384                        error = -EPROTONOSUPPORT;
2385                        goto out_err;
2386                default:
2387                        dprintk("RPC: %5u %s: RPC call rejected, "
2388                                "unknown error: %x\n",
2389                                task->tk_pid, __func__, n);
2390                        error = -EIO;
2391                        goto out_err;
2392                }
2393                if (--len < 0)
2394                        goto out_overflow;
2395                switch ((n = ntohl(*p++))) {
2396                case RPC_AUTH_REJECTEDCRED:
2397                case RPC_AUTH_REJECTEDVERF:
2398                case RPCSEC_GSS_CREDPROBLEM:
2399                case RPCSEC_GSS_CTXPROBLEM:
2400                        if (!task->tk_cred_retry)
2401                                break;
2402                        task->tk_cred_retry--;
2403                        dprintk("RPC: %5u %s: retry stale creds\n",
2404                                        task->tk_pid, __func__);
2405                        rpcauth_invalcred(task);
2406                        /* Ensure we obtain a new XID! */
2407                        xprt_release(task);
2408                        task->tk_action = call_reserve;
2409                        goto out_retry;
2410                case RPC_AUTH_BADCRED:
2411                case RPC_AUTH_BADVERF:
2412                        /* possibly garbled cred/verf? */
2413                        if (!task->tk_garb_retry)
2414                                break;
2415                        task->tk_garb_retry--;
2416                        dprintk("RPC: %5u %s: retry garbled creds\n",
2417                                        task->tk_pid, __func__);
2418                        task->tk_action = call_bind;
2419                        goto out_retry;
2420                case RPC_AUTH_TOOWEAK:
2421                        printk(KERN_NOTICE "RPC: server %s requires stronger "
2422                               "authentication.\n",
2423                               task->tk_xprt->servername);
2424                        break;
2425                default:
2426                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
2427                                        task->tk_pid, __func__, n);
2428                        error = -EIO;
2429                }
2430                dprintk("RPC: %5u %s: call rejected %d\n",
2431                                task->tk_pid, __func__, n);
2432                goto out_err;
2433        }
2434        p = rpcauth_checkverf(task, p);
2435        if (IS_ERR(p)) {
2436                error = PTR_ERR(p);
2437                dprintk("RPC: %5u %s: auth check failed with %d\n",
2438                                task->tk_pid, __func__, error);
2439                goto out_garbage;               /* bad verifier, retry */
2440        }
2441        len = p - (__be32 *)iov->iov_base - 1;
2442        if (len < 0)
2443                goto out_overflow;
2444        switch ((n = ntohl(*p++))) {
2445        case RPC_SUCCESS:
2446                return p;
2447        case RPC_PROG_UNAVAIL:
2448                dprintk("RPC: %5u %s: program %u is unsupported "
2449                                "by server %s\n", task->tk_pid, __func__,
2450                                (unsigned int)clnt->cl_prog,
2451                                task->tk_xprt->servername);
2452                error = -EPFNOSUPPORT;
2453                goto out_err;
2454        case RPC_PROG_MISMATCH:
2455                dprintk("RPC: %5u %s: program %u, version %u unsupported "
2456                                "by server %s\n", task->tk_pid, __func__,
2457                                (unsigned int)clnt->cl_prog,
2458                                (unsigned int)clnt->cl_vers,
2459                                task->tk_xprt->servername);
2460                error = -EPROTONOSUPPORT;
2461                goto out_err;
2462        case RPC_PROC_UNAVAIL:
2463                dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
2464                                "version %u on server %s\n",
2465                                task->tk_pid, __func__,
2466                                rpc_proc_name(task),
2467                                clnt->cl_prog, clnt->cl_vers,
2468                                task->tk_xprt->servername);
2469                error = -EOPNOTSUPP;
2470                goto out_err;
2471        case RPC_GARBAGE_ARGS:
2472                dprintk("RPC: %5u %s: server saw garbage\n",
2473                                task->tk_pid, __func__);
2474                break;                  /* retry */
2475        default:
2476                dprintk("RPC: %5u %s: server accept status: %x\n",
2477                                task->tk_pid, __func__, n);
2478                /* Also retry */
2479        }
2480
2481out_garbage:
2482        clnt->cl_stats->rpcgarbage++;
2483        if (task->tk_garb_retry) {
2484                task->tk_garb_retry--;
2485                dprintk("RPC: %5u %s: retrying\n",
2486                                task->tk_pid, __func__);
2487                task->tk_action = call_bind;
2488out_retry:
2489                return ERR_PTR(-EAGAIN);
2490        }
2491out_err:
2492        rpc_exit(task, error);
2493        dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2494                        __func__, error);
2495        return ERR_PTR(error);
2496out_overflow:
2497        dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2498                        __func__);
2499        goto out_garbage;
2500}
2501
2502static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2503{
2504}
2505
2506static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2507{
2508        return 0;
2509}
2510
2511static struct rpc_procinfo rpcproc_null = {
2512        .p_encode = rpcproc_encode_null,
2513        .p_decode = rpcproc_decode_null,
2514};
2515
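/*
 * Probe the server with a NULL procedure call.  It is used from the client
 * creation path (unless RPC_CLNT_CREATE_NOPING was requested); RPC_TASK_SOFT
 * and RPC_TASK_SOFTCONN keep the probe from retrying indefinitely against an
 * unreachable server.
 */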
2516static int rpc_ping(struct rpc_clnt *clnt)
2517{
2518        struct rpc_message msg = {
2519                .rpc_proc = &rpcproc_null,
2520        };
2521        int err;
2522        msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2523        err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2524        put_rpccred(msg.rpc_cred);
2525        return err;
2526}
2527
2528static
2529struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2530                struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2531                const struct rpc_call_ops *ops, void *data)
2532{
2533        struct rpc_message msg = {
2534                .rpc_proc = &rpcproc_null,
2535                .rpc_cred = cred,
2536        };
2537        struct rpc_task_setup task_setup_data = {
2538                .rpc_client = clnt,
2539                .rpc_xprt = xprt,
2540                .rpc_message = &msg,
2541                .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2542                .callback_data = data,
2543                .flags = flags,
2544        };
2545
2546        return rpc_run_task(&task_setup_data);
2547}
2548
2549struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2550{
2551        return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2552}
2553EXPORT_SYMBOL_GPL(rpc_call_null);
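/*
 * Usage sketch (illustrative only, not part of the original file): a caller
 * holding a credential reference can fire an asynchronous NULL call and drop
 * its task reference straight away, letting the task complete on its own:
 *
 *	task = rpc_call_null(clnt, cred, RPC_TASK_ASYNC | RPC_TASK_SOFT);
 *	if (!IS_ERR(task))
 *		rpc_put_task(task);
 */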
2554
2555struct rpc_cb_add_xprt_calldata {
2556        struct rpc_xprt_switch *xps;
2557        struct rpc_xprt *xprt;
2558};
2559
2560static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2561{
2562        struct rpc_cb_add_xprt_calldata *data = calldata;
2563
2564        if (task->tk_status == 0)
2565                rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2566}
2567
2568static void rpc_cb_add_xprt_release(void *calldata)
2569{
2570        struct rpc_cb_add_xprt_calldata *data = calldata;
2571
2572        xprt_put(data->xprt);
2573        xprt_switch_put(data->xps);
2574        kfree(data);
2575}
2576
2577static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2578        .rpc_call_done = rpc_cb_add_xprt_done,
2579        .rpc_release = rpc_cb_add_xprt_release,
2580};
2581
2582/**
2583 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2584 * @clnt: pointer to struct rpc_clnt
2585 * @xps: pointer to struct rpc_xprt_switch
2586 * @xprt: pointer to struct rpc_xprt
2587 * @dummy: unused
2588 */
2589int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2590                struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2591                void *dummy)
2592{
2593        struct rpc_cb_add_xprt_calldata *data;
2594        struct rpc_cred *cred;
2595        struct rpc_task *task;
2596
2597        data = kmalloc(sizeof(*data), GFP_NOFS);
2598        if (!data)
2599                return -ENOMEM;
2600        data->xps = xprt_switch_get(xps);
2601        data->xprt = xprt_get(xprt);
2602
2603        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2604        task = rpc_call_null_helper(clnt, xprt, cred,
2605                        RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
2606                        &rpc_cb_add_xprt_call_ops, data);
2607        put_rpccred(cred);
2608        if (IS_ERR(task))
2609                return PTR_ERR(task);
2610        rpc_put_task(task);
2611        return 1;
2612}
2613EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2614
2615/**
2616 * rpc_clnt_setup_test_and_add_xprt()
2617 *
2618 * This is an rpc_clnt_add_xprt setup() function which returns 1 so that:
2619 *   1) the caller of the test function must dereference the rpc_xprt_switch
2620 *   and the rpc_xprt.
2621 *   2) the test function must call rpc_xprt_switch_add_xprt, usually in
2622 *   its rpc_call_done routine.
2623 *
2624 * Upon success (return of 1), the test function adds the new
2625 * transport to the rpc_clnt xprt switch.
2626 *
2627 * @clnt: struct rpc_clnt to get the new transport
2628 * @xps:  the rpc_xprt_switch to hold the new transport
2629 * @xprt: the rpc_xprt to test
2630 * @data: a struct rpc_add_xprt_test pointer that holds the test function
2631 *        and test function call data
2632 */
2633int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2634                                     struct rpc_xprt_switch *xps,
2635                                     struct rpc_xprt *xprt,
2636                                     void *data)
2637{
2638        struct rpc_cred *cred;
2639        struct rpc_task *task;
2640        struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2641        int status = -EADDRINUSE;
2642
2643        xprt = xprt_get(xprt);
2644        xprt_switch_get(xps);
2645
2646        if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2647                goto out_err;
2648
2649        /* Test the connection */
2650        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2651        task = rpc_call_null_helper(clnt, xprt, cred,
2652                                    RPC_TASK_SOFT | RPC_TASK_SOFTCONN,
2653                                    NULL, NULL);
2654        put_rpccred(cred);
2655        if (IS_ERR(task)) {
2656                status = PTR_ERR(task);
2657                goto out_err;
2658        }
2659        status = task->tk_status;
2660        rpc_put_task(task);
2661
2662        if (status < 0)
2663                goto out_err;
2664
2665        /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2666        xtest->add_xprt_test(clnt, xprt, xtest->data);
2667
2668        /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2669        return 1;
2670out_err:
2671        xprt_put(xprt);
2672        xprt_switch_put(xps);
2673        pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2674                status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2675        return status;
2676}
2677EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2678
2679/**
2680 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2681 * @clnt: pointer to struct rpc_clnt
2682 * @xprtargs: pointer to struct xprt_create
2683 * @setup: callback to test and/or set up the connection
2684 * @data: pointer to setup function data
2685 *
2686 * Creates a new transport using the parameters set in @xprtargs and
2687 * adds it to @clnt.
2688 * If a @setup callback is supplied, it is invoked first to test and/or
2689 * set up the new transport before it is added to the client's switch.
2690 *
2691 */
2692int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2693                struct xprt_create *xprtargs,
2694                int (*setup)(struct rpc_clnt *,
2695                        struct rpc_xprt_switch *,
2696                        struct rpc_xprt *,
2697                        void *),
2698                void *data)
2699{
2700        struct rpc_xprt_switch *xps;
2701        struct rpc_xprt *xprt;
2702        unsigned long reconnect_timeout;
2703        unsigned char resvport;
2704        int ret = 0;
2705
2706        rcu_read_lock();
2707        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2708        xprt = xprt_iter_xprt(&clnt->cl_xpi);
2709        if (xps == NULL || xprt == NULL) {
2710                rcu_read_unlock();
2711                return -EAGAIN;
2712        }
2713        resvport = xprt->resvport;
2714        reconnect_timeout = xprt->max_reconnect_timeout;
2715        rcu_read_unlock();
2716
2717        xprt = xprt_create_transport(xprtargs);
2718        if (IS_ERR(xprt)) {
2719                ret = PTR_ERR(xprt);
2720                goto out_put_switch;
2721        }
2722        xprt->resvport = resvport;
2723        xprt->max_reconnect_timeout = reconnect_timeout;
2724
2725        rpc_xprt_switch_set_roundrobin(xps);
2726        if (setup) {
2727                ret = setup(clnt, xps, xprt, data);
2728                if (ret != 0)
2729                        goto out_put_xprt;
2730        }
2731        rpc_xprt_switch_add_xprt(xps, xprt);
2732out_put_xprt:
2733        xprt_put(xprt);
2734out_put_switch:
2735        xprt_switch_put(xps);
2736        return ret;
2737}
2738EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
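/*
 * Usage sketch (illustrative only, not part of the original file): a caller
 * that wants to trunk an extra connection onto an existing client can pair
 * this with rpc_clnt_test_and_add_xprt() above, so that the new transport is
 * pinged before it is added to the switch:
 *
 *	struct xprt_create xprtargs = { ... transport parameters ... };
 *	int err;
 *
 *	err = rpc_clnt_add_xprt(clnt, &xprtargs,
 *				rpc_clnt_test_and_add_xprt, NULL);
 */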
2739
2740static int
2741rpc_xprt_cap_max_reconnect_timeout(struct rpc_clnt *clnt,
2742                struct rpc_xprt *xprt,
2743                void *data)
2744{
2745        unsigned long timeout = *((unsigned long *)data);
2746
2747        if (timeout < xprt->max_reconnect_timeout)
2748                xprt->max_reconnect_timeout = timeout;
2749        return 0;
2750}
2751
2752void
2753rpc_cap_max_reconnect_timeout(struct rpc_clnt *clnt, unsigned long timeo)
2754{
2755        rpc_clnt_iterate_for_each_xprt(clnt,
2756                        rpc_xprt_cap_max_reconnect_timeout,
2757                        &timeo);
2758}
2759EXPORT_SYMBOL_GPL(rpc_cap_max_reconnect_timeout);
2760
2761void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2762{
2763        rcu_read_lock();
2764        xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2765        rcu_read_unlock();
2766}
2767EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2768
2769void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2770{
2771        rcu_read_lock();
2772        rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2773                                 xprt);
2774        rcu_read_unlock();
2775}
2776EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2777
2778bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2779                                   const struct sockaddr *sap)
2780{
2781        struct rpc_xprt_switch *xps;
2782        bool ret;
2783
2784        rcu_read_lock();
2785        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2786        ret = rpc_xprt_switch_has_addr(xps, sap);
2787        rcu_read_unlock();
2788        return ret;
2789}
2790EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2791
2792#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2793static void rpc_show_header(void)
2794{
2795        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2796                "-timeout ---ops--\n");
2797}
2798
2799static void rpc_show_task(const struct rpc_clnt *clnt,
2800                          const struct rpc_task *task)
2801{
2802        const char *rpc_waitq = "none";
2803
2804        if (RPC_IS_QUEUED(task))
2805                rpc_waitq = rpc_qname(task->tk_waitqueue);
2806
2807        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2808                task->tk_pid, task->tk_flags, task->tk_status,
2809                clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2810                clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2811                task->tk_action, rpc_waitq);
2812}
2813
2814void rpc_show_tasks(struct net *net)
2815{
2816        struct rpc_clnt *clnt;
2817        struct rpc_task *task;
2818        int header = 0;
2819        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2820
2821        spin_lock(&sn->rpc_client_lock);
2822        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2823                spin_lock(&clnt->cl_lock);
2824                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2825                        if (!header) {
2826                                rpc_show_header();
2827                                header++;
2828                        }
2829                        rpc_show_task(clnt, task);
2830                }
2831                spin_unlock(&clnt->cl_lock);
2832        }
2833        spin_unlock(&sn->rpc_client_lock);
2834}
2835#endif
2836
2837#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
2838static int
2839rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
2840                struct rpc_xprt *xprt,
2841                void *dummy)
2842{
2843        return xprt_enable_swap(xprt);
2844}
2845
2846int
2847rpc_clnt_swap_activate(struct rpc_clnt *clnt)
2848{
2849        if (atomic_inc_return(&clnt->cl_swapper) == 1)
2850                return rpc_clnt_iterate_for_each_xprt(clnt,
2851                                rpc_clnt_swap_activate_callback, NULL);
2852        return 0;
2853}
2854EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
2855
2856static int
2857rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
2858                struct rpc_xprt *xprt,
2859                void *dummy)
2860{
2861        xprt_disable_swap(xprt);
2862        return 0;
2863}
2864
2865void
2866rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
2867{
2868        if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
2869                rpc_clnt_iterate_for_each_xprt(clnt,
2870                                rpc_clnt_swap_deactivate_callback, NULL);
2871}
2872EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
2873#endif /* CONFIG_SUNRPC_SWAP */
2874