linux/net/sunrpc/clnt.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/*
   3 *  linux/net/sunrpc/clnt.c
   4 *
   5 *  This file contains the high-level RPC interface.
   6 *  It is modeled as a finite state machine to support both synchronous
   7 *  and asynchronous requests.
   8 *
   9 *  -   RPC header generation and argument serialization.
  10 *  -   Credential refresh.
  11 *  -   TCP connect handling.
  12 *  -   Retry of operation when it is suspected the operation failed because
  13 *      of uid squashing on the server, or when the credentials were stale
  14 *      and need to be refreshed, or when a packet was damaged in transit.
 *      This may have to be moved to the VFS layer.
  16 *
  17 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  18 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  19 */
  20
  21
  22#include <linux/module.h>
  23#include <linux/types.h>
  24#include <linux/kallsyms.h>
  25#include <linux/mm.h>
  26#include <linux/namei.h>
  27#include <linux/mount.h>
  28#include <linux/slab.h>
  29#include <linux/rcupdate.h>
  30#include <linux/utsname.h>
  31#include <linux/workqueue.h>
  32#include <linux/in.h>
  33#include <linux/in6.h>
  34#include <linux/un.h>
  35
  36#include <linux/sunrpc/clnt.h>
  37#include <linux/sunrpc/addr.h>
  38#include <linux/sunrpc/rpc_pipe_fs.h>
  39#include <linux/sunrpc/metrics.h>
  40#include <linux/sunrpc/bc_xprt.h>
  41#include <trace/events/sunrpc.h>
  42
  43#include "sunrpc.h"
  44#include "sysfs.h"
  45#include "netns.h"
  46
  47#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  48# define RPCDBG_FACILITY        RPCDBG_CALL
  49#endif
  50
  51/*
  52 * All RPC clients are linked into this list
  53 */
  54
  55static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  56
  57
  58static void     call_start(struct rpc_task *task);
  59static void     call_reserve(struct rpc_task *task);
  60static void     call_reserveresult(struct rpc_task *task);
  61static void     call_allocate(struct rpc_task *task);
  62static void     call_encode(struct rpc_task *task);
  63static void     call_decode(struct rpc_task *task);
  64static void     call_bind(struct rpc_task *task);
  65static void     call_bind_status(struct rpc_task *task);
  66static void     call_transmit(struct rpc_task *task);
  67static void     call_status(struct rpc_task *task);
  68static void     call_transmit_status(struct rpc_task *task);
  69static void     call_refresh(struct rpc_task *task);
  70static void     call_refreshresult(struct rpc_task *task);
  71static void     call_connect(struct rpc_task *task);
  72static void     call_connect_status(struct rpc_task *task);
  73
  74static int      rpc_encode_header(struct rpc_task *task,
  75                                  struct xdr_stream *xdr);
  76static int      rpc_decode_header(struct rpc_task *task,
  77                                  struct xdr_stream *xdr);
  78static int      rpc_ping(struct rpc_clnt *clnt);
  79static int      rpc_ping_noreply(struct rpc_clnt *clnt);
  80static void     rpc_check_timeout(struct rpc_task *task);
  81
  82static void rpc_register_client(struct rpc_clnt *clnt)
  83{
  84        struct net *net = rpc_net_ns(clnt);
  85        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  86
  87        spin_lock(&sn->rpc_client_lock);
  88        list_add(&clnt->cl_clients, &sn->all_clients);
  89        spin_unlock(&sn->rpc_client_lock);
  90}
  91
  92static void rpc_unregister_client(struct rpc_clnt *clnt)
  93{
  94        struct net *net = rpc_net_ns(clnt);
  95        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  96
  97        spin_lock(&sn->rpc_client_lock);
  98        list_del(&clnt->cl_clients);
  99        spin_unlock(&sn->rpc_client_lock);
 100}
 101
 102static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 103{
 104        rpc_remove_client_dir(clnt);
 105}
 106
 107static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 108{
 109        struct net *net = rpc_net_ns(clnt);
 110        struct super_block *pipefs_sb;
 111
 112        pipefs_sb = rpc_get_sb_net(net);
 113        if (pipefs_sb) {
 114                __rpc_clnt_remove_pipedir(clnt);
 115                rpc_put_sb_net(net);
 116        }
 117}
 118
 119static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
 120                                    struct rpc_clnt *clnt)
 121{
 122        static uint32_t clntid;
 123        const char *dir_name = clnt->cl_program->pipe_dir_name;
 124        char name[15];
 125        struct dentry *dir, *dentry;
 126
 127        dir = rpc_d_lookup_sb(sb, dir_name);
 128        if (dir == NULL) {
 129                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
 130                return dir;
 131        }
 132        for (;;) {
 133                snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 134                name[sizeof(name) - 1] = '\0';
 135                dentry = rpc_create_client_dir(dir, name, clnt);
 136                if (!IS_ERR(dentry))
 137                        break;
 138                if (dentry == ERR_PTR(-EEXIST))
 139                        continue;
 140                printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 141                                " %s/%s, error %ld\n",
 142                                dir_name, name, PTR_ERR(dentry));
 143                break;
 144        }
 145        dput(dir);
 146        return dentry;
 147}
 148
 149static int
 150rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
 151{
 152        struct dentry *dentry;
 153
 154        if (clnt->cl_program->pipe_dir_name != NULL) {
 155                dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
 156                if (IS_ERR(dentry))
 157                        return PTR_ERR(dentry);
 158        }
 159        return 0;
 160}
 161
 162static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
 163{
 164        if (clnt->cl_program->pipe_dir_name == NULL)
 165                return 1;
 166
 167        switch (event) {
 168        case RPC_PIPEFS_MOUNT:
 169                if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
 170                        return 1;
 171                if (refcount_read(&clnt->cl_count) == 0)
 172                        return 1;
 173                break;
 174        case RPC_PIPEFS_UMOUNT:
 175                if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
 176                        return 1;
 177                break;
 178        }
 179        return 0;
 180}
 181
 182static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 183                                   struct super_block *sb)
 184{
 185        struct dentry *dentry;
 186
 187        switch (event) {
 188        case RPC_PIPEFS_MOUNT:
 189                dentry = rpc_setup_pipedir_sb(sb, clnt);
 190                if (!dentry)
 191                        return -ENOENT;
 192                if (IS_ERR(dentry))
 193                        return PTR_ERR(dentry);
 194                break;
 195        case RPC_PIPEFS_UMOUNT:
 196                __rpc_clnt_remove_pipedir(clnt);
 197                break;
 198        default:
 199                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 200                return -ENOTSUPP;
 201        }
 202        return 0;
 203}
 204
 205static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
 206                                struct super_block *sb)
 207{
 208        int error = 0;
 209
 210        for (;; clnt = clnt->cl_parent) {
 211                if (!rpc_clnt_skip_event(clnt, event))
 212                        error = __rpc_clnt_handle_event(clnt, event, sb);
 213                if (error || clnt == clnt->cl_parent)
 214                        break;
 215        }
 216        return error;
 217}
 218
 219static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
 220{
 221        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 222        struct rpc_clnt *clnt;
 223
 224        spin_lock(&sn->rpc_client_lock);
 225        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
 226                if (rpc_clnt_skip_event(clnt, event))
 227                        continue;
 228                spin_unlock(&sn->rpc_client_lock);
 229                return clnt;
 230        }
 231        spin_unlock(&sn->rpc_client_lock);
 232        return NULL;
 233}
 234
 235static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
 236                            void *ptr)
 237{
 238        struct super_block *sb = ptr;
 239        struct rpc_clnt *clnt;
 240        int error = 0;
 241
 242        while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
 243                error = __rpc_pipefs_event(clnt, event, sb);
 244                if (error)
 245                        break;
 246        }
 247        return error;
 248}
 249
 250static struct notifier_block rpc_clients_block = {
 251        .notifier_call  = rpc_pipefs_event,
 252        .priority       = SUNRPC_PIPEFS_RPC_PRIO,
 253};
 254
 255int rpc_clients_notifier_register(void)
 256{
 257        return rpc_pipefs_notifier_register(&rpc_clients_block);
 258}
 259
 260void rpc_clients_notifier_unregister(void)
 261{
 262        return rpc_pipefs_notifier_unregister(&rpc_clients_block);
 263}
 264
 265static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
 266                struct rpc_xprt *xprt,
 267                const struct rpc_timeout *timeout)
 268{
 269        struct rpc_xprt *old;
 270
 271        spin_lock(&clnt->cl_lock);
 272        old = rcu_dereference_protected(clnt->cl_xprt,
 273                        lockdep_is_held(&clnt->cl_lock));
 274
 275        if (!xprt_bound(xprt))
 276                clnt->cl_autobind = 1;
 277
 278        clnt->cl_timeout = timeout;
 279        rcu_assign_pointer(clnt->cl_xprt, xprt);
 280        spin_unlock(&clnt->cl_lock);
 281
 282        return old;
 283}
 284
 285static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
 286{
 287        clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
 288                        nodename, sizeof(clnt->cl_nodename));
 289}
 290
 291static int rpc_client_register(struct rpc_clnt *clnt,
 292                               rpc_authflavor_t pseudoflavor,
 293                               const char *client_name)
 294{
 295        struct rpc_auth_create_args auth_args = {
 296                .pseudoflavor = pseudoflavor,
 297                .target_name = client_name,
 298        };
 299        struct rpc_auth *auth;
 300        struct net *net = rpc_net_ns(clnt);
 301        struct super_block *pipefs_sb;
 302        int err;
 303
 304        rpc_clnt_debugfs_register(clnt);
 305
 306        pipefs_sb = rpc_get_sb_net(net);
 307        if (pipefs_sb) {
 308                err = rpc_setup_pipedir(pipefs_sb, clnt);
 309                if (err)
 310                        goto out;
 311        }
 312
 313        rpc_register_client(clnt);
 314        if (pipefs_sb)
 315                rpc_put_sb_net(net);
 316
 317        auth = rpcauth_create(&auth_args, clnt);
 318        if (IS_ERR(auth)) {
 319                dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
 320                                pseudoflavor);
 321                err = PTR_ERR(auth);
 322                goto err_auth;
 323        }
 324        return 0;
 325err_auth:
 326        pipefs_sb = rpc_get_sb_net(net);
 327        rpc_unregister_client(clnt);
 328        __rpc_clnt_remove_pipedir(clnt);
 329out:
 330        if (pipefs_sb)
 331                rpc_put_sb_net(net);
 332        rpc_sysfs_client_destroy(clnt);
 333        rpc_clnt_debugfs_unregister(clnt);
 334        return err;
 335}
 336
 337static DEFINE_IDA(rpc_clids);
 338
 339void rpc_cleanup_clids(void)
 340{
 341        ida_destroy(&rpc_clids);
 342}
 343
 344static int rpc_alloc_clid(struct rpc_clnt *clnt)
 345{
 346        int clid;
 347
 348        clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
 349        if (clid < 0)
 350                return clid;
 351        clnt->cl_clid = clid;
 352        return 0;
 353}
 354
 355static void rpc_free_clid(struct rpc_clnt *clnt)
 356{
 357        ida_simple_remove(&rpc_clids, clnt->cl_clid);
 358}
 359
 360static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
 361                struct rpc_xprt_switch *xps,
 362                struct rpc_xprt *xprt,
 363                struct rpc_clnt *parent)
 364{
 365        const struct rpc_program *program = args->program;
 366        const struct rpc_version *version;
 367        struct rpc_clnt *clnt = NULL;
 368        const struct rpc_timeout *timeout;
 369        const char *nodename = args->nodename;
 370        int err;
 371
 372        err = rpciod_up();
 373        if (err)
 374                goto out_no_rpciod;
 375
 376        err = -EINVAL;
 377        if (args->version >= program->nrvers)
 378                goto out_err;
 379        version = program->version[args->version];
 380        if (version == NULL)
 381                goto out_err;
 382
 383        err = -ENOMEM;
 384        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 385        if (!clnt)
 386                goto out_err;
 387        clnt->cl_parent = parent ? : clnt;
 388
 389        err = rpc_alloc_clid(clnt);
 390        if (err)
 391                goto out_no_clid;
 392
 393        clnt->cl_cred     = get_cred(args->cred);
 394        clnt->cl_procinfo = version->procs;
 395        clnt->cl_maxproc  = version->nrprocs;
 396        clnt->cl_prog     = args->prognumber ? : program->number;
 397        clnt->cl_vers     = version->number;
 398        clnt->cl_stats    = program->stats;
 399        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 400        rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
 401        err = -ENOMEM;
 402        if (clnt->cl_metrics == NULL)
 403                goto out_no_stats;
 404        clnt->cl_program  = program;
 405        INIT_LIST_HEAD(&clnt->cl_tasks);
 406        spin_lock_init(&clnt->cl_lock);
 407
 408        timeout = xprt->timeout;
 409        if (args->timeout != NULL) {
 410                memcpy(&clnt->cl_timeout_default, args->timeout,
 411                                sizeof(clnt->cl_timeout_default));
 412                timeout = &clnt->cl_timeout_default;
 413        }
 414
 415        rpc_clnt_set_transport(clnt, xprt, timeout);
 416        xprt->main = true;
 417        xprt_iter_init(&clnt->cl_xpi, xps);
 418        xprt_switch_put(xps);
 419
 420        clnt->cl_rtt = &clnt->cl_rtt_default;
 421        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 422
 423        refcount_set(&clnt->cl_count, 1);
 424
 425        if (nodename == NULL)
 426                nodename = utsname()->nodename;
 427        /* save the nodename */
 428        rpc_clnt_set_nodename(clnt, nodename);
 429
 430        rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
 431        err = rpc_client_register(clnt, args->authflavor, args->client_name);
 432        if (err)
 433                goto out_no_path;
 434        if (parent)
 435                refcount_inc(&parent->cl_count);
 436
 437        trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
 438        return clnt;
 439
 440out_no_path:
 441        rpc_free_iostats(clnt->cl_metrics);
 442out_no_stats:
 443        put_cred(clnt->cl_cred);
 444        rpc_free_clid(clnt);
 445out_no_clid:
 446        kfree(clnt);
 447out_err:
 448        rpciod_down();
 449out_no_rpciod:
 450        xprt_switch_put(xps);
 451        xprt_put(xprt);
 452        trace_rpc_clnt_new_err(program->name, args->servername, err);
 453        return ERR_PTR(err);
 454}
 455
 456static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
 457                                        struct rpc_xprt *xprt)
 458{
 459        struct rpc_clnt *clnt = NULL;
 460        struct rpc_xprt_switch *xps;
 461
 462        if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
 463                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 464                xps = args->bc_xprt->xpt_bc_xps;
 465                xprt_switch_get(xps);
 466        } else {
 467                xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 468                if (xps == NULL) {
 469                        xprt_put(xprt);
 470                        return ERR_PTR(-ENOMEM);
 471                }
 472                if (xprt->bc_xprt) {
 473                        xprt_switch_get(xps);
 474                        xprt->bc_xprt->xpt_bc_xps = xps;
 475                }
 476        }
 477        clnt = rpc_new_client(args, xps, xprt, NULL);
 478        if (IS_ERR(clnt))
 479                return clnt;
 480
 481        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 482                int err = rpc_ping(clnt);
 483                if (err != 0) {
 484                        rpc_shutdown_client(clnt);
 485                        return ERR_PTR(err);
 486                }
 487        } else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
 488                int err = rpc_ping_noreply(clnt);
 489                if (err != 0) {
 490                        rpc_shutdown_client(clnt);
 491                        return ERR_PTR(err);
 492                }
 493        }
 494
 495        clnt->cl_softrtry = 1;
 496        if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
 497                clnt->cl_softrtry = 0;
 498                if (args->flags & RPC_CLNT_CREATE_SOFTERR)
 499                        clnt->cl_softerr = 1;
 500        }
 501
 502        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 503                clnt->cl_autobind = 1;
 504        if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
 505                clnt->cl_noretranstimeo = 1;
 506        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 507                clnt->cl_discrtry = 1;
 508        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 509                clnt->cl_chatty = 1;
 510
 511        return clnt;
 512}
 513
 514/**
 515 * rpc_create - create an RPC client and transport with one call
 516 * @args: rpc_clnt create argument structure
 517 *
 518 * Creates and initializes an RPC transport and an RPC client.
 519 *
 520 * It can ping the server in order to determine if it is up, and to see if
 521 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 522 * this behavior so asynchronous tasks can also use rpc_create.
 523 */
 524struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 525{
 526        struct rpc_xprt *xprt;
 527        struct xprt_create xprtargs = {
 528                .net = args->net,
 529                .ident = args->protocol,
 530                .srcaddr = args->saddress,
 531                .dstaddr = args->address,
 532                .addrlen = args->addrsize,
 533                .servername = args->servername,
 534                .bc_xprt = args->bc_xprt,
 535        };
 536        char servername[48];
 537        struct rpc_clnt *clnt;
 538        int i;
 539
 540        if (args->bc_xprt) {
 541                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 542                xprt = args->bc_xprt->xpt_bc_xprt;
 543                if (xprt) {
 544                        xprt_get(xprt);
 545                        return rpc_create_xprt(args, xprt);
 546                }
 547        }
 548
 549        if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
 550                xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
 551        if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
 552                xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
 553        /*
 554         * If the caller chooses not to specify a hostname, whip
 555         * up a string representation of the passed-in address.
 556         */
 557        if (xprtargs.servername == NULL) {
 558                struct sockaddr_un *sun =
 559                                (struct sockaddr_un *)args->address;
 560                struct sockaddr_in *sin =
 561                                (struct sockaddr_in *)args->address;
 562                struct sockaddr_in6 *sin6 =
 563                                (struct sockaddr_in6 *)args->address;
 564
 565                servername[0] = '\0';
 566                switch (args->address->sa_family) {
 567                case AF_LOCAL:
 568                        snprintf(servername, sizeof(servername), "%s",
 569                                 sun->sun_path);
 570                        break;
 571                case AF_INET:
 572                        snprintf(servername, sizeof(servername), "%pI4",
 573                                 &sin->sin_addr.s_addr);
 574                        break;
 575                case AF_INET6:
 576                        snprintf(servername, sizeof(servername), "%pI6",
 577                                 &sin6->sin6_addr);
 578                        break;
 579                default:
 580                        /* caller wants default server name, but
 581                         * address family isn't recognized. */
 582                        return ERR_PTR(-EINVAL);
 583                }
 584                xprtargs.servername = servername;
 585        }
 586
 587        xprt = xprt_create_transport(&xprtargs);
 588        if (IS_ERR(xprt))
 589                return (struct rpc_clnt *)xprt;
 590
 591        /*
 592         * By default, kernel RPC client connects from a reserved port.
 593         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 594         * but it is always enabled for rpciod, which handles the connect
 595         * operation.
 596         */
 597        xprt->resvport = 1;
 598        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 599                xprt->resvport = 0;
 600        xprt->reuseport = 0;
 601        if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
 602                xprt->reuseport = 1;
 603
 604        clnt = rpc_create_xprt(args, xprt);
 605        if (IS_ERR(clnt) || args->nconnect <= 1)
 606                return clnt;
 607
 608        for (i = 0; i < args->nconnect - 1; i++) {
 609                if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
 610                        break;
 611        }
 612        return clnt;
 613}
 614EXPORT_SYMBOL_GPL(rpc_create);
 615
 616/*
 617 * This function clones the RPC client structure. It allows us to share the
 618 * same transport while varying parameters such as the authentication
 619 * flavour.
 620 */
 621static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 622                                           struct rpc_clnt *clnt)
 623{
 624        struct rpc_xprt_switch *xps;
 625        struct rpc_xprt *xprt;
 626        struct rpc_clnt *new;
 627        int err;
 628
 629        err = -ENOMEM;
 630        rcu_read_lock();
 631        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
 632        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 633        rcu_read_unlock();
 634        if (xprt == NULL || xps == NULL) {
 635                xprt_put(xprt);
 636                xprt_switch_put(xps);
 637                goto out_err;
 638        }
 639        args->servername = xprt->servername;
 640        args->nodename = clnt->cl_nodename;
 641
 642        new = rpc_new_client(args, xps, xprt, clnt);
 643        if (IS_ERR(new))
 644                return new;
 645
 646        /* Turn off autobind on clones */
 647        new->cl_autobind = 0;
 648        new->cl_softrtry = clnt->cl_softrtry;
 649        new->cl_softerr = clnt->cl_softerr;
 650        new->cl_noretranstimeo = clnt->cl_noretranstimeo;
 651        new->cl_discrtry = clnt->cl_discrtry;
 652        new->cl_chatty = clnt->cl_chatty;
 653        new->cl_principal = clnt->cl_principal;
 654        new->cl_max_connect = clnt->cl_max_connect;
 655        return new;
 656
 657out_err:
 658        trace_rpc_clnt_clone_err(clnt, err);
 659        return ERR_PTR(err);
 660}
 661
 662/**
 663 * rpc_clone_client - Clone an RPC client structure
 664 *
 665 * @clnt: RPC client whose parameters are copied
 666 *
 667 * Returns a fresh RPC client or an ERR_PTR.
 668 */
 669struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
 670{
 671        struct rpc_create_args args = {
 672                .program        = clnt->cl_program,
 673                .prognumber     = clnt->cl_prog,
 674                .version        = clnt->cl_vers,
 675                .authflavor     = clnt->cl_auth->au_flavor,
 676                .cred           = clnt->cl_cred,
 677        };
 678        return __rpc_clone_client(&args, clnt);
 679}
 680EXPORT_SYMBOL_GPL(rpc_clone_client);
 681
 682/**
 683 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 684 *
 685 * @clnt: RPC client whose parameters are copied
 686 * @flavor: security flavor for new client
 687 *
 688 * Returns a fresh RPC client or an ERR_PTR.
 689 */
 690struct rpc_clnt *
 691rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 692{
 693        struct rpc_create_args args = {
 694                .program        = clnt->cl_program,
 695                .prognumber     = clnt->cl_prog,
 696                .version        = clnt->cl_vers,
 697                .authflavor     = flavor,
 698                .cred           = clnt->cl_cred,
 699        };
 700        return __rpc_clone_client(&args, clnt);
 701}
 702EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
 703
 704/**
 705 * rpc_switch_client_transport: switch the RPC transport on the fly
 706 * @clnt: pointer to a struct rpc_clnt
 707 * @args: pointer to the new transport arguments
 708 * @timeout: pointer to the new timeout parameters
 709 *
 710 * This function allows the caller to switch the RPC transport for the
 711 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 712 * server, for instance.  It assumes that the caller has ensured that
 713 * there are no active RPC tasks by using some form of locking.
 714 *
 715 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 716 * negative errno is returned, and "clnt" continues to use the old
 717 * xprt.
 718 */
 719int rpc_switch_client_transport(struct rpc_clnt *clnt,
 720                struct xprt_create *args,
 721                const struct rpc_timeout *timeout)
 722{
 723        const struct rpc_timeout *old_timeo;
 724        rpc_authflavor_t pseudoflavor;
 725        struct rpc_xprt_switch *xps, *oldxps;
 726        struct rpc_xprt *xprt, *old;
 727        struct rpc_clnt *parent;
 728        int err;
 729
 730        xprt = xprt_create_transport(args);
 731        if (IS_ERR(xprt))
 732                return PTR_ERR(xprt);
 733
 734        xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 735        if (xps == NULL) {
 736                xprt_put(xprt);
 737                return -ENOMEM;
 738        }
 739
 740        pseudoflavor = clnt->cl_auth->au_flavor;
 741
 742        old_timeo = clnt->cl_timeout;
 743        old = rpc_clnt_set_transport(clnt, xprt, timeout);
 744        oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
 745
 746        rpc_unregister_client(clnt);
 747        __rpc_clnt_remove_pipedir(clnt);
 748        rpc_sysfs_client_destroy(clnt);
 749        rpc_clnt_debugfs_unregister(clnt);
 750
 751        /*
 752         * A new transport was created.  "clnt" therefore
 753         * becomes the root of a new cl_parent tree.  clnt's
 754         * children, if it has any, still point to the old xprt.
 755         */
 756        parent = clnt->cl_parent;
 757        clnt->cl_parent = clnt;
 758
 759        /*
 760         * The old rpc_auth cache cannot be re-used.  GSS
 761         * contexts in particular are between a single
 762         * client and server.
 763         */
 764        err = rpc_client_register(clnt, pseudoflavor, NULL);
 765        if (err)
 766                goto out_revert;
 767
 768        synchronize_rcu();
 769        if (parent != clnt)
 770                rpc_release_client(parent);
 771        xprt_switch_put(oldxps);
 772        xprt_put(old);
 773        trace_rpc_clnt_replace_xprt(clnt);
 774        return 0;
 775
 776out_revert:
 777        xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
 778        rpc_clnt_set_transport(clnt, old, old_timeo);
 779        clnt->cl_parent = parent;
 780        rpc_client_register(clnt, pseudoflavor, NULL);
 781        xprt_switch_put(xps);
 782        xprt_put(xprt);
 783        trace_rpc_clnt_replace_xprt_err(clnt);
 784        return err;
 785}
 786EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
 787
 788static
 789int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
 790{
 791        struct rpc_xprt_switch *xps;
 792
 793        rcu_read_lock();
 794        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 795        rcu_read_unlock();
 796        if (xps == NULL)
 797                return -EAGAIN;
 798        xprt_iter_init_listall(xpi, xps);
 799        xprt_switch_put(xps);
 800        return 0;
 801}
 802
 803/**
 804 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
 805 * @clnt: pointer to client
 806 * @fn: function to apply
 807 * @data: void pointer to function data
 808 *
 809 * Iterates through the list of RPC transports currently attached to the
 810 * client and applies the function fn(clnt, xprt, data).
 811 *
 812 * On error, the iteration stops, and the function returns the error value.
 813 */
 814int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
 815                int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
 816                void *data)
 817{
 818        struct rpc_xprt_iter xpi;
 819        int ret;
 820
 821        ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
 822        if (ret)
 823                return ret;
 824        for (;;) {
 825                struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
 826
 827                if (!xprt)
 828                        break;
 829                ret = fn(clnt, xprt, data);
 830                xprt_put(xprt);
 831                if (ret < 0)
 832                        break;
 833        }
 834        xprt_iter_destroy(&xpi);
 835        return ret;
 836}
 837EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
 838
 839/*
 840 * Kill all tasks for the given client.
 841 * XXX: kill their descendants as well?
 842 */
 843void rpc_killall_tasks(struct rpc_clnt *clnt)
 844{
 845        struct rpc_task *rovr;
 846
 847
 848        if (list_empty(&clnt->cl_tasks))
 849                return;
 850
 851        /*
 852         * Spin lock all_tasks to prevent changes...
 853         */
 854        trace_rpc_clnt_killall(clnt);
 855        spin_lock(&clnt->cl_lock);
 856        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
 857                rpc_signal_task(rovr);
 858        spin_unlock(&clnt->cl_lock);
 859}
 860EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 861
 862/*
 863 * Properly shut down an RPC client, terminating all outstanding
 864 * requests.
 865 */
 866void rpc_shutdown_client(struct rpc_clnt *clnt)
 867{
 868        might_sleep();
 869
 870        trace_rpc_clnt_shutdown(clnt);
 871
 872        while (!list_empty(&clnt->cl_tasks)) {
 873                rpc_killall_tasks(clnt);
 874                wait_event_timeout(destroy_wait,
 875                        list_empty(&clnt->cl_tasks), 1*HZ);
 876        }
 877
 878        rpc_release_client(clnt);
 879}
 880EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 881
 882/*
 883 * Free an RPC client
 884 */
 885static void rpc_free_client_work(struct work_struct *work)
 886{
 887        struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
 888
 889        trace_rpc_clnt_free(clnt);
 890
 891        /* These might block on processes that might allocate memory,
 892         * so they cannot be called in rpciod, so they are handled separately
 893         * here.
 894         */
 895        rpc_sysfs_client_destroy(clnt);
 896        rpc_clnt_debugfs_unregister(clnt);
 897        rpc_free_clid(clnt);
 898        rpc_clnt_remove_pipedir(clnt);
 899        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
 900
 901        kfree(clnt);
 902        rpciod_down();
 903}
 904static struct rpc_clnt *
 905rpc_free_client(struct rpc_clnt *clnt)
 906{
 907        struct rpc_clnt *parent = NULL;
 908
 909        trace_rpc_clnt_release(clnt);
 910        if (clnt->cl_parent != clnt)
 911                parent = clnt->cl_parent;
 912        rpc_unregister_client(clnt);
 913        rpc_free_iostats(clnt->cl_metrics);
 914        clnt->cl_metrics = NULL;
 915        xprt_iter_destroy(&clnt->cl_xpi);
 916        put_cred(clnt->cl_cred);
 917
 918        INIT_WORK(&clnt->cl_work, rpc_free_client_work);
 919        schedule_work(&clnt->cl_work);
 920        return parent;
 921}
 922
 923/*
 924 * Free an RPC client
 925 */
 926static struct rpc_clnt *
 927rpc_free_auth(struct rpc_clnt *clnt)
 928{
 929        /*
 930         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 931         *       release remaining GSS contexts. This mechanism ensures
 932         *       that it can do so safely.
 933         */
 934        if (clnt->cl_auth != NULL) {
 935                rpcauth_release(clnt->cl_auth);
 936                clnt->cl_auth = NULL;
 937        }
 938        if (refcount_dec_and_test(&clnt->cl_count))
 939                return rpc_free_client(clnt);
 940        return NULL;
 941}
 942
 943/*
 944 * Release reference to the RPC client
 945 */
 946void
 947rpc_release_client(struct rpc_clnt *clnt)
 948{
 949        do {
 950                if (list_empty(&clnt->cl_tasks))
 951                        wake_up(&destroy_wait);
 952                if (refcount_dec_not_one(&clnt->cl_count))
 953                        break;
 954                clnt = rpc_free_auth(clnt);
 955        } while (clnt != NULL);
 956}
 957EXPORT_SYMBOL_GPL(rpc_release_client);
 958
 959/**
 960 * rpc_bind_new_program - bind a new RPC program to an existing client
 961 * @old: old rpc_client
 962 * @program: rpc program to set
 963 * @vers: rpc program version
 964 *
 965 * Clones the rpc client and sets up a new RPC program. This is mainly
 966 * of use for enabling different RPC programs to share the same transport.
 967 * The Sun NFSv2/v3 ACL protocol can do this.
 968 */
 969struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 970                                      const struct rpc_program *program,
 971                                      u32 vers)
 972{
 973        struct rpc_create_args args = {
 974                .program        = program,
 975                .prognumber     = program->number,
 976                .version        = vers,
 977                .authflavor     = old->cl_auth->au_flavor,
 978                .cred           = old->cl_cred,
 979        };
 980        struct rpc_clnt *clnt;
 981        int err;
 982
 983        clnt = __rpc_clone_client(&args, old);
 984        if (IS_ERR(clnt))
 985                goto out;
 986        err = rpc_ping(clnt);
 987        if (err != 0) {
 988                rpc_shutdown_client(clnt);
 989                clnt = ERR_PTR(err);
 990        }
 991out:
 992        return clnt;
 993}
 994EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 995
 996struct rpc_xprt *
 997rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
 998{
 999        struct rpc_xprt_switch *xps;
1000
1001        if (!xprt)
1002                return NULL;
1003        rcu_read_lock();
1004        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1005        atomic_long_inc(&xps->xps_queuelen);
1006        rcu_read_unlock();
1007        atomic_long_inc(&xprt->queuelen);
1008
1009        return xprt;
1010}
1011
1012static void
1013rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1014{
1015        struct rpc_xprt_switch *xps;
1016
1017        atomic_long_dec(&xprt->queuelen);
1018        rcu_read_lock();
1019        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1020        atomic_long_dec(&xps->xps_queuelen);
1021        rcu_read_unlock();
1022
1023        xprt_put(xprt);
1024}
1025
1026void rpc_task_release_transport(struct rpc_task *task)
1027{
1028        struct rpc_xprt *xprt = task->tk_xprt;
1029
1030        if (xprt) {
1031                task->tk_xprt = NULL;
1032                if (task->tk_client)
1033                        rpc_task_release_xprt(task->tk_client, xprt);
1034                else
1035                        xprt_put(xprt);
1036        }
1037}
1038EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1039
1040void rpc_task_release_client(struct rpc_task *task)
1041{
1042        struct rpc_clnt *clnt = task->tk_client;
1043
1044        rpc_task_release_transport(task);
1045        if (clnt != NULL) {
1046                /* Remove from client task list */
1047                spin_lock(&clnt->cl_lock);
1048                list_del(&task->tk_task);
1049                spin_unlock(&clnt->cl_lock);
1050                task->tk_client = NULL;
1051
1052                rpc_release_client(clnt);
1053        }
1054}
1055
1056static struct rpc_xprt *
1057rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1058{
1059        struct rpc_xprt *xprt;
1060
1061        rcu_read_lock();
1062        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1063        rcu_read_unlock();
1064        return rpc_task_get_xprt(clnt, xprt);
1065}
1066
1067static struct rpc_xprt *
1068rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1069{
1070        return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1071}
1072
1073static
1074void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1075{
1076        if (task->tk_xprt) {
1077                if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1078                      (task->tk_flags & RPC_TASK_MOVEABLE)))
1079                        return;
1080                xprt_release(task);
1081                xprt_put(task->tk_xprt);
1082        }
1083        if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1084                task->tk_xprt = rpc_task_get_first_xprt(clnt);
1085        else
1086                task->tk_xprt = rpc_task_get_next_xprt(clnt);
1087}
1088
1089static
1090void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1091{
1092        rpc_task_set_transport(task, clnt);
1093        task->tk_client = clnt;
1094        refcount_inc(&clnt->cl_count);
1095        if (clnt->cl_softrtry)
1096                task->tk_flags |= RPC_TASK_SOFT;
1097        if (clnt->cl_softerr)
1098                task->tk_flags |= RPC_TASK_TIMEOUT;
1099        if (clnt->cl_noretranstimeo)
1100                task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1101        /* Add to the client's list of all tasks */
1102        spin_lock(&clnt->cl_lock);
1103        list_add_tail(&task->tk_task, &clnt->cl_tasks);
1104        spin_unlock(&clnt->cl_lock);
1105}
1106
1107static void
1108rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1109{
1110        if (msg != NULL) {
1111                task->tk_msg.rpc_proc = msg->rpc_proc;
1112                task->tk_msg.rpc_argp = msg->rpc_argp;
1113                task->tk_msg.rpc_resp = msg->rpc_resp;
1114                task->tk_msg.rpc_cred = msg->rpc_cred;
1115                if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1116                        get_cred(task->tk_msg.rpc_cred);
1117        }
1118}
1119
1120/*
1121 * Default callback for async RPC calls
1122 */
1123static void
1124rpc_default_callback(struct rpc_task *task, void *data)
1125{
1126}
1127
1128static const struct rpc_call_ops rpc_default_ops = {
1129        .rpc_call_done = rpc_default_callback,
1130};
1131
1132/**
1133 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1134 * @task_setup_data: pointer to task initialisation data
1135 */
1136struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1137{
1138        struct rpc_task *task;
1139
1140        task = rpc_new_task(task_setup_data);
1141        if (IS_ERR(task))
1142                return task;
1143
1144        if (!RPC_IS_ASYNC(task))
1145                task->tk_flags |= RPC_TASK_CRED_NOREF;
1146
1147        rpc_task_set_client(task, task_setup_data->rpc_client);
1148        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1149
1150        if (task->tk_action == NULL)
1151                rpc_call_start(task);
1152
1153        atomic_inc(&task->tk_count);
1154        rpc_execute(task);
1155        return task;
1156}
1157EXPORT_SYMBOL_GPL(rpc_run_task);
1158
1159/**
1160 * rpc_call_sync - Perform a synchronous RPC call
1161 * @clnt: pointer to RPC client
1162 * @msg: RPC call parameters
1163 * @flags: RPC call flags
1164 */
1165int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1166{
1167        struct rpc_task *task;
1168        struct rpc_task_setup task_setup_data = {
1169                .rpc_client = clnt,
1170                .rpc_message = msg,
1171                .callback_ops = &rpc_default_ops,
1172                .flags = flags,
1173        };
1174        int status;
1175
1176        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1177        if (flags & RPC_TASK_ASYNC) {
1178                rpc_release_calldata(task_setup_data.callback_ops,
1179                        task_setup_data.callback_data);
1180                return -EINVAL;
1181        }
1182
1183        task = rpc_run_task(&task_setup_data);
1184        if (IS_ERR(task))
1185                return PTR_ERR(task);
1186        status = task->tk_status;
1187        rpc_put_task(task);
1188        return status;
1189}
1190EXPORT_SYMBOL_GPL(rpc_call_sync);
1191
1192/**
1193 * rpc_call_async - Perform an asynchronous RPC call
1194 * @clnt: pointer to RPC client
1195 * @msg: RPC call parameters
1196 * @flags: RPC call flags
1197 * @tk_ops: RPC call ops
1198 * @data: user call data
1199 */
1200int
1201rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1202               const struct rpc_call_ops *tk_ops, void *data)
1203{
1204        struct rpc_task *task;
1205        struct rpc_task_setup task_setup_data = {
1206                .rpc_client = clnt,
1207                .rpc_message = msg,
1208                .callback_ops = tk_ops,
1209                .callback_data = data,
1210                .flags = flags|RPC_TASK_ASYNC,
1211        };
1212
1213        task = rpc_run_task(&task_setup_data);
1214        if (IS_ERR(task))
1215                return PTR_ERR(task);
1216        rpc_put_task(task);
1217        return 0;
1218}
1219EXPORT_SYMBOL_GPL(rpc_call_async);
1220
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void call_bc_encode(struct rpc_task *task);

