// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#include "fail.h"

#define RPCDBG_FACILITY RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define svc_serv_is_pooled(serv)    ((serv)->sv_ops->svo_function)

#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
struct svc_pool_map svc_pool_map = {
        .mode = SVC_POOL_DEFAULT
};
EXPORT_SYMBOL_GPL(svc_pool_map);

static DEFINE_MUTEX(svc_pool_map_mutex); /* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;
        struct svc_pool_map *m = &svc_pool_map;
        int err;

        mutex_lock(&svc_pool_map_mutex);

        err = -EBUSY;
        if (m->count)
                goto out;

        err = 0;
        if (!strncmp(val, "auto", 4))
                *ip = SVC_POOL_AUTO;
        else if (!strncmp(val, "global", 6))
                *ip = SVC_POOL_GLOBAL;
        else if (!strncmp(val, "percpu", 6))
                *ip = SVC_POOL_PERCPU;
        else if (!strncmp(val, "pernode", 7))
                *ip = SVC_POOL_PERNODE;
        else
                err = -EINVAL;

out:
        mutex_unlock(&svc_pool_map_mutex);
        return err;
}

static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;

        switch (*ip) {
        case SVC_POOL_AUTO:
                return strlcpy(buf, "auto\n", 20);
        case SVC_POOL_GLOBAL:
                return strlcpy(buf, "global\n", 20);
        case SVC_POOL_PERCPU:
                return strlcpy(buf, "percpu\n", 20);
        case SVC_POOL_PERNODE:
                return strlcpy(buf, "pernode\n", 20);
        default:
                return sprintf(buf, "%d\n", *ip);
        }
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
                 &svc_pool_map.mode, 0644);

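/*
 * Example (illustrative): the pool mode can be chosen at module load
 * time, or at runtime while no pooled service holds a reference to the
 * map (otherwise param_set_pool_mode() above returns -EBUSY):
 *
 *      modprobe sunrpc pool_mode=pernode
 *      echo auto > /sys/module/sunrpc/parameters/pool_mode
 */
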
/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
        unsigned int node;

        if (nr_online_nodes > 1) {
                /*
                 * Actually have multiple NUMA nodes,
                 * so split pools on NUMA node boundaries
                 */
                return SVC_POOL_PERNODE;
        }

        node = first_online_node;
        if (nr_cpus_node(node) > 2) {
                /*
                 * Non-trivial SMP, or CONFIG_NUMA on
                 * non-NUMA hardware, e.g. with a generic
                 * x86_64 kernel on Xeons.  In this case we
                 * want to divide the pools on cpu boundaries.
                 */
                return SVC_POOL_PERCPU;
        }

        /* default: one global pool */
        return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->to_pool)
                goto fail;
        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->pool_to)
                goto fail_free;

        return 0;

fail_free:
        kfree(m->to_pool);
        m->to_pool = NULL;
fail:
        return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_cpu_ids;
        unsigned int pidx = 0;
        unsigned int cpu;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_online_cpu(cpu) {
                BUG_ON(pidx >= maxpools);
                m->to_pool[cpu] = pidx;
                m->pool_to[pidx] = cpu;
                pidx++;
        }
        /* cpus brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_node_ids;
        unsigned int pidx = 0;
        unsigned int node;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_node_with_cpus(node) {
                /* some architectures (e.g. SN2) have cpuless nodes */
                BUG_ON(pidx > maxpools);
                m->to_pool[node] = pidx;
                m->pool_to[pidx] = node;
                pidx++;
        }
        /* nodes brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
unsigned int
svc_pool_map_get(void)
{
        struct svc_pool_map *m = &svc_pool_map;
        int npools = -1;

        mutex_lock(&svc_pool_map_mutex);

        if (m->count++) {
                mutex_unlock(&svc_pool_map_mutex);
                return m->npools;
        }

        if (m->mode == SVC_POOL_AUTO)
                m->mode = svc_pool_map_choose_mode();

        switch (m->mode) {
        case SVC_POOL_PERCPU:
                npools = svc_pool_map_init_percpu(m);
                break;
        case SVC_POOL_PERNODE:
                npools = svc_pool_map_init_pernode(m);
                break;
        }

        if (npools < 0) {
                /* default, or memory allocation failure */
                npools = 1;
                m->mode = SVC_POOL_GLOBAL;
        }
        m->npools = npools;

        mutex_unlock(&svc_pool_map_mutex);
        return m->npools;
}
EXPORT_SYMBOL_GPL(svc_pool_map_get);

/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
void
svc_pool_map_put(void)
{
        struct svc_pool_map *m = &svc_pool_map;

        mutex_lock(&svc_pool_map_mutex);

        if (!--m->count) {
                kfree(m->to_pool);
                m->to_pool = NULL;
                kfree(m->pool_to);
                m->pool_to = NULL;
                m->npools = 0;
        }

        mutex_unlock(&svc_pool_map_mutex);
}
EXPORT_SYMBOL_GPL(svc_pool_map_put);

static int svc_pool_map_get_node(unsigned int pidx)
{
        const struct svc_pool_map *m = &svc_pool_map;

        if (m->count) {
                if (m->mode == SVC_POOL_PERCPU)
                        return cpu_to_node(m->pool_to[pidx]);
                if (m->mode == SVC_POOL_PERNODE)
                        return m->pool_to[pidx];
        }
        return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int node = m->pool_to[pidx];

        /*
         * The caller checks for sv_nrpools > 1, which
         * implies that we've been initialized.
         */
        WARN_ON_ONCE(m->count == 0);
        if (m->count == 0)
                return;

        switch (m->mode) {
        case SVC_POOL_PERCPU:
        {
                set_cpus_allowed_ptr(task, cpumask_of(node));
                break;
        }
        case SVC_POOL_PERNODE:
        {
                set_cpus_allowed_ptr(task, cpumask_of_node(node));
                break;
        }
        }
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int pidx = 0;

        /*
         * An uninitialised map happens in a pure client when
         * lockd is brought up, so silently treat it the
         * same as SVC_POOL_GLOBAL.
         */
        if (svc_serv_is_pooled(serv)) {
                switch (m->mode) {
                case SVC_POOL_PERCPU:
                        pidx = m->to_pool[cpu];
                        break;
                case SVC_POOL_PERNODE:
                        pidx = m->to_pool[cpu_to_node(cpu)];
                        break;
                }
        }
        return &serv->sv_pools[pidx % serv->sv_nrpools];
}

int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
        int err;

        err = rpcb_create_local(net);
        if (err)
                return err;

        /* Remove any stale portmap registrations */
        svc_unregister(serv, net);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
        svc_unregister(serv, net);
        rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

static int svc_uses_rpcbind(struct svc_serv *serv)
{
        struct svc_program      *progp;
        unsigned int            i;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (!progp->pg_vers[i]->vs_hidden)
                                return 1;
                }
        }

        return 0;
}

int svc_bind(struct svc_serv *serv, struct net *net)
{
        if (!svc_uses_rpcbind(serv))
                return 0;
        return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
        INIT_LIST_HEAD(&serv->sv_cb_list);
        spin_lock_init(&serv->sv_cb_lock);
        init_waitqueue_head(&serv->sv_cb_waitq);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
             const struct svc_serv_ops *ops)
{
        struct svc_serv *serv;
        unsigned int vers;
        unsigned int xdrsize;
        unsigned int i;

        serv = kzalloc(sizeof(*serv), GFP_KERNEL);
        if (!serv)
                return NULL;
        serv->sv_name      = prog->pg_name;
        serv->sv_program   = prog;
        serv->sv_nrthreads = 1;
        serv->sv_stats     = prog->pg_stats;
        if (bufsize > RPCSVC_MAXPAYLOAD)
                bufsize = RPCSVC_MAXPAYLOAD;
        serv->sv_max_payload = bufsize ? bufsize : 4096;
        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
        serv->sv_ops = ops;
        xdrsize = 0;
        while (prog) {
                prog->pg_lovers = prog->pg_nvers - 1;
                for (vers = 0; vers < prog->pg_nvers; vers++)
                        if (prog->pg_vers[vers]) {
                                prog->pg_hivers = vers;
                                if (prog->pg_lovers > vers)
                                        prog->pg_lovers = vers;
                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
                        }
                prog = prog->pg_next;
        }
        serv->sv_xdrsize   = xdrsize;
        INIT_LIST_HEAD(&serv->sv_tempsocks);
        INIT_LIST_HEAD(&serv->sv_permsocks);
        timer_setup(&serv->sv_temptimer, NULL, 0);
        spin_lock_init(&serv->sv_lock);

        __svc_init_bc(serv);

        serv->sv_nrpools = npools;
        serv->sv_pools =
                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
                        GFP_KERNEL);
        if (!serv->sv_pools) {
                kfree(serv);
                return NULL;
        }

        for (i = 0; i < serv->sv_nrpools; i++) {
                struct svc_pool *pool = &serv->sv_pools[i];

                dprintk("svc: initialising pool %u for %s\n",
                                i, serv->sv_name);

                pool->sp_id = i;
                INIT_LIST_HEAD(&pool->sp_sockets);
                INIT_LIST_HEAD(&pool->sp_all_threads);
                spin_lock_init(&pool->sp_lock);
        }

        return serv;
}

struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
           const struct svc_serv_ops *ops)
{
        return __svc_create(prog, bufsize, /*npools*/1, ops);
}
EXPORT_SYMBOL_GPL(svc_create);

struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
                  const struct svc_serv_ops *ops)
{
        struct svc_serv *serv;
        unsigned int npools = svc_pool_map_get();

        serv = __svc_create(prog, bufsize, npools, ops);
        if (!serv)
                goto out_err;
        return serv;
out_err:
        svc_pool_map_put();
        return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);

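/*
 * Example (illustrative sketch, not part of this file): a typical
 * caller creates a pooled service and then sizes its thread count.
 * "example_prog" and "example_thread_fn" are hypothetical stand-ins
 * for what a real service (e.g. nfsd or lockd) would supply:
 *
 *      static const struct svc_serv_ops example_ops = {
 *              .svo_shutdown   = svc_rpcb_cleanup,
 *              .svo_function   = example_thread_fn,
 *              .svo_module     = THIS_MODULE,
 *      };
 *
 *      static int example_start(struct net *net)
 *      {
 *              struct svc_serv *serv;
 *              int err;
 *
 *              serv = svc_create_pooled(&example_prog, 64 * 1024,
 *                                       &example_ops);
 *              if (!serv)
 *                      return -ENOMEM;
 *              err = svc_bind(serv, net);
 *              if (!err)
 *                      err = svc_set_num_threads(serv, NULL, 4);
 *              if (err)
 *                      svc_destroy(serv);
 *              return err;
 *      }
 */
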
void svc_shutdown_net(struct svc_serv *serv, struct net *net)
{
        svc_close_net(serv, net);

        if (serv->sv_ops->svo_shutdown)
                serv->sv_ops->svo_shutdown(serv, net);
}
EXPORT_SYMBOL_GPL(svc_shutdown_net);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
        dprintk("svc: svc_destroy(%s, %d)\n",
                                serv->sv_program->pg_name,
                                serv->sv_nrthreads);

        if (serv->sv_nrthreads) {
                if (--(serv->sv_nrthreads) != 0) {
                        svc_sock_update_bufs(serv);
                        return;
                }
        } else
                printk("svc_destroy: no threads for serv=%p!\n", serv);

        del_timer_sync(&serv->sv_temptimer);

        /*
         * The last user is gone, so all sockets must have been
         * destroyed by this point. Verify that.
         */
        BUG_ON(!list_empty(&serv->sv_permsocks));
        BUG_ON(!list_empty(&serv->sv_tempsocks));

        cache_clean_deferred(serv);

        if (svc_serv_is_pooled(serv))
                svc_pool_map_put();

        kfree(serv->sv_pools);
        kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_pages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
        unsigned int pages, arghi;

        /* bc_xprt uses fore channel allocated buffers */
        if (svc_is_backchannel(rqstp))
                return 1;

        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
                                       * We assume each is at most one page
                                       */
        arghi = 0;
        WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
        if (pages > RPCSVC_MAXPAGES)
                pages = RPCSVC_MAXPAGES;
        while (pages) {
                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);

                if (!p)
                        break;
                rqstp->rq_pages[arghi++] = p;
                pages--;
        }
        return pages == 0;
}

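/*
 * Example (worked numbers): with the default sv_max_payload of 4096
 * (see __svc_create()), sv_max_mesg is roundup(4096 + PAGE_SIZE,
 * PAGE_SIZE) = 8192 on 4K-page systems, so svc_init_buffer() allocates
 * 8192 / 4096 + 1 = 3 pages per thread.
 */
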
/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
        unsigned int i;

        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
                if (rqstp->rq_pages[i])
                        put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
        struct svc_rqst *rqstp;

        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
        if (!rqstp)
                return rqstp;

        __set_bit(RQ_BUSY, &rqstp->rq_flags);
        spin_lock_init(&rqstp->rq_lock);
        rqstp->rq_server = serv;
        rqstp->rq_pool = pool;

        rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
        if (!rqstp->rq_scratch_page)
                goto out_enomem;

        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_argp)
                goto out_enomem;

        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_resp)
                goto out_enomem;

        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
                goto out_enomem;

        return rqstp;
out_enomem:
        svc_rqst_free(rqstp);
        return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
        struct svc_rqst *rqstp;

        rqstp = svc_rqst_alloc(serv, pool, node);
        if (!rqstp)
                return ERR_PTR(-ENOMEM);

        serv->sv_nrthreads++;
        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads++;
        list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
        spin_unlock_bh(&pool->sp_lock);
        return rqstp;
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        if (pool != NULL)
                return pool;

        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        unsigned int i;
        struct task_struct *task = NULL;

        if (pool != NULL) {
                spin_lock_bh(&pool->sp_lock);
        } else {
                /* choose a pool in round-robin fashion */
                for (i = 0; i < serv->sv_nrpools; i++) {
                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
                        spin_lock_bh(&pool->sp_lock);
                        if (!list_empty(&pool->sp_all_threads))
                                goto found_pool;
                        spin_unlock_bh(&pool->sp_lock);
                }
                return NULL;
        }

found_pool:
        if (!list_empty(&pool->sp_all_threads)) {
                struct svc_rqst *rqstp;

                /*
                 * Remove from the pool->sp_all_threads list
                 * so we don't try to kill it again.
                 */
                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
                set_bit(RQ_VICTIM, &rqstp->rq_flags);
                list_del_rcu(&rqstp->rq_all);
                task = rqstp->rq_task;
        }
        spin_unlock_bh(&pool->sp_lock);

        return task;
}

/* create new threads */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct svc_rqst *rqstp;
        struct task_struct *task;
        struct svc_pool *chosen_pool;
        unsigned int state = serv->sv_nrthreads - 1;
        int node;

        do {
                nrservs--;
                chosen_pool = choose_pool(serv, pool, &state);

                node = svc_pool_map_get_node(chosen_pool->sp_id);
                rqstp = svc_prepare_thread(serv, chosen_pool, node);
                if (IS_ERR(rqstp))
                        return PTR_ERR(rqstp);

                __module_get(serv->sv_ops->svo_module);
                task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
                                              node, "%s", serv->sv_name);
                if (IS_ERR(task)) {
                        module_put(serv->sv_ops->svo_module);
                        svc_exit_thread(rqstp);
                        return PTR_ERR(task);
                }

                rqstp->rq_task = task;
                if (serv->sv_nrpools > 1)
                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

                svc_sock_update_bufs(serv);
                wake_up_process(task);
        } while (nrservs > 0);

        return 0;
}

/* destroy old threads */
static int
svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct task_struct *task;
        unsigned int state = serv->sv_nrthreads - 1;

        /* destroy old threads */
        do {
                task = choose_victim(serv, pool, &state);
                if (task == NULL)
                        break;
                send_sig(SIGINT, task, 1);
                nrservs++;
        } while (nrservs < 0);

        return 0;
}

/*
 * Create or destroy enough threads to bring the total to the given
 * number.  If `pool' is non-NULL, applies only to threads in that
 * pool, otherwise round-robins between all pools.  Caller must ensure
 * mutual exclusion between this and server startup or shutdown.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        if (pool == NULL) {
                /* The -1 assumes caller has done a svc_get() */
                nrservs -= (serv->sv_nrthreads - 1);
        } else {
                spin_lock_bh(&pool->sp_lock);
                nrservs -= pool->sp_nrthreads;
                spin_unlock_bh(&pool->sp_lock);
        }

        if (nrservs > 0)
                return svc_start_kthreads(serv, pool, nrservs);
        if (nrservs < 0)
                return svc_signal_kthreads(serv, pool, nrservs);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);

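/*
 * Example (illustrative): svc_set_num_threads() takes a target count,
 * not a delta.  With 8 threads running across all pools,
 * svc_set_num_threads(serv, NULL, 12) creates 4 new threads spread
 * round-robin over the pools; asking for fewer signals the surplus
 * threads with SIGINT, whereas svc_set_num_threads_sync() below stops
 * them with kthread_stop() and waits for each to exit.
 */
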
/* destroy old threads */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct task_struct *task;
        unsigned int state = serv->sv_nrthreads - 1;

        /* destroy old threads */
        do {
                task = choose_victim(serv, pool, &state);
                if (task == NULL)
                        break;
                kthread_stop(task);
                nrservs++;
        } while (nrservs < 0);
        return 0;
}

int
svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        if (pool == NULL) {
                /* The -1 assumes caller has done a svc_get() */
                nrservs -= (serv->sv_nrthreads - 1);
        } else {
                spin_lock_bh(&pool->sp_lock);
                nrservs -= pool->sp_nrthreads;
                spin_unlock_bh(&pool->sp_lock);
        }

        if (nrservs > 0)
                return svc_start_kthreads(serv, pool, nrservs);
        if (nrservs < 0)
                return svc_stop_kthreads(serv, pool, nrservs);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);

/**
 * svc_rqst_replace_page - Replace one page in rq_pages[]
 * @rqstp: svc_rqst with pages to replace
 * @page: replacement page
 *
 * When replacing a page in rq_pages, batch the release of the
 * replaced pages to avoid hammering the page allocator.
 */
void svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page)
{
        if (*rqstp->rq_next_page) {
                if (!pagevec_space(&rqstp->rq_pvec))
                        __pagevec_release(&rqstp->rq_pvec);
                pagevec_add(&rqstp->rq_pvec, *rqstp->rq_next_page);
        }

        get_page(page);
        *(rqstp->rq_next_page++) = page;
}
EXPORT_SYMBOL_GPL(svc_rqst_replace_page);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
        svc_release_buffer(rqstp);
        if (rqstp->rq_scratch_page)
                put_page(rqstp->rq_scratch_page);
        kfree(rqstp->rq_resp);
        kfree(rqstp->rq_argp);
        kfree(rqstp->rq_auth_data);
        kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

void
svc_exit_thread(struct svc_rqst *rqstp)
{
        struct svc_serv *serv = rqstp->rq_server;
        struct svc_pool *pool = rqstp->rq_pool;

        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads--;
        if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
                list_del_rcu(&rqstp->rq_all);
        spin_unlock_bh(&pool->sp_lock);

        svc_rqst_free(rqstp);

        /* Release the server */
        if (serv)
                svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in sin = {
                .sin_family             = AF_INET,
                .sin_addr.s_addr        = htonl(INADDR_ANY),
                .sin_port               = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin, netid);

        /*
         * User space didn't support rpcbind v4, so retry this
         * registration request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, protocol, port);

        return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in6 sin6 = {
                .sin6_family            = AF_INET6,
                .sin6_addr              = IN6ADDR_ANY_INIT,
                .sin6_port              = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP6;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP6;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin6, netid);

        /*
         * User space didn't support rpcbind version 4, so we won't
         * use a PF_INET6 listener.
         */
        if (error == -EPROTONOSUPPORT)
                error = -EAFNOSUPPORT;

        return error;
}
#endif  /* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
                          const u32 program, const u32 version,
                          const int family,
                          const unsigned short protocol,
                          const unsigned short port)
{
        int error = -EAFNOSUPPORT;

        switch (family) {
        case PF_INET:
                error = __svc_rpcb_register4(net, program, version,
                                                protocol, port);
                break;
#if IS_ENABLED(CONFIG_IPV6)
        case PF_INET6:
                error = __svc_rpcb_register6(net, program, version,
                                                protocol, port);
#endif
        }

        trace_svc_register(progname, version, protocol, port, family, error);
        return error;
}

int svc_rpcbind_set_version(struct net *net,
                            const struct svc_program *progp,
                            u32 version, int family,
                            unsigned short proto,
                            unsigned short port)
{
        return __svc_register(net, progp->pg_name, progp->pg_prog,
                                version, family, proto, port);
}
EXPORT_SYMBOL_GPL(svc_rpcbind_set_version);

int svc_generic_rpcbind_set(struct net *net,
                            const struct svc_program *progp,
                            u32 version, int family,
                            unsigned short proto,
                            unsigned short port)
{
        const struct svc_version *vers = progp->pg_vers[version];
        int error;

        if (vers == NULL)
                return 0;

        if (vers->vs_hidden) {
                trace_svc_noregister(progp->pg_name, version, proto,
                                     port, family, 0);
                return 0;
        }

        /*
         * Don't register a UDP port if we need congestion
         * control.
         */
        if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
                return 0;

        error = svc_rpcbind_set_version(net, progp, version,
                                        family, proto, port);

        return (vers->vs_rpcb_optnl) ? 0 : error;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
                 const int family, const unsigned short proto,
                 const unsigned short port)
{
        struct svc_program      *progp;
        unsigned int            i;
        int                     error = 0;

        WARN_ON_ONCE(proto == 0 && port == 0);
        if (proto == 0 && port == 0)
                return -EINVAL;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {

                        error = progp->pg_rpcbind_set(net, progp, i,
                                        family, proto, port);
                        if (error < 0) {
                                printk(KERN_WARNING "svc: failed to register "
                                        "%sv%u RPC service (errno %d).\n",
                                        progp->pg_name, i, -error);
                                break;
                        }
                }
        }

        return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
                             const char *progname)
{
        int error;

        error = rpcb_v4_register(net, program, version, NULL, "");

        /*
         * User space didn't support rpcbind v4, so retry this
         * request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, 0, 0);

        trace_svc_unregister(progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
        struct svc_program *progp;
        unsigned long flags;
        unsigned int i;

        clear_thread_flag(TIF_SIGPENDING);

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (progp->pg_vers[i]->vs_hidden)
                                continue;
                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
                }
        }

        spin_lock_irqsave(&current->sighand->siglock, flags);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;
        char    buf[RPC_MAX_ADDRBUFLEN];

        va_start(args, fmt);

        vaf.fmt = fmt;
        vaf.va = &args;

        dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

        va_end(args);
}
#else
static __printf(2, 3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

static int
svc_generic_dispatch(struct svc_rqst *rqstp, __be32 *statp)
{
        struct kvec *argv = &rqstp->rq_arg.head[0];
        struct kvec *resv = &rqstp->rq_res.head[0];
        const struct svc_procedure *procp = rqstp->rq_procinfo;

        /*
         * Decode arguments
         * XXX: why do we ignore the return value?
         */
        if (procp->pc_decode &&
            !procp->pc_decode(rqstp, argv->iov_base)) {
                *statp = rpc_garbage_args;
                return 1;
        }

        *statp = procp->pc_func(rqstp);

        if (*statp == rpc_drop_reply ||
            test_bit(RQ_DROPME, &rqstp->rq_flags))
                return 0;

        if (rqstp->rq_auth_stat != rpc_auth_ok)
                return 1;

        if (*statp != rpc_success)
                return 1;

        /* Encode reply */
        if (procp->pc_encode &&
            !procp->pc_encode(rqstp, resv->iov_base + resv->iov_len)) {
                dprintk("svc: failed to encode reply\n");
                /* serv->sv_stats->rpcsystemerr++; */
                *statp = rpc_system_err;
        }
        return 1;
}

__be32
svc_generic_init_request(struct svc_rqst *rqstp,
                const struct svc_program *progp,
                struct svc_process_info *ret)
{
        const struct svc_version *versp = NULL; /* compiler food */
        const struct svc_procedure *procp = NULL;

        if (rqstp->rq_vers >= progp->pg_nvers)
                goto err_bad_vers;
        versp = progp->pg_vers[rqstp->rq_vers];
        if (!versp)
                goto err_bad_vers;

        /*
         * Some protocol versions (namely NFSv4) require some form of
         * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
         * In other words, UDP is not allowed. We mark those when setting
         * up the svc_xprt, and verify that here.
         *
         * The spec is not very clear about what error should be returned
         * when someone tries to access a server that is listening on UDP
         * for lower versions. RPC_PROG_MISMATCH seems to be the closest
         * fit.
         */
        if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
            !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
                goto err_bad_vers;

        if (rqstp->rq_proc >= versp->vs_nproc)
                goto err_bad_proc;
        rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];
        if (!procp)
                goto err_bad_proc;

        /* Initialize storage for argp and resp */
        memset(rqstp->rq_argp, 0, procp->pc_argsize);
        memset(rqstp->rq_resp, 0, procp->pc_ressize);

        /* Bump per-procedure stats counter */
        versp->vs_count[rqstp->rq_proc]++;

        ret->dispatch = versp->vs_dispatch;
        return rpc_success;
err_bad_vers:
        ret->mismatch.lovers = progp->pg_lovers;
        ret->mismatch.hivers = progp->pg_hivers;
        return rpc_prog_mismatch;
err_bad_proc:
        return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
        struct svc_program      *progp;
        const struct svc_procedure *procp = NULL;
        struct svc_serv         *serv = rqstp->rq_server;
        struct svc_process_info process;
        __be32                  *statp;
        u32                     prog, vers;
        __be32                  rpc_stat;
        int                     auth_res;
        __be32                  *reply_statp;

        rpc_stat = rpc_success;

        if (argv->iov_len < 6 * 4)
                goto err_short_len;

        /* Will be turned off by GSS integrity and privacy services */
        set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
        /* Will be turned off only when NFSv4 Sessions are used */
        set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
        clear_bit(RQ_DROPME, &rqstp->rq_flags);

        svc_putu32(resv, rqstp->rq_xid);

        vers = svc_getnl(argv);

        /* First words of reply: */
        svc_putnl(resv, 1);             /* REPLY */

        if (vers != 2)          /* RPC version number */
                goto err_bad_rpc;

        /* Save position in case we later decide to reject: */
        reply_statp = resv->iov_base + resv->iov_len;

        svc_putnl(resv, 0);             /* ACCEPT */

        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
        rqstp->rq_vers = svc_getnl(argv);       /* version number */
        rqstp->rq_proc = svc_getnl(argv);       /* procedure number */

        for (progp = serv->sv_program; progp; progp = progp->pg_next)
                if (prog == progp->pg_prog)
                        break;

        /*
         * Decode auth data, and add verifier to reply buffer.
         * We do this before anything else in order to get a decent
         * auth verifier.
         */
        auth_res = svc_authenticate(rqstp);
        /* Also give the program a chance to reject this call: */
        if (auth_res == SVC_OK && progp)
                auth_res = progp->pg_authenticate(rqstp);
        if (auth_res != SVC_OK)
                trace_svc_authenticate(rqstp, auth_res);
        switch (auth_res) {
        case SVC_OK:
                break;
        case SVC_GARBAGE:
                goto err_garbage;
        case SVC_SYSERR:
                rpc_stat = rpc_system_err;
                goto err_bad;
        case SVC_DENIED:
                goto err_bad_auth;
        case SVC_CLOSE:
                goto close;
        case SVC_DROP:
                goto dropit;
        case SVC_COMPLETE:
                goto sendit;
        }

        if (progp == NULL)
                goto err_bad_prog;

        rpc_stat = progp->pg_init_request(rqstp, progp, &process);
        switch (rpc_stat) {
        case rpc_success:
                break;
        case rpc_prog_unavail:
                goto err_bad_prog;
        case rpc_prog_mismatch:
                goto err_bad_vers;
        case rpc_proc_unavail:
                goto err_bad_proc;
        }

        procp = rqstp->rq_procinfo;
        /* Should this check go into the dispatcher? */
        if (!procp || !procp->pc_func)
                goto err_bad_proc;

        /* Syntactic check complete */
        serv->sv_stats->rpccnt++;
        trace_svc_process(rqstp, progp->pg_name);

        /* Build the reply header. */
        statp = resv->iov_base + resv->iov_len;
        svc_putnl(resv, RPC_SUCCESS);

        /* un-reserve some of the out-queue now that we have a
         * better idea of reply size
         */
        if (procp->pc_xdrressize)
                svc_reserve_auth(rqstp, procp->pc_xdrressize << 2);

        /* Call the function that processes the request. */
        if (!process.dispatch) {
                if (!svc_generic_dispatch(rqstp, statp))
                        goto release_dropit;
                if (*statp == rpc_garbage_args)
                        goto err_garbage;
        } else {
                dprintk("svc: calling dispatcher\n");
                if (!process.dispatch(rqstp, statp))
                        goto release_dropit; /* Release reply info */
        }

        if (rqstp->rq_auth_stat != rpc_auth_ok)
                goto err_release_bad_auth;

        /* Check RPC status result */
        if (*statp != rpc_success)
                resv->iov_len = (void *)statp - resv->iov_base + 4;

        /* Release reply info */
        if (procp->pc_release)
                procp->pc_release(rqstp);

        if (procp->pc_encode == NULL)
                goto dropit;

 sendit:
        if (svc_authorise(rqstp))
                goto close_xprt;
        return 1;               /* Caller can now send it */

release_dropit:
        if (procp->pc_release)
                procp->pc_release(rqstp);
 dropit:
        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
        dprintk("svc: svc_process dropit\n");
        return 0;

 close:
        svc_authorise(rqstp);
close_xprt:
        if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
                svc_close_xprt(rqstp->rq_xprt);
        dprintk("svc: svc_process close\n");
        return 0;

err_short_len:
        svc_printk(rqstp, "short len %zd, dropping request\n",
                        argv->iov_len);
        goto close_xprt;

err_bad_rpc:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 0);     /* RPC_MISMATCH */
        svc_putnl(resv, 2);     /* Only RPCv2 supported */
        svc_putnl(resv, 2);
        goto sendit;

err_release_bad_auth:
        if (procp->pc_release)
                procp->pc_release(rqstp);
err_bad_auth:
        dprintk("svc: authentication failed (%d)\n",
                be32_to_cpu(rqstp->rq_auth_stat));
        serv->sv_stats->rpcbadauth++;
        /* Restore write pointer to location of accept status: */
        xdr_ressize_check(rqstp, reply_statp);
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 1);     /* AUTH_ERROR */
        svc_putu32(resv, rqstp->rq_auth_stat);  /* status */
        goto sendit;

err_bad_prog:
        dprintk("svc: unknown program %d\n", prog);
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_UNAVAIL);
        goto sendit;

err_bad_vers:
        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
                       rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_MISMATCH);
        svc_putnl(resv, process.mismatch.lovers);
        svc_putnl(resv, process.mismatch.hivers);
        goto sendit;

err_bad_proc:
        svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROC_UNAVAIL);
        goto sendit;

err_garbage:
        svc_printk(rqstp, "failed to decode args\n");

        rpc_stat = rpc_garbage_args;
err_bad:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, ntohl(rpc_stat));
        goto sendit;
}

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
        struct kvec             *argv = &rqstp->rq_arg.head[0];
        struct kvec             *resv = &rqstp->rq_res.head[0];
        struct svc_serv         *serv = rqstp->rq_server;
        u32                     dir;

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
        if (!fail_sunrpc.ignore_server_disconnect &&
            should_fail(&fail_sunrpc.attr, 1))
                svc_xprt_deferred_close(rqstp->rq_xprt);
#endif

        /*
         * Setup response xdr_buf.
         * Initially it has just one page
         */
        rqstp->rq_next_page = &rqstp->rq_respages[1];
        resv->iov_base = page_address(rqstp->rq_respages[0]);
        resv->iov_len = 0;
        rqstp->rq_res.pages = rqstp->rq_respages + 1;
        rqstp->rq_res.len = 0;
        rqstp->rq_res.page_base = 0;
        rqstp->rq_res.page_len = 0;
        rqstp->rq_res.buflen = PAGE_SIZE;
        rqstp->rq_res.tail[0].iov_base = NULL;
        rqstp->rq_res.tail[0].iov_len = 0;

        dir = svc_getnl(argv);
        if (dir != 0) {
                /* direction != CALL */
                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
                serv->sv_stats->rpcbadfmt++;
                goto out_drop;
        }

        /* Returns 1 for send, 0 for drop */
        if (likely(svc_process_common(rqstp, argv, resv)))
                return svc_send(rqstp);

out_drop:
        svc_drop(rqstp);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_process);

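/*
 * Example (illustrative): svc_process() is called by service threads
 * (the svo_function passed at service creation) after svc_recv()
 * returns a complete request.  A thread body looks roughly like:
 *
 *      while (!kthread_should_stop()) {
 *              err = svc_recv(rqstp, 60 * 60 * HZ);
 *              if (err == -EAGAIN || err == -EINTR)
 *                      continue;
 *              svc_process(rqstp);
 *      }
 */
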
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
               struct svc_rqst *rqstp)
{
        struct kvec     *argv = &rqstp->rq_arg.head[0];
        struct kvec     *resv = &rqstp->rq_res.head[0];
        struct rpc_task *task;
        int proc_error;
        int error;

        dprintk("svc: %s(%p)\n", __func__, req);

        /* Build the svc_rqst used by the common processing routine */
        rqstp->rq_xid = req->rq_xid;
        rqstp->rq_prot = req->rq_xprt->prot;
        rqstp->rq_server = serv;
        rqstp->rq_bc_net = req->rq_xprt->xprt_net;

        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

        /* Adjust the argument buffer length */
        rqstp->rq_arg.len = req->rq_private_buf.len;
        if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
                rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
                rqstp->rq_arg.page_len = 0;
        } else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
                        rqstp->rq_arg.page_len)
                rqstp->rq_arg.page_len = rqstp->rq_arg.len -
                        rqstp->rq_arg.head[0].iov_len;
        else
                rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
                        rqstp->rq_arg.page_len;

        /* reset result send buffer "put" position */
        resv->iov_len = 0;

        /*
         * Skip the next two words because they've already been
         * processed in the transport
         */
        svc_getu32(argv);       /* XID */
        svc_getnl(argv);        /* CALLDIR */

        /* Parse and execute the bc call */
        proc_error = svc_process_common(rqstp, argv, resv);

        atomic_dec(&req->rq_xprt->bc_slot_count);
        if (!proc_error) {
                /* Processing error: drop the request */
                xprt_free_bc_request(req);
                error = -EINVAL;
                goto out;
        }
        /* Finally, send the reply synchronously */
        memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
        task = rpc_run_bc_task(req);
        if (IS_ERR(task)) {
                error = PTR_ERR(task);
                goto out;
        }

        WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
        error = task->tk_status;
        rpc_put_task(task);

out:
        dprintk("svc: %s(), error=%d\n", __func__, error);
        return error;
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

        if (rqstp->rq_server->sv_max_payload < max)
                max = rqstp->rq_server->sv_max_payload;
        return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);

/**
 * svc_proc_name - Return RPC procedure name in string form
 * @rqstp: svc_rqst to operate on
 *
 * Return value:
 *   Pointer to a NUL-terminated string
 */
const char *svc_proc_name(const struct svc_rqst *rqstp)
{
        if (rqstp && rqstp->rq_procinfo)
                return rqstp->rq_procinfo->pc_name;
        return "unknown";
}

/**
 * svc_encode_result_payload - mark a range of bytes as a result payload
 * @rqstp: svc_rqst to operate on
 * @offset: payload's byte offset in rqstp->rq_res
 * @length: size of payload, in bytes
 *
 * Returns zero on success, or a negative errno if a permanent
 * error occurred.
 */
int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
                              unsigned int length)
{
        return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
                                                           length);
}
EXPORT_SYMBOL_GPL(svc_encode_result_payload);

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @pages: list of pages containing data payload
 * @first: buffer containing first section of write payload
 * @total: total number of bytes of write payload
 *
 * Fills in rqstp::rq_vec, and returns the number of elements.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp, struct page **pages,
                                   struct kvec *first, size_t total)
{
        struct kvec *vec = rqstp->rq_vec;
        unsigned int i;

        /* Some types of transport can present the write payload
         * entirely in rq_arg.pages. In this case, @first is empty.
         */
        i = 0;
        if (first->iov_len) {
                vec[i].iov_base = first->iov_base;
                vec[i].iov_len = min_t(size_t, total, first->iov_len);
                total -= vec[i].iov_len;
                ++i;
        }

        while (total) {
                vec[i].iov_base = page_address(*pages);
                vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
                total -= vec[i].iov_len;
                ++i;
                ++pages;
        }

        WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
        return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);

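/*
 * Example (illustrative): an NFS write handler might build its vector
 * roughly as follows; "argp" and "cnt" here are hypothetical caller
 * variables, and the result feeds the VFS write path via an
 * iov_iter_kvec() over rqstp->rq_vec:
 *
 *      nvecs = svc_fill_write_vector(rqstp, rqstp->rq_arg.pages,
 *                                    &argp->first, cnt);
 */
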
/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
                                void *p, size_t total)
{
        size_t len, remaining;
        char *result, *dst;

        result = kmalloc(total + 1, GFP_KERNEL);
        if (!result)
                return ERR_PTR(-ESERVERFAULT);

        dst = result;
        remaining = total;

        len = min_t(size_t, total, first->iov_len);
        if (len) {
                memcpy(dst, first->iov_base, len);
                dst += len;
                remaining -= len;
        }

        if (remaining) {
                len = min_t(size_t, remaining, PAGE_SIZE);
                memcpy(dst, p, len);
                dst += len;
        }

        *dst = '\0';

        /* Sanity check: Linux doesn't allow the pathname argument to
         * contain a NUL byte.
         */
        if (strlen(result) != total) {
                kfree(result);
                return ERR_PTR(-EINVAL);
        }
        return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);