/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/rpc.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 *
 * 2012-05-13: Liang Zhen <liang@whamcloud.com>
 * - percpt data for service to improve SMP performance
 * - code cleanup
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"

enum srpc_state {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
};

static struct smoketest_rpc {
        spinlock_t       rpc_glock;     /* global lock */
        struct srpc_service     *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        lnet_handle_eq_t rpc_lnet_eq;   /* _the_ LNet event queue */
        enum srpc_state  rpc_state;
        srpc_counters_t  rpc_counters;
        __u64            rpc_matchbits; /* matchbits counter */
} srpc_data;

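/*
 * Map a service ID to its portal.  Framework services (IDs below
 * SRPC_FRAMEWORK_SERVICE_MAX_ID) get a dedicated portal, presumably so
 * framework control traffic is not starved by test traffic.
 */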
static inline int
srpc_serv_portal(int svc_id)
{
        return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ?
               SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL;
}

/* forward refs */
int srpc_handle_rpc(struct swi_workitem *wi);

void srpc_get_counters(srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters(const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

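/*
 * Fill the i-th bulk iovec with page pg, clamping the fragment to one
 * page.  Returns the number of bytes actually described so the caller
 * can decrement its remaining length.
 */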
static int
srpc_add_bulk_page(struct srpc_bulk *bk, struct page *pg, int i, int nob)
{
        nob = min_t(int, nob, PAGE_SIZE);

        LASSERT(nob > 0);
        LASSERT(i >= 0 && i < bk->bk_niov);

        bk->bk_iovs[i].bv_offset = 0;
        bk->bk_iovs[i].bv_page = pg;
        bk->bk_iovs[i].bv_len = nob;
        return nob;
}

void
srpc_free_bulk(struct srpc_bulk *bk)
{
        int i;
        struct page *pg;

        LASSERT(bk);

        for (i = 0; i < bk->bk_niov; i++) {
                pg = bk->bk_iovs[i].bv_page;
                if (!pg)
                        break;

                __free_page(pg);
        }

        LIBCFS_FREE(bk, offsetof(struct srpc_bulk, bk_iovs[bk->bk_niov]));
}

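/*
 * Allocate a bulk descriptor with bulk_npg pages on CPT 'cpt', spreading
 * the page allocations across that partition.  A minimal usage sketch
 * (error handling elided; the variables are hypothetical):
 *
 *      struct srpc_bulk *bk;
 *
 *      bk = srpc_alloc_bulk(cpt, npages, npages * PAGE_SIZE, 1);
 *      ...
 *      srpc_free_bulk(bk);
 */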
struct srpc_bulk *
srpc_alloc_bulk(int cpt, unsigned bulk_npg, unsigned bulk_len, int sink)
{
        struct srpc_bulk *bk;
        int i;

        LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);

        LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt,
                         offsetof(struct srpc_bulk, bk_iovs[bulk_npg]));
        if (!bk) {
                CERROR("Can't allocate descriptor for %d pages\n", bulk_npg);
                return NULL;
        }

        memset(bk, 0, offsetof(struct srpc_bulk, bk_iovs[bulk_npg]));
        bk->bk_sink = sink;
        bk->bk_len = bulk_len;
        bk->bk_niov = bulk_npg;

        for (i = 0; i < bulk_npg; i++) {
                struct page *pg;
                int nob;

                pg = alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
                                      GFP_KERNEL, 0);
                if (!pg) {
                        CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                nob = srpc_add_bulk_page(bk, pg, i, bulk_len);
                bulk_len -= nob;
        }

        return bk;
}

static inline __u64
srpc_next_id(void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

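/*
 * (Re)initialize a server-side RPC descriptor around an incoming request
 * buffer.  Framework RPCs run on the serial scheduler; test RPCs run on
 * the per-CPT test scheduler matching the service's partition.
 */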
static void
srpc_init_server_rpc(struct srpc_server_rpc *rpc,
                     struct srpc_service_cd *scd,
                     struct srpc_buffer *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
                          srpc_serv_is_framework(scd->scd_svc) ?
                          lst_sched_serial : lst_sched_test[scd->scd_cpt]);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_scd = scd;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer = buffer->buf_peer;
        rpc->srpc_self = buffer->buf_self;
        LNetInvalidateHandle(&rpc->srpc_replymdh);
}

static void
srpc_service_fini(struct srpc_service *svc)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        struct srpc_buffer *buf;
        struct list_head *q;
        int i;

        if (!svc->sv_cpt_data)
                return;

        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
                while (1) {
                        if (!list_empty(&scd->scd_buf_posted))
                                q = &scd->scd_buf_posted;
                        else if (!list_empty(&scd->scd_buf_blocked))
                                q = &scd->scd_buf_blocked;
                        else
                                break;

                        while (!list_empty(q)) {
                                buf = list_entry(q->next, struct srpc_buffer,
                                                 buf_list);
                                list_del(&buf->buf_list);
                                LIBCFS_FREE(buf, sizeof(*buf));
                        }
                }

                LASSERT(list_empty(&scd->scd_rpc_active));

                while (!list_empty(&scd->scd_rpc_free)) {
                        rpc = list_entry(scd->scd_rpc_free.next,
                                         struct srpc_server_rpc,
                                         srpc_list);
                        list_del(&rpc->srpc_list);
                        LIBCFS_FREE(rpc, sizeof(*rpc));
                }
        }

        cfs_percpt_free(svc->sv_cpt_data);
        svc->sv_cpt_data = NULL;
}

static int
srpc_service_nrpcs(struct srpc_service *svc)
{
        int nrpcs = svc->sv_wi_total / svc->sv_ncpts;

        return srpc_serv_is_framework(svc) ?
               max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
}

int srpc_add_buffer(struct swi_workitem *wi);

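/*
 * Allocate the per-CPT data of a service and pre-allocate its pool of
 * server RPC descriptors.  Framework services are serialized on one
 * partition; test services get one srpc_service_cd per CPT.
 */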
static int
srpc_service_init(struct srpc_service *svc)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int nrpcs;
        int i;
        int j;

        svc->sv_shuttingdown = 0;

        svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(),
                                            sizeof(*svc->sv_cpt_data));
        if (!svc->sv_cpt_data)
                return -ENOMEM;

        svc->sv_ncpts = srpc_serv_is_framework(svc) ?
                        1 : cfs_cpt_number(lnet_cpt_table());
        nrpcs = srpc_service_nrpcs(svc);

        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
                scd->scd_cpt = i;
                scd->scd_svc = svc;
                spin_lock_init(&scd->scd_lock);
                INIT_LIST_HEAD(&scd->scd_rpc_free);
                INIT_LIST_HEAD(&scd->scd_rpc_active);
                INIT_LIST_HEAD(&scd->scd_buf_posted);
                INIT_LIST_HEAD(&scd->scd_buf_blocked);

                scd->scd_ev.ev_data = scd;
                scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;

                /*
                 * NB: don't use lst_sched_serial for adding buffers;
                 * see details in srpc_service_add_buffers()
                 */
                swi_init_workitem(&scd->scd_buf_wi, scd,
                                  srpc_add_buffer, lst_sched_test[i]);

                if (i && srpc_serv_is_framework(svc)) {
                        /*
                         * NB: a framework service only needs srpc_service_cd
                         * for one partition, but we allocate for all to keep
                         * the implementation simple; it wastes a little
                         * memory but nobody should care
                         */
                        continue;
                }

                for (j = 0; j < nrpcs; j++) {
                        LIBCFS_CPT_ALLOC(rpc, lnet_cpt_table(),
                                         i, sizeof(*rpc));
                        if (!rpc) {
                                srpc_service_fini(svc);
                                return -ENOMEM;
                        }
                        list_add(&rpc->srpc_list, &scd->scd_rpc_free);
                }
        }

        return 0;
}

int
srpc_add_service(struct srpc_service *sv)
{
        int id = sv->sv_id;

        LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID);

        if (srpc_service_init(sv))
                return -ENOMEM;

        spin_lock(&srpc_data.rpc_glock);

        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id]) {
                spin_unlock(&srpc_data.rpc_glock);
                goto failed;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
        return 0;

 failed:
        srpc_service_fini(sv);
        return -EBUSY;
}

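/*
 * Registration sketch.  A caller fills in a struct srpc_service and adds
 * it; only fields referenced in this file are shown, and the ID and
 * handler names below are hypothetical:
 *
 *      static struct srpc_service my_svc = {
 *              .sv_id          = MY_SERVICE_ID,
 *              .sv_name        = "my_service",
 *              .sv_handler     = my_handle_request,
 *      };
 *
 *      rc = srpc_add_service(&my_svc);
 *      if (!rc)
 *              rc = srpc_service_add_buffers(&my_svc, nbuffers);
 */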
int
srpc_remove_service(struct srpc_service *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}

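/*
 * Post a passive buffer: attach a match entry (ME) on the given portal,
 * then attach a memory descriptor (MD) to it.  Completion events are
 * delivered to the single selftest event queue with 'ev' as user data.
 */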
static int
srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, struct srpc_event *ev)
{
        int rc;
        lnet_md_t md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK,
                          local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh);
        if (rc) {
                CERROR("LNetMEAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr = ev;
        md.start = buf;
        md.length = len;
        md.options = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc) {
                CERROR("LNetMDAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT(!rc);
                return -ENOMEM;
        }

        CDEBUG(D_NET, "Posted passive RDMA: peer %s, portal %d, matchbits %#llx\n",
               libcfs_id2str(peer), portal, matchbits);
        return 0;
}

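/*
 * Post an active transfer: bind a free-floating MD, then issue LNetPut
 * or LNetGet against the peer.  NB the MD threshold is 2 for a GET
 * because both the SEND and the REPLY events must fire before the MD is
 * exhausted, versus a single event for a PUT with LNET_NOACK_REQ.
 */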
static int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, struct srpc_event *ev)
{
        int rc;
        lnet_md_t md;

        md.user_ptr = ev;
        md.start = buf;
        md.length = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = options & LNET_MD_OP_GET ? 2 : 1;
        md.options = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc) {
                CERROR("LNetMDBind failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        /*
         * this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options;
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers)
         */
        if (options & LNET_MD_OP_PUT) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT(options & LNET_MD_OP_GET);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc) {
                CERROR("LNet%s(%s, %d, %lld) failed: %d\n",
                       options & LNET_MD_OP_PUT ? "Put" : "Get",
                       libcfs_id2str(peer), portal, matchbits, rc);

                /*
                 * The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT(!rc);
        } else {
                CDEBUG(D_NET, "Posted active RDMA: peer %s, portal %u, matchbits %#llx\n",
                       libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}

static int
srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
                         lnet_handle_md_t *mdh, struct srpc_event *ev)
{
        lnet_process_id_t any = { 0 };

        any.nid = LNET_NID_ANY;
        any.pid = LNET_PID_ANY;

        return srpc_post_passive_rdma(srpc_serv_portal(service),
                                      local, service, buf, len,
                                      LNET_MD_OP_PUT, any, mdh, ev);
}

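/*
 * Post one request buffer on the service's portal.  Called (and
 * returning) with scd_lock held, though the lock is dropped around the
 * actual LNet calls; see the comments in the body for the ordering
 * constraints this imposes.
 */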
static int
srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
__must_hold(&scd->scd_lock)
{
        struct srpc_service *sv = scd->scd_svc;
        struct srpc_msg *msg = &buf->buf_msg;
        int rc;

        LNetInvalidateHandle(&buf->buf_mdh);
        list_add(&buf->buf_list, &scd->scd_buf_posted);
        scd->scd_buf_nposted++;
        spin_unlock(&scd->scd_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id,
                                      !srpc_serv_is_framework(sv),
                                      msg, sizeof(*msg), &buf->buf_mdh,
                                      &scd->scd_ev);

        /*
         * At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler may have been called. So we must add
         * buf to scd_buf_posted _before_ dropping scd_lock
         */
        spin_lock(&scd->scd_lock);

        if (!rc) {
                if (!sv->sv_shuttingdown)
                        return 0;

                spin_unlock(&scd->scd_lock);
                /*
                 * srpc_shutdown_service might have tried to unlink me
                 * when my buf_mdh was still invalid
                 */
                LNetMDUnlink(buf->buf_mdh);
                spin_lock(&scd->scd_lock);
                return 0;
        }

        scd->scd_buf_nposted--;
        if (sv->sv_shuttingdown)
                return rc; /* don't allow scd_buf_posted to change */

        list_del(&buf->buf_list);
        spin_unlock(&scd->scd_lock);

        LIBCFS_FREE(buf, sizeof(*buf));

        spin_lock(&scd->scd_lock);
        return rc;
}

int
srpc_add_buffer(struct swi_workitem *wi)
{
        struct srpc_service_cd *scd = wi->swi_workitem.wi_data;
        struct srpc_buffer *buf;
        int rc = 0;

        /*
         * it's called by workitem scheduler threads; these threads have
         * been set up with CPT affinity, so buffers will be posted on the
         * CPT-local list of the portal
         */
        spin_lock(&scd->scd_lock);

        while (scd->scd_buf_adjust > 0 &&
               !scd->scd_svc->sv_shuttingdown) {
                scd->scd_buf_adjust--; /* consume it */
                scd->scd_buf_posting++;

                spin_unlock(&scd->scd_lock);

                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (!buf) {
                        CERROR("Failed to add new buf to service: %s\n",
                               scd->scd_svc->sv_name);
                        spin_lock(&scd->scd_lock);
                        rc = -ENOMEM;
                        break;
                }

                spin_lock(&scd->scd_lock);
                if (scd->scd_svc->sv_shuttingdown) {
                        spin_unlock(&scd->scd_lock);
                        LIBCFS_FREE(buf, sizeof(*buf));

                        spin_lock(&scd->scd_lock);
                        rc = -ESHUTDOWN;
                        break;
                }

                rc = srpc_service_post_buffer(scd, buf);
                if (rc)
                        break; /* buf has been freed inside */

                LASSERT(scd->scd_buf_posting > 0);
                scd->scd_buf_posting--;
                scd->scd_buf_total++;
                scd->scd_buf_low = max(2, scd->scd_buf_total / 4);
        }

        if (rc) {
                scd->scd_buf_err_stamp = ktime_get_real_seconds();
                scd->scd_buf_err = rc;

                LASSERT(scd->scd_buf_posting > 0);
                scd->scd_buf_posting--;
        }

        spin_unlock(&scd->scd_lock);
        return 0;
}

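/*
 * Grow each partition of the service by nbuffer posted request buffers.
 * The actual posting is done by scd_buf_wi on the test scheduler; this
 * function blocks until every partition has finished posting (or failed)
 * and returns 0 or the first error encountered.
 */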
int
srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
{
        struct srpc_service_cd *scd;
        int rc = 0;
        int i;

        LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                scd->scd_buf_err = 0;
                scd->scd_buf_err_stamp = 0;
                scd->scd_buf_posting = 0;
                scd->scd_buf_adjust = nbuffer;
                /* start to post buffers */
                swi_schedule_workitem(&scd->scd_buf_wi);
                spin_unlock(&scd->scd_lock);

                /* a framework service only posts buffers on one partition */
                if (srpc_serv_is_framework(sv))
                        break;
        }

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);
                /*
                 * NB: srpc_service_add_buffers() can be called from the
                 * thread context of lst_sched_serial, and we normally don't
                 * allow sleeping in the thread context of a WI scheduler
                 * because it blocks the current scheduler thread from doing
                 * anything else; even worse, it could deadlock if it's
                 * waiting on the result of another WI of the same scheduler.
                 * However, it's safe here because scd_buf_wi is scheduled
                 * by a thread in a different WI scheduler (lst_sched_test),
                 * so we don't have any risk of deadlock, though this could
                 * block all WIs pending on lst_sched_serial for a moment,
                 * which is not good but not fatal.
                 */
                lst_wait_until(scd->scd_buf_err ||
                               (!scd->scd_buf_adjust &&
                                !scd->scd_buf_posting),
                               scd->scd_lock, "waiting for adding buffer\n");

                if (scd->scd_buf_err && !rc)
                        rc = scd->scd_buf_err;

                spin_unlock(&scd->scd_lock);
        }

        return rc;
}

void
srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer)
{
        struct srpc_service_cd *scd;
        int num;
        int i;

        LASSERT(!sv->sv_shuttingdown);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                num = scd->scd_buf_total + scd->scd_buf_posting;
                scd->scd_buf_adjust -= min(nbuffer, num);

                spin_unlock(&scd->scd_lock);
        }
}

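/*
 * Shutdown sequence sketch, as implied by the return code of
 * srpc_finish_service() below (the polling loop is an assumption about
 * how callers drive it):
 *
 *      srpc_shutdown_service(sv);
 *      while (!srpc_finish_service(sv))
 *              schedule();     ... wait for posted buffers to unlink
 *                                  and active RPCs to drain ...
 */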
/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int i;

        LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);
                if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
                        spin_unlock(&scd->scd_lock);
                        return 0;
                }

                if (scd->scd_buf_nposted > 0) {
                        CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
                               scd->scd_buf_nposted);
                        spin_unlock(&scd->scd_lock);
                        return 0;
                }

                if (list_empty(&scd->scd_rpc_active)) {
                        spin_unlock(&scd->scd_lock);
                        continue;
                }

                rpc = list_entry(scd->scd_rpc_active.next,
                                 struct srpc_server_rpc, srpc_list);
                CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
                        rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                        swi_state2str(rpc->srpc_wi.swi_state),
                        rpc->srpc_wi.swi_workitem.wi_scheduled,
                        rpc->srpc_wi.swi_workitem.wi_running,
                        rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
                        rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
                spin_unlock(&scd->scd_lock);
                return 0;
        }

        /* no lock needed from now on */
        srpc_service_fini(sv);
        return 1;
}

/* called with scd->scd_lock held */
static void
srpc_service_recycle_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
__must_hold(&scd->scd_lock)
{
        if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
                if (srpc_service_post_buffer(scd, buf)) {
                        CWARN("Failed to post %s buffer\n",
                              scd->scd_svc->sv_name);
                }
                return;
        }

        /* service is shutting down, or we want to recycle some buffers */
        scd->scd_buf_total--;

        if (scd->scd_buf_adjust < 0) {
                scd->scd_buf_adjust++;
                if (scd->scd_buf_adjust < 0 &&
                    !scd->scd_buf_total && !scd->scd_buf_posting) {
                        CDEBUG(D_INFO,
                               "Try to recycle %d buffers but nothing left\n",
                               scd->scd_buf_adjust);
                        scd->scd_buf_adjust = 0;
                }
        }

        spin_unlock(&scd->scd_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&scd->scd_lock);
}

void
srpc_abort_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int i;

        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                /*
                 * schedule in-flight RPCs to notice the abort. NB: this
                 * races with incoming RPCs; a complete fix should make test
                 * RPCs carry the session ID in their headers
                 */
                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
                        rpc->srpc_aborted = 1;
                        swi_schedule_workitem(&rpc->srpc_wi);
                }

                spin_unlock(&scd->scd_lock);
        }
}

void
srpc_shutdown_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        struct srpc_buffer *buf;
        int i;

        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
                spin_lock(&scd->scd_lock);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
                spin_unlock(&scd->scd_lock);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                /* schedule in-flight RPCs to notice the shutdown */
                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
                        swi_schedule_workitem(&rpc->srpc_wi);

                spin_unlock(&scd->scd_lock);

                /*
                 * OK to traverse scd_buf_posted without lock, since no one
                 * touches scd_buf_posted now
                 */
                list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
                        LNetMDUnlink(buf->buf_mdh);
        }
}

static int
srpc_send_request(struct srpc_client_rpc *rpc)
{
        struct srpc_event *ev = &rpc->crpc_reqstev;
        int rc;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rdma(srpc_serv_portal(rpc->crpc_service),
                                   rpc->crpc_service, &rpc->crpc_reqstmsg,
                                   sizeof(struct srpc_msg), LNET_MD_OP_PUT,
                                   rpc->crpc_dest, LNET_NID_ANY,
                                   &rpc->crpc_reqstmdh, ev);
        if (rc) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_prepare_reply(struct srpc_client_rpc *rpc)
{
        struct srpc_event *ev = &rpc->crpc_replyev;
        __u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int rc;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                    &rpc->crpc_replymsg,
                                    sizeof(struct srpc_msg),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_prepare_bulk(struct srpc_client_rpc *rpc)
{
        struct srpc_bulk *bk = &rpc->crpc_bulk;
        struct srpc_event *ev = &rpc->crpc_bulkev;
        __u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int rc;
        int opt;

        LASSERT(bk->bk_niov <= LNET_MAX_IOV);

        if (!bk->bk_niov)
                return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
        opt |= LNET_MD_KIOV;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

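/*
 * Server side of a bulk transfer.  Note the mirror image of
 * srpc_prepare_bulk() above: the client posts a passive buffer (PUT
 * option when it is the sink, GET when it is the source), while the
 * server actively GETs from or PUTs to that buffer here.
 */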
static int
srpc_do_bulk(struct srpc_server_rpc *rpc)
{
        struct srpc_event *ev = &rpc->srpc_ev;
        struct srpc_bulk *bk = rpc->srpc_bulk;
        __u64 id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int rc;
        int opt;

        LASSERT(bk);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
        opt |= LNET_MD_KIOV;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* only called from srpc_handle_rpc */
static void
srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
{
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        struct srpc_buffer *buffer;

        LASSERT(status || rpc->srpc_wi.swi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
                     "Server RPC %p done: service %s, peer %s, status %s:%d\n",
                     rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                     swi_state2str(rpc->srpc_wi.swi_state), status);

        if (status) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done)
                (*rpc->srpc_done)(rpc);
        LASSERT(!rpc->srpc_bulk);

        spin_lock(&scd->scd_lock);

        if (rpc->srpc_reqstbuf) {
                /*
                 * NB scd_lock might be dropped in srpc_service_recycle_buffer,
                 * but sv won't go away since scd_rpc_active is not empty
                 */
                srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */

        /*
         * No one can schedule me now since:
         * - I'm not on scd_rpc_active.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(rpc->srpc_ev.ev_fired);
        swi_exit_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
                buffer = list_entry(scd->scd_buf_blocked.next,
                                    struct srpc_buffer, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, scd, buffer);
                list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
                swi_schedule_workitem(&rpc->srpc_wi);
        } else {
                list_add(&rpc->srpc_list, &scd->scd_rpc_free);
        }

        spin_unlock(&scd->scd_lock);
}

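/*
 * Server-side RPC state machine, driven by LNet events and the workitem
 * scheduler:
 *
 *      SWI_STATE_NEWBORN          unpack request, call sv_handler
 *      SWI_STATE_BULK_STARTED     bulk transfer (if any) completed
 *      SWI_STATE_REPLY_SUBMITTED  reply PUT has been posted
 *      SWI_STATE_DONE             cleaned up via srpc_server_rpc_done()
 */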
/* handles an incoming RPC */
int
srpc_handle_rpc(struct swi_workitem *wi)
{
        struct srpc_server_rpc *rpc = wi->swi_workitem.wi_data;
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        struct srpc_event *ev = &rpc->srpc_ev;
        int rc = 0;

        LASSERT(wi == &rpc->srpc_wi);

        spin_lock(&scd->scd_lock);

        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
                spin_unlock(&scd->scd_lock);

                if (rpc->srpc_bulk)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&scd->scd_lock);

        switch (wi->swi_state) {
        default:
                LBUG();
        case SWI_STATE_NEWBORN: {
                struct srpc_msg *msg;
                struct srpc_generic_reply *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (!msg->msg_magic) {
                        /* moaned already in srpc_lnet_ev_handler */
                        srpc_server_rpc_done(rpc, EBADMSG);
                        return 1;
                }

                srpc_unpack_msg_hdr(msg);
                if (msg->msg_version != SRPC_MSG_VERSION) {
                        CWARN("Version mismatch: %u, %u expected, from %s\n",
                              msg->msg_version, SRPC_MSG_VERSION,
                              libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                        /* drop through and send reply */
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler)(rpc);
                        LASSERT(!reply->status || !rpc->srpc_bulk);
                        if (rc) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->swi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk) {
                        rc = srpc_do_bulk(rpc);
                        if (!rc)
                                return 0; /* wait for bulk */

                        LASSERT(ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
        /* fall through */
        case SWI_STATE_BULK_STARTED:
                LASSERT(!rpc->srpc_bulk || ev->ev_fired);

                if (rpc->srpc_bulk) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready)
                                rc = (*sv->sv_bulk_ready)(rpc, rc);

                        if (rc) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (!rc)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                if (!ev->ev_fired) {
                        CERROR("RPC %p: bulk %p, service %d\n",
                               rpc, rpc->srpc_bulk, sv->sv_id);
                        CERROR("Event: status %d, type %d, lnet %d\n",
                               ev->ev_status, ev->ev_type, ev->ev_lnet);
                        LASSERT(ev->ev_fired);
                }

                wi->swi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}

static void
srpc_client_rpc_expired(void *data)
{
        struct srpc_client_rpc *rpc = data;

        CWARN("Client RPC expired: service %d, peer %s, timeout %d.\n",
              rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
              rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
}

static void
srpc_add_client_rpc_timer(struct srpc_client_rpc *rpc)
{
        struct stt_timer *timer = &rpc->crpc_timer;

        if (!rpc->crpc_timeout)
                return;

        INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data = rpc;
        timer->stt_func = srpc_client_rpc_expired;
        timer->stt_expires = ktime_get_real_seconds() + rpc->crpc_timeout;
        stt_add_timer(timer);
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU.
 */
static void
srpc_del_client_rpc_timer(struct srpc_client_rpc *rpc)
{
        /* timer not planted or already exploded */
        if (!rpc->crpc_timeout)
                return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer))
                return;

        /* timer detonated, wait for it to explode */
        while (rpc->crpc_timeout) {
                spin_unlock(&rpc->crpc_lock);

                schedule();

                spin_lock(&rpc->crpc_lock);
        }
}

static void
srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
{
        struct swi_workitem *wi = &rpc->crpc_wi;

        LASSERT(status || wi->swi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (!rpc->crpc_status)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
                     "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                     rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                     swi_state2str(wi->swi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(!srpc_event_pending(rpc));
        swi_exit_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done)(rpc);
}

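/*
 * Client-side RPC state machine.  The three LNet events (request sent,
 * reply received, bulk completed) may arrive in any order, but they are
 * consumed in a strict order by the states below:
 *
 *      SWI_STATE_NEWBORN            post reply/bulk buffers, send request
 *      SWI_STATE_REQUEST_SUBMITTED  wait for the request SEND event
 *      SWI_STATE_REQUEST_SENT       wait for and validate the reply
 *      SWI_STATE_REPLY_RECEIVED     wait for bulk completion (if any)
 *      SWI_STATE_DONE               finished via srpc_client_rpc_done()
 */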
/* sends an outgoing RPC */
int
srpc_send_rpc(struct swi_workitem *wi)
{
        int rc = 0;
        struct srpc_client_rpc *rpc;
        struct srpc_msg *reply;
        int do_bulk;

        LASSERT(wi);

        rpc = wi->swi_workitem.wi_data;

        LASSERT(rpc);
        LASSERT(wi == &rpc->crpc_wi);

        reply = &rpc->crpc_replymsg;
        do_bulk = rpc->crpc_bulk.bk_niov > 0;

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->swi_state) {
        default:
                LBUG();
        case SWI_STATE_NEWBORN:
                LASSERT(!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc)
                        break;

                wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /*
                 * CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk.
                 */
                if (!rpc->crpc_reqstev.ev_fired)
                        break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc)
                        break;

                wi->swi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events; fall through */
        case SWI_STATE_REQUEST_SENT: {
                enum srpc_msg_type type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired)
                        break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc)
                        break;

                srpc_unpack_msg_hdr(reply);
                if (reply->msg_type != type ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN("Bad message from %s: type %u (%d expected), magic %u (%d expected).\n",
                              libcfs_id2str(rpc->crpc_dest),
                              reply->msg_type, type,
                              reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status) {
                        CWARN("Remote error %d at %s, unlink bulk buffer in case peer didn't initiate bulk transfer\n",
                              reply->msg_body.reply.status,
                              libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->swi_state = SWI_STATE_REPLY_RECEIVED;
        }
        /* fall through */
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired)
                        break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /*
                 * Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error.
                 */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    !rpc->crpc_status && reply->msg_body.reply.status)
                        rc = 0;

                wi->swi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

struct srpc_client_rpc *
srpc_create_client_rpc(lnet_process_id_t peer, int service,
                       int nbulkiov, int bulklen,
                       void (*rpc_done)(struct srpc_client_rpc *),
                       void (*rpc_fini)(struct srpc_client_rpc *), void *priv)
{
        struct srpc_client_rpc *rpc;

        LIBCFS_ALLOC(rpc, offsetof(struct srpc_client_rpc,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (!rpc)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}

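/*
 * Client usage sketch (the done callback and its body are hypothetical;
 * error handling elided):
 *
 *      static void my_rpc_done(struct srpc_client_rpc *rpc)
 *      {
 *              ... inspect rpc->crpc_status and rpc->crpc_replymsg ...
 *      }
 *
 *      rpc = srpc_create_client_rpc(peer, service, 0, 0,
 *                                   my_rpc_done, NULL, NULL);
 *      spin_lock(&rpc->crpc_lock);
 *      srpc_post_rpc(rpc);
 *      spin_unlock(&rpc->crpc_lock);
 */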
/* called with rpc->crpc_lock held */
void
srpc_abort_rpc(struct srpc_client_rpc *rpc, int why)
{
        LASSERT(why);

        if (rpc->crpc_aborted ||        /* already aborted */
            rpc->crpc_closed)           /* callback imminent */
                return;

        CDEBUG(D_NET, "Aborting RPC: service %d, peer %s, state %s, why %d\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               swi_state2str(rpc->crpc_wi.swi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status = why;
        swi_schedule_workitem(&rpc->crpc_wi);
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc(struct srpc_client_rpc *rpc)
{
        LASSERT(!rpc->crpc_aborted);
        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

        CDEBUG(D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
               libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
               rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);
        swi_schedule_workitem(&rpc->crpc_wi);
}

int
srpc_send_reply(struct srpc_server_rpc *rpc)
{
        struct srpc_event *ev = &rpc->srpc_ev;
        struct srpc_msg *msg = &rpc->srpc_replymsg;
        struct srpc_buffer *buffer = rpc->srpc_reqstbuf;
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        __u64 rpyid;
        int rc;

        LASSERT(buffer);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&scd->scd_lock);

        if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
                /*
                 * Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply
                 */
                if (srpc_service_post_buffer(scd, buffer))
                        CWARN("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&scd->scd_lock);

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_REPLY_SENT;

        msg->msg_magic = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc)
                ev->ev_fired = 1; /* no more event expected */
        return rc;
}

/* when in kernel always called with LNET_LOCK() held, and in thread context */
static void
srpc_lnet_ev_handler(lnet_event_t *ev)
{
        struct srpc_service_cd *scd;
        struct srpc_event *rpcev = ev->md.user_ptr;
        struct srpc_client_rpc *crpc;
        struct srpc_server_rpc *srpc;
        struct srpc_buffer *buffer;
        struct srpc_service *sv;
        struct srpc_msg *msg;
        enum srpc_msg_type type;

        LASSERT(!in_interrupt());

        if (ev->status) {
                __u32 errors;

                spin_lock(&srpc_data.rpc_glock);
                if (ev->status != -ECANCELED) /* cancellation is not an error */
                        srpc_data.rpc_counters.errors++;
                errors = srpc_data.rpc_counters.errors;
                spin_unlock(&srpc_data.rpc_glock);

                CNETERR("LNet event status %d type %d, RPC errors %u\n",
                        ev->status, ev->type, errors);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                CERROR("Unknown event: status %d, type %d, lnet %d\n",
                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                LBUG();
        case SRPC_REQUEST_SENT:
                if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fall through */
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                if (rpcev != &crpc->crpc_reqstev &&
                    rpcev != &crpc->crpc_replyev &&
                    rpcev != &crpc->crpc_bulkev) {
                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
                               rpcev, crpc, &crpc->crpc_reqstev,
                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
                        CERROR("Bad event: status %d, type %d, lnet %d\n",
                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                        LBUG();
                }

                spin_lock(&crpc->crpc_lock);

                LASSERT(!rpcev->ev_fired);
                rpcev->ev_fired = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                   -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                scd = rpcev->ev_data;
                sv = scd->scd_svc;

                LASSERT(rpcev == &scd->scd_ev);

                spin_lock(&scd->scd_lock);

                LASSERT(ev->unlinked);
                LASSERT(ev->type == LNET_EVENT_PUT ||
                        ev->type == LNET_EVENT_UNLINK);
                LASSERT(ev->type != LNET_EVENT_UNLINK ||
                        sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, struct srpc_buffer, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                LASSERT(scd->scd_buf_nposted > 0);
                scd->scd_buf_nposted--;

                if (sv->sv_shuttingdown) {
                        /*
                         * Leave buffer on scd->scd_buf_posted since
                         * srpc_finish_service needs to traverse it.
                         */
                        spin_unlock(&scd->scd_lock);
                        break;
                }

                if (scd->scd_buf_err_stamp &&
                    scd->scd_buf_err_stamp < ktime_get_real_seconds()) {
                        /* re-enable adding buffer */
                        scd->scd_buf_err_stamp = 0;
                        scd->scd_buf_err = 0;
                }

                if (!scd->scd_buf_err &&        /* adding buffer is enabled */
                    !scd->scd_buf_adjust &&
                    scd->scd_buf_nposted < scd->scd_buf_low) {
                        scd->scd_buf_adjust = max(scd->scd_buf_total / 2,
                                                  SFW_TEST_WI_MIN);
                        swi_schedule_workitem(&scd->scd_buf_wi);
                }

                list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR("Dropping RPC (%s) from %s: status %d mlength %d type %u magic %u.\n",
                               sv->sv_name, libcfs_id2str(ev->initiator),
                               ev->status, ev->mlength,
                               msg->msg_type, msg->msg_magic);

                        /*
                         * NB can't call srpc_service_recycle_buffer here since
                         * it may call LNetM[DE]Attach. The invalid magic tells
                         * srpc_handle_rpc to drop this RPC
                         */
                        msg->msg_magic = 0;
                }

                if (!list_empty(&scd->scd_rpc_free)) {
                        srpc = list_entry(scd->scd_rpc_free.next,
                                          struct srpc_server_rpc,
                                          srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, scd, buffer);
                        list_add_tail(&srpc->srpc_list,
                                      &scd->scd_rpc_active);
                        swi_schedule_workitem(&srpc->srpc_wi);
                } else {
                        list_add_tail(&buffer->buf_list,
                                      &scd->scd_buf_blocked);
                }

                spin_unlock(&scd->scd_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT(ev->type == LNET_EVENT_SEND ||
                        ev->type == LNET_EVENT_REPLY ||
                        ev->type == LNET_EVENT_UNLINK);

                if (!ev->unlinked)
                        break; /* wait for final event */
                /* fall through */
        case SRPC_BULK_PUT_SENT:
                if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fall through */
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                scd = srpc->srpc_scd;

                LASSERT(rpcev == &srpc->srpc_ev);

                spin_lock(&scd->scd_lock);

                rpcev->ev_fired = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                   -EINTR : ev->status;
                swi_schedule_workitem(&srpc->srpc_wi);

                spin_unlock(&scd->scd_lock);
                break;
        }
}

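/*
 * Module init/fini.  srpc_startup() walks the state ladder
 * SRPC_STATE_NONE -> NI_INIT -> EQ_INIT -> RUNNING, and srpc_shutdown()
 * unwinds exactly the states reached, via the intentional fall-throughs
 * in its switch below.
 */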
int
srpc_startup(void)
{
        int rc;

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64)ktime_get_real_seconds()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

        rc = LNetNIInit(LNET_PID_LUSTRE);
        if (rc < 0) {
                CERROR("LNetNIInit() has failed: %d\n", rc);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
        rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
        if (rc) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT(!rc);
        rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL);
        LASSERT(!rc);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = stt_startup();

bail:
        if (rc)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown(void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        struct srpc_service *sv = srpc_data.rpc_services[i];

                        LASSERTF(!sv, "service not empty: id %d, name %s\n",
                                 i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();
                /* fall through */
        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT(!rc);
                rc = LNetClearLazyPortal(SRPC_REQUEST_PORTAL);
                LASSERT(!rc);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT(!rc); /* the EQ should have no user by now */
                /* fall through */
        case SRPC_STATE_NI_INIT:
                LNetNIFini();
        }
}