/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 *
 * If the task cannot be allocated, @req is freed with
 * xprt_free_bc_request() and the ERR_PTR() is returned.
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
{
        struct rpc_task_setup bc_setup = {
                .callback_ops = &rpc_default_ops,
                .flags = RPC_TASK_SOFTCONN |
                        RPC_TASK_NO_RETRANS_TIMEOUT,
        };
        struct rpc_task *task;

        dprintk("RPC: rpc_run_bc_task req= %p\n", req);

        /* Create an rpc_task to send the data */
        task = rpc_new_task(&bc_setup);
        if (!IS_ERR(task)) {
                xprt_init_bc_request(req, task);

                /* Backchannel tasks start directly at the encode step. */
                task->tk_action = call_bc_encode;
                atomic_inc(&task->tk_count);
                WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
                rpc_execute(task);

                dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
                return task;
        }

        xprt_free_bc_request(req);
        return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1259
1260/**
1261 * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1262 * @req: RPC request to prepare
1263 * @pages: vector of struct page pointers
1264 * @base: offset in first page where receive should start, in bytes
1265 * @len: expected size of the upper layer data payload, in bytes
1266 * @hdrsize: expected size of upper layer reply header, in XDR words
1267 *
1268 */
1269void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1270                             unsigned int base, unsigned int len,
1271                             unsigned int hdrsize)
1272{
1273        hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1274
1275        xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1276        trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1277}
1278EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1279
1280void
1281rpc_call_start(struct rpc_task *task)
1282{
1283        task->tk_action = call_start;
1284}
1285EXPORT_SYMBOL_GPL(rpc_call_start);
1286
1287/**
1288 * rpc_peeraddr - extract remote peer address from clnt's xprt
1289 * @clnt: RPC client structure
1290 * @buf: target buffer
1291 * @bufsize: length of target buffer
1292 *
1293 * Returns the number of bytes that are actually in the stored address.
1294 */
1295size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1296{
1297        size_t bytes;
1298        struct rpc_xprt *xprt;
1299
1300        rcu_read_lock();
1301        xprt = rcu_dereference(clnt->cl_xprt);
1302
1303        bytes = xprt->addrlen;
1304        if (bytes > bufsize)
1305                bytes = bufsize;
1306        memcpy(buf, &xprt->addr, bytes);
1307        rcu_read_unlock();
1308
1309        return bytes;
1310}
1311EXPORT_SYMBOL_GPL(rpc_peeraddr);
1312
1313/**
1314 * rpc_peeraddr2str - return remote peer address in printable format
1315 * @clnt: RPC client structure
1316 * @format: address format
1317 *
1318 * NB: the lifetime of the memory referenced by the returned pointer is
1319 * the same as the rpc_xprt itself.  As long as the caller uses this
1320 * pointer, it must hold the RCU read lock.
1321 */
1322const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1323                             enum rpc_display_format_t format)
1324{
1325        struct rpc_xprt *xprt;
1326
1327        xprt = rcu_dereference(clnt->cl_xprt);
1328
1329        if (xprt->address_strings[format] != NULL)
1330                return xprt->address_strings[format];
1331        else
1332                return "unprintable";
1333}
1334EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1335
/*
 * IPv4 address that rpc_sockname() binds its probe socket to and that
 * rpc_anyaddr() copies out as the fallback result.  NOTE: despite the
 * "loopback" in the name, this is initialised to the wildcard address
 * (INADDR_ANY), not to 127.0.0.1.
 */
