linux/drivers/staging/lustre/lustre/ptlrpc/service.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2010, 2015, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_RPC
  38#include "../include/obd_support.h"
  39#include "../include/obd_class.h"
  40#include "../include/lustre_net.h"
  41#include "../include/lu_object.h"
  42#include "../../include/linux/lnet/types.h"
  43#include "ptlrpc_internal.h"
  44
  45/* The following are visible and mutable through /sys/module/ptlrpc */
  46int test_req_buffer_pressure;
  47module_param(test_req_buffer_pressure, int, 0444);
  48MODULE_PARM_DESC(test_req_buffer_pressure, "set non-zero to put pressure on request buffer pools");
  49module_param(at_min, int, 0644);
  50MODULE_PARM_DESC(at_min, "Adaptive timeout minimum (sec)");
  51module_param(at_max, int, 0644);
  52MODULE_PARM_DESC(at_max, "Adaptive timeout maximum (sec)");
  53module_param(at_history, int, 0644);
  54MODULE_PARM_DESC(at_history,
  55                 "Adaptive timeouts remember the slowest event that took place within this period (sec)");
  56module_param(at_early_margin, int, 0644);
  57MODULE_PARM_DESC(at_early_margin, "How soon before an RPC deadline to send an early reply");
  58module_param(at_extra, int, 0644);
  59MODULE_PARM_DESC(at_extra, "How much extra time to give with each early reply");
  60
  61/* forward ref */
  62static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
  63static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
  64static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
  65
  66/** Holds a list of all PTLRPC services */
  67LIST_HEAD(ptlrpc_all_services);
  68/** Used to protect the \e ptlrpc_all_services list */
  69struct mutex ptlrpc_all_services_mutex;
  70
  71static struct ptlrpc_request_buffer_desc *
  72ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
  73{
  74        struct ptlrpc_service *svc = svcpt->scp_service;
  75        struct ptlrpc_request_buffer_desc *rqbd;
  76
  77        rqbd = kzalloc_node(sizeof(*rqbd), GFP_NOFS,
  78                            cfs_cpt_spread_node(svc->srv_cptable,
  79                                                svcpt->scp_cpt));
  80        if (!rqbd)
  81                return NULL;
  82
  83        rqbd->rqbd_svcpt = svcpt;
  84        rqbd->rqbd_refcount = 0;
  85        rqbd->rqbd_cbid.cbid_fn = request_in_callback;
  86        rqbd->rqbd_cbid.cbid_arg = rqbd;
  87        INIT_LIST_HEAD(&rqbd->rqbd_reqs);
  88        rqbd->rqbd_buffer = libcfs_kvzalloc_cpt(svc->srv_cptable,
  89                                                svcpt->scp_cpt,
  90                                                svc->srv_buf_size,
  91                                                GFP_KERNEL);
  92        if (!rqbd->rqbd_buffer) {
  93                kfree(rqbd);
  94                return NULL;
  95        }
  96
  97        spin_lock(&svcpt->scp_lock);
  98        list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
  99        svcpt->scp_nrqbds_total++;
 100        spin_unlock(&svcpt->scp_lock);
 101
 102        return rqbd;
 103}
 104
 105static void
 106ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
 107{
 108        struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
 109
 110        LASSERT(rqbd->rqbd_refcount == 0);
 111        LASSERT(list_empty(&rqbd->rqbd_reqs));
 112
 113        spin_lock(&svcpt->scp_lock);
 114        list_del(&rqbd->rqbd_list);
 115        svcpt->scp_nrqbds_total--;
 116        spin_unlock(&svcpt->scp_lock);
 117
 118        kvfree(rqbd->rqbd_buffer);
 119        kfree(rqbd);
 120}
 121
 122static int
 123ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
 124{
 125        struct ptlrpc_service *svc = svcpt->scp_service;
 126        struct ptlrpc_request_buffer_desc *rqbd;
 127        int rc = 0;
 128        int i;
 129
 130        if (svcpt->scp_rqbd_allocating)
 131                goto try_post;
 132
 133        spin_lock(&svcpt->scp_lock);
 134        /* check again with lock */
 135        if (svcpt->scp_rqbd_allocating) {
 136                /* NB: we might allow more than one thread in the future */
 137                LASSERT(svcpt->scp_rqbd_allocating == 1);
 138                spin_unlock(&svcpt->scp_lock);
 139                goto try_post;
 140        }
 141
 142        svcpt->scp_rqbd_allocating++;
 143        spin_unlock(&svcpt->scp_lock);
 144
 145        for (i = 0; i < svc->srv_nbuf_per_group; i++) {
 146                /* NB: another thread might have recycled enough rqbds, we
 147                 * need to make sure it wouldn't over-allocate, see LU-1212.
 148                 */
 149                if (svcpt->scp_nrqbds_posted >= svc->srv_nbuf_per_group)
 150                        break;
 151
 152                rqbd = ptlrpc_alloc_rqbd(svcpt);
 153
 154                if (!rqbd) {
 155                        CERROR("%s: Can't allocate request buffer\n",
 156                               svc->srv_name);
 157                        rc = -ENOMEM;
 158                        break;
 159                }
 160        }
 161
 162        spin_lock(&svcpt->scp_lock);
 163
 164        LASSERT(svcpt->scp_rqbd_allocating == 1);
 165        svcpt->scp_rqbd_allocating--;
 166
 167        spin_unlock(&svcpt->scp_lock);
 168
 169        CDEBUG(D_RPCTRACE,
 170               "%s: allocate %d new %d-byte reqbufs (%d/%d left), rc = %d\n",
 171               svc->srv_name, i, svc->srv_buf_size, svcpt->scp_nrqbds_posted,
 172               svcpt->scp_nrqbds_total, rc);
 173
 174 try_post:
 175        if (post && rc == 0)
 176                rc = ptlrpc_server_post_idle_rqbds(svcpt);
 177
 178        return rc;
 179}
 180
 181struct ptlrpc_hr_partition;
 182
 183struct ptlrpc_hr_thread {
 184        int                             hrt_id;         /* thread ID */
 185        spinlock_t                      hrt_lock;
 186        wait_queue_head_t                       hrt_waitq;
 187        struct list_head                        hrt_queue;      /* RS queue */
 188        struct ptlrpc_hr_partition      *hrt_partition;
 189};
 190
 191struct ptlrpc_hr_partition {
 192        /* # of started threads */
 193        atomic_t                        hrp_nstarted;
 194        /* # of stopped threads */
 195        atomic_t                        hrp_nstopped;
 196        /* cpu partition id */
 197        int                             hrp_cpt;
 198        /* round-robin rotor for choosing thread */
 199        int                             hrp_rotor;
 200        /* total number of threads on this partition */
 201        int                             hrp_nthrs;
 202        /* threads table */
 203        struct ptlrpc_hr_thread         *hrp_thrs;
 204};
 205
 206#define HRT_RUNNING 0
 207#define HRT_STOPPING 1
 208
 209struct ptlrpc_hr_service {
 210        /* CPU partition table, it's just cfs_cpt_table for now */
 211        struct cfs_cpt_table            *hr_cpt_table;
 212        /** controller sleep waitq */
 213        wait_queue_head_t                       hr_waitq;
 214        unsigned int                    hr_stopping;
 215        /** roundrobin rotor for non-affinity service */
 216        unsigned int                    hr_rotor;
 217        /* partition data */
 218        struct ptlrpc_hr_partition      **hr_partitions;
 219};
 220
 221/** reply handling service. */
 222static struct ptlrpc_hr_service         ptlrpc_hr;
 223
 224/**
 225 * Choose an hr thread to dispatch requests to.
 226 */
 227static struct ptlrpc_hr_thread *
 228ptlrpc_hr_select(struct ptlrpc_service_part *svcpt)
 229{
 230        struct ptlrpc_hr_partition *hrp;
 231        unsigned int rotor;
 232
 233        if (svcpt->scp_cpt >= 0 &&
 234            svcpt->scp_service->srv_cptable == ptlrpc_hr.hr_cpt_table) {
 235                /* directly match partition */
 236                hrp = ptlrpc_hr.hr_partitions[svcpt->scp_cpt];
 237
 238        } else {
 239                rotor = ptlrpc_hr.hr_rotor++;
 240                rotor %= cfs_cpt_number(ptlrpc_hr.hr_cpt_table);
 241
 242                hrp = ptlrpc_hr.hr_partitions[rotor];
 243        }
 244
 245        rotor = hrp->hrp_rotor++;
 246        return &hrp->hrp_thrs[rotor % hrp->hrp_nthrs];
 247}
 248
 249/**
 250 * Put reply state into a queue for processing because we received
 251 * ACK from the client
 252 */
 253void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 254{
 255        struct ptlrpc_hr_thread *hrt;
 256
 257        LASSERT(list_empty(&rs->rs_list));
 258
 259        hrt = ptlrpc_hr_select(rs->rs_svcpt);
 260
 261        spin_lock(&hrt->hrt_lock);
 262        list_add_tail(&rs->rs_list, &hrt->hrt_queue);
 263        spin_unlock(&hrt->hrt_lock);
 264
 265        wake_up(&hrt->hrt_waitq);
 266}
 267
 268void
 269ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs)
 270{
 271        assert_spin_locked(&rs->rs_svcpt->scp_rep_lock);
 272        assert_spin_locked(&rs->rs_lock);
 273        LASSERT(rs->rs_difficult);
 274        rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
 275
 276        if (rs->rs_scheduled) {     /* being set up or already notified */
 277                return;
 278        }
 279
 280        rs->rs_scheduled = 1;
 281        list_del_init(&rs->rs_list);
 282        ptlrpc_dispatch_difficult_reply(rs);
 283}
 284EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply);
 285
 286static int
 287ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
 288{
 289        struct ptlrpc_request_buffer_desc *rqbd;
 290        int rc;
 291        int posted = 0;
 292
 293        for (;;) {
 294                spin_lock(&svcpt->scp_lock);
 295
 296                if (list_empty(&svcpt->scp_rqbd_idle)) {
 297                        spin_unlock(&svcpt->scp_lock);
 298                        return posted;
 299                }
 300
 301                rqbd = list_entry(svcpt->scp_rqbd_idle.next,
 302                                  struct ptlrpc_request_buffer_desc,
 303                                  rqbd_list);
 304                list_del(&rqbd->rqbd_list);
 305
 306                /* assume we will post successfully */
 307                svcpt->scp_nrqbds_posted++;
 308                list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_posted);
 309
 310                spin_unlock(&svcpt->scp_lock);
 311
 312                rc = ptlrpc_register_rqbd(rqbd);
 313                if (rc != 0)
 314                        break;
 315
 316                posted = 1;
 317        }
 318
 319        spin_lock(&svcpt->scp_lock);
 320
 321        svcpt->scp_nrqbds_posted--;
 322        list_del(&rqbd->rqbd_list);
 323        list_add_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
 324
 325        /* Don't complain if no request buffers are posted right now; LNET
 326         * won't drop requests because we set the portal lazy!
 327         */
 328
 329        spin_unlock(&svcpt->scp_lock);
 330
 331        return -1;
 332}
 333
 334static void ptlrpc_at_timer(unsigned long castmeharder)
 335{
 336        struct ptlrpc_service_part *svcpt;
 337
 338        svcpt = (struct ptlrpc_service_part *)castmeharder;
 339
 340        svcpt->scp_at_check = 1;
 341        svcpt->scp_at_checktime = cfs_time_current();
 342        wake_up(&svcpt->scp_waitq);
 343}
 344
 345static void
 346ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
 347                             struct ptlrpc_service_conf *conf)
 348{
 349        struct ptlrpc_service_thr_conf *tc = &conf->psc_thr;
 350        unsigned init;
 351        unsigned total;
 352        unsigned nthrs;
 353        int weight;
 354
 355        /*
 356         * Common code for estimating & validating threads number.
 357         * CPT affinity service could have percpt thread-pool instead
 358         * of a global thread-pool, which means user might not always
 359         * get the threads number they give it in conf::tc_nthrs_user
 360         * even they did set. It's because we need to validate threads
 361         * number for each CPT to guarantee each pool will have enough
 362         * threads to keep the service healthy.
 363         */
 364        init = PTLRPC_NTHRS_INIT + (svc->srv_ops.so_hpreq_handler != NULL);
 365        init = max_t(int, init, tc->tc_nthrs_init);
 366
 367        /* NB: please see comments in lustre_lnet.h for definition
 368         * details of these members
 369         */
 370        LASSERT(tc->tc_nthrs_max != 0);
 371
 372        if (tc->tc_nthrs_user != 0) {
 373                /* In case there is a reason to test a service with many
 374                 * threads, we give a less strict check here, it can
 375                 * be up to 8 * nthrs_max
 376                 */
 377                total = min(tc->tc_nthrs_max * 8, tc->tc_nthrs_user);
 378                nthrs = total / svc->srv_ncpts;
 379                init = max(init, nthrs);
 380                goto out;
 381        }
 382
 383        total = tc->tc_nthrs_max;
 384        if (tc->tc_nthrs_base == 0) {
 385                /* don't care about base threads number per partition,
 386                 * this is most for non-affinity service
 387                 */
 388                nthrs = total / svc->srv_ncpts;
 389                goto out;
 390        }
 391
 392        nthrs = tc->tc_nthrs_base;
 393        if (svc->srv_ncpts == 1) {
 394                int i;
 395
 396                /* NB: Increase the base number if it's single partition
 397                 * and total number of cores/HTs is larger or equal to 4.
 398                 * result will always < 2 * nthrs_base
 399                 */
 400                weight = cfs_cpt_weight(svc->srv_cptable, CFS_CPT_ANY);
 401                for (i = 1; (weight >> (i + 1)) != 0 && /* >= 4 cores/HTs */
 402                            (tc->tc_nthrs_base >> i) != 0; i++)
 403                        nthrs += tc->tc_nthrs_base >> i;
 404        }
 405
 406        if (tc->tc_thr_factor != 0) {
 407                int factor = tc->tc_thr_factor;
 408                const int fade = 4;
 409
 410                /*
 411                 * User wants to increase number of threads with for
 412                 * each CPU core/HT, most likely the factor is larger then
 413                 * one thread/core because service threads are supposed to
 414                 * be blocked by lock or wait for IO.
 415                 */
 416                /*
 417                 * Amdahl's law says that adding processors wouldn't give
 418                 * a linear increasing of parallelism, so it's nonsense to
 419                 * have too many threads no matter how many cores/HTs
 420                 * there are.
 421                 */
 422                /* weight is # of HTs */
 423                if (cpumask_weight(topology_sibling_cpumask(0)) > 1) {
 424                        /* depress thread factor for hyper-thread */
 425                        factor = factor - (factor >> 1) + (factor >> 3);
 426                }
 427
 428                weight = cfs_cpt_weight(svc->srv_cptable, 0);
 429                LASSERT(weight > 0);
 430
 431                for (; factor > 0 && weight > 0; factor--, weight -= fade)
 432                        nthrs += min(weight, fade) * factor;
 433        }
 434
 435        if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
 436                nthrs = max(tc->tc_nthrs_base,
 437                            tc->tc_nthrs_max / svc->srv_ncpts);
 438        }
 439 out:
 440        nthrs = max(nthrs, tc->tc_nthrs_init);
 441        svc->srv_nthrs_cpt_limit = nthrs;
 442        svc->srv_nthrs_cpt_init = init;
 443
 444        if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
 445                CDEBUG(D_OTHER, "%s: This service may have more threads (%d) than the given soft limit (%d)\n",
 446                       svc->srv_name, nthrs * svc->srv_ncpts,
 447                       tc->tc_nthrs_max);
 448        }
 449}
 450
 451/**
 452 * Initialize percpt data for a service
 453 */
 454static int
 455ptlrpc_service_part_init(struct ptlrpc_service *svc,
 456                         struct ptlrpc_service_part *svcpt, int cpt)
 457{
 458        struct ptlrpc_at_array  *array;
 459        int size;
 460        int index;
 461        int rc;
 462
 463        svcpt->scp_cpt = cpt;
 464        INIT_LIST_HEAD(&svcpt->scp_threads);
 465
 466        /* rqbd and incoming request queue */
 467        spin_lock_init(&svcpt->scp_lock);
 468        INIT_LIST_HEAD(&svcpt->scp_rqbd_idle);
 469        INIT_LIST_HEAD(&svcpt->scp_rqbd_posted);
 470        INIT_LIST_HEAD(&svcpt->scp_req_incoming);
 471        init_waitqueue_head(&svcpt->scp_waitq);
 472        /* history request & rqbd list */
 473        INIT_LIST_HEAD(&svcpt->scp_hist_reqs);
 474        INIT_LIST_HEAD(&svcpt->scp_hist_rqbds);
 475
 476        /* active requests and hp requests */
 477        spin_lock_init(&svcpt->scp_req_lock);
 478
 479        /* reply states */
 480        spin_lock_init(&svcpt->scp_rep_lock);
 481        INIT_LIST_HEAD(&svcpt->scp_rep_active);
 482        INIT_LIST_HEAD(&svcpt->scp_rep_idle);
 483        init_waitqueue_head(&svcpt->scp_rep_waitq);
 484        atomic_set(&svcpt->scp_nreps_difficult, 0);
 485
 486        /* adaptive timeout */
 487        spin_lock_init(&svcpt->scp_at_lock);
 488        array = &svcpt->scp_at_array;
 489
 490        size = at_est2timeout(at_max);
 491        array->paa_size = size;
 492        array->paa_count = 0;
 493        array->paa_deadline = -1;
 494
 495        /* allocate memory for scp_at_array (ptlrpc_at_array) */
 496        array->paa_reqs_array =
 497                kzalloc_node(sizeof(struct list_head) * size, GFP_NOFS,
 498                             cfs_cpt_spread_node(svc->srv_cptable, cpt));
 499        if (!array->paa_reqs_array)
 500                return -ENOMEM;
 501
 502        for (index = 0; index < size; index++)
 503                INIT_LIST_HEAD(&array->paa_reqs_array[index]);
 504
 505        array->paa_reqs_count =
 506                kzalloc_node(sizeof(__u32) * size, GFP_NOFS,
 507                             cfs_cpt_spread_node(svc->srv_cptable, cpt));
 508        if (!array->paa_reqs_count)
 509                goto free_reqs_array;
 510
 511        setup_timer(&svcpt->scp_at_timer, ptlrpc_at_timer,
 512                    (unsigned long)svcpt);
 513
 514        /* At SOW, service time should be quick; 10s seems generous. If client
 515         * timeout is less than this, we'll be sending an early reply.
 516         */
 517        at_init(&svcpt->scp_at_estimate, 10, 0);
 518
 519        /* assign this before call ptlrpc_grow_req_bufs */
 520        svcpt->scp_service = svc;
 521        /* Now allocate the request buffers, but don't post them now */
 522        rc = ptlrpc_grow_req_bufs(svcpt, 0);
 523        /* We shouldn't be under memory pressure at startup, so
 524         * fail if we can't allocate all our buffers at this time.
 525         */
 526        if (rc != 0)
 527                goto free_reqs_count;
 528
 529        return 0;
 530
 531free_reqs_count:
 532        kfree(array->paa_reqs_count);
 533        array->paa_reqs_count = NULL;
 534free_reqs_array:
 535        kfree(array->paa_reqs_array);
 536        array->paa_reqs_array = NULL;
 537
 538        return -ENOMEM;
 539}
 540
 541/**
 542 * Initialize service on a given portal.
 543 * This includes starting serving threads , allocating and posting rqbds and
 544 * so on.
 545 */
 546struct ptlrpc_service *
 547ptlrpc_register_service(struct ptlrpc_service_conf *conf,
 548                        struct kset *parent,
 549                        struct dentry *debugfs_entry)
 550{
 551        struct ptlrpc_service_cpt_conf *cconf = &conf->psc_cpt;
 552        struct ptlrpc_service *service;
 553        struct ptlrpc_service_part *svcpt;
 554        struct cfs_cpt_table *cptable;
 555        __u32 *cpts = NULL;
 556        int ncpts;
 557        int cpt;
 558        int rc;
 559        int i;
 560
 561        LASSERT(conf->psc_buf.bc_nbufs > 0);
 562        LASSERT(conf->psc_buf.bc_buf_size >=
 563                conf->psc_buf.bc_req_max_size + SPTLRPC_MAX_PAYLOAD);
 564        LASSERT(conf->psc_thr.tc_ctx_tags != 0);
 565
 566        cptable = cconf->cc_cptable;
 567        if (!cptable)
 568                cptable = cfs_cpt_table;
 569
 570        if (!conf->psc_thr.tc_cpu_affinity) {
 571                ncpts = 1;
 572        } else {
 573                ncpts = cfs_cpt_number(cptable);
 574                if (cconf->cc_pattern) {
 575                        struct cfs_expr_list *el;
 576
 577                        rc = cfs_expr_list_parse(cconf->cc_pattern,
 578                                                 strlen(cconf->cc_pattern),
 579                                                 0, ncpts - 1, &el);
 580                        if (rc != 0) {
 581                                CERROR("%s: invalid CPT pattern string: %s",
 582                                       conf->psc_name, cconf->cc_pattern);
 583                                return ERR_PTR(-EINVAL);
 584                        }
 585
 586                        rc = cfs_expr_list_values(el, ncpts, &cpts);
 587                        cfs_expr_list_free(el);
 588                        if (rc <= 0) {
 589                                CERROR("%s: failed to parse CPT array %s: %d\n",
 590                                       conf->psc_name, cconf->cc_pattern, rc);
 591                                kfree(cpts);
 592                                return ERR_PTR(rc < 0 ? rc : -EINVAL);
 593                        }
 594                        ncpts = rc;
 595                }
 596        }
 597
 598        service = kzalloc(offsetof(struct ptlrpc_service, srv_parts[ncpts]),
 599                          GFP_NOFS);
 600        if (!service) {
 601                kfree(cpts);
 602                return ERR_PTR(-ENOMEM);
 603        }
 604
 605        service->srv_cptable = cptable;
 606        service->srv_cpts = cpts;
 607        service->srv_ncpts = ncpts;
 608
 609        service->srv_cpt_bits = 0; /* it's zero already, easy to read... */
 610        while ((1 << service->srv_cpt_bits) < cfs_cpt_number(cptable))
 611                service->srv_cpt_bits++;
 612
 613        /* public members */
 614        spin_lock_init(&service->srv_lock);
 615        service->srv_name = conf->psc_name;
 616        service->srv_watchdog_factor = conf->psc_watchdog_factor;
 617        INIT_LIST_HEAD(&service->srv_list); /* for safety of cleanup */
 618
 619        /* buffer configuration */
 620        service->srv_nbuf_per_group = test_req_buffer_pressure ?
 621                                          1 : conf->psc_buf.bc_nbufs;
 622        service->srv_max_req_size = conf->psc_buf.bc_req_max_size +
 623                                          SPTLRPC_MAX_PAYLOAD;
 624        service->srv_buf_size = conf->psc_buf.bc_buf_size;
 625        service->srv_rep_portal = conf->psc_buf.bc_rep_portal;
 626        service->srv_req_portal = conf->psc_buf.bc_req_portal;
 627
 628        /* Increase max reply size to next power of two */
 629        service->srv_max_reply_size = 1;
 630        while (service->srv_max_reply_size <
 631               conf->psc_buf.bc_rep_max_size + SPTLRPC_MAX_PAYLOAD)
 632                service->srv_max_reply_size <<= 1;
 633
 634        service->srv_thread_name = conf->psc_thr.tc_thr_name;
 635        service->srv_ctx_tags = conf->psc_thr.tc_ctx_tags;
 636        service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
 637        service->srv_ops = conf->psc_ops;
 638
 639        for (i = 0; i < ncpts; i++) {
 640                if (!conf->psc_thr.tc_cpu_affinity)
 641                        cpt = CFS_CPT_ANY;
 642                else
 643                        cpt = cpts ? cpts[i] : i;
 644
 645                svcpt = kzalloc_node(sizeof(*svcpt), GFP_NOFS,
 646                                     cfs_cpt_spread_node(cptable, cpt));
 647                if (!svcpt) {
 648                        rc = -ENOMEM;
 649                        goto failed;
 650                }
 651
 652                service->srv_parts[i] = svcpt;
 653                rc = ptlrpc_service_part_init(service, svcpt, cpt);
 654                if (rc != 0)
 655                        goto failed;
 656        }
 657
 658        ptlrpc_server_nthreads_check(service, conf);
 659
 660        rc = LNetSetLazyPortal(service->srv_req_portal);
 661        LASSERT(rc == 0);
 662
 663        mutex_lock(&ptlrpc_all_services_mutex);
 664        list_add(&service->srv_list, &ptlrpc_all_services);
 665        mutex_unlock(&ptlrpc_all_services_mutex);
 666
 667        if (parent) {
 668                rc = ptlrpc_sysfs_register_service(parent, service);
 669                if (rc)
 670                        goto failed;
 671        }
 672
 673        if (!IS_ERR_OR_NULL(debugfs_entry))
 674                ptlrpc_ldebugfs_register_service(debugfs_entry, service);
 675
 676        rc = ptlrpc_service_nrs_setup(service);
 677        if (rc != 0)
 678                goto failed;
 679
 680        CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
 681               service->srv_name, service->srv_req_portal);
 682
 683        rc = ptlrpc_start_threads(service);
 684        if (rc != 0) {
 685                CERROR("Failed to start threads for service %s: %d\n",
 686                       service->srv_name, rc);
 687                goto failed;
 688        }
 689
 690        return service;
 691failed:
 692        ptlrpc_unregister_service(service);
 693        return ERR_PTR(rc);
 694}
 695EXPORT_SYMBOL(ptlrpc_register_service);
 696
 697/**
 698 * to actually free the request, must be called without holding svc_lock.
 699 * note it's caller's responsibility to unlink req->rq_list.
 700 */
 701static void ptlrpc_server_free_request(struct ptlrpc_request *req)
 702{
 703        LASSERT(atomic_read(&req->rq_refcount) == 0);
 704        LASSERT(list_empty(&req->rq_timed_list));
 705
 706         /* DEBUG_REQ() assumes the reply state of a request with a valid
 707          * ref will not be destroyed until that reference is dropped.
 708          */
 709        ptlrpc_req_drop_rs(req);
 710
 711        sptlrpc_svc_ctx_decref(req);
 712
 713        if (req != &req->rq_rqbd->rqbd_req) {
 714                /* NB request buffers use an embedded
 715                 * req if the incoming req unlinked the
 716                 * MD; this isn't one of them!
 717                 */
 718                ptlrpc_request_cache_free(req);
 719        }
 720}
 721
 722/**
 723 * drop a reference count of the request. if it reaches 0, we either
 724 * put it into history list, or free it immediately.
 725 */
 726static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 727{
 728        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
 729        struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
 730        struct ptlrpc_service *svc = svcpt->scp_service;
 731        int refcount;
 732        struct list_head *tmp;
 733        struct list_head *nxt;
 734
 735        if (!atomic_dec_and_test(&req->rq_refcount))
 736                return;
 737
 738        if (req->rq_at_linked) {
 739                spin_lock(&svcpt->scp_at_lock);
 740                /* recheck with lock, in case it's unlinked by
 741                 * ptlrpc_at_check_timed()
 742                 */
 743                if (likely(req->rq_at_linked))
 744                        ptlrpc_at_remove_timed(req);
 745                spin_unlock(&svcpt->scp_at_lock);
 746        }
 747
 748        LASSERT(list_empty(&req->rq_timed_list));
 749
 750        /* finalize request */
 751        if (req->rq_export) {
 752                class_export_put(req->rq_export);
 753                req->rq_export = NULL;
 754        }
 755
 756        spin_lock(&svcpt->scp_lock);
 757
 758        list_add(&req->rq_list, &rqbd->rqbd_reqs);
 759
 760        refcount = --(rqbd->rqbd_refcount);
 761        if (refcount == 0) {
 762                /* request buffer is now idle: add to history */
 763                list_del(&rqbd->rqbd_list);
 764
 765                list_add_tail(&rqbd->rqbd_list, &svcpt->scp_hist_rqbds);
 766                svcpt->scp_hist_nrqbds++;
 767
 768                /* cull some history?
 769                 * I expect only about 1 or 2 rqbds need to be recycled here
 770                 */
 771                while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
 772                        rqbd = list_entry(svcpt->scp_hist_rqbds.next,
 773                                          struct ptlrpc_request_buffer_desc,
 774                                          rqbd_list);
 775
 776                        list_del(&rqbd->rqbd_list);
 777                        svcpt->scp_hist_nrqbds--;
 778
 779                        /* remove rqbd's reqs from svc's req history while
 780                         * I've got the service lock
 781                         */
 782                        list_for_each(tmp, &rqbd->rqbd_reqs) {
 783                                req = list_entry(tmp, struct ptlrpc_request,
 784                                                 rq_list);
 785                                /* Track the highest culled req seq */
 786                                if (req->rq_history_seq >
 787                                    svcpt->scp_hist_seq_culled) {
 788                                        svcpt->scp_hist_seq_culled =
 789                                                req->rq_history_seq;
 790                                }
 791                                list_del(&req->rq_history_list);
 792                        }
 793
 794                        spin_unlock(&svcpt->scp_lock);
 795
 796                        list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
 797                                req = list_entry(rqbd->rqbd_reqs.next,
 798                                                 struct ptlrpc_request,
 799                                                 rq_list);
 800                                list_del(&req->rq_list);
 801                                ptlrpc_server_free_request(req);
 802                        }
 803
 804                        spin_lock(&svcpt->scp_lock);
 805                        /*
 806                         * now all reqs including the embedded req has been
 807                         * disposed, schedule request buffer for re-use.
 808                         */
 809                        LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) ==
 810                                0);
 811                        list_add_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
 812                }
 813
 814                spin_unlock(&svcpt->scp_lock);
 815        } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
 816                /* If we are low on memory, we are not interested in history */
 817                list_del(&req->rq_list);
 818                list_del_init(&req->rq_history_list);
 819
 820                /* Track the highest culled req seq */
 821                if (req->rq_history_seq > svcpt->scp_hist_seq_culled)
 822                        svcpt->scp_hist_seq_culled = req->rq_history_seq;
 823
 824                spin_unlock(&svcpt->scp_lock);
 825
 826                ptlrpc_server_free_request(req);
 827        } else {
 828                spin_unlock(&svcpt->scp_lock);
 829        }
 830}
 831
 832/**
 833 * to finish a request: stop sending more early replies, and release
 834 * the request.
 835 */
 836static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
 837                                         struct ptlrpc_request *req)
 838{
 839        ptlrpc_server_hpreq_fini(req);
 840
 841        ptlrpc_server_drop_request(req);
 842}
 843
 844/**
 845 * to finish a active request: stop sending more early replies, and release
 846 * the request. should be called after we finished handling the request.
 847 */
 848static void ptlrpc_server_finish_active_request(
 849                                        struct ptlrpc_service_part *svcpt,
 850                                        struct ptlrpc_request *req)
 851{
 852        spin_lock(&svcpt->scp_req_lock);
 853        ptlrpc_nrs_req_stop_nolock(req);
 854        svcpt->scp_nreqs_active--;
 855        if (req->rq_hp)
 856                svcpt->scp_nhreqs_active--;
 857        spin_unlock(&svcpt->scp_req_lock);
 858
 859        ptlrpc_nrs_req_finalize(req);
 860
 861        if (req->rq_export)
 862                class_export_rpc_dec(req->rq_export);
 863
 864        ptlrpc_server_finish_request(svcpt, req);
 865}
 866
 867/**
 868 * Sanity check request \a req.
 869 * Return 0 if all is ok, error code otherwise.
 870 */
 871static int ptlrpc_check_req(struct ptlrpc_request *req)
 872{
 873        struct obd_device *obd = req->rq_export->exp_obd;
 874        int rc = 0;
 875
 876        if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
 877                     req->rq_export->exp_conn_cnt)) {
 878                DEBUG_REQ(D_RPCTRACE, req,
 879                          "DROPPING req from old connection %d < %d",
 880                          lustre_msg_get_conn_cnt(req->rq_reqmsg),
 881                          req->rq_export->exp_conn_cnt);
 882                return -EEXIST;
 883        }
 884        if (unlikely(!obd || obd->obd_fail)) {
 885                /*
 886                 * Failing over, don't handle any more reqs, send
 887                 * error response instead.
 888                 */
 889                CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
 890                       req, obd ? obd->obd_name : "unknown");
 891                rc = -ENODEV;
 892        } else if (lustre_msg_get_flags(req->rq_reqmsg) &
 893                   (MSG_REPLAY | MSG_REQ_REPLAY_DONE)) {
 894                DEBUG_REQ(D_ERROR, req, "Invalid replay without recovery");
 895                class_fail_export(req->rq_export);
 896                rc = -ENODEV;
 897        } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0) {
 898                DEBUG_REQ(D_ERROR, req,
 899                          "Invalid req with transno %llu without recovery",
 900                          lustre_msg_get_transno(req->rq_reqmsg));
 901                class_fail_export(req->rq_export);
 902                rc = -ENODEV;
 903        }
 904
 905        if (unlikely(rc < 0)) {
 906                req->rq_status = rc;
 907                ptlrpc_error(req);
 908        }
 909        return rc;
 910}
 911
 912static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt)
 913{
 914        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
 915        __s32 next;
 916
 917        if (array->paa_count == 0) {
 918                del_timer(&svcpt->scp_at_timer);
 919                return;
 920        }
 921
 922        /* Set timer for closest deadline */
 923        next = (__s32)(array->paa_deadline - ktime_get_real_seconds() -
 924                       at_early_margin);
 925        if (next <= 0) {
 926                ptlrpc_at_timer((unsigned long)svcpt);
 927        } else {
 928                mod_timer(&svcpt->scp_at_timer, cfs_time_shift(next));
 929                CDEBUG(D_INFO, "armed %s at %+ds\n",
 930                       svcpt->scp_service->srv_name, next);
 931        }
 932}
 933
 934/* Add rpc to early reply check list */
 935static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
 936{
 937        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
 938        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
 939        struct ptlrpc_request *rq = NULL;
 940        __u32 index;
 941
 942        if (AT_OFF)
 943                return 0;
 944
 945        if (req->rq_no_reply)
 946                return 0;
 947
 948        if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
 949                return -ENOSYS;
 950
 951        spin_lock(&svcpt->scp_at_lock);
 952        LASSERT(list_empty(&req->rq_timed_list));
 953
 954        div_u64_rem(req->rq_deadline, array->paa_size, &index);
 955        if (array->paa_reqs_count[index] > 0) {
 956                /* latest rpcs will have the latest deadlines in the list,
 957                 * so search backward.
 958                 */
 959                list_for_each_entry_reverse(rq, &array->paa_reqs_array[index],
 960                                            rq_timed_list) {
 961                        if (req->rq_deadline >= rq->rq_deadline) {
 962                                list_add(&req->rq_timed_list,
 963                                         &rq->rq_timed_list);
 964                                break;
 965                        }
 966                }
 967        }
 968
 969        /* Add the request at the head of the list */
 970        if (list_empty(&req->rq_timed_list))
 971                list_add(&req->rq_timed_list, &array->paa_reqs_array[index]);
 972
 973        spin_lock(&req->rq_lock);
 974        req->rq_at_linked = 1;
 975        spin_unlock(&req->rq_lock);
 976        req->rq_at_index = index;
 977        array->paa_reqs_count[index]++;
 978        array->paa_count++;
 979        if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
 980                array->paa_deadline = req->rq_deadline;
 981                ptlrpc_at_set_timer(svcpt);
 982        }
 983        spin_unlock(&svcpt->scp_at_lock);
 984
 985        return 0;
 986}
 987
 988static void
 989ptlrpc_at_remove_timed(struct ptlrpc_request *req)
 990{
 991        struct ptlrpc_at_array *array;
 992
 993        array = &req->rq_rqbd->rqbd_svcpt->scp_at_array;
 994
 995        /* NB: must call with hold svcpt::scp_at_lock */
 996        LASSERT(!list_empty(&req->rq_timed_list));
 997        list_del_init(&req->rq_timed_list);
 998
 999        spin_lock(&req->rq_lock);
1000        req->rq_at_linked = 0;
1001        spin_unlock(&req->rq_lock);
1002
1003        array->paa_reqs_count[req->rq_at_index]--;
1004        array->paa_count--;
1005}
1006
1007static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
1008{
1009        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
1010        struct ptlrpc_request *reqcopy;
1011        struct lustre_msg *reqmsg;
1012        long olddl = req->rq_deadline - ktime_get_real_seconds();
1013        time64_t newdl;
1014        int rc;
1015
1016        /* deadline is when the client expects us to reply, margin is the
1017         * difference between clients' and servers' expectations
1018         */
1019        DEBUG_REQ(D_ADAPTTO, req,
1020                  "%ssending early reply (deadline %+lds, margin %+lds) for %d+%d",
1021                  AT_OFF ? "AT off - not " : "",
1022                  olddl, olddl - at_get(&svcpt->scp_at_estimate),
1023                  at_get(&svcpt->scp_at_estimate), at_extra);
1024
1025        if (AT_OFF)
1026                return 0;
1027
1028        if (olddl < 0) {
1029                DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), not sending early reply. Consider increasing at_early_margin (%d)?",
1030                          olddl, at_early_margin);
1031
1032                /* Return an error so we're not re-added to the timed list. */
1033                return -ETIMEDOUT;
1034        }
1035
1036        if (!(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
1037                DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, but no AT support");
1038                return -ENOSYS;
1039        }
1040
1041        /* Fake our processing time into the future to ask the clients
1042         * for some extra amount of time
1043         */
1044        at_measured(&svcpt->scp_at_estimate, at_extra +
1045                    ktime_get_real_seconds() - req->rq_arrival_time.tv_sec);
1046
1047        /* Check to see if we've actually increased the deadline -
1048         * we may be past adaptive_max
1049         */
1050        if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
1051            at_get(&svcpt->scp_at_estimate)) {
1052                DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%ld/%lld), not sending early reply\n",
1053                          olddl, req->rq_arrival_time.tv_sec +
1054                          at_get(&svcpt->scp_at_estimate) -
1055                          ktime_get_real_seconds());
1056                return -ETIMEDOUT;
1057        }
1058        newdl = ktime_get_real_seconds() + at_get(&svcpt->scp_at_estimate);
1059
1060        reqcopy = ptlrpc_request_cache_alloc(GFP_NOFS);
1061        if (!reqcopy)
1062                return -ENOMEM;
1063        reqmsg = libcfs_kvzalloc(req->rq_reqlen, GFP_NOFS);
1064        if (!reqmsg) {
1065                rc = -ENOMEM;
1066                goto out_free;
1067        }
1068
1069        *reqcopy = *req;
1070        reqcopy->rq_reply_state = NULL;
1071        reqcopy->rq_rep_swab_mask = 0;
1072        reqcopy->rq_pack_bulk = 0;
1073        reqcopy->rq_pack_udesc = 0;
1074        reqcopy->rq_packed_final = 0;
1075        sptlrpc_svc_ctx_addref(reqcopy);
1076        /* We only need the reqmsg for the magic */
1077        reqcopy->rq_reqmsg = reqmsg;
1078        memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
1079
1080        LASSERT(atomic_read(&req->rq_refcount));
1081        /** if it is last refcount then early reply isn't needed */
1082        if (atomic_read(&req->rq_refcount) == 1) {
1083                DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, abort sending early reply\n");
1084                rc = -EINVAL;
1085                goto out;
1086        }
1087
1088        /* Connection ref */
1089        reqcopy->rq_export = class_conn2export(
1090                                     lustre_msg_get_handle(reqcopy->rq_reqmsg));
1091        if (!reqcopy->rq_export) {
1092                rc = -ENODEV;
1093                goto out;
1094        }
1095
1096        /* RPC ref */
1097        class_export_rpc_inc(reqcopy->rq_export);
1098        if (reqcopy->rq_export->exp_obd &&
1099            reqcopy->rq_export->exp_obd->obd_fail) {
1100                rc = -ENODEV;
1101                goto out_put;
1102        }
1103
1104        rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
1105        if (rc)
1106                goto out_put;
1107
1108        rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
1109
1110        if (!rc) {
1111                /* Adjust our own deadline to what we told the client */
1112                req->rq_deadline = newdl;
1113                req->rq_early_count++; /* number sent, server side */
1114        } else {
1115                DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
1116        }
1117
1118        /* Free the (early) reply state from lustre_pack_reply.
1119         * (ptlrpc_send_reply takes it's own rs ref, so this is safe here)
1120         */
1121        ptlrpc_req_drop_rs(reqcopy);
1122
1123out_put:
1124        class_export_rpc_dec(reqcopy->rq_export);
1125        class_export_put(reqcopy->rq_export);
1126out:
1127        sptlrpc_svc_ctx_decref(reqcopy);
1128        kvfree(reqmsg);
1129out_free:
1130        ptlrpc_request_cache_free(reqcopy);
1131        return rc;
1132}
1133
1134/* Send early replies to everybody expiring within at_early_margin
1135 * asking for at_extra time
1136 */
1137static void ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
1138{
1139        struct ptlrpc_at_array *array = &svcpt->scp_at_array;
1140        struct ptlrpc_request *rq, *n;
1141        struct list_head work_list;
1142        __u32 index, count;
1143        time64_t deadline;
1144        time64_t now = ktime_get_real_seconds();
1145        long delay;
1146        int first, counter = 0;
1147
1148        spin_lock(&svcpt->scp_at_lock);
1149        if (svcpt->scp_at_check == 0) {
1150                spin_unlock(&svcpt->scp_at_lock);
1151                return;
1152        }
1153        delay = cfs_time_sub(cfs_time_current(), svcpt->scp_at_checktime);
1154        svcpt->scp_at_check = 0;
1155
1156        if (array->paa_count == 0) {
1157                spin_unlock(&svcpt->scp_at_lock);
1158                return;
1159        }
1160
1161        /* The timer went off, but maybe the nearest rpc already completed. */
1162        first = array->paa_deadline - now;
1163        if (first > at_early_margin) {
1164                /* We've still got plenty of time.  Reset the timer. */
1165                ptlrpc_at_set_timer(svcpt);
1166                spin_unlock(&svcpt->scp_at_lock);
1167                return;
1168        }
1169
1170        /* We're close to a timeout, and we don't know how much longer the
1171         * server will take. Send early replies to everyone expiring soon.
1172         */
1173        INIT_LIST_HEAD(&work_list);
1174        deadline = -1;
1175        div_u64_rem(array->paa_deadline, array->paa_size, &index);
1176        count = array->paa_count;
1177        while (count > 0) {
1178                count -= array->paa_reqs_count[index];
1179                list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
1180                                         rq_timed_list) {
1181                        if (rq->rq_deadline > now + at_early_margin) {
1182                                /* update the earliest deadline */
1183                                if (deadline == -1 ||
1184                                    rq->rq_deadline < deadline)
1185                                        deadline = rq->rq_deadline;
1186                                break;
1187                        }
1188
1189                        ptlrpc_at_remove_timed(rq);
1190                        /**
1191                         * ptlrpc_server_drop_request() may drop
1192                         * refcount to 0 already. Let's check this and
1193                         * don't add entry to work_list
1194                         */
1195                        if (likely(atomic_inc_not_zero(&rq->rq_refcount)))
1196                                list_add(&rq->rq_timed_list, &work_list);
1197                        counter++;
1198                }
1199
1200                if (++index >= array->paa_size)
1201                        index = 0;
1202        }
1203        array->paa_deadline = deadline;
1204        /* we have a new earliest deadline, restart the timer */
1205        ptlrpc_at_set_timer(svcpt);
1206
1207        spin_unlock(&svcpt->scp_at_lock);
1208
1209        CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early replies\n",
1210               first, at_extra, counter);
1211        if (first < 0) {
1212                /* We're already past request deadlines before we even get a
1213                 * chance to send early replies
1214                 */
1215                LCONSOLE_WARN("%s: This server is not able to keep up with request traffic (cpu-bound).\n",
1216                              svcpt->scp_service->srv_name);
1217                CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%ld(jiff)\n",
1218                      counter, svcpt->scp_nreqs_incoming,
1219                      svcpt->scp_nreqs_active,
1220                      at_get(&svcpt->scp_at_estimate), delay);
1221        }
1222
1223        /* we took additional refcount so entries can't be deleted from list, no
1224         * locking is needed
1225         */
1226        while (!list_empty(&work_list)) {
1227                rq = list_entry(work_list.next, struct ptlrpc_request,
1228                                rq_timed_list);
1229                list_del_init(&rq->rq_timed_list);
1230
1231                if (ptlrpc_at_send_early_reply(rq) == 0)
1232                        ptlrpc_at_add_timed(rq);
1233
1234                ptlrpc_server_drop_request(rq);
1235        }
1236}
1237
1238/**
1239 * Put the request to the export list if the request may become
1240 * a high priority one.
1241 */
1242static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
1243                                    struct ptlrpc_request *req)
1244{
1245        int rc = 0;
1246
1247        if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
1248                rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req);
1249                if (rc < 0)
1250                        return rc;
1251                LASSERT(rc == 0);
1252        }
1253        if (req->rq_export && req->rq_ops) {
1254                /* Perform request specific check. We should do this check
1255                 * before the request is added into exp_hp_rpcs list otherwise
1256                 * it may hit swab race at LU-1044.
1257                 */
1258                if (req->rq_ops->hpreq_check) {
1259                        rc = req->rq_ops->hpreq_check(req);
1260                        /**
1261                         * XXX: Out of all current
1262                         * ptlrpc_hpreq_ops::hpreq_check(), only
1263                         * ldlm_cancel_hpreq_check() can return an error code;
1264                         * other functions assert in similar places, which seems
1265                         * odd. What also does not seem right is that handlers
1266                         * for those RPCs do not assert on the same checks, but
1267                         * rather handle the error cases. e.g. see
1268                         * ost_rw_hpreq_check(), and ost_brw_read(),
1269                         * ost_brw_write().
1270                         */
1271                        if (rc < 0)
1272                                return rc;
1273                        LASSERT(rc == 0 || rc == 1);
1274                }
1275
1276                spin_lock_bh(&req->rq_export->exp_rpc_lock);
1277                list_add(&req->rq_exp_list, &req->rq_export->exp_hp_rpcs);
1278                spin_unlock_bh(&req->rq_export->exp_rpc_lock);
1279        }
1280
1281        ptlrpc_nrs_req_initialize(svcpt, req, rc);
1282
1283        return rc;
1284}
1285
1286/** Remove the request from the export list. */
1287static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
1288{
1289        if (req->rq_export && req->rq_ops) {
1290                /* refresh lock timeout again so that client has more
1291                 * room to send lock cancel RPC.
1292                 */
1293                if (req->rq_ops->hpreq_fini)
1294                        req->rq_ops->hpreq_fini(req);
1295
1296                spin_lock_bh(&req->rq_export->exp_rpc_lock);
1297                list_del_init(&req->rq_exp_list);
1298                spin_unlock_bh(&req->rq_export->exp_rpc_lock);
1299        }
1300}
1301
1302static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
1303                                     struct ptlrpc_request *req)
1304{
1305        int     rc;
1306
1307        rc = ptlrpc_server_hpreq_init(svcpt, req);
1308        if (rc < 0)
1309                return rc;
1310
1311        ptlrpc_nrs_req_add(svcpt, req, !!rc);
1312
1313        return 0;
1314}
1315
1316/**
1317 * Allow to handle high priority request
1318 * User can call it w/o any lock but need to hold
1319 * ptlrpc_service_part::scp_req_lock to get reliable result
1320 */
1321static bool ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
1322                                     bool force)
1323{
1324        int running = svcpt->scp_nthrs_running;
1325
1326        if (!nrs_svcpt_has_hp(svcpt))
1327                return false;
1328
1329        if (force)
1330                return true;
1331
1332        if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
1333                     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
1334                /* leave just 1 thread for normal RPCs */
1335                running = PTLRPC_NTHRS_INIT;
1336                if (svcpt->scp_service->srv_ops.so_hpreq_handler)
1337                        running += 1;
1338        }
1339
1340        if (svcpt->scp_nreqs_active >= running - 1)
1341                return false;
1342
1343        if (svcpt->scp_nhreqs_active == 0)
1344                return true;
1345
1346        return !ptlrpc_nrs_req_pending_nolock(svcpt, false) ||
1347               svcpt->scp_hreq_count < svcpt->scp_service->srv_hpreq_ratio;
1348}
1349
1350static bool ptlrpc_server_high_pending(struct ptlrpc_service_part *svcpt,
1351                                       bool force)
1352{
1353        return ptlrpc_server_allow_high(svcpt, force) &&
1354               ptlrpc_nrs_req_pending_nolock(svcpt, true);
1355}
1356
1357/**
1358 * Only allow normal priority requests on a service that has a high-priority
1359 * queue if forced (i.e. cleanup), if there are other high priority requests
1360 * already being processed (i.e. those threads can service more high-priority
1361 * requests), or if there are enough idle threads that a later thread can do
1362 * a high priority request.
1363 * User can call it w/o any lock but need to hold
1364 * ptlrpc_service_part::scp_req_lock to get reliable result
1365 */
1366static bool ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
1367                                       bool force)
1368{
1369        int running = svcpt->scp_nthrs_running;
1370
1371        if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
1372                     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
1373                /* leave just 1 thread for normal RPCs */
1374                running = PTLRPC_NTHRS_INIT;
1375                if (svcpt->scp_service->srv_ops.so_hpreq_handler)
1376                        running += 1;
1377        }
1378
1379        if (force ||
1380            svcpt->scp_nreqs_active < running - 2)
1381                return true;
1382
1383        if (svcpt->scp_nreqs_active >= running - 1)
1384                return false;
1385
1386        return svcpt->scp_nhreqs_active > 0 || !nrs_svcpt_has_hp(svcpt);
1387}
1388
1389static bool ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt,
1390                                         bool force)
1391{
1392        return ptlrpc_server_allow_normal(svcpt, force) &&
1393               ptlrpc_nrs_req_pending_nolock(svcpt, false);
1394}
1395
1396/**
1397 * Returns true if there are requests available in incoming
1398 * request queue for processing and it is allowed to fetch them.
1399 * User can call it w/o any lock but need to hold ptlrpc_service::scp_req_lock
1400 * to get reliable result
1401 * \see ptlrpc_server_allow_normal
1402 * \see ptlrpc_server_allow high
1403 */
1404static inline bool
1405ptlrpc_server_request_pending(struct ptlrpc_service_part *svcpt, bool force)
1406{
1407        return ptlrpc_server_high_pending(svcpt, force) ||
1408               ptlrpc_server_normal_pending(svcpt, force);
1409}
1410
1411/**
1412 * Fetch a request for processing from queue of unprocessed requests.
1413 * Favors high-priority requests.
1414 * Returns a pointer to fetched request.
1415 */
1416static struct ptlrpc_request *
1417ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, bool force)
1418{
1419        struct ptlrpc_request *req = NULL;
1420
1421        spin_lock(&svcpt->scp_req_lock);
1422
1423        if (ptlrpc_server_high_pending(svcpt, force)) {
1424                req = ptlrpc_nrs_req_get_nolock(svcpt, true, force);
1425                if (req) {
1426                        svcpt->scp_hreq_count++;
1427                        goto got_request;
1428                }
1429        }
1430
1431        if (ptlrpc_server_normal_pending(svcpt, force)) {
1432                req = ptlrpc_nrs_req_get_nolock(svcpt, false, force);
1433                if (req) {
1434                        svcpt->scp_hreq_count = 0;
1435                        goto got_request;
1436                }
1437        }
1438
1439        spin_unlock(&svcpt->scp_req_lock);
1440        return NULL;
1441
1442got_request:
1443        svcpt->scp_nreqs_active++;
1444        if (req->rq_hp)
1445                svcpt->scp_nhreqs_active++;
1446
1447        spin_unlock(&svcpt->scp_req_lock);
1448
1449        if (likely(req->rq_export))
1450                class_export_rpc_inc(req->rq_export);
1451
1452        return req;
1453}
1454
1455/**
1456 * Handle freshly incoming reqs, add to timed early reply list,
1457 * pass on to regular request queue.
1458 * All incoming requests pass through here before getting into
1459 * ptlrpc_server_handle_req later on.
1460 */
1461static int
1462ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
1463                            struct ptlrpc_thread *thread)
1464{
1465        struct ptlrpc_service *svc = svcpt->scp_service;
1466        struct ptlrpc_request *req;
1467        __u32 deadline;
1468        int rc;
1469
1470        spin_lock(&svcpt->scp_lock);
1471        if (list_empty(&svcpt->scp_req_incoming)) {
1472                spin_unlock(&svcpt->scp_lock);
1473                return 0;
1474        }
1475
1476        req = list_entry(svcpt->scp_req_incoming.next,
1477                         struct ptlrpc_request, rq_list);
1478        list_del_init(&req->rq_list);
1479        svcpt->scp_nreqs_incoming--;
1480        /* Consider this still a "queued" request as far as stats are
1481         * concerned
1482         */
1483        spin_unlock(&svcpt->scp_lock);
1484
1485        /* go through security check/transform */
1486        rc = sptlrpc_svc_unwrap_request(req);
1487        switch (rc) {
1488        case SECSVC_OK:
1489                break;
1490        case SECSVC_COMPLETE:
1491                target_send_reply(req, 0, OBD_FAIL_MDS_ALL_REPLY_NET);
1492                goto err_req;
1493        case SECSVC_DROP:
1494                goto err_req;
1495        default:
1496                LBUG();
1497        }
1498
1499        /*
1500         * for null-flavored rpc, msg has been unpacked by sptlrpc, although
1501         * redo it wouldn't be harmful.
1502         */
1503        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
1504                rc = ptlrpc_unpack_req_msg(req, req->rq_reqlen);
1505                if (rc != 0) {
1506                        CERROR("error unpacking request: ptl %d from %s x%llu\n",
1507                               svc->srv_req_portal, libcfs_id2str(req->rq_peer),
1508                               req->rq_xid);
1509                        goto err_req;
1510                }
1511        }
1512
1513        rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
1514        if (rc) {
1515                CERROR("error unpacking ptlrpc body: ptl %d from %s x%llu\n",
1516                       svc->srv_req_portal, libcfs_id2str(req->rq_peer),
1517                       req->rq_xid);
1518                goto err_req;
1519        }
1520
1521        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
1522            lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
1523                CERROR("drop incoming rpc opc %u, x%llu\n",
1524                       cfs_fail_val, req->rq_xid);
1525                goto err_req;
1526        }
1527
1528        rc = -EINVAL;
1529        if (lustre_msg_get_type(req->rq_reqmsg) != PTL_RPC_MSG_REQUEST) {
1530                CERROR("wrong packet type received (type=%u) from %s\n",
1531                       lustre_msg_get_type(req->rq_reqmsg),
1532                       libcfs_id2str(req->rq_peer));
1533                goto err_req;
1534        }
1535
1536        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1537        case MDS_WRITEPAGE:
1538        case OST_WRITE:
1539                req->rq_bulk_write = 1;
1540                break;
1541        case MDS_READPAGE:
1542        case OST_READ:
1543        case MGS_CONFIG_READ:
1544                req->rq_bulk_read = 1;
1545                break;
1546        }
1547
1548        CDEBUG(D_RPCTRACE, "got req x%llu\n", req->rq_xid);
1549
1550        req->rq_export = class_conn2export(
1551                lustre_msg_get_handle(req->rq_reqmsg));
1552        if (req->rq_export) {
1553                rc = ptlrpc_check_req(req);
1554                if (rc == 0) {
1555                        rc = sptlrpc_target_export_check(req->rq_export, req);
1556                        if (rc)
1557                                DEBUG_REQ(D_ERROR, req, "DROPPING req with illegal security flavor,");
1558                }
1559
1560                if (rc)
1561                        goto err_req;
1562        }
1563
1564        /* req_in handling should/must be fast */
1565        if (ktime_get_real_seconds() - req->rq_arrival_time.tv_sec > 5)
1566                DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
1567                          (long)(ktime_get_real_seconds() -
1568                                 req->rq_arrival_time.tv_sec));
1569
1570        /* Set rpc server deadline and add it to the timed list */
1571        deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
1572                    MSGHDR_AT_SUPPORT) ?
1573                   /* The max time the client expects us to take */
1574                   lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
1575        req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
1576        if (unlikely(deadline == 0)) {
1577                DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
1578                goto err_req;
1579        }
1580
1581        req->rq_svc_thread = thread;
1582
1583        ptlrpc_at_add_timed(req);
1584
1585        /* Move it over to the request processing queue */
1586        rc = ptlrpc_server_request_add(svcpt, req);
1587        if (rc)
1588                goto err_req;
1589
1590        wake_up(&svcpt->scp_waitq);
1591        return 1;
1592
1593err_req:
1594        ptlrpc_server_finish_request(svcpt, req);
1595
1596        return 1;
1597}
1598
1599/**
1600 * Main incoming request handling logic.
1601 * Calls handler function from service to do actual processing.
1602 */
1603static int
1604ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
1605                             struct ptlrpc_thread *thread)
1606{
1607        struct ptlrpc_service *svc = svcpt->scp_service;
1608        struct ptlrpc_request *request;
1609        struct timespec64 work_start;
1610        struct timespec64 work_end;
1611        struct timespec64 timediff;
1612        struct timespec64 arrived;
1613        unsigned long timediff_usecs;
1614        unsigned long arrived_usecs;
1615        int rc;
1616        int fail_opc = 0;
1617
1618        request = ptlrpc_server_request_get(svcpt, false);
1619        if (!request)
1620                return 0;
1621
1622        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
1623                fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
1624        else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
1625                fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
1626
1627        if (unlikely(fail_opc)) {
1628                if (request->rq_export && request->rq_ops)
1629                        OBD_FAIL_TIMEOUT(fail_opc, 4);
1630        }
1631
1632        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
1633
1634        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
1635                libcfs_debug_dumplog();
1636
1637        ktime_get_real_ts64(&work_start);
1638        timediff = timespec64_sub(work_start, request->rq_arrival_time);
1639        timediff_usecs = timediff.tv_sec * USEC_PER_SEC +
1640                         timediff.tv_nsec / NSEC_PER_USEC;
1641        if (likely(svc->srv_stats)) {
1642                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
1643                                    timediff_usecs);
1644                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
1645                                    svcpt->scp_nreqs_incoming);
1646                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR,
1647                                    svcpt->scp_nreqs_active);
1648                lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
1649                                    at_get(&svcpt->scp_at_estimate));
1650        }
1651
1652        rc = lu_context_init(&request->rq_session, LCT_SESSION | LCT_NOREF);
1653        if (rc) {
1654                CERROR("Failure to initialize session: %d\n", rc);
1655                goto out_req;
1656        }
1657        request->rq_session.lc_thread = thread;
1658        request->rq_session.lc_cookie = 0x5;
1659        lu_context_enter(&request->rq_session);
1660
1661        CDEBUG(D_NET, "got req %llu\n", request->rq_xid);
1662
1663        request->rq_svc_thread = thread;
1664        if (thread)
1665                request->rq_svc_thread->t_env->le_ses = &request->rq_session;
1666
1667        if (likely(request->rq_export)) {
1668                if (unlikely(ptlrpc_check_req(request)))
1669                        goto put_conn;
1670        }
1671
1672        /* Discard requests queued for longer than the deadline.
1673         * The deadline is increased if we send an early reply.
1674         */
1675        if (ktime_get_real_seconds() > request->rq_deadline) {
1676                DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s: deadline " CFS_DURATION_T ":" CFS_DURATION_T "s ago\n",
1677                          libcfs_id2str(request->rq_peer),
1678                          (long)(request->rq_deadline -
1679                                 request->rq_arrival_time.tv_sec),
1680                          (long)(ktime_get_real_seconds() -
1681                                 request->rq_deadline));
1682                goto put_conn;
1683        }
1684
1685        CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d\n",
1686               current_comm(),
1687               (request->rq_export ?
1688                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
1689               (request->rq_export ?
1690                atomic_read(&request->rq_export->exp_refcount) : -99),
1691               lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
1692               libcfs_id2str(request->rq_peer),
1693               lustre_msg_get_opc(request->rq_reqmsg));
1694
1695        if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
1696                CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
1697
1698        rc = svc->srv_ops.so_req_handler(request);
1699
1700        ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
1701
1702put_conn:
1703        lu_context_exit(&request->rq_session);
1704        lu_context_fini(&request->rq_session);
1705
1706        if (unlikely(ktime_get_real_seconds() > request->rq_deadline)) {
1707                DEBUG_REQ(D_WARNING, request,
1708                          "Request took longer than estimated (%lld:%llds); "
1709                          "client may timeout.",
1710                          (s64)request->rq_deadline -
1711                               request->rq_arrival_time.tv_sec,
1712                          (s64)ktime_get_real_seconds() - request->rq_deadline);
1713        }
1714
1715        ktime_get_real_ts64(&work_end);
1716        timediff = timespec64_sub(work_end, work_start);
1717        timediff_usecs = timediff.tv_sec * USEC_PER_SEC +
1718                         timediff.tv_nsec / NSEC_PER_USEC;
1719        arrived = timespec64_sub(work_end, request->rq_arrival_time);
1720        arrived_usecs = arrived.tv_sec * USEC_PER_SEC +
1721                         arrived.tv_nsec / NSEC_PER_USEC;
1722        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d Request processed in %ldus (%ldus total) trans %llu rc %d/%d\n",
1723               current_comm(),
1724               (request->rq_export ?
1725                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
1726               (request->rq_export ?
1727                atomic_read(&request->rq_export->exp_refcount) : -99),
1728               lustre_msg_get_status(request->rq_reqmsg),
1729               request->rq_xid,
1730               libcfs_id2str(request->rq_peer),
1731               lustre_msg_get_opc(request->rq_reqmsg),
1732               timediff_usecs,
1733               arrived_usecs,
1734               (request->rq_repmsg ?
1735                lustre_msg_get_transno(request->rq_repmsg) :
1736                request->rq_transno),
1737               request->rq_status,
1738               (request->rq_repmsg ?
1739                lustre_msg_get_status(request->rq_repmsg) : -999));
1740        if (likely(svc->srv_stats && request->rq_reqmsg)) {
1741                __u32 op = lustre_msg_get_opc(request->rq_reqmsg);
1742                int opc = opcode_offset(op);
1743
1744                if (opc > 0 && !(op == LDLM_ENQUEUE || op == MDS_REINT)) {
1745                        LASSERT(opc < LUSTRE_MAX_OPCODES);
1746                        lprocfs_counter_add(svc->srv_stats,
1747                                            opc + EXTRA_MAX_OPCODES,
1748                                            timediff_usecs);
1749                }
1750        }
1751        if (unlikely(request->rq_early_count)) {
1752                DEBUG_REQ(D_ADAPTTO, request,
1753                          "sent %d early replies before finishing in %llds",
1754                          request->rq_early_count,
1755                          (s64)work_end.tv_sec -
1756                          request->rq_arrival_time.tv_sec);
1757        }
1758
1759out_req:
1760        ptlrpc_server_finish_active_request(svcpt, request);
1761
1762        return 1;
1763}
1764
1765/**
1766 * An internal function to process a single reply state object.
1767 */
1768static int
1769ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
1770{
1771        struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
1772        struct ptlrpc_service *svc = svcpt->scp_service;
1773        struct obd_export *exp;
1774        int nlocks;
1775        int been_handled;
1776
1777        exp = rs->rs_export;
1778
1779        LASSERT(rs->rs_difficult);
1780        LASSERT(rs->rs_scheduled);
1781        LASSERT(list_empty(&rs->rs_list));
1782
1783        spin_lock(&exp->exp_lock);
1784        /* Noop if removed already */
1785        list_del_init(&rs->rs_exp_list);
1786        spin_unlock(&exp->exp_lock);
1787
1788        /* The disk commit callback holds exp_uncommitted_replies_lock while it
1789         * iterates over newly committed replies, removing them from
1790         * exp_uncommitted_replies.  It then drops this lock and schedules the
1791         * replies it found for handling here.
1792         *
1793         * We can avoid contention for exp_uncommitted_replies_lock between the
1794         * HRT threads and further commit callbacks by checking rs_committed
1795         * which is set in the commit callback while it holds both
1796         * rs_lock and exp_uncommitted_reples.
1797         *
1798         * If we see rs_committed clear, the commit callback _may_ not have
1799         * handled this reply yet and we race with it to grab
1800         * exp_uncommitted_replies_lock before removing the reply from
1801         * exp_uncommitted_replies.  Note that if we lose the race and the
1802         * reply has already been removed, list_del_init() is a noop.
1803         *
1804         * If we see rs_committed set, we know the commit callback is handling,
1805         * or has handled this reply since store reordering might allow us to
1806         * see rs_committed set out of sequence.  But since this is done
1807         * holding rs_lock, we can be sure it has all completed once we hold
1808         * rs_lock, which we do right next.
1809         */
1810        if (!rs->rs_committed) {
1811                spin_lock(&exp->exp_uncommitted_replies_lock);
1812                list_del_init(&rs->rs_obd_list);
1813                spin_unlock(&exp->exp_uncommitted_replies_lock);
1814        }
1815
1816        spin_lock(&rs->rs_lock);
1817
1818        been_handled = rs->rs_handled;
1819        rs->rs_handled = 1;
1820
1821        nlocks = rs->rs_nlocks;          /* atomic "steal", but */
1822        rs->rs_nlocks = 0;                    /* locks still on rs_locks! */
1823
1824        if (nlocks == 0 && !been_handled) {
1825                /* If we see this, we should already have seen the warning
1826                 * in mds_steal_ack_locks()
1827                 */
1828                CDEBUG(D_HA, "All locks stolen from rs %p x%lld.t%lld o%d NID %s\n",
1829                       rs,
1830                       rs->rs_xid, rs->rs_transno, rs->rs_opc,
1831                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
1832        }
1833
1834        if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
1835                spin_unlock(&rs->rs_lock);
1836
1837                if (!been_handled && rs->rs_on_net) {
1838                        LNetMDUnlink(rs->rs_md_h);
1839                        /* Ignore return code; we're racing with completion */
1840                }
1841
1842                while (nlocks-- > 0)
1843                        ldlm_lock_decref(&rs->rs_locks[nlocks],
1844                                         rs->rs_modes[nlocks]);
1845
1846                spin_lock(&rs->rs_lock);
1847        }
1848
1849        rs->rs_scheduled = 0;
1850
1851        if (!rs->rs_on_net) {
1852                /* Off the net */
1853                spin_unlock(&rs->rs_lock);
1854
1855                class_export_put(exp);
1856                rs->rs_export = NULL;
1857                ptlrpc_rs_decref(rs);
1858                if (atomic_dec_and_test(&svcpt->scp_nreps_difficult) &&
1859                    svc->srv_is_stopping)
1860                        wake_up_all(&svcpt->scp_waitq);
1861                return 1;
1862        }
1863
1864        /* still on the net; callback will schedule */
1865        spin_unlock(&rs->rs_lock);
1866        return 1;
1867}
1868
1869static void
1870ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
1871{
1872        int avail = svcpt->scp_nrqbds_posted;
1873        int low_water = test_req_buffer_pressure ? 0 :
1874                        svcpt->scp_service->srv_nbuf_per_group / 2;
1875
1876        /* NB I'm not locking; just looking. */
1877
1878        /* CAVEAT EMPTOR: We might be allocating buffers here because we've
1879         * allowed the request history to grow out of control.  We could put a
1880         * sanity check on that here and cull some history if we need the
1881         * space.
1882         */
1883
1884        if (avail <= low_water)
1885                ptlrpc_grow_req_bufs(svcpt, 1);
1886
1887        if (svcpt->scp_service->srv_stats) {
1888                lprocfs_counter_add(svcpt->scp_service->srv_stats,
1889                                    PTLRPC_REQBUF_AVAIL_CNTR, avail);
1890        }
1891}
1892
1893static int
1894ptlrpc_retry_rqbds(void *arg)
1895{
1896        struct ptlrpc_service_part *svcpt = arg;
1897
1898        svcpt->scp_rqbd_timeout = 0;
1899        return -ETIMEDOUT;
1900}
1901
1902static inline int
1903ptlrpc_threads_enough(struct ptlrpc_service_part *svcpt)
1904{
1905        return svcpt->scp_nreqs_active <
1906               svcpt->scp_nthrs_running - 1 -
1907               (svcpt->scp_service->srv_ops.so_hpreq_handler != NULL);
1908}
1909
1910/**
1911 * allowed to create more threads
1912 * user can call it w/o any lock but need to hold
1913 * ptlrpc_service_part::scp_lock to get reliable result
1914 */
1915static inline int
1916ptlrpc_threads_increasable(struct ptlrpc_service_part *svcpt)
1917{
1918        return svcpt->scp_nthrs_running +
1919               svcpt->scp_nthrs_starting <
1920               svcpt->scp_service->srv_nthrs_cpt_limit;
1921}
1922
1923/**
1924 * too many requests and allowed to create more threads
1925 */
1926static inline int
1927ptlrpc_threads_need_create(struct ptlrpc_service_part *svcpt)
1928{
1929        return !ptlrpc_threads_enough(svcpt) &&
1930                ptlrpc_threads_increasable(svcpt);
1931}
1932
1933static inline int
1934ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
1935{
1936        return thread_is_stopping(thread) ||
1937               thread->t_svcpt->scp_service->srv_is_stopping;
1938}
1939
1940static inline int
1941ptlrpc_rqbd_pending(struct ptlrpc_service_part *svcpt)
1942{
1943        return !list_empty(&svcpt->scp_rqbd_idle) &&
1944               svcpt->scp_rqbd_timeout == 0;
1945}
1946
1947static inline int
1948ptlrpc_at_check(struct ptlrpc_service_part *svcpt)
1949{
1950        return svcpt->scp_at_check;
1951}
1952
1953/**
1954 * requests wait on preprocessing
1955 * user can call it w/o any lock but need to hold
1956 * ptlrpc_service_part::scp_lock to get reliable result
1957 */
1958static inline int
1959ptlrpc_server_request_incoming(struct ptlrpc_service_part *svcpt)
1960{
1961        return !list_empty(&svcpt->scp_req_incoming);
1962}
1963
1964static __attribute__((__noinline__)) int
1965ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
1966                  struct ptlrpc_thread *thread)
1967{
1968        /* Don't exit while there are replies to be handled */
1969        struct l_wait_info lwi = LWI_TIMEOUT(svcpt->scp_rqbd_timeout,
1970                                             ptlrpc_retry_rqbds, svcpt);
1971
1972        /* XXX: Add this back when libcfs watchdog is merged upstream
1973        lc_watchdog_disable(thread->t_watchdog);
1974         */
1975
1976        cond_resched();
1977
1978        l_wait_event_exclusive_head(svcpt->scp_waitq,
1979                                ptlrpc_thread_stopping(thread) ||
1980                                ptlrpc_server_request_incoming(svcpt) ||
1981                                ptlrpc_server_request_pending(svcpt, false) ||
1982                                ptlrpc_rqbd_pending(svcpt) ||
1983                                ptlrpc_at_check(svcpt), &lwi);
1984
1985        if (ptlrpc_thread_stopping(thread))
1986                return -EINTR;
1987
1988        /*
1989        lc_watchdog_touch(thread->t_watchdog,
1990                          ptlrpc_server_get_timeout(svcpt));
1991         */
1992        return 0;
1993}
1994
1995/**
1996 * Main thread body for service threads.
1997 * Waits in a loop waiting for new requests to process to appear.
1998 * Every time an incoming requests is added to its queue, a waitq
1999 * is woken up and one of the threads will handle it.
2000 */
2001static int ptlrpc_main(void *arg)
2002{
2003        struct ptlrpc_thread *thread = arg;
2004        struct ptlrpc_service_part *svcpt = thread->t_svcpt;
2005        struct ptlrpc_service *svc = svcpt->scp_service;
2006        struct ptlrpc_reply_state *rs;
2007        struct group_info *ginfo = NULL;
2008        struct lu_env *env;
2009        int counter = 0, rc = 0;
2010
2011        thread->t_pid = current_pid();
2012        unshare_fs_struct();
2013
2014        /* NB: we will call cfs_cpt_bind() for all threads, because we
2015         * might want to run lustre server only on a subset of system CPUs,
2016         * in that case ->scp_cpt is CFS_CPT_ANY
2017         */
2018        rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
2019        if (rc != 0) {
2020                CWARN("%s: failed to bind %s on CPT %d\n",
2021                      svc->srv_name, thread->t_name, svcpt->scp_cpt);
2022        }
2023
2024        ginfo = groups_alloc(0);
2025        if (!ginfo) {
2026                rc = -ENOMEM;
2027                goto out;
2028        }
2029
2030        set_current_groups(ginfo);
2031        put_group_info(ginfo);
2032
2033        if (svc->srv_ops.so_thr_init) {
2034                rc = svc->srv_ops.so_thr_init(thread);
2035                if (rc)
2036                        goto out;
2037        }
2038
2039        env = kzalloc(sizeof(*env), GFP_NOFS);
2040        if (!env) {
2041                rc = -ENOMEM;
2042                goto out_srv_fini;
2043        }
2044
2045        rc = lu_context_init(&env->le_ctx,
2046                             svc->srv_ctx_tags|LCT_REMEMBER|LCT_NOREF);
2047        if (rc)
2048                goto out_srv_fini;
2049
2050        thread->t_env = env;
2051        env->le_ctx.lc_thread = thread;
2052        env->le_ctx.lc_cookie = 0x6;
2053
2054        while (!list_empty(&svcpt->scp_rqbd_idle)) {
2055                rc = ptlrpc_server_post_idle_rqbds(svcpt);
2056                if (rc >= 0)
2057                        continue;
2058
2059                CERROR("Failed to post rqbd for %s on CPT %d: %d\n",
2060                       svc->srv_name, svcpt->scp_cpt, rc);
2061                goto out_srv_fini;
2062        }
2063
2064        /* Alloc reply state structure for this one */
2065        rs = libcfs_kvzalloc(svc->srv_max_reply_size, GFP_NOFS);
2066        if (!rs) {
2067                rc = -ENOMEM;
2068                goto out_srv_fini;
2069        }
2070
2071        spin_lock(&svcpt->scp_lock);
2072
2073        LASSERT(thread_is_starting(thread));
2074        thread_clear_flags(thread, SVC_STARTING);
2075
2076        LASSERT(svcpt->scp_nthrs_starting == 1);
2077        svcpt->scp_nthrs_starting--;
2078
2079        /* SVC_STOPPING may already be set here if someone else is trying
2080         * to stop the service while this new thread has been dynamically
2081         * forked. We still set SVC_RUNNING to let our creator know that
2082         * we are now running, however we will exit as soon as possible
2083         */
2084        thread_add_flags(thread, SVC_RUNNING);
2085        svcpt->scp_nthrs_running++;
2086        spin_unlock(&svcpt->scp_lock);
2087
2088        /* wake up our creator in case he's still waiting. */
2089        wake_up(&thread->t_ctl_waitq);
2090
2091        /*
2092        thread->t_watchdog = lc_watchdog_add(ptlrpc_server_get_timeout(svcpt),
2093                                             NULL, NULL);
2094         */
2095
2096        spin_lock(&svcpt->scp_rep_lock);
2097        list_add(&rs->rs_list, &svcpt->scp_rep_idle);
2098        wake_up(&svcpt->scp_rep_waitq);
2099        spin_unlock(&svcpt->scp_rep_lock);
2100
2101        CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
2102               svcpt->scp_nthrs_running);
2103
2104        /* XXX maintain a list of all managed devices: insert here */
2105        while (!ptlrpc_thread_stopping(thread)) {
2106                if (ptlrpc_wait_event(svcpt, thread))
2107                        break;
2108
2109                ptlrpc_check_rqbd_pool(svcpt);
2110
2111                if (ptlrpc_threads_need_create(svcpt)) {
2112                        /* Ignore return code - we tried... */
2113                        ptlrpc_start_thread(svcpt, 0);
2114                }
2115
2116                /* Process all incoming reqs before handling any */
2117                if (ptlrpc_server_request_incoming(svcpt)) {
2118                        lu_context_enter(&env->le_ctx);
2119                        env->le_ses = NULL;
2120                        ptlrpc_server_handle_req_in(svcpt, thread);
2121                        lu_context_exit(&env->le_ctx);
2122
2123                        /* but limit ourselves in case of flood */
2124                        if (counter++ < 100)
2125                                continue;
2126                        counter = 0;
2127                }
2128
2129                if (ptlrpc_at_check(svcpt))
2130                        ptlrpc_at_check_timed(svcpt);
2131
2132                if (ptlrpc_server_request_pending(svcpt, false)) {
2133                        lu_context_enter(&env->le_ctx);
2134                        ptlrpc_server_handle_request(svcpt, thread);
2135                        lu_context_exit(&env->le_ctx);
2136                }
2137
2138                if (ptlrpc_rqbd_pending(svcpt) &&
2139                    ptlrpc_server_post_idle_rqbds(svcpt) < 0) {
2140                        /* I just failed to repost request buffers.
2141                         * Wait for a timeout (unless something else
2142                         * happens) before I try again
2143                         */
2144                        svcpt->scp_rqbd_timeout = cfs_time_seconds(1) / 10;
2145                        CDEBUG(D_RPCTRACE, "Posted buffers: %d\n",
2146                               svcpt->scp_nrqbds_posted);
2147                }
2148        }
2149
2150        /*
2151        lc_watchdog_delete(thread->t_watchdog);
2152        thread->t_watchdog = NULL;
2153        */
2154
2155out_srv_fini:
2156        /*
2157         * deconstruct service specific state created by ptlrpc_start_thread()
2158         */
2159        if (svc->srv_ops.so_thr_done)
2160                svc->srv_ops.so_thr_done(thread);
2161
2162        if (env) {
2163                lu_context_fini(&env->le_ctx);
2164                kfree(env);
2165        }
2166out:
2167        CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
2168               thread, thread->t_pid, thread->t_id, rc);
2169
2170        spin_lock(&svcpt->scp_lock);
2171        if (thread_test_and_clear_flags(thread, SVC_STARTING))
2172                svcpt->scp_nthrs_starting--;
2173
2174        if (thread_test_and_clear_flags(thread, SVC_RUNNING)) {
2175                /* must know immediately */
2176                svcpt->scp_nthrs_running--;
2177        }
2178
2179        thread->t_id = rc;
2180        thread_add_flags(thread, SVC_STOPPED);
2181
2182        wake_up(&thread->t_ctl_waitq);
2183        spin_unlock(&svcpt->scp_lock);
2184
2185        return rc;
2186}
2187
2188static int hrt_dont_sleep(struct ptlrpc_hr_thread *hrt,
2189                          struct list_head *replies)
2190{
2191        int result;
2192
2193        spin_lock(&hrt->hrt_lock);
2194
2195        list_splice_init(&hrt->hrt_queue, replies);
2196        result = ptlrpc_hr.hr_stopping || !list_empty(replies);
2197
2198        spin_unlock(&hrt->hrt_lock);
2199        return result;
2200}
2201
2202/**
2203 * Main body of "handle reply" function.
2204 * It processes acked reply states
2205 */
2206static int ptlrpc_hr_main(void *arg)
2207{
2208        struct ptlrpc_hr_thread *hrt = arg;
2209        struct ptlrpc_hr_partition *hrp = hrt->hrt_partition;
2210        LIST_HEAD(replies);
2211        char threadname[20];
2212        int rc;
2213
2214        snprintf(threadname, sizeof(threadname), "ptlrpc_hr%02d_%03d",
2215                 hrp->hrp_cpt, hrt->hrt_id);
2216        unshare_fs_struct();
2217
2218        rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
2219        if (rc != 0) {
2220                CWARN("Failed to bind %s on CPT %d of CPT table %p: rc = %d\n",
2221                      threadname, hrp->hrp_cpt, ptlrpc_hr.hr_cpt_table, rc);
2222        }
2223
2224        atomic_inc(&hrp->hrp_nstarted);
2225        wake_up(&ptlrpc_hr.hr_waitq);
2226
2227        while (!ptlrpc_hr.hr_stopping) {
2228                l_wait_condition(hrt->hrt_waitq, hrt_dont_sleep(hrt, &replies));
2229
2230                while (!list_empty(&replies)) {
2231                        struct ptlrpc_reply_state *rs;
2232
2233                        rs = list_entry(replies.prev, struct ptlrpc_reply_state,
2234                                        rs_list);
2235                        list_del_init(&rs->rs_list);
2236                        ptlrpc_handle_rs(rs);
2237                }
2238        }
2239
2240        atomic_inc(&hrp->hrp_nstopped);
2241        wake_up(&ptlrpc_hr.hr_waitq);
2242
2243        return 0;
2244}
2245
2246static void ptlrpc_stop_hr_threads(void)
2247{
2248        struct ptlrpc_hr_partition *hrp;
2249        int i;
2250        int j;
2251
2252        ptlrpc_hr.hr_stopping = 1;
2253
2254        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2255                if (!hrp->hrp_thrs)
2256                        continue; /* uninitialized */
2257                for (j = 0; j < hrp->hrp_nthrs; j++)
2258                        wake_up_all(&hrp->hrp_thrs[j].hrt_waitq);
2259        }
2260
2261        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2262                if (!hrp->hrp_thrs)
2263                        continue; /* uninitialized */
2264                wait_event(ptlrpc_hr.hr_waitq,
2265                           atomic_read(&hrp->hrp_nstopped) ==
2266                           atomic_read(&hrp->hrp_nstarted));
2267        }
2268}
2269
2270static int ptlrpc_start_hr_threads(void)
2271{
2272        struct ptlrpc_hr_partition *hrp;
2273        int i;
2274        int j;
2275
2276        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2277                int rc = 0;
2278
2279                for (j = 0; j < hrp->hrp_nthrs; j++) {
2280                        struct  ptlrpc_hr_thread *hrt = &hrp->hrp_thrs[j];
2281                        struct task_struct *task;
2282
2283                        task = kthread_run(ptlrpc_hr_main,
2284                                           &hrp->hrp_thrs[j],
2285                                           "ptlrpc_hr%02d_%03d",
2286                                           hrp->hrp_cpt, hrt->hrt_id);
2287                        if (IS_ERR(task)) {
2288                                rc = PTR_ERR(task);
2289                                break;
2290                        }
2291                }
2292                wait_event(ptlrpc_hr.hr_waitq,
2293                           atomic_read(&hrp->hrp_nstarted) == j);
2294
2295                if (rc < 0) {
2296                        CERROR("cannot start reply handler thread %d:%d: rc = %d\n",
2297                               i, j, rc);
2298                        ptlrpc_stop_hr_threads();
2299                        return rc;
2300                }
2301        }
2302        return 0;
2303}
2304
2305static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
2306{
2307        struct l_wait_info lwi = { 0 };
2308        struct ptlrpc_thread *thread;
2309        LIST_HEAD(zombie);
2310
2311        CDEBUG(D_INFO, "Stopping threads for service %s\n",
2312               svcpt->scp_service->srv_name);
2313
2314        spin_lock(&svcpt->scp_lock);
2315        /* let the thread know that we would like it to stop asap */
2316        list_for_each_entry(thread, &svcpt->scp_threads, t_link) {
2317                CDEBUG(D_INFO, "Stopping thread %s #%u\n",
2318                       svcpt->scp_service->srv_thread_name, thread->t_id);
2319                thread_add_flags(thread, SVC_STOPPING);
2320        }
2321
2322        wake_up_all(&svcpt->scp_waitq);
2323
2324        while (!list_empty(&svcpt->scp_threads)) {
2325                thread = list_entry(svcpt->scp_threads.next,
2326                                    struct ptlrpc_thread, t_link);
2327                if (thread_is_stopped(thread)) {
2328                        list_del(&thread->t_link);
2329                        list_add(&thread->t_link, &zombie);
2330                        continue;
2331                }
2332                spin_unlock(&svcpt->scp_lock);
2333
2334                CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
2335                       svcpt->scp_service->srv_thread_name, thread->t_id);
2336                l_wait_event(thread->t_ctl_waitq,
2337                             thread_is_stopped(thread), &lwi);
2338
2339                spin_lock(&svcpt->scp_lock);
2340        }
2341
2342        spin_unlock(&svcpt->scp_lock);
2343
2344        while (!list_empty(&zombie)) {
2345                thread = list_entry(zombie.next,
2346                                        struct ptlrpc_thread, t_link);
2347                list_del(&thread->t_link);
2348                kfree(thread);
2349        }
2350}
2351
2352/**
2353 * Stops all threads of a particular service \a svc
2354 */
2355static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
2356{
2357        struct ptlrpc_service_part *svcpt;
2358        int i;
2359
2360        ptlrpc_service_for_each_part(svcpt, i, svc) {
2361                if (svcpt->scp_service)
2362                        ptlrpc_svcpt_stop_threads(svcpt);
2363        }
2364}
2365
2366int ptlrpc_start_threads(struct ptlrpc_service *svc)
2367{
2368        int rc = 0;
2369        int i;
2370        int j;
2371
2372        /* We require 2 threads min, see note in ptlrpc_server_handle_request */
2373        LASSERT(svc->srv_nthrs_cpt_init >= PTLRPC_NTHRS_INIT);
2374
2375        for (i = 0; i < svc->srv_ncpts; i++) {
2376                for (j = 0; j < svc->srv_nthrs_cpt_init; j++) {
2377                        rc = ptlrpc_start_thread(svc->srv_parts[i], 1);
2378                        if (rc == 0)
2379                                continue;
2380
2381                        if (rc != -EMFILE)
2382                                goto failed;
2383                        /* We have enough threads, don't start more. b=15759 */
2384                        break;
2385                }
2386        }
2387
2388        return 0;
2389 failed:
2390        CERROR("cannot start %s thread #%d_%d: rc %d\n",
2391               svc->srv_thread_name, i, j, rc);
2392        ptlrpc_stop_all_threads(svc);
2393        return rc;
2394}
2395EXPORT_SYMBOL(ptlrpc_start_threads);
2396
2397int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
2398{
2399        struct l_wait_info lwi = { 0 };
2400        struct ptlrpc_thread *thread;
2401        struct ptlrpc_service *svc;
2402        struct task_struct *task;
2403        int rc;
2404
2405        svc = svcpt->scp_service;
2406
2407        CDEBUG(D_RPCTRACE, "%s[%d] started %d min %d max %d\n",
2408               svc->srv_name, svcpt->scp_cpt, svcpt->scp_nthrs_running,
2409               svc->srv_nthrs_cpt_init, svc->srv_nthrs_cpt_limit);
2410
2411 again:
2412        if (unlikely(svc->srv_is_stopping))
2413                return -ESRCH;
2414
2415        if (!ptlrpc_threads_increasable(svcpt) ||
2416            (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
2417             svcpt->scp_nthrs_running == svc->srv_nthrs_cpt_init - 1))
2418                return -EMFILE;
2419
2420        thread = kzalloc_node(sizeof(*thread), GFP_NOFS,
2421                              cfs_cpt_spread_node(svc->srv_cptable,
2422                                                  svcpt->scp_cpt));
2423        if (!thread)
2424                return -ENOMEM;
2425        init_waitqueue_head(&thread->t_ctl_waitq);
2426
2427        spin_lock(&svcpt->scp_lock);
2428        if (!ptlrpc_threads_increasable(svcpt)) {
2429                spin_unlock(&svcpt->scp_lock);
2430                kfree(thread);
2431                return -EMFILE;
2432        }
2433
2434        if (svcpt->scp_nthrs_starting != 0) {
2435                /* serialize starting because some modules (obdfilter)
2436                 * might require unique and contiguous t_id
2437                 */
2438                LASSERT(svcpt->scp_nthrs_starting == 1);
2439                spin_unlock(&svcpt->scp_lock);
2440                kfree(thread);
2441                if (wait) {
2442                        CDEBUG(D_INFO, "Waiting for creating thread %s #%d\n",
2443                               svc->srv_thread_name, svcpt->scp_thr_nextid);
2444                        schedule();
2445                        goto again;
2446                }
2447
2448                CDEBUG(D_INFO, "Creating thread %s #%d race, retry later\n",
2449                       svc->srv_thread_name, svcpt->scp_thr_nextid);
2450                return -EAGAIN;
2451        }
2452
2453        svcpt->scp_nthrs_starting++;
2454        thread->t_id = svcpt->scp_thr_nextid++;
2455        thread_add_flags(thread, SVC_STARTING);
2456        thread->t_svcpt = svcpt;
2457
2458        list_add(&thread->t_link, &svcpt->scp_threads);
2459        spin_unlock(&svcpt->scp_lock);
2460
2461        if (svcpt->scp_cpt >= 0) {
2462                snprintf(thread->t_name, sizeof(thread->t_name), "%s%02d_%03d",
2463                         svc->srv_thread_name, svcpt->scp_cpt, thread->t_id);
2464        } else {
2465                snprintf(thread->t_name, sizeof(thread->t_name), "%s_%04d",
2466                         svc->srv_thread_name, thread->t_id);
2467        }
2468
2469        CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
2470        task = kthread_run(ptlrpc_main, thread, "%s", thread->t_name);
2471        if (IS_ERR(task)) {
2472                rc = PTR_ERR(task);
2473                CERROR("cannot start thread '%s': rc = %d\n",
2474                       thread->t_name, rc);
2475                spin_lock(&svcpt->scp_lock);
2476                --svcpt->scp_nthrs_starting;
2477                if (thread_is_stopping(thread)) {
2478                        /* this ptlrpc_thread is being handled
2479                         * by ptlrpc_svcpt_stop_threads now
2480                         */
2481                        thread_add_flags(thread, SVC_STOPPED);
2482                        wake_up(&thread->t_ctl_waitq);
2483                        spin_unlock(&svcpt->scp_lock);
2484                } else {
2485                        list_del(&thread->t_link);
2486                        spin_unlock(&svcpt->scp_lock);
2487                        kfree(thread);
2488                }
2489                return rc;
2490        }
2491
2492        if (!wait)
2493                return 0;
2494
2495        l_wait_event(thread->t_ctl_waitq,
2496                     thread_is_running(thread) || thread_is_stopped(thread),
2497                     &lwi);
2498
2499        rc = thread_is_stopped(thread) ? thread->t_id : 0;
2500        return rc;
2501}
2502
2503int ptlrpc_hr_init(void)
2504{
2505        struct ptlrpc_hr_partition *hrp;
2506        struct ptlrpc_hr_thread *hrt;
2507        int rc;
2508        int i;
2509        int j;
2510        int weight;
2511
2512        memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
2513        ptlrpc_hr.hr_cpt_table = cfs_cpt_table;
2514
2515        ptlrpc_hr.hr_partitions = cfs_percpt_alloc(ptlrpc_hr.hr_cpt_table,
2516                                                   sizeof(*hrp));
2517        if (!ptlrpc_hr.hr_partitions)
2518                return -ENOMEM;
2519
2520        init_waitqueue_head(&ptlrpc_hr.hr_waitq);
2521
2522        weight = cpumask_weight(topology_sibling_cpumask(0));
2523
2524        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2525                hrp->hrp_cpt = i;
2526
2527                atomic_set(&hrp->hrp_nstarted, 0);
2528                atomic_set(&hrp->hrp_nstopped, 0);
2529
2530                hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
2531                hrp->hrp_nthrs /= weight;
2532
2533                LASSERT(hrp->hrp_nthrs > 0);
2534                hrp->hrp_thrs =
2535                        kzalloc_node(hrp->hrp_nthrs * sizeof(*hrt), GFP_NOFS,
2536                                cfs_cpt_spread_node(ptlrpc_hr.hr_cpt_table,
2537                                                    i));
2538                if (!hrp->hrp_thrs) {
2539                        rc = -ENOMEM;
2540                        goto out;
2541                }
2542
2543                for (j = 0; j < hrp->hrp_nthrs; j++) {
2544                        hrt = &hrp->hrp_thrs[j];
2545
2546                        hrt->hrt_id = j;
2547                        hrt->hrt_partition = hrp;
2548                        init_waitqueue_head(&hrt->hrt_waitq);
2549                        spin_lock_init(&hrt->hrt_lock);
2550                        INIT_LIST_HEAD(&hrt->hrt_queue);
2551                }
2552        }
2553
2554        rc = ptlrpc_start_hr_threads();
2555out:
2556        if (rc != 0)
2557                ptlrpc_hr_fini();
2558        return rc;
2559}
2560
2561void ptlrpc_hr_fini(void)
2562{
2563        struct ptlrpc_hr_partition *hrp;
2564        int i;
2565
2566        if (!ptlrpc_hr.hr_partitions)
2567                return;
2568
2569        ptlrpc_stop_hr_threads();
2570
2571        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2572                kfree(hrp->hrp_thrs);
2573        }
2574
2575        cfs_percpt_free(ptlrpc_hr.hr_partitions);
2576        ptlrpc_hr.hr_partitions = NULL;
2577}
2578
2579/**
2580 * Wait until all already scheduled replies are processed.
2581 */
2582static void ptlrpc_wait_replies(struct ptlrpc_service_part *svcpt)
2583{
2584        while (1) {
2585                int rc;
2586                struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10),
2587                                                     NULL, NULL);
2588
2589                rc = l_wait_event(svcpt->scp_waitq,
2590                     atomic_read(&svcpt->scp_nreps_difficult) == 0, &lwi);
2591                if (rc == 0)
2592                        break;
2593                CWARN("Unexpectedly long timeout %s %p\n",
2594                      svcpt->scp_service->srv_name, svcpt->scp_service);
2595        }
2596}
2597
2598static void
2599ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
2600{
2601        struct ptlrpc_service_part *svcpt;
2602        int i;
2603
2604        /* early disarm AT timer... */
2605        ptlrpc_service_for_each_part(svcpt, i, svc) {
2606                if (svcpt->scp_service)
2607                        del_timer(&svcpt->scp_at_timer);
2608        }
2609}
2610
2611static void
2612ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
2613{
2614        struct ptlrpc_service_part *svcpt;
2615        struct ptlrpc_request_buffer_desc *rqbd;
2616        struct l_wait_info lwi;
2617        int rc;
2618        int i;
2619
2620        /* All history will be culled when the next request buffer is
2621         * freed in ptlrpc_service_purge_all()
2622         */
2623        svc->srv_hist_nrqbds_cpt_max = 0;
2624
2625        rc = LNetClearLazyPortal(svc->srv_req_portal);
2626        LASSERT(rc == 0);
2627
2628        ptlrpc_service_for_each_part(svcpt, i, svc) {
2629                if (!svcpt->scp_service)
2630                        break;
2631
2632                /* Unlink all the request buffers.  This forces a 'final'
2633                 * event with its 'unlink' flag set for each posted rqbd
2634                 */
2635                list_for_each_entry(rqbd, &svcpt->scp_rqbd_posted,
2636                                        rqbd_list) {
2637                        rc = LNetMDUnlink(rqbd->rqbd_md_h);
2638                        LASSERT(rc == 0 || rc == -ENOENT);
2639                }
2640        }
2641
2642        ptlrpc_service_for_each_part(svcpt, i, svc) {
2643                if (!svcpt->scp_service)
2644                        break;
2645
2646                /* Wait for the network to release any buffers
2647                 * it's currently filling
2648                 */
2649                spin_lock(&svcpt->scp_lock);
2650                while (svcpt->scp_nrqbds_posted != 0) {
2651                        spin_unlock(&svcpt->scp_lock);
2652                        /* Network access will complete in finite time but
2653                         * the HUGE timeout lets us CWARN for visibility
2654                         * of sluggish LNDs
2655                         */
2656                        lwi = LWI_TIMEOUT_INTERVAL(
2657                                        cfs_time_seconds(LONG_UNLINK),
2658                                        cfs_time_seconds(1), NULL, NULL);
2659                        rc = l_wait_event(svcpt->scp_waitq,
2660                                          svcpt->scp_nrqbds_posted == 0, &lwi);
2661                        if (rc == -ETIMEDOUT) {
2662                                CWARN("Service %s waiting for request buffers\n",
2663                                      svcpt->scp_service->srv_name);
2664                        }
2665                        spin_lock(&svcpt->scp_lock);
2666                }
2667                spin_unlock(&svcpt->scp_lock);
2668        }
2669}
2670
2671static void
2672ptlrpc_service_purge_all(struct ptlrpc_service *svc)
2673{
2674        struct ptlrpc_service_part *svcpt;
2675        struct ptlrpc_request_buffer_desc *rqbd;
2676        struct ptlrpc_request *req;
2677        struct ptlrpc_reply_state *rs;
2678        int i;
2679
2680        ptlrpc_service_for_each_part(svcpt, i, svc) {
2681                if (!svcpt->scp_service)
2682                        break;
2683
2684                spin_lock(&svcpt->scp_rep_lock);
2685                while (!list_empty(&svcpt->scp_rep_active)) {
2686                        rs = list_entry(svcpt->scp_rep_active.next,
2687                                        struct ptlrpc_reply_state, rs_list);
2688                        spin_lock(&rs->rs_lock);
2689                        ptlrpc_schedule_difficult_reply(rs);
2690                        spin_unlock(&rs->rs_lock);
2691                }
2692                spin_unlock(&svcpt->scp_rep_lock);
2693
2694                /* purge the request queue.  NB No new replies (rqbds
2695                 * all unlinked) and no service threads, so I'm the only
2696                 * thread noodling the request queue now
2697                 */
2698                while (!list_empty(&svcpt->scp_req_incoming)) {
2699                        req = list_entry(svcpt->scp_req_incoming.next,
2700                                         struct ptlrpc_request, rq_list);
2701
2702                        list_del(&req->rq_list);
2703                        svcpt->scp_nreqs_incoming--;
2704                        ptlrpc_server_finish_request(svcpt, req);
2705                }
2706
2707                while (ptlrpc_server_request_pending(svcpt, true)) {
2708                        req = ptlrpc_server_request_get(svcpt, true);
2709                        ptlrpc_server_finish_active_request(svcpt, req);
2710                }
2711
2712                LASSERT(list_empty(&svcpt->scp_rqbd_posted));
2713                LASSERT(svcpt->scp_nreqs_incoming == 0);
2714                LASSERT(svcpt->scp_nreqs_active == 0);
2715                /* history should have been culled by
2716                 * ptlrpc_server_finish_request
2717                 */
2718                LASSERT(svcpt->scp_hist_nrqbds == 0);
2719
2720                /* Now free all the request buffers since nothing
2721                 * references them any more...
2722                 */
2723
2724                while (!list_empty(&svcpt->scp_rqbd_idle)) {
2725                        rqbd = list_entry(svcpt->scp_rqbd_idle.next,
2726                                          struct ptlrpc_request_buffer_desc,
2727                                          rqbd_list);
2728                        ptlrpc_free_rqbd(rqbd);
2729                }
2730                ptlrpc_wait_replies(svcpt);
2731
2732                while (!list_empty(&svcpt->scp_rep_idle)) {
2733                        rs = list_entry(svcpt->scp_rep_idle.next,
2734                                        struct ptlrpc_reply_state,
2735                                        rs_list);
2736                        list_del(&rs->rs_list);
2737                        kvfree(rs);
2738                }
2739        }
2740}
2741
2742static void
2743ptlrpc_service_free(struct ptlrpc_service *svc)
2744{
2745        struct ptlrpc_service_part *svcpt;
2746        struct ptlrpc_at_array *array;
2747        int i;
2748
2749        ptlrpc_service_for_each_part(svcpt, i, svc) {
2750                if (!svcpt->scp_service)
2751                        break;
2752
2753                /* In case somebody rearmed this in the meantime */
2754                del_timer(&svcpt->scp_at_timer);
2755                array = &svcpt->scp_at_array;
2756
2757                kfree(array->paa_reqs_array);
2758                array->paa_reqs_array = NULL;
2759                kfree(array->paa_reqs_count);
2760                array->paa_reqs_count = NULL;
2761        }
2762
2763        ptlrpc_service_for_each_part(svcpt, i, svc)
2764                kfree(svcpt);
2765
2766        if (svc->srv_cpts)
2767                cfs_expr_list_values_free(svc->srv_cpts, svc->srv_ncpts);
2768
2769        kfree(svc);
2770}
2771
2772int ptlrpc_unregister_service(struct ptlrpc_service *service)
2773{
2774        CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
2775
2776        service->srv_is_stopping = 1;
2777
2778        mutex_lock(&ptlrpc_all_services_mutex);
2779        list_del_init(&service->srv_list);
2780        mutex_unlock(&ptlrpc_all_services_mutex);
2781
2782        ptlrpc_service_del_atimer(service);
2783        ptlrpc_stop_all_threads(service);
2784
2785        ptlrpc_service_unlink_rqbd(service);
2786        ptlrpc_service_purge_all(service);
2787        ptlrpc_service_nrs_cleanup(service);
2788
2789        ptlrpc_lprocfs_unregister_service(service);
2790        ptlrpc_sysfs_unregister_service(service);
2791
2792        ptlrpc_service_free(service);
2793
2794        return 0;
2795}
2796EXPORT_SYMBOL(ptlrpc_unregister_service);
2797