linux/net/sunrpc/clnt.c
<<
>>
Prefs
   1/*
   2 *  linux/net/sunrpc/clnt.c
   3 *
   4 *  This file contains the high-level RPC interface.
   5 *  It is modeled as a finite state machine to support both synchronous
   6 *  and asynchronous requests.
   7 *
   8 *  -   RPC header generation and argument serialization.
   9 *  -   Credential refresh.
  10 *  -   TCP connect handling.
  11 *  -   Retry of operation when it is suspected the operation failed because
  12 *      of uid squashing on the server, or when the credentials were stale
  13 *      and need to be refreshed, or when a packet was damaged in transit.
   14 *      This may have to be moved to the VFS layer.
  15 *
  16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  18 */
  19
  20
  21#include <linux/module.h>
  22#include <linux/types.h>
  23#include <linux/kallsyms.h>
  24#include <linux/mm.h>
  25#include <linux/namei.h>
  26#include <linux/mount.h>
  27#include <linux/slab.h>
  28#include <linux/rcupdate.h>
  29#include <linux/utsname.h>
  30#include <linux/workqueue.h>
  31#include <linux/in.h>
  32#include <linux/in6.h>
  33#include <linux/un.h>
  34
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/addr.h>
  37#include <linux/sunrpc/rpc_pipe_fs.h>
  38#include <linux/sunrpc/metrics.h>
  39#include <linux/sunrpc/bc_xprt.h>
  40#include <trace/events/sunrpc.h>
  41
  42#include "sunrpc.h"
  43#include "netns.h"
  44
  45#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  46# define RPCDBG_FACILITY        RPCDBG_CALL
  47#endif
  48
  49#define dprint_status(t)                                        \
  50        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
  51                        __func__, t->tk_status)
  52
  53/*
   54 * Waiters for an RPC client's task list to become empty sleep on this queue
  55 */
  56
  57static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  58
  59
  60static void     call_start(struct rpc_task *task);
  61static void     call_reserve(struct rpc_task *task);
  62static void     call_reserveresult(struct rpc_task *task);
  63static void     call_allocate(struct rpc_task *task);
  64static void     call_decode(struct rpc_task *task);
  65static void     call_bind(struct rpc_task *task);
  66static void     call_bind_status(struct rpc_task *task);
  67static void     call_transmit(struct rpc_task *task);
  68#if defined(CONFIG_SUNRPC_BACKCHANNEL)
  69static void     call_bc_transmit(struct rpc_task *task);
  70#endif /* CONFIG_SUNRPC_BACKCHANNEL */
  71static void     call_status(struct rpc_task *task);
  72static void     call_transmit_status(struct rpc_task *task);
  73static void     call_refresh(struct rpc_task *task);
  74static void     call_refreshresult(struct rpc_task *task);
  75static void     call_timeout(struct rpc_task *task);
  76static void     call_connect(struct rpc_task *task);
  77static void     call_connect_status(struct rpc_task *task);
  78
  79static __be32   *rpc_encode_header(struct rpc_task *task);
  80static __be32   *rpc_verify_header(struct rpc_task *task);
  81static int      rpc_ping(struct rpc_clnt *clnt);
  82
  83static void rpc_register_client(struct rpc_clnt *clnt)
  84{
  85        struct net *net = rpc_net_ns(clnt);
  86        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  87
  88        spin_lock(&sn->rpc_client_lock);
  89        list_add(&clnt->cl_clients, &sn->all_clients);
  90        spin_unlock(&sn->rpc_client_lock);
  91}
  92
  93static void rpc_unregister_client(struct rpc_clnt *clnt)
  94{
  95        struct net *net = rpc_net_ns(clnt);
  96        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  97
  98        spin_lock(&sn->rpc_client_lock);
  99        list_del(&clnt->cl_clients);
 100        spin_unlock(&sn->rpc_client_lock);
 101}
 102
/*
 * Remove the client's rpc_pipefs directory.  Unlike
 * rpc_clnt_remove_pipedir(), this variant does not call
 * rpc_get_sb_net()/rpc_put_sb_net() itself.
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
 107
 108static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 109{
 110        struct net *net = rpc_net_ns(clnt);
 111        struct super_block *pipefs_sb;
 112
 113        pipefs_sb = rpc_get_sb_net(net);
 114        if (pipefs_sb) {
 115                __rpc_clnt_remove_pipedir(clnt);
 116                rpc_put_sb_net(net);
 117        }
 118}
 119
 120static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
 121                                    struct rpc_clnt *clnt)
 122{
 123        static uint32_t clntid;
 124        const char *dir_name = clnt->cl_program->pipe_dir_name;
 125        char name[15];
 126        struct dentry *dir, *dentry;
 127
 128        dir = rpc_d_lookup_sb(sb, dir_name);
 129        if (dir == NULL) {
 130                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
 131                return dir;
 132        }
 133        for (;;) {
 134                snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 135                name[sizeof(name) - 1] = '\0';
 136                dentry = rpc_create_client_dir(dir, name, clnt);
 137                if (!IS_ERR(dentry))
 138                        break;
 139                if (dentry == ERR_PTR(-EEXIST))
 140                        continue;
 141                printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 142                                " %s/%s, error %ld\n",
 143                                dir_name, name, PTR_ERR(dentry));
 144                break;
 145        }
 146        dput(dir);
 147        return dentry;
 148}
 149
 150static int
 151rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
 152{
 153        struct dentry *dentry;
 154
 155        if (clnt->cl_program->pipe_dir_name != NULL) {
 156                dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
 157                if (IS_ERR(dentry))
 158                        return PTR_ERR(dentry);
 159        }
 160        return 0;
 161}
 162
 163static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
 164{
 165        if (clnt->cl_program->pipe_dir_name == NULL)
 166                return 1;
 167
 168        switch (event) {
 169        case RPC_PIPEFS_MOUNT:
 170                if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
 171                        return 1;
 172                if (atomic_read(&clnt->cl_count) == 0)
 173                        return 1;
 174                break;
 175        case RPC_PIPEFS_UMOUNT:
 176                if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
 177                        return 1;
 178                break;
 179        }
 180        return 0;
 181}
 182
 183static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 184                                   struct super_block *sb)
 185{
 186        struct dentry *dentry;
 187
 188        switch (event) {
 189        case RPC_PIPEFS_MOUNT:
 190                dentry = rpc_setup_pipedir_sb(sb, clnt);
 191                if (!dentry)
 192                        return -ENOENT;
 193                if (IS_ERR(dentry))
 194                        return PTR_ERR(dentry);
 195                break;
 196        case RPC_PIPEFS_UMOUNT:
 197                __rpc_clnt_remove_pipedir(clnt);
 198                break;
 199        default:
 200                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 201                return -ENOTSUPP;
 202        }
 203        return 0;
 204}
 205
 206static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
 207                                struct super_block *sb)
 208{
 209        int error = 0;
 210
 211        for (;; clnt = clnt->cl_parent) {
 212                if (!rpc_clnt_skip_event(clnt, event))
 213                        error = __rpc_clnt_handle_event(clnt, event, sb);
 214                if (error || clnt == clnt->cl_parent)
 215                        break;
 216        }
 217        return error;
 218}
 219
 220static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
 221{
 222        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 223        struct rpc_clnt *clnt;
 224
 225        spin_lock(&sn->rpc_client_lock);
 226        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
 227                if (rpc_clnt_skip_event(clnt, event))
 228                        continue;
 229                spin_unlock(&sn->rpc_client_lock);
 230                return clnt;
 231        }
 232        spin_unlock(&sn->rpc_client_lock);
 233        return NULL;
 234}
 235
 236static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
 237                            void *ptr)
 238{
 239        struct super_block *sb = ptr;
 240        struct rpc_clnt *clnt;
 241        int error = 0;
 242
 243        while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
 244                error = __rpc_pipefs_event(clnt, event, sb);
 245                if (error)
 246                        break;
 247        }
 248        return error;
 249}
 250
/* Notifier block that routes pipefs events to rpc_pipefs_event(). */
static struct notifier_block rpc_clients_block = {
	.notifier_call	= rpc_pipefs_event,
	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
};
 255
/*
 * Register rpc_clients_block with the pipefs notifier chain.
 * Returns whatever rpc_pipefs_notifier_register() returns.
 */
int rpc_clients_notifier_register(void)
{
	return rpc_pipefs_notifier_register(&rpc_clients_block);
}
 260
 261void rpc_clients_notifier_unregister(void)
 262{
 263        return rpc_pipefs_notifier_unregister(&rpc_clients_block);
 264}
 265
 266static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
 267                struct rpc_xprt *xprt,
 268                const struct rpc_timeout *timeout)
 269{
 270        struct rpc_xprt *old;
 271
 272        spin_lock(&clnt->cl_lock);
 273        old = rcu_dereference_protected(clnt->cl_xprt,
 274                        lockdep_is_held(&clnt->cl_lock));
 275
 276        if (!xprt_bound(xprt))
 277                clnt->cl_autobind = 1;
 278
 279        clnt->cl_timeout = timeout;
 280        rcu_assign_pointer(clnt->cl_xprt, xprt);
 281        spin_unlock(&clnt->cl_lock);
 282
 283        return old;
 284}
 285
 286static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
 287{
 288        clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
 289                        nodename, sizeof(clnt->cl_nodename));
 290}
 291
 292static int rpc_client_register(struct rpc_clnt *clnt,
 293                               rpc_authflavor_t pseudoflavor,
 294                               const char *client_name)
 295{
 296        struct rpc_auth_create_args auth_args = {
 297                .pseudoflavor = pseudoflavor,
 298                .target_name = client_name,
 299        };
 300        struct rpc_auth *auth;
 301        struct net *net = rpc_net_ns(clnt);
 302        struct super_block *pipefs_sb;
 303        int err;
 304
 305        rpc_clnt_debugfs_register(clnt);
 306
 307        pipefs_sb = rpc_get_sb_net(net);
 308        if (pipefs_sb) {
 309                err = rpc_setup_pipedir(pipefs_sb, clnt);
 310                if (err)
 311                        goto out;
 312        }
 313
 314        rpc_register_client(clnt);
 315        if (pipefs_sb)
 316                rpc_put_sb_net(net);
 317
 318        auth = rpcauth_create(&auth_args, clnt);
 319        if (IS_ERR(auth)) {
 320                dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
 321                                pseudoflavor);
 322                err = PTR_ERR(auth);
 323                goto err_auth;
 324        }
 325        return 0;
 326err_auth:
 327        pipefs_sb = rpc_get_sb_net(net);
 328        rpc_unregister_client(clnt);
 329        __rpc_clnt_remove_pipedir(clnt);
 330out:
 331        if (pipefs_sb)
 332                rpc_put_sb_net(net);
 333        rpc_clnt_debugfs_unregister(clnt);
 334        return err;
 335}
 336
