// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#include "fail.h"

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define svc_serv_is_pooled(serv)    ((serv)->sv_ops->svo_function)

#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
struct svc_pool_map svc_pool_map = {
        .mode = SVC_POOL_DEFAULT
};
EXPORT_SYMBOL_GPL(svc_pool_map);

static DEFINE_MUTEX(svc_pool_map_mutex);	/* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;
        struct svc_pool_map *m = &svc_pool_map;
        int err;

        mutex_lock(&svc_pool_map_mutex);

        err = -EBUSY;
        if (m->count)
                goto out;

        err = 0;
        if (!strncmp(val, "auto", 4))
                *ip = SVC_POOL_AUTO;
        else if (!strncmp(val, "global", 6))
                *ip = SVC_POOL_GLOBAL;
        else if (!strncmp(val, "percpu", 6))
                *ip = SVC_POOL_PERCPU;
        else if (!strncmp(val, "pernode", 7))
                *ip = SVC_POOL_PERNODE;
        else
                err = -EINVAL;

out:
        mutex_unlock(&svc_pool_map_mutex);
        return err;
}

static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;

        switch (*ip) {
        case SVC_POOL_AUTO:
                return strlcpy(buf, "auto\n", 20);
        case SVC_POOL_GLOBAL:
                return strlcpy(buf, "global\n", 20);
        case SVC_POOL_PERCPU:
                return strlcpy(buf, "percpu\n", 20);
        case SVC_POOL_PERNODE:
                return strlcpy(buf, "pernode\n", 20);
        default:
                return sprintf(buf, "%d\n", *ip);
        }
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
                 &svc_pool_map.mode, 0644);
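
/*
 * Example usage (a sketch; paths assume sunrpc is built as a module,
 * where module_param_call() above exposes the parameter read-write):
 *
 *	modprobe sunrpc pool_mode=pernode
 *	echo percpu > /sys/module/sunrpc/parameters/pool_mode
 *
 * Once any pooled service holds a map reference (svc_pool_map.count
 * is non-zero), param_set_pool_mode() returns -EBUSY, so the mode
 * cannot change underneath a running service.
 */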

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
        unsigned int node;

        if (nr_online_nodes > 1) {
                /*
                 * Actually have multiple NUMA nodes,
                 * so split pools on NUMA node boundaries
                 */
                return SVC_POOL_PERNODE;
        }

        node = first_online_node;
        if (nr_cpus_node(node) > 2) {
                /*
                 * Non-trivial SMP, or CONFIG_NUMA on
                 * non-NUMA hardware, e.g. with a generic
                 * x86_64 kernel on Xeons.  In this case we
                 * want to divide the pools on cpu boundaries.
                 */
                return SVC_POOL_PERCPU;
        }

        /* default: one global pool */
        return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->to_pool)
                goto fail;
        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->pool_to)
                goto fail_free;

        return 0;

fail_free:
        kfree(m->to_pool);
        m->to_pool = NULL;
fail:
        return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_cpu_ids;
        unsigned int pidx = 0;
        unsigned int cpu;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_online_cpu(cpu) {
                BUG_ON(pidx >= maxpools);
                m->to_pool[cpu] = pidx;
                m->pool_to[pidx] = cpu;
                pidx++;
        }
        /* cpus brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_node_ids;
        unsigned int pidx = 0;
        unsigned int node;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_node_with_cpus(node) {
                /* some architectures (e.g. SN2) have cpuless nodes */
                BUG_ON(pidx > maxpools);
                m->to_pool[node] = pidx;
                m->pool_to[pidx] = node;
                pidx++;
        }
        /* nodes brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
unsigned int
svc_pool_map_get(void)
{
        struct svc_pool_map *m = &svc_pool_map;
        int npools = -1;

        mutex_lock(&svc_pool_map_mutex);

        if (m->count++) {
                mutex_unlock(&svc_pool_map_mutex);
                return m->npools;
        }

        if (m->mode == SVC_POOL_AUTO)
                m->mode = svc_pool_map_choose_mode();

        switch (m->mode) {
        case SVC_POOL_PERCPU:
                npools = svc_pool_map_init_percpu(m);
                break;
        case SVC_POOL_PERNODE:
                npools = svc_pool_map_init_pernode(m);
                break;
        }

        if (npools < 0) {
                /* default, or memory allocation failure */
                npools = 1;
                m->mode = SVC_POOL_GLOBAL;
        }
        m->npools = npools;

        mutex_unlock(&svc_pool_map_mutex);
        return m->npools;
}
EXPORT_SYMBOL_GPL(svc_pool_map_get);

/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
void
svc_pool_map_put(void)
{
        struct svc_pool_map *m = &svc_pool_map;

        mutex_lock(&svc_pool_map_mutex);

        if (!--m->count) {
                kfree(m->to_pool);
                m->to_pool = NULL;
                kfree(m->pool_to);
                m->pool_to = NULL;
                m->npools = 0;
        }

        mutex_unlock(&svc_pool_map_mutex);
}
EXPORT_SYMBOL_GPL(svc_pool_map_put);

static int svc_pool_map_get_node(unsigned int pidx)
{
        const struct svc_pool_map *m = &svc_pool_map;

        if (m->count) {
                if (m->mode == SVC_POOL_PERCPU)
                        return cpu_to_node(m->pool_to[pidx]);
                if (m->mode == SVC_POOL_PERNODE)
                        return m->pool_to[pidx];
        }
        return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int node = m->pool_to[pidx];

        /*
         * The caller checks for sv_nrpools > 1, which
         * implies that we've been initialized.
         */
        WARN_ON_ONCE(m->count == 0);
        if (m->count == 0)
                return;

        switch (m->mode) {
        case SVC_POOL_PERCPU:
        {
                set_cpus_allowed_ptr(task, cpumask_of(node));
                break;
        }
        case SVC_POOL_PERNODE:
        {
                set_cpus_allowed_ptr(task, cpumask_of_node(node));
                break;
        }
        }
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int pidx = 0;

        /*
         * An uninitialised map happens in a pure client when
         * lockd is brought up, so silently treat it the
         * same as SVC_POOL_GLOBAL.
         */
        if (svc_serv_is_pooled(serv)) {
                switch (m->mode) {
                case SVC_POOL_PERCPU:
                        pidx = m->to_pool[cpu];
                        break;
                case SVC_POOL_PERNODE:
                        pidx = m->to_pool[cpu_to_node(cpu)];
                        break;
                }
        }
        return &serv->sv_pools[pidx % serv->sv_nrpools];
}
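
/*
 * Worked example (SVC_POOL_PERNODE, two nodes, CPUs 0-3 on node 0 and
 * CPUs 4-7 on node 1):
 *
 *	to_pool[0] = 0, to_pool[1] = 1		(node -> pool index)
 *	pool_to[0] = 0, pool_to[1] = 1		(pool index -> node)
 *
 * A request arriving on CPU 6 is steered to
 * sv_pools[to_pool[cpu_to_node(6)]], i.e. the node 1 pool.
 */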

int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
        int err;

        err = rpcb_create_local(net);
        if (err)
                return err;

        /* Remove any stale portmap registrations */
        svc_unregister(serv, net);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
        svc_unregister(serv, net);
        rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

static int svc_uses_rpcbind(struct svc_serv *serv)
{
        struct svc_program      *progp;
        unsigned int            i;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (!progp->pg_vers[i]->vs_hidden)
                                return 1;
                }
        }

        return 0;
}

int svc_bind(struct svc_serv *serv, struct net *net)
{
        if (!svc_uses_rpcbind(serv))
                return 0;
        return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
        INIT_LIST_HEAD(&serv->sv_cb_list);
        spin_lock_init(&serv->sv_cb_lock);
        init_waitqueue_head(&serv->sv_cb_waitq);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
             const struct svc_serv_ops *ops)
{
        struct svc_serv *serv;
        unsigned int vers;
        unsigned int xdrsize;
        unsigned int i;

        serv = kzalloc(sizeof(*serv), GFP_KERNEL);
        if (!serv)
                return NULL;
        serv->sv_name      = prog->pg_name;
        serv->sv_program   = prog;
        serv->sv_nrthreads = 1;
        serv->sv_stats     = prog->pg_stats;
        if (bufsize > RPCSVC_MAXPAYLOAD)
                bufsize = RPCSVC_MAXPAYLOAD;
        serv->sv_max_payload = bufsize ? bufsize : 4096;
        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
        serv->sv_ops = ops;
        xdrsize = 0;
        while (prog) {
                prog->pg_lovers = prog->pg_nvers - 1;
                for (vers = 0; vers < prog->pg_nvers; vers++)
                        if (prog->pg_vers[vers]) {
                                prog->pg_hivers = vers;
                                if (prog->pg_lovers > vers)
                                        prog->pg_lovers = vers;
                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
                        }
                prog = prog->pg_next;
        }
        serv->sv_xdrsize   = xdrsize;
        INIT_LIST_HEAD(&serv->sv_tempsocks);
        INIT_LIST_HEAD(&serv->sv_permsocks);
        timer_setup(&serv->sv_temptimer, NULL, 0);
        spin_lock_init(&serv->sv_lock);

        __svc_init_bc(serv);

        serv->sv_nrpools = npools;
        serv->sv_pools =
                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
                        GFP_KERNEL);
        if (!serv->sv_pools) {
                kfree(serv);
                return NULL;
        }

        for (i = 0; i < serv->sv_nrpools; i++) {
                struct svc_pool *pool = &serv->sv_pools[i];

                dprintk("svc: initialising pool %u for %s\n",
                                i, serv->sv_name);

                pool->sp_id = i;
                INIT_LIST_HEAD(&pool->sp_sockets);
                INIT_LIST_HEAD(&pool->sp_all_threads);
                spin_lock_init(&pool->sp_lock);
        }

        return serv;
}

struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
           const struct svc_serv_ops *ops)
{
        return __svc_create(prog, bufsize, /*npools*/1, ops);
}
EXPORT_SYMBOL_GPL(svc_create);

struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
                  const struct svc_serv_ops *ops)
{
        struct svc_serv *serv;
        unsigned int npools = svc_pool_map_get();

        serv = __svc_create(prog, bufsize, npools, ops);
        if (!serv)
                goto out_err;
        return serv;
out_err:
        svc_pool_map_put();
        return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);
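
/*
 * Caller sketch (illustrative, not a specific in-tree caller): a
 * service typically creates the pooled serv once and then sizes its
 * thread count, e.g.
 *
 *	serv = svc_create_pooled(&my_program, bufsize, &my_serv_ops);
 *	if (!serv)
 *		return -ENOMEM;
 *	error = svc_set_num_threads(serv, NULL, nrservs);
 *
 * where my_program and my_serv_ops are hypothetical placeholders.
 */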

void svc_shutdown_net(struct svc_serv *serv, struct net *net)
{
        svc_close_net(serv, net);

        if (serv->sv_ops->svo_shutdown)
                serv->sv_ops->svo_shutdown(serv, net);
}
EXPORT_SYMBOL_GPL(svc_shutdown_net);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
        dprintk("svc: svc_destroy(%s, %d)\n",
                                serv->sv_program->pg_name,
                                serv->sv_nrthreads);

        if (serv->sv_nrthreads) {
                if (--(serv->sv_nrthreads) != 0) {
                        svc_sock_update_bufs(serv);
                        return;
                }
        } else
                printk(KERN_ERR "svc_destroy: no threads for serv=%p!\n", serv);

        del_timer_sync(&serv->sv_temptimer);

        /*
         * The last user is gone and thus all sockets must have been
         * destroyed by this point.  Verify that they have.
         */
        BUG_ON(!list_empty(&serv->sv_permsocks));
        BUG_ON(!list_empty(&serv->sv_tempsocks));

        cache_clean_deferred(serv);

        if (svc_serv_is_pooled(serv))
                svc_pool_map_put();

        kfree(serv->sv_pools);
        kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_pages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
        unsigned int pages, arghi;

        /* bc_xprt uses fore channel allocated buffers */
        if (svc_is_backchannel(rqstp))
                return 1;

        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request
                                       * and reply.  We assume each is at most
                                       * one page.
                                       */
        arghi = 0;
        WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
        if (pages > RPCSVC_MAXPAGES)
                pages = RPCSVC_MAXPAGES;
        while (pages) {
                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);

                if (!p)
                        break;
                rqstp->rq_pages[arghi++] = p;
                pages--;
        }
        return pages == 0;
}
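
/*
 * Arithmetic sketch (assuming 4 KiB pages and the default 1 MiB
 * RPCSVC_MAXPAYLOAD): sv_max_mesg is 1 MiB + 4 KiB, so the loop above
 * allocates 1052672 / 4096 + 1 = 258 pages, which fits within
 * RPCSVC_MAXPAGES.
 */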

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
        unsigned int i;

        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
                if (rqstp->rq_pages[i])
                        put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
        struct svc_rqst *rqstp;

        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
        if (!rqstp)
                return rqstp;

        __set_bit(RQ_BUSY, &rqstp->rq_flags);
        spin_lock_init(&rqstp->rq_lock);
        rqstp->rq_server = serv;
        rqstp->rq_pool = pool;

        rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
        if (!rqstp->rq_scratch_page)
                goto out_enomem;

        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_argp)
                goto out_enomem;

        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_resp)
                goto out_enomem;

        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
                goto out_enomem;

        return rqstp;
out_enomem:
        svc_rqst_free(rqstp);
        return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
        struct svc_rqst *rqstp;

        rqstp = svc_rqst_alloc(serv, pool, node);
        if (!rqstp)
                return ERR_PTR(-ENOMEM);

        serv->sv_nrthreads++;
        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads++;
        list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
        spin_unlock_bh(&pool->sp_lock);
        return rqstp;
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        if (pool != NULL)
                return pool;

        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        unsigned int i;
        struct task_struct *task = NULL;

        if (pool != NULL) {
                spin_lock_bh(&pool->sp_lock);
        } else {
                /* choose a pool in round-robin fashion */
                for (i = 0; i < serv->sv_nrpools; i++) {
                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
                        spin_lock_bh(&pool->sp_lock);
                        if (!list_empty(&pool->sp_all_threads))
                                goto found_pool;
                        spin_unlock_bh(&pool->sp_lock);
                }
                return NULL;
        }

found_pool:
        if (!list_empty(&pool->sp_all_threads)) {
                struct svc_rqst *rqstp;

                /*
                 * Remove from the pool->sp_all_threads list
                 * so we don't try to kill it again.
                 */
                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
                set_bit(RQ_VICTIM, &rqstp->rq_flags);
                list_del_rcu(&rqstp->rq_all);
                task = rqstp->rq_task;
        }
        spin_unlock_bh(&pool->sp_lock);

        return task;
}

/* create new threads */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct svc_rqst *rqstp;
        struct task_struct *task;
        struct svc_pool *chosen_pool;
        unsigned int state = serv->sv_nrthreads - 1;
        int node;

        do {
                nrservs--;
                chosen_pool = choose_pool(serv, pool, &state);

                node = svc_pool_map_get_node(chosen_pool->sp_id);
                rqstp = svc_prepare_thread(serv, chosen_pool, node);
                if (IS_ERR(rqstp))
                        return PTR_ERR(rqstp);

                __module_get(serv->sv_ops->svo_module);
                task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
                                              node, "%s", serv->sv_name);
                if (IS_ERR(task)) {
                        module_put(serv->sv_ops->svo_module);
                        svc_exit_thread(rqstp);
                        return PTR_ERR(task);
                }

                rqstp->rq_task = task;
                if (serv->sv_nrpools > 1)
                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

                svc_sock_update_bufs(serv);
                wake_up_process(task);
        } while (nrservs > 0);

        return 0;
}

/* destroy old threads */
static int
svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct task_struct *task;
        unsigned int state = serv->sv_nrthreads - 1;

        /* destroy old threads */
        do {
                task = choose_victim(serv, pool, &state);
                if (task == NULL)
                        break;
                send_sig(SIGINT, task, 1);
                nrservs++;
        } while (nrservs < 0);

        return 0;
}

/*
 * Create or destroy threads as needed to bring the total up or down
 * to the given number.  If `pool' is non-NULL, applies only to threads
 * in that pool, otherwise round-robins between all pools.  Caller must
 * ensure mutual exclusion between this and server startup or shutdown.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        if (pool == NULL) {
                /* The -1 assumes caller has done a svc_get() */
                nrservs -= (serv->sv_nrthreads - 1);
        } else {
                spin_lock_bh(&pool->sp_lock);
                nrservs -= pool->sp_nrthreads;
                spin_unlock_bh(&pool->sp_lock);
        }

        if (nrservs > 0)
                return svc_start_kthreads(serv, pool, nrservs);
        if (nrservs < 0)
                return svc_signal_kthreads(serv, pool, nrservs);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);
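
/*
 * Usage sketch (illustrative): passing a specific pool adjusts only
 * that pool, while NULL spreads the change round-robin across pools:
 *
 *	error = svc_set_num_threads(serv, &serv->sv_pools[0], 4);
 *
 * asks for exactly four threads in pool 0, creating or signalling
 * threads as needed to reach that count.
 */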

/* destroy old threads */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct task_struct *task;
        unsigned int state = serv->sv_nrthreads - 1;

        /* destroy old threads */
        do {
                task = choose_victim(serv, pool, &state);
                if (task == NULL)
                        break;
                kthread_stop(task);
                nrservs++;
        } while (nrservs < 0);
        return 0;
}

int
svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        if (pool == NULL) {
                /* The -1 assumes caller has done a svc_get() */
                nrservs -= (serv->sv_nrthreads - 1);
        } else {
                spin_lock_bh(&pool->sp_lock);
                nrservs -= pool->sp_nrthreads;
                spin_unlock_bh(&pool->sp_lock);
        }

        if (nrservs > 0)
                return svc_start_kthreads(serv, pool, nrservs);
        if (nrservs < 0)
                return svc_stop_kthreads(serv, pool, nrservs);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);

/**
 * svc_rqst_replace_page - Replace one page in rq_pages[]
 * @rqstp: svc_rqst with pages to replace
 * @page: replacement page
 *
 * When replacing a page in rq_pages, batch the release of the
 * replaced pages to avoid hammering the page allocator.
 */
void svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page)
{
        if (*rqstp->rq_next_page) {
                if (!pagevec_space(&rqstp->rq_pvec))
                        __pagevec_release(&rqstp->rq_pvec);
                pagevec_add(&rqstp->rq_pvec, *rqstp->rq_next_page);
        }

        get_page(page);
        *(rqstp->rq_next_page++) = page;
}
EXPORT_SYMBOL_GPL(svc_rqst_replace_page);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
        svc_release_buffer(rqstp);
        if (rqstp->rq_scratch_page)
                put_page(rqstp->rq_scratch_page);
        kfree(rqstp->rq_resp);
        kfree(rqstp->rq_argp);
        kfree(rqstp->rq_auth_data);
        kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

void
svc_exit_thread(struct svc_rqst *rqstp)
{
        struct svc_serv *serv = rqstp->rq_server;
        struct svc_pool *pool = rqstp->rq_pool;

        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads--;
        if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
                list_del_rcu(&rqstp->rq_all);
        spin_unlock_bh(&pool->sp_lock);

        svc_rqst_free(rqstp);

        /* Release the server */
        if (serv)
                svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in sin = {
                .sin_family             = AF_INET,
                .sin_addr.s_addr        = htonl(INADDR_ANY),
                .sin_port               = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin, netid);

        /*
         * User space didn't support rpcbind v4, so retry this
         * registration request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, protocol, port);

        return error;
}
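
/*
 * For example, registering NFS over TCP on the standard port would
 * pass program 100003, version 3, IPPROTO_TCP and port 2049, which
 * becomes an rpcbind v4 SET for netid "tcp" on 0.0.0.0.
 */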

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in6 sin6 = {
                .sin6_family            = AF_INET6,
                .sin6_addr              = IN6ADDR_ANY_INIT,
                .sin6_port              = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP6;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP6;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin6, netid);

        /*
         * User space didn't support rpcbind version 4, so we won't
         * use a PF_INET6 listener.
         */
        if (error == -EPROTONOSUPPORT)
                error = -EAFNOSUPPORT;

        return error;
}
#endif  /* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
                          const u32 program, const u32 version,
                          const int family,
                          const unsigned short protocol,
                          const unsigned short port)
{
        int error = -EAFNOSUPPORT;

        switch (family) {
        case PF_INET:
                error = __svc_rpcb_register4(net, program, version,
                                                protocol, port);
                break;
#if IS_ENABLED(CONFIG_IPV6)
        case PF_INET6:
                error = __svc_rpcb_register6(net, program, version,
                                                protocol, port);
#endif
        }

        trace_svc_register(progname, version, protocol, port, family, error);
        return error;
}

int svc_rpcbind_set_version(struct net *net,
                            const struct svc_program *progp,
                            u32 version, int family,
                            unsigned short proto,
                            unsigned short port)
{
        return __svc_register(net, progp->pg_name, progp->pg_prog,
                                version, family, proto, port);
}
EXPORT_SYMBOL_GPL(svc_rpcbind_set_version);

int svc_generic_rpcbind_set(struct net *net,
                            const struct svc_program *progp,
                            u32 version, int family,
                            unsigned short proto,
                            unsigned short port)
{
        const struct svc_version *vers = progp->pg_vers[version];
        int error;

        if (vers == NULL)
                return 0;

        if (vers->vs_hidden) {
                trace_svc_noregister(progp->pg_name, version, proto,
                                     port, family, 0);
                return 0;
        }

        /*
         * Don't register a UDP port if we need congestion
         * control.
         */
        if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
                return 0;

        error = svc_rpcbind_set_version(net, progp, version,
                                        family, proto, port);

        return (vers->vs_rpcb_optnl) ? 0 : error;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);
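
/*
 * Note: vs_rpcb_optnl marks versions whose rpcbind registration is
 * best-effort; any error from svc_rpcbind_set_version() is swallowed
 * above and the version is still served.
 */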

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
                 const int family, const unsigned short proto,
                 const unsigned short port)
{
        struct svc_program      *progp;
        unsigned int            i;
        int                     error = 0;

        WARN_ON_ONCE(proto == 0 && port == 0);
        if (proto == 0 && port == 0)
                return -EINVAL;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        error = progp->pg_rpcbind_set(net, progp, i,
                                        family, proto, port);
                        if (error < 0) {
                                printk(KERN_WARNING "svc: failed to register "
                                        "%sv%u RPC service (errno %d).\n",
                                        progp->pg_name, i, -error);
                                break;
                        }
                }
        }

        return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
                             const char *progname)
{
        int error;

        error = rpcb_v4_register(net, program, version, NULL, "");

        /*
         * User space didn't support rpcbind v4, so retry this
         * request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, 0, 0);

        trace_svc_unregister(progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
        struct svc_program *progp;
        unsigned long flags;
        unsigned int i;

        clear_thread_flag(TIF_SIGPENDING);

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (progp->pg_vers[i]->vs_hidden)
                                continue;
                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
                }
        }

        spin_lock_irqsave(&current->sighand->siglock, flags);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;
        char    buf[RPC_MAX_ADDRBUFLEN];

        va_start(args, fmt);

        vaf.fmt = fmt;
        vaf.va = &args;

        dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

        va_end(args);
}
#else
static __printf(2, 3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

__be32
svc_generic_init_request(struct svc_rqst *rqstp,
                const struct svc_program *progp,
                struct svc_process_info *ret)
{
        const struct svc_version *versp = NULL; /* compiler food */
        const struct svc_procedure *procp = NULL;

        if (rqstp->rq_vers >= progp->pg_nvers)
                goto err_bad_vers;
        versp = progp->pg_vers[rqstp->rq_vers];
        if (!versp)
                goto err_bad_vers;

        /*
         * Some protocol versions (namely NFSv4) require some form of
         * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
         * In other words, UDP is not allowed. We mark those when setting
         * up the svc_xprt, and verify that here.
         *
         * The spec is not very clear about what error should be returned
         * when someone tries to access a server that is listening on UDP
         * for lower versions. RPC_PROG_MISMATCH seems to be the closest
         * fit.
         */
        if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
            !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
                goto err_bad_vers;

        if (rqstp->rq_proc >= versp->vs_nproc)
                goto err_bad_proc;
        rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];
        if (!procp)
                goto err_bad_proc;

        /* Initialize storage for argp and resp */
        memset(rqstp->rq_argp, 0, procp->pc_argsize);
        memset(rqstp->rq_resp, 0, procp->pc_ressize);

        /* Bump per-procedure stats counter */
        versp->vs_count[rqstp->rq_proc]++;

        ret->dispatch = versp->vs_dispatch;
        return rpc_success;
err_bad_vers:
        ret->mismatch.lovers = progp->pg_lovers;
        ret->mismatch.hivers = progp->pg_hivers;
        return rpc_prog_mismatch;
err_bad_proc:
        return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
        struct svc_program      *progp;
        const struct svc_procedure *procp = NULL;
        struct svc_serv         *serv = rqstp->rq_server;
        struct svc_process_info process;
        __be32                  *statp;
        u32                     prog, vers;
        __be32                  rpc_stat;
        int                     auth_res, rc;
        __be32                  *reply_statp;

        rpc_stat = rpc_success;

        if (argv->iov_len < 6*4)
                goto err_short_len;

        /* Will be turned off by GSS integrity and privacy services */
        set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
        /* Will be turned off only when NFSv4 Sessions are used */
        set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
        clear_bit(RQ_DROPME, &rqstp->rq_flags);

        svc_putu32(resv, rqstp->rq_xid);

        vers = svc_getnl(argv);

        /* First words of reply: */
        svc_putnl(resv, 1);             /* REPLY */

        if (vers != 2)          /* RPC version number */
                goto err_bad_rpc;

        /* Save position in case we later decide to reject: */
        reply_statp = resv->iov_base + resv->iov_len;

        svc_putnl(resv, 0);             /* ACCEPT */

        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
        rqstp->rq_vers = svc_getnl(argv);       /* version number */
        rqstp->rq_proc = svc_getnl(argv);       /* procedure number */

        for (progp = serv->sv_program; progp; progp = progp->pg_next)
                if (prog == progp->pg_prog)
                        break;

        /*
         * Decode auth data, and add verifier to reply buffer.
         * We do this before anything else in order to get a decent
         * auth verifier.
         */
        auth_res = svc_authenticate(rqstp);
        /* Also give the program a chance to reject this call: */
        if (auth_res == SVC_OK && progp)
                auth_res = progp->pg_authenticate(rqstp);
        if (auth_res != SVC_OK)
                trace_svc_authenticate(rqstp, auth_res);
        switch (auth_res) {
        case SVC_OK:
                break;
        case SVC_GARBAGE:
                goto err_garbage;
        case SVC_SYSERR:
                rpc_stat = rpc_system_err;
                goto err_bad;
        case SVC_DENIED:
                goto err_bad_auth;
        case SVC_CLOSE:
                goto close;
        case SVC_DROP:
                goto dropit;
        case SVC_COMPLETE:
                goto sendit;
        }

        if (progp == NULL)
                goto err_bad_prog;

        rpc_stat = progp->pg_init_request(rqstp, progp, &process);
        switch (rpc_stat) {
        case rpc_success:
                break;
        case rpc_prog_unavail:
                goto err_bad_prog;
        case rpc_prog_mismatch:
                goto err_bad_vers;
        case rpc_proc_unavail:
                goto err_bad_proc;
        }

        procp = rqstp->rq_procinfo;
        /* Should this check go into the dispatcher? */
        if (!procp || !procp->pc_func)
                goto err_bad_proc;

        /* Syntactic check complete */
        serv->sv_stats->rpccnt++;
        trace_svc_process(rqstp, progp->pg_name);

        /* Build the reply header. */
        statp = resv->iov_base + resv->iov_len;
        svc_putnl(resv, RPC_SUCCESS);

        /* un-reserve some of the out-queue now that we have a
         * better idea of reply size
         */
        if (procp->pc_xdrressize)
                svc_reserve_auth(rqstp, procp->pc_xdrressize << 2);

        /* Call the function that processes the request. */
        rc = process.dispatch(rqstp, statp);
        if (procp->pc_release)
                procp->pc_release(rqstp);
        if (!rc)
                goto dropit;
        if (rqstp->rq_auth_stat != rpc_auth_ok)
                goto err_bad_auth;

        /* Check RPC status result */
        if (*statp != rpc_success)
                resv->iov_len = ((void *)statp) - resv->iov_base + 4;

        if (procp->pc_encode == NULL)
                goto dropit;

 sendit:
        if (svc_authorise(rqstp))
                goto close_xprt;
        return 1;               /* Caller can now send it */

 dropit:
        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
        dprintk("svc: svc_process dropit\n");
        return 0;

 close:
        svc_authorise(rqstp);
close_xprt:
        if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
                svc_close_xprt(rqstp->rq_xprt);
        dprintk("svc: svc_process close\n");
        return 0;

err_short_len:
        svc_printk(rqstp, "short len %zd, dropping request\n",
                        argv->iov_len);
        goto close_xprt;

err_bad_rpc:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 0);     /* RPC_MISMATCH */
        svc_putnl(resv, 2);     /* Only RPCv2 supported */
        svc_putnl(resv, 2);
        goto sendit;

err_bad_auth:
        dprintk("svc: authentication failed (%d)\n",
                be32_to_cpu(rqstp->rq_auth_stat));
        serv->sv_stats->rpcbadauth++;
        /* Restore write pointer to location of accept status: */
        xdr_ressize_check(rqstp, reply_statp);
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 1);     /* AUTH_ERROR */
        svc_putu32(resv, rqstp->rq_auth_stat);  /* status */
        goto sendit;

err_bad_prog:
        dprintk("svc: unknown program %d\n", prog);
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_UNAVAIL);
        goto sendit;

err_bad_vers:
        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
                       rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_MISMATCH);
        svc_putnl(resv, process.mismatch.lovers);
        svc_putnl(resv, process.mismatch.hivers);
        goto sendit;

err_bad_proc:
        svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROC_UNAVAIL);
        goto sendit;

err_garbage:
        svc_printk(rqstp, "failed to decode args\n");

        rpc_stat = rpc_garbage_args;
err_bad:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, ntohl(rpc_stat));
        goto sendit;
}
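
/*
 * For reference, a successful accepted reply assembled above is laid
 * out as consecutive XDR words (cf. RFC 5531):
 *
 *	xid
 *	msg_type   = 1 (REPLY)
 *	reply_stat = 0 (MSG_ACCEPTED)
 *	verifier   (flavor + body, appended by svc_authenticate())
 *	accept_stat (RPC_SUCCESS written at *statp, or an error status)
 *	procedure-specific results follow
 */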

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
        struct kvec             *argv = &rqstp->rq_arg.head[0];
        struct kvec             *resv = &rqstp->rq_res.head[0];
        struct svc_serv         *serv = rqstp->rq_server;
        u32                     dir;

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
        if (!fail_sunrpc.ignore_server_disconnect &&
            should_fail(&fail_sunrpc.attr, 1))
                svc_xprt_deferred_close(rqstp->rq_xprt);
#endif

        /*
         * Setup response xdr_buf.
         * Initially it has just one page
         */
        rqstp->rq_next_page = &rqstp->rq_respages[1];
        resv->iov_base = page_address(rqstp->rq_respages[0]);
        resv->iov_len = 0;
        rqstp->rq_res.pages = rqstp->rq_respages + 1;
        rqstp->rq_res.len = 0;
        rqstp->rq_res.page_base = 0;
        rqstp->rq_res.page_len = 0;
        rqstp->rq_res.buflen = PAGE_SIZE;
        rqstp->rq_res.tail[0].iov_base = NULL;
        rqstp->rq_res.tail[0].iov_len = 0;

        dir = svc_getnl(argv);
        if (dir != 0) {
                /* direction != CALL */
                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
                serv->sv_stats->rpcbadfmt++;
                goto out_drop;
        }

        /* Returns 1 for send, 0 for drop */
        if (likely(svc_process_common(rqstp, argv, resv)))
                return svc_send(rqstp);

out_drop:
        svc_drop(rqstp);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
               struct svc_rqst *rqstp)
{
        struct kvec     *argv = &rqstp->rq_arg.head[0];
        struct kvec     *resv = &rqstp->rq_res.head[0];
        struct rpc_task *task;
        int proc_error;
        int error;

        dprintk("svc: %s(%p)\n", __func__, req);

        /* Build the svc_rqst used by the common processing routine */
        rqstp->rq_xid = req->rq_xid;
        rqstp->rq_prot = req->rq_xprt->prot;
        rqstp->rq_server = serv;
        rqstp->rq_bc_net = req->rq_xprt->xprt_net;

        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

        /* Adjust the argument buffer length */
        rqstp->rq_arg.len = req->rq_private_buf.len;
        if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
                rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
                rqstp->rq_arg.page_len = 0;
        } else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
                        rqstp->rq_arg.page_len)
                rqstp->rq_arg.page_len = rqstp->rq_arg.len -
                        rqstp->rq_arg.head[0].iov_len;
        else
                rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
                        rqstp->rq_arg.page_len;

        /* reset result send buffer "put" position */
        resv->iov_len = 0;

        /*
         * Skip the next two words because they've already been
         * processed in the transport
         */
        svc_getu32(argv);       /* XID */
        svc_getnl(argv);        /* CALLDIR */

        /* Parse and execute the bc call */
        proc_error = svc_process_common(rqstp, argv, resv);

        atomic_dec(&req->rq_xprt->bc_slot_count);
        if (!proc_error) {
                /* Processing error: drop the request */
                xprt_free_bc_request(req);
                error = -EINVAL;
                goto out;
        }
        /* Finally, send the reply synchronously */
        memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
        task = rpc_run_bc_task(req);
        if (IS_ERR(task)) {
                error = PTR_ERR(task);
                goto out;
        }

        WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
        error = task->tk_status;
        rpc_put_task(task);

out:
        dprintk("svc: %s(), error=%d\n", __func__, error);
        return error;
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

        if (rqstp->rq_server->sv_max_payload < max)
                max = rqstp->rq_server->sv_max_payload;
        return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);

/**
 * svc_proc_name - Return RPC procedure name in string form
 * @rqstp: svc_rqst to operate on
 *
 * Return value:
 *   Pointer to a NUL-terminated string
 */
const char *svc_proc_name(const struct svc_rqst *rqstp)
{
        if (rqstp && rqstp->rq_procinfo)
                return rqstp->rq_procinfo->pc_name;
        return "unknown";
}

/**
 * svc_encode_result_payload - mark a range of bytes as a result payload
 * @rqstp: svc_rqst to operate on
 * @offset: payload's byte offset in rqstp->rq_res
 * @length: size of payload, in bytes
 *
 * Returns zero on success, or a negative errno if a permanent
 * error occurred.
 */
int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
                              unsigned int length)
{
        return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
                                                           length);
}
EXPORT_SYMBOL_GPL(svc_encode_result_payload);

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @payload: xdr_buf containing only the write data payload
 *
 * Fills in rqstp::rq_vec, and returns the number of elements.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp,
                                   struct xdr_buf *payload)
{
        struct page **pages = payload->pages;
        struct kvec *first = payload->head;
        struct kvec *vec = rqstp->rq_vec;
        size_t total = payload->len;
        unsigned int i;

        /* Some types of transport can present the write payload
         * entirely in rq_arg.pages. In this case, @first is empty.
         */
        i = 0;
        if (first->iov_len) {
                vec[i].iov_base = first->iov_base;
                vec[i].iov_len = min_t(size_t, total, first->iov_len);
                total -= vec[i].iov_len;
                ++i;
        }

        while (total) {
                vec[i].iov_base = page_address(*pages);
                vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
                total -= vec[i].iov_len;
                ++i;
                ++pages;
        }

        WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
        return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);
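
/*
 * Consumer sketch (assuming an nfsd-style write path): the filled
 * vector is handed to the VFS through an iov_iter, e.g.
 *
 *	nvecs = svc_fill_write_vector(rqstp, payload);
 *	iov_iter_kvec(&iter, WRITE, rqstp->rq_vec, nvecs, payload->len);
 *	err = vfs_iter_write(file, &iter, &pos, flags);
 */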

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
                                void *p, size_t total)
{
        size_t len, remaining;
        char *result, *dst;

        result = kmalloc(total + 1, GFP_KERNEL);
        if (!result)
                return ERR_PTR(-ESERVERFAULT);

        dst = result;
        remaining = total;

        len = min_t(size_t, total, first->iov_len);
        if (len) {
                memcpy(dst, first->iov_base, len);
                dst += len;
                remaining -= len;
        }

        if (remaining) {
                len = min_t(size_t, remaining, PAGE_SIZE);
                memcpy(dst, p, len);
                dst += len;
        }

        *dst = '\0';

        /* Sanity check: Linux doesn't allow the pathname argument to
         * contain a NUL byte.
         */
        if (strlen(result) != total) {
                kfree(result);
                return ERR_PTR(-EINVAL);
        }
        return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);