linux/net/sunrpc/clnt.c
<<
>>
Prefs
   1/*
   2 *  linux/net/sunrpc/clnt.c
   3 *
   4 *  This file contains the high-level RPC interface.
   5 *  It is modeled as a finite state machine to support both synchronous
   6 *  and asynchronous requests.
   7 *
   8 *  -   RPC header generation and argument serialization.
   9 *  -   Credential refresh.
  10 *  -   TCP connect handling.
  11 *  -   Retry of operation when it is suspected the operation failed because
  12 *      of uid squashing on the server, or when the credentials were stale
  13 *      and need to be refreshed, or when a packet was damaged in transit.
  14 *      This may be have to be moved to the VFS layer.
  15 *
  16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  18 */
  19
  20
  21#include <linux/module.h>
  22#include <linux/types.h>
  23#include <linux/kallsyms.h>
  24#include <linux/mm.h>
  25#include <linux/namei.h>
  26#include <linux/mount.h>
  27#include <linux/slab.h>
  28#include <linux/rcupdate.h>
  29#include <linux/utsname.h>
  30#include <linux/workqueue.h>
  31#include <linux/in.h>
  32#include <linux/in6.h>
  33#include <linux/un.h>
  34
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/addr.h>
  37#include <linux/sunrpc/rpc_pipe_fs.h>
  38#include <linux/sunrpc/metrics.h>
  39#include <linux/sunrpc/bc_xprt.h>
  40#include <trace/events/sunrpc.h>
  41
  42#include "sunrpc.h"
  43#include "netns.h"
  44
  45#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  46# define RPCDBG_FACILITY        RPCDBG_CALL
  47#endif
  48
  49#define dprint_status(t)                                        \
  50        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
  51                        __func__, t->tk_status)
  52
  53/*
  54 * All RPC clients are linked into this list
  55 */
  56
  57static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  58
  59
  60static void     call_start(struct rpc_task *task);
  61static void     call_reserve(struct rpc_task *task);
  62static void     call_reserveresult(struct rpc_task *task);
  63static void     call_allocate(struct rpc_task *task);
  64static void     call_decode(struct rpc_task *task);
  65static void     call_bind(struct rpc_task *task);
  66static void     call_bind_status(struct rpc_task *task);
  67static void     call_transmit(struct rpc_task *task);
  68#if defined(CONFIG_SUNRPC_BACKCHANNEL)
  69static void     call_bc_transmit(struct rpc_task *task);
  70#endif /* CONFIG_SUNRPC_BACKCHANNEL */
  71static void     call_status(struct rpc_task *task);
  72static void     call_transmit_status(struct rpc_task *task);
  73static void     call_refresh(struct rpc_task *task);
  74static void     call_refreshresult(struct rpc_task *task);
  75static void     call_timeout(struct rpc_task *task);
  76static void     call_connect(struct rpc_task *task);
  77static void     call_connect_status(struct rpc_task *task);
  78
  79static __be32   *rpc_encode_header(struct rpc_task *task);
  80static __be32   *rpc_verify_header(struct rpc_task *task);
  81static int      rpc_ping(struct rpc_clnt *clnt);
  82
  83static void rpc_register_client(struct rpc_clnt *clnt)
  84{
  85        struct net *net = rpc_net_ns(clnt);
  86        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  87
  88        spin_lock(&sn->rpc_client_lock);
  89        list_add(&clnt->cl_clients, &sn->all_clients);
  90        spin_unlock(&sn->rpc_client_lock);
  91}
  92
  93static void rpc_unregister_client(struct rpc_clnt *clnt)
  94{
  95        struct net *net = rpc_net_ns(clnt);
  96        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  97
  98        spin_lock(&sn->rpc_client_lock);
  99        list_del(&clnt->cl_clients);
 100        spin_unlock(&sn->rpc_client_lock);
 101}
 102
 103static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 104{
 105        rpc_remove_client_dir(clnt);
 106}
 107
 108static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 109{
 110        struct net *net = rpc_net_ns(clnt);
 111        struct super_block *pipefs_sb;
 112
 113        pipefs_sb = rpc_get_sb_net(net);
 114        if (pipefs_sb) {
 115                __rpc_clnt_remove_pipedir(clnt);
 116                rpc_put_sb_net(net);
 117        }
 118}
 119
 120static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
 121                                    struct rpc_clnt *clnt)
 122{
 123        static uint32_t clntid;
 124        const char *dir_name = clnt->cl_program->pipe_dir_name;
 125        char name[15];
 126        struct dentry *dir, *dentry;
 127
 128        dir = rpc_d_lookup_sb(sb, dir_name);
 129        if (dir == NULL) {
 130                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
 131                return dir;
 132        }
 133        for (;;) {
 134                snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 135                name[sizeof(name) - 1] = '\0';
 136                dentry = rpc_create_client_dir(dir, name, clnt);
 137                if (!IS_ERR(dentry))
 138                        break;
 139                if (dentry == ERR_PTR(-EEXIST))
 140                        continue;
 141                printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 142                                " %s/%s, error %ld\n",
 143                                dir_name, name, PTR_ERR(dentry));
 144                break;
 145        }
 146        dput(dir);
 147        return dentry;
 148}
 149
 150static int
 151rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
 152{
 153        struct dentry *dentry;
 154
 155        if (clnt->cl_program->pipe_dir_name != NULL) {
 156                dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
 157                if (IS_ERR(dentry))
 158                        return PTR_ERR(dentry);
 159        }
 160        return 0;
 161}
 162
 163static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
 164{
 165        if (clnt->cl_program->pipe_dir_name == NULL)
 166                return 1;
 167
 168        switch (event) {
 169        case RPC_PIPEFS_MOUNT:
 170                if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
 171                        return 1;
 172                if (atomic_read(&clnt->cl_count) == 0)
 173                        return 1;
 174                break;
 175        case RPC_PIPEFS_UMOUNT:
 176                if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
 177                        return 1;
 178                break;
 179        }
 180        return 0;
 181}
 182
 183static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 184                                   struct super_block *sb)
 185{
 186        struct dentry *dentry;
 187
 188        switch (event) {
 189        case RPC_PIPEFS_MOUNT:
 190                dentry = rpc_setup_pipedir_sb(sb, clnt);
 191                if (!dentry)
 192                        return -ENOENT;
 193                if (IS_ERR(dentry))
 194                        return PTR_ERR(dentry);
 195                break;
 196        case RPC_PIPEFS_UMOUNT:
 197                __rpc_clnt_remove_pipedir(clnt);
 198                break;
 199        default:
 200                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 201                return -ENOTSUPP;
 202        }
 203        return 0;
 204}
 205
 206static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
 207                                struct super_block *sb)
 208{
 209        int error = 0;
 210
 211        for (;; clnt = clnt->cl_parent) {
 212                if (!rpc_clnt_skip_event(clnt, event))
 213                        error = __rpc_clnt_handle_event(clnt, event, sb);
 214                if (error || clnt == clnt->cl_parent)
 215                        break;
 216        }
 217        return error;
 218}
 219
 220static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
 221{
 222        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 223        struct rpc_clnt *clnt;
 224
 225        spin_lock(&sn->rpc_client_lock);
 226        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
 227                if (rpc_clnt_skip_event(clnt, event))
 228                        continue;
 229                spin_unlock(&sn->rpc_client_lock);
 230                return clnt;
 231        }
 232        spin_unlock(&sn->rpc_client_lock);
 233        return NULL;
 234}
 235
 236static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
 237                            void *ptr)
 238{
 239        struct super_block *sb = ptr;
 240        struct rpc_clnt *clnt;
 241        int error = 0;
 242
 243        while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
 244                error = __rpc_pipefs_event(clnt, event, sb);
 245                if (error)
 246                        break;
 247        }
 248        return error;
 249}
 250
 251static struct notifier_block rpc_clients_block = {
 252        .notifier_call  = rpc_pipefs_event,
 253        .priority       = SUNRPC_PIPEFS_RPC_PRIO,
 254};
 255
 256int rpc_clients_notifier_register(void)
 257{
 258        return rpc_pipefs_notifier_register(&rpc_clients_block);
 259}
 260
 261void rpc_clients_notifier_unregister(void)
 262{
 263        return rpc_pipefs_notifier_unregister(&rpc_clients_block);
 264}
 265
 266static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
 267                struct rpc_xprt *xprt,
 268                const struct rpc_timeout *timeout)
 269{
 270        struct rpc_xprt *old;
 271
 272        spin_lock(&clnt->cl_lock);
 273        old = rcu_dereference_protected(clnt->cl_xprt,
 274                        lockdep_is_held(&clnt->cl_lock));
 275
 276        if (!xprt_bound(xprt))
 277                clnt->cl_autobind = 1;
 278
 279        clnt->cl_timeout = timeout;
 280        rcu_assign_pointer(clnt->cl_xprt, xprt);
 281        spin_unlock(&clnt->cl_lock);
 282
 283        return old;
 284}
 285
 286static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
 287{
 288        clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
 289                        nodename, sizeof(clnt->cl_nodename));
 290}
 291
 292static int rpc_client_register(struct rpc_clnt *clnt,
 293                               rpc_authflavor_t pseudoflavor,
 294                               const char *client_name)
 295{
 296        struct rpc_auth_create_args auth_args = {
 297                .pseudoflavor = pseudoflavor,
 298                .target_name = client_name,
 299        };
 300        struct rpc_auth *auth;
 301        struct net *net = rpc_net_ns(clnt);
 302        struct super_block *pipefs_sb;
 303        int err;
 304
 305        rpc_clnt_debugfs_register(clnt);
 306
 307        pipefs_sb = rpc_get_sb_net(net);
 308        if (pipefs_sb) {
 309                err = rpc_setup_pipedir(pipefs_sb, clnt);
 310                if (err)
 311                        goto out;
 312        }
 313
 314        rpc_register_client(clnt);
 315        if (pipefs_sb)
 316                rpc_put_sb_net(net);
 317
 318        auth = rpcauth_create(&auth_args, clnt);
 319        if (IS_ERR(auth)) {
 320                dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
 321                                pseudoflavor);
 322                err = PTR_ERR(auth);
 323                goto err_auth;
 324        }
 325        return 0;
 326err_auth:
 327        pipefs_sb = rpc_get_sb_net(net);
 328        rpc_unregister_client(clnt);
 329        __rpc_clnt_remove_pipedir(clnt);
 330out:
 331        if (pipefs_sb)
 332                rpc_put_sb_net(net);
 333        rpc_clnt_debugfs_unregister(clnt);
 334        return err;
 335}
 336
 337static DEFINE_IDA(rpc_clids);
 338
 339void rpc_cleanup_clids(void)
 340{
 341        ida_destroy(&rpc_clids);
 342}
 343
 344static int rpc_alloc_clid(struct rpc_clnt *clnt)
 345{
 346        int clid;
 347
 348        clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
 349        if (clid < 0)
 350                return clid;
 351        clnt->cl_clid = clid;
 352        return 0;
 353}
 354
 355static void rpc_free_clid(struct rpc_clnt *clnt)
 356{
 357        ida_simple_remove(&rpc_clids, clnt->cl_clid);
 358}
 359
 360static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
 361                struct rpc_xprt_switch *xps,
 362                struct rpc_xprt *xprt,
 363                struct rpc_clnt *parent)
 364{
 365        const struct rpc_program *program = args->program;
 366        const struct rpc_version *version;
 367        struct rpc_clnt *clnt = NULL;
 368        const struct rpc_timeout *timeout;
 369        const char *nodename = args->nodename;
 370        int err;
 371
 372        /* sanity check the name before trying to print it */
 373        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 374                        program->name, args->servername, xprt);
 375
 376        err = rpciod_up();
 377        if (err)
 378                goto out_no_rpciod;
 379
 380        err = -EINVAL;
 381        if (args->version >= program->nrvers)
 382                goto out_err;
 383        version = program->version[args->version];
 384        if (version == NULL)
 385                goto out_err;
 386
 387        err = -ENOMEM;
 388        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 389        if (!clnt)
 390                goto out_err;
 391        clnt->cl_parent = parent ? : clnt;
 392
 393        err = rpc_alloc_clid(clnt);
 394        if (err)
 395                goto out_no_clid;
 396
 397        clnt->cl_procinfo = version->procs;
 398        clnt->cl_maxproc  = version->nrprocs;
 399        clnt->cl_prog     = args->prognumber ? : program->number;
 400        clnt->cl_vers     = version->number;
 401        clnt->cl_stats    = program->stats;
 402        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 403        rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
 404        err = -ENOMEM;
 405        if (clnt->cl_metrics == NULL)
 406                goto out_no_stats;
 407        clnt->cl_program  = program;
 408        INIT_LIST_HEAD(&clnt->cl_tasks);
 409        spin_lock_init(&clnt->cl_lock);
 410
 411        timeout = xprt->timeout;
 412        if (args->timeout != NULL) {
 413                memcpy(&clnt->cl_timeout_default, args->timeout,
 414                                sizeof(clnt->cl_timeout_default));
 415                timeout = &clnt->cl_timeout_default;
 416        }
 417
 418        rpc_clnt_set_transport(clnt, xprt, timeout);
 419        xprt_iter_init(&clnt->cl_xpi, xps);
 420        xprt_switch_put(xps);
 421
 422        clnt->cl_rtt = &clnt->cl_rtt_default;
 423        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 424
 425        atomic_set(&clnt->cl_count, 1);
 426
 427        if (nodename == NULL)
 428                nodename = utsname()->nodename;
 429        /* save the nodename */
 430        rpc_clnt_set_nodename(clnt, nodename);
 431
 432        err = rpc_client_register(clnt, args->authflavor, args->client_name);
 433        if (err)
 434                goto out_no_path;
 435        if (parent)
 436                atomic_inc(&parent->cl_count);
 437        return clnt;
 438
 439out_no_path:
 440        rpc_free_iostats(clnt->cl_metrics);
 441out_no_stats:
 442        rpc_free_clid(clnt);
 443out_no_clid:
 444        kfree(clnt);
 445out_err:
 446        rpciod_down();
 447out_no_rpciod:
 448        xprt_switch_put(xps);
 449        xprt_put(xprt);
 450        return ERR_PTR(err);
 451}
 452
 453static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
 454                                        struct rpc_xprt *xprt)
 455{
 456        struct rpc_clnt *clnt = NULL;
 457        struct rpc_xprt_switch *xps;
 458
 459        if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
 460                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 461                xps = args->bc_xprt->xpt_bc_xps;
 462                xprt_switch_get(xps);
 463        } else {
 464                xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 465                if (xps == NULL) {
 466                        xprt_put(xprt);
 467                        return ERR_PTR(-ENOMEM);
 468                }
 469                if (xprt->bc_xprt) {
 470                        xprt_switch_get(xps);
 471                        xprt->bc_xprt->xpt_bc_xps = xps;
 472                }
 473        }
 474        clnt = rpc_new_client(args, xps, xprt, NULL);
 475        if (IS_ERR(clnt))
 476                return clnt;
 477
 478        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 479                int err = rpc_ping(clnt);
 480                if (err != 0) {
 481                        rpc_shutdown_client(clnt);
 482                        return ERR_PTR(err);
 483                }
 484        }
 485
 486        clnt->cl_softrtry = 1;
 487        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
 488                clnt->cl_softrtry = 0;
 489
 490        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 491                clnt->cl_autobind = 1;
 492        if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
 493                clnt->cl_noretranstimeo = 1;
 494        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 495                clnt->cl_discrtry = 1;
 496        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 497                clnt->cl_chatty = 1;
 498
 499        return clnt;
 500}
 501
 502/**
 503 * rpc_create - create an RPC client and transport with one call
 504 * @args: rpc_clnt create argument structure
 505 *
 506 * Creates and initializes an RPC transport and an RPC client.
 507 *
 508 * It can ping the server in order to determine if it is up, and to see if
 509 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 510 * this behavior so asynchronous tasks can also use rpc_create.
 511 */
 512struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 513{
 514        struct rpc_xprt *xprt;
 515        struct xprt_create xprtargs = {
 516                .net = args->net,
 517                .ident = args->protocol,
 518                .srcaddr = args->saddress,
 519                .dstaddr = args->address,
 520                .addrlen = args->addrsize,
 521                .servername = args->servername,
 522                .bc_xprt = args->bc_xprt,
 523        };
 524        char servername[48];
 525
 526        if (args->bc_xprt) {
 527                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 528                xprt = args->bc_xprt->xpt_bc_xprt;
 529                if (xprt) {
 530                        xprt_get(xprt);
 531                        return rpc_create_xprt(args, xprt);
 532                }
 533        }
 534
 535        if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
 536                xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
 537        if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
 538                xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
 539        /*
 540         * If the caller chooses not to specify a hostname, whip
 541         * up a string representation of the passed-in address.
 542         */
 543        if (xprtargs.servername == NULL) {
 544                struct sockaddr_un *sun =
 545                                (struct sockaddr_un *)args->address;
 546                struct sockaddr_in *sin =
 547                                (struct sockaddr_in *)args->address;
 548                struct sockaddr_in6 *sin6 =
 549                                (struct sockaddr_in6 *)args->address;
 550
 551                servername[0] = '\0';
 552                switch (args->address->sa_family) {
 553                case AF_LOCAL:
 554                        snprintf(servername, sizeof(servername), "%s",
 555                                 sun->sun_path);
 556                        break;
 557                case AF_INET:
 558                        snprintf(servername, sizeof(servername), "%pI4",
 559                                 &sin->sin_addr.s_addr);
 560                        break;
 561                case AF_INET6:
 562                        snprintf(servername, sizeof(servername), "%pI6",
 563                                 &sin6->sin6_addr);
 564                        break;
 565                default:
 566                        /* caller wants default server name, but
 567                         * address family isn't recognized. */
 568                        return ERR_PTR(-EINVAL);
 569                }
 570                xprtargs.servername = servername;
 571        }
 572
 573        xprt = xprt_create_transport(&xprtargs);
 574        if (IS_ERR(xprt))
 575                return (struct rpc_clnt *)xprt;
 576
 577        /*
 578         * By default, kernel RPC client connects from a reserved port.
 579         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 580         * but it is always enabled for rpciod, which handles the connect
 581         * operation.
 582         */
 583        xprt->resvport = 1;
 584        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 585                xprt->resvport = 0;
 586
 587        return rpc_create_xprt(args, xprt);
 588}
 589EXPORT_SYMBOL_GPL(rpc_create);
 590
 591/*
 592 * This function clones the RPC client structure. It allows us to share the
 593 * same transport while varying parameters such as the authentication
 594 * flavour.
 595 */
 596static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 597                                           struct rpc_clnt *clnt)
 598{
 599        struct rpc_xprt_switch *xps;
 600        struct rpc_xprt *xprt;
 601        struct rpc_clnt *new;
 602        int err;
 603
 604        err = -ENOMEM;
 605        rcu_read_lock();
 606        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
 607        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 608        rcu_read_unlock();
 609        if (xprt == NULL || xps == NULL) {
 610                xprt_put(xprt);
 611                xprt_switch_put(xps);
 612                goto out_err;
 613        }
 614        args->servername = xprt->servername;
 615        args->nodename = clnt->cl_nodename;
 616
 617        new = rpc_new_client(args, xps, xprt, clnt);
 618        if (IS_ERR(new)) {
 619                err = PTR_ERR(new);
 620                goto out_err;
 621        }
 622
 623        /* Turn off autobind on clones */
 624        new->cl_autobind = 0;
 625        new->cl_softrtry = clnt->cl_softrtry;
 626        new->cl_noretranstimeo = clnt->cl_noretranstimeo;
 627        new->cl_discrtry = clnt->cl_discrtry;
 628        new->cl_chatty = clnt->cl_chatty;
 629        return new;
 630
 631out_err:
 632        dprintk("RPC:       %s: returned error %d\n", __func__, err);
 633        return ERR_PTR(err);
 634}
 635
 636/**
 637 * rpc_clone_client - Clone an RPC client structure
 638 *
 639 * @clnt: RPC client whose parameters are copied
 640 *
 641 * Returns a fresh RPC client or an ERR_PTR.
 642 */
 643struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
 644{
 645        struct rpc_create_args args = {
 646                .program        = clnt->cl_program,
 647                .prognumber     = clnt->cl_prog,
 648                .version        = clnt->cl_vers,
 649                .authflavor     = clnt->cl_auth->au_flavor,
 650        };
 651        return __rpc_clone_client(&args, clnt);
 652}
 653EXPORT_SYMBOL_GPL(rpc_clone_client);
 654
 655/**
 656 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 657 *
 658 * @clnt: RPC client whose parameters are copied
 659 * @flavor: security flavor for new client
 660 *
 661 * Returns a fresh RPC client or an ERR_PTR.
 662 */
 663struct rpc_clnt *
 664rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 665{
 666        struct rpc_create_args args = {
 667                .program        = clnt->cl_program,
 668                .prognumber     = clnt->cl_prog,
 669                .version        = clnt->cl_vers,
 670                .authflavor     = flavor,
 671        };
 672        return __rpc_clone_client(&args, clnt);
 673}
 674EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
 675
 676/**
 677 * rpc_switch_client_transport: switch the RPC transport on the fly
 678 * @clnt: pointer to a struct rpc_clnt
 679 * @args: pointer to the new transport arguments
 680 * @timeout: pointer to the new timeout parameters
 681 *
 682 * This function allows the caller to switch the RPC transport for the
 683 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 684 * server, for instance.  It assumes that the caller has ensured that
 685 * there are no active RPC tasks by using some form of locking.
 686 *
 687 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 688 * negative errno is returned, and "clnt" continues to use the old
 689 * xprt.
 690 */
 691int rpc_switch_client_transport(struct rpc_clnt *clnt,
 692                struct xprt_create *args,
 693                const struct rpc_timeout *timeout)
 694{
 695        const struct rpc_timeout *old_timeo;
 696        rpc_authflavor_t pseudoflavor;
 697        struct rpc_xprt_switch *xps, *oldxps;
 698        struct rpc_xprt *xprt, *old;
 699        struct rpc_clnt *parent;
 700        int err;
 701
 702        xprt = xprt_create_transport(args);
 703        if (IS_ERR(xprt)) {
 704                dprintk("RPC:       failed to create new xprt for clnt %p\n",
 705                        clnt);
 706                return PTR_ERR(xprt);
 707        }
 708
 709        xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 710        if (xps == NULL) {
 711                xprt_put(xprt);
 712                return -ENOMEM;
 713        }
 714
 715        pseudoflavor = clnt->cl_auth->au_flavor;
 716
 717        old_timeo = clnt->cl_timeout;
 718        old = rpc_clnt_set_transport(clnt, xprt, timeout);
 719        oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
 720
 721        rpc_unregister_client(clnt);
 722        __rpc_clnt_remove_pipedir(clnt);
 723        rpc_clnt_debugfs_unregister(clnt);
 724
 725        /*
 726         * A new transport was created.  "clnt" therefore
 727         * becomes the root of a new cl_parent tree.  clnt's
 728         * children, if it has any, still point to the old xprt.
 729         */
 730        parent = clnt->cl_parent;
 731        clnt->cl_parent = clnt;
 732
 733        /*
 734         * The old rpc_auth cache cannot be re-used.  GSS
 735         * contexts in particular are between a single
 736         * client and server.
 737         */
 738        err = rpc_client_register(clnt, pseudoflavor, NULL);
 739        if (err)
 740                goto out_revert;
 741
 742        synchronize_rcu();
 743        if (parent != clnt)
 744                rpc_release_client(parent);
 745        xprt_switch_put(oldxps);
 746        xprt_put(old);
 747        dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
 748        return 0;
 749
 750out_revert:
 751        xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
 752        rpc_clnt_set_transport(clnt, old, old_timeo);
 753        clnt->cl_parent = parent;
 754        rpc_client_register(clnt, pseudoflavor, NULL);
 755        xprt_switch_put(xps);
 756        xprt_put(xprt);
 757        dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
 758        return err;
 759}
 760EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
 761
 762static
 763int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
 764{
 765        struct rpc_xprt_switch *xps;
 766
 767        rcu_read_lock();
 768        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 769        rcu_read_unlock();
 770        if (xps == NULL)
 771                return -EAGAIN;
 772        xprt_iter_init_listall(xpi, xps);
 773        xprt_switch_put(xps);
 774        return 0;
 775}
 776
 777/**
 778 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
 779 * @clnt: pointer to client
 780 * @fn: function to apply
 781 * @data: void pointer to function data
 782 *
 783 * Iterates through the list of RPC transports currently attached to the
 784 * client and applies the function fn(clnt, xprt, data).
 785 *
 786 * On error, the iteration stops, and the function returns the error value.
 787 */
 788int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
 789                int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
 790                void *data)
 791{
 792        struct rpc_xprt_iter xpi;
 793        int ret;
 794
 795        ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
 796        if (ret)
 797                return ret;
 798        for (;;) {
 799                struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
 800
 801                if (!xprt)
 802                        break;
 803                ret = fn(clnt, xprt, data);
 804                xprt_put(xprt);
 805                if (ret < 0)
 806                        break;
 807        }
 808        xprt_iter_destroy(&xpi);
 809        return ret;
 810}
 811EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
 812
 813/*
 814 * Kill all tasks for the given client.
 815 * XXX: kill their descendants as well?
 816 */
 817void rpc_killall_tasks(struct rpc_clnt *clnt)
 818{
 819        struct rpc_task *rovr;
 820
 821
 822        if (list_empty(&clnt->cl_tasks))
 823                return;
 824        dprintk("RPC:       killing all tasks for client %p\n", clnt);
 825        /*
 826         * Spin lock all_tasks to prevent changes...
 827         */
 828        spin_lock(&clnt->cl_lock);
 829        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
 830                if (!RPC_IS_ACTIVATED(rovr))
 831                        continue;
 832                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 833                        rovr->tk_flags |= RPC_TASK_KILLED;
 834                        rpc_exit(rovr, -EIO);
 835                        if (RPC_IS_QUEUED(rovr))
 836                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
 837                                                        rovr);
 838                }
 839        }
 840        spin_unlock(&clnt->cl_lock);
 841}
 842EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 843
 844/*
 845 * Properly shut down an RPC client, terminating all outstanding
 846 * requests.
 847 */
 848void rpc_shutdown_client(struct rpc_clnt *clnt)
 849{
 850        might_sleep();
 851
 852        dprintk_rcu("RPC:       shutting down %s client for %s\n",
 853                        clnt->cl_program->name,
 854                        rcu_dereference(clnt->cl_xprt)->servername);
 855
 856        while (!list_empty(&clnt->cl_tasks)) {
 857                rpc_killall_tasks(clnt);
 858                wait_event_timeout(destroy_wait,
 859                        list_empty(&clnt->cl_tasks), 1*HZ);
 860        }
 861
 862        rpc_release_client(clnt);
 863}
 864EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 865
 866/*
 867 * Free an RPC client
 868 */
 869static struct rpc_clnt *
 870rpc_free_client(struct rpc_clnt *clnt)
 871{
 872        struct rpc_clnt *parent = NULL;
 873
 874        dprintk_rcu("RPC:       destroying %s client for %s\n",
 875                        clnt->cl_program->name,
 876                        rcu_dereference(clnt->cl_xprt)->servername);
 877        if (clnt->cl_parent != clnt)
 878                parent = clnt->cl_parent;
 879        rpc_clnt_debugfs_unregister(clnt);
 880        rpc_clnt_remove_pipedir(clnt);
 881        rpc_unregister_client(clnt);
 882        rpc_free_iostats(clnt->cl_metrics);
 883        clnt->cl_metrics = NULL;
 884        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
 885        xprt_iter_destroy(&clnt->cl_xpi);
 886        rpciod_down();
 887        rpc_free_clid(clnt);
 888        kfree(clnt);
 889        return parent;
 890}
 891
 892/*
 893 * Free an RPC client
 894 */
 895static struct rpc_clnt * 
 896rpc_free_auth(struct rpc_clnt *clnt)
 897{
 898        if (clnt->cl_auth == NULL)
 899                return rpc_free_client(clnt);
 900
 901        /*
 902         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 903         *       release remaining GSS contexts. This mechanism ensures
 904         *       that it can do so safely.
 905         */
 906        atomic_inc(&clnt->cl_count);
 907        rpcauth_release(clnt->cl_auth);
 908        clnt->cl_auth = NULL;
 909        if (atomic_dec_and_test(&clnt->cl_count))
 910                return rpc_free_client(clnt);
 911        return NULL;
 912}
 913
 914/*
 915 * Release reference to the RPC client
 916 */
 917void
 918rpc_release_client(struct rpc_clnt *clnt)
 919{
 920        dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 921
 922        do {
 923                if (list_empty(&clnt->cl_tasks))
 924                        wake_up(&destroy_wait);
 925                if (!atomic_dec_and_test(&clnt->cl_count))
 926                        break;
 927                clnt = rpc_free_auth(clnt);
 928        } while (clnt != NULL);
 929}
 930EXPORT_SYMBOL_GPL(rpc_release_client);
 931
 932/**
 933 * rpc_bind_new_program - bind a new RPC program to an existing client
 934 * @old: old rpc_client
 935 * @program: rpc program to set
 936 * @vers: rpc program version
 937 *
 938 * Clones the rpc client and sets up a new RPC program. This is mainly
 939 * of use for enabling different RPC programs to share the same transport.
 940 * The Sun NFSv2/v3 ACL protocol can do this.
 941 */
 942struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 943                                      const struct rpc_program *program,
 944                                      u32 vers)
 945{
 946        struct rpc_create_args args = {
 947                .program        = program,
 948                .prognumber     = program->number,
 949                .version        = vers,
 950                .authflavor     = old->cl_auth->au_flavor,
 951        };
 952        struct rpc_clnt *clnt;
 953        int err;
 954
 955        clnt = __rpc_clone_client(&args, old);
 956        if (IS_ERR(clnt))
 957                goto out;
 958        err = rpc_ping(clnt);
 959        if (err != 0) {
 960                rpc_shutdown_client(clnt);
 961                clnt = ERR_PTR(err);
 962        }
 963out:
 964        return clnt;
 965}
 966EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 967
 968void rpc_task_release_client(struct rpc_task *task)
 969{
 970        struct rpc_clnt *clnt = task->tk_client;
 971        struct rpc_xprt *xprt = task->tk_xprt;
 972
 973        if (clnt != NULL) {
 974                /* Remove from client task list */
 975                spin_lock(&clnt->cl_lock);
 976                list_del(&task->tk_task);
 977                spin_unlock(&clnt->cl_lock);
 978                task->tk_client = NULL;
 979
 980                rpc_release_client(clnt);
 981        }
 982
 983        if (xprt != NULL) {
 984                task->tk_xprt = NULL;
 985
 986                xprt_put(xprt);
 987        }
 988}
 989
 990static
 991void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 992{
 993
 994        if (clnt != NULL) {
 995                if (task->tk_xprt == NULL)
 996                        task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
 997                task->tk_client = clnt;
 998                atomic_inc(&clnt->cl_count);
 999                if (clnt->cl_softrtry)
1000                        task->tk_flags |= RPC_TASK_SOFT;
1001                if (clnt->cl_noretranstimeo)
1002                        task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1003                if (atomic_read(&clnt->cl_swapper))
1004                        task->tk_flags |= RPC_TASK_SWAPPER;
1005                /* Add to the client's list of all tasks */
1006                spin_lock(&clnt->cl_lock);
1007                list_add_tail(&task->tk_task, &clnt->cl_tasks);
1008                spin_unlock(&clnt->cl_lock);
1009        }
1010}
1011
1012static void
1013rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1014{
1015        if (msg != NULL) {
1016                task->tk_msg.rpc_proc = msg->rpc_proc;
1017                task->tk_msg.rpc_argp = msg->rpc_argp;
1018                task->tk_msg.rpc_resp = msg->rpc_resp;
1019                if (msg->rpc_cred != NULL)
1020                        task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
1021        }
1022}
1023
1024/*
1025 * Default callback for async RPC calls
1026 */
1027static void
1028rpc_default_callback(struct rpc_task *task, void *data)
1029{
1030}
1031
1032static const struct rpc_call_ops rpc_default_ops = {
1033        .rpc_call_done = rpc_default_callback,
1034};
1035
1036/**
1037 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1038 * @task_setup_data: pointer to task initialisation data
1039 */
1040struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1041{
1042        struct rpc_task *task;
1043
1044        task = rpc_new_task(task_setup_data);
1045
1046        rpc_task_set_client(task, task_setup_data->rpc_client);
1047        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1048
1049        if (task->tk_action == NULL)
1050                rpc_call_start(task);
1051
1052        atomic_inc(&task->tk_count);
1053        rpc_execute(task);
1054        return task;
1055}
1056EXPORT_SYMBOL_GPL(rpc_run_task);
1057
1058/**
1059 * rpc_call_sync - Perform a synchronous RPC call
1060 * @clnt: pointer to RPC client
1061 * @msg: RPC call parameters
1062 * @flags: RPC call flags
1063 */
1064int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1065{
1066        struct rpc_task *task;
1067        struct rpc_task_setup task_setup_data = {
1068                .rpc_client = clnt,
1069                .rpc_message = msg,
1070                .callback_ops = &rpc_default_ops,
1071                .flags = flags,
1072        };
1073        int status;
1074
1075        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1076        if (flags & RPC_TASK_ASYNC) {
1077                rpc_release_calldata(task_setup_data.callback_ops,
1078                        task_setup_data.callback_data);
1079                return -EINVAL;
1080        }
1081
1082        task = rpc_run_task(&task_setup_data);
1083        if (IS_ERR(task))
1084                return PTR_ERR(task);
1085        status = task->tk_status;
1086        rpc_put_task(task);
1087        return status;
1088}
1089EXPORT_SYMBOL_GPL(rpc_call_sync);
1090
1091/**
1092 * rpc_call_async - Perform an asynchronous RPC call
1093 * @clnt: pointer to RPC client
1094 * @msg: RPC call parameters
1095 * @flags: RPC call flags
1096 * @tk_ops: RPC call ops
1097 * @data: user call data
1098 */
1099int
1100rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1101               const struct rpc_call_ops *tk_ops, void *data)
1102{
1103        struct rpc_task *task;
1104        struct rpc_task_setup task_setup_data = {
1105                .rpc_client = clnt,
1106                .rpc_message = msg,
1107                .callback_ops = tk_ops,
1108                .callback_data = data,
1109                .flags = flags|RPC_TASK_ASYNC,
1110        };
1111
1112        task = rpc_run_task(&task_setup_data);
1113        if (IS_ERR(task))
1114                return PTR_ERR(task);
1115        rpc_put_task(task);
1116        return 0;
1117}
1118EXPORT_SYMBOL_GPL(rpc_call_async);
1119
1120#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1121/**
1122 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1123 * rpc_execute against it
1124 * @req: RPC request
1125 */
1126struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1127{
1128        struct rpc_task *task;
1129        struct xdr_buf *xbufp = &req->rq_snd_buf;
1130        struct rpc_task_setup task_setup_data = {
1131                .callback_ops = &rpc_default_ops,
1132                .flags = RPC_TASK_SOFTCONN,
1133        };
1134
1135        dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1136        /*
1137         * Create an rpc_task to send the data
1138         */
1139        task = rpc_new_task(&task_setup_data);
1140        task->tk_rqstp = req;
1141
1142        /*
1143         * Set up the xdr_buf length.
1144         * This also indicates that the buffer is XDR encoded already.
1145         */
1146        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1147                        xbufp->tail[0].iov_len;
1148
1149        task->tk_action = call_bc_transmit;
1150        atomic_inc(&task->tk_count);
1151        WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1152        rpc_execute(task);
1153
1154        dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1155        return task;
1156}
1157#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1158
1159void
1160rpc_call_start(struct rpc_task *task)
1161{
1162        task->tk_action = call_start;
1163}
1164EXPORT_SYMBOL_GPL(rpc_call_start);
1165
1166/**
1167 * rpc_peeraddr - extract remote peer address from clnt's xprt
1168 * @clnt: RPC client structure
1169 * @buf: target buffer
1170 * @bufsize: length of target buffer
1171 *
1172 * Returns the number of bytes that are actually in the stored address.
1173 */
1174size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1175{
1176        size_t bytes;
1177        struct rpc_xprt *xprt;
1178
1179        rcu_read_lock();
1180        xprt = rcu_dereference(clnt->cl_xprt);
1181
1182        bytes = xprt->addrlen;
1183        if (bytes > bufsize)
1184                bytes = bufsize;
1185        memcpy(buf, &xprt->addr, bytes);
1186        rcu_read_unlock();
1187
1188        return bytes;
1189}
1190EXPORT_SYMBOL_GPL(rpc_peeraddr);
1191
1192/**
1193 * rpc_peeraddr2str - return remote peer address in printable format
1194 * @clnt: RPC client structure
1195 * @format: address format
1196 *
1197 * NB: the lifetime of the memory referenced by the returned pointer is
1198 * the same as the rpc_xprt itself.  As long as the caller uses this
1199 * pointer, it must hold the RCU read lock.
1200 */
1201const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1202                             enum rpc_display_format_t format)
1203{
1204        struct rpc_xprt *xprt;
1205
1206        xprt = rcu_dereference(clnt->cl_xprt);
1207
1208        if (xprt->address_strings[format] != NULL)
1209                return xprt->address_strings[format];
1210        else
1211                return "unprintable";
1212}
1213EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1214
1215static const struct sockaddr_in rpc_inaddr_loopback = {
1216        .sin_family             = AF_INET,
1217        .sin_addr.s_addr        = htonl(INADDR_ANY),
1218};
1219
1220static const struct sockaddr_in6 rpc_in6addr_loopback = {
1221        .sin6_family            = AF_INET6,
1222        .sin6_addr              = IN6ADDR_ANY_INIT,
1223};
1224
1225/*
1226 * Try a getsockname() on a connected datagram socket.  Using a
1227 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1228 * This conserves the ephemeral port number space.
1229 *
1230 * Returns zero and fills in "buf" if successful; otherwise, a
1231 * negative errno is returned.
1232 */
1233static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1234                        struct sockaddr *buf)
1235{
1236        struct socket *sock;
1237        int err;
1238
1239        err = __sock_create(net, sap->sa_family,
1240                                SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1241        if (err < 0) {
1242                dprintk("RPC:       can't create UDP socket (%d)\n", err);
1243                goto out;
1244        }
1245
1246        switch (sap->sa_family) {
1247        case AF_INET:
1248                err = kernel_bind(sock,
1249                                (struct sockaddr *)&rpc_inaddr_loopback,
1250                                sizeof(rpc_inaddr_loopback));
1251                break;
1252        case AF_INET6:
1253                err = kernel_bind(sock,
1254                                (struct sockaddr *)&rpc_in6addr_loopback,
1255                                sizeof(rpc_in6addr_loopback));
1256                break;
1257        default:
1258                err = -EAFNOSUPPORT;
1259                goto out;
1260        }
1261        if (err < 0) {
1262                dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1263                goto out_release;
1264        }
1265
1266        err = kernel_connect(sock, sap, salen, 0);
1267        if (err < 0) {
1268                dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1269                goto out_release;
1270        }
1271
1272        err = kernel_getsockname(sock, buf);
1273        if (err < 0) {
1274                dprintk("RPC:       getsockname failed (%d)\n", err);
1275                goto out_release;
1276        }
1277
1278        err = 0;
1279        if (buf->sa_family == AF_INET6) {
1280                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1281                sin6->sin6_scope_id = 0;
1282        }
1283        dprintk("RPC:       %s succeeded\n", __func__);
1284
1285out_release:
1286        sock_release(sock);
1287out:
1288        return err;
1289}
1290
1291/*
1292 * Scraping a connected socket failed, so we don't have a useable
1293 * local address.  Fallback: generate an address that will prevent
1294 * the server from calling us back.
1295 *
1296 * Returns zero and fills in "buf" if successful; otherwise, a
1297 * negative errno is returned.
1298 */
1299static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1300{
1301        switch (family) {
1302        case AF_INET:
1303                if (buflen < sizeof(rpc_inaddr_loopback))
1304                        return -EINVAL;
1305                memcpy(buf, &rpc_inaddr_loopback,
1306                                sizeof(rpc_inaddr_loopback));
1307                break;
1308        case AF_INET6:
1309                if (buflen < sizeof(rpc_in6addr_loopback))
1310                        return -EINVAL;
1311                memcpy(buf, &rpc_in6addr_loopback,
1312                                sizeof(rpc_in6addr_loopback));
1313                break;
1314        default:
1315                dprintk("RPC:       %s: address family not supported\n",
1316                        __func__);
1317                return -EAFNOSUPPORT;
1318        }
1319        dprintk("RPC:       %s: succeeded\n", __func__);
1320        return 0;
1321}
1322
1323/**
1324 * rpc_localaddr - discover local endpoint address for an RPC client
1325 * @clnt: RPC client structure
1326 * @buf: target buffer
1327 * @buflen: size of target buffer, in bytes
1328 *
1329 * Returns zero and fills in "buf" and "buflen" if successful;
1330 * otherwise, a negative errno is returned.
1331 *
1332 * This works even if the underlying transport is not currently connected,
1333 * or if the upper layer never previously provided a source address.
1334 *
1335 * The result of this function call is transient: multiple calls in
1336 * succession may give different results, depending on how local
1337 * networking configuration changes over time.
1338 */
1339int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1340{
1341        struct sockaddr_storage address;
1342        struct sockaddr *sap = (struct sockaddr *)&address;
1343        struct rpc_xprt *xprt;
1344        struct net *net;
1345        size_t salen;
1346        int err;
1347
1348        rcu_read_lock();
1349        xprt = rcu_dereference(clnt->cl_xprt);
1350        salen = xprt->addrlen;
1351        memcpy(sap, &xprt->addr, salen);
1352        net = get_net(xprt->xprt_net);
1353        rcu_read_unlock();
1354
1355        rpc_set_port(sap, 0);
1356        err = rpc_sockname(net, sap, salen, buf);
1357        put_net(net);
1358        if (err != 0)
1359                /* Couldn't discover local address, return ANYADDR */
1360                return rpc_anyaddr(sap->sa_family, buf, buflen);
1361        return 0;
1362}
1363EXPORT_SYMBOL_GPL(rpc_localaddr);
1364
1365void
1366rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1367{
1368        struct rpc_xprt *xprt;
1369
1370        rcu_read_lock();
1371        xprt = rcu_dereference(clnt->cl_xprt);
1372        if (xprt->ops->set_buffer_size)
1373                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1374        rcu_read_unlock();
1375}
1376EXPORT_SYMBOL_GPL(rpc_setbufsize);
1377
1378/**
1379 * rpc_net_ns - Get the network namespace for this RPC client
1380 * @clnt: RPC client to query
1381 *
1382 */
1383struct net *rpc_net_ns(struct rpc_clnt *clnt)
1384{
1385        struct net *ret;
1386
1387        rcu_read_lock();
1388        ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1389        rcu_read_unlock();
1390        return ret;
1391}
1392EXPORT_SYMBOL_GPL(rpc_net_ns);
1393
1394/**
1395 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1396 * @clnt: RPC client to query
1397 *
1398 * For stream transports, this is one RPC record fragment (see RFC
1399 * 1831), as we don't support multi-record requests yet.  For datagram
1400 * transports, this is the size of an IP packet minus the IP, UDP, and
1401 * RPC header sizes.
1402 */
1403size_t rpc_max_payload(struct rpc_clnt *clnt)
1404{
1405        size_t ret;
1406
1407        rcu_read_lock();
1408        ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1409        rcu_read_unlock();
1410        return ret;
1411}
1412EXPORT_SYMBOL_GPL(rpc_max_payload);
1413
1414/**
1415 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1416 * @clnt: RPC client to query
1417 */
1418size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1419{
1420        struct rpc_xprt *xprt;
1421        size_t ret;
1422
1423        rcu_read_lock();
1424        xprt = rcu_dereference(clnt->cl_xprt);
1425        ret = xprt->ops->bc_maxpayload(xprt);
1426        rcu_read_unlock();
1427        return ret;
1428}
1429EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1430
1431/**
1432 * rpc_force_rebind - force transport to check that remote port is unchanged
1433 * @clnt: client to rebind
1434 *
1435 */
1436void rpc_force_rebind(struct rpc_clnt *clnt)
1437{
1438        if (clnt->cl_autobind) {
1439                rcu_read_lock();
1440                xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1441                rcu_read_unlock();
1442        }
1443}
1444EXPORT_SYMBOL_GPL(rpc_force_rebind);
1445
1446/*
1447 * Restart an (async) RPC call from the call_prepare state.
1448 * Usually called from within the exit handler.
1449 */
1450int
1451rpc_restart_call_prepare(struct rpc_task *task)
1452{
1453        if (RPC_ASSASSINATED(task))
1454                return 0;
1455        task->tk_action = call_start;
1456        task->tk_status = 0;
1457        if (task->tk_ops->rpc_call_prepare != NULL)
1458                task->tk_action = rpc_prepare_task;
1459        return 1;
1460}
1461EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1462
1463/*
1464 * Restart an (async) RPC call. Usually called from within the
1465 * exit handler.
1466 */
1467int
1468rpc_restart_call(struct rpc_task *task)
1469{
1470        if (RPC_ASSASSINATED(task))
1471                return 0;
1472        task->tk_action = call_start;
1473        task->tk_status = 0;
1474        return 1;
1475}
1476EXPORT_SYMBOL_GPL(rpc_restart_call);
1477
1478const char
1479*rpc_proc_name(const struct rpc_task *task)
1480{
1481        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1482
1483        if (proc) {
1484                if (proc->p_name)
1485                        return proc->p_name;
1486                else
1487                        return "NULL";
1488        } else
1489                return "no proc";
1490}
1491
1492/*
1493 * 0.  Initial state
1494 *
1495 *     Other FSM states can be visited zero or more times, but
1496 *     this state is visited exactly once for each RPC.
1497 */
1498static void
1499call_start(struct rpc_task *task)
1500{
1501        struct rpc_clnt *clnt = task->tk_client;
1502        int idx = task->tk_msg.rpc_proc->p_statidx;
1503
1504        trace_rpc_request(task);
1505        dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1506                        clnt->cl_program->name, clnt->cl_vers,
1507                        rpc_proc_name(task),
1508                        (RPC_IS_ASYNC(task) ? "async" : "sync"));
1509
1510        /* Increment call count (version might not be valid for ping) */
1511        if (clnt->cl_program->version[clnt->cl_vers])
1512                clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1513        clnt->cl_stats->rpccnt++;
1514        task->tk_action = call_reserve;
1515}
1516
1517/*
1518 * 1.   Reserve an RPC call slot
1519 */
1520static void
1521call_reserve(struct rpc_task *task)
1522{
1523        dprint_status(task);
1524
1525        task->tk_status  = 0;
1526        task->tk_action  = call_reserveresult;
1527        xprt_reserve(task);
1528}
1529
1530static void call_retry_reserve(struct rpc_task *task);
1531
1532/*
1533 * 1b.  Grok the result of xprt_reserve()
1534 */
1535static void
1536call_reserveresult(struct rpc_task *task)
1537{
1538        int status = task->tk_status;
1539
1540        dprint_status(task);
1541
1542        /*
1543         * After a call to xprt_reserve(), we must have either
1544         * a request slot or else an error status.
1545         */
1546        task->tk_status = 0;
1547        if (status >= 0) {
1548                if (task->tk_rqstp) {
1549                        task->tk_action = call_refresh;
1550                        return;
1551                }
1552
1553                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1554                                __func__, status);
1555                rpc_exit(task, -EIO);
1556                return;
1557        }
1558
1559        /*
1560         * Even though there was an error, we may have acquired
1561         * a request slot somehow.  Make sure not to leak it.
1562         */
1563        if (task->tk_rqstp) {
1564                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1565                                __func__, status);
1566                xprt_release(task);
1567        }
1568
1569        switch (status) {
1570        case -ENOMEM:
1571                rpc_delay(task, HZ >> 2);
1572                /* fall through */
1573        case -EAGAIN:   /* woken up; retry */
1574                task->tk_action = call_retry_reserve;
1575                return;
1576        case -EIO:      /* probably a shutdown */
1577                break;
1578        default:
1579                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1580                                __func__, status);
1581                break;
1582        }
1583        rpc_exit(task, status);
1584}
1585
1586/*
1587 * 1c.  Retry reserving an RPC call slot
1588 */
1589static void
1590call_retry_reserve(struct rpc_task *task)
1591{
1592        dprint_status(task);
1593
1594        task->tk_status  = 0;
1595        task->tk_action  = call_reserveresult;
1596        xprt_retry_reserve(task);
1597}
1598
1599/*
1600 * 2.   Bind and/or refresh the credentials
1601 */
1602static void
1603call_refresh(struct rpc_task *task)
1604{
1605        dprint_status(task);
1606
1607        task->tk_action = call_refreshresult;
1608        task->tk_status = 0;
1609        task->tk_client->cl_stats->rpcauthrefresh++;
1610        rpcauth_refreshcred(task);
1611}
1612
1613/*
1614 * 2a.  Process the results of a credential refresh
1615 */
1616static void
1617call_refreshresult(struct rpc_task *task)
1618{
1619        int status = task->tk_status;
1620
1621        dprint_status(task);
1622
1623        task->tk_status = 0;
1624        task->tk_action = call_refresh;
1625        switch (status) {
1626        case 0:
1627                if (rpcauth_uptodatecred(task)) {
1628                        task->tk_action = call_allocate;
1629                        return;
1630                }
1631                /* Use rate-limiting and a max number of retries if refresh
1632                 * had status 0 but failed to update the cred.
1633                 */
1634                /* fall through */
1635        case -ETIMEDOUT:
1636                rpc_delay(task, 3*HZ);
1637                /* fall through */
1638        case -EAGAIN:
1639                status = -EACCES;
1640                /* fall through */
1641        case -EKEYEXPIRED:
1642                if (!task->tk_cred_retry)
1643                        break;
1644                task->tk_cred_retry--;
1645                dprintk("RPC: %5u %s: retry refresh creds\n",
1646                                task->tk_pid, __func__);
1647                return;
1648        }
1649        dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1650                                task->tk_pid, __func__, status);
1651        rpc_exit(task, status);
1652}
1653
1654/*
1655 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1656 *      (Note: buffer memory is freed in xprt_release).
1657 */
1658static void
1659call_allocate(struct rpc_task *task)
1660{
1661        unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1662        struct rpc_rqst *req = task->tk_rqstp;
1663        struct rpc_xprt *xprt = req->rq_xprt;
1664        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1665        int status;
1666
1667        dprint_status(task);
1668
1669        task->tk_status = 0;
1670        task->tk_action = call_bind;
1671
1672        if (req->rq_buffer)
1673                return;
1674
1675        if (proc->p_proc != 0) {
1676                BUG_ON(proc->p_arglen == 0);
1677                if (proc->p_decode != NULL)
1678                        BUG_ON(proc->p_replen == 0);
1679        }
1680
1681        /*
1682         * Calculate the size (in quads) of the RPC call
1683         * and reply headers, and convert both values
1684         * to byte sizes.
1685         */
1686        req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1687        req->rq_callsize <<= 2;
1688        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1689        req->rq_rcvsize <<= 2;
1690
1691        status = xprt->ops->buf_alloc(task);
1692        xprt_inject_disconnect(xprt);
1693        if (status == 0)
1694                return;
1695        if (status != -ENOMEM) {
1696                rpc_exit(task, status);
1697                return;
1698        }
1699
1700        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1701
1702        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1703                task->tk_action = call_allocate;
1704                rpc_delay(task, HZ>>4);
1705                return;
1706        }
1707
1708        rpc_exit(task, -ERESTARTSYS);
1709}
1710
1711static inline int
1712rpc_task_need_encode(struct rpc_task *task)
1713{
1714        return task->tk_rqstp->rq_snd_buf.len == 0;
1715}
1716
1717static inline void
1718rpc_task_force_reencode(struct rpc_task *task)
1719{
1720        task->tk_rqstp->rq_snd_buf.len = 0;
1721        task->tk_rqstp->rq_bytes_sent = 0;
1722}
1723
1724/*
1725 * 3.   Encode arguments of an RPC call
1726 */
1727static void
1728rpc_xdr_encode(struct rpc_task *task)
1729{
1730        struct rpc_rqst *req = task->tk_rqstp;
1731        kxdreproc_t     encode;
1732        __be32          *p;
1733
1734        dprint_status(task);
1735
1736        xdr_buf_init(&req->rq_snd_buf,
1737                     req->rq_buffer,
1738                     req->rq_callsize);
1739        xdr_buf_init(&req->rq_rcv_buf,
1740                     req->rq_rbuffer,
1741                     req->rq_rcvsize);
1742
1743        p = rpc_encode_header(task);
1744        if (p == NULL) {
1745                printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1746                rpc_exit(task, -EIO);
1747                return;
1748        }
1749
1750        encode = task->tk_msg.rpc_proc->p_encode;
1751        if (encode == NULL)
1752                return;
1753
1754        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1755                        task->tk_msg.rpc_argp);
1756}
1757
1758/*
1759 * 4.   Get the server port number if not yet set
1760 */
1761static void
1762call_bind(struct rpc_task *task)
1763{
1764        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1765
1766        dprint_status(task);
1767
1768        task->tk_action = call_connect;
1769        if (!xprt_bound(xprt)) {
1770                task->tk_action = call_bind_status;
1771                task->tk_timeout = xprt->bind_timeout;
1772                xprt->ops->rpcbind(task);
1773        }
1774}
1775
1776/*
1777 * 4a.  Sort out bind result
1778 */
1779static void
1780call_bind_status(struct rpc_task *task)
1781{
1782        int status = -EIO;
1783
1784        if (task->tk_status >= 0) {
1785                dprint_status(task);
1786                task->tk_status = 0;
1787                task->tk_action = call_connect;
1788                return;
1789        }
1790
1791        trace_rpc_bind_status(task);
1792        switch (task->tk_status) {
1793        case -ENOMEM:
1794                dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1795                rpc_delay(task, HZ >> 2);
1796                goto retry_timeout;
1797        case -EACCES:
1798                dprintk("RPC: %5u remote rpcbind: RPC program/version "
1799                                "unavailable\n", task->tk_pid);
1800                /* fail immediately if this is an RPC ping */
1801                if (task->tk_msg.rpc_proc->p_proc == 0) {
1802                        status = -EOPNOTSUPP;
1803                        break;
1804                }
1805                if (task->tk_rebind_retry == 0)
1806                        break;
1807                task->tk_rebind_retry--;
1808                rpc_delay(task, 3*HZ);
1809                goto retry_timeout;
1810        case -ETIMEDOUT:
1811                dprintk("RPC: %5u rpcbind request timed out\n",
1812                                task->tk_pid);
1813                goto retry_timeout;
1814        case -EPFNOSUPPORT:
1815                /* server doesn't support any rpcbind version we know of */
1816                dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1817                                task->tk_pid);
1818                break;
1819        case -EPROTONOSUPPORT:
1820                dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1821                                task->tk_pid);
1822                goto retry_timeout;
1823        case -ECONNREFUSED:             /* connection problems */
1824        case -ECONNRESET:
1825        case -ECONNABORTED:
1826        case -ENOTCONN:
1827        case -EHOSTDOWN:
1828        case -ENETDOWN:
1829        case -EHOSTUNREACH:
1830        case -ENETUNREACH:
1831        case -ENOBUFS:
1832        case -EPIPE:
1833                dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1834                                task->tk_pid, task->tk_status);
1835                if (!RPC_IS_SOFTCONN(task)) {
1836                        rpc_delay(task, 5*HZ);
1837                        goto retry_timeout;
1838                }
1839                status = task->tk_status;
1840                break;
1841        default:
1842                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1843                                task->tk_pid, -task->tk_status);
1844        }
1845
1846        rpc_exit(task, status);
1847        return;
1848
1849retry_timeout:
1850        task->tk_status = 0;
1851        task->tk_action = call_timeout;
1852}
1853
1854/*
1855 * 4b.  Connect to the RPC server
1856 */
1857static void
1858call_connect(struct rpc_task *task)
1859{
1860        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1861
1862        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1863                        task->tk_pid, xprt,
1864                        (xprt_connected(xprt) ? "is" : "is not"));
1865
1866        task->tk_action = call_transmit;
1867        if (!xprt_connected(xprt)) {
1868                task->tk_action = call_connect_status;
1869                if (task->tk_status < 0)
1870                        return;
1871                if (task->tk_flags & RPC_TASK_NOCONNECT) {
1872                        rpc_exit(task, -ENOTCONN);
1873                        return;
1874                }
1875                xprt_connect(task);
1876        }
1877}
1878
1879/*
1880 * 4c.  Sort out connect result
1881 */
1882static void
1883call_connect_status(struct rpc_task *task)
1884{
1885        struct rpc_clnt *clnt = task->tk_client;
1886        int status = task->tk_status;
1887
1888        dprint_status(task);
1889
1890        trace_rpc_connect_status(task);
1891        task->tk_status = 0;
1892        switch (status) {
1893        case -ECONNREFUSED:
1894                /* A positive refusal suggests a rebind is needed. */
1895                if (RPC_IS_SOFTCONN(task))
1896                        break;
1897                if (clnt->cl_autobind) {
1898                        rpc_force_rebind(clnt);
1899                        task->tk_action = call_bind;
1900                        return;
1901                }
1902                /* fall through */
1903        case -ECONNRESET:
1904        case -ECONNABORTED:
1905        case -ENETDOWN:
1906        case -ENETUNREACH:
1907        case -EHOSTUNREACH:
1908        case -EADDRINUSE:
1909        case -ENOBUFS:
1910        case -EPIPE:
1911                xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
1912                                            task->tk_rqstp->rq_connect_cookie);
1913                if (RPC_IS_SOFTCONN(task))
1914                        break;
1915                /* retry with existing socket, after a delay */
1916                rpc_delay(task, 3*HZ);
1917                /* fall through */
1918        case -EAGAIN:
1919                /* Check for timeouts before looping back to call_bind */
1920        case -ETIMEDOUT:
1921                task->tk_action = call_timeout;
1922                return;
1923        case 0:
1924                clnt->cl_stats->netreconn++;
1925                task->tk_action = call_transmit;
1926                return;
1927        }
1928        rpc_exit(task, status);
1929}
1930
1931/*
1932 * 5.   Transmit the RPC request, and wait for reply
1933 */
1934static void
1935call_transmit(struct rpc_task *task)
1936{
1937        int is_retrans = RPC_WAS_SENT(task);
1938
1939        dprint_status(task);
1940
1941        task->tk_action = call_status;
1942        if (task->tk_status < 0)
1943                return;
1944        if (!xprt_prepare_transmit(task))
1945                return;
1946        task->tk_action = call_transmit_status;
1947        /* Encode here so that rpcsec_gss can use correct sequence number. */
1948        if (rpc_task_need_encode(task)) {
1949                rpc_xdr_encode(task);
1950                /* Did the encode result in an error condition? */
1951                if (task->tk_status != 0) {
1952                        /* Was the error nonfatal? */
1953                        if (task->tk_status == -EAGAIN)
1954                                rpc_delay(task, HZ >> 4);
1955                        else
1956                                rpc_exit(task, task->tk_status);
1957                        return;
1958                }
1959        }
1960        xprt_transmit(task);
1961        if (task->tk_status < 0)
1962                return;
1963        if (is_retrans)
1964                task->tk_client->cl_stats->rpcretrans++;
1965        /*
1966         * On success, ensure that we call xprt_end_transmit() before sleeping
1967         * in order to allow access to the socket to other RPC requests.
1968         */
1969        call_transmit_status(task);
1970        if (rpc_reply_expected(task))
1971                return;
1972        task->tk_action = rpc_exit_task;
1973        rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1974}
1975
1976/*
1977 * 5a.  Handle cleanup after a transmission
1978 */
1979static void
1980call_transmit_status(struct rpc_task *task)
1981{
1982        task->tk_action = call_status;
1983
1984        /*
1985         * Common case: success.  Force the compiler to put this
1986         * test first.
1987         */
1988        if (task->tk_status == 0) {
1989                xprt_end_transmit(task);
1990                rpc_task_force_reencode(task);
1991                return;
1992        }
1993
1994        switch (task->tk_status) {
1995        case -EAGAIN:
1996        case -ENOBUFS:
1997                break;
1998        default:
1999                dprint_status(task);
2000                xprt_end_transmit(task);
2001                rpc_task_force_reencode(task);
2002                break;
2003                /*
2004                 * Special cases: if we've been waiting on the
2005                 * socket's write_space() callback, or if the
2006                 * socket just returned a connection error,
2007                 * then hold onto the transport lock.
2008                 */
2009        case -ECONNREFUSED:
2010        case -EHOSTDOWN:
2011        case -ENETDOWN:
2012        case -EHOSTUNREACH:
2013        case -ENETUNREACH:
2014        case -EPERM:
2015                if (RPC_IS_SOFTCONN(task)) {
2016                        xprt_end_transmit(task);
2017                        if (!task->tk_msg.rpc_proc->p_proc)
2018                                trace_xprt_ping(task->tk_xprt,
2019                                                task->tk_status);
2020                        rpc_exit(task, task->tk_status);
2021                        break;
2022                }
2023                /* fall through */
2024        case -ECONNRESET:
2025        case -ECONNABORTED:
2026        case -EADDRINUSE:
2027        case -ENOTCONN:
2028        case -EPIPE:
2029                rpc_task_force_reencode(task);
2030        }
2031}
2032
2033#if defined(CONFIG_SUNRPC_BACKCHANNEL)
2034/*
2035 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2036 * addition, disconnect on connectivity errors.
2037 */
2038static void
2039call_bc_transmit(struct rpc_task *task)
2040{
2041        struct rpc_rqst *req = task->tk_rqstp;
2042
2043        if (!xprt_prepare_transmit(task))
2044                goto out_retry;
2045
2046        if (task->tk_status < 0) {
2047                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2048                        "error: %d\n", task->tk_status);
2049                goto out_done;
2050        }
2051        if (req->rq_connect_cookie != req->rq_xprt->connect_cookie)
2052                req->rq_bytes_sent = 0;
2053
2054        xprt_transmit(task);
2055
2056        if (task->tk_status == -EAGAIN)
2057                goto out_nospace;
2058
2059        xprt_end_transmit(task);
2060        dprint_status(task);
2061        switch (task->tk_status) {
2062        case 0:
2063                /* Success */
2064        case -ENETDOWN:
2065        case -EHOSTDOWN:
2066        case -EHOSTUNREACH:
2067        case -ENETUNREACH:
2068        case -ECONNRESET:
2069        case -ECONNREFUSED:
2070        case -EADDRINUSE:
2071        case -ENOTCONN:
2072        case -EPIPE:
2073                break;
2074        case -ETIMEDOUT:
2075                /*
2076                 * Problem reaching the server.  Disconnect and let the
2077                 * forechannel reestablish the connection.  The server will
2078                 * have to retransmit the backchannel request and we'll
2079                 * reprocess it.  Since these ops are idempotent, there's no
2080                 * need to cache our reply at this time.
2081                 */
2082                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2083                        "error: %d\n", task->tk_status);
2084                xprt_conditional_disconnect(req->rq_xprt,
2085                        req->rq_connect_cookie);
2086                break;
2087        default:
2088                /*
2089                 * We were unable to reply and will have to drop the
2090                 * request.  The server should reconnect and retransmit.
2091                 */
2092                WARN_ON_ONCE(task->tk_status == -EAGAIN);
2093                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2094                        "error: %d\n", task->tk_status);
2095                break;
2096        }
2097        rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
2098out_done:
2099        task->tk_action = rpc_exit_task;
2100        return;
2101out_nospace:
2102        req->rq_connect_cookie = req->rq_xprt->connect_cookie;
2103out_retry:
2104        task->tk_status = 0;
2105}
2106#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2107
2108/*
2109 * 6.   Sort out the RPC call status
2110 */
2111static void
2112call_status(struct rpc_task *task)
2113{
2114        struct rpc_clnt *clnt = task->tk_client;
2115        struct rpc_rqst *req = task->tk_rqstp;
2116        int             status;
2117
2118        if (!task->tk_msg.rpc_proc->p_proc)
2119                trace_xprt_ping(task->tk_xprt, task->tk_status);
2120
2121        if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
2122                task->tk_status = req->rq_reply_bytes_recvd;
2123
2124        dprint_status(task);
2125
2126        status = task->tk_status;
2127        if (status >= 0) {
2128                task->tk_action = call_decode;
2129                return;
2130        }
2131
2132        trace_rpc_call_status(task);
2133        task->tk_status = 0;
2134        switch(status) {
2135        case -EHOSTDOWN:
2136        case -ENETDOWN:
2137        case -EHOSTUNREACH:
2138        case -ENETUNREACH:
2139        case -EPERM:
2140                if (RPC_IS_SOFTCONN(task)) {
2141                        rpc_exit(task, status);
2142                        break;
2143                }
2144                /*
2145                 * Delay any retries for 3 seconds, then handle as if it
2146                 * were a timeout.
2147                 */
2148                rpc_delay(task, 3*HZ);
2149                /* fall through */
2150        case -ETIMEDOUT:
2151                task->tk_action = call_timeout;
2152                break;
2153        case -ECONNREFUSED:
2154        case -ECONNRESET:
2155        case -ECONNABORTED:
2156                rpc_force_rebind(clnt);
2157                /* fall through */
2158        case -EADDRINUSE:
2159                rpc_delay(task, 3*HZ);
2160                /* fall through */
2161        case -EPIPE:
2162        case -ENOTCONN:
2163                task->tk_action = call_bind;
2164                break;
2165        case -ENOBUFS:
2166                rpc_delay(task, HZ>>2);
2167                /* fall through */
2168        case -EAGAIN:
2169                task->tk_action = call_transmit;
2170                break;
2171        case -EIO:
2172                /* shutdown or soft timeout */
2173                rpc_exit(task, status);
2174                break;
2175        default:
2176                if (clnt->cl_chatty)
2177                        printk("%s: RPC call returned error %d\n",
2178                               clnt->cl_program->name, -status);
2179                rpc_exit(task, status);
2180        }
2181}
2182
2183/*
2184 * 6a.  Handle RPC timeout
2185 *      We do not release the request slot, so we keep using the
2186 *      same XID for all retransmits.
2187 */
2188static void
2189call_timeout(struct rpc_task *task)
2190{
2191        struct rpc_clnt *clnt = task->tk_client;
2192
2193        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
2194                dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
2195                goto retry;
2196        }
2197
2198        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
2199        task->tk_timeouts++;
2200
2201        if (RPC_IS_SOFTCONN(task)) {
2202                rpc_exit(task, -ETIMEDOUT);
2203                return;
2204        }
2205        if (RPC_IS_SOFT(task)) {
2206                if (clnt->cl_chatty) {
2207                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
2208                                clnt->cl_program->name,
2209                                task->tk_xprt->servername);
2210                }
2211                if (task->tk_flags & RPC_TASK_TIMEOUT)
2212                        rpc_exit(task, -ETIMEDOUT);
2213                else
2214                        rpc_exit(task, -EIO);
2215                return;
2216        }
2217
2218        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2219                task->tk_flags |= RPC_CALL_MAJORSEEN;
2220                if (clnt->cl_chatty) {
2221                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
2222                        clnt->cl_program->name,
2223                        task->tk_xprt->servername);
2224                }
2225        }
2226        rpc_force_rebind(clnt);
2227        /*
2228         * Did our request time out due to an RPCSEC_GSS out-of-sequence
2229         * event? RFC2203 requires the server to drop all such requests.
2230         */
2231        rpcauth_invalcred(task);
2232
2233retry:
2234        task->tk_action = call_bind;
2235        task->tk_status = 0;
2236}
2237
2238/*
2239 * 7.   Decode the RPC reply
2240 */
2241static void
2242call_decode(struct rpc_task *task)
2243{
2244        struct rpc_clnt *clnt = task->tk_client;
2245        struct rpc_rqst *req = task->tk_rqstp;
2246        kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
2247        __be32          *p;
2248
2249        dprint_status(task);
2250
2251        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2252                if (clnt->cl_chatty) {
2253                        printk(KERN_NOTICE "%s: server %s OK\n",
2254                                clnt->cl_program->name,
2255                                task->tk_xprt->servername);
2256                }
2257                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2258        }
2259
2260        /*
2261         * Ensure that we see all writes made by xprt_complete_rqst()
2262         * before it changed req->rq_reply_bytes_recvd.
2263         */
2264        smp_rmb();
2265        req->rq_rcv_buf.len = req->rq_private_buf.len;
2266
2267        /* Check that the softirq receive buffer is valid */
2268        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2269                                sizeof(req->rq_rcv_buf)) != 0);
2270
2271        if (req->rq_rcv_buf.len < 12) {
2272                if (!RPC_IS_SOFT(task)) {
2273                        task->tk_action = call_bind;
2274                        goto out_retry;
2275                }
2276                dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2277                                clnt->cl_program->name, task->tk_status);
2278                task->tk_action = call_timeout;
2279                goto out_retry;
2280        }
2281
2282        p = rpc_verify_header(task);
2283        if (IS_ERR(p)) {
2284                if (p == ERR_PTR(-EAGAIN))
2285                        goto out_retry;
2286                return;
2287        }
2288
2289        task->tk_action = rpc_exit_task;
2290
2291        if (decode) {
2292                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2293                                                      task->tk_msg.rpc_resp);
2294        }
2295        dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2296                        task->tk_status);
2297        return;
2298out_retry:
2299        task->tk_status = 0;
2300        /* Note: rpc_verify_header() may have freed the RPC slot */
2301        if (task->tk_rqstp == req) {
2302                req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2303                if (task->tk_client->cl_discrtry)
2304                        xprt_conditional_disconnect(req->rq_xprt,
2305                                        req->rq_connect_cookie);
2306        }
2307}
2308
2309static __be32 *
2310rpc_encode_header(struct rpc_task *task)
2311{
2312        struct rpc_clnt *clnt = task->tk_client;
2313        struct rpc_rqst *req = task->tk_rqstp;
2314        __be32          *p = req->rq_svec[0].iov_base;
2315
2316        /* FIXME: check buffer size? */
2317
2318        p = xprt_skip_transport_header(req->rq_xprt, p);
2319        *p++ = req->rq_xid;             /* XID */
2320        *p++ = htonl(RPC_CALL);         /* CALL */
2321        *p++ = htonl(RPC_VERSION);      /* RPC version */
2322        *p++ = htonl(clnt->cl_prog);    /* program number */
2323        *p++ = htonl(clnt->cl_vers);    /* program version */
2324        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2325        p = rpcauth_marshcred(task, p);
2326        req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2327        return p;
2328}
2329
2330static __be32 *
2331rpc_verify_header(struct rpc_task *task)
2332{
2333        struct rpc_clnt *clnt = task->tk_client;
2334        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2335        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2336        __be32  *p = iov->iov_base;
2337        u32 n;
2338        int error = -EACCES;
2339
2340        if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2341                /* RFC-1014 says that the representation of XDR data must be a
2342                 * multiple of four bytes
2343                 * - if it isn't pointer subtraction in the NFS client may give
2344                 *   undefined results
2345                 */
2346                dprintk("RPC: %5u %s: XDR representation not a multiple of"
2347                       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2348                       task->tk_rqstp->rq_rcv_buf.len);
2349                error = -EIO;
2350                goto out_err;
2351        }
2352        if ((len -= 3) < 0)
2353                goto out_overflow;
2354
2355        p += 1; /* skip XID */
2356        if ((n = ntohl(*p++)) != RPC_REPLY) {
2357                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2358                        task->tk_pid, __func__, n);
2359                error = -EIO;
2360                goto out_garbage;
2361        }
2362
2363        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2364                if (--len < 0)
2365                        goto out_overflow;
2366                switch ((n = ntohl(*p++))) {
2367                case RPC_AUTH_ERROR:
2368                        break;
2369                case RPC_MISMATCH:
2370                        dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2371                                task->tk_pid, __func__);
2372                        error = -EPROTONOSUPPORT;
2373                        goto out_err;
2374                default:
2375                        dprintk("RPC: %5u %s: RPC call rejected, "
2376                                "unknown error: %x\n",
2377                                task->tk_pid, __func__, n);
2378                        error = -EIO;
2379                        goto out_err;
2380                }
2381                if (--len < 0)
2382                        goto out_overflow;
2383                switch ((n = ntohl(*p++))) {
2384                case RPC_AUTH_REJECTEDCRED:
2385                case RPC_AUTH_REJECTEDVERF:
2386                case RPCSEC_GSS_CREDPROBLEM:
2387                case RPCSEC_GSS_CTXPROBLEM:
2388                        if (!task->tk_cred_retry)
2389                                break;
2390                        task->tk_cred_retry--;
2391                        dprintk("RPC: %5u %s: retry stale creds\n",
2392                                        task->tk_pid, __func__);
2393                        rpcauth_invalcred(task);
2394                        /* Ensure we obtain a new XID! */
2395                        xprt_release(task);
2396                        task->tk_action = call_reserve;
2397                        goto out_retry;
2398                case RPC_AUTH_BADCRED:
2399                case RPC_AUTH_BADVERF:
2400                        /* possibly garbled cred/verf? */
2401                        if (!task->tk_garb_retry)
2402                                break;
2403                        task->tk_garb_retry--;
2404                        dprintk("RPC: %5u %s: retry garbled creds\n",
2405                                        task->tk_pid, __func__);
2406                        task->tk_action = call_bind;
2407                        goto out_retry;
2408                case RPC_AUTH_TOOWEAK:
2409                        printk(KERN_NOTICE "RPC: server %s requires stronger "
2410                               "authentication.\n",
2411                               task->tk_xprt->servername);
2412                        break;
2413                default:
2414                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
2415                                        task->tk_pid, __func__, n);
2416                        error = -EIO;
2417                }
2418                dprintk("RPC: %5u %s: call rejected %d\n",
2419                                task->tk_pid, __func__, n);
2420                goto out_err;
2421        }
2422        p = rpcauth_checkverf(task, p);
2423        if (IS_ERR(p)) {
2424                error = PTR_ERR(p);
2425                dprintk("RPC: %5u %s: auth check failed with %d\n",
2426                                task->tk_pid, __func__, error);
2427                goto out_garbage;               /* bad verifier, retry */
2428        }
2429        len = p - (__be32 *)iov->iov_base - 1;
2430        if (len < 0)
2431                goto out_overflow;
2432        switch ((n = ntohl(*p++))) {
2433        case RPC_SUCCESS:
2434                return p;
2435        case RPC_PROG_UNAVAIL:
2436                dprintk("RPC: %5u %s: program %u is unsupported "
2437                                "by server %s\n", task->tk_pid, __func__,
2438                                (unsigned int)clnt->cl_prog,
2439                                task->tk_xprt->servername);
2440                error = -EPFNOSUPPORT;
2441                goto out_err;
2442        case RPC_PROG_MISMATCH:
2443                dprintk("RPC: %5u %s: program %u, version %u unsupported "
2444                                "by server %s\n", task->tk_pid, __func__,
2445                                (unsigned int)clnt->cl_prog,
2446                                (unsigned int)clnt->cl_vers,
2447                                task->tk_xprt->servername);
2448                error = -EPROTONOSUPPORT;
2449                goto out_err;
2450        case RPC_PROC_UNAVAIL:
2451                dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
2452                                "version %u on server %s\n",
2453                                task->tk_pid, __func__,
2454                                rpc_proc_name(task),
2455                                clnt->cl_prog, clnt->cl_vers,
2456                                task->tk_xprt->servername);
2457                error = -EOPNOTSUPP;
2458                goto out_err;
2459        case RPC_GARBAGE_ARGS:
2460                dprintk("RPC: %5u %s: server saw garbage\n",
2461                                task->tk_pid, __func__);
2462                break;                  /* retry */
2463        default:
2464                dprintk("RPC: %5u %s: server accept status: %x\n",
2465                                task->tk_pid, __func__, n);
2466                /* Also retry */
2467        }
2468
2469out_garbage:
2470        clnt->cl_stats->rpcgarbage++;
2471        if (task->tk_garb_retry) {
2472                task->tk_garb_retry--;
2473                dprintk("RPC: %5u %s: retrying\n",
2474                                task->tk_pid, __func__);
2475                task->tk_action = call_bind;
2476out_retry:
2477                return ERR_PTR(-EAGAIN);
2478        }
2479out_err:
2480        rpc_exit(task, error);
2481        dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2482                        __func__, error);
2483        return ERR_PTR(error);
2484out_overflow:
2485        dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2486                        __func__);
2487        goto out_garbage;
2488}
2489
2490static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2491                const void *obj)
2492{
2493}
2494
2495static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2496                void *obj)
2497{
2498        return 0;
2499}
2500
2501static const struct rpc_procinfo rpcproc_null = {
2502        .p_encode = rpcproc_encode_null,
2503        .p_decode = rpcproc_decode_null,
2504};
2505
2506static int rpc_ping(struct rpc_clnt *clnt)
2507{
2508        struct rpc_message msg = {
2509                .rpc_proc = &rpcproc_null,
2510        };
2511        int err;
2512        msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2513        err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2514        put_rpccred(msg.rpc_cred);
2515        return err;
2516}
2517
2518static
2519struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2520                struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2521                const struct rpc_call_ops *ops, void *data)
2522{
2523        struct rpc_message msg = {
2524                .rpc_proc = &rpcproc_null,
2525                .rpc_cred = cred,
2526        };
2527        struct rpc_task_setup task_setup_data = {
2528                .rpc_client = clnt,
2529                .rpc_xprt = xprt,
2530                .rpc_message = &msg,
2531                .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2532                .callback_data = data,
2533                .flags = flags,
2534        };
2535
2536        return rpc_run_task(&task_setup_data);
2537}
2538
2539struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2540{
2541        return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2542}
2543EXPORT_SYMBOL_GPL(rpc_call_null);
2544
2545struct rpc_cb_add_xprt_calldata {
2546        struct rpc_xprt_switch *xps;
2547        struct rpc_xprt *xprt;
2548};
2549
2550static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2551{
2552        struct rpc_cb_add_xprt_calldata *data = calldata;
2553
2554        if (task->tk_status == 0)
2555                rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2556}
2557
2558static void rpc_cb_add_xprt_release(void *calldata)
2559{
2560        struct rpc_cb_add_xprt_calldata *data = calldata;
2561
2562        xprt_put(data->xprt);
2563        xprt_switch_put(data->xps);
2564        kfree(data);
2565}
2566
2567static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2568        .rpc_call_done = rpc_cb_add_xprt_done,
2569        .rpc_release = rpc_cb_add_xprt_release,
2570};
2571
2572/**
2573 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2574 * @clnt: pointer to struct rpc_clnt
2575 * @xps: pointer to struct rpc_xprt_switch,
2576 * @xprt: pointer struct rpc_xprt
2577 * @dummy: unused
2578 */
2579int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2580                struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2581                void *dummy)
2582{
2583        struct rpc_cb_add_xprt_calldata *data;
2584        struct rpc_cred *cred;
2585        struct rpc_task *task;
2586
2587        data = kmalloc(sizeof(*data), GFP_NOFS);
2588        if (!data)
2589                return -ENOMEM;
2590        data->xps = xprt_switch_get(xps);
2591        data->xprt = xprt_get(xprt);
2592
2593        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2594        task = rpc_call_null_helper(clnt, xprt, cred,
2595                        RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
2596                        &rpc_cb_add_xprt_call_ops, data);
2597        put_rpccred(cred);
2598        if (IS_ERR(task))
2599                return PTR_ERR(task);
2600        rpc_put_task(task);
2601        return 1;
2602}
2603EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2604
2605/**
2606 * rpc_clnt_setup_test_and_add_xprt()
2607 *
2608 * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2609 *   1) caller of the test function must dereference the rpc_xprt_switch
2610 *   and the rpc_xprt.
2611 *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2612 *   the rpc_call_done routine.
2613 *
2614 * Upon success (return of 1), the test function adds the new
2615 * transport to the rpc_clnt xprt switch
2616 *
2617 * @clnt: struct rpc_clnt to get the new transport
2618 * @xps:  the rpc_xprt_switch to hold the new transport
2619 * @xprt: the rpc_xprt to test
2620 * @data: a struct rpc_add_xprt_test pointer that holds the test function
2621 *        and test function call data
2622 */
2623int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2624                                     struct rpc_xprt_switch *xps,
2625                                     struct rpc_xprt *xprt,
2626                                     void *data)
2627{
2628        struct rpc_cred *cred;
2629        struct rpc_task *task;
2630        struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2631        int status = -EADDRINUSE;
2632
2633        xprt = xprt_get(xprt);
2634        xprt_switch_get(xps);
2635
2636        if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2637                goto out_err;
2638
2639        /* Test the connection */
2640        cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2641        task = rpc_call_null_helper(clnt, xprt, cred,
2642                                    RPC_TASK_SOFT | RPC_TASK_SOFTCONN,
2643                                    NULL, NULL);
2644        put_rpccred(cred);
2645        if (IS_ERR(task)) {
2646                status = PTR_ERR(task);
2647                goto out_err;
2648        }
2649        status = task->tk_status;
2650        rpc_put_task(task);
2651
2652        if (status < 0)
2653                goto out_err;
2654
2655        /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2656        xtest->add_xprt_test(clnt, xprt, xtest->data);
2657
2658        /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2659        return 1;
2660out_err:
2661        xprt_put(xprt);
2662        xprt_switch_put(xps);
2663        pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2664                status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2665        return status;
2666}
2667EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2668
2669/**
2670 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2671 * @clnt: pointer to struct rpc_clnt
2672 * @xprtargs: pointer to struct xprt_create
2673 * @setup: callback to test and/or set up the connection
2674 * @data: pointer to setup function data
2675 *
2676 * Creates a new transport using the parameters set in args and
2677 * adds it to clnt.
2678 * If ping is set, then test that connectivity succeeds before
2679 * adding the new transport.
2680 *
2681 */
2682int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2683                struct xprt_create *xprtargs,
2684                int (*setup)(struct rpc_clnt *,
2685                        struct rpc_xprt_switch *,
2686                        struct rpc_xprt *,
2687                        void *),
2688                void *data)
2689{
2690        struct rpc_xprt_switch *xps;
2691        struct rpc_xprt *xprt;
2692        unsigned long connect_timeout;
2693        unsigned long reconnect_timeout;
2694        unsigned char resvport;
2695        int ret = 0;
2696
2697        rcu_read_lock();
2698        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2699        xprt = xprt_iter_xprt(&clnt->cl_xpi);
2700        if (xps == NULL || xprt == NULL) {
2701                rcu_read_unlock();
2702                return -EAGAIN;
2703        }
2704        resvport = xprt->resvport;
2705        connect_timeout = xprt->connect_timeout;
2706        reconnect_timeout = xprt->max_reconnect_timeout;
2707        rcu_read_unlock();
2708
2709        xprt = xprt_create_transport(xprtargs);
2710        if (IS_ERR(xprt)) {
2711                ret = PTR_ERR(xprt);
2712                goto out_put_switch;
2713        }
2714        xprt->resvport = resvport;
2715        if (xprt->ops->set_connect_timeout != NULL)
2716                xprt->ops->set_connect_timeout(xprt,
2717                                connect_timeout,
2718                                reconnect_timeout);
2719
2720        rpc_xprt_switch_set_roundrobin(xps);
2721        if (setup) {
2722                ret = setup(clnt, xps, xprt, data);
2723                if (ret != 0)
2724                        goto out_put_xprt;
2725        }
2726        rpc_xprt_switch_add_xprt(xps, xprt);
2727out_put_xprt:
2728        xprt_put(xprt);
2729out_put_switch:
2730        xprt_switch_put(xps);
2731        return ret;
2732}
2733EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2734
2735struct connect_timeout_data {
2736        unsigned long connect_timeout;
2737        unsigned long reconnect_timeout;
2738};
2739
2740static int
2741rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2742                struct rpc_xprt *xprt,
2743                void *data)
2744{
2745        struct connect_timeout_data *timeo = data;
2746
2747        if (xprt->ops->set_connect_timeout)
2748                xprt->ops->set_connect_timeout(xprt,
2749                                timeo->connect_timeout,
2750                                timeo->reconnect_timeout);
2751        return 0;
2752}
2753
2754void
2755rpc_set_connect_timeout(struct rpc_clnt *clnt,
2756                unsigned long connect_timeout,
2757                unsigned long reconnect_timeout)
2758{
2759        struct connect_timeout_data timeout = {
2760                .connect_timeout = connect_timeout,
2761                .reconnect_timeout = reconnect_timeout,
2762        };
2763        rpc_clnt_iterate_for_each_xprt(clnt,
2764                        rpc_xprt_set_connect_timeout,
2765                        &timeout);
2766}
2767EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2768
2769void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2770{
2771        rcu_read_lock();
2772        xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2773        rcu_read_unlock();
2774}
2775EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2776
2777void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2778{
2779        rcu_read_lock();
2780        rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2781                                 xprt);
2782        rcu_read_unlock();
2783}
2784EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2785
2786bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2787                                   const struct sockaddr *sap)
2788{
2789        struct rpc_xprt_switch *xps;
2790        bool ret;
2791
2792        rcu_read_lock();
2793        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2794        ret = rpc_xprt_switch_has_addr(xps, sap);
2795        rcu_read_unlock();
2796        return ret;
2797}
2798EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2799
2800#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2801static void rpc_show_header(void)
2802{
2803        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2804                "-timeout ---ops--\n");
2805}
2806
2807static void rpc_show_task(const struct rpc_clnt *clnt,
2808                          const struct rpc_task *task)
2809{
2810        const char *rpc_waitq = "none";
2811
2812        if (RPC_IS_QUEUED(task))
2813                rpc_waitq = rpc_qname(task->tk_waitqueue);
2814
2815        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2816                task->tk_pid, task->tk_flags, task->tk_status,
2817                clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2818                clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2819                task->tk_action, rpc_waitq);
2820}
2821
2822void rpc_show_tasks(struct net *net)
2823{
2824        struct rpc_clnt *clnt;
2825        struct rpc_task *task;
2826        int header = 0;
2827        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2828
2829        spin_lock(&sn->rpc_client_lock);
2830        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2831                spin_lock(&clnt->cl_lock);
2832                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2833                        if (!header) {
2834                                rpc_show_header();
2835                                header++;
2836                        }
2837                        rpc_show_task(clnt, task);
2838                }
2839                spin_unlock(&clnt->cl_lock);
2840        }
2841        spin_unlock(&sn->rpc_client_lock);
2842}
2843#endif
2844
2845#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
2846static int
2847rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
2848                struct rpc_xprt *xprt,
2849                void *dummy)
2850{
2851        return xprt_enable_swap(xprt);
2852}
2853
2854int
2855rpc_clnt_swap_activate(struct rpc_clnt *clnt)
2856{
2857        if (atomic_inc_return(&clnt->cl_swapper) == 1)
2858                return rpc_clnt_iterate_for_each_xprt(clnt,
2859                                rpc_clnt_swap_activate_callback, NULL);
2860        return 0;
2861}
2862EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
2863
2864static int
2865rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
2866                struct rpc_xprt *xprt,
2867                void *dummy)
2868{
2869        xprt_disable_swap(xprt);
2870        return 0;
2871}
2872
2873void
2874rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
2875{
2876        if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
2877                rpc_clnt_iterate_for_each_xprt(clnt,
2878                                rpc_clnt_swap_deactivate_callback, NULL);
2879}
2880EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
2881#endif /* CONFIG_SUNRPC_SWAP */
2882