/* ID allocator from which each client's cl_clid is taken. */
static DEFINE_IDA(rpc_clids);

/* Destroy the client-id allocator. */
void rpc_cleanup_clids(void)
{
	ida_destroy(&rpc_clids);
}
 343
 344static int rpc_alloc_clid(struct rpc_clnt *clnt)
 345{
 346        int clid;
 347
 348        clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
 349        if (clid < 0)
 350                return clid;
 351        clnt->cl_clid = clid;
 352        return 0;
 353}
 354
/* Return the client's cl_clid to the rpc_clids allocator. */
static void rpc_free_clid(struct rpc_clnt *clnt)
{
	ida_simple_remove(&rpc_clids, clnt->cl_clid);
}
 359
 360static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
 361                struct rpc_xprt_switch *xps,
 362                struct rpc_xprt *xprt,
 363                struct rpc_clnt *parent)
 364{
 365        const struct rpc_program *program = args->program;
 366        const struct rpc_version *version;
 367        struct rpc_clnt *clnt = NULL;
 368        const struct rpc_timeout *timeout;
 369        const char *nodename = args->nodename;
 370        int err;
 371
 372        /* sanity check the name before trying to print it */
 373        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 374                        program->name, args->servername, xprt);
 375
 376        err = rpciod_up();
 377        if (err)
 378                goto out_no_rpciod;
 379
 380        err = -EINVAL;
 381        if (args->version >= program->nrvers)
 382                goto out_err;
 383        version = program->version[args->version];
 384        if (version == NULL)
 385                goto out_err;
 386
 387        err = -ENOMEM;
 388        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 389        if (!clnt)
 390                goto out_err;
 391        clnt->cl_parent = parent ? : clnt;
 392
 393        err = rpc_alloc_clid(clnt);
 394        if (err)
 395                goto out_no_clid;
 396
 397        clnt->cl_procinfo = version->procs;
 398        clnt->cl_maxproc  = version->nrprocs;
 399        clnt->cl_prog     = args->prognumber ? : program->number;
 400        clnt->cl_vers     = version->number;
 401        clnt->cl_stats    = program->stats;
 402        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 403        rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
 404        err = -ENOMEM;
 405        if (clnt->cl_metrics == NULL)
 406                goto out_no_stats;
 407        clnt->cl_program  = program;
 408        INIT_LIST_HEAD(&clnt->cl_tasks);
 409        spin_lock_init(&clnt->cl_lock);
 410
 411        timeout = xprt->timeout;
 412        if (args->timeout != NULL) {
 413                memcpy(&clnt->cl_timeout_default, args->timeout,
 414                                sizeof(clnt->cl_timeout_default));
 415                timeout = &clnt->cl_timeout_default;
 416        }
 417
 418        rpc_clnt_set_transport(clnt, xprt, timeout);
 419        xprt_iter_init(&clnt->cl_xpi, xps);
 420        xprt_switch_put(xps);
 421
 422        clnt->cl_rtt = &clnt->cl_rtt_default;
 423        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 424
 425        atomic_set(&clnt->cl_count, 1);
 426
 427        if (nodename == NULL)
 428                nodename = utsname()->nodename;
 429        /* save the nodename */
 430        rpc_clnt_set_nodename(clnt, nodename);
 431
 432        err = rpc_client_register(clnt, args->authflavor, args->client_name);
 433        if (err)
 434                goto out_no_path;
 435        if (parent)
 436                atomic_inc(&parent->cl_count);
 437        return clnt;
 438
 439out_no_path:
 440        rpc_free_iostats(clnt->cl_metrics);
 441out_no_stats:
 442        rpc_free_clid(clnt);
 443out_no_clid:
 444        kfree(clnt);
 445out_err:
 446        rpciod_down();
 447out_no_rpciod:
 448        xprt_switch_put(xps);
 449        xprt_put(xprt);
 450        return ERR_PTR(err);
 451}
 452
 453static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
 454                                        struct rpc_xprt *xprt)
 455{
 456        struct rpc_clnt *clnt = NULL;
 457        struct rpc_xprt_switch *xps;
 458
 459        if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
 460                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 461                xps = args->bc_xprt->xpt_bc_xps;
 462                xprt_switch_get(xps);
 463        } else {
 464                xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 465                if (xps == NULL) {
 466                        xprt_put(xprt);
 467                        return ERR_PTR(-ENOMEM);
 468                }
 469                if (xprt->bc_xprt) {
 470                        xprt_switch_get(xps);
 471                        xprt->bc_xprt->xpt_bc_xps = xps;
 472                }
 473        }
 474        clnt = rpc_new_client(args, xps, xprt, NULL);
 475        if (IS_ERR(clnt))
 476                return clnt;
 477
 478        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 479                int err = rpc_ping(clnt);
 480                if (err != 0) {
 481                        rpc_shutdown_client(clnt);
 482                        return ERR_PTR(err);
 483                }
 484        }
 485
 486        clnt->cl_softrtry = 1;
 487        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
 488                clnt->cl_softrtry = 0;
 489
 490        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 491                clnt->cl_autobind = 1;
 492        if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
 493                clnt->cl_noretranstimeo = 1;
 494        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 495                clnt->cl_discrtry = 1;
 496        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 497                clnt->cl_chatty = 1;
 498
 499        return clnt;
 500}
 501
 502/**
 503 * rpc_create - create an RPC client and transport with one call
 504 * @args: rpc_clnt create argument structure
 505 *
 506 * Creates and initializes an RPC transport and an RPC client.
 507 *
 508 * It can ping the server in order to determine if it is up, and to see if
 509 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 510 * this behavior so asynchronous tasks can also use rpc_create.
 511 */
 512struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 513{
 514        struct rpc_xprt *xprt;
 515        struct xprt_create xprtargs = {
 516                .net = args->net,
 517                .ident = args->protocol,
 518                .srcaddr = args->saddress,
 519                .dstaddr = args->address,
 520                .addrlen = args->addrsize,
 521                .servername = args->servername,
 522                .bc_xprt = args->bc_xprt,
 523        };
 524        char servername[48];
 525
 526        if (args->bc_xprt) {
 527                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 528                xprt = args->bc_xprt->xpt_bc_xprt;
 529                if (xprt) {
 530                        xprt_get(xprt);
 531                        return rpc_create_xprt(args, xprt);
 532                }
 533        }
 534
 535        if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
 536                xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
 537        if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
 538                xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
 539        /*
 540         * If the caller chooses not to specify a hostname, whip
 541         * up a string representation of the passed-in address.
 542         */
 543        if (xprtargs.servername == NULL) {
 544                struct sockaddr_un *sun =
 545                                (struct sockaddr_un *)args->address;
 546                struct sockaddr_in *sin =
 547                                (struct sockaddr_in *)args->address;
 548                struct sockaddr_in6 *sin6 =
 549                                (struct sockaddr_in6 *)args->address;
 550
 551                servername[0] = '\0';
 552                switch (args->address->sa_family) {
 553                case AF_LOCAL:
 554                        snprintf(servername, sizeof(servername), "%s",
 555                                 sun->sun_path);
 556                        break;
 557                case AF_INET:
 558                        snprintf(servername, sizeof(servername), "%pI4",
 559                                 &sin->sin_addr.s_addr);
 560                        break;
 561                case AF_INET6:
 562                        snprintf(servername, sizeof(servername), "%pI6",
 563                                 &sin6->sin6_addr);
 564                        break;
 565                default:
 566                        /* caller wants default server name, but
 567                         * address family isn't recognized. */
 568                        return ERR_PTR(-EINVAL);
 569                }
 570                xprtargs.servername = servername;
 571        }
 572
 573        xprt = xprt_create_transport(&xprtargs);
 574        if (IS_ERR(xprt))
 575                return (struct rpc_clnt *)xprt;
 576
 577        /*
 578         * By default, kernel RPC client connects from a reserved port.
 579         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 580         * but it is always enabled for rpciod, which handles the connect
 581         * operation.
 582         */
 583        xprt->resvport = 1;
 584        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 585                xprt->resvport = 0;
 586
 587        return rpc_create_xprt(args, xprt);
 588}
 589EXPORT_SYMBOL_GPL(rpc_create);
 590
 591/*
 592 * This function clones the RPC client structure. It allows us to share the
 593 * same transport while varying parameters such as the authentication
 594 * flavour.
 595 */
 596static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 597                                           struct rpc_clnt *clnt)
 598{
 599        struct rpc_xprt_switch *xps;
 600        struct rpc_xprt *xprt;
 601        struct rpc_clnt *new;
 602        int err;
 603
 604        err = -ENOMEM;
 605        rcu_read_lock();
 606        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
 607        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 608        rcu_read_unlock();
 609        if (xprt == NULL || xps == NULL) {
 610                xprt_put(xprt);
 611                xprt_switch_put(xps);
 612                goto out_err;
 613        }
 614        args->servername = xprt->servername;
 615        args->nodename = clnt->cl_nodename;
 616
 617        new = rpc_new_client(args, xps, xprt, clnt);
 618        if (IS_ERR(new)) {
 619                err = PTR_ERR(new);
 620                goto out_err;
 621        }
 622
 623        /* Turn off autobind on clones */
 624        new->cl_autobind = 0;
 625        new->cl_softrtry = clnt->cl_softrtry;
 626        new->cl_noretranstimeo = clnt->cl_noretranstimeo;
 627        new->cl_discrtry = clnt->cl_discrtry;
 628        new->cl_chatty = clnt->cl_chatty;
 629        return new;
 630
 631out_err:
 632        dprintk("RPC:       %s: returned error %d\n", __func__, err);
 633        return ERR_PTR(err);
 634}
 635
 636/**
 637 * rpc_clone_client - Clone an RPC client structure
 638 *
 639 * @clnt: RPC client whose parameters are copied
 640 *
 641 * Returns a fresh RPC client or an ERR_PTR.
 642 */
 643struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
 644{
 645        struct rpc_create_args args = {
 646                .program        = clnt->cl_program,
 647                .prognumber     = clnt->cl_prog,
 648                .version        = clnt->cl_vers,
 649                .authflavor     = clnt->cl_auth->au_flavor,
 650        };
 651        return __rpc_clone_client(&args, clnt);
 652}
 653EXPORT_SYMBOL_GPL(rpc_clone_client);
 654
/**
 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 *
 * @clnt: RPC client whose parameters are copied
 * @flavor: security flavor for new client
 *
 * Program, program number and version are taken from @clnt; only the
 * authentication flavor differs from a plain rpc_clone_client().
 *
 * Returns a fresh RPC client or an ERR_PTR.
 */
struct rpc_clnt *
rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
{
	struct rpc_create_args args = {
		.program	= clnt->cl_program,
		.prognumber	= clnt->cl_prog,
		.version	= clnt->cl_vers,
		.authflavor	= flavor,
	};
	return __rpc_clone_client(&args, clnt);
}
EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
 675