static const struct sockaddr_in rpc_inaddr_loopback = {
        .sin_family             = AF_INET,
        .sin_addr.s_addr        = htonl(INADDR_ANY),
};

/*
 * IPv6 counterpart of rpc_inaddr_loopback; likewise initialised to the
 * wildcard address (IN6ADDR_ANY_INIT) rather than to ::1.
 */
static const struct sockaddr_in6 rpc_in6addr_loopback = {
        .sin6_family            = AF_INET6,
        .sin6_addr              = IN6ADDR_ANY_INIT,
};
1345
1346/*
1347 * Try a getsockname() on a connected datagram socket.  Using a
1348 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1349 * This conserves the ephemeral port number space.
1350 *
1351 * Returns zero and fills in "buf" if successful; otherwise, a
1352 * negative errno is returned.
1353 */
1354static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1355                        struct sockaddr *buf)
1356{
1357        struct socket *sock;
1358        int err;
1359
1360        err = __sock_create(net, sap->sa_family,
1361                                SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1362        if (err < 0) {
1363                dprintk("RPC:       can't create UDP socket (%d)\n", err);
1364                goto out;
1365        }
1366
1367        switch (sap->sa_family) {
1368        case AF_INET:
1369                err = kernel_bind(sock,
1370                                (struct sockaddr *)&rpc_inaddr_loopback,
1371                                sizeof(rpc_inaddr_loopback));
1372                break;
1373        case AF_INET6:
1374                err = kernel_bind(sock,
1375                                (struct sockaddr *)&rpc_in6addr_loopback,
1376                                sizeof(rpc_in6addr_loopback));
1377                break;
1378        default:
1379                err = -EAFNOSUPPORT;
1380                goto out;
1381        }
1382        if (err < 0) {
1383                dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1384                goto out_release;
1385        }
1386
1387        err = kernel_connect(sock, sap, salen, 0);
1388        if (err < 0) {
1389                dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1390                goto out_release;
1391        }
1392
1393        err = kernel_getsockname(sock, buf);
1394        if (err < 0) {
1395                dprintk("RPC:       getsockname failed (%d)\n", err);
1396                goto out_release;
1397        }
1398
1399        err = 0;
1400        if (buf->sa_family == AF_INET6) {
1401                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1402                sin6->sin6_scope_id = 0;
1403        }
1404        dprintk("RPC:       %s succeeded\n", __func__);
1405
1406out_release:
1407        sock_release(sock);
1408out:
1409        return err;
1410}
1411
1412/*
1413 * Scraping a connected socket failed, so we don't have a useable
1414 * local address.  Fallback: generate an address that will prevent
1415 * the server from calling us back.
1416 *
1417 * Returns zero and fills in "buf" if successful; otherwise, a
1418 * negative errno is returned.
1419 */
1420static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1421{
1422        switch (family) {
1423        case AF_INET:
1424                if (buflen < sizeof(rpc_inaddr_loopback))
1425                        return -EINVAL;
1426                memcpy(buf, &rpc_inaddr_loopback,
1427                                sizeof(rpc_inaddr_loopback));
1428                break;
1429        case AF_INET6:
1430                if (buflen < sizeof(rpc_in6addr_loopback))
1431                        return -EINVAL;
1432                memcpy(buf, &rpc_in6addr_loopback,
1433                                sizeof(rpc_in6addr_loopback));
1434                break;
1435        default:
1436                dprintk("RPC:       %s: address family not supported\n",
1437                        __func__);
1438                return -EAFNOSUPPORT;
1439        }
1440        dprintk("RPC:       %s: succeeded\n", __func__);
1441        return 0;
1442}
1443
1444/**
1445 * rpc_localaddr - discover local endpoint address for an RPC client
1446 * @clnt: RPC client structure
1447 * @buf: target buffer
1448 * @buflen: size of target buffer, in bytes
1449 *
1450 * Returns zero and fills in "buf" and "buflen" if successful;
1451 * otherwise, a negative errno is returned.
1452 *
1453 * This works even if the underlying transport is not currently connected,
1454 * or if the upper layer never previously provided a source address.
1455 *
1456 * The result of this function call is transient: multiple calls in
1457 * succession may give different results, depending on how local
1458 * networking configuration changes over time.
1459 */
1460int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1461{
1462        struct sockaddr_storage address;
1463        struct sockaddr *sap = (struct sockaddr *)&address;
1464        struct rpc_xprt *xprt;
1465        struct net *net;
1466        size_t salen;
1467        int err;
1468
1469        rcu_read_lock();
1470        xprt = rcu_dereference(clnt->cl_xprt);
1471        salen = xprt->addrlen;
1472        memcpy(sap, &xprt->addr, salen);
1473        net = get_net(xprt->xprt_net);
1474        rcu_read_unlock();
1475
1476        rpc_set_port(sap, 0);
1477        err = rpc_sockname(net, sap, salen, buf);
1478        put_net(net);
1479        if (err != 0)
1480                /* Couldn't discover local address, return ANYADDR */
1481                return rpc_anyaddr(sap->sa_family, buf, buflen);
1482        return 0;
1483}
1484EXPORT_SYMBOL_GPL(rpc_localaddr);
1485
1486void
1487rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1488{
1489        struct rpc_xprt *xprt;
1490
1491        rcu_read_lock();
1492        xprt = rcu_dereference(clnt->cl_xprt);
1493        if (xprt->ops->set_buffer_size)
1494                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1495        rcu_read_unlock();
1496}
1497EXPORT_SYMBOL_GPL(rpc_setbufsize);
1498
1499/**
1500 * rpc_net_ns - Get the network namespace for this RPC client
1501 * @clnt: RPC client to query
1502 *
1503 */
1504struct net *rpc_net_ns(struct rpc_clnt *clnt)
1505{
1506        struct net *ret;
1507
1508        rcu_read_lock();
1509        ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1510        rcu_read_unlock();
1511        return ret;
1512}
1513EXPORT_SYMBOL_GPL(rpc_net_ns);
1514
1515/**
1516 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1517 * @clnt: RPC client to query
1518 *
1519 * For stream transports, this is one RPC record fragment (see RFC
1520 * 1831), as we don't support multi-record requests yet.  For datagram
1521 * transports, this is the size of an IP packet minus the IP, UDP, and
1522 * RPC header sizes.
1523 */
1524size_t rpc_max_payload(struct rpc_clnt *clnt)
1525{
1526        size_t ret;
1527
1528        rcu_read_lock();
1529        ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1530        rcu_read_unlock();
1531        return ret;
1532}
1533EXPORT_SYMBOL_GPL(rpc_max_payload);
1534
1535/**
1536 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1537 * @clnt: RPC client to query
1538 */
1539size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1540{
1541        struct rpc_xprt *xprt;
1542        size_t ret;
1543
1544        rcu_read_lock();
1545        xprt = rcu_dereference(clnt->cl_xprt);
1546        ret = xprt->ops->bc_maxpayload(xprt);
1547        rcu_read_unlock();
1548        return ret;
1549}
1550EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1551
1552unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1553{
1554        struct rpc_xprt *xprt;
1555        unsigned int ret;
1556
1557        rcu_read_lock();
1558        xprt = rcu_dereference(clnt->cl_xprt);
1559        ret = xprt->ops->bc_num_slots(xprt);
1560        rcu_read_unlock();
1561        return ret;
1562}
1563EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1564
1565/**
1566 * rpc_force_rebind - force transport to check that remote port is unchanged
1567 * @clnt: client to rebind
1568 *
1569 */
1570void rpc_force_rebind(struct rpc_clnt *clnt)
1571{
1572        if (clnt->cl_autobind) {
1573                rcu_read_lock();
1574                xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1575                rcu_read_unlock();
1576        }
1577}
1578EXPORT_SYMBOL_GPL(rpc_force_rebind);
1579
1580static int
1581__rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1582{
1583        task->tk_status = 0;
1584        task->tk_rpc_status = 0;
1585        task->tk_action = action;
1586        return 1;
1587}
1588
1589/*
1590 * Restart an (async) RPC call. Usually called from within the
1591 * exit handler.
1592 */
1593int
1594rpc_restart_call(struct rpc_task *task)
1595{
1596        return __rpc_restart_call(task, call_start);
1597}
1598EXPORT_SYMBOL_GPL(rpc_restart_call);
1599
1600/*
1601 * Restart an (async) RPC call from the call_prepare state.
1602 * Usually called from within the exit handler.
1603 */
1604int
1605rpc_restart_call_prepare(struct rpc_task *task)
1606{
1607        if (task->tk_ops->rpc_call_prepare != NULL)
1608                return __rpc_restart_call(task, rpc_prepare_task);
1609        return rpc_restart_call(task);
1610}
1611EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1612
1613const char
1614*rpc_proc_name(const struct rpc_task *task)
1615{
1616        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1617
1618        if (proc) {
1619                if (proc->p_name)
1620                        return proc->p_name;
1621                else
1622                        return "NULL";
1623        } else
1624                return "no proc";
1625}
1626
1627static void
1628__rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1629{
1630        trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1631        task->tk_rpc_status = rpc_status;
1632        rpc_exit(task, tk_status);
1633}
1634
/*
 * Fail the task, using @status both as the exit status and as the
 * recorded RPC status.
 */
static void
rpc_call_rpcerror(struct rpc_task *task, int status)
{
        const int rpc_status = status;

        __rpc_call_rpcerror(task, status, rpc_status);
}
1640
1641/*
1642 * 0.  Initial state
1643 *
1644 *     Other FSM states can be visited zero or more times, but
1645 *     this state is visited exactly once for each RPC.
1646 */
1647static void
1648call_start(struct rpc_task *task)
1649{
1650        struct rpc_clnt *clnt = task->tk_client;
1651        int idx = task->tk_msg.rpc_proc->p_statidx;
1652
1653        trace_rpc_request(task);
1654
1655        /* Increment call count (version might not be valid for ping) */
1656        if (clnt->cl_program->version[clnt->cl_vers])
1657                clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1658        clnt->cl_stats->rpccnt++;
1659        task->tk_action = call_reserve;
1660        rpc_task_set_transport(task, clnt);
1661}
1662
1663/*
1664 * 1.   Reserve an RPC call slot
1665 */
1666static void
1667call_reserve(struct rpc_task *task)
1668{
1669        task->tk_status  = 0;
1670        task->tk_action  = call_reserveresult;
1671        xprt_reserve(task);
1672}
1673
1674static void call_retry_reserve(struct rpc_task *task);
1675
1676/*
1677 * 1b.  Grok the result of xprt_reserve()
1678 */
1679static void
1680call_reserveresult(struct rpc_task *task)
1681{
1682        int status = task->tk_status;
1683
1684        /*
1685         * After a call to xprt_reserve(), we must have either
1686         * a request slot or else an error status.
1687         */
1688        task->tk_status = 0;
1689        if (status >= 0) {
1690                if (task->tk_rqstp) {
1691                        task->tk_action = call_refresh;
1692                        return;
1693                }
1694
1695                rpc_call_rpcerror(task, -EIO);
1696                return;
1697        }
1698
1699        switch (status) {
1700        case -ENOMEM:
1701                rpc_delay(task, HZ >> 2);
1702                fallthrough;
1703        case -EAGAIN:   /* woken up; retry */
1704                task->tk_action = call_retry_reserve;
1705                return;
1706        default:
1707                rpc_call_rpcerror(task, status);
1708        }
1709}
1710
1711/*
1712 * 1c.  Retry reserving an RPC call slot
1713 */
1714static void
1715call_retry_reserve(struct rpc_task *task)
1716{
1717        task->tk_status  = 0;
1718        task->tk_action  = call_reserveresult;
1719        xprt_retry_reserve(task);
1720}
1721
1722/*
1723 * 2.   Bind and/or refresh the credentials
1724 */
1725static void
1726call_refresh(struct rpc_task *task)
1727{
1728        task->tk_action = call_refreshresult;
1729        task->tk_status = 0;
1730        task->tk_client->cl_stats->rpcauthrefresh++;
1731        rpcauth_refreshcred(task);
1732}
1733
/*
 * 2a.  Process the results of a credential refresh
 *
 *      On success with an up-to-date credential the task proceeds to
 *      call_allocate.  The retryable cases return with tk_action still
 *      set to call_refresh, so the refresh is attempted again.  Once
 *      tk_cred_retry is exhausted, or for any status not listed in the
 *      switch, the call is failed with rpc_call_rpcerror().
 */
static void
call_refreshresult(struct rpc_task *task)
{
        int status = task->tk_status;

        task->tk_status = 0;
        /* Default next step: refresh again, unless overridden below. */
        task->tk_action = call_refresh;
        switch (status) {
        case 0:
                if (rpcauth_uptodatecred(task)) {
                        task->tk_action = call_allocate;
                        return;
                }
                /* Use rate-limiting and a max number of retries if refresh
                 * had status 0 but failed to update the cred.
                 */
                fallthrough;
        case -ETIMEDOUT:
                rpc_delay(task, 3*HZ);
                fallthrough;
        case -EAGAIN:
                /* If the retries run out, these cases report -EACCES. */
                status = -EACCES;
                fallthrough;
        case -EKEYEXPIRED:
                /* No retries left: leave the switch and fail the call. */
                if (!task->tk_cred_retry)
                        break;
                task->tk_cred_retry--;
                trace_rpc_retry_refresh_status(task);
                return;
        case -ENOMEM:
                /* Retry after a short delay; does not use up a cred retry. */
                rpc_delay(task, HZ >> 4);
                return;
        }
        trace_rpc_refresh_status(task);
        rpc_call_rpcerror(task, status);
}
1773
1774/*
1775 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1776 *      (Note: buffer memory is freed in xprt_release).
1777 */
1778static void
1779call_allocate(struct rpc_task *task)
1780{
1781        const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1782        struct rpc_rqst *req = task->tk_rqstp;
1783        struct rpc_xprt *xprt = req->rq_xprt;
1784        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1785        int status;
1786
1787        task->tk_status = 0;
1788        task->tk_action = call_encode;
1789
1790        if (req->rq_buffer)
1791                return;
1792
1793        if (proc->p_proc != 0) {
1794                BUG_ON(proc->p_arglen == 0);
1795                if (proc->p_decode != NULL)
1796                        BUG_ON(proc->p_replen == 0);
1797        }
1798
1799        /*
1800         * Calculate the size (in quads) of the RPC call
1801         * and reply headers, and convert both values
1802         * to byte sizes.
1803         */
1804        req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1805                           proc->p_arglen;
1806        req->rq_callsize <<= 2;
1807        /*
1808         * Note: the reply buffer must at minimum allocate enough space
1809         * for the 'struct accepted_reply' from RFC5531.
1810         */
1811        req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1812                        max_t(size_t, proc->p_replen, 2);
1813        req->rq_rcvsize <<= 2;
1814
1815        status = xprt->ops->buf_alloc(task);
1816        trace_rpc_buf_alloc(task, status);
1817        if (status == 0)
1818                return;
1819        if (status != -ENOMEM) {
1820                rpc_call_rpcerror(task, status);
1821                return;
1822        }
1823
1824        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1825                task->tk_action = call_allocate;
1826                rpc_delay(task, HZ>>4);
1827                return;
1828        }
1829
1830        rpc_call_rpcerror(task, -ERESTARTSYS);
1831}
1832
1833static int
1834rpc_task_need_encode(struct rpc_task *task)
1835{
1836        return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1837                (!(task->tk_flags & RPC_TASK_SENT) ||
1838                 !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1839                 xprt_request_need_retransmit(task));
1840}
1841
1842static void
1843rpc_xdr_encode(struct rpc_task *task)
1844{
1845        struct rpc_rqst *req = task->tk_rqstp;
1846        struct xdr_stream xdr;
1847
1848        xdr_buf_init(&req->rq_snd_buf,
1849                     req->rq_buffer,
1850                     req->rq_callsize);
1851        xdr_buf_init(&req->rq_rcv_buf,
1852                     req->rq_rbuffer,
1853                     req->rq_rcvsize);
1854
1855        req->rq_reply_bytes_recvd = 0;
1856        req->rq_snd_buf.head[0].iov_len = 0;
1857        xdr_init_encode(&xdr, &req->rq_snd_buf,
1858                        req->rq_snd_buf.head[0].iov_base, req);
1859        xdr_free_bvec(&req->rq_snd_buf);
1860        if (rpc_encode_header(task, &xdr))
1861                return;
1862
1863        task->tk_status = rpcauth_wrap_req(task, &xdr);
1864}
1865
1866/*
1867 * 3.   Encode arguments of an RPC call
1868 */
1869static void
1870call_encode(struct rpc_task *task)
1871{
1872        if (!rpc_task_need_encode(task))
1873                goto out;
1874
1875        /* Dequeue task from the receive queue while we're encoding */
1876        xprt_request_dequeue_xprt(task);
1877        /* Encode here so that rpcsec_gss can use correct sequence number. */
1878        rpc_xdr_encode(task);
1879        /* Add task to reply queue before transmission to avoid races */
1880        if (task->tk_status == 0 && rpc_reply_expected(task))
1881                task->tk_status = xprt_request_enqueue_receive(task);
1882        /* Did the encode result in an error condition? */
1883        if (task->tk_status != 0) {
1884                /* Was the error nonfatal? */
1885                switch (task->tk_status) {
1886                case -EAGAIN:
1887                case -ENOMEM:
1888                        rpc_delay(task, HZ >> 4);
1889                        break;
1890                case -EKEYEXPIRED:
1891                        if (!task->tk_cred_retry) {
1892                                rpc_exit(task, task->tk_status);
1893                        } else {
1894                                task->tk_action = call_refresh;
1895                                task->tk_cred_retry--;
1896                                trace_rpc_retry_refresh_status(task);
1897                        }
1898                        break;
1899                default:
1900                        rpc_call_rpcerror(task, task->tk_status);
1901                }
1902                return;
1903        }
1904
1905        xprt_request_enqueue_transmit(task);
1906out:
1907        task->tk_action = call_transmit;
1908        /* Check that the connection is OK */
1909        if (!xprt_bound(task->tk_xprt))
1910                task->tk_action = call_bind;
1911        else if (!xprt_connected(task->tk_xprt))
1912                task->tk_action = call_connect;
1913}
1914
1915/*
1916 * Helpers to check if the task was already transmitted, and
1917 * to take action when that is the case.
1918 */
1919static bool
1920rpc_task_transmitted(struct rpc_task *task)
1921{
1922        return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1923}
1924
1925static void
1926rpc_task_handle_transmitted(struct rpc_task *task)
1927{
1928        xprt_end_transmit(task);
1929        task->tk_action = call_transmit_status;
1930}
1931
1932/*
1933 * 4.   Get the server port number if not yet set
1934 */
1935static void
1936call_bind(struct rpc_task *task)
1937{
1938        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1939
1940        if (rpc_task_transmitted(task)) {
1941                rpc_task_handle_transmitted(task);
1942                return;
1943        }
1944
1945        if (xprt_bound(xprt)) {
1946                task->tk_action = call_connect;
1947                return;
1948        }
1949
1950        task->tk_action = call_bind_status;
1951        if (!xprt_prepare_transmit(task))
1952                return;
1953
1954        xprt->ops->rpcbind(task);
1955}
1956
/*
 * 4a.  Sort out bind result
 *
 * A non-negative status, or a transport that turned out to be bound
 * anyway, moves on to call_connect.  Other errors either loop back to
 * call_bind (after clearing tk_status and running the timeout check) or
 * terminate the call through rpc_call_rpcerror(); the reported error
 * defaults to -EIO unless a case below overrides it.
 */
