linux/drivers/staging/lustre/lnet/selftest/conrpc.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * GPL HEADER START
   4 *
   5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6 *
   7 * This program is free software; you can redistribute it and/or modify
   8 * it under the terms of the GNU General Public License version 2 only,
   9 * as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License version 2 for more details (a copy is included
  15 * in the LICENSE file that accompanied this code).
  16 *
  17 * You should have received a copy of the GNU General Public License
  18 * version 2 along with this program; If not, see
  19 * http://www.gnu.org/licenses/gpl-2.0.html
  20 *
  21 * GPL HEADER END
  22 */
  23/*
  24 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  25 * Use is subject to license terms.
  26 *
  27 * Copyright (c) 2011, 2012, Intel Corporation.
  28 */
  29/*
  30 * This file is part of Lustre, http://www.lustre.org/
  31 * Lustre is a trademark of Sun Microsystems, Inc.
  32 *
  33 * lnet/selftest/conctl.c
  34 *
  35 * Console framework rpcs
  36 *
  37 * Author: Liang Zhen <liang@whamcloud.com>
  38 */
  39
  40#include <linux/libcfs/libcfs.h>
  41#include <linux/lnet/lib-lnet.h>
  42#include "timer.h"
  43#include "conrpc.h"
  44#include "console.h"
  45
  46void lstcon_rpc_stat_reply(struct lstcon_rpc_trans *, struct srpc_msg *,
  47                           struct lstcon_node *, struct lstcon_trans_stat *);
  48
  49static void
  50lstcon_rpc_done(struct srpc_client_rpc *rpc)
  51{
  52        struct lstcon_rpc *crpc = (struct lstcon_rpc *)rpc->crpc_priv;
  53
  54        LASSERT(crpc && rpc == crpc->crp_rpc);
  55        LASSERT(crpc->crp_posted && !crpc->crp_finished);
  56
  57        spin_lock(&rpc->crpc_lock);
  58
  59        if (!crpc->crp_trans) {
  60                /*
  61                 * Orphan RPC is not in any transaction,
  62                 * I'm just a poor body and nobody loves me
  63                 */
  64                spin_unlock(&rpc->crpc_lock);
  65
  66                /* release it */
  67                lstcon_rpc_put(crpc);
  68                return;
  69        }
  70
  71        /* not an orphan RPC */
  72        crpc->crp_finished = 1;
  73
  74        if (!crpc->crp_stamp) {
  75                /* not aborted */
  76                LASSERT(!crpc->crp_status);
  77
  78                crpc->crp_stamp = cfs_time_current();
  79                crpc->crp_status = rpc->crpc_status;
  80        }
  81
  82        /* wakeup (transaction)thread if I'm the last RPC in the transaction */
  83        if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
  84                wake_up(&crpc->crp_trans->tas_waitq);
  85
  86        spin_unlock(&rpc->crpc_lock);
  87}
  88
  89static int
  90lstcon_rpc_init(struct lstcon_node *nd, int service, unsigned int feats,
  91                int bulk_npg, int bulk_len, int embedded,
  92                struct lstcon_rpc *crpc)
  93{
  94        crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service,
  95                                       feats, bulk_npg, bulk_len,
  96                                       lstcon_rpc_done, (void *)crpc);
  97        if (!crpc->crp_rpc)
  98                return -ENOMEM;
  99
 100        crpc->crp_trans = NULL;
 101        crpc->crp_node = nd;
 102        crpc->crp_posted = 0;
 103        crpc->crp_finished = 0;
 104        crpc->crp_unpacked = 0;
 105        crpc->crp_status = 0;
 106        crpc->crp_stamp = 0;
 107        crpc->crp_embedded = embedded;
 108        INIT_LIST_HEAD(&crpc->crp_link);
 109
 110        atomic_inc(&console_session.ses_rpc_counter);
 111
 112        return 0;
 113}
 114
 115static int
 116lstcon_rpc_prep(struct lstcon_node *nd, int service, unsigned int feats,
 117                int bulk_npg, int bulk_len, struct lstcon_rpc **crpcpp)
 118{
 119        struct lstcon_rpc *crpc = NULL;
 120        int rc;
 121
 122        spin_lock(&console_session.ses_rpc_lock);
 123
 124        crpc = list_first_entry_or_null(&console_session.ses_rpc_freelist,
 125                                        struct lstcon_rpc, crp_link);
 126        if (crpc)
 127                list_del_init(&crpc->crp_link);
 128
 129        spin_unlock(&console_session.ses_rpc_lock);
 130
 131        if (!crpc) {
 132                crpc = kzalloc(sizeof(*crpc), GFP_NOFS);
 133                if (!crpc)
 134                        return -ENOMEM;
 135        }
 136
 137        rc = lstcon_rpc_init(nd, service, feats, bulk_npg, bulk_len, 0, crpc);
 138        if (!rc) {
 139                *crpcpp = crpc;
 140                return 0;
 141        }
 142
 143        kfree(crpc);
 144
 145        return rc;
 146}
 147
 148void
 149lstcon_rpc_put(struct lstcon_rpc *crpc)
 150{
 151        struct srpc_bulk *bulk = &crpc->crp_rpc->crpc_bulk;
 152        int i;
 153
 154        LASSERT(list_empty(&crpc->crp_link));
 155
 156        for (i = 0; i < bulk->bk_niov; i++) {
 157                if (!bulk->bk_iovs[i].bv_page)
 158                        continue;
 159
 160                __free_page(bulk->bk_iovs[i].bv_page);
 161        }
 162
 163        srpc_client_rpc_decref(crpc->crp_rpc);
 164
 165        if (crpc->crp_embedded) {
 166                /* embedded RPC, don't recycle it */
 167                memset(crpc, 0, sizeof(*crpc));
 168                crpc->crp_embedded = 1;
 169
 170        } else {
 171                spin_lock(&console_session.ses_rpc_lock);
 172
 173                list_add(&crpc->crp_link,
 174                         &console_session.ses_rpc_freelist);
 175
 176                spin_unlock(&console_session.ses_rpc_lock);
 177        }
 178
 179        /* RPC is not alive now */
 180        atomic_dec(&console_session.ses_rpc_counter);
 181}
 182
 183static void
 184lstcon_rpc_post(struct lstcon_rpc *crpc)
 185{
 186        struct lstcon_rpc_trans *trans = crpc->crp_trans;
 187
 188        LASSERT(trans);
 189
 190        atomic_inc(&trans->tas_remaining);
 191        crpc->crp_posted = 1;
 192
 193        sfw_post_rpc(crpc->crp_rpc);
 194}
 195
 196static char *
 197lstcon_rpc_trans_name(int transop)
 198{
 199        if (transop == LST_TRANS_SESNEW)
 200                return "SESNEW";
 201
 202        if (transop == LST_TRANS_SESEND)
 203                return "SESEND";
 204
 205        if (transop == LST_TRANS_SESQRY)
 206                return "SESQRY";
 207
 208        if (transop == LST_TRANS_SESPING)
 209                return "SESPING";
 210
 211        if (transop == LST_TRANS_TSBCLIADD)
 212                return "TSBCLIADD";
 213
 214        if (transop == LST_TRANS_TSBSRVADD)
 215                return "TSBSRVADD";
 216
 217        if (transop == LST_TRANS_TSBRUN)
 218                return "TSBRUN";
 219
 220        if (transop == LST_TRANS_TSBSTOP)
 221                return "TSBSTOP";
 222
 223        if (transop == LST_TRANS_TSBCLIQRY)
 224                return "TSBCLIQRY";
 225
 226        if (transop == LST_TRANS_TSBSRVQRY)
 227                return "TSBSRVQRY";
 228
 229        if (transop == LST_TRANS_STATQRY)
 230                return "STATQRY";
 231
 232        return "Unknown";
 233}
 234
 235int
 236lstcon_rpc_trans_prep(struct list_head *translist, int transop,
 237                      struct lstcon_rpc_trans **transpp)
 238{
 239        struct lstcon_rpc_trans *trans;
 240
 241        if (translist) {
 242                list_for_each_entry(trans, translist, tas_link) {
 243                        /*
 244                         * Can't enqueue two private transaction on
 245                         * the same object
 246                         */
 247                        if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
 248                                return -EPERM;
 249                }
 250        }
 251
 252        /* create a trans group */
 253        trans = kzalloc(sizeof(*trans), GFP_NOFS);
 254        if (!trans)
 255                return -ENOMEM;
 256
 257        trans->tas_opc = transop;
 258
 259        if (!translist)
 260                INIT_LIST_HEAD(&trans->tas_olink);
 261        else
 262                list_add_tail(&trans->tas_olink, translist);
 263
 264        list_add_tail(&trans->tas_link, &console_session.ses_trans_list);
 265
 266        INIT_LIST_HEAD(&trans->tas_rpcs_list);
 267        atomic_set(&trans->tas_remaining, 0);
 268        init_waitqueue_head(&trans->tas_waitq);
 269
 270        spin_lock(&console_session.ses_rpc_lock);
 271        trans->tas_features = console_session.ses_features;
 272        spin_unlock(&console_session.ses_rpc_lock);
 273
 274        *transpp = trans;
 275        return 0;
 276}
 277
 278void
 279lstcon_rpc_trans_addreq(struct lstcon_rpc_trans *trans, struct lstcon_rpc *crpc)
 280{
 281        list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
 282        crpc->crp_trans = trans;
 283}
 284
 285void
 286lstcon_rpc_trans_abort(struct lstcon_rpc_trans *trans, int error)
 287{
 288        struct srpc_client_rpc *rpc;
 289        struct lstcon_rpc *crpc;
 290        struct lstcon_node *nd;
 291
 292        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 293                rpc = crpc->crp_rpc;
 294
 295                spin_lock(&rpc->crpc_lock);
 296
 297                if (!crpc->crp_posted || /* not posted */
 298                    crpc->crp_stamp) {   /* rpc done or aborted already */
 299                        if (!crpc->crp_stamp) {
 300                                crpc->crp_stamp = cfs_time_current();
 301                                crpc->crp_status = -EINTR;
 302                        }
 303                        spin_unlock(&rpc->crpc_lock);
 304                        continue;
 305                }
 306
 307                crpc->crp_stamp = cfs_time_current();
 308                crpc->crp_status = error;
 309
 310                spin_unlock(&rpc->crpc_lock);
 311
 312                sfw_abort_rpc(rpc);
 313
 314                if (error != -ETIMEDOUT)
 315                        continue;
 316
 317                nd = crpc->crp_node;
 318                if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
 319                        continue;
 320
 321                nd->nd_stamp = crpc->crp_stamp;
 322                nd->nd_state = LST_NODE_DOWN;
 323        }
 324}
 325
 326static int
 327lstcon_rpc_trans_check(struct lstcon_rpc_trans *trans)
 328{
 329        if (console_session.ses_shutdown &&
 330            !list_empty(&trans->tas_olink)) /* Not an end session RPC */
 331                return 1;
 332
 333        return !atomic_read(&trans->tas_remaining) ? 1 : 0;
 334}
 335
 336int
 337lstcon_rpc_trans_postwait(struct lstcon_rpc_trans *trans, int timeout)
 338{
 339        struct lstcon_rpc *crpc;
 340        int rc;
 341
 342        if (list_empty(&trans->tas_rpcs_list))
 343                return 0;
 344
 345        if (timeout < LST_TRANS_MIN_TIMEOUT)
 346                timeout = LST_TRANS_MIN_TIMEOUT;
 347
 348        CDEBUG(D_NET, "Transaction %s started\n",
 349               lstcon_rpc_trans_name(trans->tas_opc));
 350
 351        /* post all requests */
 352        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 353                LASSERT(!crpc->crp_posted);
 354
 355                lstcon_rpc_post(crpc);
 356        }
 357
 358        mutex_unlock(&console_session.ses_mutex);
 359
 360        rc = wait_event_interruptible_timeout(trans->tas_waitq,
 361                                              lstcon_rpc_trans_check(trans),
 362                                              cfs_time_seconds(timeout));
 363        rc = (rc > 0) ? 0 : ((rc < 0) ? -EINTR : -ETIMEDOUT);
 364
 365        mutex_lock(&console_session.ses_mutex);
 366
 367        if (console_session.ses_shutdown)
 368                rc = -ESHUTDOWN;
 369
 370        if (rc || atomic_read(&trans->tas_remaining)) {
 371                /* treat short timeout as canceled */
 372                if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
 373                        rc = -EINTR;
 374
 375                lstcon_rpc_trans_abort(trans, rc);
 376        }
 377
 378        CDEBUG(D_NET, "Transaction %s stopped: %d\n",
 379               lstcon_rpc_trans_name(trans->tas_opc), rc);
 380
 381        lstcon_rpc_trans_stat(trans, lstcon_trans_stat());
 382
 383        return rc;
 384}
 385
 386static int
 387lstcon_rpc_get_reply(struct lstcon_rpc *crpc, struct srpc_msg **msgpp)
 388{
 389        struct lstcon_node *nd = crpc->crp_node;
 390        struct srpc_client_rpc *rpc = crpc->crp_rpc;
 391        struct srpc_generic_reply *rep;
 392
 393        LASSERT(nd && rpc);
 394        LASSERT(crpc->crp_stamp);
 395
 396        if (crpc->crp_status) {
 397                *msgpp = NULL;
 398                return crpc->crp_status;
 399        }
 400
 401        *msgpp = &rpc->crpc_replymsg;
 402        if (!crpc->crp_unpacked) {
 403                sfw_unpack_message(*msgpp);
 404                crpc->crp_unpacked = 1;
 405        }
 406
 407        if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
 408                return 0;
 409
 410        nd->nd_stamp = crpc->crp_stamp;
 411        rep = &(*msgpp)->msg_body.reply;
 412
 413        if (rep->sid.ses_nid == LNET_NID_ANY)
 414                nd->nd_state = LST_NODE_UNKNOWN;
 415        else if (lstcon_session_match(rep->sid))
 416                nd->nd_state = LST_NODE_ACTIVE;
 417        else
 418                nd->nd_state = LST_NODE_BUSY;
 419
 420        return 0;
 421}
 422
 423void
 424lstcon_rpc_trans_stat(struct lstcon_rpc_trans *trans, struct lstcon_trans_stat *stat)
 425{
 426        struct lstcon_rpc *crpc;
 427        struct srpc_msg *rep;
 428        int error;
 429
 430        LASSERT(stat);
 431
 432        memset(stat, 0, sizeof(*stat));
 433
 434        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 435                lstcon_rpc_stat_total(stat, 1);
 436
 437                LASSERT(crpc->crp_stamp);
 438
 439                error = lstcon_rpc_get_reply(crpc, &rep);
 440                if (error) {
 441                        lstcon_rpc_stat_failure(stat, 1);
 442                        if (!stat->trs_rpc_errno)
 443                                stat->trs_rpc_errno = -error;
 444
 445                        continue;
 446                }
 447
 448                lstcon_rpc_stat_success(stat, 1);
 449
 450                lstcon_rpc_stat_reply(trans, rep, crpc->crp_node, stat);
 451        }
 452
 453        if (trans->tas_opc == LST_TRANS_SESNEW && !stat->trs_fwk_errno) {
 454                stat->trs_fwk_errno =
 455                      lstcon_session_feats_check(trans->tas_features);
 456        }
 457
 458        CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, RPC error(%d), Framework error(%d)\n",
 459               lstcon_rpc_trans_name(trans->tas_opc),
 460               lstcon_rpc_stat_success(stat, 0),
 461               lstcon_rpc_stat_failure(stat, 0),
 462               lstcon_rpc_stat_total(stat, 0),
 463               stat->trs_rpc_errno, stat->trs_fwk_errno);
 464}
 465
 466int
 467lstcon_rpc_trans_interpreter(struct lstcon_rpc_trans *trans,
 468                             struct list_head __user *head_up,
 469                             lstcon_rpc_readent_func_t readent)
 470{
 471        struct list_head tmp;
 472        struct list_head __user *next;
 473        struct lstcon_rpc_ent *ent;
 474        struct srpc_generic_reply *rep;
 475        struct lstcon_rpc *crpc;
 476        struct srpc_msg *msg;
 477        struct lstcon_node *nd;
 478        long dur;
 479        struct timeval tv;
 480        int error;
 481
 482        LASSERT(head_up);
 483
 484        next = head_up;
 485
 486        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 487                if (copy_from_user(&tmp, next,
 488                                   sizeof(struct list_head)))
 489                        return -EFAULT;
 490
 491                next = tmp.next;
 492                if (next == head_up)
 493                        return 0;
 494
 495                ent = list_entry(next, struct lstcon_rpc_ent, rpe_link);
 496
 497                LASSERT(crpc->crp_stamp);
 498
 499                error = lstcon_rpc_get_reply(crpc, &msg);
 500
 501                nd = crpc->crp_node;
 502
 503                dur = (long)cfs_time_sub(crpc->crp_stamp,
 504                      (unsigned long)console_session.ses_id.ses_stamp);
 505                jiffies_to_timeval(dur, &tv);
 506
 507                if (copy_to_user(&ent->rpe_peer, &nd->nd_id,
 508                                 sizeof(struct lnet_process_id)) ||
 509                    copy_to_user(&ent->rpe_stamp, &tv, sizeof(tv)) ||
 510                    copy_to_user(&ent->rpe_state, &nd->nd_state,
 511                                 sizeof(nd->nd_state)) ||
 512                    copy_to_user(&ent->rpe_rpc_errno, &error,
 513                                 sizeof(error)))
 514                        return -EFAULT;
 515
 516                if (error)
 517                        continue;
 518
 519                /* RPC is done */
 520                rep = (struct srpc_generic_reply *)&msg->msg_body.reply;
 521
 522                if (copy_to_user(&ent->rpe_sid, &rep->sid, sizeof(rep->sid)) ||
 523                    copy_to_user(&ent->rpe_fwk_errno, &rep->status,
 524                                 sizeof(rep->status)))
 525                        return -EFAULT;
 526
 527                if (!readent)
 528                        continue;
 529
 530                error = readent(trans->tas_opc, msg, ent);
 531                if (error)
 532                        return error;
 533        }
 534
 535        return 0;
 536}
 537
 538void
 539lstcon_rpc_trans_destroy(struct lstcon_rpc_trans *trans)
 540{
 541        struct srpc_client_rpc *rpc;
 542        struct lstcon_rpc *crpc;
 543        struct lstcon_rpc *tmp;
 544        int count = 0;
 545
 546        list_for_each_entry_safe(crpc, tmp, &trans->tas_rpcs_list, crp_link) {
 547                rpc = crpc->crp_rpc;
 548
 549                spin_lock(&rpc->crpc_lock);
 550
 551                /* free it if not posted or finished already */
 552                if (!crpc->crp_posted || crpc->crp_finished) {
 553                        spin_unlock(&rpc->crpc_lock);
 554
 555                        list_del_init(&crpc->crp_link);
 556                        lstcon_rpc_put(crpc);
 557
 558                        continue;
 559                }
 560
 561                /*
 562                 * rpcs can be still not callbacked (even LNetMDUnlink is
 563                 * called) because huge timeout for inaccessible network,
 564                 * don't make user wait for them, just abandon them, they
 565                 * will be recycled in callback
 566                 */
 567                LASSERT(crpc->crp_status);
 568
 569                crpc->crp_node = NULL;
 570                crpc->crp_trans = NULL;
 571                list_del_init(&crpc->crp_link);
 572                count++;
 573
 574                spin_unlock(&rpc->crpc_lock);
 575
 576                atomic_dec(&trans->tas_remaining);
 577        }
 578
 579        LASSERT(!atomic_read(&trans->tas_remaining));
 580
 581        list_del(&trans->tas_link);
 582        if (!list_empty(&trans->tas_olink))
 583                list_del(&trans->tas_olink);
 584
 585        CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n",
 586               lstcon_rpc_trans_name(trans->tas_opc), count);
 587
 588        kfree(trans);
 589}
 590
 591int
 592lstcon_sesrpc_prep(struct lstcon_node *nd, int transop,
 593                   unsigned int feats, struct lstcon_rpc **crpc)
 594{
 595        struct srpc_mksn_reqst *msrq;
 596        struct srpc_rmsn_reqst *rsrq;
 597        int rc;
 598
 599        switch (transop) {
 600        case LST_TRANS_SESNEW:
 601                rc = lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION,
 602                                     feats, 0, 0, crpc);
 603                if (rc)
 604                        return rc;
 605
 606                msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst;
 607                msrq->mksn_sid = console_session.ses_id;
 608                msrq->mksn_force = console_session.ses_force;
 609                strlcpy(msrq->mksn_name, console_session.ses_name,
 610                        sizeof(msrq->mksn_name));
 611                break;
 612
 613        case LST_TRANS_SESEND:
 614                rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION,
 615                                     feats, 0, 0, crpc);
 616                if (rc)
 617                        return rc;
 618
 619                rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst;
 620                rsrq->rmsn_sid = console_session.ses_id;
 621                break;
 622
 623        default:
 624                LBUG();
 625        }
 626
 627        return 0;
 628}
 629
 630int
 631lstcon_dbgrpc_prep(struct lstcon_node *nd, unsigned int feats,
 632                   struct lstcon_rpc **crpc)
 633{
 634        struct srpc_debug_reqst *drq;
 635        int rc;
 636
 637        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, feats, 0, 0, crpc);
 638        if (rc)
 639                return rc;
 640
 641        drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
 642
 643        drq->dbg_sid = console_session.ses_id;
 644        drq->dbg_flags = 0;
 645
 646        return rc;
 647}
 648
 649int
 650lstcon_batrpc_prep(struct lstcon_node *nd, int transop, unsigned int feats,
 651                   struct lstcon_tsb_hdr *tsb, struct lstcon_rpc **crpc)
 652{
 653        struct lstcon_batch *batch;
 654        struct srpc_batch_reqst *brq;
 655        int rc;
 656
 657        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, feats, 0, 0, crpc);
 658        if (rc)
 659                return rc;
 660
 661        brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst;
 662
 663        brq->bar_sid = console_session.ses_id;
 664        brq->bar_bid = tsb->tsb_id;
 665        brq->bar_testidx = tsb->tsb_index;
 666        brq->bar_opc = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN :
 667                       (transop == LST_TRANS_TSBSTOP ? SRPC_BATCH_OPC_STOP :
 668                       SRPC_BATCH_OPC_QUERY);
 669
 670        if (transop != LST_TRANS_TSBRUN &&
 671            transop != LST_TRANS_TSBSTOP)
 672                return 0;
 673
 674        LASSERT(!tsb->tsb_index);
 675
 676        batch = (struct lstcon_batch *)tsb;
 677        brq->bar_arg = batch->bat_arg;
 678
 679        return 0;
 680}
 681
 682int
 683lstcon_statrpc_prep(struct lstcon_node *nd, unsigned int feats,
 684                    struct lstcon_rpc **crpc)
 685{
 686        struct srpc_stat_reqst *srq;
 687        int rc;
 688
 689        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, feats, 0, 0, crpc);
 690        if (rc)
 691                return rc;
 692
 693        srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst;
 694
 695        srq->str_sid = console_session.ses_id;
 696        srq->str_type = 0; /* XXX remove it */
 697
 698        return 0;
 699}
 700
 701static struct lnet_process_id_packed *
 702lstcon_next_id(int idx, int nkiov, struct bio_vec *kiov)
 703{
 704        struct lnet_process_id_packed *pid;
 705        int i;
 706
 707        i = idx / SFW_ID_PER_PAGE;
 708
 709        LASSERT(i < nkiov);
 710
 711        pid = (struct lnet_process_id_packed *)page_address(kiov[i].bv_page);
 712
 713        return &pid[idx % SFW_ID_PER_PAGE];
 714}
 715
 716static int
 717lstcon_dstnodes_prep(struct lstcon_group *grp, int idx,
 718                     int dist, int span, int nkiov, struct bio_vec *kiov)
 719{
 720        struct lnet_process_id_packed *pid;
 721        struct lstcon_ndlink *ndl;
 722        struct lstcon_node *nd;
 723        int start;
 724        int end;
 725        int i = 0;
 726
 727        LASSERT(dist >= 1);
 728        LASSERT(span >= 1);
 729        LASSERT(grp->grp_nnode >= 1);
 730
 731        if (span > grp->grp_nnode)
 732                return -EINVAL;
 733
 734        start = ((idx / dist) * span) % grp->grp_nnode;
 735        end = ((idx / dist) * span + span - 1) % grp->grp_nnode;
 736
 737        list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
 738                nd = ndl->ndl_node;
 739                if (i < start) {
 740                        i++;
 741                        continue;
 742                }
 743
 744                if (i > (end >= start ? end : grp->grp_nnode))
 745                        break;
 746
 747                pid = lstcon_next_id((i - start), nkiov, kiov);
 748                pid->nid = nd->nd_id.nid;
 749                pid->pid = nd->nd_id.pid;
 750                i++;
 751        }
 752
 753        if (start <= end) /* done */
 754                return 0;
 755
 756        list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
 757                if (i > grp->grp_nnode + end)
 758                        break;
 759
 760                nd = ndl->ndl_node;
 761                pid = lstcon_next_id((i - start), nkiov, kiov);
 762                pid->nid = nd->nd_id.nid;
 763                pid->pid = nd->nd_id.pid;
 764                i++;
 765        }
 766
 767        return 0;
 768}
 769
 770static int
 771lstcon_pingrpc_prep(struct lst_test_ping_param *param, struct srpc_test_reqst *req)
 772{
 773        struct test_ping_req *prq = &req->tsr_u.ping;
 774
 775        prq->png_size = param->png_size;
 776        prq->png_flags = param->png_flags;
 777        /* TODO dest */
 778        return 0;
 779}
 780
 781static int
 782lstcon_bulkrpc_v0_prep(struct lst_test_bulk_param *param,
 783                       struct srpc_test_reqst *req)
 784{
 785        struct test_bulk_req *brq = &req->tsr_u.bulk_v0;
 786
 787        brq->blk_opc = param->blk_opc;
 788        brq->blk_npg = DIV_ROUND_UP(param->blk_size, PAGE_SIZE);
 789        brq->blk_flags = param->blk_flags;
 790
 791        return 0;
 792}
 793
 794static int
 795lstcon_bulkrpc_v1_prep(struct lst_test_bulk_param *param, bool is_client,
 796                       struct srpc_test_reqst *req)
 797{
 798        struct test_bulk_req_v1 *brq = &req->tsr_u.bulk_v1;
 799
 800        brq->blk_opc = param->blk_opc;
 801        brq->blk_flags = param->blk_flags;
 802        brq->blk_len = param->blk_size;
 803        brq->blk_offset = is_client ? param->blk_cli_off : param->blk_srv_off;
 804
 805        return 0;
 806}
 807
 808int
 809lstcon_testrpc_prep(struct lstcon_node *nd, int transop, unsigned int feats,
 810                    struct lstcon_test *test, struct lstcon_rpc **crpc)
 811{
 812        struct lstcon_group *sgrp = test->tes_src_grp;
 813        struct lstcon_group *dgrp = test->tes_dst_grp;
 814        struct srpc_test_reqst *trq;
 815        struct srpc_bulk *bulk;
 816        int i;
 817        int npg = 0;
 818        int nob = 0;
 819        int rc = 0;
 820
 821        if (transop == LST_TRANS_TSBCLIADD) {
 822                npg = sfw_id_pages(test->tes_span);
 823                nob = !(feats & LST_FEAT_BULK_LEN) ?
 824                      npg * PAGE_SIZE :
 825                      sizeof(struct lnet_process_id_packed) * test->tes_span;
 826        }
 827
 828        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, feats, npg, nob, crpc);
 829        if (rc)
 830                return rc;
 831
 832        trq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst;
 833
 834        if (transop == LST_TRANS_TSBSRVADD) {
 835                int ndist = DIV_ROUND_UP(sgrp->grp_nnode, test->tes_dist);
 836                int nspan = DIV_ROUND_UP(dgrp->grp_nnode, test->tes_span);
 837                int nmax = DIV_ROUND_UP(ndist, nspan);
 838
 839                trq->tsr_ndest = 0;
 840                trq->tsr_loop = nmax * test->tes_dist * test->tes_concur;
 841        } else {
 842                bulk = &(*crpc)->crp_rpc->crpc_bulk;
 843
 844                for (i = 0; i < npg; i++) {
 845                        int len;
 846
 847                        LASSERT(nob > 0);
 848
 849                        len = !(feats & LST_FEAT_BULK_LEN) ?
 850                              PAGE_SIZE :
 851                              min_t(int, nob, PAGE_SIZE);
 852                        nob -= len;
 853
 854                        bulk->bk_iovs[i].bv_offset = 0;
 855                        bulk->bk_iovs[i].bv_len = len;
 856                        bulk->bk_iovs[i].bv_page = alloc_page(GFP_KERNEL);
 857
 858                        if (!bulk->bk_iovs[i].bv_page) {
 859                                lstcon_rpc_put(*crpc);
 860                                return -ENOMEM;
 861                        }
 862                }
 863
 864                bulk->bk_sink = 0;
 865
 866                LASSERT(transop == LST_TRANS_TSBCLIADD);
 867
 868                rc = lstcon_dstnodes_prep(test->tes_dst_grp,
 869                                          test->tes_cliidx++,
 870                                          test->tes_dist,
 871                                          test->tes_span,
 872                                          npg, &bulk->bk_iovs[0]);
 873                if (rc) {
 874                        lstcon_rpc_put(*crpc);
 875                        return rc;
 876                }
 877
 878                trq->tsr_ndest = test->tes_span;
 879                trq->tsr_loop = test->tes_loop;
 880        }
 881
 882        trq->tsr_sid = console_session.ses_id;
 883        trq->tsr_bid = test->tes_hdr.tsb_id;
 884        trq->tsr_concur = test->tes_concur;
 885        trq->tsr_is_client = (transop == LST_TRANS_TSBCLIADD) ? 1 : 0;
 886        trq->tsr_stop_onerr = !!test->tes_stop_onerr;
 887
 888        switch (test->tes_type) {
 889        case LST_TEST_PING:
 890                trq->tsr_service = SRPC_SERVICE_PING;
 891                rc = lstcon_pingrpc_prep((struct lst_test_ping_param *)
 892                                         &test->tes_param[0], trq);
 893                break;
 894
 895        case LST_TEST_BULK:
 896                trq->tsr_service = SRPC_SERVICE_BRW;
 897                if (!(feats & LST_FEAT_BULK_LEN)) {
 898                        rc = lstcon_bulkrpc_v0_prep((struct lst_test_bulk_param *)
 899                                                    &test->tes_param[0], trq);
 900                } else {
 901                        rc = lstcon_bulkrpc_v1_prep((struct lst_test_bulk_param *)
 902                                                    &test->tes_param[0],
 903                                                    trq->tsr_is_client, trq);
 904                }
 905
 906                break;
 907        default:
 908                LBUG();
 909                break;
 910        }
 911
 912        return rc;
 913}
 914
 915static int
 916lstcon_sesnew_stat_reply(struct lstcon_rpc_trans *trans,
 917                         struct lstcon_node *nd, struct srpc_msg *reply)
 918{
 919        struct srpc_mksn_reply *mksn_rep = &reply->msg_body.mksn_reply;
 920        int status = mksn_rep->mksn_status;
 921
 922        if (!status &&
 923            (reply->msg_ses_feats & ~LST_FEATS_MASK)) {
 924                mksn_rep->mksn_status = EPROTO;
 925                status = EPROTO;
 926        }
 927
 928        if (status == EPROTO) {
 929                CNETERR("session protocol error from %s: %u\n",
 930                        libcfs_nid2str(nd->nd_id.nid),
 931                        reply->msg_ses_feats);
 932        }
 933
 934        if (status)
 935                return status;
 936
 937        if (!trans->tas_feats_updated) {
 938                spin_lock(&console_session.ses_rpc_lock);
 939                if (!trans->tas_feats_updated) {        /* recheck with lock */
 940                        trans->tas_feats_updated = 1;
 941                        trans->tas_features = reply->msg_ses_feats;
 942                }
 943                spin_unlock(&console_session.ses_rpc_lock);
 944        }
 945
 946        if (reply->msg_ses_feats != trans->tas_features) {
 947                CNETERR("Framework features %x from %s is different with features on this transaction: %x\n",
 948                        reply->msg_ses_feats, libcfs_nid2str(nd->nd_id.nid),
 949                        trans->tas_features);
 950                mksn_rep->mksn_status = EPROTO;
 951                status = EPROTO;
 952        }
 953
 954        if (!status) {
 955                /* session timeout on remote node */
 956                nd->nd_timeout = mksn_rep->mksn_timeout;
 957        }
 958
 959        return status;
 960}
 961
 962void
 963lstcon_rpc_stat_reply(struct lstcon_rpc_trans *trans, struct srpc_msg *msg,
 964                      struct lstcon_node *nd, struct lstcon_trans_stat *stat)
 965{
 966        struct srpc_rmsn_reply *rmsn_rep;
 967        struct srpc_debug_reply *dbg_rep;
 968        struct srpc_batch_reply *bat_rep;
 969        struct srpc_test_reply *test_rep;
 970        struct srpc_stat_reply *stat_rep;
 971        int rc = 0;
 972
 973        switch (trans->tas_opc) {
 974        case LST_TRANS_SESNEW:
 975                rc = lstcon_sesnew_stat_reply(trans, nd, msg);
 976                if (!rc) {
 977                        lstcon_sesop_stat_success(stat, 1);
 978                        return;
 979                }
 980
 981                lstcon_sesop_stat_failure(stat, 1);
 982                break;
 983
 984        case LST_TRANS_SESEND:
 985                rmsn_rep = &msg->msg_body.rmsn_reply;
 986                /* ESRCH is not an error for end session */
 987                if (!rmsn_rep->rmsn_status ||
 988                    rmsn_rep->rmsn_status == ESRCH) {
 989                        lstcon_sesop_stat_success(stat, 1);
 990                        return;
 991                }
 992
 993                lstcon_sesop_stat_failure(stat, 1);
 994                rc = rmsn_rep->rmsn_status;
 995                break;
 996
 997        case LST_TRANS_SESQRY:
 998        case LST_TRANS_SESPING:
 999                dbg_rep = &msg->msg_body.dbg_reply;
1000
1001                if (dbg_rep->dbg_status == ESRCH) {
1002                        lstcon_sesqry_stat_unknown(stat, 1);
1003                        return;
1004                }
1005
1006                if (lstcon_session_match(dbg_rep->dbg_sid))
1007                        lstcon_sesqry_stat_active(stat, 1);
1008                else
1009                        lstcon_sesqry_stat_busy(stat, 1);
1010                return;
1011
1012        case LST_TRANS_TSBRUN:
1013        case LST_TRANS_TSBSTOP:
1014                bat_rep = &msg->msg_body.bat_reply;
1015
1016                if (!bat_rep->bar_status) {
1017                        lstcon_tsbop_stat_success(stat, 1);
1018                        return;
1019                }
1020
1021                if (bat_rep->bar_status == EPERM &&
1022                    trans->tas_opc == LST_TRANS_TSBSTOP) {
1023                        lstcon_tsbop_stat_success(stat, 1);
1024                        return;
1025                }
1026
1027                lstcon_tsbop_stat_failure(stat, 1);
1028                rc = bat_rep->bar_status;
1029                break;
1030
1031        case LST_TRANS_TSBCLIQRY:
1032        case LST_TRANS_TSBSRVQRY:
1033                bat_rep = &msg->msg_body.bat_reply;
1034
1035                if (bat_rep->bar_active)
1036                        lstcon_tsbqry_stat_run(stat, 1);
1037                else
1038                        lstcon_tsbqry_stat_idle(stat, 1);
1039
1040                if (!bat_rep->bar_status)
1041                        return;
1042
1043                lstcon_tsbqry_stat_failure(stat, 1);
1044                rc = bat_rep->bar_status;
1045                break;
1046
1047        case LST_TRANS_TSBCLIADD:
1048        case LST_TRANS_TSBSRVADD:
1049                test_rep = &msg->msg_body.tes_reply;
1050
1051                if (!test_rep->tsr_status) {
1052                        lstcon_tsbop_stat_success(stat, 1);
1053                        return;
1054                }
1055
1056                lstcon_tsbop_stat_failure(stat, 1);
1057                rc = test_rep->tsr_status;
1058                break;
1059
1060        case LST_TRANS_STATQRY:
1061                stat_rep = &msg->msg_body.stat_reply;
1062
1063                if (!stat_rep->str_status) {
1064                        lstcon_statqry_stat_success(stat, 1);
1065                        return;
1066                }
1067
1068                lstcon_statqry_stat_failure(stat, 1);
1069                rc = stat_rep->str_status;
1070                break;
1071
1072        default:
1073                LBUG();
1074        }
1075
1076        if (!stat->trs_fwk_errno)
1077                stat->trs_fwk_errno = rc;
1078}
1079
1080int
1081lstcon_rpc_trans_ndlist(struct list_head *ndlist,
1082                        struct list_head *translist, int transop,
1083                        void *arg, lstcon_rpc_cond_func_t condition,
1084                        struct lstcon_rpc_trans **transpp)
1085{
1086        struct lstcon_rpc_trans *trans;
1087        struct lstcon_ndlink *ndl;
1088        struct lstcon_node *nd;
1089        struct lstcon_rpc *rpc;
1090        unsigned int feats;
1091        int rc;
1092
1093        /* Creating session RPG for list of nodes */
1094
1095        rc = lstcon_rpc_trans_prep(translist, transop, &trans);
1096        if (rc) {
1097                CERROR("Can't create transaction %d: %d\n", transop, rc);
1098                return rc;
1099        }
1100
1101        feats = trans->tas_features;
1102        list_for_each_entry(ndl, ndlist, ndl_link) {
1103                rc = !condition ? 1 :
1104                     condition(transop, ndl->ndl_node, arg);
1105
1106                if (!rc)
1107                        continue;
1108
1109                if (rc < 0) {
1110                        CDEBUG(D_NET, "Condition error while creating RPC for transaction %d: %d\n",
1111                               transop, rc);
1112                        break;
1113                }
1114
1115                nd = ndl->ndl_node;
1116
1117                switch (transop) {
1118                case LST_TRANS_SESNEW:
1119                case LST_TRANS_SESEND:
1120                        rc = lstcon_sesrpc_prep(nd, transop, feats, &rpc);
1121                        break;
1122                case LST_TRANS_SESQRY:
1123                case LST_TRANS_SESPING:
1124                        rc = lstcon_dbgrpc_prep(nd, feats, &rpc);
1125                        break;
1126                case LST_TRANS_TSBCLIADD:
1127                case LST_TRANS_TSBSRVADD:
1128                        rc = lstcon_testrpc_prep(nd, transop, feats,
1129                                                 (struct lstcon_test *)arg,
1130                                                 &rpc);
1131                        break;
1132                case LST_TRANS_TSBRUN:
1133                case LST_TRANS_TSBSTOP:
1134                case LST_TRANS_TSBCLIQRY:
1135                case LST_TRANS_TSBSRVQRY:
1136                        rc = lstcon_batrpc_prep(nd, transop, feats,
1137                                                (struct lstcon_tsb_hdr *)arg,
1138                                                &rpc);
1139                        break;
1140                case LST_TRANS_STATQRY:
1141                        rc = lstcon_statrpc_prep(nd, feats, &rpc);
1142                        break;
1143                default:
1144                        rc = -EINVAL;
1145                        break;
1146                }
1147
1148                if (rc) {
1149                        CERROR("Failed to create RPC for transaction %s: %d\n",
1150                               lstcon_rpc_trans_name(transop), rc);
1151                        break;
1152                }
1153
1154                lstcon_rpc_trans_addreq(trans, rpc);
1155        }
1156
1157        if (!rc) {
1158                *transpp = trans;
1159                return 0;
1160        }
1161
1162        lstcon_rpc_trans_destroy(trans);
1163
1164        return rc;
1165}
1166
1167static void
1168lstcon_rpc_pinger(void *arg)
1169{
1170        struct stt_timer *ptimer = (struct stt_timer *)arg;
1171        struct lstcon_rpc_trans *trans;
1172        struct lstcon_rpc *crpc;
1173        struct srpc_msg *rep;
1174        struct srpc_debug_reqst *drq;
1175        struct lstcon_ndlink *ndl;
1176        struct lstcon_node *nd;
1177        int intv;
1178        int count = 0;
1179        int rc;
1180
1181        /*
1182         * RPC pinger is a special case of transaction,
1183         * it's called by timer at 8 seconds interval.
1184         */
1185        mutex_lock(&console_session.ses_mutex);
1186
1187        if (console_session.ses_shutdown || console_session.ses_expired) {
1188                mutex_unlock(&console_session.ses_mutex);
1189                return;
1190        }
1191
1192        if (!console_session.ses_expired &&
1193            ktime_get_real_seconds() - console_session.ses_laststamp >
1194            (time64_t)console_session.ses_timeout)
1195                console_session.ses_expired = 1;
1196
1197        trans = console_session.ses_ping;
1198
1199        LASSERT(trans);
1200
1201        list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link) {
1202                nd = ndl->ndl_node;
1203
1204                if (console_session.ses_expired) {
1205                        /* idle console, end session on all nodes */
1206                        if (nd->nd_state != LST_NODE_ACTIVE)
1207                                continue;
1208
1209                        rc = lstcon_sesrpc_prep(nd, LST_TRANS_SESEND,
1210                                                trans->tas_features, &crpc);
1211                        if (rc) {
1212                                CERROR("Out of memory\n");
1213                                break;
1214                        }
1215
1216                        lstcon_rpc_trans_addreq(trans, crpc);
1217                        lstcon_rpc_post(crpc);
1218
1219                        continue;
1220                }
1221
1222                crpc = &nd->nd_ping;
1223
1224                if (crpc->crp_rpc) {
1225                        LASSERT(crpc->crp_trans == trans);
1226                        LASSERT(!list_empty(&crpc->crp_link));
1227
1228                        spin_lock(&crpc->crp_rpc->crpc_lock);
1229
1230                        LASSERT(crpc->crp_posted);
1231
1232                        if (!crpc->crp_finished) {
1233                                /* in flight */
1234                                spin_unlock(&crpc->crp_rpc->crpc_lock);
1235                                continue;
1236                        }
1237
1238                        spin_unlock(&crpc->crp_rpc->crpc_lock);
1239
1240                        lstcon_rpc_get_reply(crpc, &rep);
1241
1242                        list_del_init(&crpc->crp_link);
1243
1244                        lstcon_rpc_put(crpc);
1245                }
1246
1247                if (nd->nd_state != LST_NODE_ACTIVE)
1248                        continue;
1249
1250                intv = (jiffies - nd->nd_stamp) / msecs_to_jiffies(MSEC_PER_SEC);
1251                if (intv < nd->nd_timeout / 2)
1252                        continue;
1253
1254                rc = lstcon_rpc_init(nd, SRPC_SERVICE_DEBUG,
1255                                     trans->tas_features, 0, 0, 1, crpc);
1256                if (rc) {
1257                        CERROR("Out of memory\n");
1258                        break;
1259                }
1260
1261                drq = &crpc->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
1262
1263                drq->dbg_sid = console_session.ses_id;
1264                drq->dbg_flags = 0;
1265
1266                lstcon_rpc_trans_addreq(trans, crpc);
1267                lstcon_rpc_post(crpc);
1268
1269                count++;
1270        }
1271
1272        if (console_session.ses_expired) {
1273                mutex_unlock(&console_session.ses_mutex);
1274                return;
1275        }
1276
1277        CDEBUG(D_NET, "Ping %d nodes in session\n", count);
1278
1279        ptimer->stt_expires = ktime_get_real_seconds() + LST_PING_INTERVAL;
1280        stt_add_timer(ptimer);
1281
1282        mutex_unlock(&console_session.ses_mutex);
1283}
1284
1285int
1286lstcon_rpc_pinger_start(void)
1287{
1288        struct stt_timer *ptimer;
1289        int rc;
1290
1291        LASSERT(list_empty(&console_session.ses_rpc_freelist));
1292        LASSERT(!atomic_read(&console_session.ses_rpc_counter));
1293
1294        rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING,
1295                                   &console_session.ses_ping);
1296        if (rc) {
1297                CERROR("Failed to create console pinger\n");
1298                return rc;
1299        }
1300
1301        ptimer = &console_session.ses_ping_timer;
1302        ptimer->stt_expires = ktime_get_real_seconds() + LST_PING_INTERVAL;
1303
1304        stt_add_timer(ptimer);
1305
1306        return 0;
1307}
1308
1309void
1310lstcon_rpc_pinger_stop(void)
1311{
1312        LASSERT(console_session.ses_shutdown);
1313
1314        stt_del_timer(&console_session.ses_ping_timer);
1315
1316        lstcon_rpc_trans_abort(console_session.ses_ping, -ESHUTDOWN);
1317        lstcon_rpc_trans_stat(console_session.ses_ping, lstcon_trans_stat());
1318        lstcon_rpc_trans_destroy(console_session.ses_ping);
1319
1320        memset(lstcon_trans_stat(), 0, sizeof(struct lstcon_trans_stat));
1321
1322        console_session.ses_ping = NULL;
1323}
1324
1325void
1326lstcon_rpc_cleanup_wait(void)
1327{
1328        struct lstcon_rpc_trans *trans;
1329        struct lstcon_rpc *crpc;
1330        struct lstcon_rpc *temp;
1331        struct list_head *pacer;
1332        struct list_head zlist;
1333
1334        /* Called with hold of global mutex */
1335
1336        LASSERT(console_session.ses_shutdown);
1337
1338        while (!list_empty(&console_session.ses_trans_list)) {
1339                list_for_each(pacer, &console_session.ses_trans_list) {
1340                        trans = list_entry(pacer, struct lstcon_rpc_trans,
1341                                           tas_link);
1342
1343                        CDEBUG(D_NET, "Session closed, wakeup transaction %s\n",
1344                               lstcon_rpc_trans_name(trans->tas_opc));
1345
1346                        wake_up(&trans->tas_waitq);
1347                }
1348
1349                mutex_unlock(&console_session.ses_mutex);
1350
1351                CWARN("Session is shutting down, waiting for termination of transactions\n");
1352                set_current_state(TASK_UNINTERRUPTIBLE);
1353                schedule_timeout(cfs_time_seconds(1));
1354
1355                mutex_lock(&console_session.ses_mutex);
1356        }
1357
1358        spin_lock(&console_session.ses_rpc_lock);
1359
1360        lst_wait_until(!atomic_read(&console_session.ses_rpc_counter),
1361                       console_session.ses_rpc_lock,
1362                       "Network is not accessible or target is down, waiting for %d console RPCs to being recycled\n",
1363                       atomic_read(&console_session.ses_rpc_counter));
1364
1365        list_add(&zlist, &console_session.ses_rpc_freelist);
1366        list_del_init(&console_session.ses_rpc_freelist);
1367
1368        spin_unlock(&console_session.ses_rpc_lock);
1369
1370        list_for_each_entry_safe(crpc, temp, &zlist, crp_link) {
1371                list_del(&crpc->crp_link);
1372                kfree(crpc);
1373        }
1374}
1375
1376int
1377lstcon_rpc_module_init(void)
1378{
1379        INIT_LIST_HEAD(&console_session.ses_ping_timer.stt_list);
1380        console_session.ses_ping_timer.stt_func = lstcon_rpc_pinger;
1381        console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer;
1382
1383        console_session.ses_ping = NULL;
1384
1385        spin_lock_init(&console_session.ses_rpc_lock);
1386        atomic_set(&console_session.ses_rpc_counter, 0);
1387        INIT_LIST_HEAD(&console_session.ses_rpc_freelist);
1388
1389        return 0;
1390}
1391
1392void
1393lstcon_rpc_module_fini(void)
1394{
1395        LASSERT(list_empty(&console_session.ses_rpc_freelist));
1396        LASSERT(!atomic_read(&console_session.ses_rpc_counter));
1397}
1398