/**
 * rpc_switch_client_transport: switch the RPC transport on the fly
 * @clnt: pointer to a struct rpc_clnt
 * @args: pointer to the new transport arguments
 * @timeout: pointer to the new timeout parameters
 *
 * This function allows the caller to switch the RPC transport for the
 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 * server, for instance.  It assumes that the caller has ensured that
 * there are no active RPC tasks by using some form of locking.
 *
 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 * negative errno is returned, and "clnt" continues to use the old
 * xprt.
 */
int rpc_switch_client_transport(struct rpc_clnt *clnt,
		struct xprt_create *args,
		const struct rpc_timeout *timeout)
{
	const struct rpc_timeout *old_timeo;
	rpc_authflavor_t pseudoflavor;
	struct rpc_xprt_switch *xps, *oldxps;
	struct rpc_xprt *xprt, *old;
	struct rpc_clnt *parent;
	int err;

	xprt = xprt_create_transport(args);
	if (IS_ERR(xprt)) {
		dprintk("RPC:       failed to create new xprt for clnt %p\n",
			clnt);
		return PTR_ERR(xprt);
	}

	xps = xprt_switch_alloc(xprt, GFP_KERNEL);
	if (xps == NULL) {
		xprt_put(xprt);
		return -ENOMEM;
	}

	/* Remember the current flavor; it is used to re-register below. */
	pseudoflavor = clnt->cl_auth->au_flavor;

	/*
	 * Install the new transport and switch, keeping the old timeout,
	 * transport and switch so they can be restored or released later.
	 */
	old_timeo = clnt->cl_timeout;
	old = rpc_clnt_set_transport(clnt, xprt, timeout);
	oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);

	/* Tear down the client's current registration. */
	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
	rpc_clnt_debugfs_unregister(clnt);

	/*
	 * A new transport was created.  "clnt" therefore
	 * becomes the root of a new cl_parent tree.  clnt's
	 * children, if it has any, still point to the old xprt.
	 */
	parent = clnt->cl_parent;
	clnt->cl_parent = clnt;

	/*
	 * The old rpc_auth cache cannot be re-used.  GSS
	 * contexts in particular are between a single
	 * client and server.
	 */
	err = rpc_client_register(clnt, pseudoflavor, NULL);
	if (err)
		goto out_revert;

	synchronize_rcu();
	/* Release the former parent and the old switch and transport. */
	if (parent != clnt)
		rpc_release_client(parent);
	xprt_switch_put(oldxps);
	xprt_put(old);
	dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
	return 0;

