linux/drivers/staging/lustre/lnet/selftest/rpc.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/rpc.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 *
 * 2012-05-13: Liang Zhen <liang@whamcloud.com>
 * - percpt data for service to improve smp performance
 * - code cleanup
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"

typedef enum {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
} srpc_state_t;

static struct smoketest_rpc {
        spinlock_t       rpc_glock;     /* global lock */
        srpc_service_t  *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        lnet_handle_eq_t rpc_lnet_eq;   /* _the_ LNet event queue */
        srpc_state_t     rpc_state;
        srpc_counters_t  rpc_counters;
        __u64            rpc_matchbits; /* matchbits counter */
} srpc_data;

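/*
 * Framework services (console/session traffic) live on a dedicated
 * portal, separate from test traffic; service IDs below
 * SRPC_FRAMEWORK_SERVICE_MAX_ID are framework services.
 */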
static inline int
srpc_serv_portal(int svc_id)
{
        return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ?
               SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL;
}

/* forward ref's */
int srpc_handle_rpc(swi_workitem_t *wi);

void srpc_get_counters(srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters(const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

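/*
 * Map page @pg as fragment @i of bulk descriptor @bk, covering at most
 * one page of the remaining @nob bytes; returns the number of bytes
 * actually covered by this fragment.
 */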
static int
srpc_add_bulk_page(srpc_bulk_t *bk, struct page *pg, int i, int nob)
{
        nob = min_t(int, nob, PAGE_CACHE_SIZE);

        LASSERT(nob > 0);
        LASSERT(i >= 0 && i < bk->bk_niov);

        bk->bk_iovs[i].kiov_offset = 0;
        bk->bk_iovs[i].kiov_page   = pg;
        bk->bk_iovs[i].kiov_len    = nob;
        return nob;
}

void
srpc_free_bulk(srpc_bulk_t *bk)
{
        int i;
        struct page *pg;

        LASSERT(bk != NULL);

        for (i = 0; i < bk->bk_niov; i++) {
                pg = bk->bk_iovs[i].kiov_page;
                if (pg == NULL)
                        break;

                __free_page(pg);
        }

        LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
        return;
}

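/*
 * Allocate a bulk descriptor plus @bulk_npg pages covering @bulk_len
 * bytes. Descriptor and pages are allocated near CPT @cpt so bulk DMA
 * stays NUMA-local; @sink is nonzero if this buffer will receive data.
 */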
srpc_bulk_t *
srpc_alloc_bulk(int cpt, unsigned bulk_npg, unsigned bulk_len, int sink)
{
        srpc_bulk_t *bk;
        int i;

        LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);

        LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt,
                         offsetof(srpc_bulk_t, bk_iovs[bulk_npg]));
        if (bk == NULL) {
                CERROR("Can't allocate descriptor for %d pages\n", bulk_npg);
                return NULL;
        }

        memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[bulk_npg]));
        bk->bk_sink   = sink;
        bk->bk_len    = bulk_len;
        bk->bk_niov   = bulk_npg;

        for (i = 0; i < bulk_npg; i++) {
                struct page *pg;
                int nob;

                pg = alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
                                      GFP_IOFS, 0);
                if (pg == NULL) {
                        CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                nob = srpc_add_bulk_page(bk, pg, i, bulk_len);
                bulk_len -= nob;
        }

        return bk;
}

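/*
 * Hand out the next matchbits value. The counter is seeded from the wall
 * clock in srpc_startup() (shifted into the top 16 bits), which is also
 * why startup sleeps for a second: to avoid reusing a seed across a
 * quick module reload.
 */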
static inline __u64
srpc_next_id(void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

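/*
 * (Re)initialize a server-side RPC bound to request buffer @buffer. NB
 * framework RPCs always run on the serial scheduler, while test RPCs run
 * on the per-CPT test scheduler of their service partition.
 */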
static void
srpc_init_server_rpc(struct srpc_server_rpc *rpc,
                     struct srpc_service_cd *scd,
                     struct srpc_buffer *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
                          srpc_serv_is_framework(scd->scd_svc) ?
                          lst_sched_serial : lst_sched_test[scd->scd_cpt]);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_scd      = scd;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer     = buffer->buf_peer;
        rpc->srpc_self     = buffer->buf_self;
        LNetInvalidateHandle(&rpc->srpc_replymdh);
}

static void
srpc_service_fini(struct srpc_service *svc)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        struct srpc_buffer *buf;
        struct list_head *q;
        int i;

        if (svc->sv_cpt_data == NULL)
                return;

        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
                while (1) {
                        if (!list_empty(&scd->scd_buf_posted))
                                q = &scd->scd_buf_posted;
                        else if (!list_empty(&scd->scd_buf_blocked))
                                q = &scd->scd_buf_blocked;
                        else
                                break;

                        while (!list_empty(q)) {
                                buf = list_entry(q->next,
                                                     struct srpc_buffer,
                                                     buf_list);
                                list_del(&buf->buf_list);
                                LIBCFS_FREE(buf, sizeof(*buf));
                        }
                }

                LASSERT(list_empty(&scd->scd_rpc_active));

                while (!list_empty(&scd->scd_rpc_free)) {
                        rpc = list_entry(scd->scd_rpc_free.next,
                                             struct srpc_server_rpc,
                                             srpc_list);
                        list_del(&rpc->srpc_list);
                        LIBCFS_FREE(rpc, sizeof(*rpc));
                }
        }

        cfs_percpt_free(svc->sv_cpt_data);
        svc->sv_cpt_data = NULL;
}

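/* number of server RPCs to preallocate on each partition of @svc, with
 * a per-service-type floor so that small per-CPT shares still get
 * enough workitems */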
static int
srpc_service_nrpcs(struct srpc_service *svc)
{
        int nrpcs = svc->sv_wi_total / svc->sv_ncpts;

        return srpc_serv_is_framework(svc) ?
               max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
}

int srpc_add_buffer(struct swi_workitem *wi);

static int
srpc_service_init(struct srpc_service *svc)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int nrpcs;
        int i;
        int j;

        svc->sv_shuttingdown = 0;

        svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(),
                                            sizeof(struct srpc_service_cd));
        if (svc->sv_cpt_data == NULL)
                return -ENOMEM;

        svc->sv_ncpts = srpc_serv_is_framework(svc) ?
                        1 : cfs_cpt_number(lnet_cpt_table());
        nrpcs = srpc_service_nrpcs(svc);

        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
                scd->scd_cpt = i;
                scd->scd_svc = svc;
                spin_lock_init(&scd->scd_lock);
                INIT_LIST_HEAD(&scd->scd_rpc_free);
                INIT_LIST_HEAD(&scd->scd_rpc_active);
                INIT_LIST_HEAD(&scd->scd_buf_posted);
                INIT_LIST_HEAD(&scd->scd_buf_blocked);

                scd->scd_ev.ev_data = scd;
                scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;

                /* NB: don't use lst_sched_serial for adding buffer,
                 * see details in srpc_service_add_buffers() */
                swi_init_workitem(&scd->scd_buf_wi, scd,
                                  srpc_add_buffer, lst_sched_test[i]);

                if (i != 0 && srpc_serv_is_framework(svc)) {
                        /* NB: framework service only needs srpc_service_cd for
                         * one partition, but we allocate for all to make
                         * it easier to implement, it will waste a little
                         * memory but nobody should care about this */
                        continue;
                }

                for (j = 0; j < nrpcs; j++) {
                        LIBCFS_CPT_ALLOC(rpc, lnet_cpt_table(),
                                         i, sizeof(*rpc));
                        if (rpc == NULL) {
                                srpc_service_fini(svc);
                                return -ENOMEM;
                        }
                        list_add(&rpc->srpc_list, &scd->scd_rpc_free);
                }
        }

        return 0;
}

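/*
 * Register @sv in the global service table. A minimal usage sketch (the
 * service below is hypothetical; real callers live in framework.c and
 * the test modules):
 *
 *        static srpc_service_t ping_srv = {
 *                .sv_id      = SRPC_SERVICE_PING,
 *                .sv_name    = "ping_test",
 *                .sv_handler = ping_srv_handler,
 *        };
 *
 *        rc = srpc_add_service(&ping_srv);
 *        if (rc == 0)
 *                rc = srpc_service_add_buffers(&ping_srv, nbufs);
 */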
int
srpc_add_service(struct srpc_service *sv)
{
        int id = sv->sv_id;

        LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID);

        if (srpc_service_init(sv) != 0)
                return -ENOMEM;

        spin_lock(&srpc_data.rpc_glock);

        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id] != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                goto failed;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
        return 0;

 failed:
        srpc_service_fini(sv);
        return -EBUSY;
}

int
srpc_remove_service(srpc_service_t *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}

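/*
 * Post a passive buffer: attach a match entry for @matchbits on @portal
 * and hang an MD over @buf so a peer-initiated PUT or GET can land in
 * it. The MD auto-unlinks after one operation (md.threshold == 1).
 */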
static int
srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        lnet_md_t md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK,
                          local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh);
        if (rc != 0) {
                CERROR("LNetMEAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.options   = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR("LNetMDAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT(rc == 0);
                return -ENOMEM;
        }

        CDEBUG(D_NET,
                "Posted passive RDMA: peer %s, portal %d, matchbits %#llx\n",
                libcfs_id2str(peer), portal, matchbits);
        return 0;
}

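/*
 * Start an active transfer: bind an MD over @buf and immediately issue
 * LNetPut() or LNetGet() towards @peer. For a GET the MD threshold is 2
 * because both a SEND and a REPLY event are expected on it.
 */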
static int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        lnet_md_t md;

        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
        md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR("LNetMDBind failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        /* this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options:
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers), so here they merely select which operation to issue */
        if ((options & LNET_MD_OP_PUT) != 0) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT((options & LNET_MD_OP_GET) != 0);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc != 0) {
                CERROR("LNet%s(%s, %d, %#llx) failed: %d\n",
                        ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
                        libcfs_id2str(peer), portal, matchbits, rc);

                /* The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT(rc == 0);
        } else {
                CDEBUG(D_NET,
                        "Posted active RDMA: peer %s, portal %u, matchbits %#llx\n",
                        libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}

static int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        return srpc_post_active_rdma(srpc_serv_portal(service), service,
                                     buf, len, LNET_MD_OP_PUT, peer,
                                     LNET_NID_ANY, mdh, ev);
}

static int
srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
                         lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        lnet_process_id_t any = {0};

        any.nid = LNET_NID_ANY;
        any.pid = LNET_PID_ANY;

        return srpc_post_passive_rdma(srpc_serv_portal(service),
                                      local, service, buf, len,
                                      LNET_MD_OP_PUT, any, mdh, ev);
}

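/*
 * Post one request buffer on @scd. Called and returns with scd_lock
 * held, but the lock is dropped around the LNet call, so the buffer
 * must be on scd_buf_posted before posting: the event handler may fire
 * immediately.
 */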
static int
srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
        __must_hold(&scd->scd_lock)
{
        struct srpc_service *sv = scd->scd_svc;
        struct srpc_msg *msg = &buf->buf_msg;
        int rc;

        LNetInvalidateHandle(&buf->buf_mdh);
        list_add(&buf->buf_list, &scd->scd_buf_posted);
        scd->scd_buf_nposted++;
        spin_unlock(&scd->scd_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id,
                                      !srpc_serv_is_framework(sv),
                                      msg, sizeof(*msg), &buf->buf_mdh,
                                      &scd->scd_ev);

        /* At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler may have been called. That is why
         * buf was added to scd_buf_posted _before_ dropping scd_lock */

        spin_lock(&scd->scd_lock);

        if (rc == 0) {
                if (!sv->sv_shuttingdown)
                        return 0;

                spin_unlock(&scd->scd_lock);
                /* srpc_shutdown_service might have tried to unlink me
                 * when my buf_mdh was still invalid */
                LNetMDUnlink(buf->buf_mdh);
                spin_lock(&scd->scd_lock);
                return 0;
        }

        scd->scd_buf_nposted--;
        if (sv->sv_shuttingdown)
                return rc; /* don't allow to change scd_buf_posted */

        list_del(&buf->buf_list);
        spin_unlock(&scd->scd_lock);

        LIBCFS_FREE(buf, sizeof(*buf));

        spin_lock(&scd->scd_lock);
        return rc;
}

int
srpc_add_buffer(struct swi_workitem *wi)
{
        struct srpc_service_cd *scd = wi->swi_workitem.wi_data;
        struct srpc_buffer *buf;
        int rc = 0;

        /* it's called by workitem scheduler threads, which have had
         * their CPT affinity set, so buffers will be posted on the
         * CPT-local list of the portal */
        spin_lock(&scd->scd_lock);

        while (scd->scd_buf_adjust > 0 &&
               !scd->scd_svc->sv_shuttingdown) {
                scd->scd_buf_adjust--; /* consume it */
                scd->scd_buf_posting++;

                spin_unlock(&scd->scd_lock);

                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (buf == NULL) {
                        CERROR("Failed to add new buf to service: %s\n",
                               scd->scd_svc->sv_name);
                        spin_lock(&scd->scd_lock);
                        rc = -ENOMEM;
                        break;
                }

                spin_lock(&scd->scd_lock);
                if (scd->scd_svc->sv_shuttingdown) {
                        spin_unlock(&scd->scd_lock);
                        LIBCFS_FREE(buf, sizeof(*buf));

                        spin_lock(&scd->scd_lock);
                        rc = -ESHUTDOWN;
                        break;
                }

                rc = srpc_service_post_buffer(scd, buf);
                if (rc != 0)
                        break; /* buf has been freed inside */

                LASSERT(scd->scd_buf_posting > 0);
                scd->scd_buf_posting--;
                scd->scd_buf_total++;
                scd->scd_buf_low = max(2, scd->scd_buf_total / 4);
        }

        if (rc != 0) {
                scd->scd_buf_err_stamp = get_seconds();
                scd->scd_buf_err = rc;

                LASSERT(scd->scd_buf_posting > 0);
                scd->scd_buf_posting--;
        }

        spin_unlock(&scd->scd_lock);
        return 0;
}

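/*
 * Grow every partition of @sv by @nbuffer request buffers (framework
 * services grow only partition 0) and block until posting completes;
 * returns 0 on success or the first per-partition posting error.
 */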
int
srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
{
        struct srpc_service_cd *scd;
        int rc = 0;
        int i;

        LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                scd->scd_buf_err = 0;
                scd->scd_buf_err_stamp = 0;
                scd->scd_buf_posting = 0;
                scd->scd_buf_adjust = nbuffer;
                /* start to post buffers */
                swi_schedule_workitem(&scd->scd_buf_wi);
                spin_unlock(&scd->scd_lock);

                /* a framework service only posts buffers for one partition */
                if (srpc_serv_is_framework(sv))
                        break;
        }

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);
                /*
                 * NB: srpc_service_add_buffers() can be called inside
                 * thread context of lst_sched_serial, and sleeping is
                 * normally not allowed inside thread context of a WI
                 * scheduler because it blocks the current scheduler
                 * thread from doing anything else; even worse, it could
                 * deadlock if it's waiting on the result of another WI of
                 * the same scheduler. However, it's safe here because
                 * scd_buf_wi is scheduled by a thread in a different WI
                 * scheduler (lst_sched_test), so there is no risk of
                 * deadlock, though this could block all WIs pending on
                 * lst_sched_serial for a moment, which is not good but
                 * not fatal.
                 */
                lst_wait_until(scd->scd_buf_err != 0 ||
                               (scd->scd_buf_adjust == 0 &&
                                scd->scd_buf_posting == 0),
                               scd->scd_lock, "waiting for adding buffer\n");

                if (scd->scd_buf_err != 0 && rc == 0)
                        rc = scd->scd_buf_err;

                spin_unlock(&scd->scd_lock);
        }

        return rc;
}

void
srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer)
{
        struct srpc_service_cd *scd;
        int num;
        int i;

        LASSERT(!sv->sv_shuttingdown);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                num = scd->scd_buf_total + scd->scd_buf_posting;
                scd->scd_buf_adjust -= min(nbuffer, num);

                spin_unlock(&scd->scd_lock);
        }
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int i;

        LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);
                if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
                        spin_unlock(&scd->scd_lock);
                        return 0;
                }

                if (scd->scd_buf_nposted > 0) {
                        CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
                               scd->scd_buf_nposted);
                        spin_unlock(&scd->scd_lock);
                        return 0;
                }

                if (list_empty(&scd->scd_rpc_active)) {
                        spin_unlock(&scd->scd_lock);
                        continue;
                }

                rpc = list_entry(scd->scd_rpc_active.next,
                                     struct srpc_server_rpc, srpc_list);
                CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
                        rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                        swi_state2str(rpc->srpc_wi.swi_state),
                        rpc->srpc_wi.swi_workitem.wi_scheduled,
                        rpc->srpc_wi.swi_workitem.wi_running,
                        rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
                        rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
                spin_unlock(&scd->scd_lock);
                return 0;
        }

        /* no lock needed from now on */
        srpc_service_fini(sv);
        return 1;
}

/* called with scd->scd_lock held */
static void
srpc_service_recycle_buffer(struct srpc_service_cd *scd, srpc_buffer_t *buf)
        __must_hold(&scd->scd_lock)
{
        if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
                if (srpc_service_post_buffer(scd, buf) != 0) {
                        CWARN("Failed to post %s buffer\n",
                              scd->scd_svc->sv_name);
                }
                return;
        }

        /* service is shutting down, or we want to recycle some buffers */
        scd->scd_buf_total--;

        if (scd->scd_buf_adjust < 0) {
                scd->scd_buf_adjust++;
                if (scd->scd_buf_adjust < 0 &&
                    scd->scd_buf_total == 0 && scd->scd_buf_posting == 0) {
                        CDEBUG(D_INFO,
                               "Try to recycle %d buffers but nothing left\n",
                               scd->scd_buf_adjust);
                        scd->scd_buf_adjust = 0;
                }
        }

        spin_unlock(&scd->scd_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&scd->scd_lock);
}

void
srpc_abort_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int i;

        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                /* schedule in-flight RPCs to notice the abort, NB:
                 * racing with incoming RPCs; a complete fix should make
                 * test RPCs carry the session ID in their headers */
                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
                        rpc->srpc_aborted = 1;
                        swi_schedule_workitem(&rpc->srpc_wi);
                }

                spin_unlock(&scd->scd_lock);
        }
}

void
srpc_shutdown_service(srpc_service_t *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        srpc_buffer_t *buf;
        int i;

        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
                spin_lock(&scd->scd_lock);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
                spin_unlock(&scd->scd_lock);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                /* schedule in-flight RPCs to notice the shutdown */
                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
                        swi_schedule_workitem(&rpc->srpc_wi);

                spin_unlock(&scd->scd_lock);

                /* OK to traverse scd_buf_posted without lock, since no one
                 * touches scd_buf_posted now */
                list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
                        LNetMDUnlink(buf->buf_mdh);
        }
}

static int
srpc_send_request(srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_reqstev;
        int rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
                                     &rpc->crpc_reqstmdh, ev);
        if (rc != 0) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_prepare_reply(srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_replyev;
        __u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc != 0) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_prepare_bulk(srpc_client_rpc_t *rpc)
{
        srpc_bulk_t *bk = &rpc->crpc_bulk;
        srpc_event_t *ev = &rpc->crpc_bulkev;
        __u64   *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int rc;
        int opt;

        LASSERT(bk->bk_niov <= LNET_MAX_IOV);

        if (bk->bk_niov == 0)
                return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
        opt |= LNET_MD_KIOV;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc != 0) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_do_bulk(srpc_server_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->srpc_ev;
        srpc_bulk_t *bk = rpc->srpc_bulk;
        __u64 id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int rc;
        int opt;

        LASSERT(bk != NULL);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
        opt |= LNET_MD_KIOV;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* only called from srpc_handle_rpc */
static void
srpc_server_rpc_done(srpc_server_rpc_t *rpc, int status)
{
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv  = scd->scd_svc;
        srpc_buffer_t *buffer;

        LASSERT(status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG_LIMIT(status == 0 ? D_NET : D_NETERROR,
                "Server RPC %p done: service %s, peer %s, status %s:%d\n",
                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                swi_state2str(rpc->srpc_wi.swi_state), status);

        if (status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done != NULL)
                (*rpc->srpc_done)(rpc);
        LASSERT(rpc->srpc_bulk == NULL);

        spin_lock(&scd->scd_lock);

        if (rpc->srpc_reqstbuf != NULL) {
                /* NB might drop scd_lock in srpc_service_recycle_buffer,
                 * but sv can't go away because scd_rpc_active is not
                 * empty */
                srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */

        /*
         * No one can schedule me now since:
         * - I'm not on scd_rpc_active.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(rpc->srpc_ev.ev_fired);
        swi_exit_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
                buffer = list_entry(scd->scd_buf_blocked.next,
                                        srpc_buffer_t, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, scd, buffer);
                list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
                swi_schedule_workitem(&rpc->srpc_wi);
        } else {
                list_add(&rpc->srpc_list, &scd->scd_rpc_free);
        }

        spin_unlock(&scd->scd_lock);
        return;
}

/* handles an incoming RPC; drives it through the NEWBORN ->
 * BULK_STARTED -> REPLY_SUBMITTED -> DONE workitem states */
int
srpc_handle_rpc(swi_workitem_t *wi)
{
        struct srpc_server_rpc *rpc = wi->swi_workitem.wi_data;
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        srpc_event_t *ev = &rpc->srpc_ev;
        int rc = 0;

        LASSERT(wi == &rpc->srpc_wi);

        spin_lock(&scd->scd_lock);

        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
                spin_unlock(&scd->scd_lock);

                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&scd->scd_lock);

        switch (wi->swi_state) {
        default:
                LBUG();
        case SWI_STATE_NEWBORN: {
                srpc_msg_t *msg;
                srpc_generic_reply_t *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (msg->msg_magic == 0) {
                        /* moaned already in srpc_lnet_ev_handler */
                        srpc_server_rpc_done(rpc, -EBADMSG);
                        return 1;
                }

                srpc_unpack_msg_hdr(msg);
                if (msg->msg_version != SRPC_MSG_VERSION) {
                        CWARN("Version mismatch: %u, %u expected, from %s\n",
                              msg->msg_version, SRPC_MSG_VERSION,
                              libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                        /* drop through and send reply */
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler)(rpc);
                        LASSERT(reply->status == 0 || !rpc->srpc_bulk);
                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->swi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
                                return 0; /* wait for bulk */

                        LASSERT(ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
        /* fall through: no bulk, or bulk failed to start */
        case SWI_STATE_BULK_STARTED:
                LASSERT(rpc->srpc_bulk == NULL || ev->ev_fired);

                if (rpc->srpc_bulk != NULL) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready != NULL)
                                rc = (*sv->sv_bulk_ready)(rpc, rc);

                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                if (!ev->ev_fired) {
                        CERROR("RPC %p: bulk %p, service %d\n",
                               rpc, rpc->srpc_bulk, sv->sv_id);
                        CERROR("Event: status %d, type %d, lnet %d\n",
                               ev->ev_status, ev->ev_type, ev->ev_lnet);
                        LASSERT(ev->ev_fired);
                }

                wi->swi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}

static void
srpc_client_rpc_expired(void *data)
{
        srpc_client_rpc_t *rpc = data;

        CWARN("Client RPC expired: service %d, peer %s, timeout %d.\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
}

inline void
srpc_add_client_rpc_timer(srpc_client_rpc_t *rpc)
{
        stt_timer_t *timer = &rpc->crpc_timer;

        if (rpc->crpc_timeout == 0)
                return;

        INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data    = rpc;
        timer->stt_func    = srpc_client_rpc_expired;
        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                          get_seconds());
        stt_add_timer(timer);
        return;
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU.
 */
static void
srpc_del_client_rpc_timer(srpc_client_rpc_t *rpc)
{
        /* timer not planted or already exploded */
        if (rpc->crpc_timeout == 0)
                return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer))
                return;

        /* timer detonated, wait for it to explode */
        while (rpc->crpc_timeout != 0) {
                spin_unlock(&rpc->crpc_lock);

                schedule();

                spin_lock(&rpc->crpc_lock);
        }
}

static void
srpc_client_rpc_done(srpc_client_rpc_t *rpc, int status)
{
        swi_workitem_t *wi = &rpc->crpc_wi;

        LASSERT(status != 0 || wi->swi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (rpc->crpc_status == 0)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG_LIMIT((status == 0) ? D_NET : D_NETERROR,
                "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(wi->swi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(!srpc_event_pending(rpc));
        swi_exit_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done)(rpc);
        return;
}

/* sends an outgoing RPC; drives it through the NEWBORN ->
 * REQUEST_SUBMITTED -> REQUEST_SENT -> REPLY_RECEIVED -> DONE
 * workitem states */
int
srpc_send_rpc(swi_workitem_t *wi)
{
        int rc = 0;
        srpc_client_rpc_t *rpc;
        srpc_msg_t *reply;
        int do_bulk;

        LASSERT(wi != NULL);

        rpc = wi->swi_workitem.wi_data;

        LASSERT(rpc != NULL);
        LASSERT(wi == &rpc->crpc_wi);

        reply = &rpc->crpc_replymsg;
        do_bulk = rpc->crpc_bulk.bk_niov > 0;

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->swi_state) {
        default:
                LBUG();
        case SWI_STATE_NEWBORN:
                LASSERT(!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc != 0)
                        break;

                wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk. */
                if (!rpc->crpc_reqstev.ev_fired)
                        break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc != 0)
                        break;

                wi->swi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events, fall thru */
        case SWI_STATE_REQUEST_SENT: {
                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired)
                        break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc != 0)
                        break;

                srpc_unpack_msg_hdr(reply);
                if (reply->msg_type != type ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN("Bad message from %s: type %u (%d expected), magic %u (%u expected).\n",
                              libcfs_id2str(rpc->crpc_dest),
                              reply->msg_type, type,
                              reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status != 0) {
                        CWARN("Remote error %d at %s, unlink bulk buffer in case peer didn't initiate bulk transfer\n",
                              reply->msg_body.reply.status,
                              libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->swi_state = SWI_STATE_REPLY_RECEIVED;
        }
        /* fall thru: the bulk event may already be in */
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired)
                        break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /* Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error. */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                        rc = 0;

                wi->swi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc != 0) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

srpc_client_rpc_t *
srpc_create_client_rpc(lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
        srpc_client_rpc_t *rpc;

        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (rpc == NULL)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}

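/*
 * Client-side lifecycle, as a minimal sketch (my_done and the target
 * @peer are hypothetical; real callers live in the framework code):
 *
 *        rpc = srpc_create_client_rpc(peer, SRPC_SERVICE_PING, 0, 0,
 *                                     my_done, NULL, NULL);
 *        if (rpc != NULL) {
 *                spin_lock(&rpc->crpc_lock);
 *                srpc_post_rpc(rpc);
 *                spin_unlock(&rpc->crpc_lock);
 *        }
 *
 * srpc_send_rpc() then runs as the workitem, and my_done() is called
 * once the reply (and bulk, if any) completes or the RPC is aborted.
 */
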
/* called with rpc->crpc_lock held */
void
srpc_abort_rpc(srpc_client_rpc_t *rpc, int why)
{
        LASSERT(why != 0);

        if (rpc->crpc_aborted || /* already aborted */
            rpc->crpc_closed)    /* callback imminent */
                return;

        CDEBUG(D_NET,
                "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(rpc->crpc_wi.swi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status  = why;
        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc(srpc_client_rpc_t *rpc)
{
        LASSERT(!rpc->crpc_aborted);
        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

        CDEBUG(D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
                rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);
        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}


int
srpc_send_reply(struct srpc_server_rpc *rpc)
{
        srpc_event_t *ev = &rpc->srpc_ev;
        struct srpc_msg *msg = &rpc->srpc_replymsg;
        struct srpc_buffer *buffer = rpc->srpc_reqstbuf;
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        __u64 rpyid;
        int rc;

        LASSERT(buffer != NULL);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&scd->scd_lock);

        if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
                /* Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply */
                if (srpc_service_post_buffer(scd, buffer) != 0)
                        CWARN("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&scd->scd_lock);

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_SENT;

        msg->msg_magic   = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type    = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* in the kernel this is always called with LNET_LOCK() held, and in
 * thread context */
static void
srpc_lnet_ev_handler(lnet_event_t *ev)
{
        struct srpc_service_cd *scd;
        srpc_event_t *rpcev = ev->md.user_ptr;
        srpc_client_rpc_t *crpc;
        srpc_server_rpc_t *srpc;
        srpc_buffer_t *buffer;
        srpc_service_t *sv;
        srpc_msg_t *msg;
        srpc_msg_type_t type;

        LASSERT(!in_interrupt());

        if (ev->status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.errors++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                CERROR("Unknown event: status %d, type %d, lnet %d\n",
                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                LBUG();
        case SRPC_REQUEST_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fall through: client events share the handling below */
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                if (rpcev != &crpc->crpc_reqstev &&
                    rpcev != &crpc->crpc_replyev &&
                    rpcev != &crpc->crpc_bulkev) {
                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
                               rpcev, crpc, &crpc->crpc_reqstev,
                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
                        CERROR("Bad event: status %d, type %d, lnet %d\n",
                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                        LBUG();
                }

                spin_lock(&crpc->crpc_lock);

                LASSERT(rpcev->ev_fired == 0);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                scd = rpcev->ev_data;
                sv = scd->scd_svc;

                LASSERT(rpcev == &scd->scd_ev);

                spin_lock(&scd->scd_lock);

                LASSERT(ev->unlinked);
                LASSERT(ev->type == LNET_EVENT_PUT ||
                         ev->type == LNET_EVENT_UNLINK);
                LASSERT(ev->type != LNET_EVENT_UNLINK ||
                         sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                LASSERT(scd->scd_buf_nposted > 0);
                scd->scd_buf_nposted--;

                if (sv->sv_shuttingdown) {
                        /* Leave buffer on scd->scd_buf_posted since
                         * srpc_finish_service needs to traverse it. */
                        spin_unlock(&scd->scd_lock);
                        break;
                }

                if (scd->scd_buf_err_stamp != 0 &&
                    scd->scd_buf_err_stamp < get_seconds()) {
                        /* re-enable adding buffer */
                        scd->scd_buf_err_stamp = 0;
                        scd->scd_buf_err = 0;
                }

                if (scd->scd_buf_err == 0 && /* adding buffer is enabled */
                    scd->scd_buf_adjust == 0 &&
                    scd->scd_buf_nposted < scd->scd_buf_low) {
                        scd->scd_buf_adjust = max(scd->scd_buf_total / 2,
                                                  SFW_TEST_WI_MIN);
                        swi_schedule_workitem(&scd->scd_buf_wi);
                }

                list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR("Dropping RPC (%s) from %s: status %d mlength %d type %u magic %u.\n",
                               sv->sv_name, libcfs_id2str(ev->initiator),
                               ev->status, ev->mlength,
                               msg->msg_type, msg->msg_magic);

                        /* NB can't call srpc_service_recycle_buffer here since
                         * it may call LNetM[DE]Attach. The invalid magic tells
                         * srpc_handle_rpc to drop this RPC */
                        msg->msg_magic = 0;
                }

                if (!list_empty(&scd->scd_rpc_free)) {
                        srpc = list_entry(scd->scd_rpc_free.next,
                                              struct srpc_server_rpc,
                                              srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, scd, buffer);
                        list_add_tail(&srpc->srpc_list,
                                          &scd->scd_rpc_active);
                        swi_schedule_workitem(&srpc->srpc_wi);
                } else {
                        list_add_tail(&buffer->buf_list,
                                          &scd->scd_buf_blocked);
                }

                spin_unlock(&scd->scd_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT(ev->type == LNET_EVENT_SEND ||
                         ev->type == LNET_EVENT_REPLY ||
                         ev->type == LNET_EVENT_UNLINK);

                if (!ev->unlinked)
                        break; /* wait for final event */
                /* fall through */
        case SRPC_BULK_PUT_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fall through: server events share the handling below */
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                scd  = srpc->srpc_scd;

                LASSERT(rpcev == &srpc->srpc_ev);

                spin_lock(&scd->scd_lock);

                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                   -EINTR : ev->status;
                swi_schedule_workitem(&srpc->srpc_wi);

                spin_unlock(&scd->scd_lock);
                break;
        }
}


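/*
 * Module bring-up: initialize LNet, allocate the single event queue that
 * serves all selftest RPC traffic, mark both request portals lazy (so
 * requests wait for buffers instead of being dropped), and start the
 * timer thread. Torn down in reverse by srpc_shutdown().
 */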
int
srpc_startup(void)
{
        int rc;

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64) get_seconds()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
        if (rc < 0) {
                CERROR("LNetNIInit() has failed: %d\n", rc);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
        rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
        if (rc != 0) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT(rc == 0);
        rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL);
        LASSERT(rc == 0);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = stt_startup();

bail:
        if (rc != 0)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown(void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        srpc_service_t *sv = srpc_data.rpc_services[i];

                        LASSERTF(sv == NULL,
                                  "service not empty: id %d, name %s\n",
                                  i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();
                /* fall through */
        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT(rc == 0);
                rc = LNetClearLazyPortal(SRPC_REQUEST_PORTAL);
                LASSERT(rc == 0);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT(rc == 0); /* the EQ should have no user by now */
                /* fall through */
        case SRPC_STATE_NI_INIT:
                LNetNIFini();
        }

        return;
}