static void
call_bind_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
        int err = -EIO;

        if (rpc_task_transmitted(task)) {
                rpc_task_handle_transmitted(task);
                return;
        }

        /* An error is moot if the transport is bound regardless */
        if (task->tk_status < 0 && xprt_bound(xprt))
                task->tk_status = 0;
        if (task->tk_status >= 0) {
                task->tk_action = call_connect;
                return;
        }

        switch (task->tk_status) {
        case -ENOMEM:
        case -ENOBUFS:
                rpc_delay(task, HZ >> 2);
                goto retry;
        case -EAGAIN:
                goto retry;
        case -ETIMEDOUT:
                trace_rpcb_timeout_err(task);
                goto retry;
        case -EPROTONOSUPPORT:
                trace_rpcb_bind_version_err(task);
                goto retry;
        case -EPFNOSUPPORT:
                /* server doesn't support any rpcbind version we know of */
                trace_rpcb_bind_version_err(task);
                break;
        case -EACCES:
                trace_rpcb_prog_unavail_err(task);
                /* fail immediately if this is an RPC ping */
                if (task->tk_msg.rpc_proc->p_proc == 0) {
                        err = -EOPNOTSUPP;
                        break;
                }
                if (task->tk_rebind_retry != 0) {
                        task->tk_rebind_retry--;
                        rpc_delay(task, 3*HZ);
                        goto retry;
                }
                break;
        case -ECONNREFUSED:             /* connection problems */
        case -ECONNRESET:
        case -ECONNABORTED:
        case -ENOTCONN:
        case -EHOSTDOWN:
        case -ENETDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -EPIPE:
                trace_rpcb_unreachable_err(task);
                if (RPC_IS_SOFTCONN(task)) {
                        /* soft-connect tasks report the error as is */
                        err = task->tk_status;
                        break;
                }
                rpc_delay(task, 5*HZ);
                goto retry;
        default:
                trace_rpcb_unrecognized_err(task);
                break;
        }

        rpc_call_rpcerror(task, err);
        return;