out_revert:
	/* Put the old switch, transport, timeout and parent back. */
	xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
	rpc_clnt_set_transport(clnt, old, old_timeo);
	clnt->cl_parent = parent;
	/* NOTE(review): the result of this re-registration is ignored. */
	rpc_client_register(clnt, pseudoflavor, NULL);
	xprt_switch_put(xps);
	xprt_put(xprt);
	dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
	return err;
}
EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
 761
 762static
 763int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
 764{
 765        struct rpc_xprt_switch *xps;
 766
 767        rcu_read_lock();
 768        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 769        rcu_read_unlock();
 770        if (xps == NULL)
 771                return -EAGAIN;
 772        xprt_iter_init_listall(xpi, xps);
 773        xprt_switch_put(xps);
 774        return 0;
 775}
 776
 777/**
 778 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
 779 * @clnt: pointer to client
 780 * @fn: function to apply
 781 * @data: void pointer to function data
 782 *
 783 * Iterates through the list of RPC transports currently attached to the
 784 * client and applies the function fn(clnt, xprt, data).
 785 *
 786 * On error, the iteration stops, and the function returns the error value.
 787 */
 788int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
 789                int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
 790                void *data)
 791{
 792        struct rpc_xprt_iter xpi;
 793        int ret;
 794
 795        ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
 796        if (ret)
 797                return ret;
 798        for (;;) {
 799                struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
 800
 801                if (!xprt)
 802                        break;
 803                ret = fn(clnt, xprt, data);
 804                xprt_put(xprt);
 805                if (ret < 0)
 806                        break;
 807        }
 808        xprt_iter_destroy(&xpi);
 809        return ret;
 810}
 811EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
 812
 813/*
 814 * Kill all tasks for the given client.
 815 * XXX: kill their descendants as well?
 816 */
 817void rpc_killall_tasks(struct rpc_clnt *clnt)
 818{
 819        struct rpc_task *rovr;
 820
 821
 822        if (list_empty(&clnt->cl_tasks))
 823                return;
 824        dprintk("RPC:       killing all tasks for client %p\n", clnt);
 825        /*
 826         * Spin lock all_tasks to prevent changes...
 827         */
 828        spin_lock(&clnt->cl_lock);
 829        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
 830                if (!RPC_IS_ACTIVATED(rovr))
 831                        continue;
 832                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 833                        rovr->tk_flags |= RPC_TASK_KILLED;
 834                        rpc_exit(rovr, -EIO);
 835                        if (RPC_IS_QUEUED(rovr))
 836                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
 837                                                        rovr);
 838                }
 839        }
 840        spin_unlock(&clnt->cl_lock);
 841}
 842EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 843
 844/*
 845 * Properly shut down an RPC client, terminating all outstanding
 846 * requests.
 847 */
 848void rpc_shutdown_client(struct rpc_clnt *clnt)
 849{
 850        might_sleep();
 851
 852        dprintk_rcu("RPC:       shutting down %s client for %s\n",
 853                        clnt->cl_program->name,
 854                        rcu_dereference(clnt->cl_xprt)->servername);
 855
 856        while (!list_empty(&clnt->cl_tasks)) {
 857                rpc_killall_tasks(clnt);
 858                wait_event_timeout(destroy_wait,
 859                        list_empty(&clnt->cl_tasks), 1*HZ);
 860        }
 861
 862        rpc_release_client(clnt);
 863}
 864EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 865
 866/*
 867 * Free an RPC client
 868 */
 869static struct rpc_clnt *
 870rpc_free_client(struct rpc_clnt *clnt)
 871{
 872        struct rpc_clnt *parent = NULL;
 873
 874        dprintk_rcu("RPC:       destroying %s client for %s\n",
 875                        clnt->cl_program->name,
 876                        rcu_dereference(clnt->cl_xprt)->servername);
 877        if (clnt->cl_parent != clnt)
 878                parent = clnt->cl_parent;
 879        rpc_clnt_debugfs_unregister(clnt);
 880        rpc_clnt_remove_pipedir(clnt);
 881        rpc_unregister_client(clnt);
 882        rpc_free_iostats(clnt->cl_metrics);
 883        clnt->cl_metrics = NULL;
 884        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
 885        xprt_iter_destroy(&clnt->cl_xpi);
 886        rpciod_down();
 887        rpc_free_clid(clnt);
 888        kfree(clnt);
 889        return parent;
 890}
 891
 892/*
 893 * Free an RPC client
 894 */
 895static struct rpc_clnt * 
 896rpc_free_auth(struct rpc_clnt *clnt)
 897{
 898        if (clnt->cl_auth == NULL)
 899                return rpc_free_client(clnt);
 900
 901        /*
 902         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 903         *       release remaining GSS contexts. This mechanism ensures
 904         *       that it can do so safely.
 905         */
 906        atomic_inc(&clnt->cl_count);
 907        rpcauth_release(clnt->cl_auth);
 908        clnt->cl_auth = NULL;
 909        if (atomic_dec_and_test(&clnt->cl_count))
 910                return rpc_free_client(clnt);
 911        return NULL;
 912}
 913
 914/*
 915 * Release reference to the RPC client
 916 */
 917void
 918rpc_release_client(struct rpc_clnt *clnt)
 919{
 920        dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 921
 922        do {
 923                if (list_empty(&clnt->cl_tasks))
 924                        wake_up(&destroy_wait);
 925                if (!atomic_dec_and_test(&clnt->cl_count))
 926                        break;
 927                clnt = rpc_free_auth(clnt);
 928        } while (clnt != NULL);
 929}
 930EXPORT_SYMBOL_GPL(rpc_release_client);
 931
 932/**
 933 * rpc_bind_new_program - bind a new RPC program to an existing client
 934 * @old: old rpc_client
 935 * @program: rpc program to set
 936 * @vers: rpc program version
 937 *
 938 * Clones the rpc client and sets up a new RPC program. This is mainly
 939 * of use for enabling different RPC programs to share the same transport.
 940 * The Sun NFSv2/v3 ACL protocol can do this.
 941 */
 942struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 943                                      const struct rpc_program *program,
 944                                      u32 vers)
 945{
 946        struct rpc_create_args args = {
 947                .program        = program,
 948                .prognumber     = program->number,
 949                .version        = vers,
 950                .authflavor     = old->cl_auth->au_flavor,
 951        };
 952        struct rpc_clnt *clnt;
 953        int err;
 954
 955        clnt = __rpc_clone_client(&args, old);
 956        if (IS_ERR(clnt))
 957                goto out;
 958        err = rpc_ping(clnt);
 959        if (err != 0) {
 960                rpc_shutdown_client(clnt);
 961                clnt = ERR_PTR(err);
 962        }
 963out:
 964        return clnt;
 965}
 966EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 967
 968void rpc_task_release_client(struct rpc_task *task)
 969{
 970        struct rpc_clnt *clnt = task->tk_client;
 971        struct rpc_xprt *xprt = task->tk_xprt;
 972
 973        if (clnt != NULL) {
 974                /* Remove from client task list */
 975                spin_lock(&clnt->cl_lock);
 976                list_del(&task->tk_task);
 977                spin_unlock(&clnt->cl_lock);
 978                task->tk_client = NULL;
 979
 980                rpc_release_client(clnt);
 981        }
 982
 983        if (xprt != NULL) {
 984                task->tk_xprt = NULL;
 985
 986                xprt_put(xprt);
 987        }
 988}
 989
 990static
 991void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 992{
 993
 994        if (clnt != NULL) {
 995                if (task->tk_xprt == NULL)
 996                        task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
 997                task->tk_client = clnt;
 998                atomic_inc(&clnt->cl_count);
 999                if (clnt->cl_softrtry)
1000                        task->tk_flags |= RPC_TASK_SOFT;
1001                if (clnt->cl_noretranstimeo)
1002                        task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1003                if (atomic_read(&clnt->cl_swapper))
1004                        task->tk_flags |= RPC_TASK_SWAPPER;
1005                /* Add to the client's list of all tasks */
1006                spin_lock(&clnt->cl_lock);
1007                list_add_tail(&task->tk_task, &clnt->cl_tasks);
1008                spin_unlock(&clnt->cl_lock);
1009        }
1010}
1011
1012static void
1013rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1014{
1015        if (msg != NULL) {
1016                task->tk_msg.rpc_proc = msg->rpc_proc;
1017                task->tk_msg.rpc_argp = msg->rpc_argp;
1018                task->tk_msg.rpc_resp = msg->rpc_resp;
1019                if (msg->rpc_cred != NULL)
1020                        task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
1021        }
1022}
1023
1024/*
1025 * Default callback for async RPC calls
1026 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
        /* Intentionally empty: nothing to do on completion. */
}
1031
1032static const struct rpc_call_ops rpc_default_ops = {
1033        .rpc_call_done = rpc_default_callback,
1034};
1035
1036/**
1037 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1038 * @task_setup_data: pointer to task initialisation data
1039 */
1040struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1041{
1042        struct rpc_task *task;
1043
1044        task = rpc_new_task(task_setup_data);
1045
1046        rpc_task_set_client(task, task_setup_data->rpc_client);
1047        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1048
1049        if (task->tk_action == NULL)
1050                rpc_call_start(task);
1051
1052        atomic_inc(&task->tk_count);
1053        rpc_execute(task);
1054        return task;
1055}
1056EXPORT_SYMBOL_GPL(rpc_run_task);
1057
1058/**
1059 * rpc_call_sync - Perform a synchronous RPC call
1060 * @clnt: pointer to RPC client
1061 * @msg: RPC call parameters
1062 * @flags: RPC call flags
1063 */
1064int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1065{
1066        struct rpc_task *task;
1067        struct rpc_task_setup task_setup_data = {
1068                .rpc_client = clnt,
1069                .rpc_message = msg,
1070                .callback_ops = &rpc_default_ops,
1071                .flags = flags,
1072        };
1073        int status;
1074
1075        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1076        if (flags & RPC_TASK_ASYNC) {
1077                rpc_release_calldata(task_setup_data.callback_ops,
1078                        task_setup_data.callback_data);
1079                return -EINVAL;
1080        }
1081
1082        task = rpc_run_task(&task_setup_data);
1083        if (IS_ERR(task))
1084                return PTR_ERR(task);
1085        status = task->tk_status;
1086        rpc_put_task(task);
1087        return status;
1088}
1089EXPORT_SYMBOL_GPL(rpc_call_sync);
1090
1091/**
1092 * rpc_call_async - Perform an asynchronous RPC call
1093 * @clnt: pointer to RPC client
1094 * @msg: RPC call parameters
1095 * @flags: RPC call flags
1096 * @tk_ops: RPC call ops
1097 * @data: user call data
1098 */
1099int
1100rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1101               const struct rpc_call_ops *tk_ops, void *data)
1102{
1103        struct rpc_task *task;
1104        struct rpc_task_setup task_setup_data = {
1105                .rpc_client = clnt,
1106                .rpc_message = msg,
1107                .callback_ops = tk_ops,
1108                .callback_data = data,
1109                .flags = flags|RPC_TASK_ASYNC,
1110        };
1111
1112        task = rpc_run_task(&task_setup_data);
1113        if (IS_ERR(task))
1114                return PTR_ERR(task);
1115        rpc_put_task(task);
1116        return 0;
1117}
1118EXPORT_SYMBOL_GPL(rpc_call_async);
1119
1120#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1121/**
1122 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1123 * rpc_execute against it
1124 * @req: RPC request
1125 */
1126struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1127{
1128        struct rpc_task *task;
1129        struct xdr_buf *xbufp = &req->rq_snd_buf;
1130        struct rpc_task_setup task_setup_data = {
1131                .callback_ops = &rpc_default_ops,
1132                .flags = RPC_TASK_SOFTCONN,
1133        };
1134
1135        dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1136        /*
1137         * Create an rpc_task to send the data
1138         */
1139        task = rpc_new_task(&task_setup_data);
1140        task->tk_rqstp = req;
1141
1142        /*
1143         * Set up the xdr_buf length.
1144         * This also indicates that the buffer is XDR encoded already.
1145         */
1146        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1147                        xbufp->tail[0].iov_len;
1148
1149        task->tk_action = call_bc_transmit;
1150        atomic_inc(&task->tk_count);
1151        WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1152        rpc_execute(task);
1153
1154        dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1155        return task;
1156}
1157#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1158
1159void
1160rpc_call_start(struct rpc_task *task)
1161{
1162        task->tk_action = call_start;
1163}
1164EXPORT_SYMBOL_GPL(rpc_call_start);
1165
1166/**
1167 * rpc_peeraddr - extract remote peer address from clnt's xprt
1168 * @clnt: RPC client structure
1169 * @buf: target buffer
1170 * @bufsize: length of target buffer
1171 *
1172 * Returns the number of bytes that are actually in the stored address.
1173 */
1174size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1175{
1176        size_t bytes;
1177        struct rpc_xprt *xprt;
1178
1179        rcu_read_lock();
1180        xprt = rcu_dereference(clnt->cl_xprt);
1181
1182        bytes = xprt->addrlen;
1183        if (bytes > bufsize)
1184                bytes = bufsize;
1185        memcpy(buf, &xprt->addr, bytes);
1186        rcu_read_unlock();
1187
1188        return bytes;
1189}
1190EXPORT_SYMBOL_GPL(rpc_peeraddr);
1191
1192/**
1193 * rpc_peeraddr2str - return remote peer address in printable format
1194 * @clnt: RPC client structure
1195 * @format: address format
1196 *
1197 * NB: the lifetime of the memory referenced by the returned pointer is
1198 * the same as the rpc_xprt itself.  As long as the caller uses this
1199 * pointer, it must hold the RCU read lock.
1200 */
1201const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1202                             enum rpc_display_format_t format)
1203{
1204        struct rpc_xprt *xprt;
1205
1206        xprt = rcu_dereference(clnt->cl_xprt);
1207
1208        if (xprt->address_strings[format] != NULL)
1209                return xprt->address_strings[format];
1210        else
1211                return "unprintable";
1212}
1213EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1214
1215static const struct sockaddr_in rpc_inaddr_loopback = {
1216        .sin_family             = AF_INET,
1217        .sin_addr.s_addr        = htonl(INADDR_ANY),
1218};
1219
1220static const struct sockaddr_in6 rpc_in6addr_loopback = {
1221        .sin6_family            = AF_INET6,
1222        .sin6_addr              = IN6ADDR_ANY_INIT,
1223};
1224
1225/*
1226 * Try a getsockname() on a connected datagram socket.  Using a
1227 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1228 * This conserves the ephemeral port number space.
1229 *
1230 * Returns zero and fills in "buf" if successful; otherwise, a
1231 * negative errno is returned.
1232 */
1233static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1234                        struct sockaddr *buf)
1235{
1236        struct socket *sock;
1237        int err;
1238
1239        err = __sock_create(net, sap->sa_family,
1240                                SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1241        if (err < 0) {
1242                dprintk("RPC:       can't create UDP socket (%d)\n", err);
1243                goto out;
1244        }
1245
1246        switch (sap->sa_family) {
1247        case AF_INET:
1248                err = kernel_bind(sock,
1249                                (struct sockaddr *)&rpc_inaddr_loopback,
1250                                sizeof(rpc_inaddr_loopback));
1251                break;
1252        case AF_INET6:
1253                err = kernel_bind(sock,
1254                                (struct sockaddr *)&rpc_in6addr_loopback,
1255                                sizeof(rpc_in6addr_loopback));
1256                break;
1257        default:
1258                err = -EAFNOSUPPORT;
1259                goto out;
1260        }
1261        if (err < 0) {
1262                dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1263                goto out_release;
1264        }
1265
1266        err = kernel_connect(sock, sap, salen, 0);
1267        if (err < 0) {
1268                dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1269                goto out_release;
1270        }
1271
1272        err = kernel_getsockname(sock, buf);
1273        if (err < 0) {
1274                dprintk("RPC:       getsockname failed (%d)\n", err);
1275                goto out_release;
1276        }
1277
1278        err = 0;
1279        if (buf->sa_family == AF_INET6) {
1280                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1281                sin6->sin6_scope_id = 0;
1282        }
1283        dprintk("RPC:       %s succeeded\n", __func__);
1284
1285out_release:
1286        sock_release(sock);
1287out:
1288        return err;
1289}
1290
1291/*
1292 * Scraping a connected socket failed, so we don't have a useable
1293 * local address.  Fallback: generate an address that will prevent
1294 * the server from calling us back.
1295 *
1296 * Returns zero and fills in "buf" if successful; otherwise, a
1297 * negative errno is returned.
1298 */
1299static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1300{
1301        switch (family) {
1302        case AF_INET:
1303                if (buflen < sizeof(rpc_inaddr_loopback))
1304                        return -EINVAL;
1305                memcpy(buf, &rpc_inaddr_loopback,
1306                                sizeof(rpc_inaddr_loopback));
1307                break;
1308        case AF_INET6:
1309                if (buflen < sizeof(rpc_in6addr_loopback))
1310                        return -EINVAL;
1311                memcpy(buf, &rpc_in6addr_loopback,
1312                                sizeof(rpc_in6addr_loopback));
1313                break;
1314        default:
1315                dprintk("RPC:       %s: address family not supported\n",
1316                        __func__);
1317                return -EAFNOSUPPORT;
1318        }
1319        dprintk("RPC:       %s: succeeded\n", __func__);
1320        return 0;
1321}
1322
1323/**
1324 * rpc_localaddr - discover local endpoint address for an RPC client
1325 * @clnt: RPC client structure
1326 * @buf: target buffer
1327 * @buflen: size of target buffer, in bytes
1328 *
 * Returns zero and fills in "buf" if successful; otherwise, a
 * negative errno is returned.
1331 *
1332 * This works even if the underlying transport is not currently connected,
1333 * or if the upper layer never previously provided a source address.
1334 *
1335 * The result of this function call is transient: multiple calls in
1336 * succession may give different results, depending on how local
1337 * networking configuration changes over time.
1338 */
1339int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1340{
1341        struct sockaddr_storage address;
1342        struct sockaddr *sap = (struct sockaddr *)&address;
1343        struct rpc_xprt *xprt;
1344        struct net *net;
1345        size_t salen;
1346        int err;
1347
1348        rcu_read_lock();
1349        xprt = rcu_dereference(clnt->cl_xprt);
1350        salen = xprt->addrlen;
1351        memcpy(sap, &xprt->addr, salen);
1352        net = get_net(xprt->xprt_net);
1353        rcu_read_unlock();
1354
1355        rpc_set_port(sap, 0);
1356        err = rpc_sockname(net, sap, salen, buf);
1357        put_net(net);
1358        if (err != 0)
1359                /* Couldn't discover local address, return ANYADDR */
1360                return rpc_anyaddr(sap->sa_family, buf, buflen);
1361        return 0;
1362}
1363EXPORT_SYMBOL_GPL(rpc_localaddr);
1364
1365void
1366rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1367{
1368        struct rpc_xprt *xprt;
1369
1370        rcu_read_lock();
1371        xprt = rcu_dereference(clnt->cl_xprt);
1372        if (xprt->ops->set_buffer_size)
1373                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1374        rcu_read_unlock();
1375}
1376EXPORT_SYMBOL_GPL(rpc_setbufsize);
1377
1378/**
1379 * rpc_net_ns - Get the network namespace for this RPC client
1380 * @clnt: RPC client to query
1381 *
1382 */
1383struct net *rpc_net_ns(struct rpc_clnt *clnt)
1384{
1385        struct net *ret;
1386
1387        rcu_read_lock();
1388        ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1389        rcu_read_unlock();
1390        return ret;
1391}
1392EXPORT_SYMBOL_GPL(rpc_net_ns);
1393
1394/**
1395 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1396 * @clnt: RPC client to query
1397 *
1398 * For stream transports, this is one RPC record fragment (see RFC
1399 * 1831), as we don't support multi-record requests yet.  For datagram
1400 * transports, this is the size of an IP packet minus the IP, UDP, and
1401 * RPC header sizes.
1402 */
1403size_t rpc_max_payload(struct rpc_clnt *clnt)
1404{
1405        size_t ret;
1406
1407        rcu_read_lock();
1408        ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1409        rcu_read_unlock();
1410        return ret;
1411}
1412EXPORT_SYMBOL_GPL(rpc_max_payload);
1413
1414/**
1415 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1416 * @clnt: RPC client to query
1417 */
1418size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1419{
1420        struct rpc_xprt *xprt;
1421        size_t ret;
1422
1423        rcu_read_lock();
1424        xprt = rcu_dereference(clnt->cl_xprt);
1425        ret = xprt->ops->bc_maxpayload(xprt);
1426        rcu_read_unlock();
1427        return ret;
1428}
1429EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1430
1431/**
1432 * rpc_force_rebind - force transport to check that remote port is unchanged
1433 * @clnt: client to rebind
1434 *
1435 */
1436void rpc_force_rebind(struct rpc_clnt *clnt)
1437{
1438        if (clnt->cl_autobind) {
1439                rcu_read_lock();
1440                xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1441                rcu_read_unlock();
1442        }
1443}
1444EXPORT_SYMBOL_GPL(rpc_force_rebind);
1445
1446/*
1447 * Restart an (async) RPC call from the call_prepare state.
1448 * Usually called from within the exit handler.
1449 */
1450int
1451rpc_restart_call_prepare(struct rpc_task *task)
1452{
1453        if (RPC_ASSASSINATED(task))
1454                return 0;
1455        task->tk_action = call_start;
1456        task->tk_status = 0;
1457        if (task->tk_ops->rpc_call_prepare != NULL)
1458                task->tk_action = rpc_prepare_task;
1459        return 1;
1460}
1461EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1462
1463/*
1464 * Restart an (async) RPC call. Usually called from within the
1465 * exit handler.
1466 */
1467int
1468rpc_restart_call(struct rpc_task *task)
1469{
1470        if (RPC_ASSASSINATED(task))
1471                return 0;
1472        task->tk_action = call_start;
1473        task->tk_status = 0;
1474        return 1;
1475}
1476EXPORT_SYMBOL_GPL(rpc_restart_call);
1477
1478const char
1479*rpc_proc_name(const struct rpc_task *task)
1480{
1481        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1482
1483        if (proc) {
1484                if (proc->p_name)
1485                        return proc->p_name;
1486                else
1487                        return "NULL";
1488        } else
1489                return "no proc";
1490}
1491
1492/*
1493 * 0.  Initial state
1494 *
1495 *     Other FSM states can be visited zero or more times, but
1496 *     this state is visited exactly once for each RPC.
1497 */
1498static void
1499call_start(struct rpc_task *task)
1500{
1501        struct rpc_clnt *clnt = task->tk_client;
1502        int idx = task->tk_msg.rpc_proc->p_statidx;
1503
1504        trace_rpc_request(task);
1505        dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1506                        clnt->cl_program->name, clnt->cl_vers,
1507                        rpc_proc_name(task),
1508                        (RPC_IS_ASYNC(task) ? "async" : "sync"));
1509
1510        /* Increment call count (version might not be valid for ping) */
1511        if (clnt->cl_program->version[clnt->cl_vers])
1512                clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1513        clnt->cl_stats->rpccnt++;
1514        task->tk_action = call_reserve;
1515}
1516
1517/*
1518 * 1.   Reserve an RPC call slot
1519 */
1520static void
1521call_reserve(struct rpc_task *task)
1522{
1523        dprint_status(task);
1524
1525        task->tk_status  = 0;
1526        task->tk_action  = call_reserveresult;
1527        xprt_reserve(task);
1528}
1529
1530static void call_retry_reserve(struct rpc_task *task);
1531
1532/*
1533 * 1b.  Grok the result of xprt_reserve()
1534 */
1535static void
1536call_reserveresult(struct rpc_task *task)
1537{
1538        int status = task->tk_status;
1539
1540        dprint_status(task);
1541
1542        /*
1543         * After a call to xprt_reserve(), we must have either
1544         * a request slot or else an error status.
1545         */
1546        task->tk_status = 0;
1547        if (status >= 0) {
1548                if (task->tk_rqstp) {
1549                        xprt_request_init(task);
1550                        task->tk_action = call_refresh;
1551                        return;
1552                }
1553
1554                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1555                                __func__, status);
1556                rpc_exit(task, -EIO);
1557                return;
1558        }
1559
1560        /*
1561         * Even though there was an error, we may have acquired
1562         * a request slot somehow.  Make sure not to leak it.
1563         */
1564        if (task->tk_rqstp) {
1565                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1566                                __func__, status);
1567                xprt_release(task);
1568        }
1569
1570        switch (status) {
1571        case -ENOMEM:
1572                rpc_delay(task, HZ >> 2);
1573                /* fall through */
1574        case -EAGAIN:   /* woken up; retry */
1575                task->tk_action = call_retry_reserve;
1576                return;
1577        case -EIO:      /* probably a shutdown */
1578                break;
1579        default:
1580                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1581                                __func__, status);
1582                break;
1583        }
1584        rpc_exit(task, status);
1585}
1586
1587/*
1588 * 1c.  Retry reserving an RPC call slot
1589 */
1590static void
1591call_retry_reserve(struct rpc_task *task)
1592{
1593        dprint_status(task);
1594
1595        task->tk_status  = 0;
1596        task->tk_action  = call_reserveresult;
1597        xprt_retry_reserve(task);
1598}
1599
1600/*
1601 * 2.   Bind and/or refresh the credentials
1602 */
1603static void
1604call_refresh(struct rpc_task *task)
1605{
1606        dprint_status(task);
1607
1608        task->tk_action = call_refreshresult;
1609        task->tk_status = 0;
1610        task->tk_client->cl_stats->rpcauthrefresh++;
1611        rpcauth_refreshcred(task);
1612}
1613
1614/*
1615 * 2a.  Process the results of a credential refresh
1616 */
1617static void
1618call_refreshresult(struct rpc_task *task)
1619{
1620        int status = task->tk_status;
1621
1622        dprint_status(task);
1623
1624        task->tk_status = 0;
1625        task->tk_action = call_refresh;
1626        switch (status) {
1627        case 0:
1628                if (rpcauth_uptodatecred(task)) {
1629                        task->tk_action = call_allocate;
1630                        return;
1631                }
1632                /* Use rate-limiting and a max number of retries if refresh
1633                 * had status 0 but failed to update the cred.
1634                 */
1635                /* fall through */
1636        case -ETIMEDOUT:
1637                rpc_delay(task, 3*HZ);
1638                /* fall through */
1639        case -EAGAIN:
1640                status = -EACCES;
1641                /* fall through */
1642        case -EKEYEXPIRED:
1643                if (!task->tk_cred_retry)
1644                        break;
1645                task->tk_cred_retry--;
1646                dprintk("RPC: %5u %s: retry refresh creds\n",
1647                                task->tk_pid, __func__);
1648                return;
1649        }
1650        dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1651                                task->tk_pid, __func__, status);
1652        rpc_exit(task, status);
1653}
1654
1655/*
1656 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1657 *      (Note: buffer memory is freed in xprt_release).
1658 */
1659static void
1660call_allocate(struct rpc_task *task)
1661{
1662        unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1663        struct rpc_rqst *req = task->tk_rqstp;
1664        struct rpc_xprt *xprt = req->rq_xprt;
1665        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1666        int status;
1667
1668        dprint_status(task);
1669
1670        task->tk_status = 0;
1671        task->tk_action = call_bind;
1672
1673        if (req->rq_buffer)
1674                return;
1675
1676        if (proc->p_proc != 0) {
1677                BUG_ON(proc->p_arglen == 0);
1678                if (proc->p_decode != NULL)
1679                        BUG_ON(proc->p_replen == 0);
1680        }
1681
1682        /*
1683         * Calculate the size (in quads) of the RPC call
1684         * and reply headers, and convert both values
1685         * to byte sizes.
1686         */
1687        req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1688        req->rq_callsize <<= 2;
1689        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1690        req->rq_rcvsize <<= 2;
1691
1692        status = xprt->ops->buf_alloc(task);
1693        xprt_inject_disconnect(xprt);
1694        if (status == 0)
1695                return;
1696        if (status != -ENOMEM) {
1697                rpc_exit(task, status);
1698                return;
1699        }
1700
1701        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1702
1703        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1704                task->tk_action = call_allocate;
1705                rpc_delay(task, HZ>>4);
1706                return;
1707        }
1708
1709        rpc_exit(task, -ERESTARTSYS);
1710}
1711
1712static inline int
1713rpc_task_need_encode(struct rpc_task *task)
1714{
1715        return task->tk_rqstp->rq_snd_buf.len == 0;
1716}
1717
1718static inline void
1719rpc_task_force_reencode(struct rpc_task *task)
1720{
1721        task->tk_rqstp->rq_snd_buf.len = 0;
1722        task->tk_rqstp->rq_bytes_sent = 0;
1723}
1724
1725/*
1726 * 3.   Encode arguments of an RPC call
1727 */
1728static void
1729rpc_xdr_encode(struct rpc_task *task)
1730{
1731        struct rpc_rqst *req = task->tk_rqstp;
1732        kxdreproc_t     encode;
1733        __be32          *p;
1734
1735        dprint_status(task);
1736
1737        xdr_buf_init(&req->rq_snd_buf,
1738                     req->rq_buffer,
1739                     req->rq_callsize);
1740        xdr_buf_init(&req->rq_rcv_buf,
1741                     req->rq_rbuffer,
1742                     req->rq_rcvsize);
1743
1744        p = rpc_encode_header(task);
1745        if (p == NULL) {
1746                printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1747                rpc_exit(task, -EIO);
1748                return;
1749        }
1750
1751        encode = task->tk_msg.rpc_proc->p_encode;
1752        if (encode == NULL)
1753                return;
1754
1755        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1756                        task->tk_msg.rpc_argp);
1757}
1758
1759/*
1760 * 4.   Get the server port number if not yet set
1761 */
1762static void
1763call_bind(struct rpc_task *task)
1764{
1765        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1766
1767        dprint_status(task);
1768
1769        task->tk_action = call_connect;
1770        if (!xprt_bound(xprt)) {
1771                task->tk_action = call_bind_status;
1772                task->tk_timeout = xprt->bind_timeout;
1773                xprt->ops->rpcbind(task);
1774        }
1775}
1776
1777/*
1778 * 4a.  Sort out bind result
1779 */
static void
call_bind_status(struct rpc_task *task)
{
        /* Error handed to rpc_exit() unless a case below overrides it. */
        int exit_status = -EIO;

        if (task->tk_status >= 0) {
                dprint_status(task);
                task->tk_status = 0;
                task->tk_action = call_connect;
                return;
        }

        trace_rpc_bind_status(task);
        switch (task->tk_status) {
        case -ENOMEM:
                dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
                rpc_delay(task, HZ >> 2);
                goto retry_timeout;
        case -EACCES:
                dprintk("RPC: %5u remote rpcbind: RPC program/version "
                                "unavailable\n", task->tk_pid);
                /* fail immediately if this is an RPC ping */
                if (task->tk_msg.rpc_proc->p_proc == 0) {
                        exit_status = -EOPNOTSUPP;
                        break;
                }
                /* Out of rebind retries: exit with the default -EIO. */
                if (task->tk_rebind_retry == 0)
                        break;
                task->tk_rebind_retry--;
                rpc_delay(task, 3*HZ);
                goto retry_timeout;
        case -ETIMEDOUT:
                dprintk("RPC: %5u rpcbind request timed out\n",
                                task->tk_pid);
                goto retry_timeout;
        case -EPFNOSUPPORT:
                /* server doesn't support any rpcbind version we know of */
                dprintk("RPC: %5u unrecognized remote rpcbind service\n",
                                task->tk_pid);
                break;
        case -EPROTONOSUPPORT:
                dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
                                task->tk_pid);
                goto retry_timeout;
        case -ECONNREFUSED:             /* connection problems */
        case -ECONNRESET:
        case -ECONNABORTED:
        case -ENOTCONN:
        case -EHOSTDOWN:
        case -ENETDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -ENOBUFS:
        case -EPIPE:
                dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
                                task->tk_pid, task->tk_status);
                /* Soft-connect tasks exit with the error; others retry. */
                if (RPC_IS_SOFTCONN(task)) {
                        exit_status = task->tk_status;
                        break;
                }
                rpc_delay(task, 5*HZ);
                goto retry_timeout;
        default:
                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
                                task->tk_pid, -task->tk_status);
        }

        rpc_exit(task, exit_status);
        return;

