linux/drivers/staging/lustre/lnet/selftest/rpc.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * GPL HEADER START
   4 *
   5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6 *
   7 * This program is free software; you can redistribute it and/or modify
   8 * it under the terms of the GNU General Public License version 2 only,
   9 * as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License version 2 for more details (a copy is included
  15 * in the LICENSE file that accompanied this code).
  16 *
  17 * You should have received a copy of the GNU General Public License
  18 * version 2 along with this program; If not, see
  19 * http://www.gnu.org/licenses/gpl-2.0.html
  20 *
  21 * GPL HEADER END
  22 */
  23/*
  24 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  25 * Use is subject to license terms.
  26 *
  27 * Copyright (c) 2012, 2015, Intel Corporation.
  28 */
  29/*
  30 * This file is part of Lustre, http://www.lustre.org/
  31 * Lustre is a trademark of Sun Microsystems, Inc.
  32 *
  33 * lnet/selftest/rpc.c
  34 *
  35 * Author: Isaac Huang <isaac@clusterfs.com>
  36 *
  37 * 2012-05-13: Liang Zhen <liang@whamcloud.com>
  38 * - percpt data for service to improve smp performance
  39 * - code cleanup
  40 */
  41
  42#define DEBUG_SUBSYSTEM S_LNET
  43
  44#include "selftest.h"
  45
  46enum srpc_state {
  47        SRPC_STATE_NONE,
  48        SRPC_STATE_NI_INIT,
  49        SRPC_STATE_EQ_INIT,
  50        SRPC_STATE_RUNNING,
  51        SRPC_STATE_STOPPING,
  52};
  53
  54static struct smoketest_rpc {
  55        spinlock_t       rpc_glock;     /* global lock */
  56        struct srpc_service     *rpc_services[SRPC_SERVICE_MAX_ID + 1];
  57        struct lnet_handle_eq    rpc_lnet_eq;   /* _the_ LNet event queue */
  58        enum srpc_state  rpc_state;
  59        struct srpc_counters     rpc_counters;
  60        __u64            rpc_matchbits; /* matchbits counter */
  61} srpc_data;
  62
  63static inline int
  64srpc_serv_portal(int svc_id)
  65{
  66        return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ?
  67               SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL;
  68}
  69
  70/* forward ref's */
  71void srpc_handle_rpc(struct swi_workitem *wi);
  72
  73void srpc_get_counters(struct srpc_counters *cnt)
  74{
  75        spin_lock(&srpc_data.rpc_glock);
  76        *cnt = srpc_data.rpc_counters;
  77        spin_unlock(&srpc_data.rpc_glock);
  78}
  79
  80void srpc_set_counters(const struct srpc_counters *cnt)
  81{
  82        spin_lock(&srpc_data.rpc_glock);
  83        srpc_data.rpc_counters = *cnt;
  84        spin_unlock(&srpc_data.rpc_glock);
  85}
  86
  87static int
  88srpc_add_bulk_page(struct srpc_bulk *bk, struct page *pg, int i, int off,
  89                   int nob)
  90{
  91        LASSERT(off < PAGE_SIZE);
  92        LASSERT(nob > 0 && nob <= PAGE_SIZE);
  93
  94        bk->bk_iovs[i].bv_offset = off;
  95        bk->bk_iovs[i].bv_page = pg;
  96        bk->bk_iovs[i].bv_len = nob;
  97        return nob;
  98}
  99
 100void
 101srpc_free_bulk(struct srpc_bulk *bk)
 102{
 103        int i;
 104        struct page *pg;
 105
 106        LASSERT(bk);
 107
 108        for (i = 0; i < bk->bk_niov; i++) {
 109                pg = bk->bk_iovs[i].bv_page;
 110                if (!pg)
 111                        break;
 112
 113                __free_page(pg);
 114        }
 115
 116        kfree(bk);
 117}
 118
 119struct srpc_bulk *
 120srpc_alloc_bulk(int cpt, unsigned int bulk_off, unsigned int bulk_npg,
 121                unsigned int bulk_len, int sink)
 122{
 123        struct srpc_bulk *bk;
 124        int i;
 125
 126        LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);
 127
 128        bk = kzalloc_cpt(offsetof(struct srpc_bulk, bk_iovs[bulk_npg]),
 129                         GFP_KERNEL, cpt);
 130        if (!bk) {
 131                CERROR("Can't allocate descriptor for %d pages\n", bulk_npg);
 132                return NULL;
 133        }
 134
 135        memset(bk, 0, offsetof(struct srpc_bulk, bk_iovs[bulk_npg]));
 136        bk->bk_sink = sink;
 137        bk->bk_len = bulk_len;
 138        bk->bk_niov = bulk_npg;
 139
 140        for (i = 0; i < bulk_npg; i++) {
 141                struct page *pg;
 142                int nob;
 143
 144                pg = alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
 145                                      GFP_KERNEL, 0);
 146                if (!pg) {
 147                        CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
 148                        srpc_free_bulk(bk);
 149                        return NULL;
 150                }
 151
 152                nob = min_t(unsigned int, bulk_off + bulk_len, PAGE_SIZE) -
 153                      bulk_off;
 154                srpc_add_bulk_page(bk, pg, i, bulk_off, nob);
 155                bulk_len -= nob;
 156                bulk_off = 0;
 157        }
 158
 159        return bk;
 160}
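
/*
 * Example (illustrative sketch only; the CPT, offset and page count below
 * are made-up values, not taken from a real caller): a typical pairing of
 * srpc_alloc_bulk() and srpc_free_bulk() for a two-page sink buffer on
 * CPT 0 would look roughly like:
 *
 *	struct srpc_bulk *bk;
 *
 *	bk = srpc_alloc_bulk(0, 0, 2, 2 * PAGE_SIZE, 1);
 *	if (!bk)
 *		return -ENOMEM;
 *	...
 *	srpc_free_bulk(bk);
 */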
 161
 162static inline __u64
 163srpc_next_id(void)
 164{
 165        __u64 id;
 166
 167        spin_lock(&srpc_data.rpc_glock);
 168        id = srpc_data.rpc_matchbits++;
 169        spin_unlock(&srpc_data.rpc_glock);
 170        return id;
 171}
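
/*
 * Note (summary of existing behaviour, nothing new): srpc_startup() seeds
 * rpc_matchbits with the wall-clock seconds shifted left by 48 bits, so the
 * low 48 bits effectively count RPCs issued since startup, and the one
 * second pause taken there keeps a quick restart from reusing the previous
 * seed.
 */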
 172
 173static void
 174srpc_init_server_rpc(struct srpc_server_rpc *rpc,
 175                     struct srpc_service_cd *scd,
 176                     struct srpc_buffer *buffer)
 177{
 178        memset(rpc, 0, sizeof(*rpc));
 179        swi_init_workitem(&rpc->srpc_wi, srpc_handle_rpc,
 180                          srpc_serv_is_framework(scd->scd_svc) ?
 181                          lst_serial_wq : lst_test_wq[scd->scd_cpt]);
 182
 183        rpc->srpc_ev.ev_fired = 1; /* no event expected now */
 184
 185        rpc->srpc_scd = scd;
 186        rpc->srpc_reqstbuf = buffer;
 187        rpc->srpc_peer = buffer->buf_peer;
 188        rpc->srpc_self = buffer->buf_self;
 189        LNetInvalidateMDHandle(&rpc->srpc_replymdh);
 190}
 191
 192static void
 193srpc_service_fini(struct srpc_service *svc)
 194{
 195        struct srpc_service_cd *scd;
 196        struct srpc_server_rpc *rpc;
 197        struct srpc_buffer *buf;
 198        struct list_head *q;
 199        int i;
 200
 201        if (!svc->sv_cpt_data)
 202                return;
 203
 204        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
 205                while (1) {
 206                        if (!list_empty(&scd->scd_buf_posted))
 207                                q = &scd->scd_buf_posted;
 208                        else if (!list_empty(&scd->scd_buf_blocked))
 209                                q = &scd->scd_buf_blocked;
 210                        else
 211                                break;
 212
 213                        while (!list_empty(q)) {
 214                                buf = list_entry(q->next, struct srpc_buffer,
 215                                                 buf_list);
 216                                list_del(&buf->buf_list);
 217                                kfree(buf);
 218                        }
 219                }
 220
 221                LASSERT(list_empty(&scd->scd_rpc_active));
 222
 223                while (!list_empty(&scd->scd_rpc_free)) {
 224                        rpc = list_entry(scd->scd_rpc_free.next,
 225                                         struct srpc_server_rpc,
 226                                         srpc_list);
 227                        list_del(&rpc->srpc_list);
 228                        kfree(rpc);
 229                }
 230        }
 231
 232        cfs_percpt_free(svc->sv_cpt_data);
 233        svc->sv_cpt_data = NULL;
 234}
 235
 236static int
 237srpc_service_nrpcs(struct srpc_service *svc)
 238{
 239        int nrpcs = svc->sv_wi_total / svc->sv_ncpts;
 240
 241        return srpc_serv_is_framework(svc) ?
 242               max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
 243}
 244
 245void srpc_add_buffer(struct swi_workitem *wi);
 246
 247static int
 248srpc_service_init(struct srpc_service *svc)
 249{
 250        struct srpc_service_cd *scd;
 251        struct srpc_server_rpc *rpc;
 252        int nrpcs;
 253        int i;
 254        int j;
 255
 256        svc->sv_shuttingdown = 0;
 257
 258        svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(),
 259                                            sizeof(**svc->sv_cpt_data));
 260        if (!svc->sv_cpt_data)
 261                return -ENOMEM;
 262
 263        svc->sv_ncpts = srpc_serv_is_framework(svc) ?
 264                        1 : cfs_cpt_number(lnet_cpt_table());
 265        nrpcs = srpc_service_nrpcs(svc);
 266
 267        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
 268                scd->scd_cpt = i;
 269                scd->scd_svc = svc;
 270                spin_lock_init(&scd->scd_lock);
 271                INIT_LIST_HEAD(&scd->scd_rpc_free);
 272                INIT_LIST_HEAD(&scd->scd_rpc_active);
 273                INIT_LIST_HEAD(&scd->scd_buf_posted);
 274                INIT_LIST_HEAD(&scd->scd_buf_blocked);
 275
 276                scd->scd_ev.ev_data = scd;
 277                scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
 278
 279                /*
 280                 * NB: don't use lst_serial_wq for adding buffer,
 281                 * see details in srpc_service_add_buffers()
 282                 */
 283                swi_init_workitem(&scd->scd_buf_wi,
 284                                  srpc_add_buffer, lst_test_wq[i]);
 285
 286                if (i && srpc_serv_is_framework(svc)) {
 287                        /*
  288                         * NB: a framework service only needs srpc_service_cd
  289                         * for one partition, but we allocate it for all of
  290                         * them to keep the code simple; this wastes a little
  291                         * memory, but nobody should care.
 292                         */
 293                        continue;
 294                }
 295
 296                for (j = 0; j < nrpcs; j++) {
 297                        rpc = kzalloc_cpt(sizeof(*rpc), GFP_NOFS, i);
 298                        if (!rpc) {
 299                                srpc_service_fini(svc);
 300                                return -ENOMEM;
 301                        }
 302                        list_add(&rpc->srpc_list, &scd->scd_rpc_free);
 303                }
 304        }
 305
 306        return 0;
 307}
 308
 309int
 310srpc_add_service(struct srpc_service *sv)
 311{
 312        int id = sv->sv_id;
 313
 314        LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID);
 315
 316        if (srpc_service_init(sv))
 317                return -ENOMEM;
 318
 319        spin_lock(&srpc_data.rpc_glock);
 320
 321        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);
 322
 323        if (srpc_data.rpc_services[id]) {
 324                spin_unlock(&srpc_data.rpc_glock);
 325                goto failed;
 326        }
 327
 328        srpc_data.rpc_services[id] = sv;
 329        spin_unlock(&srpc_data.rpc_glock);
 330
 331        CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
 332        return 0;
 333
 334 failed:
 335        srpc_service_fini(sv);
 336        return -EBUSY;
 337}
 338
 339int
 340srpc_remove_service(struct srpc_service *sv)
 341{
 342        int id = sv->sv_id;
 343
 344        spin_lock(&srpc_data.rpc_glock);
 345
 346        if (srpc_data.rpc_services[id] != sv) {
 347                spin_unlock(&srpc_data.rpc_glock);
 348                return -ENOENT;
 349        }
 350
 351        srpc_data.rpc_services[id] = NULL;
 352        spin_unlock(&srpc_data.rpc_glock);
 353        return 0;
 354}
 355
 356static int
 357srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
 358                       int len, int options, struct lnet_process_id peer,
 359                       struct lnet_handle_md *mdh, struct srpc_event *ev)
 360{
 361        int rc;
 362        struct lnet_md md;
 363        struct lnet_handle_me meh;
 364
 365        rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK,
 366                          local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh);
 367        if (rc) {
 368                CERROR("LNetMEAttach failed: %d\n", rc);
 369                LASSERT(rc == -ENOMEM);
 370                return -ENOMEM;
 371        }
 372
 373        md.threshold = 1;
 374        md.user_ptr = ev;
 375        md.start = buf;
 376        md.length = len;
 377        md.options = options;
 378        md.eq_handle = srpc_data.rpc_lnet_eq;
 379
 380        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
 381        if (rc) {
 382                CERROR("LNetMDAttach failed: %d\n", rc);
 383                LASSERT(rc == -ENOMEM);
 384
 385                rc = LNetMEUnlink(meh);
 386                LASSERT(!rc);
 387                return -ENOMEM;
 388        }
 389
 390        CDEBUG(D_NET, "Posted passive RDMA: peer %s, portal %d, matchbits %#llx\n",
 391               libcfs_id2str(peer), portal, matchbits);
 392        return 0;
 393}
 394
 395static int
 396srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
 397                      int options, struct lnet_process_id peer,
 398                      lnet_nid_t self, struct lnet_handle_md *mdh,
 399                      struct srpc_event *ev)
 400{
 401        int rc;
 402        struct lnet_md md;
 403
 404        md.user_ptr = ev;
 405        md.start = buf;
 406        md.length = len;
 407        md.eq_handle = srpc_data.rpc_lnet_eq;
 408        md.threshold = options & LNET_MD_OP_GET ? 2 : 1;
 409        md.options = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);
 410
 411        rc = LNetMDBind(md, LNET_UNLINK, mdh);
 412        if (rc) {
 413                CERROR("LNetMDBind failed: %d\n", rc);
 414                LASSERT(rc == -ENOMEM);
 415                return -ENOMEM;
 416        }
 417
  418        /*
  419         * This is kind of an abuse of the LNET_MD_OP_{PUT,GET} options:
  420         * they're only meaningful for MDs attached to an ME (i.e. passive
  421         * buffers).
  422         */
 423        if (options & LNET_MD_OP_PUT) {
 424                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
 425                             portal, matchbits, 0, 0);
 426        } else {
 427                LASSERT(options & LNET_MD_OP_GET);
 428
 429                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
 430        }
 431
 432        if (rc) {
 433                CERROR("LNet%s(%s, %d, %lld) failed: %d\n",
 434                       options & LNET_MD_OP_PUT ? "Put" : "Get",
 435                       libcfs_id2str(peer), portal, matchbits, rc);
 436
 437                /*
 438                 * The forthcoming unlink event will complete this operation
 439                 * with failure, so fall through and return success here.
 440                 */
 441                rc = LNetMDUnlink(*mdh);
 442                LASSERT(!rc);
 443        } else {
 444                CDEBUG(D_NET, "Posted active RDMA: peer %s, portal %u, matchbits %#llx\n",
 445                       libcfs_id2str(peer), portal, matchbits);
 446        }
 447        return 0;
 448}
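
/*
 * Summary of the two posting helpers above (descriptive only): the passive
 * form attaches an ME/MD pair and waits for the peer to PUT or GET against
 * it, while the active form binds an MD and immediately issues the
 * LNetPut() or LNetGet() itself.  Request buffers, reply buffers and the
 * client side of bulk use the passive form; sending requests, sending
 * replies and the server side of bulk use the active form.
 */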
 449
 450static int
 451srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
 452                         struct lnet_handle_md *mdh, struct srpc_event *ev)
 453{
 454        struct lnet_process_id any = { 0 };
 455
 456        any.nid = LNET_NID_ANY;
 457        any.pid = LNET_PID_ANY;
 458
 459        return srpc_post_passive_rdma(srpc_serv_portal(service),
 460                                      local, service, buf, len,
 461                                      LNET_MD_OP_PUT, any, mdh, ev);
 462}
 463
 464static int
 465srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
 466__must_hold(&scd->scd_lock)
 467{
 468        struct srpc_service *sv = scd->scd_svc;
 469        struct srpc_msg *msg = &buf->buf_msg;
 470        int rc;
 471
 472        LNetInvalidateMDHandle(&buf->buf_mdh);
 473        list_add(&buf->buf_list, &scd->scd_buf_posted);
 474        scd->scd_buf_nposted++;
 475        spin_unlock(&scd->scd_lock);
 476
 477        rc = srpc_post_passive_rqtbuf(sv->sv_id,
 478                                      !srpc_serv_is_framework(sv),
 479                                      msg, sizeof(*msg), &buf->buf_mdh,
 480                                      &scd->scd_ev);
 481
 482        /*
  483         * At this point, an RPC (new or delayed) may have arrived in
  484         * msg and its event handler has been called. So we must add
  485         * buf to scd_buf_posted _before_ dropping scd_lock.
 486         */
 487        spin_lock(&scd->scd_lock);
 488
 489        if (!rc) {
 490                if (!sv->sv_shuttingdown)
 491                        return 0;
 492
 493                spin_unlock(&scd->scd_lock);
 494                /*
 495                 * srpc_shutdown_service might have tried to unlink me
 496                 * when my buf_mdh was still invalid
 497                 */
 498                LNetMDUnlink(buf->buf_mdh);
 499                spin_lock(&scd->scd_lock);
 500                return 0;
 501        }
 502
 503        scd->scd_buf_nposted--;
 504        if (sv->sv_shuttingdown)
  505                return rc; /* leave buf on scd_buf_posted while shutting down */
 506
 507        list_del(&buf->buf_list);
 508        spin_unlock(&scd->scd_lock);
 509
 510        kfree(buf);
 511
 512        spin_lock(&scd->scd_lock);
 513        return rc;
 514}
 515
 516void
 517srpc_add_buffer(struct swi_workitem *wi)
 518{
 519        struct srpc_service_cd *scd = container_of(wi, struct srpc_service_cd, scd_buf_wi);
 520        struct srpc_buffer *buf;
 521        int rc = 0;
 522
  523        /*
  524         * This is called by workitem scheduler threads; these threads
  525         * have had their CPT affinity set, so buffers will be posted
  526         * on the CPT-local list of the portal.
  527         */
 528        spin_lock(&scd->scd_lock);
 529
 530        while (scd->scd_buf_adjust > 0 &&
 531               !scd->scd_svc->sv_shuttingdown) {
 532                scd->scd_buf_adjust--; /* consume it */
 533                scd->scd_buf_posting++;
 534
 535                spin_unlock(&scd->scd_lock);
 536
 537                buf = kzalloc(sizeof(*buf), GFP_NOFS);
 538                if (!buf) {
 539                        CERROR("Failed to add new buf to service: %s\n",
 540                               scd->scd_svc->sv_name);
 541                        spin_lock(&scd->scd_lock);
 542                        rc = -ENOMEM;
 543                        break;
 544                }
 545
 546                spin_lock(&scd->scd_lock);
 547                if (scd->scd_svc->sv_shuttingdown) {
 548                        spin_unlock(&scd->scd_lock);
 549                        kfree(buf);
 550
 551                        spin_lock(&scd->scd_lock);
 552                        rc = -ESHUTDOWN;
 553                        break;
 554                }
 555
 556                rc = srpc_service_post_buffer(scd, buf);
 557                if (rc)
 558                        break; /* buf has been freed inside */
 559
 560                LASSERT(scd->scd_buf_posting > 0);
 561                scd->scd_buf_posting--;
 562                scd->scd_buf_total++;
 563                scd->scd_buf_low = max(2, scd->scd_buf_total / 4);
 564        }
 565
 566        if (rc) {
 567                scd->scd_buf_err_stamp = ktime_get_real_seconds();
 568                scd->scd_buf_err = rc;
 569
 570                LASSERT(scd->scd_buf_posting > 0);
 571                scd->scd_buf_posting--;
 572        }
 573
 574        spin_unlock(&scd->scd_lock);
 575}
 576
 577int
 578srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
 579{
 580        struct srpc_service_cd *scd;
 581        int rc = 0;
 582        int i;
 583
 584        LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);
 585
 586        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
 587                spin_lock(&scd->scd_lock);
 588
 589                scd->scd_buf_err = 0;
 590                scd->scd_buf_err_stamp = 0;
 591                scd->scd_buf_posting = 0;
 592                scd->scd_buf_adjust = nbuffer;
 593                /* start to post buffers */
 594                swi_schedule_workitem(&scd->scd_buf_wi);
 595                spin_unlock(&scd->scd_lock);
 596
  597                /* a framework service only posts buffers for one partition */
 598                if (srpc_serv_is_framework(sv))
 599                        break;
 600        }
 601
 602        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
 603                spin_lock(&scd->scd_lock);
  604                /*
  605                 * NB: srpc_service_add_buffers() can be called from the
  606                 * thread context of lst_serial_wq, and we normally don't
  607                 * allow sleeping in the thread context of a WI scheduler,
  608                 * because that blocks the scheduler thread from doing
  609                 * anything else; even worse, it could deadlock if it's
  610                 * waiting on the result of another WI of the same scheduler.
  611                 * However, it's safe here because scd_buf_wi is run by a
  612                 * thread of a different WI scheduler (lst_test_wq), so
  613                 * there is no risk of deadlock, though this could block
  614                 * all WIs pending on lst_serial_wq for a moment, which is
  615                 * not good but not fatal.
  616                 */
 617                lst_wait_until(scd->scd_buf_err ||
 618                               (!scd->scd_buf_adjust &&
 619                                !scd->scd_buf_posting),
 620                               scd->scd_lock, "waiting for adding buffer\n");
 621
 622                if (scd->scd_buf_err && !rc)
 623                        rc = scd->scd_buf_err;
 624
 625                spin_unlock(&scd->scd_lock);
 626        }
 627
 628        return rc;
 629}
 630
 631void
 632srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer)
 633{
 634        struct srpc_service_cd *scd;
 635        int num;
 636        int i;
 637
 638        LASSERT(!sv->sv_shuttingdown);
 639
 640        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
 641                spin_lock(&scd->scd_lock);
 642
 643                num = scd->scd_buf_total + scd->scd_buf_posting;
 644                scd->scd_buf_adjust -= min(nbuffer, num);
 645
 646                spin_unlock(&scd->scd_lock);
 647        }
 648}
 649
 650/* returns 1 if sv has finished, otherwise 0 */
 651int
 652srpc_finish_service(struct srpc_service *sv)
 653{
 654        struct srpc_service_cd *scd;
 655        struct srpc_server_rpc *rpc;
 656        int i;
 657
 658        LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */
 659
 660        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
 661                swi_cancel_workitem(&scd->scd_buf_wi);
 662
 663                spin_lock(&scd->scd_lock);
 664
 665                if (scd->scd_buf_nposted > 0) {
 666                        CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
 667                               scd->scd_buf_nposted);
 668                        spin_unlock(&scd->scd_lock);
 669                        return 0;
 670                }
 671
 672                if (list_empty(&scd->scd_rpc_active)) {
 673                        spin_unlock(&scd->scd_lock);
 674                        continue;
 675                }
 676
 677                rpc = list_entry(scd->scd_rpc_active.next,
 678                                 struct srpc_server_rpc, srpc_list);
 679                CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s, ev fired %d type %d status %d lnet %d\n",
 680                        rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
 681                        swi_state2str(rpc->srpc_wi.swi_state),
 682                        rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
 683                        rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
 684                spin_unlock(&scd->scd_lock);
 685                return 0;
 686        }
 687
 688        /* no lock needed from now on */
 689        srpc_service_fini(sv);
 690        return 1;
 691}
 692
  693/* called with scd->scd_lock held */
 694static void
 695srpc_service_recycle_buffer(struct srpc_service_cd *scd,
 696                            struct srpc_buffer *buf)
 697__must_hold(&scd->scd_lock)
 698{
 699        if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
 700                if (srpc_service_post_buffer(scd, buf)) {
 701                        CWARN("Failed to post %s buffer\n",
 702                              scd->scd_svc->sv_name);
 703                }
 704                return;
 705        }
 706
 707        /* service is shutting down, or we want to recycle some buffers */
 708        scd->scd_buf_total--;
 709
 710        if (scd->scd_buf_adjust < 0) {
 711                scd->scd_buf_adjust++;
 712                if (scd->scd_buf_adjust < 0 &&
 713                    !scd->scd_buf_total && !scd->scd_buf_posting) {
 714                        CDEBUG(D_INFO,
 715                               "Try to recycle %d buffers but nothing left\n",
 716                               scd->scd_buf_adjust);
 717                        scd->scd_buf_adjust = 0;
 718                }
 719        }
 720
 721        spin_unlock(&scd->scd_lock);
 722        kfree(buf);
 723        spin_lock(&scd->scd_lock);
 724}
 725
 726void
 727srpc_abort_service(struct srpc_service *sv)
 728{
 729        struct srpc_service_cd *scd;
 730        struct srpc_server_rpc *rpc;
 731        int i;
 732
 733        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
 734               sv->sv_id, sv->sv_name);
 735
 736        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
 737                spin_lock(&scd->scd_lock);
 738
  739                /*
  740                 * Schedule in-flight RPCs to notice the abort. NB: this
  741                 * races with incoming RPCs; a complete fix would have test
  742                 * RPCs carry the session ID in their headers.
  743                 */
 744                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
 745                        rpc->srpc_aborted = 1;
 746                        swi_schedule_workitem(&rpc->srpc_wi);
 747                }
 748
 749                spin_unlock(&scd->scd_lock);
 750        }
 751}
 752
 753void
 754srpc_shutdown_service(struct srpc_service *sv)
 755{
 756        struct srpc_service_cd *scd;
 757        struct srpc_server_rpc *rpc;
 758        struct srpc_buffer *buf;
 759        int i;
 760
 761        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
 762               sv->sv_id, sv->sv_name);
 763
 764        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
 765                spin_lock(&scd->scd_lock);
 766
 767        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
 768
 769        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
 770                spin_unlock(&scd->scd_lock);
 771
 772        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
 773                spin_lock(&scd->scd_lock);
 774
 775                /* schedule in-flight RPCs to notice the shutdown */
 776                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
 777                        swi_schedule_workitem(&rpc->srpc_wi);
 778
 779                spin_unlock(&scd->scd_lock);
 780
 781                /*
 782                 * OK to traverse scd_buf_posted without lock, since no one
 783                 * touches scd_buf_posted now
 784                 */
 785                list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
 786                        LNetMDUnlink(buf->buf_mdh);
 787        }
 788}
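
/*
 * Teardown sketch (illustrative only, not a verbatim caller): as the
 * assertion in srpc_finish_service() implies, a service owner is expected
 * to call srpc_shutdown_service() first and then poll srpc_finish_service()
 * until it returns 1, roughly:
 *
 *	srpc_shutdown_service(sv);
 *	while (!srpc_finish_service(sv))
 *		schedule_timeout_uninterruptible(HZ / 10);
 */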
 789
 790static int
 791srpc_send_request(struct srpc_client_rpc *rpc)
 792{
 793        struct srpc_event *ev = &rpc->crpc_reqstev;
 794        int rc;
 795
 796        ev->ev_fired = 0;
 797        ev->ev_data = rpc;
 798        ev->ev_type = SRPC_REQUEST_SENT;
 799
  800        rc = srpc_post_active_rdma(srpc_serv_portal(rpc->crpc_service),
  801                                   rpc->crpc_service, &rpc->crpc_reqstmsg,
  802                                   sizeof(struct srpc_msg), LNET_MD_OP_PUT,
  803                                   rpc->crpc_dest, LNET_NID_ANY,
  804                                   &rpc->crpc_reqstmdh, ev);
 805        if (rc) {
 806                LASSERT(rc == -ENOMEM);
 807                ev->ev_fired = 1;  /* no more event expected */
 808        }
 809        return rc;
 810}
 811
 812static int
 813srpc_prepare_reply(struct srpc_client_rpc *rpc)
 814{
 815        struct srpc_event *ev = &rpc->crpc_replyev;
 816        __u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
 817        int rc;
 818
 819        ev->ev_fired = 0;
 820        ev->ev_data = rpc;
 821        ev->ev_type = SRPC_REPLY_RCVD;
 822
 823        *id = srpc_next_id();
 824
 825        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
 826                                    &rpc->crpc_replymsg,
 827                                    sizeof(struct srpc_msg),
 828                                    LNET_MD_OP_PUT, rpc->crpc_dest,
 829                                    &rpc->crpc_replymdh, ev);
 830        if (rc) {
 831                LASSERT(rc == -ENOMEM);
 832                ev->ev_fired = 1;  /* no more event expected */
 833        }
 834        return rc;
 835}
 836
 837static int
 838srpc_prepare_bulk(struct srpc_client_rpc *rpc)
 839{
 840        struct srpc_bulk *bk = &rpc->crpc_bulk;
 841        struct srpc_event *ev = &rpc->crpc_bulkev;
 842        __u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
 843        int rc;
 844        int opt;
 845
 846        LASSERT(bk->bk_niov <= LNET_MAX_IOV);
 847
 848        if (!bk->bk_niov)
 849                return 0; /* nothing to do */
 850
 851        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
 852        opt |= LNET_MD_KIOV;
 853
 854        ev->ev_fired = 0;
 855        ev->ev_data = rpc;
 856        ev->ev_type = SRPC_BULK_REQ_RCVD;
 857
 858        *id = srpc_next_id();
 859
 860        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
 861                                    &bk->bk_iovs[0], bk->bk_niov, opt,
 862                                    rpc->crpc_dest, &bk->bk_mdh, ev);
 863        if (rc) {
 864                LASSERT(rc == -ENOMEM);
 865                ev->ev_fired = 1;  /* no more event expected */
 866        }
 867        return rc;
 868}
 869
 870static int
 871srpc_do_bulk(struct srpc_server_rpc *rpc)
 872{
 873        struct srpc_event *ev = &rpc->srpc_ev;
 874        struct srpc_bulk *bk = rpc->srpc_bulk;
 875        __u64 id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
 876        int rc;
 877        int opt;
 878
 879        LASSERT(bk);
 880
 881        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
 882        opt |= LNET_MD_KIOV;
 883
 884        ev->ev_fired = 0;
 885        ev->ev_data = rpc;
 886        ev->ev_type = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;
 887
 888        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
 889                                   &bk->bk_iovs[0], bk->bk_niov, opt,
 890                                   rpc->srpc_peer, rpc->srpc_self,
 891                                   &bk->bk_mdh, ev);
 892        if (rc)
 893                ev->ev_fired = 1;  /* no more event expected */
 894        return rc;
 895}
 896
 897/* only called from srpc_handle_rpc */
 898static void
 899srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
 900{
 901        struct srpc_service_cd *scd = rpc->srpc_scd;
 902        struct srpc_service *sv = scd->scd_svc;
 903        struct srpc_buffer *buffer;
 904
 905        LASSERT(status || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
 906
 907        rpc->srpc_status = status;
 908
 909        CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
 910                     "Server RPC %p done: service %s, peer %s, status %s:%d\n",
 911                     rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
 912                     swi_state2str(rpc->srpc_wi.swi_state), status);
 913
 914        if (status) {
 915                spin_lock(&srpc_data.rpc_glock);
 916                srpc_data.rpc_counters.rpcs_dropped++;
 917                spin_unlock(&srpc_data.rpc_glock);
 918        }
 919
 920        if (rpc->srpc_done)
  921                (*rpc->srpc_done)(rpc);
 922        LASSERT(!rpc->srpc_bulk);
 923
 924        spin_lock(&scd->scd_lock);
 925
 926        if (rpc->srpc_reqstbuf) {
 927                /*
  928                 * NB: srpc_service_recycle_buffer might drop scd_lock, but
  929                 * sv won't go away because scd_rpc_active must not be empty.
 930                 */
 931                srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf);
 932                rpc->srpc_reqstbuf = NULL;
 933        }
 934
 935        list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */
 936
  937        /*
  938         * No one can schedule me now since:
  939         * - I'm not on scd_rpc_active;
  940         * - all LNet events have been fired (asserted below).
  941         * So it is now safe to reuse or free this RPC.
  942         */
 943        LASSERT(rpc->srpc_ev.ev_fired);
 944
 945        if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
 946                buffer = list_entry(scd->scd_buf_blocked.next,
 947                                    struct srpc_buffer, buf_list);
 948                list_del(&buffer->buf_list);
 949
 950                srpc_init_server_rpc(rpc, scd, buffer);
 951                list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
 952                swi_schedule_workitem(&rpc->srpc_wi);
 953        } else {
 954                list_add(&rpc->srpc_list, &scd->scd_rpc_free);
 955        }
 956
 957        spin_unlock(&scd->scd_lock);
 958}
 959
 960/* handles an incoming RPC */
 961void
 962srpc_handle_rpc(struct swi_workitem *wi)
 963{
 964        struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc, srpc_wi);
 965        struct srpc_service_cd *scd = rpc->srpc_scd;
 966        struct srpc_service *sv = scd->scd_svc;
 967        struct srpc_event *ev = &rpc->srpc_ev;
 968        int rc = 0;
 969
 970        LASSERT(wi == &rpc->srpc_wi);
 971
 972        spin_lock(&scd->scd_lock);
 973
 974        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
 975                spin_unlock(&scd->scd_lock);
 976
 977                if (rpc->srpc_bulk)
 978                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
 979                LNetMDUnlink(rpc->srpc_replymdh);
 980
 981                if (ev->ev_fired) { /* no more event, OK to finish */
 982                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
 983                }
 984                return;
 985        }
 986
 987        spin_unlock(&scd->scd_lock);
 988
 989        switch (wi->swi_state) {
 990        default:
 991                LBUG();
 992        case SWI_STATE_NEWBORN: {
 993                struct srpc_msg *msg;
 994                struct srpc_generic_reply *reply;
 995
 996                msg = &rpc->srpc_reqstbuf->buf_msg;
 997                reply = &rpc->srpc_replymsg.msg_body.reply;
 998
 999                if (!msg->msg_magic) {
1000                        /* moaned already in srpc_lnet_ev_handler */
 1001                        srpc_server_rpc_done(rpc, -EBADMSG);
1002                        return;
1003                }
1004
1005                srpc_unpack_msg_hdr(msg);
1006                if (msg->msg_version != SRPC_MSG_VERSION) {
1007                        CWARN("Version mismatch: %u, %u expected, from %s\n",
1008                              msg->msg_version, SRPC_MSG_VERSION,
1009                              libcfs_id2str(rpc->srpc_peer));
1010                        reply->status = EPROTO;
 1011                        /* fall through and send the reply below */
1012                } else {
1013                        reply->status = 0;
1014                        rc = (*sv->sv_handler)(rpc);
1015                        LASSERT(!reply->status || !rpc->srpc_bulk);
1016                        if (rc) {
1017                                srpc_server_rpc_done(rpc, rc);
1018                                return;
1019                        }
1020                }
1021
1022                wi->swi_state = SWI_STATE_BULK_STARTED;
1023
1024                if (rpc->srpc_bulk) {
1025                        rc = srpc_do_bulk(rpc);
1026                        if (!rc)
1027                                return; /* wait for bulk */
1028
1029                        LASSERT(ev->ev_fired);
1030                        ev->ev_status = rc;
1031                }
1032        }
1033                /* fall through */
1034        case SWI_STATE_BULK_STARTED:
1035                LASSERT(!rpc->srpc_bulk || ev->ev_fired);
1036
1037                if (rpc->srpc_bulk) {
1038                        rc = ev->ev_status;
1039
1040                        if (sv->sv_bulk_ready)
 1041                                rc = (*sv->sv_bulk_ready)(rpc, rc);
1042
1043                        if (rc) {
1044                                srpc_server_rpc_done(rpc, rc);
1045                                return;
1046                        }
1047                }
1048
1049                wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
1050                rc = srpc_send_reply(rpc);
1051                if (!rc)
1052                        return; /* wait for reply */
1053                srpc_server_rpc_done(rpc, rc);
1054                return;
1055
1056        case SWI_STATE_REPLY_SUBMITTED:
1057                if (!ev->ev_fired) {
1058                        CERROR("RPC %p: bulk %p, service %d\n",
1059                               rpc, rpc->srpc_bulk, sv->sv_id);
1060                        CERROR("Event: status %d, type %d, lnet %d\n",
1061                               ev->ev_status, ev->ev_type, ev->ev_lnet);
1062                        LASSERT(ev->ev_fired);
1063                }
1064
1065                wi->swi_state = SWI_STATE_DONE;
1066                srpc_server_rpc_done(rpc, ev->ev_status);
1067                return;
1068        }
1069}
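
/*
 * For reference, a summary of the server-side state machine implemented by
 * the switch above (descriptive only):
 *
 *	SWI_STATE_NEWBORN          unpack the request and call sv_handler()
 *	SWI_STATE_BULK_STARTED     the bulk transfer (if any) has been posted
 *	SWI_STATE_REPLY_SUBMITTED  the reply PUT has been posted
 *	SWI_STATE_DONE             the reply event fired; the RPC is finished
 *
 * Shutdown or abort short-circuits the walk through srpc_server_rpc_done().
 */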
1070
1071static void
1072srpc_client_rpc_expired(void *data)
1073{
1074        struct srpc_client_rpc *rpc = data;
1075
1076        CWARN("Client RPC expired: service %d, peer %s, timeout %d.\n",
1077              rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
1078              rpc->crpc_timeout);
1079
1080        spin_lock(&rpc->crpc_lock);
1081
1082        rpc->crpc_timeout = 0;
1083        srpc_abort_rpc(rpc, -ETIMEDOUT);
1084
1085        spin_unlock(&rpc->crpc_lock);
1086
1087        spin_lock(&srpc_data.rpc_glock);
1088        srpc_data.rpc_counters.rpcs_expired++;
1089        spin_unlock(&srpc_data.rpc_glock);
1090}
1091
1092static void
1093srpc_add_client_rpc_timer(struct srpc_client_rpc *rpc)
1094{
1095        struct stt_timer *timer = &rpc->crpc_timer;
1096
1097        if (!rpc->crpc_timeout)
1098                return;
1099
1100        INIT_LIST_HEAD(&timer->stt_list);
1101        timer->stt_data = rpc;
1102        timer->stt_func = srpc_client_rpc_expired;
1103        timer->stt_expires = ktime_get_real_seconds() + rpc->crpc_timeout;
1104        stt_add_timer(timer);
1105}
1106
1107/*
1108 * Called with rpc->crpc_lock held.
1109 *
1110 * Upon exit the RPC expiry timer is not queued and the handler is not
1111 * running on any CPU.
1112 */
1113static void
1114srpc_del_client_rpc_timer(struct srpc_client_rpc *rpc)
1115{
1116        /* timer not planted or already exploded */
1117        if (!rpc->crpc_timeout)
1118                return;
1119
1120        /* timer successfully defused */
1121        if (stt_del_timer(&rpc->crpc_timer))
1122                return;
1123
1124        /* timer detonated, wait for it to explode */
1125        while (rpc->crpc_timeout) {
1126                spin_unlock(&rpc->crpc_lock);
1127
1128                schedule();
1129
1130                spin_lock(&rpc->crpc_lock);
1131        }
1132}
1133
1134static void
1135srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
1136{
1137        struct swi_workitem *wi = &rpc->crpc_wi;
1138
1139        LASSERT(status || wi->swi_state == SWI_STATE_DONE);
1140
1141        spin_lock(&rpc->crpc_lock);
1142
1143        rpc->crpc_closed = 1;
1144        if (!rpc->crpc_status)
1145                rpc->crpc_status = status;
1146
1147        srpc_del_client_rpc_timer(rpc);
1148
1149        CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
1150                     "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
1151                     rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
1152                     swi_state2str(wi->swi_state), rpc->crpc_aborted, status);
1153
1154        /*
1155         * No one can schedule me now since:
 1156         * - the RPC timer has been defused;
 1157         * - all LNet events have been fired (asserted below);
 1158         * - crpc_closed has been set, preventing srpc_abort_rpc from
 1159         *   scheduling me.
 1160         * So it is now safe to hand the RPC to its done callback.
1161         */
1162        LASSERT(!srpc_event_pending(rpc));
1163
1164        spin_unlock(&rpc->crpc_lock);
1165
1166        (*rpc->crpc_done)(rpc);
1167}
1168
1169/* sends an outgoing RPC */
1170void
1171srpc_send_rpc(struct swi_workitem *wi)
1172{
1173        int rc = 0;
1174        struct srpc_client_rpc *rpc;
1175        struct srpc_msg *reply;
1176        int do_bulk;
1177
1178        LASSERT(wi);
1179
1180        rpc = container_of(wi, struct srpc_client_rpc, crpc_wi);
1181
1182        LASSERT(rpc);
1183        LASSERT(wi == &rpc->crpc_wi);
1184
1185        reply = &rpc->crpc_replymsg;
1186        do_bulk = rpc->crpc_bulk.bk_niov > 0;
1187
1188        spin_lock(&rpc->crpc_lock);
1189
1190        if (rpc->crpc_aborted) {
1191                spin_unlock(&rpc->crpc_lock);
1192                goto abort;
1193        }
1194
1195        spin_unlock(&rpc->crpc_lock);
1196
1197        switch (wi->swi_state) {
1198        default:
1199                LBUG();
1200        case SWI_STATE_NEWBORN:
1201                LASSERT(!srpc_event_pending(rpc));
1202
1203                rc = srpc_prepare_reply(rpc);
1204                if (rc) {
1205                        srpc_client_rpc_done(rpc, rc);
1206                        return;
1207                }
1208
1209                rc = srpc_prepare_bulk(rpc);
1210                if (rc)
1211                        break;
1212
1213                wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
1214                rc = srpc_send_request(rpc);
1215                break;
1216
1217        case SWI_STATE_REQUEST_SUBMITTED:
1218                /*
1219                 * CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
1220                 * order; however, they're processed in a strict order:
1221                 * rqt, rpy, and bulk.
1222                 */
1223                if (!rpc->crpc_reqstev.ev_fired)
1224                        break;
1225
1226                rc = rpc->crpc_reqstev.ev_status;
1227                if (rc)
1228                        break;
1229
1230                wi->swi_state = SWI_STATE_REQUEST_SENT;
1231                /* perhaps more events */
1232                /* fall through */
1233        case SWI_STATE_REQUEST_SENT: {
1234                enum srpc_msg_type type = srpc_service2reply(rpc->crpc_service);
1235
1236                if (!rpc->crpc_replyev.ev_fired)
1237                        break;
1238
1239                rc = rpc->crpc_replyev.ev_status;
1240                if (rc)
1241                        break;
1242
1243                srpc_unpack_msg_hdr(reply);
1244                if (reply->msg_type != type ||
1245                    (reply->msg_magic != SRPC_MSG_MAGIC &&
1246                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
1247                        CWARN("Bad message from %s: type %u (%d expected), magic %u (%d expected).\n",
1248                              libcfs_id2str(rpc->crpc_dest),
1249                              reply->msg_type, type,
1250                              reply->msg_magic, SRPC_MSG_MAGIC);
1251                        rc = -EBADMSG;
1252                        break;
1253                }
1254
1255                if (do_bulk && reply->msg_body.reply.status) {
1256                        CWARN("Remote error %d at %s, unlink bulk buffer in case peer didn't initiate bulk transfer\n",
1257                              reply->msg_body.reply.status,
1258                              libcfs_id2str(rpc->crpc_dest));
1259                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
1260                }
1261
1262                wi->swi_state = SWI_STATE_REPLY_RECEIVED;
1263        }
1264                /* fall through */
1265        case SWI_STATE_REPLY_RECEIVED:
1266                if (do_bulk && !rpc->crpc_bulkev.ev_fired)
1267                        break;
1268
1269                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;
1270
1271                /*
1272                 * Bulk buffer was unlinked due to remote error. Clear error
1273                 * since reply buffer still contains valid data.
1274                 * NB rpc->crpc_done shouldn't look into bulk data in case of
1275                 * remote error.
1276                 */
1277                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
1278                    !rpc->crpc_status && reply->msg_body.reply.status)
1279                        rc = 0;
1280
1281                wi->swi_state = SWI_STATE_DONE;
1282                srpc_client_rpc_done(rpc, rc);
1283                return;
1284        }
1285
1286        if (rc) {
1287                spin_lock(&rpc->crpc_lock);
1288                srpc_abort_rpc(rpc, rc);
1289                spin_unlock(&rpc->crpc_lock);
1290        }
1291
1292abort:
1293        if (rpc->crpc_aborted) {
1294                LNetMDUnlink(rpc->crpc_reqstmdh);
1295                LNetMDUnlink(rpc->crpc_replymdh);
1296                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
1297
1298                if (!srpc_event_pending(rpc)) {
1299                        srpc_client_rpc_done(rpc, -EINTR);
1300                        return;
1301                }
1302        }
1303}
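
/*
 * For reference, a summary of the client-side state machine implemented by
 * the switch above (descriptive only):
 *
 *	SWI_STATE_NEWBORN            reply/bulk buffers posted, request sent
 *	SWI_STATE_REQUEST_SUBMITTED  waiting for the request send event
 *	SWI_STATE_REQUEST_SENT       waiting for the reply to arrive
 *	SWI_STATE_REPLY_RECEIVED     waiting for bulk (if any), then done
 *	SWI_STATE_DONE               srpc_client_rpc_done() has been called
 *
 * Errors and timeouts are funnelled through srpc_abort_rpc().
 */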
1304
1305struct srpc_client_rpc *
1306srpc_create_client_rpc(struct lnet_process_id peer, int service,
1307                       int nbulkiov, int bulklen,
1308                       void (*rpc_done)(struct srpc_client_rpc *),
1309                       void (*rpc_fini)(struct srpc_client_rpc *), void *priv)
1310{
1311        struct srpc_client_rpc *rpc;
1312
1313        rpc = kzalloc(offsetof(struct srpc_client_rpc,
1314                               crpc_bulk.bk_iovs[nbulkiov]), GFP_KERNEL);
1315        if (!rpc)
1316                return NULL;
1317
1318        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
1319                             bulklen, rpc_done, rpc_fini, priv);
1320        return rpc;
1321}
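
/*
 * Example (illustrative sketch only; my_done and my_fini are hypothetical
 * callbacks, not part of this file): a caller typically creates the RPC,
 * posts it while holding crpc_lock, and is notified of completion through
 * the done callback:
 *
 *	rpc = srpc_create_client_rpc(peer, service, 0, 0,
 *				     my_done, my_fini, NULL);
 *	if (!rpc)
 *		return -ENOMEM;
 *
 *	spin_lock(&rpc->crpc_lock);
 *	srpc_post_rpc(rpc);
 *	spin_unlock(&rpc->crpc_lock);
 */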
1322
1323/* called with rpc->crpc_lock held */
1324void
1325srpc_abort_rpc(struct srpc_client_rpc *rpc, int why)
1326{
1327        LASSERT(why);
1328
1329        if (rpc->crpc_aborted ||        /* already aborted */
1330            rpc->crpc_closed)           /* callback imminent */
1331                return;
1332
1333        CDEBUG(D_NET, "Aborting RPC: service %d, peer %s, state %s, why %d\n",
1334               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
1335               swi_state2str(rpc->crpc_wi.swi_state), why);
1336
1337        rpc->crpc_aborted = 1;
1338        rpc->crpc_status = why;
1339        swi_schedule_workitem(&rpc->crpc_wi);
1340}
1341
1342/* called with rpc->crpc_lock held */
1343void
1344srpc_post_rpc(struct srpc_client_rpc *rpc)
1345{
1346        LASSERT(!rpc->crpc_aborted);
1347        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);
1348
1349        CDEBUG(D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
1350               libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
1351               rpc->crpc_timeout);
1352
1353        srpc_add_client_rpc_timer(rpc);
1354        swi_schedule_workitem(&rpc->crpc_wi);
1355}
1356
1357int
1358srpc_send_reply(struct srpc_server_rpc *rpc)
1359{
1360        struct srpc_event *ev = &rpc->srpc_ev;
1361        struct srpc_msg *msg = &rpc->srpc_replymsg;
1362        struct srpc_buffer *buffer = rpc->srpc_reqstbuf;
1363        struct srpc_service_cd *scd = rpc->srpc_scd;
1364        struct srpc_service *sv = scd->scd_svc;
1365        __u64 rpyid;
1366        int rc;
1367
1368        LASSERT(buffer);
1369        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;
1370
1371        spin_lock(&scd->scd_lock);
1372
1373        if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
1374                /*
1375                 * Repost buffer before replying since test client
1376                 * might send me another RPC once it gets the reply
1377                 */
1378                if (srpc_service_post_buffer(scd, buffer))
1379                        CWARN("Failed to repost %s buffer\n", sv->sv_name);
1380                rpc->srpc_reqstbuf = NULL;
1381        }
1382
1383        spin_unlock(&scd->scd_lock);
1384
1385        ev->ev_fired = 0;
1386        ev->ev_data = rpc;
1387        ev->ev_type = SRPC_REPLY_SENT;
1388
1389        msg->msg_magic = SRPC_MSG_MAGIC;
1390        msg->msg_version = SRPC_MSG_VERSION;
1391        msg->msg_type = srpc_service2reply(sv->sv_id);
1392
1393        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
1394                                   sizeof(*msg), LNET_MD_OP_PUT,
1395                                   rpc->srpc_peer, rpc->srpc_self,
1396                                   &rpc->srpc_replymdh, ev);
1397        if (rc)
1398                ev->ev_fired = 1; /* no more event expected */
1399        return rc;
1400}
1401
 1402/* in the kernel this is always called with LNET_LOCK() held, and in thread context */
1403static void
1404srpc_lnet_ev_handler(struct lnet_event *ev)
1405{
1406        struct srpc_service_cd *scd;
1407        struct srpc_event *rpcev = ev->md.user_ptr;
1408        struct srpc_client_rpc *crpc;
1409        struct srpc_server_rpc *srpc;
1410        struct srpc_buffer *buffer;
1411        struct srpc_service *sv;
1412        struct srpc_msg *msg;
1413        enum srpc_msg_type type;
1414
1415        LASSERT(!in_interrupt());
1416
1417        if (ev->status) {
1418                __u32 errors;
1419
1420                spin_lock(&srpc_data.rpc_glock);
 1421                if (ev->status != -ECANCELED) /* cancellation is not an error */
1422                        srpc_data.rpc_counters.errors++;
1423                errors = srpc_data.rpc_counters.errors;
1424                spin_unlock(&srpc_data.rpc_glock);
1425
1426                CNETERR("LNet event status %d type %d, RPC errors %u\n",
1427                        ev->status, ev->type, errors);
1428        }
1429
1430        rpcev->ev_lnet = ev->type;
1431
1432        switch (rpcev->ev_type) {
1433        default:
1434                CERROR("Unknown event: status %d, type %d, lnet %d\n",
1435                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
1436                LBUG();
1437        case SRPC_REQUEST_SENT:
1438                if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
1439                        spin_lock(&srpc_data.rpc_glock);
1440                        srpc_data.rpc_counters.rpcs_sent++;
1441                        spin_unlock(&srpc_data.rpc_glock);
1442                }
1443                /* fall through */
1444        case SRPC_REPLY_RCVD:
1445        case SRPC_BULK_REQ_RCVD:
1446                crpc = rpcev->ev_data;
1447
1448                if (rpcev != &crpc->crpc_reqstev &&
1449                    rpcev != &crpc->crpc_replyev &&
1450                    rpcev != &crpc->crpc_bulkev) {
1451                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
1452                               rpcev, crpc, &crpc->crpc_reqstev,
1453                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
1454                        CERROR("Bad event: status %d, type %d, lnet %d\n",
1455                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
1456                        LBUG();
1457                }
1458
1459                spin_lock(&crpc->crpc_lock);
1460
1461                LASSERT(!rpcev->ev_fired);
1462                rpcev->ev_fired = 1;
1463                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
1464                                                -EINTR : ev->status;
1465                swi_schedule_workitem(&crpc->crpc_wi);
1466
1467                spin_unlock(&crpc->crpc_lock);
1468                break;
1469
1470        case SRPC_REQUEST_RCVD:
1471                scd = rpcev->ev_data;
1472                sv = scd->scd_svc;
1473
1474                LASSERT(rpcev == &scd->scd_ev);
1475
1476                spin_lock(&scd->scd_lock);
1477
1478                LASSERT(ev->unlinked);
1479                LASSERT(ev->type == LNET_EVENT_PUT ||
1480                        ev->type == LNET_EVENT_UNLINK);
1481                LASSERT(ev->type != LNET_EVENT_UNLINK ||
1482                        sv->sv_shuttingdown);
1483
1484                buffer = container_of(ev->md.start, struct srpc_buffer, buf_msg);
1485                buffer->buf_peer = ev->initiator;
1486                buffer->buf_self = ev->target.nid;
1487
1488                LASSERT(scd->scd_buf_nposted > 0);
1489                scd->scd_buf_nposted--;
1490
1491                if (sv->sv_shuttingdown) {
1492                        /*
 1493                         * Leave buffer on scd->scd_buf_posted since
1494                         * srpc_finish_service needs to traverse it.
1495                         */
1496                        spin_unlock(&scd->scd_lock);
1497                        break;
1498                }
1499
1500                if (scd->scd_buf_err_stamp &&
1501                    scd->scd_buf_err_stamp < ktime_get_real_seconds()) {
1502                        /* re-enable adding buffer */
1503                        scd->scd_buf_err_stamp = 0;
1504                        scd->scd_buf_err = 0;
1505                }
1506
1507                if (!scd->scd_buf_err &&        /* adding buffer is enabled */
1508                    !scd->scd_buf_adjust &&
1509                    scd->scd_buf_nposted < scd->scd_buf_low) {
1510                        scd->scd_buf_adjust = max(scd->scd_buf_total / 2,
1511                                                  SFW_TEST_WI_MIN);
1512                        swi_schedule_workitem(&scd->scd_buf_wi);
1513                }
1514
1515                list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
1516                msg = &buffer->buf_msg;
1517                type = srpc_service2request(sv->sv_id);
1518
1519                if (ev->status || ev->mlength != sizeof(*msg) ||
1520                    (msg->msg_type != type &&
1521                     msg->msg_type != __swab32(type)) ||
1522                    (msg->msg_magic != SRPC_MSG_MAGIC &&
1523                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
1524                        CERROR("Dropping RPC (%s) from %s: status %d mlength %d type %u magic %u.\n",
1525                               sv->sv_name, libcfs_id2str(ev->initiator),
1526                               ev->status, ev->mlength,
1527                               msg->msg_type, msg->msg_magic);
1528
1529                        /*
1530                         * NB can't call srpc_service_recycle_buffer here since
1531                         * it may call LNetM[DE]Attach. The invalid magic tells
1532                         * srpc_handle_rpc to drop this RPC
1533                         */
1534                        msg->msg_magic = 0;
1535                }
1536
1537                if (!list_empty(&scd->scd_rpc_free)) {
1538                        srpc = list_entry(scd->scd_rpc_free.next,
1539                                          struct srpc_server_rpc,
1540                                          srpc_list);
1541                        list_del(&srpc->srpc_list);
1542
1543                        srpc_init_server_rpc(srpc, scd, buffer);
1544                        list_add_tail(&srpc->srpc_list,
1545                                      &scd->scd_rpc_active);
1546                        swi_schedule_workitem(&srpc->srpc_wi);
1547                } else {
1548                        list_add_tail(&buffer->buf_list,
1549                                      &scd->scd_buf_blocked);
1550                }
1551
1552                spin_unlock(&scd->scd_lock);
1553
1554                spin_lock(&srpc_data.rpc_glock);
1555                srpc_data.rpc_counters.rpcs_rcvd++;
1556                spin_unlock(&srpc_data.rpc_glock);
1557                break;
1558
1559        case SRPC_BULK_GET_RPLD:
1560                LASSERT(ev->type == LNET_EVENT_SEND ||
1561                        ev->type == LNET_EVENT_REPLY ||
1562                        ev->type == LNET_EVENT_UNLINK);
1563
1564                if (!ev->unlinked)
1565                        break; /* wait for final event */
1566                /* fall through */
1567        case SRPC_BULK_PUT_SENT:
1568                if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
1569                        spin_lock(&srpc_data.rpc_glock);
1570
1571                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
1572                                srpc_data.rpc_counters.bulk_get += ev->mlength;
1573                        else
1574                                srpc_data.rpc_counters.bulk_put += ev->mlength;
1575
1576                        spin_unlock(&srpc_data.rpc_glock);
1577                }
1578                /* fall through */
1579        case SRPC_REPLY_SENT:
1580                srpc = rpcev->ev_data;
1581                scd = srpc->srpc_scd;
1582
1583                LASSERT(rpcev == &srpc->srpc_ev);
1584
1585                spin_lock(&scd->scd_lock);
1586
1587                rpcev->ev_fired = 1;
1588                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
1589                                   -EINTR : ev->status;
1590                swi_schedule_workitem(&srpc->srpc_wi);
1591
1592                spin_unlock(&scd->scd_lock);
1593                break;
1594        }
1595}
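
/*
 * For reference (summary of the handler above): every LNet event funnels
 * through this single EQ handler and is demultiplexed on the srpc event
 * type.  Client-side events mark the sub-event fired and reschedule
 * crpc_wi; an incoming request is matched to a free server RPC (or parked
 * on scd_buf_blocked); server-side reply/bulk completions reschedule
 * srpc_wi.
 */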
1596
1597int
1598srpc_startup(void)
1599{
1600        int rc;
1601
1602        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
1603        spin_lock_init(&srpc_data.rpc_glock);
1604
1605        /* 1 second pause to avoid timestamp reuse */
1606        set_current_state(TASK_UNINTERRUPTIBLE);
1607        schedule_timeout(HZ);
1608        srpc_data.rpc_matchbits = ((__u64)ktime_get_real_seconds()) << 48;
1609
1610        srpc_data.rpc_state = SRPC_STATE_NONE;
1611
1612        rc = LNetNIInit(LNET_PID_LUSTRE);
1613        if (rc < 0) {
1614                CERROR("LNetNIInit() has failed: %d\n", rc);
1615                return rc;
1616        }
1617
1618        srpc_data.rpc_state = SRPC_STATE_NI_INIT;
1619
1620        LNetInvalidateEQHandle(&srpc_data.rpc_lnet_eq);
1621        rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
1622        if (rc) {
1623                CERROR("LNetEQAlloc() has failed: %d\n", rc);
1624                goto bail;
1625        }
1626
1627        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
1628        LASSERT(!rc);
1629        rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL);
1630        LASSERT(!rc);
1631
1632        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;
1633
1634        rc = stt_startup();
1635
1636bail:
1637        if (rc)
1638                srpc_shutdown();
1639        else
1640                srpc_data.rpc_state = SRPC_STATE_RUNNING;
1641
1642        return rc;
1643}
1644
1645void
1646srpc_shutdown(void)
1647{
1648        int i;
1649        int rc;
1650        int state;
1651
1652        state = srpc_data.rpc_state;
1653        srpc_data.rpc_state = SRPC_STATE_STOPPING;
1654
1655        switch (state) {
1656        default:
1657                LBUG();
1658        case SRPC_STATE_RUNNING:
1659                spin_lock(&srpc_data.rpc_glock);
1660
1661                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
1662                        struct srpc_service *sv = srpc_data.rpc_services[i];
1663
1664                        LASSERTF(!sv, "service not empty: id %d, name %s\n",
1665                                 i, sv->sv_name);
1666                }
1667
1668                spin_unlock(&srpc_data.rpc_glock);
1669
1670                stt_shutdown();
1671                /* fall through */
1672        case SRPC_STATE_EQ_INIT:
 1673                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
 1674                rc |= LNetClearLazyPortal(SRPC_REQUEST_PORTAL);
 1675                LASSERT(!rc); /* neither portal should fail to clear */
1676                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
1677                LASSERT(!rc); /* the EQ should have no user by now */
1678                /* fall through */
1679        case SRPC_STATE_NI_INIT:
1680                LNetNIFini();
1681        }
1682}
1683