retry:
        task->tk_status = 0;
        task->tk_action = call_bind;
        rpc_check_timeout(task);
}
2039
2040/*
2041 * 4b.  Connect to the RPC server
2042 */
2043static void
2044call_connect(struct rpc_task *task)
2045{
2046        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2047
2048        if (rpc_task_transmitted(task)) {
2049                rpc_task_handle_transmitted(task);
2050                return;
2051        }
2052
2053        if (xprt_connected(xprt)) {
2054                task->tk_action = call_transmit;
2055                return;
2056        }
2057
2058        task->tk_action = call_connect_status;
2059        if (task->tk_status < 0)
2060                return;
2061        if (task->tk_flags & RPC_TASK_NOCONNECT) {
2062                rpc_call_rpcerror(task, -ENOTCONN);
2063                return;
2064        }
2065        if (!xprt_prepare_transmit(task))
2066                return;
2067        xprt_connect(task);
2068}
2069
/*
 * 4c.  Sort out connect result
 *
 * A zero status (counted in netreconn) or a connected transport leads to
 * call_transmit.  Otherwise tk_status is cleared and the saved status
 * decides between retrying via call_bind (after rpc_check_timeout()),
 * restarting at call_start when the task is moved off a transport
 * flagged XPRT_REMOVE, or failing the call with the saved status.
 */
