linux/net/sunrpc/svc.c
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#define RPCDBG_FACILITY RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define svc_serv_is_pooled(serv)    ((serv)->sv_ops->svo_function)

#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
struct svc_pool_map svc_pool_map = {
        .mode = SVC_POOL_DEFAULT
};
EXPORT_SYMBOL_GPL(svc_pool_map);

static DEFINE_MUTEX(svc_pool_map_mutex); /* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;
        struct svc_pool_map *m = &svc_pool_map;
        int err;

        mutex_lock(&svc_pool_map_mutex);

        err = -EBUSY;
        if (m->count)
                goto out;

        err = 0;
        if (!strncmp(val, "auto", 4))
                *ip = SVC_POOL_AUTO;
        else if (!strncmp(val, "global", 6))
                *ip = SVC_POOL_GLOBAL;
        else if (!strncmp(val, "percpu", 6))
                *ip = SVC_POOL_PERCPU;
        else if (!strncmp(val, "pernode", 7))
                *ip = SVC_POOL_PERNODE;
        else
                err = -EINVAL;

out:
        mutex_unlock(&svc_pool_map_mutex);
        return err;
}

static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
        int *ip = (int *)kp->arg;

        switch (*ip) {
        case SVC_POOL_AUTO:
                return strlcpy(buf, "auto", 20);
        case SVC_POOL_GLOBAL:
                return strlcpy(buf, "global", 20);
        case SVC_POOL_PERCPU:
                return strlcpy(buf, "percpu", 20);
        case SVC_POOL_PERNODE:
                return strlcpy(buf, "pernode", 20);
        default:
                return sprintf(buf, "%d", *ip);
        }
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
                  &svc_pool_map.mode, 0644);
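
/*
 * Example (illustrative, not part of this file): because the setter
 * above fails with -EBUSY once svc_pool_map.count is non-zero, the
 * pool mode can only be changed while no pooled service is running,
 * e.g. before starting nfsd:
 *
 *	# modprobe sunrpc pool_mode=percpu
 * or
 *	# echo pernode > /sys/module/sunrpc/parameters/pool_mode
 */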

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
        unsigned int node;

        if (nr_online_nodes > 1) {
                /*
                 * Actually have multiple NUMA nodes,
                 * so split pools on NUMA node boundaries
                 */
                return SVC_POOL_PERNODE;
        }

        node = first_online_node;
        if (nr_cpus_node(node) > 2) {
                /*
                 * Non-trivial SMP, or CONFIG_NUMA on
                 * non-NUMA hardware, e.g. with a generic
                 * x86_64 kernel on Xeons.  In this case we
                 * want to divide the pools on cpu boundaries.
                 */
                return SVC_POOL_PERCPU;
        }

        /* default: one global pool */
        return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->to_pool)
                goto fail;
        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
        if (!m->pool_to)
                goto fail_free;

        return 0;

fail_free:
        kfree(m->to_pool);
        m->to_pool = NULL;
fail:
        return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_cpu_ids;
        unsigned int pidx = 0;
        unsigned int cpu;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_online_cpu(cpu) {
                BUG_ON(pidx >= maxpools);
                m->to_pool[cpu] = pidx;
                m->pool_to[pidx] = cpu;
                pidx++;
        }
        /* cpus brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
        unsigned int maxpools = nr_node_ids;
        unsigned int pidx = 0;
        unsigned int node;
        int err;

        err = svc_pool_map_alloc_arrays(m, maxpools);
        if (err)
                return err;

        for_each_node_with_cpus(node) {
                /* some architectures (e.g. SN2) have cpuless nodes */
                BUG_ON(pidx >= maxpools);
                m->to_pool[node] = pidx;
                m->pool_to[pidx] = node;
                pidx++;
        }
        /* nodes brought online later all get mapped to pool0, sorry */

        return pidx;
}

/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
unsigned int
svc_pool_map_get(void)
{
        struct svc_pool_map *m = &svc_pool_map;
        int npools = -1;

        mutex_lock(&svc_pool_map_mutex);

        if (m->count++) {
                mutex_unlock(&svc_pool_map_mutex);
                return m->npools;
        }

        if (m->mode == SVC_POOL_AUTO)
                m->mode = svc_pool_map_choose_mode();

        switch (m->mode) {
        case SVC_POOL_PERCPU:
                npools = svc_pool_map_init_percpu(m);
                break;
        case SVC_POOL_PERNODE:
                npools = svc_pool_map_init_pernode(m);
                break;
        }

        if (npools < 0) {
                /* default, or memory allocation failure */
                npools = 1;
                m->mode = SVC_POOL_GLOBAL;
        }
        m->npools = npools;

        mutex_unlock(&svc_pool_map_mutex);
        return m->npools;
}
EXPORT_SYMBOL_GPL(svc_pool_map_get);

/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
void
svc_pool_map_put(void)
{
        struct svc_pool_map *m = &svc_pool_map;

        mutex_lock(&svc_pool_map_mutex);

        if (!--m->count) {
                kfree(m->to_pool);
                m->to_pool = NULL;
                kfree(m->pool_to);
                m->pool_to = NULL;
                m->npools = 0;
        }

        mutex_unlock(&svc_pool_map_mutex);
}
EXPORT_SYMBOL_GPL(svc_pool_map_put);
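
/*
 * Illustrative pairing (a sketch, not a caller in this file): every
 * successful svc_pool_map_get() must eventually be balanced by a
 * svc_pool_map_put(), typically bracketing the life of a pooled serv:
 *
 *	unsigned int npools = svc_pool_map_get();
 *	struct svc_serv *serv = __svc_create(prog, bufsize, npools, ops);
 *
 *	if (!serv)
 *		svc_pool_map_put();
 *
 * This mirrors svc_create_pooled() below and the put in svc_destroy().
 */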

static int svc_pool_map_get_node(unsigned int pidx)
{
        const struct svc_pool_map *m = &svc_pool_map;

        if (m->count) {
                if (m->mode == SVC_POOL_PERCPU)
                        return cpu_to_node(m->pool_to[pidx]);
                if (m->mode == SVC_POOL_PERNODE)
                        return m->pool_to[pidx];
        }
        return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int node = m->pool_to[pidx];

        /*
         * The caller checks for sv_nrpools > 1, which
         * implies that we've been initialized.
         */
        WARN_ON_ONCE(m->count == 0);
        if (m->count == 0)
                return;

        switch (m->mode) {
        case SVC_POOL_PERCPU:
        {
                set_cpus_allowed_ptr(task, cpumask_of(node));
                break;
        }
        case SVC_POOL_PERNODE:
        {
                set_cpus_allowed_ptr(task, cpumask_of_node(node));
                break;
        }
        }
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
        struct svc_pool_map *m = &svc_pool_map;
        unsigned int pidx = 0;

        /*
         * An uninitialised map happens in a pure client when
         * lockd is brought up, so silently treat it the
         * same as SVC_POOL_GLOBAL.
         */
        if (svc_serv_is_pooled(serv)) {
                switch (m->mode) {
                case SVC_POOL_PERCPU:
                        pidx = m->to_pool[cpu];
                        break;
                case SVC_POOL_PERNODE:
                        pidx = m->to_pool[cpu_to_node(cpu)];
                        break;
                }
        }
        return &serv->sv_pools[pidx % serv->sv_nrpools];
}
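
/*
 * For example (illustrative): in SVC_POOL_PERNODE mode on a two-node
 * machine, a request arriving on a CPU in node 1 lands on
 * serv->sv_pools[m->to_pool[1]], while an unpooled serv (or any mode
 * other than percpu/pernode) always yields &serv->sv_pools[0].
 */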

int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
        int err;

        err = rpcb_create_local(net);
        if (err)
                return err;

        /* Remove any stale portmap registrations */
        svc_unregister(serv, net);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
        svc_unregister(serv, net);
        rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

static int svc_uses_rpcbind(struct svc_serv *serv)
{
        struct svc_program      *progp;
        unsigned int            i;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (!progp->pg_vers[i]->vs_hidden)
                                return 1;
                }
        }

        return 0;
}

int svc_bind(struct svc_serv *serv, struct net *net)
{
        if (!svc_uses_rpcbind(serv))
                return 0;
        return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
        INIT_LIST_HEAD(&serv->sv_cb_list);
        spin_lock_init(&serv->sv_cb_lock);
        init_waitqueue_head(&serv->sv_cb_waitq);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
             const struct svc_serv_ops *ops)
{
        struct svc_serv *serv;
        unsigned int vers;
        unsigned int xdrsize;
        unsigned int i;

        serv = kzalloc(sizeof(*serv), GFP_KERNEL);
        if (!serv)
                return NULL;
        serv->sv_name      = prog->pg_name;
        serv->sv_program   = prog;
        serv->sv_nrthreads = 1;
        serv->sv_stats     = prog->pg_stats;
        if (bufsize > RPCSVC_MAXPAYLOAD)
                bufsize = RPCSVC_MAXPAYLOAD;
        serv->sv_max_payload = bufsize ? bufsize : 4096;
        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
        serv->sv_ops = ops;
        xdrsize = 0;
        while (prog) {
                prog->pg_lovers = prog->pg_nvers - 1;
                for (vers = 0; vers < prog->pg_nvers; vers++)
                        if (prog->pg_vers[vers]) {
                                prog->pg_hivers = vers;
                                if (prog->pg_lovers > vers)
                                        prog->pg_lovers = vers;
                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
                        }
                prog = prog->pg_next;
        }
        serv->sv_xdrsize   = xdrsize;
        INIT_LIST_HEAD(&serv->sv_tempsocks);
        INIT_LIST_HEAD(&serv->sv_permsocks);
        timer_setup(&serv->sv_temptimer, NULL, 0);
        spin_lock_init(&serv->sv_lock);

        __svc_init_bc(serv);

        serv->sv_nrpools = npools;
        serv->sv_pools =
                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
                        GFP_KERNEL);
        if (!serv->sv_pools) {
                kfree(serv);
                return NULL;
        }

        for (i = 0; i < serv->sv_nrpools; i++) {
                struct svc_pool *pool = &serv->sv_pools[i];

                dprintk("svc: initialising pool %u for %s\n",
                                i, serv->sv_name);

                pool->sp_id = i;
                INIT_LIST_HEAD(&pool->sp_sockets);
                INIT_LIST_HEAD(&pool->sp_all_threads);
                spin_lock_init(&pool->sp_lock);
        }

        return serv;
}

struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
           const struct svc_serv_ops *ops)
{
        return __svc_create(prog, bufsize, /*npools*/1, ops);
}
EXPORT_SYMBOL_GPL(svc_create);

struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
                  const struct svc_serv_ops *ops)
{
        struct svc_serv *serv;
        unsigned int npools = svc_pool_map_get();

        serv = __svc_create(prog, bufsize, npools, ops);
        if (!serv)
                goto out_err;
        return serv;
out_err:
        svc_pool_map_put();
        return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);
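
/*
 * A minimal sketch of a pooled-service user (hypothetical ops and
 * thread function; the real callers are nfsd and lockd):
 *
 *	static const struct svc_serv_ops my_ops = {
 *		.svo_function	= my_thread_fn,	// marks the serv as pooled
 *		.svo_module	= THIS_MODULE,
 *	};
 *
 *	serv = svc_create_pooled(&my_program, 64 * 1024, &my_ops);
 *	if (serv)
 *		err = svc_set_num_threads(serv, NULL, 8);
 *
 * Note svc_serv_is_pooled() above keys purely off ->svo_function.
 */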

void svc_shutdown_net(struct svc_serv *serv, struct net *net)
{
        svc_close_net(serv, net);

        if (serv->sv_ops->svo_shutdown)
                serv->sv_ops->svo_shutdown(serv, net);
}
EXPORT_SYMBOL_GPL(svc_shutdown_net);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
        dprintk("svc: svc_destroy(%s, %d)\n",
                                serv->sv_program->pg_name,
                                serv->sv_nrthreads);

        if (serv->sv_nrthreads) {
                if (--(serv->sv_nrthreads) != 0) {
                        svc_sock_update_bufs(serv);
                        return;
                }
        } else
                printk("svc_destroy: no threads for serv=%p!\n", serv);

        del_timer_sync(&serv->sv_temptimer);

        /*
         * The last user is gone, and thus all sockets have to have been
         * destroyed by this point. Check this.
         */
        BUG_ON(!list_empty(&serv->sv_permsocks));
        BUG_ON(!list_empty(&serv->sv_tempsocks));

        cache_clean_deferred(serv);

        if (svc_serv_is_pooled(serv))
                svc_pool_map_put();

        kfree(serv->sv_pools);
        kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_argpages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
        unsigned int pages, arghi;

        /* bc_xprt uses fore channel allocated buffers */
        if (svc_is_backchannel(rqstp))
                return 1;

        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
                                       * We assume each is at most one page
                                       */
        arghi = 0;
        WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
        if (pages > RPCSVC_MAXPAGES)
                pages = RPCSVC_MAXPAGES;
        while (pages) {
                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);

                if (!p)
                        break;
                rqstp->rq_pages[arghi++] = p;
                pages--;
        }
        return pages == 0;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
        unsigned int i;

        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
                if (rqstp->rq_pages[i])
                        put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
        struct svc_rqst *rqstp;

        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
        if (!rqstp)
                return rqstp;

        __set_bit(RQ_BUSY, &rqstp->rq_flags);
        spin_lock_init(&rqstp->rq_lock);
        rqstp->rq_server = serv;
        rqstp->rq_pool = pool;

        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_argp)
                goto out_enomem;

        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
        if (!rqstp->rq_resp)
                goto out_enomem;

        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
                goto out_enomem;

        return rqstp;
out_enomem:
        svc_rqst_free(rqstp);
        return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
        struct svc_rqst *rqstp;

        rqstp = svc_rqst_alloc(serv, pool, node);
        if (!rqstp)
                return ERR_PTR(-ENOMEM);

        serv->sv_nrthreads++;
        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads++;
        list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
        spin_unlock_bh(&pool->sp_lock);
        return rqstp;
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        if (pool != NULL)
                return pool;

        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
        unsigned int i;
        struct task_struct *task = NULL;

        if (pool != NULL) {
                spin_lock_bh(&pool->sp_lock);
        } else {
                /* choose a pool in round-robin fashion */
                for (i = 0; i < serv->sv_nrpools; i++) {
                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
                        spin_lock_bh(&pool->sp_lock);
                        if (!list_empty(&pool->sp_all_threads))
                                goto found_pool;
                        spin_unlock_bh(&pool->sp_lock);
                }
                return NULL;
        }

found_pool:
        if (!list_empty(&pool->sp_all_threads)) {
                struct svc_rqst *rqstp;

                /*
                 * Remove from the pool->sp_all_threads list
                 * so we don't try to kill it again.
                 */
                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
                set_bit(RQ_VICTIM, &rqstp->rq_flags);
                list_del_rcu(&rqstp->rq_all);
                task = rqstp->rq_task;
        }
        spin_unlock_bh(&pool->sp_lock);

        return task;
}

/* create new threads */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct svc_rqst *rqstp;
        struct task_struct *task;
        struct svc_pool *chosen_pool;
        unsigned int state = serv->sv_nrthreads - 1;
        int node;

        do {
                nrservs--;
                chosen_pool = choose_pool(serv, pool, &state);

                node = svc_pool_map_get_node(chosen_pool->sp_id);
                rqstp = svc_prepare_thread(serv, chosen_pool, node);
                if (IS_ERR(rqstp))
                        return PTR_ERR(rqstp);

                __module_get(serv->sv_ops->svo_module);
                task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
                                              node, "%s", serv->sv_name);
                if (IS_ERR(task)) {
                        module_put(serv->sv_ops->svo_module);
                        svc_exit_thread(rqstp);
                        return PTR_ERR(task);
                }

                rqstp->rq_task = task;
                if (serv->sv_nrpools > 1)
                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

                svc_sock_update_bufs(serv);
                wake_up_process(task);
        } while (nrservs > 0);

        return 0;
}

/* destroy old threads */
static int
svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct task_struct *task;
        unsigned int state = serv->sv_nrthreads - 1;

        /* destroy old threads */
        do {
                task = choose_victim(serv, pool, &state);
                if (task == NULL)
                        break;
                send_sig(SIGINT, task, 1);
                nrservs++;
        } while (nrservs < 0);

        return 0;
}

/*
 * Create or destroy enough new threads to make the number
 * of threads the given number.  If `pool' is non-NULL, applies
 * only to threads in that pool, otherwise round-robins between
 * all pools.  Caller must ensure mutual exclusion between this and
 * server startup or shutdown.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        if (pool == NULL) {
                /* The -1 assumes caller has done a svc_get() */
                nrservs -= (serv->sv_nrthreads - 1);
        } else {
                spin_lock_bh(&pool->sp_lock);
                nrservs -= pool->sp_nrthreads;
                spin_unlock_bh(&pool->sp_lock);
        }

        if (nrservs > 0)
                return svc_start_kthreads(serv, pool, nrservs);
        if (nrservs < 0)
                return svc_signal_kthreads(serv, pool, nrservs);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);
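
/*
 * Example (illustrative; the exclusion mutex is the caller's own, as
 * with nfsd's mutex): growing pool 0 of a pooled serv by two threads
 * while leaving the other pools alone:
 *
 *	mutex_lock(&my_service_mutex);
 *	err = svc_set_num_threads(serv, &serv->sv_pools[0],
 *				  serv->sv_pools[0].sp_nrthreads + 2);
 *	mutex_unlock(&my_service_mutex);
 *
 * Passing a NULL pool instead spreads the requested total across all
 * pools in round-robin order.
 */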

/* destroy old threads */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        struct task_struct *task;
        unsigned int state = serv->sv_nrthreads - 1;

        /* destroy old threads */
        do {
                task = choose_victim(serv, pool, &state);
                if (task == NULL)
                        break;
                kthread_stop(task);
                nrservs++;
        } while (nrservs < 0);
        return 0;
}

int
svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
        if (pool == NULL) {
                /* The -1 assumes caller has done a svc_get() */
                nrservs -= (serv->sv_nrthreads - 1);
        } else {
                spin_lock_bh(&pool->sp_lock);
                nrservs -= pool->sp_nrthreads;
                spin_unlock_bh(&pool->sp_lock);
        }

        if (nrservs > 0)
                return svc_start_kthreads(serv, pool, nrservs);
        if (nrservs < 0)
                return svc_stop_kthreads(serv, pool, nrservs);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
        svc_release_buffer(rqstp);
        kfree(rqstp->rq_resp);
        kfree(rqstp->rq_argp);
        kfree(rqstp->rq_auth_data);
        kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

void
svc_exit_thread(struct svc_rqst *rqstp)
{
        struct svc_serv *serv = rqstp->rq_server;
        struct svc_pool *pool = rqstp->rq_pool;

        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads--;
        if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
                list_del_rcu(&rqstp->rq_all);
        spin_unlock_bh(&pool->sp_lock);

        svc_rqst_free(rqstp);

        /* Release the server */
        if (serv)
                svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in sin = {
                .sin_family             = AF_INET,
                .sin_addr.s_addr        = htonl(INADDR_ANY),
                .sin_port               = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin, netid);

        /*
         * User space didn't support rpcbind v4, so retry this
         * registration request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, protocol, port);

        return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
                                const u32 version,
                                const unsigned short protocol,
                                const unsigned short port)
{
        const struct sockaddr_in6 sin6 = {
                .sin6_family            = AF_INET6,
                .sin6_addr              = IN6ADDR_ANY_INIT,
                .sin6_port              = htons(port),
        };
        const char *netid;
        int error;

        switch (protocol) {
        case IPPROTO_UDP:
                netid = RPCBIND_NETID_UDP6;
                break;
        case IPPROTO_TCP:
                netid = RPCBIND_NETID_TCP6;
                break;
        default:
                return -ENOPROTOOPT;
        }

        error = rpcb_v4_register(net, program, version,
                                        (const struct sockaddr *)&sin6, netid);

        /*
         * User space didn't support rpcbind version 4, so we won't
         * use a PF_INET6 listener.
         */
        if (error == -EPROTONOSUPPORT)
                error = -EAFNOSUPPORT;

        return error;
}
#endif  /* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
                          const u32 program, const u32 version,
                          const int family,
                          const unsigned short protocol,
                          const unsigned short port)
{
        int error = -EAFNOSUPPORT;

        switch (family) {
        case PF_INET:
                error = __svc_rpcb_register4(net, program, version,
                                                protocol, port);
                break;
#if IS_ENABLED(CONFIG_IPV6)
        case PF_INET6:
                error = __svc_rpcb_register6(net, program, version,
                                                protocol, port);
#endif
        }

        return error;
}

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
                 const int family, const unsigned short proto,
                 const unsigned short port)
{
        struct svc_program      *progp;
        const struct svc_version *vers;
        unsigned int            i;
        int                     error = 0;

        WARN_ON_ONCE(proto == 0 && port == 0);
        if (proto == 0 && port == 0)
                return -EINVAL;

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        vers = progp->pg_vers[i];
                        if (vers == NULL)
                                continue;

                        dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
                                        progp->pg_name,
                                        i,
                                        proto == IPPROTO_UDP ? "udp" : "tcp",
                                        port,
                                        family,
                                        vers->vs_hidden ?
                                        " (but not telling portmap)" : "");

                        if (vers->vs_hidden)
                                continue;

                        /*
                         * Don't register a UDP port if we need congestion
                         * control.
                         */
                        if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
                                continue;

                        error = __svc_register(net, progp->pg_name, progp->pg_prog,
                                                i, family, proto, port);

                        if (vers->vs_rpcb_optnl) {
                                error = 0;
                                continue;
                        }

                        if (error < 0) {
                                printk(KERN_WARNING "svc: failed to register "
                                        "%sv%u RPC service (errno %d).\n",
                                        progp->pg_name, i, -error);
                                break;
                        }
                }
        }

        return error;
}
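
/*
 * Example (illustrative): a serv listening for NFSv3 over TCP on port
 * 2049 is advertised roughly as
 *
 *	error = svc_register(serv, net, PF_INET, IPPROTO_TCP, 2049);
 *
 * which walks every non-hidden program/version pair above and issues
 * an rpcbind v4 SET, falling back to v2 where user space is old.
 */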

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
                             const char *progname)
{
        int error;

        error = rpcb_v4_register(net, program, version, NULL, "");

        /*
         * User space didn't support rpcbind v4, so retry this
         * request with the legacy rpcbind v2 protocol.
         */
        if (error == -EPROTONOSUPPORT)
                error = rpcb_register(net, program, version, 0, 0);

        dprintk("svc: %s(%sv%u), error %d\n",
                        __func__, progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
        struct svc_program *progp;
        unsigned long flags;
        unsigned int i;

        clear_thread_flag(TIF_SIGPENDING);

        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
                for (i = 0; i < progp->pg_nvers; i++) {
                        if (progp->pg_vers[i] == NULL)
                                continue;
                        if (progp->pg_vers[i]->vs_hidden)
                                continue;

                        dprintk("svc: attempting to unregister %sv%u\n",
                                progp->pg_name, i);
                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
                }
        }

        spin_lock_irqsave(&current->sighand->siglock, flags);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;
        char    buf[RPC_MAX_ADDRBUFLEN];

        va_start(args, fmt);

        vaf.fmt = fmt;
        vaf.va = &args;

        dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

        va_end(args);
}
#else
static __printf(2, 3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
        struct svc_program      *progp;
        const struct svc_version *versp = NULL; /* compiler food */
        const struct svc_procedure *procp = NULL;
        struct svc_serv         *serv = rqstp->rq_server;
        __be32                  *statp;
        u32                     prog, vers, proc;
        __be32                  auth_stat, rpc_stat;
        int                     auth_res;
        __be32                  *reply_statp;

        rpc_stat = rpc_success;

        if (argv->iov_len < 6*4)
                goto err_short_len;

        /* Will be turned off by GSS integrity and privacy services */
        set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
        /* Will be turned off only when NFSv4 Sessions are used */
        set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
        clear_bit(RQ_DROPME, &rqstp->rq_flags);

        svc_putu32(resv, rqstp->rq_xid);

        vers = svc_getnl(argv);

        /* First words of reply: */
        svc_putnl(resv, 1);             /* REPLY */

        if (vers != 2)          /* RPC version number */
                goto err_bad_rpc;

        /* Save position in case we later decide to reject: */
        reply_statp = resv->iov_base + resv->iov_len;

        svc_putnl(resv, 0);             /* ACCEPT */

        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
        rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
        rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */

        for (progp = serv->sv_program; progp; progp = progp->pg_next)
                if (prog == progp->pg_prog)
                        break;

        /*
         * Decode auth data, and add verifier to reply buffer.
         * We do this before anything else in order to get a decent
         * auth verifier.
         */
        auth_res = svc_authenticate(rqstp, &auth_stat);
        /* Also give the program a chance to reject this call: */
        if (auth_res == SVC_OK && progp) {
                auth_stat = rpc_autherr_badcred;
                auth_res = progp->pg_authenticate(rqstp);
        }
        switch (auth_res) {
        case SVC_OK:
                break;
        case SVC_GARBAGE:
                goto err_garbage;
        case SVC_SYSERR:
                rpc_stat = rpc_system_err;
                goto err_bad;
        case SVC_DENIED:
                goto err_bad_auth;
        case SVC_CLOSE:
                goto close;
        case SVC_DROP:
                goto dropit;
        case SVC_COMPLETE:
                goto sendit;
        }

        if (progp == NULL)
                goto err_bad_prog;

        if (vers >= progp->pg_nvers ||
            !(versp = progp->pg_vers[vers]))
                goto err_bad_vers;

        /*
         * Some protocol versions (namely NFSv4) require some form of
         * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
         * In other words, UDP is not allowed. We mark those when setting
         * up the svc_xprt, and verify that here.
         *
         * The spec is not very clear about what error should be returned
         * when someone tries to access a server that is listening on UDP
         * for lower versions. RPC_PROG_MISMATCH seems to be the closest
         * fit.
         */
        if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
            !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
                goto err_bad_vers;

        procp = versp->vs_proc + proc;
        if (proc >= versp->vs_nproc || !procp->pc_func)
                goto err_bad_proc;
        rqstp->rq_procinfo = procp;

        /* Syntactic check complete */
        serv->sv_stats->rpccnt++;
        trace_svc_process(rqstp, progp->pg_name);

        /* Build the reply header. */
        statp = resv->iov_base + resv->iov_len;
        svc_putnl(resv, RPC_SUCCESS);

        /* Bump per-procedure stats counter */
        versp->vs_count[proc]++;

        /* Initialize storage for argp and resp */
        memset(rqstp->rq_argp, 0, procp->pc_argsize);
        memset(rqstp->rq_resp, 0, procp->pc_ressize);

        /* un-reserve some of the out-queue now that we have a
         * better idea of reply size
         */
        if (procp->pc_xdrressize)
                svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

        /* Call the function that processes the request. */
        if (!versp->vs_dispatch) {
                /*
                 * Decode arguments
                 * XXX: why do we ignore the return value?
                 */
                if (procp->pc_decode &&
                    !procp->pc_decode(rqstp, argv->iov_base))
                        goto err_garbage;

                *statp = procp->pc_func(rqstp);

                /* Encode reply */
                if (*statp == rpc_drop_reply ||
                    test_bit(RQ_DROPME, &rqstp->rq_flags)) {
                        if (procp->pc_release)
                                procp->pc_release(rqstp);
                        goto dropit;
                }
                if (*statp == rpc_autherr_badcred) {
                        if (procp->pc_release)
                                procp->pc_release(rqstp);
                        goto err_bad_auth;
                }
                if (*statp == rpc_success && procp->pc_encode &&
                    !procp->pc_encode(rqstp, resv->iov_base + resv->iov_len)) {
                        dprintk("svc: failed to encode reply\n");
                        /* serv->sv_stats->rpcsystemerr++; */
                        *statp = rpc_system_err;
                }
        } else {
                dprintk("svc: calling dispatcher\n");
                if (!versp->vs_dispatch(rqstp, statp)) {
                        /* Release reply info */
                        if (procp->pc_release)
                                procp->pc_release(rqstp);
                        goto dropit;
                }
        }

        /* Check RPC status result */
        if (*statp != rpc_success)
                resv->iov_len = ((void *)statp) - resv->iov_base + 4;

        /* Release reply info */
        if (procp->pc_release)
                procp->pc_release(rqstp);

        if (procp->pc_encode == NULL)
                goto dropit;

 sendit:
        if (svc_authorise(rqstp))
                goto close;
        return 1;               /* Caller can now send it */

 dropit:
        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
        dprintk("svc: svc_process dropit\n");
        return 0;

 close:
        if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
                svc_close_xprt(rqstp->rq_xprt);
        dprintk("svc: svc_process close\n");
        return 0;

err_short_len:
        svc_printk(rqstp, "short len %zd, dropping request\n",
                        argv->iov_len);
        goto close;

err_bad_rpc:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 0);     /* RPC_MISMATCH */
        svc_putnl(resv, 2);     /* Only RPCv2 supported */
        svc_putnl(resv, 2);
        goto sendit;

err_bad_auth:
        dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
        serv->sv_stats->rpcbadauth++;
        /* Restore write pointer to location of accept status: */
        xdr_ressize_check(rqstp, reply_statp);
        svc_putnl(resv, 1);     /* REJECT */
        svc_putnl(resv, 1);     /* AUTH_ERROR */
        svc_putnl(resv, ntohl(auth_stat));      /* status */
        goto sendit;

err_bad_prog:
        dprintk("svc: unknown program %d\n", prog);
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_UNAVAIL);
        goto sendit;

err_bad_vers:
        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
                       vers, prog, progp->pg_name);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROG_MISMATCH);
        svc_putnl(resv, progp->pg_lovers);
        svc_putnl(resv, progp->pg_hivers);
        goto sendit;

err_bad_proc:
        svc_printk(rqstp, "unknown procedure (%d)\n", proc);

        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, RPC_PROC_UNAVAIL);
        goto sendit;

err_garbage:
        svc_printk(rqstp, "failed to decode args\n");

        rpc_stat = rpc_garbage_args;
err_bad:
        serv->sv_stats->rpcbadfmt++;
        svc_putnl(resv, ntohl(rpc_stat));
        goto sendit;
}
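
/*
 * Reply layout built above (illustrative, RPCv2 accepted reply):
 *
 *	xid | REPLY(1) | MSG_ACCEPTED(0) | verifier | accept_stat | results
 *
 * On rejection the write pointer is rewound to reply_statp and a
 * MSG_DENIED(1) body (RPC_MISMATCH or AUTH_ERROR) is emitted instead.
 */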

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
        struct kvec             *argv = &rqstp->rq_arg.head[0];
        struct kvec             *resv = &rqstp->rq_res.head[0];
        struct svc_serv         *serv = rqstp->rq_server;
        u32                     dir;

        /*
         * Setup response xdr_buf.
         * Initially it has just one page
         */
        rqstp->rq_next_page = &rqstp->rq_respages[1];
        resv->iov_base = page_address(rqstp->rq_respages[0]);
        resv->iov_len = 0;
        rqstp->rq_res.pages = rqstp->rq_respages + 1;
        rqstp->rq_res.len = 0;
        rqstp->rq_res.page_base = 0;
        rqstp->rq_res.page_len = 0;
        rqstp->rq_res.buflen = PAGE_SIZE;
        rqstp->rq_res.tail[0].iov_base = NULL;
        rqstp->rq_res.tail[0].iov_len = 0;

        dir = svc_getnl(argv);
        if (dir != 0) {
                /* direction != CALL */
                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
                serv->sv_stats->rpcbadfmt++;
                goto out_drop;
        }

        /* Reserve space for the record marker */
        if (rqstp->rq_prot == IPPROTO_TCP)
                svc_putnl(resv, 0);

        /* Returns 1 for send, 0 for drop */
        if (likely(svc_process_common(rqstp, argv, resv)))
                return svc_send(rqstp);

out_drop:
        svc_drop(rqstp);
        return 0;
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
               struct svc_rqst *rqstp)
{
        struct kvec     *argv = &rqstp->rq_arg.head[0];
        struct kvec     *resv = &rqstp->rq_res.head[0];
        struct rpc_task *task;
        int proc_error;
        int error;

        dprintk("svc: %s(%p)\n", __func__, req);

        /* Build the svc_rqst used by the common processing routine */
        rqstp->rq_xid = req->rq_xid;
        rqstp->rq_prot = req->rq_xprt->prot;
        rqstp->rq_server = serv;
        rqstp->rq_bc_net = req->rq_xprt->xprt_net;

        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

        /* Adjust the argument buffer length */
        rqstp->rq_arg.len = req->rq_private_buf.len;
        if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
                rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
                rqstp->rq_arg.page_len = 0;
        } else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
                        rqstp->rq_arg.page_len)
                rqstp->rq_arg.page_len = rqstp->rq_arg.len -
                        rqstp->rq_arg.head[0].iov_len;
        else
                rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
                        rqstp->rq_arg.page_len;

        /* reset result send buffer "put" position */
        resv->iov_len = 0;

        /*
         * Skip the next two words because they've already been
         * processed in the transport
         */
        svc_getu32(argv);       /* XID */
        svc_getnl(argv);        /* CALLDIR */

        /* Parse and execute the bc call */
        proc_error = svc_process_common(rqstp, argv, resv);

        atomic_inc(&req->rq_xprt->bc_free_slots);
        if (!proc_error) {
                /* Processing error: drop the request */
                xprt_free_bc_request(req);
                error = -EINVAL;
                goto out;
        }
        /* Finally, send the reply synchronously */
        memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
        task = rpc_run_bc_task(req);
        if (IS_ERR(task)) {
                error = PTR_ERR(task);
                goto out;
        }

        WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
        error = task->tk_status;
        rpc_put_task(task);

out:
        dprintk("svc: %s(), error=%d\n", __func__, error);
        return error;
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

        if (rqstp->rq_server->sv_max_payload < max)
                max = rqstp->rq_server->sv_max_payload;
        return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);
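
/*
 * For example (illustrative): a transport class advertising a 1MB
 * xcl_max_payload on a serv created with a 64KB bufsize yields
 * svc_max_payload() == 64KB; the smaller of the transport and
 * service limits always wins.
 */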

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @pages: list of pages containing data payload
 * @first: buffer containing first section of write payload
 * @total: total number of bytes of write payload
 *
 * Fills in rqstp::rq_vec, and returns the number of elements.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp, struct page **pages,
                                   struct kvec *first, size_t total)
{
        struct kvec *vec = rqstp->rq_vec;
        unsigned int i;

        /* Some types of transport can present the write payload
         * entirely in rq_arg.pages. In this case, @first is empty.
         */
        i = 0;
        if (first->iov_len) {
                vec[i].iov_base = first->iov_base;
                vec[i].iov_len = min_t(size_t, total, first->iov_len);
                total -= vec[i].iov_len;
                ++i;
        }

        while (total) {
                vec[i].iov_base = page_address(*pages);
                vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
                total -= vec[i].iov_len;
                ++i;
                ++pages;
        }

        WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
        return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);
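
/*
 * Typical use (a sketch, assuming an NFS-style write handler where
 * @file, @offset and @count come from the decoded arguments):
 *
 *	struct iov_iter iter;
 *	unsigned int nvecs;
 *
 *	nvecs = svc_fill_write_vector(rqstp, rqstp->rq_arg.pages,
 *				      &rqstp->rq_arg.head[0], count);
 *	iov_iter_kvec(&iter, WRITE, rqstp->rq_vec, nvecs, count);
 *	err = vfs_iter_write(file, &iter, &offset, 0);
 *
 * The WARN_ON_ONCE above fires if @total needs more entries than
 * rq_vec provides.
 */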

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
                                void *p, size_t total)
{
        size_t len, remaining;
        char *result, *dst;

        result = kmalloc(total + 1, GFP_KERNEL);
        if (!result)
                return ERR_PTR(-ESERVERFAULT);

        dst = result;
        remaining = total;

        len = min_t(size_t, total, first->iov_len);
        if (len) {
                memcpy(dst, first->iov_base, len);
                dst += len;
                remaining -= len;
        }

        if (remaining) {
                len = min_t(size_t, remaining, PAGE_SIZE);
                memcpy(dst, p, len);
                dst += len;
        }

        *dst = '\0';

        /* Sanity check: Linux doesn't allow the pathname argument to
         * contain a NUL byte.
         */
        if (strlen(result) != total) {
                kfree(result);
                return ERR_PTR(-EINVAL);
        }
        return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);
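
/*
 * Typical use (a sketch; the argp names are hypothetical, NFS-style):
 *
 *	char *path = svc_fill_symlink_pathname(rqstp, &argp->first,
 *					page_address(rqstp->rq_arg.pages[0]),
 *					argp->tlen);
 *	if (IS_ERR(path))
 *		return nfserrno(PTR_ERR(path));
 *	...
 *	kfree(path);
 */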