retry_timeout:
        /* Clear the error and hand the task to call_timeout. */
        task->tk_status = 0;
        task->tk_action = call_timeout;
}
1854
1855/*
1856 * 4b.  Connect to the RPC server
1857 */
1858static void
1859call_connect(struct rpc_task *task)
1860{
1861        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1862
1863        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1864                        task->tk_pid, xprt,
1865                        (xprt_connected(xprt) ? "is" : "is not"));
1866
1867        task->tk_action = call_transmit;
1868        if (!xprt_connected(xprt)) {
1869                task->tk_action = call_connect_status;
1870                if (task->tk_status < 0)
1871                        return;
1872                if (task->tk_flags & RPC_TASK_NOCONNECT) {
1873                        rpc_exit(task, -ENOTCONN);
1874                        return;
1875                }
1876                xprt_connect(task);
1877        }
1878}
1879
/*
 * 4c.	Sort out connect result
 *
 * Consumes task->tk_status (resetting it to 0) and picks the next state:
 *  - success: count the reconnect and move on to call_transmit;
 *  - refusal on an autobind client: force a rebind and go to call_bind;
 *  - other connection errors: conditionally disconnect, then retry via
 *    call_timeout after a delay (SOFTCONN tasks exit with the error);
 *  - -EAGAIN / -ETIMEDOUT: go to call_timeout;
 *  - anything else: exit the task with the error.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *client = task->tk_client;
	int result = task->tk_status;

	dprint_status(task);

	trace_rpc_connect_status(task);
	task->tk_status = 0;

	switch (result) {
	case 0:
		client->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	case -ECONNREFUSED:
		/* A positive refusal suggests a rebind is needed. */
		if (RPC_IS_SOFTCONN(task))
			break;
		if (client->cl_autobind) {
			rpc_force_rebind(client);
			task->tk_action = call_bind;
			return;
		}
		/* fall through */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EADDRINUSE:
	case -ENOBUFS:
	case -EPIPE:
		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
					    task->tk_rqstp->rq_connect_cookie);
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EAGAIN:
	case -ETIMEDOUT:
		/* Check for timeouts before looping back to call_bind */
		task->tk_action = call_timeout;
		return;
	}

	/* Every 'break' above, and any unlisted error, ends the task. */
	rpc_exit(task, result);
}
1931
1932/*
1933 * 5.   Transmit the RPC request, and wait for reply
1934 */
1935static void
1936call_transmit(struct rpc_task *task)
1937{
1938        int is_retrans = RPC_WAS_SENT(task);
1939
1940        dprint_status(task);
1941
1942        task->tk_action = call_status;
1943        if (task->tk_status < 0)
1944                return;
1945        if (!xprt_prepare_transmit(task))
1946                return;
1947        task->tk_action = call_transmit_status;
1948        /* Encode here so that rpcsec_gss can use correct sequence number. */
1949        if (rpc_task_need_encode(task)) {
1950                rpc_xdr_encode(task);
1951                /* Did the encode result in an error condition? */
1952                if (task->tk_status != 0) {
1953                        /* Was the error nonfatal? */
1954                        if (task->tk_status == -EAGAIN)
1955                                rpc_delay(task, HZ >> 4);
1956                        else
1957                                rpc_exit(task, task->tk_status);
1958                        return;
1959                }
1960        }
1961        xprt_transmit(task);
1962        if (task->tk_status < 0)
1963                return;
1964        if (is_retrans)
1965                task->tk_client->cl_stats->rpcretrans++;
1966        /*
1967         * On success, ensure that we call xprt_end_transmit() before sleeping
1968         * in order to allow access to the socket to other RPC requests.
1969         */
1970        call_transmit_status(task);
1971        if (rpc_reply_expected(task))
1972                return;
1973        task->tk_action = rpc_exit_task;
1974        rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1975}
1976
/*
 * 5a.	Handle cleanup after a transmission
 *
 * Always schedules call_status next.  Depending on task->tk_status the
 * transmit is ended via xprt_end_transmit() and/or the request is marked
 * for re-encoding; SOFTCONN tasks that hit an unreachable-peer error are
 * terminated with that error.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	switch (task->tk_status) {
	case -EAGAIN:
	case -ENOBUFS:
		/* Nothing to clean up here; call_status handles the retry. */
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			/* Procedure 0 is the NULL ping: trace its outcome. */
			if (!task->tk_msg.rpc_proc->p_proc)
				trace_xprt_ping(task->tk_xprt,
						task->tk_status);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
	}
}
2033
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
 * addition, disconnect on connectivity errors.
 *
 * If the transmit cannot be prepared, or the send returns -EAGAIN, the
 * status is cleared and the function returns without changing tk_action.
 * All other outcomes set tk_action to rpc_exit_task.
 */