static void
call_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;

        if (rpc_task_transmitted(task)) {
                rpc_task_handle_transmitted(task);
                return;
        }

        trace_rpc_connect_status(task);

        if (status == 0)
                clnt->cl_stats->netreconn++;
        if (status == 0 || xprt_connected(xprt)) {
                task->tk_status = 0;
                task->tk_action = call_transmit;
                return;
        }

        task->tk_status = 0;
        switch (status) {
        case -ECONNREFUSED:
                /* A positive refusal suggests a rebind is needed. */
                if (RPC_IS_SOFTCONN(task))
                        break;
                if (clnt->cl_autobind) {
                        rpc_force_rebind(clnt);
                        goto retry;
                }
                fallthrough;
        case -ECONNRESET:
        case -ECONNABORTED:
        case -ENETDOWN:
        case -ENETUNREACH:
        case -EHOSTUNREACH:
        case -EPIPE:
        case -EPROTO:
                xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
                                            task->tk_rqstp->rq_connect_cookie);
                if (RPC_IS_SOFTCONN(task))
                        break;
                /* retry with existing socket, after a delay */
                rpc_delay(task, 3*HZ);
                fallthrough;
        case -EADDRINUSE:
        case -ENOTCONN:
        case -EAGAIN:
        case -ETIMEDOUT:
                if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
                    (task->tk_flags & RPC_TASK_MOVEABLE) &&
                    test_bit(XPRT_REMOVE, &xprt->state)) {
                        struct rpc_xprt *old_xprt = task->tk_xprt;
                        struct rpc_xprt_switch *xps;

                        rcu_read_lock();
                        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
                        rcu_read_unlock();
                        if (xps->xps_nxprts > 1) {
                                /*
                                 * Other transports exist: detach the task
                                 * from this one and start over.
                                 */
                                xprt_release(task);
                                if (atomic_long_dec_return(&xprt->queuelen) == 0)
                                        rpc_xprt_switch_remove_xprt(xps, old_xprt);
                                xprt_put(old_xprt);
                                task->tk_xprt = NULL;
                                task->tk_action = call_start;
                        }
                        xprt_switch_put(xps);
                        if (!task->tk_xprt)
                                return;
                }
                goto retry;
        case -ENOBUFS:
                rpc_delay(task, HZ >> 2);
                goto retry;
        }

        rpc_call_rpcerror(task, status);
        return;

retry:
        /* Check for timeouts before looping back to call_bind */
        task->tk_action = call_bind;
        rpc_check_timeout(task);
}
2164
2165/*
2166 * 5.   Transmit the RPC request, and wait for reply
2167 */
2168static void
2169call_transmit(struct rpc_task *task)
2170{
2171        if (rpc_task_transmitted(task)) {
2172                rpc_task_handle_transmitted(task);
2173                return;
2174        }
2175
2176        task->tk_action = call_transmit_status;
2177        if (!xprt_prepare_transmit(task))
2178                return;
2179        task->tk_status = 0;
2180        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2181                if (!xprt_connected(task->tk_xprt)) {
2182                        task->tk_status = -ENOTCONN;
2183                        return;
2184                }
2185                xprt_transmit(task);
2186        }
2187        xprt_end_transmit(task);
2188}
2189
/*
 * 5a.  Handle cleanup after a transmission
 *
 * The default next action is call_status.  A transmitted request waits
 * for its reply; otherwise tk_status selects re-encoding, another
 * transmit attempt, a rebind, or (for soft-connect tasks hitting a
 * connection error) termination of the call.  Every path that does not
 * return early finishes with rpc_check_timeout().
 */
static void
call_transmit_status(struct rpc_task *task)
{
        task->tk_action = call_status;

        /*
         * Common case: success.  Force the compiler to put this
         * test first.
         */
        if (rpc_task_transmitted(task)) {
                task->tk_status = 0;
                xprt_request_wait_receive(task);
                return;
        }

        switch (task->tk_status) {
        case -EBADMSG:
                task->tk_status = 0;
                task->tk_action = call_encode;
                break;
                /*
                 * Special cases: if we've been waiting on the
                 * socket's write_space() callback, or if the
                 * socket just returned a connection error,
                 * then hold onto the transport lock.
                 */
        case -ENOMEM:
        case -ENOBUFS:
                rpc_delay(task, HZ>>2);
                fallthrough;
        case -EBADSLT:
        case -EAGAIN:
                task->tk_action = call_transmit;
                task->tk_status = 0;
                break;
        case -ECONNREFUSED:
        case -EHOSTDOWN:
        case -ENETDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -EPERM:
                if (RPC_IS_SOFTCONN(task)) {
                        /* procedure 0 is traced as a ping */
                        if (task->tk_msg.rpc_proc->p_proc == 0)
                                trace_xprt_ping(task->tk_xprt,
                                                task->tk_status);
                        rpc_call_rpcerror(task, task->tk_status);
                        return;
                }
                fallthrough;
        case -ECONNRESET:
        case -ECONNABORTED:
        case -EADDRINUSE:
        case -ENOTCONN:
        case -EPIPE:
                task->tk_action = call_bind;
                task->tk_status = 0;
                break;
        default:
                /* leave tk_status for call_status to sort out */
                break;
        }

        rpc_check_timeout(task);
}
2255
2256#if defined(CONFIG_SUNRPC_BACKCHANNEL)
2257static void call_bc_transmit(struct rpc_task *task);
2258static void call_bc_transmit_status(struct rpc_task *task);
2259
2260static void
2261call_bc_encode(struct rpc_task *task)
2262{
2263        xprt_request_enqueue_transmit(task);
2264        task->tk_action = call_bc_transmit;
2265}
2266
2267/*
2268 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2269 * addition, disconnect on connectivity errors.
2270 */
2271static void
2272call_bc_transmit(struct rpc_task *task)
2273{
2274        task->tk_action = call_bc_transmit_status;
2275        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2276                if (!xprt_prepare_transmit(task))
2277                        return;
2278                task->tk_status = 0;
2279                xprt_transmit(task);
2280        }
2281        xprt_end_transmit(task);
2282}
2283
/*
 * Evaluate the outcome of sending a backchannel reply.
 *
 * Transient errors (-ENOMEM, -ENOBUFS, -EBADSLT, -EAGAIN) go back to
 * call_bc_transmit.  Everything else ends the task via rpc_exit_task;
 * -ETIMEDOUT additionally logs and disconnects the transport, and
 * unlisted errors are logged.
 */
static void
call_bc_transmit_status(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (rpc_task_transmitted(task))
                task->tk_status = 0;

        switch (task->tk_status) {
        case -ENOMEM:
        case -ENOBUFS:
                rpc_delay(task, HZ>>2);
                fallthrough;
        case -EBADSLT:
        case -EAGAIN:
                /* try to send again */
                task->tk_status = 0;
                task->tk_action = call_bc_transmit;
                return;
        case -ETIMEDOUT:
                /*
                 * Problem reaching the server.  Disconnect and let the
                 * forechannel reestablish the connection.  The server will
                 * have to retransmit the backchannel request and we'll
                 * reprocess it.  Since these ops are idempotent, there's no
                 * need to cache our reply at this time.
                 */
                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
                        "error: %d\n", task->tk_status);
                xprt_conditional_disconnect(req->rq_xprt,
                        req->rq_connect_cookie);
                break;
        case 0:
                /* Success */
        case -ENETDOWN:
        case -EHOSTDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -ECONNRESET:
        case -ECONNREFUSED:
        case -EADDRINUSE:
        case -ENOTCONN:
        case -EPIPE:
                break;
        default:
                /*
                 * We were unable to reply and will have to drop the
                 * request.  The server should reconnect and retransmit.
                 */
                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
                        "error: %d\n", task->tk_status);
                break;
        }

        task->tk_action = rpc_exit_task;
}
2338#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2339
/*
 * 6.   Sort out the RPC call status
 *
 * A non-negative status proceeds to call_decode.  For errors, tk_status
 * is cleared and the call is either retried from call_encode (with a
 * timeout check, except after -ECONNRESET/-ECONNABORTED) or terminated
 * with the original error through rpc_call_rpcerror().
 */
