linux/net/sunrpc/svc.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/svc.c
   3 *
   4 * High-level RPC service routines
   5 *
   6 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
   7 *
   8 * Multiple threads pools and NUMAisation
   9 * Copyright (c) 2006 Silicon Graphics, Inc.
  10 * by Greg Banks <gnb@melbourne.sgi.com>
  11 */
  12
  13#include <linux/linkage.h>
  14#include <linux/sched.h>
  15#include <linux/errno.h>
  16#include <linux/net.h>
  17#include <linux/in.h>
  18#include <linux/mm.h>
  19#include <linux/interrupt.h>
  20#include <linux/module.h>
  21#include <linux/kthread.h>
  22#include <linux/slab.h>
  23
  24#include <linux/sunrpc/types.h>
  25#include <linux/sunrpc/xdr.h>
  26#include <linux/sunrpc/stats.h>
  27#include <linux/sunrpc/svcsock.h>
  28#include <linux/sunrpc/clnt.h>
  29#include <linux/sunrpc/bc_xprt.h>
  30
  31#include <trace/events/sunrpc.h>
  32
  33#define RPCDBG_FACILITY RPCDBG_SVCDSP
  34
  35static void svc_unregister(const struct svc_serv *serv, struct net *net);
  36
  37#define svc_serv_is_pooled(serv)    ((serv)->sv_function)
  38
  39/*
  40 * Mode for mapping cpus to pools.
  41 */
  42enum {
  43        SVC_POOL_AUTO = -1,     /* choose one of the others */
  44        SVC_POOL_GLOBAL,        /* no mapping, just a single global pool
  45                                 * (legacy & UP mode) */
  46        SVC_POOL_PERCPU,        /* one pool per cpu */
  47        SVC_POOL_PERNODE        /* one pool per numa node */
  48};
  49#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL
  50
  51/*
  52 * Structure for mapping cpus to pools and vice versa.
  53 * Setup once during sunrpc initialisation.
  54 */
  55static struct svc_pool_map {
  56        int count;                      /* How many svc_servs use us */
  57        int mode;                       /* Note: int not enum to avoid
  58                                         * warnings about "enumeration value
  59                                         * not handled in switch" */
  60        unsigned int npools;
  61        unsigned int *pool_to;          /* maps pool id to cpu or node */
  62        unsigned int *to_pool;          /* maps cpu or node to pool id */
  63} svc_pool_map = {
  64        .count = 0,
  65        .mode = SVC_POOL_DEFAULT
  66};
  67static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */
  68
  69static int
  70param_set_pool_mode(const char *val, struct kernel_param *kp)
  71{
  72        int *ip = (int *)kp->arg;
  73        struct svc_pool_map *m = &svc_pool_map;
  74        int err;
  75
  76        mutex_lock(&svc_pool_map_mutex);
  77
  78        err = -EBUSY;
  79        if (m->count)
  80                goto out;
  81
  82        err = 0;
  83        if (!strncmp(val, "auto", 4))
  84                *ip = SVC_POOL_AUTO;
  85        else if (!strncmp(val, "global", 6))
  86                *ip = SVC_POOL_GLOBAL;
  87        else if (!strncmp(val, "percpu", 6))
  88                *ip = SVC_POOL_PERCPU;
  89        else if (!strncmp(val, "pernode", 7))
  90                *ip = SVC_POOL_PERNODE;
  91        else
  92                err = -EINVAL;
  93
  94out:
  95        mutex_unlock(&svc_pool_map_mutex);
  96        return err;
  97}
  98
  99static int
 100param_get_pool_mode(char *buf, struct kernel_param *kp)
 101{
 102        int *ip = (int *)kp->arg;
 103
 104        switch (*ip)
 105        {
 106        case SVC_POOL_AUTO:
 107                return strlcpy(buf, "auto", 20);
 108        case SVC_POOL_GLOBAL:
 109                return strlcpy(buf, "global", 20);
 110        case SVC_POOL_PERCPU:
 111                return strlcpy(buf, "percpu", 20);
 112        case SVC_POOL_PERNODE:
 113                return strlcpy(buf, "pernode", 20);
 114        default:
 115                return sprintf(buf, "%d", *ip);
 116        }
 117}
 118
 119module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
 120                 &svc_pool_map.mode, 0644);
 121
 122/*
 123 * Detect best pool mapping mode heuristically,
 124 * according to the machine's topology.
 125 */
 126static int
 127svc_pool_map_choose_mode(void)
 128{
 129        unsigned int node;
 130
 131        if (nr_online_nodes > 1) {
 132                /*
 133                 * Actually have multiple NUMA nodes,
 134                 * so split pools on NUMA node boundaries
 135                 */
 136                return SVC_POOL_PERNODE;
 137        }
 138
 139        node = first_online_node;
 140        if (nr_cpus_node(node) > 2) {
 141                /*
 142                 * Non-trivial SMP, or CONFIG_NUMA on
 143                 * non-NUMA hardware, e.g. with a generic
 144                 * x86_64 kernel on Xeons.  In this case we
 145                 * want to divide the pools on cpu boundaries.
 146                 */
 147                return SVC_POOL_PERCPU;
 148        }
 149
 150        /* default: one global pool */
 151        return SVC_POOL_GLOBAL;
 152}
 153
 154/*
 155 * Allocate the to_pool[] and pool_to[] arrays.
 156 * Returns 0 on success or an errno.
 157 */
 158static int
 159svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
 160{
 161        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 162        if (!m->to_pool)
 163                goto fail;
 164        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 165        if (!m->pool_to)
 166                goto fail_free;
 167
 168        return 0;
 169
 170fail_free:
 171        kfree(m->to_pool);
 172        m->to_pool = NULL;
 173fail:
 174        return -ENOMEM;
 175}
 176
 177/*
 178 * Initialise the pool map for SVC_POOL_PERCPU mode.
 179 * Returns number of pools or <0 on error.
 180 */
 181static int
 182svc_pool_map_init_percpu(struct svc_pool_map *m)
 183{
 184        unsigned int maxpools = nr_cpu_ids;
 185        unsigned int pidx = 0;
 186        unsigned int cpu;
 187        int err;
 188
 189        err = svc_pool_map_alloc_arrays(m, maxpools);
 190        if (err)
 191                return err;
 192
 193        for_each_online_cpu(cpu) {
 194                BUG_ON(pidx >= maxpools);
 195                m->to_pool[cpu] = pidx;
 196                m->pool_to[pidx] = cpu;
 197                pidx++;
 198        }
 199        /* cpus brought online later all get mapped to pool0, sorry */
 200
 201        return pidx;
 202};
 203
 204
 205/*
 206 * Initialise the pool map for SVC_POOL_PERNODE mode.
 207 * Returns number of pools or <0 on error.
 208 */
 209static int
 210svc_pool_map_init_pernode(struct svc_pool_map *m)
 211{
 212        unsigned int maxpools = nr_node_ids;
 213        unsigned int pidx = 0;
 214        unsigned int node;
 215        int err;
 216
 217        err = svc_pool_map_alloc_arrays(m, maxpools);
 218        if (err)
 219                return err;
 220
 221        for_each_node_with_cpus(node) {
 222                /* some architectures (e.g. SN2) have cpuless nodes */
 223                BUG_ON(pidx > maxpools);
 224                m->to_pool[node] = pidx;
 225                m->pool_to[pidx] = node;
 226                pidx++;
 227        }
 228        /* nodes brought online later all get mapped to pool0, sorry */
 229
 230        return pidx;
 231}
 232
 233
 234/*
 235 * Add a reference to the global map of cpus to pools (and
 236 * vice versa).  Initialise the map if we're the first user.
 237 * Returns the number of pools.
 238 */
 239static unsigned int
 240svc_pool_map_get(void)
 241{
 242        struct svc_pool_map *m = &svc_pool_map;
 243        int npools = -1;
 244
 245        mutex_lock(&svc_pool_map_mutex);
 246
 247        if (m->count++) {
 248                mutex_unlock(&svc_pool_map_mutex);
 249                return m->npools;
 250        }
 251
 252        if (m->mode == SVC_POOL_AUTO)
 253                m->mode = svc_pool_map_choose_mode();
 254
 255        switch (m->mode) {
 256        case SVC_POOL_PERCPU:
 257                npools = svc_pool_map_init_percpu(m);
 258                break;
 259        case SVC_POOL_PERNODE:
 260                npools = svc_pool_map_init_pernode(m);
 261                break;
 262        }
 263
 264        if (npools < 0) {
 265                /* default, or memory allocation failure */
 266                npools = 1;
 267                m->mode = SVC_POOL_GLOBAL;
 268        }
 269        m->npools = npools;
 270
 271        mutex_unlock(&svc_pool_map_mutex);
 272        return m->npools;
 273}
 274
 275
 276/*
 277 * Drop a reference to the global map of cpus to pools.
 278 * When the last reference is dropped, the map data is
 279 * freed; this allows the sysadmin to change the pool
 280 * mode using the pool_mode module option without
 281 * rebooting or re-loading sunrpc.ko.
 282 */
 283static void
 284svc_pool_map_put(void)
 285{
 286        struct svc_pool_map *m = &svc_pool_map;
 287
 288        mutex_lock(&svc_pool_map_mutex);
 289
 290        if (!--m->count) {
 291                kfree(m->to_pool);
 292                m->to_pool = NULL;
 293                kfree(m->pool_to);
 294                m->pool_to = NULL;
 295                m->npools = 0;
 296        }
 297
 298        mutex_unlock(&svc_pool_map_mutex);
 299}
 300
 301
 302static int svc_pool_map_get_node(unsigned int pidx)
 303{
 304        const struct svc_pool_map *m = &svc_pool_map;
 305
 306        if (m->count) {
 307                if (m->mode == SVC_POOL_PERCPU)
 308                        return cpu_to_node(m->pool_to[pidx]);
 309                if (m->mode == SVC_POOL_PERNODE)
 310                        return m->pool_to[pidx];
 311        }
 312        return NUMA_NO_NODE;
 313}
 314/*
 315 * Set the given thread's cpus_allowed mask so that it
 316 * will only run on cpus in the given pool.
 317 */
 318static inline void
 319svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
 320{
 321        struct svc_pool_map *m = &svc_pool_map;
 322        unsigned int node = m->pool_to[pidx];
 323
 324        /*
 325         * The caller checks for sv_nrpools > 1, which
 326         * implies that we've been initialized.
 327         */
 328        WARN_ON_ONCE(m->count == 0);
 329        if (m->count == 0)
 330                return;
 331
 332        switch (m->mode) {
 333        case SVC_POOL_PERCPU:
 334        {
 335                set_cpus_allowed_ptr(task, cpumask_of(node));
 336                break;
 337        }
 338        case SVC_POOL_PERNODE:
 339        {
 340                set_cpus_allowed_ptr(task, cpumask_of_node(node));
 341                break;
 342        }
 343        }
 344}
 345
 346/*
 347 * Use the mapping mode to choose a pool for a given CPU.
 348 * Used when enqueueing an incoming RPC.  Always returns
 349 * a non-NULL pool pointer.
 350 */
 351struct svc_pool *
 352svc_pool_for_cpu(struct svc_serv *serv, int cpu)
 353{
 354        struct svc_pool_map *m = &svc_pool_map;
 355        unsigned int pidx = 0;
 356
 357        /*
 358         * An uninitialised map happens in a pure client when
 359         * lockd is brought up, so silently treat it the
 360         * same as SVC_POOL_GLOBAL.
 361         */
 362        if (svc_serv_is_pooled(serv)) {
 363                switch (m->mode) {
 364                case SVC_POOL_PERCPU:
 365                        pidx = m->to_pool[cpu];
 366                        break;
 367                case SVC_POOL_PERNODE:
 368                        pidx = m->to_pool[cpu_to_node(cpu)];
 369                        break;
 370                }
 371        }
 372        return &serv->sv_pools[pidx % serv->sv_nrpools];
 373}
 374
 375int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
 376{
 377        int err;
 378
 379        err = rpcb_create_local(net);
 380        if (err)
 381                return err;
 382
 383        /* Remove any stale portmap registrations */
 384        svc_unregister(serv, net);
 385        return 0;
 386}
 387EXPORT_SYMBOL_GPL(svc_rpcb_setup);
 388
 389void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
 390{
 391        svc_unregister(serv, net);
 392        rpcb_put_local(net);
 393}
 394EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);
 395
 396static int svc_uses_rpcbind(struct svc_serv *serv)
 397{
 398        struct svc_program      *progp;
 399        unsigned int            i;
 400
 401        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 402                for (i = 0; i < progp->pg_nvers; i++) {
 403                        if (progp->pg_vers[i] == NULL)
 404                                continue;
 405                        if (progp->pg_vers[i]->vs_hidden == 0)
 406                                return 1;
 407                }
 408        }
 409
 410        return 0;
 411}
 412
 413int svc_bind(struct svc_serv *serv, struct net *net)
 414{
 415        if (!svc_uses_rpcbind(serv))
 416                return 0;
 417        return svc_rpcb_setup(serv, net);
 418}
 419EXPORT_SYMBOL_GPL(svc_bind);
 420
 421/*
 422 * Create an RPC service
 423 */
 424static struct svc_serv *
 425__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 426             void (*shutdown)(struct svc_serv *serv, struct net *net))
 427{
 428        struct svc_serv *serv;
 429        unsigned int vers;
 430        unsigned int xdrsize;
 431        unsigned int i;
 432
 433        if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
 434                return NULL;
 435        serv->sv_name      = prog->pg_name;
 436        serv->sv_program   = prog;
 437        serv->sv_nrthreads = 1;
 438        serv->sv_stats     = prog->pg_stats;
 439        if (bufsize > RPCSVC_MAXPAYLOAD)
 440                bufsize = RPCSVC_MAXPAYLOAD;
 441        serv->sv_max_payload = bufsize? bufsize : 4096;
 442        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
 443        serv->sv_shutdown  = shutdown;
 444        xdrsize = 0;
 445        while (prog) {
 446                prog->pg_lovers = prog->pg_nvers-1;
 447                for (vers=0; vers<prog->pg_nvers ; vers++)
 448                        if (prog->pg_vers[vers]) {
 449                                prog->pg_hivers = vers;
 450                                if (prog->pg_lovers > vers)
 451                                        prog->pg_lovers = vers;
 452                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
 453                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
 454                        }
 455                prog = prog->pg_next;
 456        }
 457        serv->sv_xdrsize   = xdrsize;
 458        INIT_LIST_HEAD(&serv->sv_tempsocks);
 459        INIT_LIST_HEAD(&serv->sv_permsocks);
 460        init_timer(&serv->sv_temptimer);
 461        spin_lock_init(&serv->sv_lock);
 462
 463        serv->sv_nrpools = npools;
 464        serv->sv_pools =
 465                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
 466                        GFP_KERNEL);
 467        if (!serv->sv_pools) {
 468                kfree(serv);
 469                return NULL;
 470        }
 471
 472        for (i = 0; i < serv->sv_nrpools; i++) {
 473                struct svc_pool *pool = &serv->sv_pools[i];
 474
 475                dprintk("svc: initialising pool %u for %s\n",
 476                                i, serv->sv_name);
 477
 478                pool->sp_id = i;
 479                INIT_LIST_HEAD(&pool->sp_sockets);
 480                INIT_LIST_HEAD(&pool->sp_all_threads);
 481                spin_lock_init(&pool->sp_lock);
 482        }
 483
 484        return serv;
 485}
 486
 487struct svc_serv *
 488svc_create(struct svc_program *prog, unsigned int bufsize,
 489           void (*shutdown)(struct svc_serv *serv, struct net *net))
 490{
 491        return __svc_create(prog, bufsize, /*npools*/1, shutdown);
 492}
 493EXPORT_SYMBOL_GPL(svc_create);
 494
 495struct svc_serv *
 496svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
 497                  void (*shutdown)(struct svc_serv *serv, struct net *net),
 498                  svc_thread_fn func, struct module *mod)
 499{
 500        struct svc_serv *serv;
 501        unsigned int npools = svc_pool_map_get();
 502
 503        serv = __svc_create(prog, bufsize, npools, shutdown);
 504        if (!serv)
 505                goto out_err;
 506
 507        serv->sv_function = func;
 508        serv->sv_module = mod;
 509        return serv;
 510out_err:
 511        svc_pool_map_put();
 512        return NULL;
 513}
 514EXPORT_SYMBOL_GPL(svc_create_pooled);
 515
 516void svc_shutdown_net(struct svc_serv *serv, struct net *net)
 517{
 518        svc_close_net(serv, net);
 519
 520        if (serv->sv_shutdown)
 521                serv->sv_shutdown(serv, net);
 522}
 523EXPORT_SYMBOL_GPL(svc_shutdown_net);
 524
 525/*
 526 * Destroy an RPC service. Should be called with appropriate locking to
 527 * protect the sv_nrthreads, sv_permsocks and sv_tempsocks.
 528 */
 529void
 530svc_destroy(struct svc_serv *serv)
 531{
 532        dprintk("svc: svc_destroy(%s, %d)\n",
 533                                serv->sv_program->pg_name,
 534                                serv->sv_nrthreads);
 535
 536        if (serv->sv_nrthreads) {
 537                if (--(serv->sv_nrthreads) != 0) {
 538                        svc_sock_update_bufs(serv);
 539                        return;
 540                }
 541        } else
 542                printk("svc_destroy: no threads for serv=%p!\n", serv);
 543
 544        del_timer_sync(&serv->sv_temptimer);
 545
 546        /*
 547         * The last user is gone and thus all sockets have to be destroyed to
 548         * the point. Check this.
 549         */
 550        BUG_ON(!list_empty(&serv->sv_permsocks));
 551        BUG_ON(!list_empty(&serv->sv_tempsocks));
 552
 553        cache_clean_deferred(serv);
 554
 555        if (svc_serv_is_pooled(serv))
 556                svc_pool_map_put();
 557
 558        kfree(serv->sv_pools);
 559        kfree(serv);
 560}
 561EXPORT_SYMBOL_GPL(svc_destroy);
 562
 563/*
 564 * Allocate an RPC server's buffer space.
 565 * We allocate pages and place them in rq_argpages.
 566 */
 567static int
 568svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
 569{
 570        unsigned int pages, arghi;
 571
 572        /* bc_xprt uses fore channel allocated buffers */
 573        if (svc_is_backchannel(rqstp))
 574                return 1;
 575
 576        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
 577                                       * We assume one is at most one page
 578                                       */
 579        arghi = 0;
 580        WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
 581        if (pages > RPCSVC_MAXPAGES)
 582                pages = RPCSVC_MAXPAGES;
 583        while (pages) {
 584                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
 585                if (!p)
 586                        break;
 587                rqstp->rq_pages[arghi++] = p;
 588                pages--;
 589        }
 590        return pages == 0;
 591}
 592
 593/*
 594 * Release an RPC server buffer
 595 */
 596static void
 597svc_release_buffer(struct svc_rqst *rqstp)
 598{
 599        unsigned int i;
 600
 601        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
 602                if (rqstp->rq_pages[i])
 603                        put_page(rqstp->rq_pages[i]);
 604}
 605
 606struct svc_rqst *
 607svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
 608{
 609        struct svc_rqst *rqstp;
 610
 611        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
 612        if (!rqstp)
 613                goto out_enomem;
 614
 615        serv->sv_nrthreads++;
 616        __set_bit(RQ_BUSY, &rqstp->rq_flags);
 617        spin_lock_init(&rqstp->rq_lock);
 618        rqstp->rq_server = serv;
 619        rqstp->rq_pool = pool;
 620        spin_lock_bh(&pool->sp_lock);
 621        pool->sp_nrthreads++;
 622        list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
 623        spin_unlock_bh(&pool->sp_lock);
 624
 625        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 626        if (!rqstp->rq_argp)
 627                goto out_thread;
 628
 629        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 630        if (!rqstp->rq_resp)
 631                goto out_thread;
 632
 633        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
 634                goto out_thread;
 635
 636        return rqstp;
 637out_thread:
 638        svc_exit_thread(rqstp);
 639out_enomem:
 640        return ERR_PTR(-ENOMEM);
 641}
 642EXPORT_SYMBOL_GPL(svc_prepare_thread);
 643
 644/*
 645 * Choose a pool in which to create a new thread, for svc_set_num_threads
 646 */
 647static inline struct svc_pool *
 648choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 649{
 650        if (pool != NULL)
 651                return pool;
 652
 653        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
 654}
 655
 656/*
 657 * Choose a thread to kill, for svc_set_num_threads
 658 */
 659static inline struct task_struct *
 660choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 661{
 662        unsigned int i;
 663        struct task_struct *task = NULL;
 664
 665        if (pool != NULL) {
 666                spin_lock_bh(&pool->sp_lock);
 667        } else {
 668                /* choose a pool in round-robin fashion */
 669                for (i = 0; i < serv->sv_nrpools; i++) {
 670                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
 671                        spin_lock_bh(&pool->sp_lock);
 672                        if (!list_empty(&pool->sp_all_threads))
 673                                goto found_pool;
 674                        spin_unlock_bh(&pool->sp_lock);
 675                }
 676                return NULL;
 677        }
 678
 679found_pool:
 680        if (!list_empty(&pool->sp_all_threads)) {
 681                struct svc_rqst *rqstp;
 682
 683                /*
 684                 * Remove from the pool->sp_all_threads list
 685                 * so we don't try to kill it again.
 686                 */
 687                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
 688                set_bit(RQ_VICTIM, &rqstp->rq_flags);
 689                list_del_rcu(&rqstp->rq_all);
 690                task = rqstp->rq_task;
 691        }
 692        spin_unlock_bh(&pool->sp_lock);
 693
 694        return task;
 695}
 696
 697/*
 698 * Create or destroy enough new threads to make the number
 699 * of threads the given number.  If `pool' is non-NULL, applies
 700 * only to threads in that pool, otherwise round-robins between
 701 * all pools.  Caller must ensure that mutual exclusion between this and
 702 * server startup or shutdown.
 703 *
 704 * Destroying threads relies on the service threads filling in
 705 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 706 * has been created using svc_create_pooled().
 707 *
 708 * Based on code that used to be in nfsd_svc() but tweaked
 709 * to be pool-aware.
 710 */
 711int
 712svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 713{
 714        struct svc_rqst *rqstp;
 715        struct task_struct *task;
 716        struct svc_pool *chosen_pool;
 717        int error = 0;
 718        unsigned int state = serv->sv_nrthreads-1;
 719        int node;
 720
 721        if (pool == NULL) {
 722                /* The -1 assumes caller has done a svc_get() */
 723                nrservs -= (serv->sv_nrthreads-1);
 724        } else {
 725                spin_lock_bh(&pool->sp_lock);
 726                nrservs -= pool->sp_nrthreads;
 727                spin_unlock_bh(&pool->sp_lock);
 728        }
 729
 730        /* create new threads */
 731        while (nrservs > 0) {
 732                nrservs--;
 733                chosen_pool = choose_pool(serv, pool, &state);
 734
 735                node = svc_pool_map_get_node(chosen_pool->sp_id);
 736                rqstp = svc_prepare_thread(serv, chosen_pool, node);
 737                if (IS_ERR(rqstp)) {
 738                        error = PTR_ERR(rqstp);
 739                        break;
 740                }
 741
 742                __module_get(serv->sv_module);
 743                task = kthread_create_on_node(serv->sv_function, rqstp,
 744                                              node, "%s", serv->sv_name);
 745                if (IS_ERR(task)) {
 746                        error = PTR_ERR(task);
 747                        module_put(serv->sv_module);
 748                        svc_exit_thread(rqstp);
 749                        break;
 750                }
 751
 752                rqstp->rq_task = task;
 753                if (serv->sv_nrpools > 1)
 754                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);
 755
 756                svc_sock_update_bufs(serv);
 757                wake_up_process(task);
 758        }
 759        /* destroy old threads */
 760        while (nrservs < 0 &&
 761               (task = choose_victim(serv, pool, &state)) != NULL) {
 762                send_sig(SIGINT, task, 1);
 763                nrservs++;
 764        }
 765
 766        return error;
 767}
 768EXPORT_SYMBOL_GPL(svc_set_num_threads);
 769
 770/*
 771 * Called from a server thread as it's exiting. Caller must hold the BKL or
 772 * the "service mutex", whichever is appropriate for the service.
 773 */
 774void
 775svc_exit_thread(struct svc_rqst *rqstp)
 776{
 777        struct svc_serv *serv = rqstp->rq_server;
 778        struct svc_pool *pool = rqstp->rq_pool;
 779
 780        svc_release_buffer(rqstp);
 781        kfree(rqstp->rq_resp);
 782        kfree(rqstp->rq_argp);
 783        kfree(rqstp->rq_auth_data);
 784
 785        spin_lock_bh(&pool->sp_lock);
 786        pool->sp_nrthreads--;
 787        if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
 788                list_del_rcu(&rqstp->rq_all);
 789        spin_unlock_bh(&pool->sp_lock);
 790
 791        kfree_rcu(rqstp, rq_rcu_head);
 792
 793        /* Release the server */
 794        if (serv)
 795                svc_destroy(serv);
 796}
 797EXPORT_SYMBOL_GPL(svc_exit_thread);
 798
 799/*
 800 * Register an "inet" protocol family netid with the local
 801 * rpcbind daemon via an rpcbind v4 SET request.
 802 *
 803 * No netconfig infrastructure is available in the kernel, so
 804 * we map IP_ protocol numbers to netids by hand.
 805 *
 806 * Returns zero on success; a negative errno value is returned
 807 * if any error occurs.
 808 */
 809static int __svc_rpcb_register4(struct net *net, const u32 program,
 810                                const u32 version,
 811                                const unsigned short protocol,
 812                                const unsigned short port)
 813{
 814        const struct sockaddr_in sin = {
 815                .sin_family             = AF_INET,
 816                .sin_addr.s_addr        = htonl(INADDR_ANY),
 817                .sin_port               = htons(port),
 818        };
 819        const char *netid;
 820        int error;
 821
 822        switch (protocol) {
 823        case IPPROTO_UDP:
 824                netid = RPCBIND_NETID_UDP;
 825                break;
 826        case IPPROTO_TCP:
 827                netid = RPCBIND_NETID_TCP;
 828                break;
 829        default:
 830                return -ENOPROTOOPT;
 831        }
 832
 833        error = rpcb_v4_register(net, program, version,
 834                                        (const struct sockaddr *)&sin, netid);
 835
 836        /*
 837         * User space didn't support rpcbind v4, so retry this
 838         * registration request with the legacy rpcbind v2 protocol.
 839         */
 840        if (error == -EPROTONOSUPPORT)
 841                error = rpcb_register(net, program, version, protocol, port);
 842
 843        return error;
 844}
 845
 846#if IS_ENABLED(CONFIG_IPV6)
 847/*
 848 * Register an "inet6" protocol family netid with the local
 849 * rpcbind daemon via an rpcbind v4 SET request.
 850 *
 851 * No netconfig infrastructure is available in the kernel, so
 852 * we map IP_ protocol numbers to netids by hand.
 853 *
 854 * Returns zero on success; a negative errno value is returned
 855 * if any error occurs.
 856 */
 857static int __svc_rpcb_register6(struct net *net, const u32 program,
 858                                const u32 version,
 859                                const unsigned short protocol,
 860                                const unsigned short port)
 861{
 862        const struct sockaddr_in6 sin6 = {
 863                .sin6_family            = AF_INET6,
 864                .sin6_addr              = IN6ADDR_ANY_INIT,
 865                .sin6_port              = htons(port),
 866        };
 867        const char *netid;
 868        int error;
 869
 870        switch (protocol) {
 871        case IPPROTO_UDP:
 872                netid = RPCBIND_NETID_UDP6;
 873                break;
 874        case IPPROTO_TCP:
 875                netid = RPCBIND_NETID_TCP6;
 876                break;
 877        default:
 878                return -ENOPROTOOPT;
 879        }
 880
 881        error = rpcb_v4_register(net, program, version,
 882                                        (const struct sockaddr *)&sin6, netid);
 883
 884        /*
 885         * User space didn't support rpcbind version 4, so we won't
 886         * use a PF_INET6 listener.
 887         */
 888        if (error == -EPROTONOSUPPORT)
 889                error = -EAFNOSUPPORT;
 890
 891        return error;
 892}
 893#endif  /* IS_ENABLED(CONFIG_IPV6) */
 894
 895/*
 896 * Register a kernel RPC service via rpcbind version 4.
 897 *
 898 * Returns zero on success; a negative errno value is returned
 899 * if any error occurs.
 900 */
 901static int __svc_register(struct net *net, const char *progname,
 902                          const u32 program, const u32 version,
 903                          const int family,
 904                          const unsigned short protocol,
 905                          const unsigned short port)
 906{
 907        int error = -EAFNOSUPPORT;
 908
 909        switch (family) {
 910        case PF_INET:
 911                error = __svc_rpcb_register4(net, program, version,
 912                                                protocol, port);
 913                break;
 914#if IS_ENABLED(CONFIG_IPV6)
 915        case PF_INET6:
 916                error = __svc_rpcb_register6(net, program, version,
 917                                                protocol, port);
 918#endif
 919        }
 920
 921        return error;
 922}
 923
 924/**
 925 * svc_register - register an RPC service with the local portmapper
 926 * @serv: svc_serv struct for the service to register
 927 * @net: net namespace for the service to register
 928 * @family: protocol family of service's listener socket
 929 * @proto: transport protocol number to advertise
 930 * @port: port to advertise
 931 *
 932 * Service is registered for any address in the passed-in protocol family
 933 */
 934int svc_register(const struct svc_serv *serv, struct net *net,
 935                 const int family, const unsigned short proto,
 936                 const unsigned short port)
 937{
 938        struct svc_program      *progp;
 939        struct svc_version      *vers;
 940        unsigned int            i;
 941        int                     error = 0;
 942
 943        WARN_ON_ONCE(proto == 0 && port == 0);
 944        if (proto == 0 && port == 0)
 945                return -EINVAL;
 946
 947        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 948                for (i = 0; i < progp->pg_nvers; i++) {
 949                        vers = progp->pg_vers[i];
 950                        if (vers == NULL)
 951                                continue;
 952
 953                        dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
 954                                        progp->pg_name,
 955                                        i,
 956                                        proto == IPPROTO_UDP?  "udp" : "tcp",
 957                                        port,
 958                                        family,
 959                                        vers->vs_hidden ?
 960                                        " (but not telling portmap)" : "");
 961
 962                        if (vers->vs_hidden)
 963                                continue;
 964
 965                        error = __svc_register(net, progp->pg_name, progp->pg_prog,
 966                                                i, family, proto, port);
 967
 968                        if (vers->vs_rpcb_optnl) {
 969                                error = 0;
 970                                continue;
 971                        }
 972
 973                        if (error < 0) {
 974                                printk(KERN_WARNING "svc: failed to register "
 975                                        "%sv%u RPC service (errno %d).\n",
 976                                        progp->pg_name, i, -error);
 977                                break;
 978                        }
 979                }
 980        }
 981
 982        return error;
 983}
 984
 985/*
 986 * If user space is running rpcbind, it should take the v4 UNSET
 987 * and clear everything for this [program, version].  If user space
 988 * is running portmap, it will reject the v4 UNSET, but won't have
 989 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 990 * in this case to clear all existing entries for [program, version].
 991 */
 992static void __svc_unregister(struct net *net, const u32 program, const u32 version,
 993                             const char *progname)
 994{
 995        int error;
 996
 997        error = rpcb_v4_register(net, program, version, NULL, "");
 998
 999        /*
1000         * User space didn't support rpcbind v4, so retry this
1001         * request with the legacy rpcbind v2 protocol.
1002         */
1003        if (error == -EPROTONOSUPPORT)
1004                error = rpcb_register(net, program, version, 0, 0);
1005
1006        dprintk("svc: %s(%sv%u), error %d\n",
1007                        __func__, progname, version, error);
1008}
1009
1010/*
1011 * All netids, bind addresses and ports registered for [program, version]
1012 * are removed from the local rpcbind database (if the service is not
1013 * hidden) to make way for a new instance of the service.
1014 *
1015 * The result of unregistration is reported via dprintk for those who want
1016 * verification of the result, but is otherwise not important.
1017 */
1018static void svc_unregister(const struct svc_serv *serv, struct net *net)
1019{
1020        struct svc_program *progp;
1021        unsigned long flags;
1022        unsigned int i;
1023
1024        clear_thread_flag(TIF_SIGPENDING);
1025
1026        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
1027                for (i = 0; i < progp->pg_nvers; i++) {
1028                        if (progp->pg_vers[i] == NULL)
1029                                continue;
1030                        if (progp->pg_vers[i]->vs_hidden)
1031                                continue;
1032
1033                        dprintk("svc: attempting to unregister %sv%u\n",
1034                                progp->pg_name, i);
1035                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
1036                }
1037        }
1038
1039        spin_lock_irqsave(&current->sighand->siglock, flags);
1040        recalc_sigpending();
1041        spin_unlock_irqrestore(&current->sighand->siglock, flags);
1042}
1043
1044/*
1045 * dprintk the given error with the address of the client that caused it.
1046 */
1047#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
1048static __printf(2, 3)
1049void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
1050{
1051        struct va_format vaf;
1052        va_list args;
1053        char    buf[RPC_MAX_ADDRBUFLEN];
1054
1055        va_start(args, fmt);
1056
1057        vaf.fmt = fmt;
1058        vaf.va = &args;
1059
1060        dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);
1061
1062        va_end(args);
1063}
1064#else
1065static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
1066#endif
1067
1068/*
1069 * Common routine for processing the RPC request.
1070 */
1071static int
1072svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1073{
1074        struct svc_program      *progp;
1075        struct svc_version      *versp = NULL;  /* compiler food */
1076        struct svc_procedure    *procp = NULL;
1077        struct svc_serv         *serv = rqstp->rq_server;
1078        kxdrproc_t              xdr;
1079        __be32                  *statp;
1080        u32                     prog, vers, proc;
1081        __be32                  auth_stat, rpc_stat;
1082        int                     auth_res;
1083        __be32                  *reply_statp;
1084
1085        rpc_stat = rpc_success;
1086
1087        if (argv->iov_len < 6*4)
1088                goto err_short_len;
1089
1090        /* Will be turned off only in gss privacy case: */
1091        set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
1092        /* Will be turned off only when NFSv4 Sessions are used */
1093        set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
1094        clear_bit(RQ_DROPME, &rqstp->rq_flags);
1095
1096        /* Setup reply header */
1097        rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
1098
1099        svc_putu32(resv, rqstp->rq_xid);
1100
1101        vers = svc_getnl(argv);
1102
1103        /* First words of reply: */
1104        svc_putnl(resv, 1);             /* REPLY */
1105
1106        if (vers != 2)          /* RPC version number */
1107                goto err_bad_rpc;
1108
1109        /* Save position in case we later decide to reject: */
1110        reply_statp = resv->iov_base + resv->iov_len;
1111
1112        svc_putnl(resv, 0);             /* ACCEPT */
1113
1114        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
1115        rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
1116        rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */
1117
1118        for (progp = serv->sv_program; progp; progp = progp->pg_next)
1119                if (prog == progp->pg_prog)
1120                        break;
1121
1122        /*
1123         * Decode auth data, and add verifier to reply buffer.
1124         * We do this before anything else in order to get a decent
1125         * auth verifier.
1126         */
1127        auth_res = svc_authenticate(rqstp, &auth_stat);
1128        /* Also give the program a chance to reject this call: */
1129        if (auth_res == SVC_OK && progp) {
1130                auth_stat = rpc_autherr_badcred;
1131                auth_res = progp->pg_authenticate(rqstp);
1132        }
1133        switch (auth_res) {
1134        case SVC_OK:
1135                break;
1136        case SVC_GARBAGE:
1137                goto err_garbage;
1138        case SVC_SYSERR:
1139                rpc_stat = rpc_system_err;
1140                goto err_bad;
1141        case SVC_DENIED:
1142                goto err_bad_auth;
1143        case SVC_CLOSE:
1144                if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
1145                        svc_close_xprt(rqstp->rq_xprt);
1146        case SVC_DROP:
1147                goto dropit;
1148        case SVC_COMPLETE:
1149                goto sendit;
1150        }
1151
1152        if (progp == NULL)
1153                goto err_bad_prog;
1154
1155        if (vers >= progp->pg_nvers ||
1156          !(versp = progp->pg_vers[vers]))
1157                goto err_bad_vers;
1158
1159        procp = versp->vs_proc + proc;
1160        if (proc >= versp->vs_nproc || !procp->pc_func)
1161                goto err_bad_proc;
1162        rqstp->rq_procinfo = procp;
1163
1164        /* Syntactic check complete */
1165        serv->sv_stats->rpccnt++;
1166
1167        /* Build the reply header. */
1168        statp = resv->iov_base +resv->iov_len;
1169        svc_putnl(resv, RPC_SUCCESS);
1170
1171        /* Bump per-procedure stats counter */
1172        procp->pc_count++;
1173
1174        /* Initialize storage for argp and resp */
1175        memset(rqstp->rq_argp, 0, procp->pc_argsize);
1176        memset(rqstp->rq_resp, 0, procp->pc_ressize);
1177
1178        /* un-reserve some of the out-queue now that we have a
1179         * better idea of reply size
1180         */
1181        if (procp->pc_xdrressize)
1182                svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);
1183
1184        /* Call the function that processes the request. */
1185        if (!versp->vs_dispatch) {
1186                /* Decode arguments */
1187                xdr = procp->pc_decode;
1188                if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
1189                        goto err_garbage;
1190
1191                *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
1192
1193                /* Encode reply */
1194                if (test_bit(RQ_DROPME, &rqstp->rq_flags)) {
1195                        if (procp->pc_release)
1196                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1197                        goto dropit;
1198                }
1199                if (*statp == rpc_success &&
1200                    (xdr = procp->pc_encode) &&
1201                    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
1202                        dprintk("svc: failed to encode reply\n");
1203                        /* serv->sv_stats->rpcsystemerr++; */
1204                        *statp = rpc_system_err;
1205                }
1206        } else {
1207                dprintk("svc: calling dispatcher\n");
1208                if (!versp->vs_dispatch(rqstp, statp)) {
1209                        /* Release reply info */
1210                        if (procp->pc_release)
1211                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1212                        goto dropit;
1213                }
1214        }
1215
1216        /* Check RPC status result */
1217        if (*statp != rpc_success)
1218                resv->iov_len = ((void*)statp)  - resv->iov_base + 4;
1219
1220        /* Release reply info */
1221        if (procp->pc_release)
1222                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1223
1224        if (procp->pc_encode == NULL)
1225                goto dropit;
1226
1227 sendit:
1228        if (svc_authorise(rqstp))
1229                goto dropit;
1230        return 1;               /* Caller can now send it */
1231
1232 dropit:
1233        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
1234        dprintk("svc: svc_process dropit\n");
1235        return 0;
1236
1237err_short_len:
1238        svc_printk(rqstp, "short len %Zd, dropping request\n",
1239                        argv->iov_len);
1240
1241        goto dropit;                    /* drop request */
1242
1243err_bad_rpc:
1244        serv->sv_stats->rpcbadfmt++;
1245        svc_putnl(resv, 1);     /* REJECT */
1246        svc_putnl(resv, 0);     /* RPC_MISMATCH */
1247        svc_putnl(resv, 2);     /* Only RPCv2 supported */
1248        svc_putnl(resv, 2);
1249        goto sendit;
1250
1251err_bad_auth:
1252        dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
1253        serv->sv_stats->rpcbadauth++;
1254        /* Restore write pointer to location of accept status: */
1255        xdr_ressize_check(rqstp, reply_statp);
1256        svc_putnl(resv, 1);     /* REJECT */
1257        svc_putnl(resv, 1);     /* AUTH_ERROR */
1258        svc_putnl(resv, ntohl(auth_stat));      /* status */
1259        goto sendit;
1260
1261err_bad_prog:
1262        dprintk("svc: unknown program %d\n", prog);
1263        serv->sv_stats->rpcbadfmt++;
1264        svc_putnl(resv, RPC_PROG_UNAVAIL);
1265        goto sendit;
1266
1267err_bad_vers:
1268        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
1269                       vers, prog, progp->pg_name);
1270
1271        serv->sv_stats->rpcbadfmt++;
1272        svc_putnl(resv, RPC_PROG_MISMATCH);
1273        svc_putnl(resv, progp->pg_lovers);
1274        svc_putnl(resv, progp->pg_hivers);
1275        goto sendit;
1276
1277err_bad_proc:
1278        svc_printk(rqstp, "unknown procedure (%d)\n", proc);
1279
1280        serv->sv_stats->rpcbadfmt++;
1281        svc_putnl(resv, RPC_PROC_UNAVAIL);
1282        goto sendit;
1283
1284err_garbage:
1285        svc_printk(rqstp, "failed to decode args\n");
1286
1287        rpc_stat = rpc_garbage_args;
1288err_bad:
1289        serv->sv_stats->rpcbadfmt++;
1290        svc_putnl(resv, ntohl(rpc_stat));
1291        goto sendit;
1292}
1293EXPORT_SYMBOL_GPL(svc_process);
1294
1295/*
1296 * Process the RPC request.
1297 */
1298int
1299svc_process(struct svc_rqst *rqstp)
1300{
1301        struct kvec             *argv = &rqstp->rq_arg.head[0];
1302        struct kvec             *resv = &rqstp->rq_res.head[0];
1303        struct svc_serv         *serv = rqstp->rq_server;
1304        u32                     dir;
1305
1306        /*
1307         * Setup response xdr_buf.
1308         * Initially it has just one page
1309         */
1310        rqstp->rq_next_page = &rqstp->rq_respages[1];
1311        resv->iov_base = page_address(rqstp->rq_respages[0]);
1312        resv->iov_len = 0;
1313        rqstp->rq_res.pages = rqstp->rq_respages + 1;
1314        rqstp->rq_res.len = 0;
1315        rqstp->rq_res.page_base = 0;
1316        rqstp->rq_res.page_len = 0;
1317        rqstp->rq_res.buflen = PAGE_SIZE;
1318        rqstp->rq_res.tail[0].iov_base = NULL;
1319        rqstp->rq_res.tail[0].iov_len = 0;
1320
1321        dir  = svc_getnl(argv);
1322        if (dir != 0) {
1323                /* direction != CALL */
1324                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
1325                serv->sv_stats->rpcbadfmt++;
1326                goto out_drop;
1327        }
1328
1329        /* Returns 1 for send, 0 for drop */
1330        if (likely(svc_process_common(rqstp, argv, resv))) {
1331                int ret = svc_send(rqstp);
1332
1333                trace_svc_process(rqstp, ret);
1334                return ret;
1335        }
1336out_drop:
1337        trace_svc_process(rqstp, 0);
1338        svc_drop(rqstp);
1339        return 0;
1340}
1341
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request, i.e. a call that arrived over an
 * existing outbound connection.
 *
 * @serv:  service handling the backchannel
 * @req:   request as received; supplies the XID, peer address and the
 *         receive/send buffers
 * @rqstp: server-side request structure filled in from @req
 *
 * Returns the result of bc_send() when a reply is sent, or 0 after
 * freeing @req when the request is dropped.
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
	       struct svc_rqst *rqstp)
{
	struct kvec	*arg_head = &rqstp->rq_arg.head[0];
	struct kvec	*res_head = &rqstp->rq_res.head[0];

