/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#define RPCDBG_FACILITY RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define svc_serv_is_pooled(serv)    ((serv)->sv_function)

/*
 * Mode for mapping cpus to pools.
 */
enum {
        SVC_POOL_AUTO = -1,     /* choose one of the others */
        SVC_POOL_GLOBAL,        /* no mapping, just a single global pool
                                 * (legacy & UP mode) */
        SVC_POOL_PERCPU,        /* one pool per cpu */
        SVC_POOL_PERNODE        /* one pool per numa node */
};
#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
static struct svc_pool_map {
        int count;                      /* How many svc_servs use us */
        int mode;                       /* Note: int not enum to avoid
                                         * warnings about "enumeration value
                                         * not handled in switch" */
        unsigned int npools;
        unsigned int *pool_to;          /* maps pool id to cpu or node */
        unsigned int *to_pool;          /* maps cpu or node to pool id */
} svc_pool_map = {
        .count = 0,
        .mode = SVC_POOL_DEFAULT
};
static DEFINE_MUTEX(svc_pool_map_mutex); /* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;
        struct svc_pool_map *m = &svc_pool_map;
        int err;

        mutex_lock(&svc_pool_map_mutex);

        err = -EBUSY;
        if (m->count)
                goto out;

        err = 0;
        if (!strncmp(val, "auto", 4))
                *ip = SVC_POOL_AUTO;
        else if (!strncmp(val, "global", 6))
                *ip = SVC_POOL_GLOBAL;
        else if (!strncmp(val, "percpu", 6))
                *ip = SVC_POOL_PERCPU;
        else if (!strncmp(val, "pernode", 7))
                *ip = SVC_POOL_PERNODE;
        else
                err = -EINVAL;

out:
        mutex_unlock(&svc_pool_map_mutex);
        return err;
}

static int
param_get_pool_mode(char *buf, struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;

        switch (*ip) {
        case SVC_POOL_AUTO:
                return strlcpy(buf, "auto", 20);
        case SVC_POOL_GLOBAL:
                return strlcpy(buf, "global", 20);
        case SVC_POOL_PERCPU:
                return strlcpy(buf, "percpu", 20);
        case SVC_POOL_PERNODE:
                return strlcpy(buf, "pernode", 20);
        default:
                return sprintf(buf, "%d", *ip);
        }
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
                 &svc_pool_map.mode, 0644);
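
/*
 * The pool mode can be chosen at module load time ("pool_mode=pernode"
 * etc.) and, because the parameter is created with mode 0644 above, also
 * at runtime.  A runtime change is only accepted while no pooled service
 * is running: param_set_pool_mode() returns -EBUSY otherwise.  For
 * example (assuming sysfs is mounted at /sys):
 *
 *      echo pernode > /sys/module/sunrpc/parameters/pool_mode
 */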

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
        unsigned int node;

        if (nr_online_nodes > 1) {
                /*
                 * Actually have multiple NUMA nodes,
                 * so split pools on NUMA node boundaries
                 */
                return SVC_POOL_PERNODE;
        }

        node = first_online_node;
        if (nr_cpus_node(node) > 2) {
                /*
                 * Non-trivial SMP, or CONFIG_NUMA on
                 * non-NUMA hardware, e.g. with a generic
                 * x86_64 kernel on Xeons.  In this case we
                 * want to divide the pools on cpu boundaries.
                 */
                return SVC_POOL_PERCPU;
        }

        /* default: one global pool */
        return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->to_pool)
                goto fail;
        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->pool_to)
                goto fail_free;

        return 0;

fail_free:
        kfree(m->to_pool);
        m->to_pool = NULL;
fail:
        return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_cpu_ids;
        unsigned int pidx = 0;
        unsigned int cpu;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_online_cpu(cpu) {
                BUG_ON(pidx > maxpools);
                m->to_pool[cpu] = pidx;
                m->pool_to[pidx] = cpu;
                pidx++;
        }
        /* cpus brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_node_ids;
        unsigned int pidx = 0;
        unsigned int node;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_node_with_cpus(node) {
                /* some architectures (e.g. SN2) have cpuless nodes */
                BUG_ON(pidx > maxpools);
                m->to_pool[node] = pidx;
                m->pool_to[pidx] = node;
                pidx++;
        }
        /* nodes brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
static unsigned int
svc_pool_map_get(void)
{
        struct svc_pool_map *m = &svc_pool_map;
        int npools = -1;

        mutex_lock(&svc_pool_map_mutex);

        if (m->count++) {
                mutex_unlock(&svc_pool_map_mutex);
                return m->npools;
        }

        if (m->mode == SVC_POOL_AUTO)
                m->mode = svc_pool_map_choose_mode();

        switch (m->mode) {
        case SVC_POOL_PERCPU:
                npools = svc_pool_map_init_percpu(m);
                break;
        case SVC_POOL_PERNODE:
                npools = svc_pool_map_init_pernode(m);
                break;
        }

        if (npools < 0) {
                /* default, or memory allocation failure */
                npools = 1;
                m->mode = SVC_POOL_GLOBAL;
        }
        m->npools = npools;

        mutex_unlock(&svc_pool_map_mutex);
        return m->npools;
}

/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
static void
svc_pool_map_put(void)
{
        struct svc_pool_map *m = &svc_pool_map;

        mutex_lock(&svc_pool_map_mutex);

        if (!--m->count) {
                kfree(m->to_pool);
                m->to_pool = NULL;
                kfree(m->pool_to);
                m->pool_to = NULL;
                m->npools = 0;
        }

        mutex_unlock(&svc_pool_map_mutex);
}

static int svc_pool_map_get_node(unsigned int pidx)
{
        const struct svc_pool_map *m = &svc_pool_map;

        if (m->count) {
                if (m->mode == SVC_POOL_PERCPU)
                        return cpu_to_node(m->pool_to[pidx]);
                if (m->mode == SVC_POOL_PERNODE)
                        return m->pool_to[pidx];
        }
        return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int node = m->pool_to[pidx];

        /*
         * The caller checks for sv_nrpools > 1, which
         * implies that we've been initialized.
         */
        WARN_ON_ONCE(m->count == 0);
        if (m->count == 0)
                return;

        switch (m->mode) {
        case SVC_POOL_PERCPU:
        {
                set_cpus_allowed_ptr(task, cpumask_of(node));
                break;
        }
        case SVC_POOL_PERNODE:
        {
                set_cpus_allowed_ptr(task, cpumask_of_node(node));
                break;
        }
        }
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int pidx = 0;

        /*
         * An uninitialised map happens in a pure client when
         * lockd is brought up, so silently treat it the
         * same as SVC_POOL_GLOBAL.
         */
        if (svc_serv_is_pooled(serv)) {
                switch (m->mode) {
                case SVC_POOL_PERCPU:
                        pidx = m->to_pool[cpu];
                        break;
                case SVC_POOL_PERNODE:
                        pidx = m->to_pool[cpu_to_node(cpu)];
                        break;
                }
        }
        return &serv->sv_pools[pidx % serv->sv_nrpools];
}

int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
        int err;

        err = rpcb_create_local(net);
        if (err)
                return err;

        /* Remove any stale portmap registrations */
        svc_unregister(serv, net);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
        svc_unregister(serv, net);
        rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

static int svc_uses_rpcbind(struct svc_serv *serv)
{
        struct svc_program      *progp;
        unsigned int            i;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (progp->pg_vers[i]->vs_hidden == 0)
                                return 1;
                }
        }

        return 0;
}

int svc_bind(struct svc_serv *serv, struct net *net)
{
        if (!svc_uses_rpcbind(serv))
                return 0;
        return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
             void (*shutdown)(struct svc_serv *serv, struct net *net))
{
        struct svc_serv *serv;
        unsigned int vers;
        unsigned int xdrsize;
        unsigned int i;

        serv = kzalloc(sizeof(*serv), GFP_KERNEL);
        if (!serv)
                return NULL;
        serv->sv_name      = prog->pg_name;
        serv->sv_program   = prog;
        serv->sv_nrthreads = 1;
        serv->sv_stats     = prog->pg_stats;
        if (bufsize > RPCSVC_MAXPAYLOAD)
                bufsize = RPCSVC_MAXPAYLOAD;
        serv->sv_max_payload = bufsize ? bufsize : 4096;
        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
        serv->sv_shutdown  = shutdown;
        xdrsize = 0;
        while (prog) {
                prog->pg_lovers = prog->pg_nvers - 1;
                for (vers = 0; vers < prog->pg_nvers; vers++)
                        if (prog->pg_vers[vers]) {
                                prog->pg_hivers = vers;
                                if (prog->pg_lovers > vers)
                                        prog->pg_lovers = vers;
                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
                        }
                prog = prog->pg_next;
        }
        serv->sv_xdrsize   = xdrsize;
        INIT_LIST_HEAD(&serv->sv_tempsocks);
        INIT_LIST_HEAD(&serv->sv_permsocks);
        init_timer(&serv->sv_temptimer);
        spin_lock_init(&serv->sv_lock);

        serv->sv_nrpools = npools;
        serv->sv_pools =
                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
                        GFP_KERNEL);
        if (!serv->sv_pools) {
                kfree(serv);
                return NULL;
        }

        for (i = 0; i < serv->sv_nrpools; i++) {
                struct svc_pool *pool = &serv->sv_pools[i];

                dprintk("svc: initialising pool %u for %s\n",
                                i, serv->sv_name);

                pool->sp_id = i;
                INIT_LIST_HEAD(&pool->sp_threads);
                INIT_LIST_HEAD(&pool->sp_sockets);
                INIT_LIST_HEAD(&pool->sp_all_threads);
                spin_lock_init(&pool->sp_lock);
        }

        if (svc_uses_rpcbind(serv) && !serv->sv_shutdown)
                serv->sv_shutdown = svc_rpcb_cleanup;

        return serv;
}

struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
           void (*shutdown)(struct svc_serv *serv, struct net *net))
{
        return __svc_create(prog, bufsize, /*npools*/1, shutdown);
}
EXPORT_SYMBOL_GPL(svc_create);

struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
                  void (*shutdown)(struct svc_serv *serv, struct net *net),
                  svc_thread_fn func, struct module *mod)
{
        struct svc_serv *serv;
        unsigned int npools = svc_pool_map_get();

        serv = __svc_create(prog, bufsize, npools, shutdown);

        if (serv != NULL) {
                serv->sv_function = func;
                serv->sv_module = mod;
        }

        return serv;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);
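
/*
 * A minimal sketch of the expected calling sequence for a pooled
 * service, modelled on nfsd; "my_program", "my_shutdown" and
 * "my_thread_fn" are placeholders for the caller's own definitions:
 *
 *      serv = svc_create_pooled(&my_program, bufsize, my_shutdown,
 *                               my_thread_fn, THIS_MODULE);
 *      error = svc_set_num_threads(serv, NULL, nrservs);
 *      ...
 *      svc_destroy(serv);      (drops the reference taken at creation)
 */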

void svc_shutdown_net(struct svc_serv *serv, struct net *net)
{
        svc_close_net(serv, net);

        if (serv->sv_shutdown)
                serv->sv_shutdown(serv, net);
}
EXPORT_SYMBOL_GPL(svc_shutdown_net);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
        dprintk("svc: svc_destroy(%s, %d)\n",
                                serv->sv_program->pg_name,
                                serv->sv_nrthreads);

        if (serv->sv_nrthreads) {
                if (--(serv->sv_nrthreads) != 0) {
                        svc_sock_update_bufs(serv);
                        return;
                }
        } else
                printk(KERN_ERR "svc_destroy: no threads for serv=%p!\n", serv);

        del_timer_sync(&serv->sv_temptimer);

        /*
         * By this point the last user is gone, so all sockets must
         * already have been destroyed.  Verify that this is the case.
         */
        BUG_ON(!list_empty(&serv->sv_permsocks));
        BUG_ON(!list_empty(&serv->sv_tempsocks));

        cache_clean_deferred(serv);

        if (svc_serv_is_pooled(serv))
                svc_pool_map_put();

        kfree(serv->sv_pools);
        kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_argpages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
        unsigned int pages, arghi;

        /* bc_xprt uses fore channel allocated buffers */
        if (svc_is_backchannel(rqstp))
                return 1;

        pages = size / PAGE_SIZE + 1; /* one extra page because we hold both
                                       * the request and the reply; each is
                                       * assumed to fit in one page
                                       */
        arghi = 0;
        WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
        if (pages > RPCSVC_MAXPAGES)
                pages = RPCSVC_MAXPAGES;
        while (pages) {
                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
                if (!p)
                        break;
                rqstp->rq_pages[arghi++] = p;
                pages--;
        }
        return pages == 0;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
        unsigned int i;

        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
                if (rqstp->rq_pages[i])
                        put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
        struct svc_rqst *rqstp;

        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
        if (!rqstp)
                goto out_enomem;

        init_waitqueue_head(&rqstp->rq_wait);

        serv->sv_nrthreads++;
        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads++;
        list_add(&rqstp->rq_all, &pool->sp_all_threads);
        spin_unlock_bh(&pool->sp_lock);
        rqstp->rq_server = serv;
        rqstp->rq_pool = pool;

        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_argp)
                goto out_thread;

        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_resp)
                goto out_thread;

        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
                goto out_thread;

        return rqstp;
out_thread:
        svc_exit_thread(rqstp);
out_enomem:
        return ERR_PTR(-ENOMEM);
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);
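
/*
 * Note that pooled services do not normally call svc_prepare_thread()
 * directly; svc_set_num_threads() below pairs it with
 * kthread_create_on_node() on their behalf.
 */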

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        if (pool != NULL)
                return pool;

        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        unsigned int i;
        struct task_struct *task = NULL;

        if (pool != NULL) {
                spin_lock_bh(&pool->sp_lock);
        } else {
                /* choose a pool in round-robin fashion */
                for (i = 0; i < serv->sv_nrpools; i++) {
                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
                        spin_lock_bh(&pool->sp_lock);
                        if (!list_empty(&pool->sp_all_threads))
                                goto found_pool;
                        spin_unlock_bh(&pool->sp_lock);
                }
                return NULL;
        }

found_pool:
        if (!list_empty(&pool->sp_all_threads)) {
                struct svc_rqst *rqstp;

                /*
                 * Remove from the pool->sp_all_threads list
                 * so we don't try to kill it again.
                 */
                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
                list_del_init(&rqstp->rq_all);
                task = rqstp->rq_task;
        }
        spin_unlock_bh(&pool->sp_lock);

        return task;
}

/*
 * Create or destroy enough new threads to make the number
 * of threads the given number.  If `pool' is non-NULL, applies
 * only to threads in that pool, otherwise round-robins between
 * all pools.  Caller must ensure mutual exclusion between this and
 * server startup or shutdown.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct svc_rqst *rqstp;
        struct task_struct *task;
        struct svc_pool *chosen_pool;
        int error = 0;
        unsigned int state = serv->sv_nrthreads-1;
        int node;

        if (pool == NULL) {
                /* The -1 assumes caller has done a svc_get() */
                nrservs -= (serv->sv_nrthreads-1);
        } else {
                spin_lock_bh(&pool->sp_lock);
                nrservs -= pool->sp_nrthreads;
                spin_unlock_bh(&pool->sp_lock);
        }

        /* create new threads */
        while (nrservs > 0) {
                nrservs--;
                chosen_pool = choose_pool(serv, pool, &state);

                node = svc_pool_map_get_node(chosen_pool->sp_id);
                rqstp = svc_prepare_thread(serv, chosen_pool, node);
                if (IS_ERR(rqstp)) {
                        error = PTR_ERR(rqstp);
                        break;
                }

                __module_get(serv->sv_module);
                task = kthread_create_on_node(serv->sv_function, rqstp,
                                              node, "%s", serv->sv_name);
                if (IS_ERR(task)) {
                        error = PTR_ERR(task);
                        module_put(serv->sv_module);
                        svc_exit_thread(rqstp);
                        break;
                }

                rqstp->rq_task = task;
                if (serv->sv_nrpools > 1)
                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

                svc_sock_update_bufs(serv);
                wake_up_process(task);
        }
        /* destroy old threads */
        while (nrservs < 0 &&
               (task = choose_victim(serv, pool, &state)) != NULL) {
                send_sig(SIGINT, task, 1);
                nrservs++;
        }

        return error;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);
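
/*
 * nfsd, for example, arrives here when the number of server threads is
 * changed by writing to /proc/fs/nfsd/threads.
 */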

/*
 * Called from a server thread as it's exiting. Caller must hold the BKL or
 * the "service mutex", whichever is appropriate for the service.
 */
void
svc_exit_thread(struct svc_rqst *rqstp)
{
        struct svc_serv *serv = rqstp->rq_server;
        struct svc_pool *pool = rqstp->rq_pool;

        svc_release_buffer(rqstp);
        kfree(rqstp->rq_resp);
        kfree(rqstp->rq_argp);
        kfree(rqstp->rq_auth_data);

        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads--;
        list_del(&rqstp->rq_all);
        spin_unlock_bh(&pool->sp_lock);

        kfree(rqstp);

        /* Release the server */
        if (serv)
                svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in sin = {
                .sin_family             = AF_INET,
                .sin_addr.s_addr        = htonl(INADDR_ANY),
                .sin_port               = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin, netid);

        /*
         * User space didn't support rpcbind v4, so retry this
         * registration request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, protocol, port);

        return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in6 sin6 = {
                .sin6_family            = AF_INET6,
                .sin6_addr              = IN6ADDR_ANY_INIT,
                .sin6_port              = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP6;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP6;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin6, netid);

        /*
         * User space didn't support rpcbind version 4, so we won't
         * use a PF_INET6 listener.
         */
        if (error == -EPROTONOSUPPORT)
                error = -EAFNOSUPPORT;

        return error;
}
#endif  /* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
                          const u32 program, const u32 version,
                          const int family,
                          const unsigned short protocol,
                          const unsigned short port)
{
        int error = -EAFNOSUPPORT;

        switch (family) {
        case PF_INET:
                error = __svc_rpcb_register4(net, program, version,
                                                protocol, port);
                break;
#if IS_ENABLED(CONFIG_IPV6)
        case PF_INET6:
                error = __svc_rpcb_register6(net, program, version,
                                                protocol, port);
                break;
#endif
        }

        return error;
}

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
                 const int family, const unsigned short proto,
                 const unsigned short port)
{
        struct svc_program      *progp;
        struct svc_version      *vers;
        unsigned int            i;
        int                     error = 0;

        WARN_ON_ONCE(proto == 0 && port == 0);
        if (proto == 0 && port == 0)
                return -EINVAL;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        vers = progp->pg_vers[i];
                        if (vers == NULL)
                                continue;

                        dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
                                        progp->pg_name,
                                        i,
                                        proto == IPPROTO_UDP ? "udp" : "tcp",
                                        port,
                                        family,
                                        vers->vs_hidden ?
                                        " (but not telling portmap)" : "");

                        if (vers->vs_hidden)
                                continue;

                        error = __svc_register(net, progp->pg_name, progp->pg_prog,
                                                i, family, proto, port);

                        if (vers->vs_rpcb_optnl) {
                                error = 0;
                                continue;
                        }

                        if (error < 0) {
                                printk(KERN_WARNING "svc: failed to register "
                                        "%sv%u RPC service (errno %d).\n",
                                        progp->pg_name, i, -error);
                                break;
                        }
                }
        }

        return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
                             const char *progname)
{
        int error;

        error = rpcb_v4_register(net, program, version, NULL, "");

        /*
         * User space didn't support rpcbind v4, so retry this
         * request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, 0, 0);

        dprintk("svc: %s(%sv%u), error %d\n",
                        __func__, progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
        struct svc_program *progp;
        unsigned long flags;
        unsigned int i;

        clear_thread_flag(TIF_SIGPENDING);

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (progp->pg_vers[i]->vs_hidden)
                                continue;

                        dprintk("svc: attempting to unregister %sv%u\n",
                                progp->pg_name, i);
                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
                }
        }

        spin_lock_irqsave(&current->sighand->siglock, flags);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#ifdef RPC_DEBUG
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;
        char    buf[RPC_MAX_ADDRBUFLEN];

        va_start(args, fmt);

        vaf.fmt = fmt;
        vaf.va = &args;

        dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

        va_end(args);
}
#else
static __printf(2, 3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

/*
 * Common routine for processing the RPC request body.  Returns 1 if the
 * reply should be sent to the client, or 0 if the request should be
 * dropped.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
        struct svc_program      *progp;
        struct svc_version      *versp = NULL;  /* compiler food */
        struct svc_procedure    *procp = NULL;
        struct svc_serv         *serv = rqstp->rq_server;
        kxdrproc_t              xdr;
        __be32                  *statp;
        u32                     prog, vers, proc;
        __be32                  auth_stat, rpc_stat;
        int                     auth_res;
        __be32                  *reply_statp;

        rpc_stat = rpc_success;

        if (argv->iov_len < 6*4)
                goto err_short_len;

        /* Will be turned off only in gss privacy case: */
        rqstp->rq_splice_ok = 1;
        /* Will be turned off only when NFSv4 Sessions are used */
        rqstp->rq_usedeferral = 1;
        rqstp->rq_dropme = false;

        /* Setup reply header */
        rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);

        svc_putu32(resv, rqstp->rq_xid);

        vers = svc_getnl(argv);

        /* First words of reply: */
        svc_putnl(resv, 1);             /* REPLY */

        if (vers != 2)          /* RPC version number */
                goto err_bad_rpc;

        /* Save position in case we later decide to reject: */
        reply_statp = resv->iov_base + resv->iov_len;

        svc_putnl(resv, 0);             /* ACCEPT */

        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
        rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
        rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */

        for (progp = serv->sv_program; progp; progp = progp->pg_next)
                if (prog == progp->pg_prog)
                        break;

        /*
         * Decode auth data, and add verifier to reply buffer.
         * We do this before anything else in order to get a decent
         * auth verifier.
         */
        auth_res = svc_authenticate(rqstp, &auth_stat);
        /* Also give the program a chance to reject this call: */
        if (auth_res == SVC_OK && progp) {
                auth_stat = rpc_autherr_badcred;
                auth_res = progp->pg_authenticate(rqstp);
        }
        switch (auth_res) {
        case SVC_OK:
                break;
        case SVC_GARBAGE:
                goto err_garbage;
        case SVC_SYSERR:
                rpc_stat = rpc_system_err;
                goto err_bad;
        case SVC_DENIED:
                goto err_bad_auth;
        case SVC_CLOSE:
                if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
                        svc_close_xprt(rqstp->rq_xprt);
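                /* fall through: SVC_CLOSE is also dropped */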
        case SVC_DROP:
                goto dropit;
        case SVC_COMPLETE:
                goto sendit;
        }

        if (progp == NULL)
                goto err_bad_prog;

        if (vers >= progp->pg_nvers ||
            !(versp = progp->pg_vers[vers]))
                goto err_bad_vers;

        procp = versp->vs_proc + proc;
        if (proc >= versp->vs_nproc || !procp->pc_func)
                goto err_bad_proc;
        rqstp->rq_procinfo = procp;

        /* Syntactic check complete */
        serv->sv_stats->rpccnt++;

        /* Build the reply header. */
        statp = resv->iov_base + resv->iov_len;
        svc_putnl(resv, RPC_SUCCESS);

        /* Bump per-procedure stats counter */
        procp->pc_count++;

        /* Initialize storage for argp and resp */
        memset(rqstp->rq_argp, 0, procp->pc_argsize);
        memset(rqstp->rq_resp, 0, procp->pc_ressize);

        /* un-reserve some of the out-queue now that we have a
         * better idea of reply size
         */
        if (procp->pc_xdrressize)
                svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

        /* Call the function that processes the request. */
        if (!versp->vs_dispatch) {
                /* Decode arguments */
                xdr = procp->pc_decode;
                if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
                        goto err_garbage;

                *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);

                /* Encode reply */
                if (rqstp->rq_dropme) {
                        if (procp->pc_release)
                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
                        goto dropit;
                }
                if (*statp == rpc_success &&
                    (xdr = procp->pc_encode) &&
                    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
                        dprintk("svc: failed to encode reply\n");
                        /* serv->sv_stats->rpcsystemerr++; */
                        *statp = rpc_system_err;
                }
        } else {
                dprintk("svc: calling dispatcher\n");
                if (!versp->vs_dispatch(rqstp, statp)) {
                        /* Release reply info */
                        if (procp->pc_release)
                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
                        goto dropit;
                }
        }

        /* Check RPC status result */
        if (*statp != rpc_success)
                resv->iov_len = ((void*)statp) - resv->iov_base + 4;

        /* Release reply info */
        if (procp->pc_release)
                procp->pc_release(rqstp, NULL, rqstp->rq_resp);

        if (procp->pc_encode == NULL)
                goto dropit;

 sendit:
        if (svc_authorise(rqstp))
                goto dropit;
        return 1;               /* Caller can now send it */

 dropit:
        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
        dprintk("svc: svc_process dropit\n");
        return 0;

err_short_len:
        svc_printk(rqstp, "short len %Zd, dropping request\n",
                        argv->iov_len);

        goto dropit;                    /* drop request */

err_bad_rpc:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 0);     /* RPC_MISMATCH */
        svc_putnl(resv, 2);     /* Only RPCv2 supported */
        svc_putnl(resv, 2);
        goto sendit;

err_bad_auth:
        dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
        serv->sv_stats->rpcbadauth++;
        /* Restore write pointer to location of accept status: */
        xdr_ressize_check(rqstp, reply_statp);
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 1);     /* AUTH_ERROR */
        svc_putnl(resv, ntohl(auth_stat));      /* status */
        goto sendit;

err_bad_prog:
        dprintk("svc: unknown program %d\n", prog);
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_UNAVAIL);
        goto sendit;

err_bad_vers:
        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
                       vers, prog, progp->pg_name);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_MISMATCH);
        svc_putnl(resv, progp->pg_lovers);
        svc_putnl(resv, progp->pg_hivers);
        goto sendit;

err_bad_proc:
        svc_printk(rqstp, "unknown procedure (%d)\n", proc);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROC_UNAVAIL);
        goto sendit;

err_garbage:
        svc_printk(rqstp, "failed to decode args\n");

        rpc_stat = rpc_garbage_args;
err_bad:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, ntohl(rpc_stat));
        goto sendit;
}

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
        struct kvec             *argv = &rqstp->rq_arg.head[0];
        struct kvec             *resv = &rqstp->rq_res.head[0];
        struct svc_serv         *serv = rqstp->rq_server;
        u32                     dir;

        /*
         * Setup response xdr_buf.
         * Initially it has just one page
         */
        rqstp->rq_next_page = &rqstp->rq_respages[1];
        resv->iov_base = page_address(rqstp->rq_respages[0]);
        resv->iov_len = 0;
        rqstp->rq_res.pages = rqstp->rq_respages + 1;
        rqstp->rq_res.len = 0;
        rqstp->rq_res.page_base = 0;
        rqstp->rq_res.page_len = 0;
        rqstp->rq_res.buflen = PAGE_SIZE;
        rqstp->rq_res.tail[0].iov_base = NULL;
        rqstp->rq_res.tail[0].iov_len = 0;

        rqstp->rq_xid = svc_getu32(argv);

        dir = svc_getnl(argv);
        if (dir != 0) {
                /* direction != CALL */
                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
                serv->sv_stats->rpcbadfmt++;
                svc_drop(rqstp);
                return 0;
        }

        /* Returns 1 for send, 0 for drop */
        if (svc_process_common(rqstp, argv, resv))
                return svc_send(rqstp);
        else {
                svc_drop(rqstp);
                return 0;
        }
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
               struct svc_rqst *rqstp)
{
        struct kvec     *argv = &rqstp->rq_arg.head[0];
        struct kvec     *resv = &rqstp->rq_res.head[0];

        /* Build the svc_rqst used by the common processing routine */
        rqstp->rq_xprt = serv->sv_bc_xprt;
        rqstp->rq_xid = req->rq_xid;
        rqstp->rq_prot = req->rq_xprt->prot;
        rqstp->rq_server = serv;

        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

        /* reset result send buffer "put" position */
        resv->iov_len = 0;

        if (rqstp->rq_prot != IPPROTO_TCP) {
                printk(KERN_ERR "No support for non-TCP transports!\n");
                BUG();
        }

        /*
         * Skip the next two words because they've already been
         * processed in the transport
         */
        svc_getu32(argv);       /* XID */
        svc_getnl(argv);        /* CALLDIR */

        /* Returns 1 for send, 0 for drop */
        if (svc_process_common(rqstp, argv, resv)) {
                memcpy(&req->rq_snd_buf, &rqstp->rq_res,
                                                sizeof(req->rq_snd_buf));
                return bc_send(req);
        } else {
                /* drop request */
                xprt_free_bc_request(req);
                return 0;
        }
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

        if (rqstp->rq_server->sv_max_payload < max)
                max = rqstp->rq_server->sv_max_payload;
        return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);