static void
call_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        int             status;

        /* procedure 0 is traced as a ping */
        if (task->tk_msg.rpc_proc->p_proc == 0)
                trace_xprt_ping(task->tk_xprt, task->tk_status);

        status = task->tk_status;
        if (status >= 0) {
                task->tk_action = call_decode;
                return;
        }

        trace_rpc_call_status(task);
        task->tk_status = 0;
        switch (status) {
        case -ETIMEDOUT:
        case -EPIPE:
        case -EAGAIN:
                break;
        case -EHOSTDOWN:
        case -ENETDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -EPERM:
                if (RPC_IS_SOFTCONN(task))
                        goto fail;
                /*
                 * Delay any retries for 3 seconds, then handle as if it
                 * were a timeout.
                 */
                rpc_delay(task, 3*HZ);
                break;
        case -EADDRINUSE:
                rpc_delay(task, 3*HZ);
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ECONNABORTED:
        case -ENOTCONN:
                rpc_force_rebind(clnt);
                break;
        case -ENFILE:
        case -ENOBUFS:
        case -ENOMEM:
                rpc_delay(task, HZ>>2);
                break;
        case -EIO:
                /* shutdown or soft timeout */
                goto fail;
        default:
                if (clnt->cl_chatty)
                        printk("%s: RPC call returned error %d\n",
                               clnt->cl_program->name, -status);
                goto fail;
        }

        task->tk_action = call_encode;
        if (!(status == -ECONNRESET || status == -ECONNABORTED))
                rpc_check_timeout(task);
        return;

fail:
        rpc_call_rpcerror(task, status);
}
2409
2410static bool
2411rpc_check_connected(const struct rpc_rqst *req)
2412{
2413        /* No allocated request or transport? return true */
2414        if (!req || !req->rq_xprt)
2415                return true;
2416        return xprt_connected(req->rq_xprt);
2417}
2418
2419static void
2420rpc_check_timeout(struct rpc_task *task)
2421{
2422        struct rpc_clnt *clnt = task->tk_client;
2423
2424        if (RPC_SIGNALLED(task)) {
2425                rpc_call_rpcerror(task, -ERESTARTSYS);
2426                return;
2427        }
2428
2429        if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2430                return;
2431
2432        trace_rpc_timeout_status(task);
2433        task->tk_timeouts++;
2434
2435        if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2436                rpc_call_rpcerror(task, -ETIMEDOUT);
2437                return;
2438        }
2439
2440        if (RPC_IS_SOFT(task)) {
2441                /*
2442                 * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2443                 * been sent, it should time out only if the transport
2444                 * connection gets terminally broken.
2445                 */
2446                if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2447                    rpc_check_connected(task->tk_rqstp))
2448                        return;
2449
2450                if (clnt->cl_chatty) {
2451                        pr_notice_ratelimited(
2452                                "%s: server %s not responding, timed out\n",
2453                                clnt->cl_program->name,
2454                                task->tk_xprt->servername);
2455                }
2456                if (task->tk_flags & RPC_TASK_TIMEOUT)
2457                        rpc_call_rpcerror(task, -ETIMEDOUT);
2458                else
2459                        __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2460                return;
2461        }
2462
2463        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2464                task->tk_flags |= RPC_CALL_MAJORSEEN;
2465                if (clnt->cl_chatty) {
2466                        pr_notice_ratelimited(
2467                                "%s: server %s not responding, still trying\n",
2468                                clnt->cl_program->name,
2469                                task->tk_xprt->servername);
2470                }
2471        }
2472        rpc_force_rebind(clnt);
2473        /*
2474         * Did our request time out due to an RPCSEC_GSS out-of-sequence
2475         * event? RFC2203 requires the server to drop all such requests.
2476         */
2477        rpcauth_invalcred(task);
2478}
2479
2480/*
2481 * 7.   Decode the RPC reply
2482 */
2483static void
2484call_decode(struct rpc_task *task)
2485{
2486        struct rpc_clnt *clnt = task->tk_client;
2487        struct rpc_rqst *req = task->tk_rqstp;
2488        struct xdr_stream xdr;
2489        int err;
2490
2491        if (!task->tk_msg.rpc_proc->p_decode) {
2492                task->tk_action = rpc_exit_task;
2493                return;
2494        }
2495
2496        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2497                if (clnt->cl_chatty) {
2498                        pr_notice_ratelimited("%s: server %s OK\n",
2499                                clnt->cl_program->name,
2500                                task->tk_xprt->servername);
2501                }
2502                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2503        }
2504
2505        /*
2506         * Did we ever call xprt_complete_rqst()? If not, we should assume
2507         * the message is incomplete.
2508         */
2509        err = -EAGAIN;
2510        if (!req->rq_reply_bytes_recvd)
2511                goto out;
2512
2513        /* Ensure that we see all writes made by xprt_complete_rqst()
2514         * before it changed req->rq_reply_bytes_recvd.
2515         */
2516        smp_rmb();
2517
2518        req->rq_rcv_buf.len = req->rq_private_buf.len;
2519        trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2520
2521        /* Check that the softirq receive buffer is valid */
2522        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2523                                sizeof(req->rq_rcv_buf)) != 0);
2524
2525        xdr_init_decode(&xdr, &req->rq_rcv_buf,
2526                        req->rq_rcv_buf.head[0].iov_base, req);
2527        err = rpc_decode_header(task, &xdr);
2528out:
2529        switch (err) {
2530        case 0:
2531                task->tk_action = rpc_exit_task;
2532                task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2533                return;
2534        case -EAGAIN:
2535                task->tk_status = 0;
2536                if (task->tk_client->cl_discrtry)
2537                        xprt_conditional_disconnect(req->rq_xprt,
2538                                                    req->rq_connect_cookie);
2539                task->tk_action = call_encode;
2540                rpc_check_timeout(task);
2541                break;
2542        case -EKEYREJECTED:
2543                task->tk_action = call_reserve;
2544                rpc_check_timeout(task);
2545                rpcauth_invalcred(task);
2546                /* Ensure we obtain a new XID if we retry! */
2547                xprt_release(task);
2548        }
2549}
2550
2551static int
2552rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2553{
2554        struct rpc_clnt *clnt = task->tk_client;
2555        struct rpc_rqst *req = task->tk_rqstp;
2556        __be32 *p;
2557        int error;
2558
2559        error = -EMSGSIZE;
2560        p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2561        if (!p)
2562                goto out_fail;
2563        *p++ = req->rq_xid;
2564        *p++ = rpc_call;
2565        *p++ = cpu_to_be32(RPC_VERSION);
2566        *p++ = cpu_to_be32(clnt->cl_prog);
2567        *p++ = cpu_to_be32(clnt->cl_vers);
2568        *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2569
2570        error = rpcauth_marshcred(task, xdr);
2571        if (error < 0)
2572                goto out_fail;
2573        return 0;
2574out_fail:
2575        trace_rpc_bad_callhdr(task);
2576        rpc_call_rpcerror(task, error);
2577        return error;
2578}
2579
2580static noinline int
2581rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2582{
2583        struct rpc_clnt *clnt = task->tk_client;
2584        int error;
2585        __be32 *p;
2586
2587        /* RFC-1014 says that the representation of XDR data must be a
2588         * multiple of four bytes
2589         * - if it isn't pointer subtraction in the NFS client may give
2590         *   undefined results
2591         */
2592        if (task->tk_rqstp->rq_rcv_buf.len & 3)
2593                goto out_unparsable;
2594
2595        p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2596        if (!p)
2597                goto out_unparsable;
2598        p++;    /* skip XID */
2599        if (*p++ != rpc_reply)
2600                goto out_unparsable;
2601        if (*p++ != rpc_msg_accepted)
2602                goto out_msg_denied;
2603
2604        error = rpcauth_checkverf(task, xdr);
2605        if (error)
2606                goto out_verifier;
2607
2608        p = xdr_inline_decode(xdr, sizeof(*p));
2609        if (!p)
2610                goto out_unparsable;
2611        switch (*p) {
2612        case rpc_success:
2613                return 0;
2614        case rpc_prog_unavail:
2615                trace_rpc__prog_unavail(task);
2616                error = -EPFNOSUPPORT;
2617                goto out_err;
2618        case rpc_prog_mismatch:
2619                trace_rpc__prog_mismatch(task);
2620                error = -EPROTONOSUPPORT;
2621                goto out_err;
2622        case rpc_proc_unavail:
2623                trace_rpc__proc_unavail(task);
2624                error = -EOPNOTSUPP;
2625                goto out_err;
2626        case rpc_garbage_args:
2627        case rpc_system_err:
2628                trace_rpc__garbage_args(task);
2629                error = -EIO;
2630                break;
2631        default:
2632                goto out_unparsable;
2633        }
2634
2635out_garbage:
2636        clnt->cl_stats->rpcgarbage++;
2637        if (task->tk_garb_retry) {
2638                task->tk_garb_retry--;
2639                task->tk_action = call_encode;
2640                return -EAGAIN;
2641        }
2642out_err:
2643        rpc_call_rpcerror(task, error);
2644        return error;
2645
2646out_unparsable:
2647        trace_rpc__unparsable(task);
2648        error = -EIO;
2649        goto out_garbage;
2650
2651out_verifier:
2652        trace_rpc_bad_verifier(task);
2653        goto out_garbage;
2654
2655out_msg_denied:
2656        error = -EACCES;
2657        p = xdr_inline_decode(xdr, sizeof(*p));
2658        if (!p)
2659                goto out_unparsable;
2660        switch (*p++) {
2661        case rpc_auth_error:
2662                break;
2663        case rpc_mismatch:
2664                trace_rpc__mismatch(task);
2665                error = -EPROTONOSUPPORT;
2666                goto out_err;
2667        default:
2668                goto out_unparsable;
2669        }
2670
2671        p = xdr_inline_decode(xdr, sizeof(*p));
2672        if (!p)
2673                goto out_unparsable;
2674        switch (*p++) {
2675        case rpc_autherr_rejectedcred:
2676        case rpc_autherr_rejectedverf:
2677        case rpcsec_gsserr_credproblem:
2678        case rpcsec_gsserr_ctxproblem:
2679                if (!task->tk_cred_retry)
2680                        break;
2681                task->tk_cred_retry--;
2682                trace_rpc__stale_creds(task);
2683                return -EKEYREJECTED;
2684        case rpc_autherr_badcred:
2685        case rpc_autherr_badverf:
2686                /* possibly garbled cred/verf? */
2687                if (!task->tk_garb_retry)
2688                        break;
2689                task->tk_garb_retry--;
2690                trace_rpc__bad_creds(task);
2691                task->tk_action = call_encode;
2692                return -EAGAIN;
2693        case rpc_autherr_tooweak:
2694                trace_rpc__auth_tooweak(task);
2695                pr_warn("RPC: server %s requires stronger authentication.\n",
2696                        task->tk_xprt->servername);
2697                break;
2698        default:
2699                goto out_unparsable;
2700        }
2701        goto out_err;
2702}
2703
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp,
                struct xdr_stream *xdr, const void *obj)
{
}
2708
/*
 * Result decoder for the NULL procedure: there is nothing to decode,
 * so it always returns 0.
 */