static void
call_bc_transmit(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;

	if (!xprt_prepare_transmit(task)) {
		/* Retry later: clear the status and leave tk_action alone. */
		task->tk_status = 0;
		return;
	}

	if (task->tk_status < 0) {
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		task->tk_action = rpc_exit_task;
		return;
	}

	/* The connection changed since the last attempt: restart the send. */
	if (rqst->rq_connect_cookie != rqst->rq_xprt->connect_cookie)
		rqst->rq_bytes_sent = 0;

	xprt_transmit(task);

	if (task->tk_status == -EAGAIN) {
		/* No space: remember the current connection and retry. */
		rqst->rq_connect_cookie = rqst->rq_xprt->connect_cookie;
		task->tk_status = 0;
		return;
	}

	xprt_end_transmit(task);
	dprint_status(task);

	switch (task->tk_status) {
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(rqst->rq_xprt,
			rqst->rq_connect_cookie);
		break;
	case 0:
		/* Success */
	case -ENETDOWN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Nothing further to do for these outcomes. */
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		WARN_ON_ONCE(task->tk_status == -EAGAIN);
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}

	rpc_wake_up_queued_task(&rqst->rq_xprt->pending, task);
	task->tk_action = rpc_exit_task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2108
/*
 * 6.	Sort out the RPC call status
 *
 * A non-negative status sends the task on to call_decode.  A negative
 * status is cleared from the task and mapped to the next state: a
 * (possibly delayed) retry through call_timeout, call_bind or
 * call_transmit, or termination through rpc_exit().
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *client = task->tk_client;
	struct rpc_rqst *rqst = task->tk_rqstp;
	int		err;

	/* Procedure 0 is the NULL ping: trace its outcome. */
	if (!task->tk_msg.rpc_proc->p_proc)
		trace_xprt_ping(task->tk_xprt, task->tk_status);

	/* A reply has arrived and nothing is half-sent: report its size. */
	if (rqst->rq_reply_bytes_recvd > 0 && !rqst->rq_bytes_sent)
		task->tk_status = rqst->rq_reply_bytes_recvd;

	dprint_status(task);

	err = task->tk_status;
	if (err >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;

	switch (err) {
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task)) {
			rpc_exit(task, err);
			break;
		}
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
		rpc_force_rebind(client);
		/* fall through */
	case -EADDRINUSE:
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EPIPE:
	case -ENOTCONN:
		task->tk_action = call_bind;
		break;
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		/* fall through */
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, err);
		break;
	default:
		if (client->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       client->cl_program->name, -err);
		rpc_exit(task, err);
	}
}
2183
2184/*
2185 * 6a.  Handle RPC timeout
2186 *      We do not release the request slot, so we keep using the
2187 *      same XID for all retransmits.
2188 */
2189static void
2190call_timeout(struct rpc_task *task)
2191{
2192        struct rpc_clnt *clnt = task->tk_client;
2193
2194        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
2195                dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
2196                goto retry;
2197        }
2198
2199        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
2200        task->tk_timeouts++;
2201
2202        if (RPC_IS_SOFTCONN(task)) {
2203                rpc_exit(task, -ETIMEDOUT);
2204                return;
2205        }
2206        if (RPC_IS_SOFT(task)) {
2207                if (clnt->cl_chatty) {
2208                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
2209                                clnt->cl_program->name,
2210                                task->tk_xprt->servername);
2211                }
2212                if (task->tk_flags & RPC_TASK_TIMEOUT)
2213                        rpc_exit(task, -ETIMEDOUT);
2214                else
2215                        rpc_exit(task, -EIO);
2216                return;
2217        }
2218
2219        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2220                task->tk_flags |= RPC_CALL_MAJORSEEN;
2221                if (clnt->cl_chatty) {
2222                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
2223                        clnt->cl_program->name,
2224                        task->tk_xprt->servername);
2225                }
2226        }
2227        rpc_force_rebind(clnt);
2228        /*
2229         * Did our request time out due to an RPCSEC_GSS out-of-sequence
2230         * event? RFC2203 requires the server to drop all such requests.
2231         */
2232        rpcauth_invalcred(task);
2233
2234retry:
2235        task->tk_action = call_bind;
2236        task->tk_status = 0;
2237}
2238
2239/*
2240 * 7.   Decode the RPC reply
2241 */
2242static void
2243call_decode(struct rpc_task *task)
2244{
2245        struct rpc_clnt *clnt = task->tk_client;
2246        struct rpc_rqst *req = task->tk_rqstp;
2247        kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
2248        __be32          *p;
2249
2250        dprint_status(task);
2251
2252        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2253                if (clnt->cl_chatty) {
2254                        printk(KERN_NOTICE "%s: server %s OK\n",
2255                                clnt->cl_program->name,
2256                                task->tk_xprt->servername);
2257                }
2258                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2259        }
2260
2261        /*
2262         * Ensure that we see all writes made by xprt_complete_rqst()
2263         * before it changed req->rq_reply_bytes_recvd.
2264         */
2265        smp_rmb();
2266        req->rq_rcv_buf.len = req->rq_private_buf.len;
2267
2268        /* Check that the softirq receive buffer is valid */
2269        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2270                                sizeof(req->rq_rcv_buf)) != 0);
2271
2272        if (req->rq_rcv_buf.len < 12) {
2273                if (!RPC_IS_SOFT(task)) {
2274                        task->tk_action = call_bind;
2275                        goto out_retry;
2276                }
2277                dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2278                                clnt->cl_program->name, task->tk_status);
2279                task->tk_action = call_timeout;
2280                goto out_retry;
2281        }
2282
2283        p = rpc_verify_header(task);
2284        if (IS_ERR(p)) {
2285                if (p == ERR_PTR(-EAGAIN))
2286                        goto out_retry;
2287                return;
2288        }
2289
2290        task->tk_action = rpc_exit_task;
2291
2292        if (decode) {
2293                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2294                                                      task->tk_msg.rpc_resp);
2295        }
2296        dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2297                        task->tk_status);
2298        return;
2299out_retry:
2300        task->tk_status = 0;
2301        /* Note: rpc_verify_header() may have freed the RPC slot */
2302        if (task->tk_rqstp == req) {
2303                req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2304                if (task->tk_client->cl_discrtry)
2305                        xprt_conditional_disconnect(req->rq_xprt,
2306                                        req->rq_connect_cookie);
2307        }
2308}
2309
2310static __be32 *
2311rpc_encode_header(struct rpc_task *task)
2312{
2313        struct rpc_clnt *clnt = task->tk_client;
2314        struct rpc_rqst *req = task->tk_rqstp;
2315        __be32          *p = req->rq_svec[0].iov_base;
2316
2317        /* FIXME: check buffer size? */
2318
2319        p = xprt_skip_transport_header(req->rq_xprt, p);
2320        *p++ = req->rq_xid;             /* XID */
2321        *p++ = htonl(RPC_CALL);         /* CALL */
2322        *p++ = htonl(RPC_VERSION);      /* RPC version */
2323        *p++ = htonl(clnt->cl_prog);    /* program number */
2324        *p++ = htonl(clnt->cl_vers);    /* program version */
2325        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2326        p = rpcauth_marshcred(task, p);
2327        req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2328        return p;
2329}
2330
2331static __be32 *
2332rpc_verify_header(struct rpc_task *task)
2333{
2334        struct rpc_clnt *clnt = task->tk_client;
2335        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2336        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2337        __be32  *p = iov->iov_base;
2338        u32 n;
2339        int error = -EACCES;
2340
2341        if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2342                /* RFC-1014 says that the representation of XDR data must be a
2343                 * multiple of four bytes
2344                 * - if it isn't pointer subtraction in the NFS client may give
2345                 *   undefined results
2346                 */
2347                dprintk("RPC: %5u %s: XDR representation not a multiple of"
2348                       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2349                       task->tk_rqstp->rq_rcv_buf.len);
2350                error = -EIO;
2351                goto out_err;
2352        }
2353        if ((len -= 3) < 0)
2354                goto out_overflow;
2355
2356        p += 1; /* skip XID */
2357        if ((n = ntohl(*p++)) != RPC_REPLY) {
2358                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2359                        task->tk_pid, __func__, n);
2360                error = -EIO;
2361                goto out_garbage;
2362        }
2363
2364        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2365                if (--len < 0)
2366                        goto out_overflow;
2367                switch ((n = ntohl(*p++))) {
2368                case RPC_AUTH_ERROR:
2369                        break;
2370                case RPC_MISMATCH:
2371                        dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2372                                task->tk_pid, __func__);
2373                        error = -EPROTONOSUPPORT;
2374                        goto out_err;
2375                default:
2376                        dprintk("RPC: %5u %s: RPC call rejected, "
2377                                "unknown error: %x\n",
2378                                task->tk_pid, __func__, n);
2379                        error = -EIO;
2380                        goto out_err;
2381                }
2382                if (--len < 0)
2383                        goto out_overflow;
2384                switch ((n = ntohl(*p++))) {
2385                case RPC_AUTH_REJECTEDCRED:
2386                case RPC_AUTH_REJECTEDVERF:
2387                case RPCSEC_GSS_CREDPROBLEM:
2388                case RPCSEC_GSS_CTXPROBLEM:
2389                        if (!task->tk_cred_retry)
2390                                break;
2391                        task->tk_cred_retry--;
2392                        dprintk("RPC: %5u %s: retry stale creds\n",
2393                                        task->tk_pid, __func__);
2394                        rpcauth_invalcred(task);
2395                        /* Ensure we obtain a new XID! */
2396                        xprt_release(task);
2397                        task->tk_action = call_reserve;
2398                        goto out_retry;
2399                case RPC_AUTH_BADCRED:
2400                case RPC_AUTH_BADVERF:
2401                        /* possibly garbled cred/verf? */
2402                        if (!task->tk_garb_retry)
2403                                break;
2404                        task->tk_garb_retry--;
2405                        dprintk("RPC: %5u %s: retry garbled creds\n",
2406                                        task->tk_pid, __func__);
2407                        task->tk_action = call_bind;
2408                        goto out_retry;
2409                case RPC_AUTH_TOOWEAK:
2410                        printk(KERN_NOTICE "RPC: server %s requires stronger "
2411                               "authentication.\n",
2412                               task->tk_xprt->servername);
2413                        break;
2414                default:
2415                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
2416                                        task->tk_pid, __func__, n);
2417                        error = -EIO;
2418                }
2419                dprintk("RPC: %5u %s: call rejected %d\n",
2420                                task->tk_pid, __func__, n);
2421                goto out_err;
2422        }
2423        p = rpcauth_checkverf(task, p);
2424        if (IS_ERR(p)) {
2425                error = PTR_ERR(p);
2426                dprintk("RPC: %5u %s: auth check failed with %d\n",
2427                                task->tk_pid, __func__, error);
2428                goto out_garbage;               /* bad verifier, retry */
2429        }
2430        len = p - (__be32 *)iov->iov_base - 1;
2431        if (len < 0)
2432                goto out_overflow;
2433        switch ((n = ntohl(*p++))) {
2434        case RPC_SUCCESS:
2435                return p;
2436        case RPC_PROG_UNAVAIL:
2437                dprintk("RPC: %5u %s: program %u is unsupported "
2438                                "by server %s\n", task->tk_pid, __func__,
2439                                (unsigned int)clnt->cl_prog,
2440                                task->tk_xprt->servername);
2441                error = -EPFNOSUPPORT;
2442                goto out_err;
2443        case RPC_PROG_MISMATCH:
2444                dprintk("RPC: %5u %s: program %u, version %u unsupported "
2445                                "by server %s\n", task->tk_pid, __func__,
2446                                (unsigned int)clnt->cl_prog,
2447                                (unsigned int)clnt->cl_vers,
2448                                task->tk_xprt->servername);
2449                error = -EPROTONOSUPPORT;
2450                goto out_err;
2451        case RPC_PROC_UNAVAIL:
2452                dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
2453                                "version %u on server %s\n",
2454                                task->tk_pid, __func__,
2455                                rpc_proc_name(task),
2456                                clnt->cl_prog, clnt->cl_vers,
2457                                task->tk_xprt->servername);
2458                error = -EOPNOTSUPP;
2459                goto out_err;
2460        case RPC_GARBAGE_ARGS:
2461                dprintk("RPC: %5u %s: server saw garbage\n",
2462                                task->tk_pid, __func__);
2463                break;                  /* retry */
2464        default:
2465                dprintk("RPC: %5u %s: server accept status: %x\n",
2466                                task->tk_pid, __func__, n);
2467                /* Also retry */
2468        }
2469
2470out_garbage:
2471        clnt->cl_stats->rpcgarbage++;
2472        if (task->tk_garb_retry) {
2473                task->tk_garb_retry--;
2474                dprintk("RPC: %5u %s: retrying\n",
2475                                task->tk_pid, __func__);
2476                task->tk_action = call_bind;
2477out_retry:
2478                return ERR_PTR(-EAGAIN);
2479        }
2480out_err:
2481        rpc_exit(task, error);
2482        dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2483                        __func__, error);
2484        return ERR_PTR(error);
2485out_overflow:
2486        dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2487                        __func__);
2488        goto out_garbage;
2489}
2490
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
				const void *obj)
{
	/* Intentionally empty. */
}
2495
/* Result decoder for the NULL procedure: consumes nothing, returns 0. */
static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
			       void *obj)
{
	return 0;
}
2501
2502static const struct rpc_procinfo rpcproc_null = {
2503        .p_encode = rpcproc_encode_null,
2504        .p_decode = rpcproc_decode_null,
2505};
2506
2507static int rpc_ping(struct rpc_clnt *clnt)
2508{
2509        struct rpc_message msg = {
2510                .rpc_proc = &rpcproc_null,
2511        };
2512        int err;
2513        msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2514        err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2515        put_rpccred(msg.rpc_cred);
2516        return err;
2517}
2518
2519static
2520struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2521                struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2522                const struct rpc_call_ops *ops, void *data)
2523{
2524        struct rpc_message msg = {
2525                .rpc_proc = &rpcproc_null,
2526                .rpc_cred = cred,
2527        };
2528        struct rpc_task_setup task_setup_data = {
2529                .rpc_client = clnt,
2530                .rpc_xprt = xprt,
2531                .rpc_message = &msg,
2532                .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2533                .callback_data = data,
2534                .flags = flags,
2535        };
2536
2537        return rpc_run_task(&task_setup_data);
2538}
2539
2540struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2541{
2542        return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2543}
2544EXPORT_SYMBOL_GPL(rpc_call_null);
2545
/*
 * Callback data for the asynchronous "test, then add transport" NULL call.
 * Both members hold references that rpc_cb_add_xprt_release() drops.
 */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch the transport is added to */
	struct rpc_xprt *xprt;		/* transport being tested */
};
2550
2551static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2552{
2553        struct rpc_cb_add_xprt_calldata *data = calldata;
2554
2555        if (task->tk_status == 0)
2556                rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2557}
2558
2559static void rpc_cb_add_xprt_release(void *calldata)
2560{
2561        struct rpc_cb_add_xprt_calldata *data = calldata;
2562
2563        xprt_put(data->xprt);
2564        xprt_switch_put(data->xps);
2565        kfree(data);
2566}
2567
2568static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2569        .rpc_call_done = rpc_cb_add_xprt_done,
2570        .rpc_release = rpc_cb_add_xprt_release,
2571};
2572
2573/**
2574 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2575 * @clnt: pointer to struct rpc_clnt
2576 * @xps: pointer to struct rpc_xprt_switch,
2577 * @xprt: pointer struct rpc_xprt
2578 * @dummy: unused
2579 */
2580int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2581                struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2582                void *dummy)
2583{
2584        struct rpc_cb_add_xprt_calldata *data;
2585        struct rpc_cred *cred;
2586        struct rpc_task *task;
2587
2588        data = kmalloc(sizeof(*data), GFP_NOFS);
2589        if (!data)
2590                return -ENOMEM;
2591        data->xps = xprt_switch_get(xps);
2592        data->xprt = xprt_get(xprt);
2593
2594        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2595        task = rpc_call_null_helper(clnt, xprt, cred,
2596                        RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
2597                        &rpc_cb_add_xprt_call_ops, data);
2598        put_rpccred(cred);
2599        if (IS_ERR(task))
2600                return PTR_ERR(task);
2601        rpc_put_task(task);
2602        return 1;
2603}
2604EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2605
2606/**
2607 * rpc_clnt_setup_test_and_add_xprt()
2608 *
2609 * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2610 *   1) caller of the test function must dereference the rpc_xprt_switch
2611 *   and the rpc_xprt.
2612 *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2613 *   the rpc_call_done routine.
2614 *
2615 * Upon success (return of 1), the test function adds the new
2616 * transport to the rpc_clnt xprt switch
2617 *
2618 * @clnt: struct rpc_clnt to get the new transport
2619 * @xps:  the rpc_xprt_switch to hold the new transport
2620 * @xprt: the rpc_xprt to test
2621 * @data: a struct rpc_add_xprt_test pointer that holds the test function
2622 *        and test function call data
2623 */
2624int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2625                                     struct rpc_xprt_switch *xps,
2626                                     struct rpc_xprt *xprt,
2627                                     void *data)
2628{
2629        struct rpc_cred *cred;
2630        struct rpc_task *task;
2631        struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2632        int status = -EADDRINUSE;
2633
2634        xprt = xprt_get(xprt);
2635        xprt_switch_get(xps);
2636
2637        if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2638                goto out_err;
2639
2640        /* Test the connection */
2641        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2642        task = rpc_call_null_helper(clnt, xprt, cred,
2643                                    RPC_TASK_SOFT | RPC_TASK_SOFTCONN,
2644                                    NULL, NULL);
2645        put_rpccred(cred);
2646        if (IS_ERR(task)) {
2647                status = PTR_ERR(task);
2648                goto out_err;
2649        }
2650        status = task->tk_status;
2651        rpc_put_task(task);
2652
2653        if (status < 0)
2654                goto out_err;
2655
2656        /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2657        xtest->add_xprt_test(clnt, xprt, xtest->data);
2658
2659        /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2660        return 1;
2661out_err:
2662        xprt_put(xprt);
2663        xprt_switch_put(xps);
2664        pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2665                status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2666        return status;
2667}
2668EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2669
2670/**
2671 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2672 * @clnt: pointer to struct rpc_clnt
2673 * @xprtargs: pointer to struct xprt_create
2674 * @setup: callback to test and/or set up the connection
2675 * @data: pointer to setup function data
2676 *
2677 * Creates a new transport using the parameters set in args and
2678 * adds it to clnt.
2679 * If ping is set, then test that connectivity succeeds before
2680 * adding the new transport.
2681 *
2682 */
2683int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2684                struct xprt_create *xprtargs,
2685                int (*setup)(struct rpc_clnt *,
2686                        struct rpc_xprt_switch *,
2687                        struct rpc_xprt *,
2688                        void *),
2689                void *data)
2690{
2691        struct rpc_xprt_switch *xps;
2692        struct rpc_xprt *xprt;
2693        unsigned long connect_timeout;
2694        unsigned long reconnect_timeout;
2695        unsigned char resvport;
2696        int ret = 0;
2697
2698        rcu_read_lock();
2699        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2700        xprt = xprt_iter_xprt(&clnt->cl_xpi);
2701        if (xps == NULL || xprt == NULL) {
2702                rcu_read_unlock();
2703                return -EAGAIN;
2704        }
2705        resvport = xprt->resvport;
2706        connect_timeout = xprt->connect_timeout;
2707        reconnect_timeout = xprt->max_reconnect_timeout;
2708        rcu_read_unlock();
2709
2710        xprt = xprt_create_transport(xprtargs);
2711        if (IS_ERR(xprt)) {
2712                ret = PTR_ERR(xprt);
2713                goto out_put_switch;
2714        }
2715        xprt->resvport = resvport;
2716        if (xprt->ops->set_connect_timeout != NULL)
2717                xprt->ops->set_connect_timeout(xprt,
2718                                connect_timeout,
2719                                reconnect_timeout);
2720
2721        rpc_xprt_switch_set_roundrobin(xps);
2722        if (setup) {
2723                ret = setup(clnt, xps, xprt, data);
2724                if (ret != 0)
2725                        goto out_put_xprt;
2726        }
2727        rpc_xprt_switch_add_xprt(xps, xprt);
2728out_put_xprt:
2729        xprt_put(xprt);
2730out_put_switch:
2731        xprt_switch_put(xps);
2732        return ret;
2733}
2734EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2735
/*
 * Pair of timeout values that rpc_set_connect_timeout() hands to
 * rpc_xprt_set_connect_timeout() for each transport.
 */
