linux/net/sunrpc/clnt.c
   1/*
   2 *  linux/net/sunrpc/clnt.c
   3 *
   4 *  This file contains the high-level RPC interface.
   5 *  It is modeled as a finite state machine to support both synchronous
   6 *  and asynchronous requests.
   7 *
   8 *  -   RPC header generation and argument serialization.
   9 *  -   Credential refresh.
  10 *  -   TCP connect handling.
  11 *  -   Retry of operation when it is suspected the operation failed because
  12 *      of uid squashing on the server, or when the credentials were stale
  13 *      and need to be refreshed, or when a packet was damaged in transit.
  14 *      This may have to be moved to the VFS layer.
  15 *
  16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  18 */
  19
  20#include <asm/system.h>
  21
  22#include <linux/module.h>
  23#include <linux/types.h>
  24#include <linux/kallsyms.h>
  25#include <linux/mm.h>
  26#include <linux/namei.h>
  27#include <linux/mount.h>
  28#include <linux/slab.h>
  29#include <linux/utsname.h>
  30#include <linux/workqueue.h>
  31#include <linux/in.h>
  32#include <linux/in6.h>
  33#include <linux/un.h>
  34
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/rpc_pipe_fs.h>
  37#include <linux/sunrpc/metrics.h>
  38#include <linux/sunrpc/bc_xprt.h>
  39
  40#include "sunrpc.h"
  41
  42#ifdef RPC_DEBUG
  43# define RPCDBG_FACILITY        RPCDBG_CALL
  44#endif
  45
  46#define dprint_status(t)                                        \
  47        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
  48                        __func__, t->tk_status)
  49
  50/*
  51 * All RPC clients are linked into this list
  52 */
  53static LIST_HEAD(all_clients);
  54static DEFINE_SPINLOCK(rpc_client_lock);
  55
  56static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  57
  58
  59static void     call_start(struct rpc_task *task);
  60static void     call_reserve(struct rpc_task *task);
  61static void     call_reserveresult(struct rpc_task *task);
  62static void     call_allocate(struct rpc_task *task);
  63static void     call_decode(struct rpc_task *task);
  64static void     call_bind(struct rpc_task *task);
  65static void     call_bind_status(struct rpc_task *task);
  66static void     call_transmit(struct rpc_task *task);
  67#if defined(CONFIG_NFS_V4_1)
  68static void     call_bc_transmit(struct rpc_task *task);
  69#endif /* CONFIG_NFS_V4_1 */
  70static void     call_status(struct rpc_task *task);
  71static void     call_transmit_status(struct rpc_task *task);
  72static void     call_refresh(struct rpc_task *task);
  73static void     call_refreshresult(struct rpc_task *task);
  74static void     call_timeout(struct rpc_task *task);
  75static void     call_connect(struct rpc_task *task);
  76static void     call_connect_status(struct rpc_task *task);
  77
  78static __be32   *rpc_encode_header(struct rpc_task *task);
  79static __be32   *rpc_verify_header(struct rpc_task *task);
  80static int      rpc_ping(struct rpc_clnt *clnt);
  81
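/*
 * (Overview added for readability; derived from the numbered state comments
 *  below.)  On the usual successful path a task walks the FSM as
 *
 *	call_start -> call_reserve -> call_reserveresult -> call_refresh ->
 *	call_refreshresult -> call_allocate -> call_bind -> call_connect ->
 *	call_transmit (which encodes via rpc_xdr_encode) -> call_status ->
 *	call_decode -> rpc_exit_task
 *
 * with the *_status and call_timeout handlers re-entering earlier states
 * when binding, connecting, transmission or the reply needs to be retried.
 */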
  82static void rpc_register_client(struct rpc_clnt *clnt)
  83{
  84        spin_lock(&rpc_client_lock);
  85        list_add(&clnt->cl_clients, &all_clients);
  86        spin_unlock(&rpc_client_lock);
  87}
  88
  89static void rpc_unregister_client(struct rpc_clnt *clnt)
  90{
  91        spin_lock(&rpc_client_lock);
  92        list_del(&clnt->cl_clients);
  93        spin_unlock(&rpc_client_lock);
  94}
  95
  96static int
  97rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
  98{
  99        static uint32_t clntid;
 100        struct nameidata nd;
 101        struct path path;
 102        char name[15];
 103        struct qstr q = {
 104                .name = name,
 105        };
 106        int error;
 107
 108        clnt->cl_path.mnt = ERR_PTR(-ENOENT);
 109        clnt->cl_path.dentry = ERR_PTR(-ENOENT);
 110        if (dir_name == NULL)
 111                return 0;
 112
 113        path.mnt = rpc_get_mount();
 114        if (IS_ERR(path.mnt))
 115                return PTR_ERR(path.mnt);
 116        error = vfs_path_lookup(path.mnt->mnt_root, path.mnt, dir_name, 0, &nd);
 117        if (error)
 118                goto err;
 119
 120        for (;;) {
 121                q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 122                name[sizeof(name) - 1] = '\0';
 123                q.hash = full_name_hash(q.name, q.len);
 124                path.dentry = rpc_create_client_dir(nd.path.dentry, &q, clnt);
 125                if (!IS_ERR(path.dentry))
 126                        break;
 127                error = PTR_ERR(path.dentry);
 128                if (error != -EEXIST) {
 129                        printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 130                                        " %s/%s, error %d\n",
 131                                        dir_name, name, error);
 132                        goto err_path_put;
 133                }
 134        }
 135        path_put(&nd.path);
 136        clnt->cl_path = path;
 137        return 0;
 138err_path_put:
 139        path_put(&nd.path);
 140err:
 141        rpc_put_mount();
 142        return error;
 143}
 144
 145static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
 146{
 147        struct rpc_program      *program = args->program;
 148        struct rpc_version      *version;
 149        struct rpc_clnt         *clnt = NULL;
 150        struct rpc_auth         *auth;
 151        int err;
 152        size_t len;
 153
 154        /* sanity check the name before trying to print it */
 155        err = -EINVAL;
 156        len = strlen(args->servername);
 157        if (len > RPC_MAXNETNAMELEN)
 158                goto out_no_rpciod;
 159        len++;
 160
 161        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 162                        program->name, args->servername, xprt);
 163
 164        err = rpciod_up();
 165        if (err)
 166                goto out_no_rpciod;
 167        err = -EINVAL;
 168        if (!xprt)
 169                goto out_no_xprt;
 170
 171        if (args->version >= program->nrvers)
 172                goto out_err;
 173        version = program->version[args->version];
 174        if (version == NULL)
 175                goto out_err;
 176
 177        err = -ENOMEM;
 178        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 179        if (!clnt)
 180                goto out_err;
 181        clnt->cl_parent = clnt;
 182
 183        clnt->cl_server = clnt->cl_inline_name;
 184        if (len > sizeof(clnt->cl_inline_name)) {
 185                char *buf = kmalloc(len, GFP_KERNEL);
 186                if (buf != NULL)
 187                        clnt->cl_server = buf;
 188                else
 189                        len = sizeof(clnt->cl_inline_name);
 190        }
 191        strlcpy(clnt->cl_server, args->servername, len);
 192
 193        clnt->cl_xprt     = xprt;
 194        clnt->cl_procinfo = version->procs;
 195        clnt->cl_maxproc  = version->nrprocs;
 196        clnt->cl_protname = program->name;
 197        clnt->cl_prog     = args->prognumber ? : program->number;
 198        clnt->cl_vers     = version->number;
 199        clnt->cl_stats    = program->stats;
 200        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 201        err = -ENOMEM;
 202        if (clnt->cl_metrics == NULL)
 203                goto out_no_stats;
 204        clnt->cl_program  = program;
 205        INIT_LIST_HEAD(&clnt->cl_tasks);
 206        spin_lock_init(&clnt->cl_lock);
 207
 208        if (!xprt_bound(clnt->cl_xprt))
 209                clnt->cl_autobind = 1;
 210
 211        clnt->cl_timeout = xprt->timeout;
 212        if (args->timeout != NULL) {
 213                memcpy(&clnt->cl_timeout_default, args->timeout,
 214                                sizeof(clnt->cl_timeout_default));
 215                clnt->cl_timeout = &clnt->cl_timeout_default;
 216        }
 217
 218        clnt->cl_rtt = &clnt->cl_rtt_default;
 219        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 220        clnt->cl_principal = NULL;
 221        if (args->client_name) {
 222                clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
 223                if (!clnt->cl_principal)
 224                        goto out_no_principal;
 225        }
 226
 227        atomic_set(&clnt->cl_count, 1);
 228
 229        err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
 230        if (err < 0)
 231                goto out_no_path;
 232
 233        auth = rpcauth_create(args->authflavor, clnt);
 234        if (IS_ERR(auth)) {
 235                printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
 236                                args->authflavor);
 237                err = PTR_ERR(auth);
 238                goto out_no_auth;
 239        }
 240
 241        /* save the nodename */
 242        clnt->cl_nodelen = strlen(init_utsname()->nodename);
 243        if (clnt->cl_nodelen > UNX_MAXNODENAME)
 244                clnt->cl_nodelen = UNX_MAXNODENAME;
 245        memcpy(clnt->cl_nodename, init_utsname()->nodename, clnt->cl_nodelen);
 246        rpc_register_client(clnt);
 247        return clnt;
 248
 249out_no_auth:
 250        if (!IS_ERR(clnt->cl_path.dentry)) {
 251                rpc_remove_client_dir(clnt->cl_path.dentry);
 252                rpc_put_mount();
 253        }
 254out_no_path:
 255        kfree(clnt->cl_principal);
 256out_no_principal:
 257        rpc_free_iostats(clnt->cl_metrics);
 258out_no_stats:
 259        if (clnt->cl_server != clnt->cl_inline_name)
 260                kfree(clnt->cl_server);
 261        kfree(clnt);
 262out_err:
 263        xprt_put(xprt);
 264out_no_xprt:
 265        rpciod_down();
 266out_no_rpciod:
 267        return ERR_PTR(err);
 268}
 269
 270/*
 271 * rpc_create - create an RPC client and transport with one call
 272 * @args: rpc_clnt create argument structure
 273 *
 274 * Creates and initializes an RPC transport and an RPC client.
 275 *
 276 * It can ping the server in order to determine if it is up, and to see if
 277 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 278 * this behavior so asynchronous tasks can also use rpc_create.
 279 */
 280struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 281{
 282        struct rpc_xprt *xprt;
 283        struct rpc_clnt *clnt;
 284        struct xprt_create xprtargs = {
 285                .net = args->net,
 286                .ident = args->protocol,
 287                .srcaddr = args->saddress,
 288                .dstaddr = args->address,
 289                .addrlen = args->addrsize,
 290                .bc_xprt = args->bc_xprt,
 291        };
 292        char servername[48];
 293
 294        /*
 295         * If the caller chooses not to specify a hostname, whip
 296         * up a string representation of the passed-in address.
 297         */
 298        if (args->servername == NULL) {
 299                struct sockaddr_un *sun =
 300                                (struct sockaddr_un *)args->address;
 301                struct sockaddr_in *sin =
 302                                (struct sockaddr_in *)args->address;
 303                struct sockaddr_in6 *sin6 =
 304                                (struct sockaddr_in6 *)args->address;
 305
 306                servername[0] = '\0';
 307                switch (args->address->sa_family) {
 308                case AF_LOCAL:
 309                        snprintf(servername, sizeof(servername), "%s",
 310                                 sun->sun_path);
 311                        break;
 312                case AF_INET:
 313                        snprintf(servername, sizeof(servername), "%pI4",
 314                                 &sin->sin_addr.s_addr);
 315                        break;
 316                case AF_INET6:
 317                        snprintf(servername, sizeof(servername), "%pI6",
 318                                 &sin6->sin6_addr);
 319                        break;
 320                default:
 321                        /* caller wants default server name, but
 322                         * address family isn't recognized. */
 323                        return ERR_PTR(-EINVAL);
 324                }
 325                args->servername = servername;
 326        }
 327
 328        xprt = xprt_create_transport(&xprtargs);
 329        if (IS_ERR(xprt))
 330                return (struct rpc_clnt *)xprt;
 331
 332        /*
 333         * By default, kernel RPC client connects from a reserved port.
 334         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 335         * but it is always enabled for rpciod, which handles the connect
 336         * operation.
 337         */
 338        xprt->resvport = 1;
 339        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 340                xprt->resvport = 0;
 341
 342        clnt = rpc_new_client(args, xprt);
 343        if (IS_ERR(clnt))
 344                return clnt;
 345
 346        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 347                int err = rpc_ping(clnt);
 348                if (err != 0) {
 349                        rpc_shutdown_client(clnt);
 350                        return ERR_PTR(err);
 351                }
 352        }
 353
 354        clnt->cl_softrtry = 1;
 355        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
 356                clnt->cl_softrtry = 0;
 357
 358        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 359                clnt->cl_autobind = 1;
 360        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 361                clnt->cl_discrtry = 1;
 362        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 363                clnt->cl_chatty = 1;
 364
 365        return clnt;
 366}
 367EXPORT_SYMBOL_GPL(rpc_create);
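/*
 * Illustrative sketch (added; not part of the original file): a typical
 * caller fills in struct rpc_create_args and checks the result with
 * IS_ERR().  The program table "example_program", the server address
 * "sap"/"salen" and the transport ident XPRT_TRANSPORT_TCP below are
 * caller-supplied placeholders.
 *
 *	struct rpc_create_args args = {
 *		.net		= &init_net,
 *		.protocol	= XPRT_TRANSPORT_TCP,
 *		.address	= sap,
 *		.addrsize	= salen,
 *		.servername	= "server.example.com",
 *		.program	= &example_program,
 *		.version	= 3,
 *		.authflavor	= RPC_AUTH_UNIX,
 *		.flags		= RPC_CLNT_CREATE_NOPING,
 *	};
 *	struct rpc_clnt *clnt = rpc_create(&args);
 *
 *	if (IS_ERR(clnt))
 *		return PTR_ERR(clnt);
 *	...
 *	rpc_shutdown_client(clnt);
 */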
 368
 369/*
 370 * This function clones the RPC client structure. It allows us to share the
 371 * same transport while varying parameters such as the authentication
 372 * flavour.
 373 */
 374struct rpc_clnt *
 375rpc_clone_client(struct rpc_clnt *clnt)
 376{
 377        struct rpc_clnt *new;
 378        int err = -ENOMEM;
 379
 380        new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
 381        if (!new)
 382                goto out_no_clnt;
 383        new->cl_parent = clnt;
 384        /* Turn off autobind on clones */
 385        new->cl_autobind = 0;
 386        INIT_LIST_HEAD(&new->cl_tasks);
 387        spin_lock_init(&new->cl_lock);
 388        rpc_init_rtt(&new->cl_rtt_default, clnt->cl_timeout->to_initval);
 389        new->cl_metrics = rpc_alloc_iostats(clnt);
 390        if (new->cl_metrics == NULL)
 391                goto out_no_stats;
 392        if (clnt->cl_principal) {
 393                new->cl_principal = kstrdup(clnt->cl_principal, GFP_KERNEL);
 394                if (new->cl_principal == NULL)
 395                        goto out_no_principal;
 396        }
 397        atomic_set(&new->cl_count, 1);
 398        err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
 399        if (err != 0)
 400                goto out_no_path;
 401        if (new->cl_auth)
 402                atomic_inc(&new->cl_auth->au_count);
 403        xprt_get(clnt->cl_xprt);
 404        atomic_inc(&clnt->cl_count);
 405        rpc_register_client(new);
 406        rpciod_up();
 407        return new;
 408out_no_path:
 409        kfree(new->cl_principal);
 410out_no_principal:
 411        rpc_free_iostats(new->cl_metrics);
 412out_no_stats:
 413        kfree(new);
 414out_no_clnt:
 415        dprintk("RPC:       %s: returned error %d\n", __func__, err);
 416        return ERR_PTR(err);
 417}
 418EXPORT_SYMBOL_GPL(rpc_clone_client);
 419
 420/*
 421 * Kill all tasks for the given client.
 422 * XXX: kill their descendants as well?
 423 */
 424void rpc_killall_tasks(struct rpc_clnt *clnt)
 425{
 426        struct rpc_task *rovr;
 427
 428
 429        if (list_empty(&clnt->cl_tasks))
 430                return;
 431        dprintk("RPC:       killing all tasks for client %p\n", clnt);
 432        /*
 433         * Spin lock the client's task list to prevent changes...
 434         */
 435        spin_lock(&clnt->cl_lock);
 436        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
 437                if (!RPC_IS_ACTIVATED(rovr))
 438                        continue;
 439                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 440                        rovr->tk_flags |= RPC_TASK_KILLED;
 441                        rpc_exit(rovr, -EIO);
 442                        if (RPC_IS_QUEUED(rovr))
 443                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
 444                                                        rovr);
 445                }
 446        }
 447        spin_unlock(&clnt->cl_lock);
 448}
 449EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 450
 451/*
 452 * Properly shut down an RPC client, terminating all outstanding
 453 * requests.
 454 */
 455void rpc_shutdown_client(struct rpc_clnt *clnt)
 456{
 457        dprintk("RPC:       shutting down %s client for %s\n",
 458                        clnt->cl_protname, clnt->cl_server);
 459
 460        while (!list_empty(&clnt->cl_tasks)) {
 461                rpc_killall_tasks(clnt);
 462                wait_event_timeout(destroy_wait,
 463                        list_empty(&clnt->cl_tasks), 1*HZ);
 464        }
 465
 466        rpc_release_client(clnt);
 467}
 468EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 469
 470/*
 471 * Free an RPC client
 472 */
 473static void
 474rpc_free_client(struct rpc_clnt *clnt)
 475{
 476        dprintk("RPC:       destroying %s client for %s\n",
 477                        clnt->cl_protname, clnt->cl_server);
 478        if (!IS_ERR(clnt->cl_path.dentry)) {
 479                rpc_remove_client_dir(clnt->cl_path.dentry);
 480                rpc_put_mount();
 481        }
 482        if (clnt->cl_parent != clnt) {
 483                rpc_release_client(clnt->cl_parent);
 484                goto out_free;
 485        }
 486        if (clnt->cl_server != clnt->cl_inline_name)
 487                kfree(clnt->cl_server);
 488out_free:
 489        rpc_unregister_client(clnt);
 490        rpc_free_iostats(clnt->cl_metrics);
 491        kfree(clnt->cl_principal);
 492        clnt->cl_metrics = NULL;
 493        xprt_put(clnt->cl_xprt);
 494        rpciod_down();
 495        kfree(clnt);
 496}
 497
 498/*
 499 * Release the RPC client's auth handle, then free the client itself
 500 */
 501static void
 502rpc_free_auth(struct rpc_clnt *clnt)
 503{
 504        if (clnt->cl_auth == NULL) {
 505                rpc_free_client(clnt);
 506                return;
 507        }
 508
 509        /*
 510         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 511         *       release remaining GSS contexts. This mechanism ensures
 512         *       that it can do so safely.
 513         */
 514        atomic_inc(&clnt->cl_count);
 515        rpcauth_release(clnt->cl_auth);
 516        clnt->cl_auth = NULL;
 517        if (atomic_dec_and_test(&clnt->cl_count))
 518                rpc_free_client(clnt);
 519}
 520
 521/*
 522 * Release reference to the RPC client
 523 */
 524void
 525rpc_release_client(struct rpc_clnt *clnt)
 526{
 527        dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 528
 529        if (list_empty(&clnt->cl_tasks))
 530                wake_up(&destroy_wait);
 531        if (atomic_dec_and_test(&clnt->cl_count))
 532                rpc_free_auth(clnt);
 533}
 534
 535/**
 536 * rpc_bind_new_program - bind a new RPC program to an existing client
 537 * @old: old rpc_client
 538 * @program: rpc program to set
 539 * @vers: rpc program version
 540 *
 541 * Clones the rpc client and sets up a new RPC program. This is mainly
 542 * of use for enabling different RPC programs to share the same transport.
 543 * The Sun NFSv2/v3 ACL protocol can do this.
 544 */
 545struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 546                                      struct rpc_program *program,
 547                                      u32 vers)
 548{
 549        struct rpc_clnt *clnt;
 550        struct rpc_version *version;
 551        int err;
 552
 553        BUG_ON(vers >= program->nrvers || !program->version[vers]);
 554        version = program->version[vers];
 555        clnt = rpc_clone_client(old);
 556        if (IS_ERR(clnt))
 557                goto out;
 558        clnt->cl_procinfo = version->procs;
 559        clnt->cl_maxproc  = version->nrprocs;
 560        clnt->cl_protname = program->name;
 561        clnt->cl_prog     = program->number;
 562        clnt->cl_vers     = version->number;
 563        clnt->cl_stats    = program->stats;
 564        err = rpc_ping(clnt);
 565        if (err != 0) {
 566                rpc_shutdown_client(clnt);
 567                clnt = ERR_PTR(err);
 568        }
 569out:
 570        return clnt;
 571}
 572EXPORT_SYMBOL_GPL(rpc_bind_new_program);
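/*
 * Illustrative sketch (added; not part of the original file): as the
 * comment above notes, the NFSv2/v3 ACL side-protocol can share an existing
 * client's transport this way.  The "nfsacl_program" table and the version
 * number are placeholders here.
 *
 *	struct rpc_clnt *acl_clnt;
 *
 *	acl_clnt = rpc_bind_new_program(nfs_clnt, &nfsacl_program, 3);
 *	if (IS_ERR(acl_clnt))
 *		return PTR_ERR(acl_clnt);
 */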
 573
 574void rpc_task_release_client(struct rpc_task *task)
 575{
 576        struct rpc_clnt *clnt = task->tk_client;
 577
 578        if (clnt != NULL) {
 579                /* Remove from client task list */
 580                spin_lock(&clnt->cl_lock);
 581                list_del(&task->tk_task);
 582                spin_unlock(&clnt->cl_lock);
 583                task->tk_client = NULL;
 584
 585                rpc_release_client(clnt);
 586        }
 587}
 588
 589static
 590void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 591{
 592        if (clnt != NULL) {
 593                rpc_task_release_client(task);
 594                task->tk_client = clnt;
 595                atomic_inc(&clnt->cl_count);
 596                if (clnt->cl_softrtry)
 597                        task->tk_flags |= RPC_TASK_SOFT;
 598                /* Add to the client's list of all tasks */
 599                spin_lock(&clnt->cl_lock);
 600                list_add_tail(&task->tk_task, &clnt->cl_tasks);
 601                spin_unlock(&clnt->cl_lock);
 602        }
 603}
 604
 605void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
 606{
 607        rpc_task_release_client(task);
 608        rpc_task_set_client(task, clnt);
 609}
 610EXPORT_SYMBOL_GPL(rpc_task_reset_client);
 611
 612
 613static void
 614rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
 615{
 616        if (msg != NULL) {
 617                task->tk_msg.rpc_proc = msg->rpc_proc;
 618                task->tk_msg.rpc_argp = msg->rpc_argp;
 619                task->tk_msg.rpc_resp = msg->rpc_resp;
 620                if (msg->rpc_cred != NULL)
 621                        task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
 622        }
 623}
 624
 625/*
 626 * Default callback for async RPC calls
 627 */
 628static void
 629rpc_default_callback(struct rpc_task *task, void *data)
 630{
 631}
 632
 633static const struct rpc_call_ops rpc_default_ops = {
 634        .rpc_call_done = rpc_default_callback,
 635};
 636
 637/**
 638 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 639 * @task_setup_data: pointer to task initialisation data
 640 */
 641struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
 642{
 643        struct rpc_task *task;
 644
 645        task = rpc_new_task(task_setup_data);
 646        if (IS_ERR(task))
 647                goto out;
 648
 649        rpc_task_set_client(task, task_setup_data->rpc_client);
 650        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
 651
 652        if (task->tk_action == NULL)
 653                rpc_call_start(task);
 654
 655        atomic_inc(&task->tk_count);
 656        rpc_execute(task);
 657out:
 658        return task;
 659}
 660EXPORT_SYMBOL_GPL(rpc_run_task);
 661
 662/**
 663 * rpc_call_sync - Perform a synchronous RPC call
 664 * @clnt: pointer to RPC client
 665 * @msg: RPC call parameters
 666 * @flags: RPC call flags
 667 */
 668int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
 669{
 670        struct rpc_task *task;
 671        struct rpc_task_setup task_setup_data = {
 672                .rpc_client = clnt,
 673                .rpc_message = msg,
 674                .callback_ops = &rpc_default_ops,
 675                .flags = flags,
 676        };
 677        int status;
 678
 679        BUG_ON(flags & RPC_TASK_ASYNC);
 680
 681        task = rpc_run_task(&task_setup_data);
 682        if (IS_ERR(task))
 683                return PTR_ERR(task);
 684        status = task->tk_status;
 685        rpc_put_task(task);
 686        return status;
 687}
 688EXPORT_SYMBOL_GPL(rpc_call_sync);
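/*
 * Illustrative sketch (added; not part of the original file): a synchronous
 * call pairs a procedure entry from the client's cl_procinfo table with
 * caller-supplied argument and result structures.  EXAMPLE_PROC, "args" and
 * "res" are placeholders.
 *
 *	struct rpc_message msg = {
 *		.rpc_proc = &clnt->cl_procinfo[EXAMPLE_PROC],
 *		.rpc_argp = &args,
 *		.rpc_resp = &res,
 *	};
 *	int status = rpc_call_sync(clnt, &msg, 0);
 *
 *	if (status < 0)
 *		return status;
 */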
 689
 690/**
 691 * rpc_call_async - Perform an asynchronous RPC call
 692 * @clnt: pointer to RPC client
 693 * @msg: RPC call parameters
 694 * @flags: RPC call flags
 695 * @tk_ops: RPC call ops
 696 * @data: user call data
 697 */
 698int
 699rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 700               const struct rpc_call_ops *tk_ops, void *data)
 701{
 702        struct rpc_task *task;
 703        struct rpc_task_setup task_setup_data = {
 704                .rpc_client = clnt,
 705                .rpc_message = msg,
 706                .callback_ops = tk_ops,
 707                .callback_data = data,
 708                .flags = flags|RPC_TASK_ASYNC,
 709        };
 710
 711        task = rpc_run_task(&task_setup_data);
 712        if (IS_ERR(task))
 713                return PTR_ERR(task);
 714        rpc_put_task(task);
 715        return 0;
 716}
 717EXPORT_SYMBOL_GPL(rpc_call_async);
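/*
 * Illustrative sketch (added; not part of the original file): an
 * asynchronous call supplies an rpc_call_ops table whose rpc_call_done
 * callback runs when the task completes.  "example_call_done",
 * "example_ops" and "calldata" are placeholders.
 *
 *	static void example_call_done(struct rpc_task *task, void *calldata)
 *	{
 *		if (task->tk_status < 0)
 *			pr_debug("call failed: %d\n", task->tk_status);
 *	}
 *
 *	static const struct rpc_call_ops example_ops = {
 *		.rpc_call_done = example_call_done,
 *	};
 *
 *	err = rpc_call_async(clnt, &msg, 0, &example_ops, calldata);
 */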
 718
 719#if defined(CONFIG_NFS_V4_1)
 720/**
 721 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 722 * rpc_execute against it
 723 * @req: RPC request
 724 * @tk_ops: RPC call ops
 725 */
 726struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
 727                                const struct rpc_call_ops *tk_ops)
 728{
 729        struct rpc_task *task;
 730        struct xdr_buf *xbufp = &req->rq_snd_buf;
 731        struct rpc_task_setup task_setup_data = {
 732                .callback_ops = tk_ops,
 733        };
 734
 735        dprintk("RPC: rpc_run_bc_task req= %p\n", req);
 736        /*
 737         * Create an rpc_task to send the data
 738         */
 739        task = rpc_new_task(&task_setup_data);
 740        if (IS_ERR(task)) {
 741                xprt_free_bc_request(req);
 742                goto out;
 743        }
 744        task->tk_rqstp = req;
 745
 746        /*
 747         * Set up the xdr_buf length.
 748         * This also indicates that the buffer is XDR encoded already.
 749         */
 750        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
 751                        xbufp->tail[0].iov_len;
 752
 753        task->tk_action = call_bc_transmit;
 754        atomic_inc(&task->tk_count);
 755        BUG_ON(atomic_read(&task->tk_count) != 2);
 756        rpc_execute(task);
 757
 758out:
 759        dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
 760        return task;
 761}
 762#endif /* CONFIG_NFS_V4_1 */
 763
 764void
 765rpc_call_start(struct rpc_task *task)
 766{
 767        task->tk_action = call_start;
 768}
 769EXPORT_SYMBOL_GPL(rpc_call_start);
 770
 771/**
 772 * rpc_peeraddr - extract remote peer address from clnt's xprt
 773 * @clnt: RPC client structure
 774 * @buf: target buffer
 775 * @bufsize: length of target buffer
 776 *
 777 * Returns the number of bytes that are actually in the stored address.
 778 */
 779size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
 780{
 781        size_t bytes;
 782        struct rpc_xprt *xprt = clnt->cl_xprt;
 783
 784        bytes = sizeof(xprt->addr);
 785        if (bytes > bufsize)
 786                bytes = bufsize;
 787        memcpy(buf, &clnt->cl_xprt->addr, bytes);
 788        return xprt->addrlen;
 789}
 790EXPORT_SYMBOL_GPL(rpc_peeraddr);
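/*
 * Illustrative sketch (added; not part of the original file): callers
 * typically copy the peer address into a struct sockaddr_storage, which is
 * large enough for any supported address family.
 *
 *	struct sockaddr_storage peer;
 *	size_t len;
 *
 *	len = rpc_peeraddr(clnt, (struct sockaddr *)&peer, sizeof(peer));
 */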
 791
 792/**
 793 * rpc_peeraddr2str - return remote peer address in printable format
 794 * @clnt: RPC client structure
 795 * @format: address format
 796 *
 797 */
 798const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
 799                             enum rpc_display_format_t format)
 800{
 801        struct rpc_xprt *xprt = clnt->cl_xprt;
 802
 803        if (xprt->address_strings[format] != NULL)
 804                return xprt->address_strings[format];
 805        else
 806                return "unprintable";
 807}
 808EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
 809
 810void
 811rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
 812{
 813        struct rpc_xprt *xprt = clnt->cl_xprt;
 814        if (xprt->ops->set_buffer_size)
 815                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
 816}
 817EXPORT_SYMBOL_GPL(rpc_setbufsize);
 818
 819/*
 820 * Return size of largest payload RPC client can support, in bytes
 821 *
 822 * For stream transports, this is one RPC record fragment (see RFC
 823 * 1831), as we don't support multi-record requests yet.  For datagram
 824 * transports, this is the size of an IP packet minus the IP, UDP, and
 825 * RPC header sizes.
 826 */
 827size_t rpc_max_payload(struct rpc_clnt *clnt)
 828{
 829        return clnt->cl_xprt->max_payload;
 830}
 831EXPORT_SYMBOL_GPL(rpc_max_payload);
 832
 833/**
 834 * rpc_force_rebind - force transport to check that remote port is unchanged
 835 * @clnt: client to rebind
 836 *
 837 */
 838void rpc_force_rebind(struct rpc_clnt *clnt)
 839{
 840        if (clnt->cl_autobind)
 841                xprt_clear_bound(clnt->cl_xprt);
 842}
 843EXPORT_SYMBOL_GPL(rpc_force_rebind);
 844
 845/*
 846 * Restart an (async) RPC call from the call_prepare state.
 847 * Usually called from within the exit handler.
 848 */
 849int
 850rpc_restart_call_prepare(struct rpc_task *task)
 851{
 852        if (RPC_ASSASSINATED(task))
 853                return 0;
 854        task->tk_action = rpc_prepare_task;
 855        return 1;
 856}
 857EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
 858
 859/*
 860 * Restart an (async) RPC call. Usually called from within the
 861 * exit handler.
 862 */
 863int
 864rpc_restart_call(struct rpc_task *task)
 865{
 866        if (RPC_ASSASSINATED(task))
 867                return 0;
 868        task->tk_action = call_start;
 869        return 1;
 870}
 871EXPORT_SYMBOL_GPL(rpc_restart_call);
 872
 873#ifdef RPC_DEBUG
 874static const char *rpc_proc_name(const struct rpc_task *task)
 875{
 876        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
 877
 878        if (proc) {
 879                if (proc->p_name)
 880                        return proc->p_name;
 881                else
 882                        return "NULL";
 883        } else
 884                return "no proc";
 885}
 886#endif
 887
 888/*
 889 * 0.  Initial state
 890 *
 891 *     Other FSM states can be visited zero or more times, but
 892 *     this state is visited exactly once for each RPC.
 893 */
 894static void
 895call_start(struct rpc_task *task)
 896{
 897        struct rpc_clnt *clnt = task->tk_client;
 898
 899        dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
 900                        clnt->cl_protname, clnt->cl_vers,
 901                        rpc_proc_name(task),
 902                        (RPC_IS_ASYNC(task) ? "async" : "sync"));
 903
 904        /* Increment call count */
 905        task->tk_msg.rpc_proc->p_count++;
 906        clnt->cl_stats->rpccnt++;
 907        task->tk_action = call_reserve;
 908}
 909
 910/*
 911 * 1.   Reserve an RPC call slot
 912 */
 913static void
 914call_reserve(struct rpc_task *task)
 915{
 916        dprint_status(task);
 917
 918        task->tk_status  = 0;
 919        task->tk_action  = call_reserveresult;
 920        xprt_reserve(task);
 921}
 922
 923/*
 924 * 1b.  Grok the result of xprt_reserve()
 925 */
 926static void
 927call_reserveresult(struct rpc_task *task)
 928{
 929        int status = task->tk_status;
 930
 931        dprint_status(task);
 932
 933        /*
 934         * After a call to xprt_reserve(), we must have either
 935         * a request slot or else an error status.
 936         */
 937        task->tk_status = 0;
 938        if (status >= 0) {
 939                if (task->tk_rqstp) {
 940                        task->tk_action = call_refresh;
 941                        return;
 942                }
 943
 944                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
 945                                __func__, status);
 946                rpc_exit(task, -EIO);
 947                return;
 948        }
 949
 950        /*
 951         * Even though there was an error, we may have acquired
 952         * a request slot somehow.  Make sure not to leak it.
 953         */
 954        if (task->tk_rqstp) {
 955                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
 956                                __func__, status);
 957                xprt_release(task);
 958        }
 959
 960        switch (status) {
 961        case -EAGAIN:   /* woken up; retry */
 962                task->tk_action = call_reserve;
 963                return;
 964        case -EIO:      /* probably a shutdown */
 965                break;
 966        default:
 967                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
 968                                __func__, status);
 969                break;
 970        }
 971        rpc_exit(task, status);
 972}
 973
 974/*
 975 * 2.   Bind and/or refresh the credentials
 976 */
 977static void
 978call_refresh(struct rpc_task *task)
 979{
 980        dprint_status(task);
 981
 982        task->tk_action = call_refreshresult;
 983        task->tk_status = 0;
 984        task->tk_client->cl_stats->rpcauthrefresh++;
 985        rpcauth_refreshcred(task);
 986}
 987
 988/*
 989 * 2a.  Process the results of a credential refresh
 990 */
 991static void
 992call_refreshresult(struct rpc_task *task)
 993{
 994        int status = task->tk_status;
 995
 996        dprint_status(task);
 997
 998        task->tk_status = 0;
 999        task->tk_action = call_refresh;
1000        switch (status) {
1001        case 0:
1002                if (rpcauth_uptodatecred(task))
1003                        task->tk_action = call_allocate;
1004                return;
1005        case -ETIMEDOUT:
1006                rpc_delay(task, 3*HZ);
1007        case -EAGAIN:
1008                status = -EACCES;
1009                if (!task->tk_cred_retry)
1010                        break;
1011                task->tk_cred_retry--;
1012                dprintk("RPC: %5u %s: retry refresh creds\n",
1013                                task->tk_pid, __func__);
1014                return;
1015        }
1016        dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1017                                task->tk_pid, __func__, status);
1018        rpc_exit(task, status);
1019}
1020
1021/*
1022 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1023 *      (Note: buffer memory is freed in xprt_release).
1024 */
1025static void
1026call_allocate(struct rpc_task *task)
1027{
1028        unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1029        struct rpc_rqst *req = task->tk_rqstp;
1030        struct rpc_xprt *xprt = task->tk_xprt;
1031        struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1032
1033        dprint_status(task);
1034
1035        task->tk_status = 0;
1036        task->tk_action = call_bind;
1037
1038        if (req->rq_buffer)
1039                return;
1040
1041        if (proc->p_proc != 0) {
1042                BUG_ON(proc->p_arglen == 0);
1043                if (proc->p_decode != NULL)
1044                        BUG_ON(proc->p_replen == 0);
1045        }
1046
1047        /*
1048         * Calculate the size (in quads) of the RPC call
1049         * and reply headers, and convert both values
1050         * to byte sizes.
1051         */
1052        req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1053        req->rq_callsize <<= 2;
1054        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1055        req->rq_rcvsize <<= 2;
1056
1057        req->rq_buffer = xprt->ops->buf_alloc(task,
1058                                        req->rq_callsize + req->rq_rcvsize);
1059        if (req->rq_buffer != NULL)
1060                return;
1061
1062        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1063
1064        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1065                task->tk_action = call_allocate;
1066                rpc_delay(task, HZ>>4);
1067                return;
1068        }
1069
1070        rpc_exit(task, -ERESTARTSYS);
1071}
1072
1073static inline int
1074rpc_task_need_encode(struct rpc_task *task)
1075{
1076        return task->tk_rqstp->rq_snd_buf.len == 0;
1077}
1078
1079static inline void
1080rpc_task_force_reencode(struct rpc_task *task)
1081{
1082        task->tk_rqstp->rq_snd_buf.len = 0;
1083        task->tk_rqstp->rq_bytes_sent = 0;
1084}
1085
1086static inline void
1087rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1088{
1089        buf->head[0].iov_base = start;
1090        buf->head[0].iov_len = len;
1091        buf->tail[0].iov_len = 0;
1092        buf->page_len = 0;
1093        buf->flags = 0;
1094        buf->len = 0;
1095        buf->buflen = len;
1096}
1097
1098/*
1099 * 3.   Encode arguments of an RPC call
1100 */
1101static void
1102rpc_xdr_encode(struct rpc_task *task)
1103{
1104        struct rpc_rqst *req = task->tk_rqstp;
1105        kxdreproc_t     encode;
1106        __be32          *p;
1107
1108        dprint_status(task);
1109
1110        rpc_xdr_buf_init(&req->rq_snd_buf,
1111                         req->rq_buffer,
1112                         req->rq_callsize);
1113        rpc_xdr_buf_init(&req->rq_rcv_buf,
1114                         (char *)req->rq_buffer + req->rq_callsize,
1115                         req->rq_rcvsize);
1116
1117        p = rpc_encode_header(task);
1118        if (p == NULL) {
1119                printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1120                rpc_exit(task, -EIO);
1121                return;
1122        }
1123
1124        encode = task->tk_msg.rpc_proc->p_encode;
1125        if (encode == NULL)
1126                return;
1127
1128        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1129                        task->tk_msg.rpc_argp);
1130}
1131
1132/*
1133 * 4.   Get the server port number if not yet set
1134 */
1135static void
1136call_bind(struct rpc_task *task)
1137{
1138        struct rpc_xprt *xprt = task->tk_xprt;
1139
1140        dprint_status(task);
1141
1142        task->tk_action = call_connect;
1143        if (!xprt_bound(xprt)) {
1144                task->tk_action = call_bind_status;
1145                task->tk_timeout = xprt->bind_timeout;
1146                xprt->ops->rpcbind(task);
1147        }
1148}
1149
1150/*
1151 * 4a.  Sort out bind result
1152 */
1153static void
1154call_bind_status(struct rpc_task *task)
1155{
1156        int status = -EIO;
1157
1158        if (task->tk_status >= 0) {
1159                dprint_status(task);
1160                task->tk_status = 0;
1161                task->tk_action = call_connect;
1162                return;
1163        }
1164
1165        switch (task->tk_status) {
1166        case -ENOMEM:
1167                dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1168                rpc_delay(task, HZ >> 2);
1169                goto retry_timeout;
1170        case -EACCES:
1171                dprintk("RPC: %5u remote rpcbind: RPC program/version "
1172                                "unavailable\n", task->tk_pid);
1173                /* fail immediately if this is an RPC ping */
1174                if (task->tk_msg.rpc_proc->p_proc == 0) {
1175                        status = -EOPNOTSUPP;
1176                        break;
1177                }
1178                if (task->tk_rebind_retry == 0)
1179                        break;
1180                task->tk_rebind_retry--;
1181                rpc_delay(task, 3*HZ);
1182                goto retry_timeout;
1183        case -ETIMEDOUT:
1184                dprintk("RPC: %5u rpcbind request timed out\n",
1185                                task->tk_pid);
1186                goto retry_timeout;
1187        case -EPFNOSUPPORT:
1188                /* server doesn't support any rpcbind version we know of */
1189                dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1190                                task->tk_pid);
1191                break;
1192        case -EPROTONOSUPPORT:
1193                dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1194                                task->tk_pid);
1195                task->tk_status = 0;
1196                task->tk_action = call_bind;
1197                return;
1198        case -ECONNREFUSED:             /* connection problems */
1199        case -ECONNRESET:
1200        case -ENOTCONN:
1201        case -EHOSTDOWN:
1202        case -EHOSTUNREACH:
1203        case -ENETUNREACH:
1204        case -EPIPE:
1205                dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1206                                task->tk_pid, task->tk_status);
1207                if (!RPC_IS_SOFTCONN(task)) {
1208                        rpc_delay(task, 5*HZ);
1209                        goto retry_timeout;
1210                }
1211                status = task->tk_status;
1212                break;
1213        default:
1214                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1215                                task->tk_pid, -task->tk_status);
1216        }
1217
1218        rpc_exit(task, status);
1219        return;
1220
1221retry_timeout:
1222        task->tk_action = call_timeout;
1223}
1224
1225/*
1226 * 4b.  Connect to the RPC server
1227 */
1228static void
1229call_connect(struct rpc_task *task)
1230{
1231        struct rpc_xprt *xprt = task->tk_xprt;
1232
1233        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1234                        task->tk_pid, xprt,
1235                        (xprt_connected(xprt) ? "is" : "is not"));
1236
1237        task->tk_action = call_transmit;
1238        if (!xprt_connected(xprt)) {
1239                task->tk_action = call_connect_status;
1240                if (task->tk_status < 0)
1241                        return;
1242                xprt_connect(task);
1243        }
1244}
1245
1246/*
1247 * 4c.  Sort out connect result
1248 */
1249static void
1250call_connect_status(struct rpc_task *task)
1251{
1252        struct rpc_clnt *clnt = task->tk_client;
1253        int status = task->tk_status;
1254
1255        dprint_status(task);
1256
1257        task->tk_status = 0;
1258        if (status >= 0 || status == -EAGAIN) {
1259                clnt->cl_stats->netreconn++;
1260                task->tk_action = call_transmit;
1261                return;
1262        }
1263
1264        switch (status) {
1265                /* if soft mounted, test if we've timed out */
1266        case -ETIMEDOUT:
1267                task->tk_action = call_timeout;
1268                break;
1269        default:
1270                rpc_exit(task, -EIO);
1271        }
1272}
1273
1274/*
1275 * 5.   Transmit the RPC request, and wait for reply
1276 */
1277static void
1278call_transmit(struct rpc_task *task)
1279{
1280        dprint_status(task);
1281
1282        task->tk_action = call_status;
1283        if (task->tk_status < 0)
1284                return;
1285        task->tk_status = xprt_prepare_transmit(task);
1286        if (task->tk_status != 0)
1287                return;
1288        task->tk_action = call_transmit_status;
1289        /* Encode here so that rpcsec_gss can use correct sequence number. */
1290        if (rpc_task_need_encode(task)) {
1291                BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
1292                rpc_xdr_encode(task);
1293                /* Did the encode result in an error condition? */
1294                if (task->tk_status != 0) {
1295                        /* Was the error nonfatal? */
1296                        if (task->tk_status == -EAGAIN)
1297                                rpc_delay(task, HZ >> 4);
1298                        else
1299                                rpc_exit(task, task->tk_status);
1300                        return;
1301                }
1302        }
1303        xprt_transmit(task);
1304        if (task->tk_status < 0)
1305                return;
1306        /*
1307         * On success, ensure that we call xprt_end_transmit() before sleeping
1308         * in order to allow access to the socket to other RPC requests.
1309         */
1310        call_transmit_status(task);
1311        if (rpc_reply_expected(task))
1312                return;
1313        task->tk_action = rpc_exit_task;
1314        rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
1315}
1316
1317/*
1318 * 5a.  Handle cleanup after a transmission
1319 */
1320static void
1321call_transmit_status(struct rpc_task *task)
1322{
1323        task->tk_action = call_status;
1324
1325        /*
1326         * Common case: success.  Force the compiler to put this
1327         * test first.
1328         */
1329        if (task->tk_status == 0) {
1330                xprt_end_transmit(task);
1331                rpc_task_force_reencode(task);
1332                return;
1333        }
1334
1335        switch (task->tk_status) {
1336        case -EAGAIN:
1337                break;
1338        default:
1339                dprint_status(task);
1340                xprt_end_transmit(task);
1341                rpc_task_force_reencode(task);
1342                break;
1343                /*
1344                 * Special cases: if we've been waiting on the
1345                 * socket's write_space() callback, or if the
1346                 * socket just returned a connection error,
1347                 * then hold onto the transport lock.
1348                 */
1349        case -ECONNREFUSED:
1350        case -EHOSTDOWN:
1351        case -EHOSTUNREACH:
1352        case -ENETUNREACH:
1353                if (RPC_IS_SOFTCONN(task)) {
1354                        xprt_end_transmit(task);
1355                        rpc_exit(task, task->tk_status);
1356                        break;
1357                }
1358        case -ECONNRESET:
1359        case -ENOTCONN:
1360        case -EPIPE:
1361                rpc_task_force_reencode(task);
1362        }
1363}
1364
1365#if defined(CONFIG_NFS_V4_1)
1366/*
1367 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
1368 * addition, disconnect on connectivity errors.
1369 */
1370static void
1371call_bc_transmit(struct rpc_task *task)
1372{
1373        struct rpc_rqst *req = task->tk_rqstp;
1374
1375        BUG_ON(task->tk_status != 0);
1376        task->tk_status = xprt_prepare_transmit(task);
1377        if (task->tk_status == -EAGAIN) {
1378                /*
1379                 * Could not reserve the transport. Try again after the
1380                 * transport is released.
1381                 */
1382                task->tk_status = 0;
1383                task->tk_action = call_bc_transmit;
1384                return;
1385        }
1386
1387        task->tk_action = rpc_exit_task;
1388        if (task->tk_status < 0) {
1389                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1390                        "error: %d\n", task->tk_status);
1391                return;
1392        }
1393
1394        xprt_transmit(task);
1395        xprt_end_transmit(task);
1396        dprint_status(task);
1397        switch (task->tk_status) {
1398        case 0:
1399                /* Success */
1400                break;
1401        case -EHOSTDOWN:
1402        case -EHOSTUNREACH:
1403        case -ENETUNREACH:
1404        case -ETIMEDOUT:
1405                /*
1406                 * Problem reaching the server.  Disconnect and let the
1407                 * forechannel reestablish the connection.  The server will
1408                 * have to retransmit the backchannel request and we'll
1409                 * reprocess it.  Since these ops are idempotent, there's no
1410                 * need to cache our reply at this time.
1411                 */
1412                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1413                        "error: %d\n", task->tk_status);
1414                xprt_conditional_disconnect(task->tk_xprt,
1415                        req->rq_connect_cookie);
1416                break;
1417        default:
1418                /*
1419                 * We were unable to reply and will have to drop the
1420                 * request.  The server should reconnect and retransmit.
1421                 */
1422                BUG_ON(task->tk_status == -EAGAIN);
1423                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1424                        "error: %d\n", task->tk_status);
1425                break;
1426        }
1427        rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1428}
1429#endif /* CONFIG_NFS_V4_1 */
1430
1431/*
1432 * 6.   Sort out the RPC call status
1433 */
1434static void
1435call_status(struct rpc_task *task)
1436{
1437        struct rpc_clnt *clnt = task->tk_client;
1438        struct rpc_rqst *req = task->tk_rqstp;
1439        int             status;
1440
1441        if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
1442                task->tk_status = req->rq_reply_bytes_recvd;
1443
1444        dprint_status(task);
1445
1446        status = task->tk_status;
1447        if (status >= 0) {
1448                task->tk_action = call_decode;
1449                return;
1450        }
1451
1452        task->tk_status = 0;
1453        switch(status) {
1454        case -EHOSTDOWN:
1455        case -EHOSTUNREACH:
1456        case -ENETUNREACH:
1457                /*
1458                 * Delay any retries for 3 seconds, then handle as if it
1459                 * were a timeout.
1460                 */
1461                rpc_delay(task, 3*HZ);
1462        case -ETIMEDOUT:
1463                task->tk_action = call_timeout;
1464                if (task->tk_client->cl_discrtry)
1465                        xprt_conditional_disconnect(task->tk_xprt,
1466                                        req->rq_connect_cookie);
1467                break;
1468        case -ECONNRESET:
1469        case -ECONNREFUSED:
1470                rpc_force_rebind(clnt);
1471                rpc_delay(task, 3*HZ);
1472        case -EPIPE:
1473        case -ENOTCONN:
1474                task->tk_action = call_bind;
1475                break;
1476        case -EAGAIN:
1477                task->tk_action = call_transmit;
1478                break;
1479        case -EIO:
1480                /* shutdown or soft timeout */
1481                rpc_exit(task, status);
1482                break;
1483        default:
1484                if (clnt->cl_chatty)
1485                        printk("%s: RPC call returned error %d\n",
1486                               clnt->cl_protname, -status);
1487                rpc_exit(task, status);
1488        }
1489}
1490
1491/*
1492 * 6a.  Handle RPC timeout
1493 *      We do not release the request slot, so we keep using the
1494 *      same XID for all retransmits.
1495 */
1496static void
1497call_timeout(struct rpc_task *task)
1498{
1499        struct rpc_clnt *clnt = task->tk_client;
1500
1501        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1502                dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1503                goto retry;
1504        }
1505
1506        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1507        task->tk_timeouts++;
1508
1509        if (RPC_IS_SOFTCONN(task)) {
1510                rpc_exit(task, -ETIMEDOUT);
1511                return;
1512        }
1513        if (RPC_IS_SOFT(task)) {
1514                if (clnt->cl_chatty)
1515                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1516                                clnt->cl_protname, clnt->cl_server);
1517                if (task->tk_flags & RPC_TASK_TIMEOUT)
1518                        rpc_exit(task, -ETIMEDOUT);
1519                else
1520                        rpc_exit(task, -EIO);
1521                return;
1522        }
1523
1524        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1525                task->tk_flags |= RPC_CALL_MAJORSEEN;
1526                if (clnt->cl_chatty)
1527                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1528                        clnt->cl_protname, clnt->cl_server);
1529        }
1530        rpc_force_rebind(clnt);
1531        /*
1532         * Did our request time out due to an RPCSEC_GSS out-of-sequence
1533         * event? RFC2203 requires the server to drop all such requests.
1534         */
1535        rpcauth_invalcred(task);
1536
1537retry:
1538        clnt->cl_stats->rpcretrans++;
1539        task->tk_action = call_bind;
1540        task->tk_status = 0;
1541}
1542
1543/*
1544 * 7.   Decode the RPC reply
1545 */
1546static void
1547call_decode(struct rpc_task *task)
1548{
1549        struct rpc_clnt *clnt = task->tk_client;
1550        struct rpc_rqst *req = task->tk_rqstp;
1551        kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
1552        __be32          *p;
1553
1554        dprintk("RPC: %5u call_decode (status %d)\n",
1555                        task->tk_pid, task->tk_status);
1556
1557        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1558                if (clnt->cl_chatty)
1559                        printk(KERN_NOTICE "%s: server %s OK\n",
1560                                clnt->cl_protname, clnt->cl_server);
1561                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1562        }
1563
1564        /*
1565         * Ensure that we see all writes made by xprt_complete_rqst()
1566         * before it changed req->rq_reply_bytes_recvd.
1567         */
1568        smp_rmb();
1569        req->rq_rcv_buf.len = req->rq_private_buf.len;
1570
1571        /* Check that the softirq receive buffer is valid */
1572        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1573                                sizeof(req->rq_rcv_buf)) != 0);
1574
1575        if (req->rq_rcv_buf.len < 12) {
1576                if (!RPC_IS_SOFT(task)) {
1577                        task->tk_action = call_bind;
1578                        clnt->cl_stats->rpcretrans++;
1579                        goto out_retry;
1580                }
1581                dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1582                                clnt->cl_protname, task->tk_status);
1583                task->tk_action = call_timeout;
1584                goto out_retry;
1585        }
1586
1587        p = rpc_verify_header(task);
1588        if (IS_ERR(p)) {
1589                if (p == ERR_PTR(-EAGAIN))
1590                        goto out_retry;
1591                return;
1592        }
1593
1594        task->tk_action = rpc_exit_task;
1595
1596        if (decode) {
1597                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1598                                                      task->tk_msg.rpc_resp);
1599        }
1600        dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1601                        task->tk_status);
1602        return;
1603out_retry:
1604        task->tk_status = 0;
1605        /* Note: rpc_verify_header() may have freed the RPC slot */
1606        if (task->tk_rqstp == req) {
1607                req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
1608                if (task->tk_client->cl_discrtry)
1609                        xprt_conditional_disconnect(task->tk_xprt,
1610                                        req->rq_connect_cookie);
1611        }
1612}
1613
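/*
 * (Note added for readability, derived from the code below.)  The fixed
 * part of an RPC call header is six XDR words -- xid, message type (CALL),
 * RPC protocol version, program number, program version and procedure --
 * followed by the credential and verifier marshalled by rpcauth_marshcred().
 */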
1614static __be32 *
1615rpc_encode_header(struct rpc_task *task)
1616{
1617        struct rpc_clnt *clnt = task->tk_client;
1618        struct rpc_rqst *req = task->tk_rqstp;
1619        __be32          *p = req->rq_svec[0].iov_base;
1620
1621        /* FIXME: check buffer size? */
1622
1623        p = xprt_skip_transport_header(task->tk_xprt, p);
1624        *p++ = req->rq_xid;             /* XID */
1625        *p++ = htonl(RPC_CALL);         /* CALL */
1626        *p++ = htonl(RPC_VERSION);      /* RPC version */
1627        *p++ = htonl(clnt->cl_prog);    /* program number */
1628        *p++ = htonl(clnt->cl_vers);    /* program version */
1629        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1630        p = rpcauth_marshcred(task, p);
1631        req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1632        return p;
1633}
1634
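    /*
     * Check the RPC-level reply header (see RFC 1831).  A reply starts
     * with the XID and the message direction (REPLY), followed by the
     * reply status.  A denied reply carries either an RPC version
     * mismatch or an authentication error; an accepted reply carries
     * the server's verifier followed by the accept status
     * (RPC_SUCCESS, RPC_PROG_UNAVAIL, RPC_PROG_MISMATCH,
     * RPC_PROC_UNAVAIL, RPC_GARBAGE_ARGS, ...).  On success a pointer
     * just past the accept status is returned so that the caller can
     * decode the procedure results; recoverable problems return
     * ERR_PTR(-EAGAIN) and the task is retried.
     */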
1635static __be32 *
1636rpc_verify_header(struct rpc_task *task)
1637{
1638        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1639        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1640        __be32  *p = iov->iov_base;
1641        u32 n;
1642        int error = -EACCES;
1643
1644        if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
1645                /* RFC-1014 says that the representation of XDR data must be a
1646                 * multiple of four bytes
1647                 * - if it isn't, pointer subtraction in the NFS client may give
1648                 *   undefined results
1649                 */
1650                dprintk("RPC: %5u %s: XDR representation not a multiple of"
1651                       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
1652                       task->tk_rqstp->rq_rcv_buf.len);
1653                goto out_eio;
1654        }
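            /*
             * Reserve the three words we are about to consume: the XID,
             * the message direction and the reply status.
             */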
1655        if ((len -= 3) < 0)
1656                goto out_overflow;
1657
1658        p += 1; /* skip XID */
1659        if ((n = ntohl(*p++)) != RPC_REPLY) {
1660                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
1661                        task->tk_pid, __func__, n);
1662                goto out_garbage;
1663        }
1664
1665        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1666                if (--len < 0)
1667                        goto out_overflow;
1668                switch ((n = ntohl(*p++))) {
1669                case RPC_AUTH_ERROR:
1670                        break;
1671                case RPC_MISMATCH:
1672                        dprintk("RPC: %5u %s: RPC call version "
1673                                        "mismatch!\n",
1674                                        task->tk_pid, __func__);
1675                        error = -EPROTONOSUPPORT;
1676                        goto out_err;
1677                default:
1678                        dprintk("RPC: %5u %s: RPC call rejected, "
1679                                        "unknown error: %x\n",
1680                                        task->tk_pid, __func__, n);
1681                        goto out_eio;
1682                }
1683                if (--len < 0)
1684                        goto out_overflow;
1685                switch ((n = ntohl(*p++))) {
1686                case RPC_AUTH_REJECTEDCRED:
1687                case RPC_AUTH_REJECTEDVERF:
1688                case RPCSEC_GSS_CREDPROBLEM:
1689                case RPCSEC_GSS_CTXPROBLEM:
1690                        if (!task->tk_cred_retry)
1691                                break;
1692                        task->tk_cred_retry--;
1693                        dprintk("RPC: %5u %s: retry stale creds\n",
1694                                        task->tk_pid, __func__);
1695                        rpcauth_invalcred(task);
1696                        /* Ensure we obtain a new XID! */
1697                        xprt_release(task);
1698                        task->tk_action = call_reserve;
1699                        goto out_retry;
1700                case RPC_AUTH_BADCRED:
1701                case RPC_AUTH_BADVERF:
1702                        /* possibly garbled cred/verf? */
1703                        if (!task->tk_garb_retry)
1704                                break;
1705                        task->tk_garb_retry--;
1706                        dprintk("RPC: %5u %s: retry garbled creds\n",
1707                                        task->tk_pid, __func__);
1708                        task->tk_action = call_bind;
1709                        goto out_retry;
1710                case RPC_AUTH_TOOWEAK:
1711                        printk(KERN_NOTICE "RPC: server %s requires stronger "
1712                               "authentication.\n", task->tk_client->cl_server);
1713                        break;
1714                default:
1715                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
1716                                        task->tk_pid, __func__, n);
1717                        error = -EIO;
1718                }
1719                dprintk("RPC: %5u %s: call rejected %d\n",
1720                                task->tk_pid, __func__, n);
1721                goto out_err;
1722        }
1723        if (!(p = rpcauth_checkverf(task, p))) {
1724                dprintk("RPC: %5u %s: auth check failed\n",
1725                                task->tk_pid, __func__);
1726                goto out_garbage;               /* bad verifier, retry */
1727        }
1728        len = p - (__be32 *)iov->iov_base - 1;
1729        if (len < 0)
1730                goto out_overflow;
1731        switch ((n = ntohl(*p++))) {
1732        case RPC_SUCCESS:
1733                return p;
1734        case RPC_PROG_UNAVAIL:
1735                dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
1736                                task->tk_pid, __func__,
1737                                (unsigned int)task->tk_client->cl_prog,
1738                                task->tk_client->cl_server);
1739                error = -EPFNOSUPPORT;
1740                goto out_err;
1741        case RPC_PROG_MISMATCH:
1742                dprintk("RPC: %5u %s: program %u, version %u unsupported by "
1743                                "server %s\n", task->tk_pid, __func__,
1744                                (unsigned int)task->tk_client->cl_prog,
1745                                (unsigned int)task->tk_client->cl_vers,
1746                                task->tk_client->cl_server);
1747                error = -EPROTONOSUPPORT;
1748                goto out_err;
1749        case RPC_PROC_UNAVAIL:
1750                dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
1751                                "version %u on server %s\n",
1752                                task->tk_pid, __func__,
1753                                rpc_proc_name(task),
1754                                task->tk_client->cl_prog,
1755                                task->tk_client->cl_vers,
1756                                task->tk_client->cl_server);
1757                error = -EOPNOTSUPP;
1758                goto out_err;
1759        case RPC_GARBAGE_ARGS:
1760                dprintk("RPC: %5u %s: server saw garbage\n",
1761                                task->tk_pid, __func__);
1762                break;                  /* retry */
1763        default:
1764                dprintk("RPC: %5u %s: server accept status: %x\n",
1765                                task->tk_pid, __func__, n);
1766                /* Also retry */
1767        }
1768
1769out_garbage:
1770        task->tk_client->cl_stats->rpcgarbage++;
1771        if (task->tk_garb_retry) {
1772                task->tk_garb_retry--;
1773                dprintk("RPC: %5u %s: retrying\n",
1774                                task->tk_pid, __func__);
1775                task->tk_action = call_bind;
1776out_retry:
1777                return ERR_PTR(-EAGAIN);
1778        }
1779out_eio:
1780        error = -EIO;
1781out_err:
1782        rpc_exit(task, error);
1783        dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
1784                        __func__, error);
1785        return ERR_PTR(error);
1786out_overflow:
1787        dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
1788                        __func__);
1789        goto out_garbage;
1790}
1791
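    /*
     * Procedure 0 (NULL) of every RPC program takes no arguments and
     * returns no results, so encoding and decoding are no-ops.  It is
     * used below as a lightweight way of poking a server.
     */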
1792static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
1793{
1794}
1795
1796static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
1797{
1798        return 0;
1799}
1800
1801static struct rpc_procinfo rpcproc_null = {
1802        .p_encode = rpcproc_encode_null,
1803        .p_decode = rpcproc_decode_null,
1804};
1805
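    /*
     * Synchronously ping the server by sending a NULL request with
     * AUTH_NULL credentials.  RPC_TASK_SOFT and RPC_TASK_SOFTCONN make
     * the call fail with an error instead of retrying forever if the
     * server does not answer or cannot be reached.
     */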
1806static int rpc_ping(struct rpc_clnt *clnt)
1807{
1808        struct rpc_message msg = {
1809                .rpc_proc = &rpcproc_null,
1810        };
1811        int err;
1812        msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1813        err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
1814        put_rpccred(msg.rpc_cred);
1815        return err;
1816}
1817
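    /*
     * Fire off a NULL request using the caller's credential and task
     * flags (e.g. RPC_TASK_ASYNC).  The caller owns the returned task
     * reference and must drop it with rpc_put_task().
     */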
1818struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
1819{
1820        struct rpc_message msg = {
1821                .rpc_proc = &rpcproc_null,
1822                .rpc_cred = cred,
1823        };
1824        struct rpc_task_setup task_setup_data = {
1825                .rpc_client = clnt,
1826                .rpc_message = &msg,
1827                .callback_ops = &rpc_default_ops,
1828                .flags = flags,
1829        };
1830        return rpc_run_task(&task_setup_data);
1831}
1832EXPORT_SYMBOL_GPL(rpc_call_null);
1833
1834#ifdef RPC_DEBUG
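    /*
     * Debug aid: dump one line per RPC task of every registered client
     * to the kernel log.  The walk takes rpc_client_lock and each
     * client's cl_lock, so only tasks currently on a client's cl_tasks
     * list are shown.
     */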
1835static void rpc_show_header(void)
1836{
1837        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
1838                "-timeout ---ops--\n");
1839}
1840
1841static void rpc_show_task(const struct rpc_clnt *clnt,
1842                          const struct rpc_task *task)
1843{
1844        const char *rpc_waitq = "none";
1845
1846        if (RPC_IS_QUEUED(task))
1847                rpc_waitq = rpc_qname(task->tk_waitqueue);
1848
1849        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
1850                task->tk_pid, task->tk_flags, task->tk_status,
1851                clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
1852                clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
1853                task->tk_action, rpc_waitq);
1854}
1855
1856void rpc_show_tasks(void)
1857{
1858        struct rpc_clnt *clnt;
1859        struct rpc_task *task;
1860        int header = 0;
1861
1862        spin_lock(&rpc_client_lock);
1863        list_for_each_entry(clnt, &all_clients, cl_clients) {
1864                spin_lock(&clnt->cl_lock);
1865                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
1866                        if (!header) {
1867                                rpc_show_header();
1868                                header++;
1869                        }
1870                        rpc_show_task(clnt, task);
1871                }
1872                spin_unlock(&clnt->cl_lock);
1873        }
1874        spin_unlock(&rpc_client_lock);
1875}
1876#endif
1877