static int rpcproc_decode_null(struct rpc_rqst *rqstp,
                struct xdr_stream *xdr, void *obj)
{
        return 0;
}
2714
2715static const struct rpc_procinfo rpcproc_null = {
2716        .p_encode = rpcproc_encode_null,
2717        .p_decode = rpcproc_decode_null,
2718};
2719
2720static const struct rpc_procinfo rpcproc_null_noreply = {
2721        .p_encode = rpcproc_encode_null,
2722};
2723
2724static void
2725rpc_null_call_prepare(struct rpc_task *task, void *data)
2726{
2727        task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2728        rpc_call_start(task);
2729}
2730
2731static const struct rpc_call_ops rpc_null_ops = {
2732        .rpc_call_prepare = rpc_null_call_prepare,
2733        .rpc_call_done = rpc_default_callback,
2734};
2735
2736static
2737struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2738                struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2739                const struct rpc_call_ops *ops, void *data)
2740{
2741        struct rpc_message msg = {
2742                .rpc_proc = &rpcproc_null,
2743        };
2744        struct rpc_task_setup task_setup_data = {
2745                .rpc_client = clnt,
2746                .rpc_xprt = xprt,
2747                .rpc_message = &msg,
2748                .rpc_op_cred = cred,
2749                .callback_ops = ops ?: &rpc_null_ops,
2750                .callback_data = data,
2751                .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2752                         RPC_TASK_NULLCREDS,
2753        };
2754
2755        return rpc_run_task(&task_setup_data);
2756}
2757
2758struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2759{
2760        return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2761}
2762EXPORT_SYMBOL_GPL(rpc_call_null);
2763
2764static int rpc_ping(struct rpc_clnt *clnt)
2765{
2766        struct rpc_task *task;
2767        int status;
2768
2769        task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2770        if (IS_ERR(task))
2771                return PTR_ERR(task);
2772        status = task->tk_status;
2773        rpc_put_task(task);
2774        return status;
2775}
2776
2777static int rpc_ping_noreply(struct rpc_clnt *clnt)
2778{
2779        struct rpc_message msg = {
2780                .rpc_proc = &rpcproc_null_noreply,
2781        };
2782        struct rpc_task_setup task_setup_data = {
2783                .rpc_client = clnt,
2784                .rpc_message = &msg,
2785                .callback_ops = &rpc_null_ops,
2786                .flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2787        };
2788        struct rpc_task *task;
2789        int status;
2790
2791        task = rpc_run_task(&task_setup_data);
2792        if (IS_ERR(task))
2793                return PTR_ERR(task);
2794        status = task->tk_status;
2795        rpc_put_task(task);
2796        return status;
2797}
2798
/*
 * Callback data for the NULL call issued by rpc_clnt_test_and_add_xprt().
 * Both pointers are put, and the structure freed, by
 * rpc_cb_add_xprt_release().
 */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch that receives @xprt on success */
	struct rpc_xprt *xprt;		/* transport being tested */
};
2803
2804static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2805{
2806        struct rpc_cb_add_xprt_calldata *data = calldata;
2807
2808        if (task->tk_status == 0)
2809                rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2810}
2811
2812static void rpc_cb_add_xprt_release(void *calldata)
2813{
2814        struct rpc_cb_add_xprt_calldata *data = calldata;
2815
2816        xprt_put(data->xprt);
2817        xprt_switch_put(data->xps);
2818        kfree(data);
2819}
2820
2821static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2822        .rpc_call_prepare = rpc_null_call_prepare,
2823        .rpc_call_done = rpc_cb_add_xprt_done,
2824        .rpc_release = rpc_cb_add_xprt_release,
2825};
2826
2827/**
2828 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2829 * @clnt: pointer to struct rpc_clnt
2830 * @xps: pointer to struct rpc_xprt_switch,
2831 * @xprt: pointer struct rpc_xprt
2832 * @dummy: unused
2833 */
2834int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2835                struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2836                void *dummy)
2837{
2838        struct rpc_cb_add_xprt_calldata *data;
2839        struct rpc_task *task;
2840
2841        if (xps->xps_nunique_destaddr_xprts + 1 > clnt->cl_max_connect) {
2842                rcu_read_lock();
2843                pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2844                        "transport to server: %s\n", clnt->cl_max_connect,
2845                        rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2846                rcu_read_unlock();
2847                return -EINVAL;
2848        }
2849
2850        data = kmalloc(sizeof(*data), GFP_KERNEL);
2851        if (!data)
2852                return -ENOMEM;
2853        data->xps = xprt_switch_get(xps);
2854        data->xprt = xprt_get(xprt);
2855        if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2856                rpc_cb_add_xprt_release(data);
2857                goto success;
2858        }
2859
2860        task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2861                        &rpc_cb_add_xprt_call_ops, data);
2862        data->xps->xps_nunique_destaddr_xprts++;
2863        rpc_put_task(task);
2864success:
2865        return 1;
2866}
2867EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2868
2869/**
2870 * rpc_clnt_setup_test_and_add_xprt()
2871 *
2872 * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2873 *   1) caller of the test function must dereference the rpc_xprt_switch
2874 *   and the rpc_xprt.
2875 *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2876 *   the rpc_call_done routine.
2877 *
2878 * Upon success (return of 1), the test function adds the new
2879 * transport to the rpc_clnt xprt switch
2880 *
2881 * @clnt: struct rpc_clnt to get the new transport
2882 * @xps:  the rpc_xprt_switch to hold the new transport
2883 * @xprt: the rpc_xprt to test
2884 * @data: a struct rpc_add_xprt_test pointer that holds the test function
2885 *        and test function call data
2886 */
2887int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2888                                     struct rpc_xprt_switch *xps,
2889                                     struct rpc_xprt *xprt,
2890                                     void *data)
2891{
2892        struct rpc_task *task;
2893        struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2894        int status = -EADDRINUSE;
2895
2896        xprt = xprt_get(xprt);
2897        xprt_switch_get(xps);
2898
2899        if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2900                goto out_err;
2901
2902        /* Test the connection */
2903        task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2904        if (IS_ERR(task)) {
2905                status = PTR_ERR(task);
2906                goto out_err;
2907        }
2908        status = task->tk_status;
2909        rpc_put_task(task);
2910
2911        if (status < 0)
2912                goto out_err;
2913
2914        /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2915        xtest->add_xprt_test(clnt, xprt, xtest->data);
2916
2917        xprt_put(xprt);
2918        xprt_switch_put(xps);
2919
2920        /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2921        return 1;
2922out_err:
2923        xprt_put(xprt);
2924        xprt_switch_put(xps);
2925        pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2926                status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2927        return status;
2928}
2929EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2930
2931/**
2932 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2933 * @clnt: pointer to struct rpc_clnt
2934 * @xprtargs: pointer to struct xprt_create
2935 * @setup: callback to test and/or set up the connection
2936 * @data: pointer to setup function data
2937 *
2938 * Creates a new transport using the parameters set in args and
2939 * adds it to clnt.
2940 * If ping is set, then test that connectivity succeeds before
2941 * adding the new transport.
2942 *
2943 */
2944int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2945                struct xprt_create *xprtargs,
2946                int (*setup)(struct rpc_clnt *,
2947                        struct rpc_xprt_switch *,
2948                        struct rpc_xprt *,
2949                        void *),
2950                void *data)
2951{
2952        struct rpc_xprt_switch *xps;
2953        struct rpc_xprt *xprt;
2954        unsigned long connect_timeout;
2955        unsigned long reconnect_timeout;
2956        unsigned char resvport, reuseport;
2957        int ret = 0, ident;
2958
2959        rcu_read_lock();
2960        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2961        xprt = xprt_iter_xprt(&clnt->cl_xpi);
2962        if (xps == NULL || xprt == NULL) {
2963                rcu_read_unlock();
2964                xprt_switch_put(xps);
2965                return -EAGAIN;
2966        }
2967        resvport = xprt->resvport;
2968        reuseport = xprt->reuseport;
2969        connect_timeout = xprt->connect_timeout;
2970        reconnect_timeout = xprt->max_reconnect_timeout;
2971        ident = xprt->xprt_class->ident;
2972        rcu_read_unlock();
2973
2974        if (!xprtargs->ident)
2975                xprtargs->ident = ident;
2976        xprt = xprt_create_transport(xprtargs);
2977        if (IS_ERR(xprt)) {
2978                ret = PTR_ERR(xprt);
2979                goto out_put_switch;
2980        }
2981        xprt->resvport = resvport;
2982        xprt->reuseport = reuseport;
2983        if (xprt->ops->set_connect_timeout != NULL)
2984                xprt->ops->set_connect_timeout(xprt,
2985                                connect_timeout,
2986                                reconnect_timeout);
2987
2988        rpc_xprt_switch_set_roundrobin(xps);
2989        if (setup) {
2990                ret = setup(clnt, xps, xprt, data);
2991                if (ret != 0)
2992                        goto out_put_xprt;
2993        }
2994        rpc_xprt_switch_add_xprt(xps, xprt);
2995out_put_xprt:
2996        xprt_put(xprt);
2997out_put_switch:
2998        xprt_switch_put(xps);
2999        return ret;
3000}
3001EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
3002
/*
 * Argument bundle handed to rpc_xprt_set_connect_timeout() through the
 * per-transport iterator's void pointer.
 */
struct connect_timeout_data {
	unsigned long connect_timeout;		/* passed as the connect timeout */
	unsigned long reconnect_timeout;	/* passed as the reconnect timeout */
};
3007
3008static int
3009rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3010                struct rpc_xprt *xprt,
3011                void *data)
3012{
3013        struct connect_timeout_data *timeo = data;
3014
3015        if (xprt->ops->set_connect_timeout)
3016                xprt->ops->set_connect_timeout(xprt,
3017                                timeo->connect_timeout,
3018                                timeo->reconnect_timeout);
3019        return 0;
3020}
3021
3022void
3023rpc_set_connect_timeout(struct rpc_clnt *clnt,
3024                unsigned long connect_timeout,
3025                unsigned long reconnect_timeout)
3026{
3027        struct connect_timeout_data timeout = {
3028                .connect_timeout = connect_timeout,
3029                .reconnect_timeout = reconnect_timeout,
3030        };
3031        rpc_clnt_iterate_for_each_xprt(clnt,
3032                        rpc_xprt_set_connect_timeout,
3033                        &timeout);
3034}
3035EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
3036
3037void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
3038{
3039        rcu_read_lock();
3040        xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3041        rcu_read_unlock();
3042}
3043EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
3044
3045void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3046{
3047        rcu_read_lock();
3048        rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3049                                 xprt);
3050        rcu_read_unlock();
3051}
3052EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3053
3054bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3055                                   const struct sockaddr *sap)
3056{
3057        struct rpc_xprt_switch *xps;
3058        bool ret;
3059
3060        rcu_read_lock();
3061        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3062        ret = rpc_xprt_switch_has_addr(xps, sap);
3063        rcu_read_unlock();
3064        return ret;
3065}
3066EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3067
3068#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3069static void rpc_show_header(void)
3070{
3071        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3072                "-timeout ---ops--\n");
3073}
3074
3075static void rpc_show_task(const struct rpc_clnt *clnt,
3076                          const struct rpc_task *task)
3077{
3078        const char *rpc_waitq = "none";
3079
3080        if (RPC_IS_QUEUED(task))
3081                rpc_waitq = rpc_qname(task->tk_waitqueue);
3082
3083        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3084                task->tk_pid, task->tk_flags, task->tk_status,
3085                clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3086                clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3087                task->tk_action, rpc_waitq);
3088}
3089
3090void rpc_show_tasks(struct net *net)
3091{
3092        struct rpc_clnt *clnt;
3093        struct rpc_task *task;
3094        int header = 0;
3095        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3096
3097        spin_lock(&sn->rpc_client_lock);
3098        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3099                spin_lock(&clnt->cl_lock);
3100                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3101                        if (!header) {
3102                                rpc_show_header();
3103                                header++;
3104                        }
3105                        rpc_show_task(clnt, task);
3106                }
3107                spin_unlock(&clnt->cl_lock);
3108        }
3109        spin_unlock(&sn->rpc_client_lock);
3110}
3111#endif
3112
3113#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback for rpc_clnt_swap_activate(): enable swap on
 * @xprt and hand back the result of xprt_enable_swap().
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	int err = xprt_enable_swap(xprt);

	return err;
}
3121
3122int
3123rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3124{
3125        while (clnt != clnt->cl_parent)
3126                clnt = clnt->cl_parent;
3127        if (atomic_inc_return(&clnt->cl_swapper) == 1)
3128                return rpc_clnt_iterate_for_each_xprt(clnt,
3129                                rpc_clnt_swap_activate_callback, NULL);
3130        return 0;
3131}
3132EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3133
/*
 * Per-transport callback for rpc_clnt_swap_deactivate(): disable swap on
 * @xprt.  Always returns 0.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	xprt_disable_swap(xprt);

	return 0;
}
3142
3143void
3144rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3145{
3146        if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3147                rpc_clnt_iterate_for_each_xprt(clnt,
3148                                rpc_clnt_swap_deactivate_callback, NULL);
3149}
3150EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3151#endif /* CONFIG_SUNRPC_SWAP */
3152