struct connect_timeout_data {
        unsigned long   connect_timeout;        /* 2nd arg of ->set_connect_timeout() */
        unsigned long   reconnect_timeout;      /* 3rd arg of ->set_connect_timeout() */
};
2740
2741static int
2742rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2743                struct rpc_xprt *xprt,
2744                void *data)
2745{
2746        struct connect_timeout_data *timeo = data;
2747
2748        if (xprt->ops->set_connect_timeout)
2749                xprt->ops->set_connect_timeout(xprt,
2750                                timeo->connect_timeout,
2751                                timeo->reconnect_timeout);
2752        return 0;
2753}
2754
2755void
2756rpc_set_connect_timeout(struct rpc_clnt *clnt,
2757                unsigned long connect_timeout,
2758                unsigned long reconnect_timeout)
2759{
2760        struct connect_timeout_data timeout = {
2761                .connect_timeout = connect_timeout,
2762                .reconnect_timeout = reconnect_timeout,
2763        };
2764        rpc_clnt_iterate_for_each_xprt(clnt,
2765                        rpc_xprt_set_connect_timeout,
2766                        &timeout);
2767}
2768EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2769
2770void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2771{
2772        rcu_read_lock();
2773        xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2774        rcu_read_unlock();
2775}
2776EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2777
2778void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2779{
2780        rcu_read_lock();
2781        rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2782                                 xprt);
2783        rcu_read_unlock();
2784}
2785EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2786
2787bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2788                                   const struct sockaddr *sap)
2789{
2790        struct rpc_xprt_switch *xps;
2791        bool ret;
2792
2793        rcu_read_lock();
2794        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2795        ret = rpc_xprt_switch_has_addr(xps, sap);
2796        rcu_read_unlock();
2797        return ret;
2798}
2799EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2800
2801#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2802static void rpc_show_header(void)
2803{
2804        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2805                "-timeout ---ops--\n");
2806}
2807
2808static void rpc_show_task(const struct rpc_clnt *clnt,
2809                          const struct rpc_task *task)
2810{
2811        const char *rpc_waitq = "none";
2812
2813        if (RPC_IS_QUEUED(task))
2814                rpc_waitq = rpc_qname(task->tk_waitqueue);
2815
2816        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2817                task->tk_pid, task->tk_flags, task->tk_status,
2818                clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2819                clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2820                task->tk_action, rpc_waitq);
2821}
2822
2823void rpc_show_tasks(struct net *net)
2824{
2825        struct rpc_clnt *clnt;
2826        struct rpc_task *task;
2827        int header = 0;
2828        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2829
2830        spin_lock(&sn->rpc_client_lock);
2831        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2832                spin_lock(&clnt->cl_lock);
2833                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2834                        if (!header) {
2835                                rpc_show_header();
2836                                header++;
2837                        }
2838                        rpc_show_task(clnt, task);
2839                }
2840                spin_unlock(&clnt->cl_lock);
2841        }
2842        spin_unlock(&sn->rpc_client_lock);
2843}
2844#endif
2845
2846#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback for rpc_clnt_swap_activate(): enable swap on
 * @xprt and return the result of xprt_enable_swap().  @clnt and @dummy
 * are unused.
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
                struct rpc_xprt *xprt,
                void *dummy)
{
        int status = xprt_enable_swap(xprt);

        return status;
}
2854
2855int
2856rpc_clnt_swap_activate(struct rpc_clnt *clnt)
2857{
2858        if (atomic_inc_return(&clnt->cl_swapper) == 1)
2859                return rpc_clnt_iterate_for_each_xprt(clnt,
2860                                rpc_clnt_swap_activate_callback, NULL);
2861        return 0;
2862}
2863EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
2864
/*
 * Per-transport callback for rpc_clnt_swap_deactivate(): disable swap
 * on @xprt.  Always returns 0; @clnt and @dummy are unused.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
                                  struct rpc_xprt *xprt,
                                  void *dummy)
{
        xprt_disable_swap(xprt);

        return 0;
}
2873
2874void
2875rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
2876{
2877        if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
2878                rpc_clnt_iterate_for_each_xprt(clnt,
2879                                rpc_clnt_swap_deactivate_callback, NULL);
2880}
2881EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
2882#endif /* CONFIG_SUNRPC_SWAP */
2883