linux/net/sunrpc/svc.c
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define svc_serv_is_pooled(serv)    ((serv)->sv_ops->svo_function)

#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
struct svc_pool_map svc_pool_map = {
	.mode = SVC_POOL_DEFAULT
};
EXPORT_SYMBOL_GPL(svc_pool_map);

static DEFINE_MUTEX(svc_pool_map_mutex); /* protects svc_pool_map.count only */

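/*
 * Parse the pool_mode module parameter.  The mode can only be
 * changed while the pool map is unused (m->count == 0); otherwise
 * -EBUSY is returned.
 */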
static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;
	struct svc_pool_map *m = &svc_pool_map;
	int err;

	mutex_lock(&svc_pool_map_mutex);

	err = -EBUSY;
	if (m->count)
		goto out;

	err = 0;
	if (!strncmp(val, "auto", 4))
		*ip = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		*ip = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		*ip = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		*ip = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

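/*
 * Report the current pool mode as a string, or as a raw integer
 * if the mode value is unrecognised.
 */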
static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;

	switch (*ip) {
	case SVC_POOL_AUTO:
		return strlcpy(buf, "auto", 20);
	case SVC_POOL_GLOBAL:
		return strlcpy(buf, "global", 20);
	case SVC_POOL_PERCPU:
		return strlcpy(buf, "percpu", 20);
	case SVC_POOL_PERNODE:
		return strlcpy(buf, "pernode", 20);
	default:
		return sprintf(buf, "%d", *ip);
	}
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		 &svc_pool_map.mode, 0644);

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

	if (nr_online_nodes > 1) {
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

	node = first_online_node;
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
	m->to_pool = NULL;
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_cpu_ids;
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
		BUG_ON(pidx >= maxpools);
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
}

/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_node_ids;
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx > maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}

/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);

	if (m->count++) {
		mutex_unlock(&svc_pool_map_mutex);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools < 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;

	mutex_unlock(&svc_pool_map_mutex);
	return m->npools;
}
EXPORT_SYMBOL_GPL(svc_pool_map_get);

/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
void
svc_pool_map_put(void)
{
	struct svc_pool_map *m = &svc_pool_map;

	mutex_lock(&svc_pool_map_mutex);

	if (!--m->count) {
		kfree(m->to_pool);
		m->to_pool = NULL;
		kfree(m->pool_to);
		m->pool_to = NULL;
		m->npools = 0;
	}

	mutex_unlock(&svc_pool_map_mutex);
}
EXPORT_SYMBOL_GPL(svc_pool_map_put);

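/*
 * Return the NUMA node backing the given pool: the node of the
 * pool's cpu in SVC_POOL_PERCPU mode, the pool's own node in
 * SVC_POOL_PERNODE mode, or NUMA_NO_NODE if the map is unused.
 */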
static int svc_pool_map_get_node(unsigned int pidx)
{
	const struct svc_pool_map *m = &svc_pool_map;

	if (m->count) {
		if (m->mode == SVC_POOL_PERCPU)
			return cpu_to_node(m->pool_to[pidx]);
		if (m->mode == SVC_POOL_PERNODE)
			return m->pool_to[pidx];
	}
	return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int node = m->pool_to[pidx];

	/*
	 * The caller checks for sv_nrpools > 1, which
	 * implies that we've been initialized.
	 */
	WARN_ON_ONCE(m->count == 0);
	if (m->count == 0)
		return;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
	{
		set_cpus_allowed_ptr(task, cpumask_of(node));
		break;
	}
	case SVC_POOL_PERNODE:
	{
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
		break;
	}
	}
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int pidx = 0;

	/*
	 * An uninitialised map happens in a pure client when
	 * lockd is brought up, so silently treat it the
	 * same as SVC_POOL_GLOBAL.
	 */
	if (svc_serv_is_pooled(serv)) {
		switch (m->mode) {
		case SVC_POOL_PERCPU:
			pidx = m->to_pool[cpu];
			break;
		case SVC_POOL_PERNODE:
			pidx = m->to_pool[cpu_to_node(cpu)];
			break;
		}
	}
	return &serv->sv_pools[pidx % serv->sv_nrpools];
}

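/*
 * Make sure an rpcbind client is available in this net namespace,
 * then remove any registrations left over from a previous instance
 * of the service.
 */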
int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
	int err;

	err = rpcb_create_local(net);
	if (err)
		return err;

	/* Remove any stale portmap registrations */
	svc_unregister(serv, net);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
	svc_unregister(serv, net);
	rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

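/*
 * Returns true if any version of any program offered by this
 * service is visible to rpcbind, i.e. not marked vs_hidden.
 */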
static int svc_uses_rpcbind(struct svc_serv *serv)
{
	struct svc_program	*progp;
	unsigned int		i;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (!progp->pg_vers[i]->vs_hidden)
				return 1;
		}
	}

	return 0;
}

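/*
 * Prepare the service for registration with rpcbind in the given
 * net namespace; a no-op for services that never register.
 */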
int svc_bind(struct svc_serv *serv, struct net *net)
{
	if (!svc_uses_rpcbind(serv))
		return 0;
	return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
	INIT_LIST_HEAD(&serv->sv_cb_list);
	spin_lock_init(&serv->sv_cb_lock);
	init_waitqueue_head(&serv->sv_cb_waitq);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
	     const struct svc_serv_ops *ops)
{
	struct svc_serv *serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_program   = prog;
	serv->sv_nrthreads = 1;
	serv->sv_stats     = prog->pg_stats;
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize ? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_ops = ops;
	xdrsize = 0;
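	/*
	 * Walk every program and version to record the lowest and
	 * highest supported version numbers and the largest XDR
	 * buffer size any of them needs.
	 */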
	while (prog) {
		prog->pg_lovers = prog->pg_nvers-1;
		for (vers = 0; vers < prog->pg_nvers; vers++)
			if (prog->pg_vers[vers]) {
				prog->pg_hivers = vers;
				if (prog->pg_lovers > vers)
					prog->pg_lovers = vers;
				if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = prog->pg_vers[vers]->vs_xdrsize;
			}
		prog = prog->pg_next;
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	timer_setup(&serv->sv_temptimer, NULL, 0);
	spin_lock_init(&serv->sv_lock);

	__svc_init_bc(serv);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_sockets);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		spin_lock_init(&pool->sp_lock);
	}

	return serv;
}

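/*
 * Create an RPC service with a single pool.
 */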
struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
	   const struct svc_serv_ops *ops)
{
	return __svc_create(prog, bufsize, /*npools*/1, ops);
}
EXPORT_SYMBOL_GPL(svc_create);

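/*
 * Create an RPC service with one pool per CPU or per NUMA node,
 * according to the global pool map mode.  Takes a reference on
 * the pool map, which is dropped again in svc_destroy().
 */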
struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
		  const struct svc_serv_ops *ops)
{
	struct svc_serv *serv;
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, bufsize, npools, ops);
	if (!serv)
		goto out_err;
	return serv;
out_err:
	svc_pool_map_put();
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);

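/*
 * Close all of the service's transports in the given net namespace
 * and give the service a chance to run its per-net shutdown hook.
 */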
void svc_shutdown_net(struct svc_serv *serv, struct net *net)
{
	svc_close_net(serv, net);

	if (serv->sv_ops->svo_shutdown)
		serv->sv_ops->svo_shutdown(serv, net);
}
EXPORT_SYMBOL_GPL(svc_shutdown_net);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
	dprintk("svc: svc_destroy(%s, %d)\n",
				serv->sv_program->pg_name,
				serv->sv_nrthreads);

	if (serv->sv_nrthreads) {
		if (--(serv->sv_nrthreads) != 0) {
			svc_sock_update_bufs(serv);
			return;
		}
	} else
		printk("svc_destroy: no threads for serv=%p!\n", serv);

	del_timer_sync(&serv->sv_temptimer);

	/*
	 * The last user is gone, and thus all sockets must have been
	 * destroyed by this point. Check this.
	 */
	BUG_ON(!list_empty(&serv->sv_permsocks));
	BUG_ON(!list_empty(&serv->sv_tempsocks));

	cache_clean_deferred(serv);

	if (svc_serv_is_pooled(serv))
		svc_pool_map_put();

	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_argpages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
	unsigned int pages, arghi;

	/* bc_xprt uses fore channel allocated buffers */
	if (svc_is_backchannel(rqstp))
		return 1;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request
				       * and reply; we assume one is at most
				       * one page
				       */
	arghi = 0;
	WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
	if (pages > RPCSVC_MAXPAGES)
		pages = RPCSVC_MAXPAGES;
	while (pages) {
		struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
		if (!p)
			break;
		rqstp->rq_pages[arghi++] = p;
		pages--;
	}
	return pages == 0;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

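/*
 * Allocate and partially initialise an svc_rqst, including its XDR
 * argument/result buffers and page array, preferring memory local
 * to the given NUMA node.
 */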
struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
	if (!rqstp)
		return rqstp;

	__set_bit(RQ_BUSY, &rqstp->rq_flags);
	spin_lock_init(&rqstp->rq_lock);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_argp)
		goto out_enomem;

	rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_resp)
		goto out_enomem;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
		goto out_enomem;

	return rqstp;
out_enomem:
	svc_rqst_free(rqstp);
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

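/*
 * Allocate an svc_rqst for a new thread and account for it in the
 * server and pool thread counts.
 */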
struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = svc_rqst_alloc(serv, pool, node);
	if (!rqstp)
		return ERR_PTR(-ENOMEM);

	serv->sv_nrthreads++;
	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);
	return rqstp;
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	if (pool != NULL)
		return pool;

	return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	unsigned int i;
	struct task_struct *task = NULL;

	if (pool != NULL) {
		spin_lock_bh(&pool->sp_lock);
	} else {
		/* choose a pool in round-robin fashion */
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			spin_lock_bh(&pool->sp_lock);
			if (!list_empty(&pool->sp_all_threads))
				goto found_pool;
			spin_unlock_bh(&pool->sp_lock);
		}
		return NULL;
	}

found_pool:
	if (!list_empty(&pool->sp_all_threads)) {
		struct svc_rqst *rqstp;

		/*
		 * Remove from the pool->sp_all_threads list
		 * so we don't try to kill it again.
		 */
		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
		set_bit(RQ_VICTIM, &rqstp->rq_flags);
		list_del_rcu(&rqstp->rq_all);
		task = rqstp->rq_task;
	}
	spin_unlock_bh(&pool->sp_lock);

	return task;
}

/* create new threads */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	unsigned int state = serv->sv_nrthreads-1;
	int node;

	do {
		nrservs--;
		chosen_pool = choose_pool(serv, pool, &state);

		node = svc_pool_map_get_node(chosen_pool->sp_id);
		rqstp = svc_prepare_thread(serv, chosen_pool, node);
		if (IS_ERR(rqstp))
			return PTR_ERR(rqstp);

		__module_get(serv->sv_ops->svo_module);
		task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
					      node, "%s", serv->sv_name);
		if (IS_ERR(task)) {
			module_put(serv->sv_ops->svo_module);
			svc_exit_thread(rqstp);
			return PTR_ERR(task);
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	} while (nrservs > 0);

	return 0;
}

/* ask enough old threads to exit, without waiting for them to do so */
static int
svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct task_struct *task;
	unsigned int state = serv->sv_nrthreads-1;

	/* signal each chosen victim with SIGINT */
	do {
		task = choose_victim(serv, pool, &state);
		if (task == NULL)
			break;
		send_sig(SIGINT, task, 1);
		nrservs++;
	} while (nrservs < 0);

	return 0;
}

/*
 * Create or destroy enough threads to bring the number of threads
 * to the given number.  If `pool' is non-NULL, applies only to
 * threads in that pool, otherwise round-robins between all pools.
 * Caller must ensure mutual exclusion between this and server
 * startup or shutdown.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	if (pool == NULL) {
		/* The -1 assumes caller has done a svc_get() */
		nrservs -= (serv->sv_nrthreads-1);
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_signal_kthreads(serv, pool, nrservs);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);

/* stop old threads, waiting for each one to exit */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct task_struct *task;
	unsigned int state = serv->sv_nrthreads-1;

	/* kthread_stop() blocks until the victim thread has exited */
	do {
		task = choose_victim(serv, pool, &state);
		if (task == NULL)
			break;
		kthread_stop(task);
		nrservs++;
	} while (nrservs < 0);
	return 0;
}

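/*
 * Like svc_set_num_threads(), but when reducing the thread count
 * the excess threads are stopped synchronously with kthread_stop()
 * rather than merely being signalled.
 */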
int
svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	if (pool == NULL) {
		/* The -1 assumes caller has done a svc_get() */
		nrservs -= (serv->sv_nrthreads-1);
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_stop_kthreads(serv, pool, nrservs);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
	svc_release_buffer(rqstp);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);
	kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

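/*
 * Final cleanup as a server thread exits: drop the thread's pool
 * accounting, free its svc_rqst, and release the thread's
 * reference on the service.
 */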
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads--;
	if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
		list_del_rcu(&rqstp->rq_all);
	spin_unlock_bh(&pool->sp_lock);

	svc_rqst_free(rqstp);

	/* Release the server */
	if (serv)
		svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, protocol, port);

	return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
#endif	/* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(net, program, version,
						protocol, port);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case PF_INET6:
		error = __svc_rpcb_register6(net, program, version,
						protocol, port);
#endif
	}

	return error;
}

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
		 const int family, const unsigned short proto,
		 const unsigned short port)
{
	struct svc_program	*progp;
	const struct svc_version *vers;
	unsigned int		i;
	int			error = 0;

	WARN_ON_ONCE(proto == 0 && port == 0);
	if (proto == 0 && port == 0)
		return -EINVAL;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			vers = progp->pg_vers[i];
			if (vers == NULL)
				continue;

			dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
					progp->pg_name,
					i,
					proto == IPPROTO_UDP ? "udp" : "tcp",
					port,
					family,
					vers->vs_hidden ?
					" (but not telling portmap)" : "");

			if (vers->vs_hidden)
				continue;

			/*
			 * Don't register a UDP port if we need congestion
			 * control.
			 */
			if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
				continue;

			error = __svc_register(net, progp->pg_name, progp->pg_prog,
						i, family, proto, port);

			if (vers->vs_rpcb_optnl) {
				error = 0;
				continue;
			}

			if (error < 0) {
				printk(KERN_WARNING "svc: failed to register "
					"%sv%u RPC service (errno %d).\n",
					progp->pg_name, i, -error);
				break;
			}
		}
	}

	return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	error = rpcb_v4_register(net, program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, 0, 0);

	dprintk("svc: %s(%sv%u), error %d\n",
			__func__, progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

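	/*
	 * Clear TIF_SIGPENDING for the duration of the rpcbind
	 * upcalls below, which would otherwise fail immediately if a
	 * signal is already pending; the real pending-signal state is
	 * recalculated afterwards.
	 */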
	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;

			dprintk("svc: attempting to unregister %sv%u\n",
				progp->pg_name, i);
			__svc_unregister(net, progp->pg_prog, i, progp->pg_name);
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;
	char	buf[RPC_MAX_ADDRBUFLEN];

	va_start(args, fmt);

	vaf.fmt = fmt;
	vaf.va = &args;

	dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

	va_end(args);
}
#else
static __printf(2, 3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
	struct svc_program	*progp;
	const struct svc_version *versp = NULL;	/* compiler food */
	const struct svc_procedure *procp = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	__be32			*statp;
	u32			prog, vers, proc;
	__be32			auth_stat, rpc_stat;
	int			auth_res;
	__be32			*reply_statp;

	rpc_stat = rpc_success;

	if (argv->iov_len < 6*4)
		goto err_short_len;

	/* Will be turned off by GSS integrity and privacy services */
	set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
	/* Will be turned off only when NFSv4 Sessions are used */
	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
	clear_bit(RQ_DROPME, &rqstp->rq_flags);

	/* Setup reply header */
	rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);

	svc_putu32(resv, rqstp->rq_xid);

	vers = svc_getnl(argv);

	/* First words of reply: */
	svc_putnl(resv, 1);		/* REPLY */

	if (vers != 2)		/* RPC version number */
		goto err_bad_rpc;

	/* Save position in case we later decide to reject: */
	reply_statp = resv->iov_base + resv->iov_len;

	svc_putnl(resv, 0);		/* ACCEPT */

	rqstp->rq_prog = prog = svc_getnl(argv);	/* program number */
	rqstp->rq_vers = vers = svc_getnl(argv);	/* version number */
	rqstp->rq_proc = proc = svc_getnl(argv);	/* procedure number */

	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp, &auth_stat);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp) {
		auth_stat = rpc_autherr_badcred;
		auth_res = progp->pg_authenticate(rqstp);
	}
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage;
	case SVC_SYSERR:
		rpc_stat = rpc_system_err;
		goto err_bad;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_CLOSE:
		goto close;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	}

	if (progp == NULL)
		goto err_bad_prog;

	if (vers >= progp->pg_nvers ||
	    !(versp = progp->pg_vers[vers]))
		goto err_bad_vers;

	/*
	 * Some protocol versions (namely NFSv4) require some form of
	 * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
	 * In other words, UDP is not allowed. We mark those when setting
	 * up the svc_xprt, and verify that here.
	 *
	 * The spec is not very clear about what error should be returned
	 * when someone tries to access a server that is listening on UDP
	 * for lower versions. RPC_PROG_MISMATCH seems to be the closest
	 * fit.
	 */
	if (versp->vs_need_cong_ctrl &&
	    !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
		goto err_bad_vers;

	procp = versp->vs_proc + proc;
	if (proc >= versp->vs_nproc || !procp->pc_func)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp;

	/* Syntactic check complete */
	serv->sv_stats->rpccnt++;
	trace_svc_process(rqstp, progp->pg_name);

	/* Build the reply header. */
	statp = resv->iov_base + resv->iov_len;
	svc_putnl(resv, RPC_SUCCESS);

	/* Bump per-procedure stats counter */
	versp->vs_count[proc]++;

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argsize);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	if (!versp->vs_dispatch) {
		/*
		 * Decode arguments
		 * XXX: why do we ignore the return value?
		 */
		if (procp->pc_decode &&
		    !procp->pc_decode(rqstp, argv->iov_base))
			goto err_garbage;

		*statp = procp->pc_func(rqstp);

		/* Encode reply */
		if (*statp == rpc_drop_reply ||
		    test_bit(RQ_DROPME, &rqstp->rq_flags)) {
			if (procp->pc_release)
				procp->pc_release(rqstp);
			goto dropit;
		}
		if (*statp == rpc_autherr_badcred) {
			if (procp->pc_release)
				procp->pc_release(rqstp);
			goto err_bad_auth;
		}
		if (*statp == rpc_success && procp->pc_encode &&
		    !procp->pc_encode(rqstp, resv->iov_base + resv->iov_len)) {
			dprintk("svc: failed to encode reply\n");
			/* serv->sv_stats->rpcsystemerr++; */
			*statp = rpc_system_err;
		}
	} else {
		dprintk("svc: calling dispatcher\n");
		if (!versp->vs_dispatch(rqstp, statp)) {
			/* Release reply info */
			if (procp->pc_release)
				procp->pc_release(rqstp);
			goto dropit;
		}
	}

	/* Check RPC status result */
	if (*statp != rpc_success)
		resv->iov_len = ((void *)statp) - resv->iov_base + 4;

	/* Release reply info */
	if (procp->pc_release)
		procp->pc_release(rqstp);

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto close;
	return 1;		/* Caller can now send it */

 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	return 0;

 close:
	if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
		svc_close_xprt(rqstp->rq_xprt);
	dprintk("svc: svc_process close\n");
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %zd, dropping request\n",
			argv->iov_len);
	goto close;

err_bad_rpc:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 0);	/* RPC_MISMATCH */
	svc_putnl(resv, 2);	/* Only RPCv2 supported */
	svc_putnl(resv, 2);
	goto sendit;

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
	serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of accept status: */
	xdr_ressize_check(rqstp, reply_statp);
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 1);	/* AUTH_ERROR */
	svc_putnl(resv, ntohl(auth_stat));	/* status */
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", prog);
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_UNAVAIL);
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       vers, prog, progp->pg_name);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_MISMATCH);
	svc_putnl(resv, progp->pg_lovers);
	svc_putnl(resv, progp->pg_hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", proc);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROC_UNAVAIL);
	goto sendit;

err_garbage:
	svc_printk(rqstp, "failed to decode args\n");

	rpc_stat = rpc_garbage_args;
err_bad:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, ntohl(rpc_stat));
	goto sendit;
}

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
	struct kvec		*argv = &rqstp->rq_arg.head[0];
	struct kvec		*resv = &rqstp->rq_res.head[0];
	struct svc_serv		*serv = rqstp->rq_server;
	u32			dir;

	/*
	 * Setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_next_page = &rqstp->rq_respages[1];
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_respages + 1;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	dir = svc_getnl(argv);
	if (dir != 0) {
		/* direction != CALL */
		svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
		serv->sv_stats->rpcbadfmt++;
		goto out_drop;
	}

	/* Returns 1 for send, 0 for drop */
	if (likely(svc_process_common(rqstp, argv, resv)))
		return svc_send(rqstp);

out_drop:
	svc_drop(rqstp);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
	       struct svc_rqst *rqstp)
{
	struct kvec	*argv = &rqstp->rq_arg.head[0];
	struct kvec	*resv = &rqstp->rq_res.head[0];
	struct rpc_task *task;
	int proc_error;
	int error;

	dprintk("svc: %s(%p)\n", __func__, req);

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xprt = serv->sv_bc_xprt;
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_server = serv;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/* Adjust the argument buffer length */
	rqstp->rq_arg.len = req->rq_private_buf.len;
	if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
		rqstp->rq_arg.page_len = 0;
	} else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len)
		rqstp->rq_arg.page_len = rqstp->rq_arg.len -
			rqstp->rq_arg.head[0].iov_len;
	else
		rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len;

	/* reset result send buffer "put" position */
	resv->iov_len = 0;

	/*
	 * Skip the next two words because they've already been
	 * processed in the transport
	 */
	svc_getu32(argv);	/* XID */
	svc_getnl(argv);	/* CALLDIR */

	/* Parse and execute the bc call */
	proc_error = svc_process_common(rqstp, argv, resv);

	atomic_inc(&req->rq_xprt->bc_free_slots);
	if (!proc_error) {
		/* Processing error: drop the request */
		xprt_free_bc_request(req);
		return 0;
	}

	/* Finally, send the reply synchronously */
	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
	task = rpc_run_bc_task(req);
	if (IS_ERR(task)) {
		error = PTR_ERR(task);
		goto out;
	}

	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
	error = task->tk_status;
	rpc_put_task(task);

out:
	dprintk("svc: %s(), error=%d\n", __func__, error);
	return error;
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of write payload
 * @total: total number of bytes of write payload
 *
 * Returns the number of elements populated in the data argument array.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp, struct kvec *first,
				   size_t total)
{
	struct kvec *vec = rqstp->rq_vec;
	struct page **pages;
	unsigned int i;

	/* Some types of transport can present the write payload
	 * entirely in rq_arg.pages. In this case, @first is empty.
	 */
	i = 0;
	if (first->iov_len) {
		vec[i].iov_base = first->iov_base;
		vec[i].iov_len = min_t(size_t, total, first->iov_len);
		total -= vec[i].iov_len;
		++i;
	}

	WARN_ON_ONCE(rqstp->rq_arg.page_base != 0);
	pages = rqstp->rq_arg.pages;
	while (total) {
		vec[i].iov_base = page_address(*pages);
		vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
		total -= vec[i].iov_len;
		++i;

		++pages;
	}

	WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
	return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @total: total length of the pathname argument
 *
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. The buffer is
 * released automatically when @rqstp is recycled.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
				size_t total)
{
	struct xdr_buf *arg = &rqstp->rq_arg;
	struct page **pages;
	char *result;

	/* VFS API demands a NUL-terminated pathname. This function
	 * uses a page from @rqstp as the pathname buffer, to enable
	 * direct placement. Thus the total buffer size is PAGE_SIZE.
	 * Space in this buffer for NUL-termination requires that we
	 * cap the size of the returned symlink pathname just a
	 * little early.
	 */
	if (total > PAGE_SIZE - 1)
		return ERR_PTR(-ENAMETOOLONG);

	/* Some types of transport can present the pathname entirely
	 * in rq_arg.pages. If not, then copy the pathname into one
	 * page.
	 */
	pages = arg->pages;
	WARN_ON_ONCE(arg->page_base != 0);
	if (first->iov_base == NULL) {
		result = page_address(*pages);
		result[total] = '\0';
	} else {
		size_t len, remaining;
		char *dst;

		result = page_address(*(rqstp->rq_next_page++));
		dst = result;
		remaining = total;

		len = min_t(size_t, total, first->iov_len);
		memcpy(dst, first->iov_base, len);
		dst += len;
		remaining -= len;

		/* No more than one page left */
		if (remaining) {
			len = min_t(size_t, remaining, PAGE_SIZE);
			memcpy(dst, page_address(*pages), len);
			dst += len;
		}

		*dst = '\0';
	}

	/* Sanity check: we don't allow the pathname argument to
	 * contain a NUL byte.
	 */
	if (strlen(result) != total)
		return ERR_PTR(-EINVAL);
	return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);