	/* Fill in the svc_rqst consumed by the common processing routine */
	rqstp->rq_xprt = serv->sv_bc_xprt;
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_server = serv;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/* Rewind the "put" position of the result send buffer */
	res_head->iov_len = 0;

	if (rqstp->rq_prot != IPPROTO_TCP) {
		printk(KERN_ERR "No support for Non-TCP transports!\n");
		BUG();
	}

	/*
	 * The first two words were already processed in the transport,
	 * so step over them here.
	 */
	svc_getu32(arg_head);	/* XID */
	svc_getnl(arg_head);	/* CALLDIR */

	/* svc_process_common() returns 1 for send, 0 for drop */
	if (!svc_process_common(rqstp, arg_head, res_head)) {
		/* drop request */
		xprt_free_bc_request(req);
		return 0;
	}

	/* Copy the finished reply back into the request's send buffer. */
	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
	return bc_send(req);
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1393
1394/*
1395 * Return (transport-specific) limit on the rpc payload.
1396 */
1397u32 svc_max_payload(const struct svc_rqst *rqstp)
1398{
1399        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
1400
1401        if (rqstp->rq_server->sv_max_payload < max)
1402                max = rqstp->rq_server->sv_max_payload;
1403        return max;
1404}
1405EXPORT_SYMBOL_GPL(svc_max_payload);
1406