linux/drivers/staging/lustre/lnet/selftest/conrpc.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lnet/selftest/conctl.c
  37 *
  38 * Console framework rpcs
  39 *
  40 * Author: Liang Zhen <liang@whamcloud.com>
  41 */
  42
  43#include "../../include/linux/libcfs/libcfs.h"
  44#include "../../include/linux/lnet/lib-lnet.h"
  45#include "timer.h"
  46#include "conrpc.h"
  47#include "console.h"
  48
  49void lstcon_rpc_stat_reply(lstcon_rpc_trans_t *, srpc_msg_t *,
  50                           lstcon_node_t *, lstcon_trans_stat_t *);
  51
  52static void
  53lstcon_rpc_done(srpc_client_rpc_t *rpc)
  54{
  55        lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv;
  56
  57        LASSERT(crpc != NULL && rpc == crpc->crp_rpc);
  58        LASSERT(crpc->crp_posted && !crpc->crp_finished);
  59
  60        spin_lock(&rpc->crpc_lock);
  61
  62        if (crpc->crp_trans == NULL) {
  63                /* Orphan RPC is not in any transaction,
  64                 * I'm just a poor body and nobody loves me */
  65                spin_unlock(&rpc->crpc_lock);
  66
  67                /* release it */
  68                lstcon_rpc_put(crpc);
  69                return;
  70        }
  71
  72        /* not an orphan RPC */
  73        crpc->crp_finished = 1;
  74
  75        if (crpc->crp_stamp == 0) {
  76                /* not aborted */
  77                LASSERT(crpc->crp_status == 0);
  78
  79                crpc->crp_stamp  = cfs_time_current();
  80                crpc->crp_status = rpc->crpc_status;
  81        }
  82
  83        /* wakeup (transaction)thread if I'm the last RPC in the transaction */
  84        if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
  85                wake_up(&crpc->crp_trans->tas_waitq);
  86
  87        spin_unlock(&rpc->crpc_lock);
  88}
  89
  90static int
  91lstcon_rpc_init(lstcon_node_t *nd, int service, unsigned feats,
  92                int bulk_npg, int bulk_len, int embedded, lstcon_rpc_t *crpc)
  93{
  94        crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service,
  95                                       feats, bulk_npg, bulk_len,
  96                                       lstcon_rpc_done, (void *)crpc);
  97        if (crpc->crp_rpc == NULL)
  98                return -ENOMEM;
  99
 100        crpc->crp_trans    = NULL;
 101        crpc->crp_node     = nd;
 102        crpc->crp_posted   = 0;
 103        crpc->crp_finished = 0;
 104        crpc->crp_unpacked = 0;
 105        crpc->crp_status   = 0;
 106        crpc->crp_stamp    = 0;
 107        crpc->crp_embedded = embedded;
 108        INIT_LIST_HEAD(&crpc->crp_link);
 109
 110        atomic_inc(&console_session.ses_rpc_counter);
 111
 112        return 0;
 113}
 114
 115static int
 116lstcon_rpc_prep(lstcon_node_t *nd, int service, unsigned feats,
 117                int bulk_npg, int bulk_len, lstcon_rpc_t **crpcpp)
 118{
 119        lstcon_rpc_t *crpc = NULL;
 120        int rc;
 121
 122        spin_lock(&console_session.ses_rpc_lock);
 123
 124        if (!list_empty(&console_session.ses_rpc_freelist)) {
 125                crpc = list_entry(console_session.ses_rpc_freelist.next,
 126                                      lstcon_rpc_t, crp_link);
 127                list_del_init(&crpc->crp_link);
 128        }
 129
 130        spin_unlock(&console_session.ses_rpc_lock);
 131
 132        if (crpc == NULL) {
 133                LIBCFS_ALLOC(crpc, sizeof(*crpc));
 134                if (crpc == NULL)
 135                        return -ENOMEM;
 136        }
 137
 138        rc = lstcon_rpc_init(nd, service, feats, bulk_npg, bulk_len, 0, crpc);
 139        if (rc == 0) {
 140                *crpcpp = crpc;
 141                return 0;
 142        }
 143
 144        LIBCFS_FREE(crpc, sizeof(*crpc));
 145
 146        return rc;
 147}
 148
 149void
 150lstcon_rpc_put(lstcon_rpc_t *crpc)
 151{
 152        srpc_bulk_t *bulk = &crpc->crp_rpc->crpc_bulk;
 153        int i;
 154
 155        LASSERT(list_empty(&crpc->crp_link));
 156
 157        for (i = 0; i < bulk->bk_niov; i++) {
 158                if (bulk->bk_iovs[i].kiov_page == NULL)
 159                        continue;
 160
 161                __free_page(bulk->bk_iovs[i].kiov_page);
 162        }
 163
 164        srpc_client_rpc_decref(crpc->crp_rpc);
 165
 166        if (crpc->crp_embedded) {
 167                /* embedded RPC, don't recycle it */
 168                memset(crpc, 0, sizeof(*crpc));
 169                crpc->crp_embedded = 1;
 170
 171        } else {
 172                spin_lock(&console_session.ses_rpc_lock);
 173
 174                list_add(&crpc->crp_link,
 175                             &console_session.ses_rpc_freelist);
 176
 177                spin_unlock(&console_session.ses_rpc_lock);
 178        }
 179
 180        /* RPC is not alive now */
 181        atomic_dec(&console_session.ses_rpc_counter);
 182}
 183
 184static void
 185lstcon_rpc_post(lstcon_rpc_t *crpc)
 186{
 187        lstcon_rpc_trans_t *trans = crpc->crp_trans;
 188
 189        LASSERT(trans != NULL);
 190
 191        atomic_inc(&trans->tas_remaining);
 192        crpc->crp_posted = 1;
 193
 194        sfw_post_rpc(crpc->crp_rpc);
 195}
 196
 197static char *
 198lstcon_rpc_trans_name(int transop)
 199{
 200        if (transop == LST_TRANS_SESNEW)
 201                return "SESNEW";
 202
 203        if (transop == LST_TRANS_SESEND)
 204                return "SESEND";
 205
 206        if (transop == LST_TRANS_SESQRY)
 207                return "SESQRY";
 208
 209        if (transop == LST_TRANS_SESPING)
 210                return "SESPING";
 211
 212        if (transop == LST_TRANS_TSBCLIADD)
 213                return "TSBCLIADD";
 214
 215        if (transop == LST_TRANS_TSBSRVADD)
 216                return "TSBSRVADD";
 217
 218        if (transop == LST_TRANS_TSBRUN)
 219                return "TSBRUN";
 220
 221        if (transop == LST_TRANS_TSBSTOP)
 222                return "TSBSTOP";
 223
 224        if (transop == LST_TRANS_TSBCLIQRY)
 225                return "TSBCLIQRY";
 226
 227        if (transop == LST_TRANS_TSBSRVQRY)
 228                return "TSBSRVQRY";
 229
 230        if (transop == LST_TRANS_STATQRY)
 231                return "STATQRY";
 232
 233        return "Unknown";
 234}
 235
 236int
 237lstcon_rpc_trans_prep(struct list_head *translist,
 238                      int transop, lstcon_rpc_trans_t **transpp)
 239{
 240        lstcon_rpc_trans_t *trans;
 241
 242        if (translist != NULL) {
 243                list_for_each_entry(trans, translist, tas_link) {
 244                        /* Can't enqueue two private transaction on
 245                         * the same object */
 246                        if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
 247                                return -EPERM;
 248                }
 249        }
 250
 251        /* create a trans group */
 252        LIBCFS_ALLOC(trans, sizeof(*trans));
 253        if (trans == NULL)
 254                return -ENOMEM;
 255
 256        trans->tas_opc = transop;
 257
 258        if (translist == NULL)
 259                INIT_LIST_HEAD(&trans->tas_olink);
 260        else
 261                list_add_tail(&trans->tas_olink, translist);
 262
 263        list_add_tail(&trans->tas_link, &console_session.ses_trans_list);
 264
 265        INIT_LIST_HEAD(&trans->tas_rpcs_list);
 266        atomic_set(&trans->tas_remaining, 0);
 267        init_waitqueue_head(&trans->tas_waitq);
 268
 269        spin_lock(&console_session.ses_rpc_lock);
 270        trans->tas_features = console_session.ses_features;
 271        spin_unlock(&console_session.ses_rpc_lock);
 272
 273        *transpp = trans;
 274        return 0;
 275}
 276
 277void
 278lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *crpc)
 279{
 280        list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
 281        crpc->crp_trans = trans;
 282}
 283
 284void
 285lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error)
 286{
 287        srpc_client_rpc_t *rpc;
 288        lstcon_rpc_t      *crpc;
 289        lstcon_node_t     *nd;
 290
 291        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 292                rpc = crpc->crp_rpc;
 293
 294                spin_lock(&rpc->crpc_lock);
 295
 296                if (!crpc->crp_posted || /* not posted */
 297                    crpc->crp_stamp != 0) { /* rpc done or aborted already */
 298                        if (crpc->crp_stamp == 0) {
 299                                crpc->crp_stamp = cfs_time_current();
 300                                crpc->crp_status = -EINTR;
 301                        }
 302                        spin_unlock(&rpc->crpc_lock);
 303                        continue;
 304                }
 305
 306                crpc->crp_stamp  = cfs_time_current();
 307                crpc->crp_status = error;
 308
 309                spin_unlock(&rpc->crpc_lock);
 310
 311                sfw_abort_rpc(rpc);
 312
 313                if (error != ETIMEDOUT)
 314                        continue;
 315
 316                nd = crpc->crp_node;
 317                if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
 318                        continue;
 319
 320                nd->nd_stamp = crpc->crp_stamp;
 321                nd->nd_state = LST_NODE_DOWN;
 322        }
 323}
 324
 325static int
 326lstcon_rpc_trans_check(lstcon_rpc_trans_t *trans)
 327{
 328        if (console_session.ses_shutdown &&
 329            !list_empty(&trans->tas_olink)) /* Not an end session RPC */
 330                return 1;
 331
 332        return (atomic_read(&trans->tas_remaining) == 0) ? 1 : 0;
 333}
 334
 335int
 336lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout)
 337{
 338        lstcon_rpc_t *crpc;
 339        int rc;
 340
 341        if (list_empty(&trans->tas_rpcs_list))
 342                return 0;
 343
 344        if (timeout < LST_TRANS_MIN_TIMEOUT)
 345                timeout = LST_TRANS_MIN_TIMEOUT;
 346
 347        CDEBUG(D_NET, "Transaction %s started\n",
 348               lstcon_rpc_trans_name(trans->tas_opc));
 349
 350        /* post all requests */
 351        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 352                LASSERT(!crpc->crp_posted);
 353
 354                lstcon_rpc_post(crpc);
 355        }
 356
 357        mutex_unlock(&console_session.ses_mutex);
 358
 359        rc = wait_event_interruptible_timeout(trans->tas_waitq,
 360                                              lstcon_rpc_trans_check(trans),
 361                                              cfs_time_seconds(timeout));
 362        rc = (rc > 0) ? 0 : ((rc < 0) ? -EINTR : -ETIMEDOUT);
 363
 364        mutex_lock(&console_session.ses_mutex);
 365
 366        if (console_session.ses_shutdown)
 367                rc = -ESHUTDOWN;
 368
 369        if (rc != 0 || atomic_read(&trans->tas_remaining) != 0) {
 370                /* treat short timeout as canceled */
 371                if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
 372                        rc = -EINTR;
 373
 374                lstcon_rpc_trans_abort(trans, rc);
 375        }
 376
 377        CDEBUG(D_NET, "Transaction %s stopped: %d\n",
 378               lstcon_rpc_trans_name(trans->tas_opc), rc);
 379
 380        lstcon_rpc_trans_stat(trans, lstcon_trans_stat());
 381
 382        return rc;
 383}
 384
 385static int
 386lstcon_rpc_get_reply(lstcon_rpc_t *crpc, srpc_msg_t **msgpp)
 387{
 388        lstcon_node_t *nd  = crpc->crp_node;
 389        srpc_client_rpc_t *rpc = crpc->crp_rpc;
 390        srpc_generic_reply_t *rep;
 391
 392        LASSERT(nd != NULL && rpc != NULL);
 393        LASSERT(crpc->crp_stamp != 0);
 394
 395        if (crpc->crp_status != 0) {
 396                *msgpp = NULL;
 397                return crpc->crp_status;
 398        }
 399
 400        *msgpp = &rpc->crpc_replymsg;
 401        if (!crpc->crp_unpacked) {
 402                sfw_unpack_message(*msgpp);
 403                crpc->crp_unpacked = 1;
 404        }
 405
 406        if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
 407                return 0;
 408
 409        nd->nd_stamp = crpc->crp_stamp;
 410        rep = &(*msgpp)->msg_body.reply;
 411
 412        if (rep->sid.ses_nid == LNET_NID_ANY)
 413                nd->nd_state = LST_NODE_UNKNOWN;
 414        else if (lstcon_session_match(rep->sid))
 415                nd->nd_state = LST_NODE_ACTIVE;
 416        else
 417                nd->nd_state = LST_NODE_BUSY;
 418
 419        return 0;
 420}
 421
 422void
 423lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat)
 424{
 425        lstcon_rpc_t  *crpc;
 426        srpc_msg_t *rep;
 427        int error;
 428
 429        LASSERT(stat != NULL);
 430
 431        memset(stat, 0, sizeof(*stat));
 432
 433        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 434                lstcon_rpc_stat_total(stat, 1);
 435
 436                LASSERT(crpc->crp_stamp != 0);
 437
 438                error = lstcon_rpc_get_reply(crpc, &rep);
 439                if (error != 0) {
 440                        lstcon_rpc_stat_failure(stat, 1);
 441                        if (stat->trs_rpc_errno == 0)
 442                                stat->trs_rpc_errno = -error;
 443
 444                        continue;
 445                }
 446
 447                lstcon_rpc_stat_success(stat, 1);
 448
 449                lstcon_rpc_stat_reply(trans, rep, crpc->crp_node, stat);
 450        }
 451
 452        if (trans->tas_opc == LST_TRANS_SESNEW && stat->trs_fwk_errno == 0) {
 453                stat->trs_fwk_errno =
 454                      lstcon_session_feats_check(trans->tas_features);
 455        }
 456
 457        CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, RPC error(%d), Framework error(%d)\n",
 458               lstcon_rpc_trans_name(trans->tas_opc),
 459               lstcon_rpc_stat_success(stat, 0),
 460               lstcon_rpc_stat_failure(stat, 0),
 461               lstcon_rpc_stat_total(stat, 0),
 462               stat->trs_rpc_errno, stat->trs_fwk_errno);
 463
 464        return;
 465}
 466
 467int
 468lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans,
 469                             struct list_head *head_up,
 470                             lstcon_rpc_readent_func_t readent)
 471{
 472        struct list_head tmp;
 473        struct list_head *next;
 474        lstcon_rpc_ent_t *ent;
 475        srpc_generic_reply_t *rep;
 476        lstcon_rpc_t *crpc;
 477        srpc_msg_t *msg;
 478        lstcon_node_t *nd;
 479        long dur;
 480        struct timeval tv;
 481        int error;
 482
 483        LASSERT(head_up != NULL);
 484
 485        next = head_up;
 486
 487        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
 488                if (copy_from_user(&tmp, next,
 489                                       sizeof(struct list_head)))
 490                        return -EFAULT;
 491
 492                if (tmp.next == head_up)
 493                        return 0;
 494
 495                next = tmp.next;
 496
 497                ent = list_entry(next, lstcon_rpc_ent_t, rpe_link);
 498
 499                LASSERT(crpc->crp_stamp != 0);
 500
 501                error = lstcon_rpc_get_reply(crpc, &msg);
 502
 503                nd = crpc->crp_node;
 504
 505                dur = (long)cfs_time_sub(crpc->crp_stamp,
 506                      (unsigned long)console_session.ses_id.ses_stamp);
 507                jiffies_to_timeval(dur, &tv);
 508
 509                if (copy_to_user(&ent->rpe_peer,
 510                                     &nd->nd_id, sizeof(lnet_process_id_t)) ||
 511                    copy_to_user(&ent->rpe_stamp, &tv, sizeof(tv)) ||
 512                    copy_to_user(&ent->rpe_state,
 513                                     &nd->nd_state, sizeof(nd->nd_state)) ||
 514                    copy_to_user(&ent->rpe_rpc_errno, &error,
 515                                     sizeof(error)))
 516                        return -EFAULT;
 517
 518                if (error != 0)
 519                        continue;
 520
 521                /* RPC is done */
 522                rep = (srpc_generic_reply_t *)&msg->msg_body.reply;
 523
 524                if (copy_to_user(&ent->rpe_sid,
 525                                     &rep->sid, sizeof(lst_sid_t)) ||
 526                    copy_to_user(&ent->rpe_fwk_errno,
 527                                     &rep->status, sizeof(rep->status)))
 528                        return -EFAULT;
 529
 530                if (readent == NULL)
 531                        continue;
 532
 533                error = readent(trans->tas_opc, msg, ent);
 534
 535                if (error != 0)
 536                        return error;
 537        }
 538
 539        return 0;
 540}
 541
 542void
 543lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans)
 544{
 545        srpc_client_rpc_t *rpc;
 546        lstcon_rpc_t *crpc;
 547        lstcon_rpc_t *tmp;
 548        int count = 0;
 549
 550        list_for_each_entry_safe(crpc, tmp, &trans->tas_rpcs_list,
 551                                 crp_link) {
 552                rpc = crpc->crp_rpc;
 553
 554                spin_lock(&rpc->crpc_lock);
 555
 556                /* free it if not posted or finished already */
 557                if (!crpc->crp_posted || crpc->crp_finished) {
 558                        spin_unlock(&rpc->crpc_lock);
 559
 560                        list_del_init(&crpc->crp_link);
 561                        lstcon_rpc_put(crpc);
 562
 563                        continue;
 564                }
 565
 566                /* rpcs can be still not callbacked (even LNetMDUnlink is called)
 567                 * because huge timeout for inaccessible network, don't make
 568                 * user wait for them, just abandon them, they will be recycled
 569                 * in callback */
 570
 571                LASSERT(crpc->crp_status != 0);
 572
 573                crpc->crp_node  = NULL;
 574                crpc->crp_trans = NULL;
 575                list_del_init(&crpc->crp_link);
 576                count++;
 577
 578                spin_unlock(&rpc->crpc_lock);
 579
 580                atomic_dec(&trans->tas_remaining);
 581        }
 582
 583        LASSERT(atomic_read(&trans->tas_remaining) == 0);
 584
 585        list_del(&trans->tas_link);
 586        if (!list_empty(&trans->tas_olink))
 587                list_del(&trans->tas_olink);
 588
 589        CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n",
 590               lstcon_rpc_trans_name(trans->tas_opc), count);
 591
 592        LIBCFS_FREE(trans, sizeof(*trans));
 593
 594        return;
 595}
 596
 597int
 598lstcon_sesrpc_prep(lstcon_node_t *nd, int transop,
 599                   unsigned feats, lstcon_rpc_t **crpc)
 600{
 601        srpc_mksn_reqst_t *msrq;
 602        srpc_rmsn_reqst_t *rsrq;
 603        int rc;
 604
 605        switch (transop) {
 606        case LST_TRANS_SESNEW:
 607                rc = lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION,
 608                                     feats, 0, 0, crpc);
 609                if (rc != 0)
 610                        return rc;
 611
 612                msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst;
 613                msrq->mksn_sid     = console_session.ses_id;
 614                msrq->mksn_force   = console_session.ses_force;
 615                strncpy(msrq->mksn_name, console_session.ses_name,
 616                        strlen(console_session.ses_name));
 617                break;
 618
 619        case LST_TRANS_SESEND:
 620                rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION,
 621                                     feats, 0, 0, crpc);
 622                if (rc != 0)
 623                        return rc;
 624
 625                rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst;
 626                rsrq->rmsn_sid = console_session.ses_id;
 627                break;
 628
 629        default:
 630                LBUG();
 631        }
 632
 633        return 0;
 634}
 635
 636int
 637lstcon_dbgrpc_prep(lstcon_node_t *nd, unsigned feats, lstcon_rpc_t **crpc)
 638{
 639        srpc_debug_reqst_t *drq;
 640        int rc;
 641
 642        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, feats, 0, 0, crpc);
 643        if (rc != 0)
 644                return rc;
 645
 646        drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
 647
 648        drq->dbg_sid   = console_session.ses_id;
 649        drq->dbg_flags = 0;
 650
 651        return rc;
 652}
 653
 654int
 655lstcon_batrpc_prep(lstcon_node_t *nd, int transop, unsigned feats,
 656                   lstcon_tsb_hdr_t *tsb, lstcon_rpc_t **crpc)
 657{
 658        lstcon_batch_t     *batch;
 659        srpc_batch_reqst_t *brq;
 660        int                 rc;
 661
 662        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, feats, 0, 0, crpc);
 663        if (rc != 0)
 664                return rc;
 665
 666        brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst;
 667
 668        brq->bar_sid     = console_session.ses_id;
 669        brq->bar_bid     = tsb->tsb_id;
 670        brq->bar_testidx = tsb->tsb_index;
 671        brq->bar_opc     = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN :
 672                           (transop == LST_TRANS_TSBSTOP ? SRPC_BATCH_OPC_STOP :
 673                            SRPC_BATCH_OPC_QUERY);
 674
 675        if (transop != LST_TRANS_TSBRUN &&
 676            transop != LST_TRANS_TSBSTOP)
 677                return 0;
 678
 679        LASSERT(tsb->tsb_index == 0);
 680
 681        batch = (lstcon_batch_t *)tsb;
 682        brq->bar_arg = batch->bat_arg;
 683
 684        return 0;
 685}
 686
 687int
 688lstcon_statrpc_prep(lstcon_node_t *nd, unsigned feats, lstcon_rpc_t **crpc)
 689{
 690        srpc_stat_reqst_t *srq;
 691        int                rc;
 692
 693        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, feats, 0, 0, crpc);
 694        if (rc != 0)
 695                return rc;
 696
 697        srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst;
 698
 699        srq->str_sid  = console_session.ses_id;
 700        srq->str_type = 0; /* XXX remove it */
 701
 702        return 0;
 703}
 704
 705static lnet_process_id_packed_t *
 706lstcon_next_id(int idx, int nkiov, lnet_kiov_t *kiov)
 707{
 708        lnet_process_id_packed_t *pid;
 709        int i;
 710
 711        i = idx / SFW_ID_PER_PAGE;
 712
 713        LASSERT(i < nkiov);
 714
 715        pid = (lnet_process_id_packed_t *)page_address(kiov[i].kiov_page);
 716
 717        return &pid[idx % SFW_ID_PER_PAGE];
 718}
 719
 720static int
 721lstcon_dstnodes_prep(lstcon_group_t *grp, int idx,
 722                     int dist, int span, int nkiov, lnet_kiov_t *kiov)
 723{
 724        lnet_process_id_packed_t *pid;
 725        lstcon_ndlink_t *ndl;
 726        lstcon_node_t *nd;
 727        int start;
 728        int end;
 729        int i = 0;
 730
 731        LASSERT(dist >= 1);
 732        LASSERT(span >= 1);
 733        LASSERT(grp->grp_nnode >= 1);
 734
 735        if (span > grp->grp_nnode)
 736                return -EINVAL;
 737
 738        start = ((idx / dist) * span) % grp->grp_nnode;
 739        end   = ((idx / dist) * span + span - 1) % grp->grp_nnode;
 740
 741        list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
 742                nd = ndl->ndl_node;
 743                if (i < start) {
 744                        i++;
 745                        continue;
 746                }
 747
 748                if (i > (end >= start ? end : grp->grp_nnode))
 749                        break;
 750
 751                pid = lstcon_next_id((i - start), nkiov, kiov);
 752                pid->nid = nd->nd_id.nid;
 753                pid->pid = nd->nd_id.pid;
 754                i++;
 755        }
 756
 757        if (start <= end) /* done */
 758                return 0;
 759
 760        list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
 761                if (i > grp->grp_nnode + end)
 762                        break;
 763
 764                nd = ndl->ndl_node;
 765                pid = lstcon_next_id((i - start), nkiov, kiov);
 766                pid->nid = nd->nd_id.nid;
 767                pid->pid = nd->nd_id.pid;
 768                i++;
 769        }
 770
 771        return 0;
 772}
 773
 774static int
 775lstcon_pingrpc_prep(lst_test_ping_param_t *param, srpc_test_reqst_t *req)
 776{
 777        test_ping_req_t *prq = &req->tsr_u.ping;
 778
 779        prq->png_size  = param->png_size;
 780        prq->png_flags = param->png_flags;
 781        /* TODO dest */
 782        return 0;
 783}
 784
 785static int
 786lstcon_bulkrpc_v0_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req)
 787{
 788        test_bulk_req_t *brq = &req->tsr_u.bulk_v0;
 789
 790        brq->blk_opc   = param->blk_opc;
 791        brq->blk_npg   = (param->blk_size + PAGE_CACHE_SIZE - 1) /
 792                          PAGE_CACHE_SIZE;
 793        brq->blk_flags = param->blk_flags;
 794
 795        return 0;
 796}
 797
 798static int
 799lstcon_bulkrpc_v1_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req)
 800{
 801        test_bulk_req_v1_t *brq = &req->tsr_u.bulk_v1;
 802
 803        brq->blk_opc    = param->blk_opc;
 804        brq->blk_flags  = param->blk_flags;
 805        brq->blk_len    = param->blk_size;
 806        brq->blk_offset = 0; /* reserved */
 807
 808        return 0;
 809}
 810
 811int
 812lstcon_testrpc_prep(lstcon_node_t *nd, int transop, unsigned feats,
 813                    lstcon_test_t *test, lstcon_rpc_t **crpc)
 814{
 815        lstcon_group_t    *sgrp = test->tes_src_grp;
 816        lstcon_group_t    *dgrp = test->tes_dst_grp;
 817        srpc_test_reqst_t *trq;
 818        srpc_bulk_t       *bulk;
 819        int                i;
 820        int                npg = 0;
 821        int                nob = 0;
 822        int                rc  = 0;
 823
 824        if (transop == LST_TRANS_TSBCLIADD) {
 825                npg = sfw_id_pages(test->tes_span);
 826                nob = (feats & LST_FEAT_BULK_LEN) == 0 ?
 827                      npg * PAGE_CACHE_SIZE :
 828                      sizeof(lnet_process_id_packed_t) * test->tes_span;
 829        }
 830
 831        rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, feats, npg, nob, crpc);
 832        if (rc != 0)
 833                return rc;
 834
 835        trq  = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst;
 836
 837        if (transop == LST_TRANS_TSBSRVADD) {
 838                int ndist = (sgrp->grp_nnode + test->tes_dist - 1) /
 839                            test->tes_dist;
 840                int nspan = (dgrp->grp_nnode + test->tes_span - 1) /
 841                            test->tes_span;
 842                int nmax = (ndist + nspan - 1) / nspan;
 843
 844                trq->tsr_ndest = 0;
 845                trq->tsr_loop  = nmax * test->tes_dist * test->tes_concur;
 846
 847        } else {
 848                bulk = &(*crpc)->crp_rpc->crpc_bulk;
 849
 850                for (i = 0; i < npg; i++) {
 851                        int     len;
 852
 853                        LASSERT(nob > 0);
 854
 855                        len = (feats & LST_FEAT_BULK_LEN) == 0 ?
 856                              PAGE_CACHE_SIZE :
 857                              min_t(int, nob, PAGE_CACHE_SIZE);
 858                        nob -= len;
 859
 860                        bulk->bk_iovs[i].kiov_offset = 0;
 861                        bulk->bk_iovs[i].kiov_len    = len;
 862                        bulk->bk_iovs[i].kiov_page   =
 863                                alloc_page(GFP_KERNEL);
 864
 865                        if (bulk->bk_iovs[i].kiov_page == NULL) {
 866                                lstcon_rpc_put(*crpc);
 867                                return -ENOMEM;
 868                        }
 869                }
 870
 871                bulk->bk_sink = 0;
 872
 873                LASSERT(transop == LST_TRANS_TSBCLIADD);
 874
 875                rc = lstcon_dstnodes_prep(test->tes_dst_grp,
 876                                          test->tes_cliidx++,
 877                                          test->tes_dist,
 878                                          test->tes_span,
 879                                          npg, &bulk->bk_iovs[0]);
 880                if (rc != 0) {
 881                        lstcon_rpc_put(*crpc);
 882                        return rc;
 883                }
 884
 885                trq->tsr_ndest = test->tes_span;
 886                trq->tsr_loop  = test->tes_loop;
 887        }
 888
 889        trq->tsr_sid        = console_session.ses_id;
 890        trq->tsr_bid        = test->tes_hdr.tsb_id;
 891        trq->tsr_concur     = test->tes_concur;
 892        trq->tsr_is_client  = (transop == LST_TRANS_TSBCLIADD) ? 1 : 0;
 893        trq->tsr_stop_onerr = !!test->tes_stop_onerr;
 894
 895        switch (test->tes_type) {
 896        case LST_TEST_PING:
 897                trq->tsr_service = SRPC_SERVICE_PING;
 898                rc = lstcon_pingrpc_prep((lst_test_ping_param_t *)
 899                                         &test->tes_param[0], trq);
 900                break;
 901
 902        case LST_TEST_BULK:
 903                trq->tsr_service = SRPC_SERVICE_BRW;
 904                if ((feats & LST_FEAT_BULK_LEN) == 0) {
 905                        rc = lstcon_bulkrpc_v0_prep((lst_test_bulk_param_t *)
 906                                                    &test->tes_param[0], trq);
 907                } else {
 908                        rc = lstcon_bulkrpc_v1_prep((lst_test_bulk_param_t *)
 909                                                    &test->tes_param[0], trq);
 910                }
 911
 912                break;
 913        default:
 914                LBUG();
 915                break;
 916        }
 917
 918        return rc;
 919}
 920
 921static int
 922lstcon_sesnew_stat_reply(lstcon_rpc_trans_t *trans,
 923                         lstcon_node_t *nd, srpc_msg_t *reply)
 924{
 925        srpc_mksn_reply_t *mksn_rep = &reply->msg_body.mksn_reply;
 926        int                status   = mksn_rep->mksn_status;
 927
 928        if (status == 0 &&
 929            (reply->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
 930                mksn_rep->mksn_status = EPROTO;
 931                status = EPROTO;
 932        }
 933
 934        if (status == EPROTO) {
 935                CNETERR("session protocol error from %s: %u\n",
 936                        libcfs_nid2str(nd->nd_id.nid),
 937                        reply->msg_ses_feats);
 938        }
 939
 940        if (status != 0)
 941                return status;
 942
 943        if (!trans->tas_feats_updated) {
 944                trans->tas_feats_updated = 1;
 945                trans->tas_features = reply->msg_ses_feats;
 946        }
 947
 948        if (reply->msg_ses_feats != trans->tas_features) {
 949                CNETERR("Framework features %x from %s is different with features on this transaction: %x\n",
 950                         reply->msg_ses_feats, libcfs_nid2str(nd->nd_id.nid),
 951                         trans->tas_features);
 952                status = mksn_rep->mksn_status = EPROTO;
 953        }
 954
 955        if (status == 0) {
 956                /* session timeout on remote node */
 957                nd->nd_timeout = mksn_rep->mksn_timeout;
 958        }
 959
 960        return status;
 961}
 962
 963void
 964lstcon_rpc_stat_reply(lstcon_rpc_trans_t *trans, srpc_msg_t *msg,
 965                      lstcon_node_t *nd, lstcon_trans_stat_t *stat)
 966{
 967        srpc_rmsn_reply_t  *rmsn_rep;
 968        srpc_debug_reply_t *dbg_rep;
 969        srpc_batch_reply_t *bat_rep;
 970        srpc_test_reply_t  *test_rep;
 971        srpc_stat_reply_t  *stat_rep;
 972        int                rc = 0;
 973
 974        switch (trans->tas_opc) {
 975        case LST_TRANS_SESNEW:
 976                rc = lstcon_sesnew_stat_reply(trans, nd, msg);
 977                if (rc == 0) {
 978                        lstcon_sesop_stat_success(stat, 1);
 979                        return;
 980                }
 981
 982                lstcon_sesop_stat_failure(stat, 1);
 983                break;
 984
 985        case LST_TRANS_SESEND:
 986                rmsn_rep = &msg->msg_body.rmsn_reply;
 987                /* ESRCH is not an error for end session */
 988                if (rmsn_rep->rmsn_status == 0 ||
 989                    rmsn_rep->rmsn_status == ESRCH) {
 990                        lstcon_sesop_stat_success(stat, 1);
 991                        return;
 992                }
 993
 994                lstcon_sesop_stat_failure(stat, 1);
 995                rc = rmsn_rep->rmsn_status;
 996                break;
 997
 998        case LST_TRANS_SESQRY:
 999        case LST_TRANS_SESPING:
1000                dbg_rep = &msg->msg_body.dbg_reply;
1001
1002                if (dbg_rep->dbg_status == ESRCH) {
1003                        lstcon_sesqry_stat_unknown(stat, 1);
1004                        return;
1005                }
1006
1007                if (lstcon_session_match(dbg_rep->dbg_sid))
1008                        lstcon_sesqry_stat_active(stat, 1);
1009                else
1010                        lstcon_sesqry_stat_busy(stat, 1);
1011                return;
1012
1013        case LST_TRANS_TSBRUN:
1014        case LST_TRANS_TSBSTOP:
1015                bat_rep = &msg->msg_body.bat_reply;
1016
1017                if (bat_rep->bar_status == 0) {
1018                        lstcon_tsbop_stat_success(stat, 1);
1019                        return;
1020                }
1021
1022                if (bat_rep->bar_status == EPERM &&
1023                    trans->tas_opc == LST_TRANS_TSBSTOP) {
1024                        lstcon_tsbop_stat_success(stat, 1);
1025                        return;
1026                }
1027
1028                lstcon_tsbop_stat_failure(stat, 1);
1029                rc = bat_rep->bar_status;
1030                break;
1031
1032        case LST_TRANS_TSBCLIQRY:
1033        case LST_TRANS_TSBSRVQRY:
1034                bat_rep = &msg->msg_body.bat_reply;
1035
1036                if (bat_rep->bar_active != 0)
1037                        lstcon_tsbqry_stat_run(stat, 1);
1038                else
1039                        lstcon_tsbqry_stat_idle(stat, 1);
1040
1041                if (bat_rep->bar_status == 0)
1042                        return;
1043
1044                lstcon_tsbqry_stat_failure(stat, 1);
1045                rc = bat_rep->bar_status;
1046                break;
1047
1048        case LST_TRANS_TSBCLIADD:
1049        case LST_TRANS_TSBSRVADD:
1050                test_rep = &msg->msg_body.tes_reply;
1051
1052                if (test_rep->tsr_status == 0) {
1053                        lstcon_tsbop_stat_success(stat, 1);
1054                        return;
1055                }
1056
1057                lstcon_tsbop_stat_failure(stat, 1);
1058                rc = test_rep->tsr_status;
1059                break;
1060
1061        case LST_TRANS_STATQRY:
1062                stat_rep = &msg->msg_body.stat_reply;
1063
1064                if (stat_rep->str_status == 0) {
1065                        lstcon_statqry_stat_success(stat, 1);
1066                        return;
1067                }
1068
1069                lstcon_statqry_stat_failure(stat, 1);
1070                rc = stat_rep->str_status;
1071                break;
1072
1073        default:
1074                LBUG();
1075        }
1076
1077        if (stat->trs_fwk_errno == 0)
1078                stat->trs_fwk_errno = rc;
1079
1080        return;
1081}
1082
1083int
1084lstcon_rpc_trans_ndlist(struct list_head *ndlist,
1085                        struct list_head *translist, int transop,
1086                        void *arg, lstcon_rpc_cond_func_t condition,
1087                        lstcon_rpc_trans_t **transpp)
1088{
1089        lstcon_rpc_trans_t *trans;
1090        lstcon_ndlink_t *ndl;
1091        lstcon_node_t *nd;
1092        lstcon_rpc_t *rpc;
1093        unsigned feats;
1094        int rc;
1095
1096        /* Creating session RPG for list of nodes */
1097
1098        rc = lstcon_rpc_trans_prep(translist, transop, &trans);
1099        if (rc != 0) {
1100                CERROR("Can't create transaction %d: %d\n", transop, rc);
1101                return rc;
1102        }
1103
1104        feats = trans->tas_features;
1105        list_for_each_entry(ndl, ndlist, ndl_link) {
1106                rc = condition == NULL ? 1 :
1107                     condition(transop, ndl->ndl_node, arg);
1108
1109                if (rc == 0)
1110                        continue;
1111
1112                if (rc < 0) {
1113                        CDEBUG(D_NET, "Condition error while creating RPC for transaction %d: %d\n",
1114                                        transop, rc);
1115                        break;
1116                }
1117
1118                nd = ndl->ndl_node;
1119
1120                switch (transop) {
1121                case LST_TRANS_SESNEW:
1122                case LST_TRANS_SESEND:
1123                        rc = lstcon_sesrpc_prep(nd, transop, feats, &rpc);
1124                        break;
1125                case LST_TRANS_SESQRY:
1126                case LST_TRANS_SESPING:
1127                        rc = lstcon_dbgrpc_prep(nd, feats, &rpc);
1128                        break;
1129                case LST_TRANS_TSBCLIADD:
1130                case LST_TRANS_TSBSRVADD:
1131                        rc = lstcon_testrpc_prep(nd, transop, feats,
1132                                                 (lstcon_test_t *)arg, &rpc);
1133                        break;
1134                case LST_TRANS_TSBRUN:
1135                case LST_TRANS_TSBSTOP:
1136                case LST_TRANS_TSBCLIQRY:
1137                case LST_TRANS_TSBSRVQRY:
1138                        rc = lstcon_batrpc_prep(nd, transop, feats,
1139                                                (lstcon_tsb_hdr_t *)arg, &rpc);
1140                        break;
1141                case LST_TRANS_STATQRY:
1142                        rc = lstcon_statrpc_prep(nd, feats, &rpc);
1143                        break;
1144                default:
1145                        rc = -EINVAL;
1146                        break;
1147                }
1148
1149                if (rc != 0) {
1150                        CERROR("Failed to create RPC for transaction %s: %d\n",
1151                               lstcon_rpc_trans_name(transop), rc);
1152                        break;
1153                }
1154
1155                lstcon_rpc_trans_addreq(trans, rpc);
1156        }
1157
1158        if (rc == 0) {
1159                *transpp = trans;
1160                return 0;
1161        }
1162
1163        lstcon_rpc_trans_destroy(trans);
1164
1165        return rc;
1166}
1167
1168static void
1169lstcon_rpc_pinger(void *arg)
1170{
1171        stt_timer_t *ptimer = (stt_timer_t *)arg;
1172        lstcon_rpc_trans_t *trans;
1173        lstcon_rpc_t *crpc;
1174        srpc_msg_t *rep;
1175        srpc_debug_reqst_t *drq;
1176        lstcon_ndlink_t *ndl;
1177        lstcon_node_t *nd;
1178        int intv;
1179        int count = 0;
1180        int rc;
1181
1182        /* RPC pinger is a special case of transaction,
1183         * it's called by timer at 8 seconds interval.
1184         */
1185        mutex_lock(&console_session.ses_mutex);
1186
1187        if (console_session.ses_shutdown || console_session.ses_expired) {
1188                mutex_unlock(&console_session.ses_mutex);
1189                return;
1190        }
1191
1192        if (!console_session.ses_expired &&
1193            ktime_get_real_seconds() - console_session.ses_laststamp >
1194            (time64_t)console_session.ses_timeout)
1195                console_session.ses_expired = 1;
1196
1197        trans = console_session.ses_ping;
1198
1199        LASSERT(trans != NULL);
1200
1201        list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link) {
1202                nd = ndl->ndl_node;
1203
1204                if (console_session.ses_expired) {
1205                        /* idle console, end session on all nodes */
1206                        if (nd->nd_state != LST_NODE_ACTIVE)
1207                                continue;
1208
1209                        rc = lstcon_sesrpc_prep(nd, LST_TRANS_SESEND,
1210                                                trans->tas_features, &crpc);
1211                        if (rc != 0) {
1212                                CERROR("Out of memory\n");
1213                                break;
1214                        }
1215
1216                        lstcon_rpc_trans_addreq(trans, crpc);
1217                        lstcon_rpc_post(crpc);
1218
1219                        continue;
1220                }
1221
1222                crpc = &nd->nd_ping;
1223
1224                if (crpc->crp_rpc != NULL) {
1225                        LASSERT(crpc->crp_trans == trans);
1226                        LASSERT(!list_empty(&crpc->crp_link));
1227
1228                        spin_lock(&crpc->crp_rpc->crpc_lock);
1229
1230                        LASSERT(crpc->crp_posted);
1231
1232                        if (!crpc->crp_finished) {
1233                                /* in flight */
1234                                spin_unlock(&crpc->crp_rpc->crpc_lock);
1235                                continue;
1236                        }
1237
1238                        spin_unlock(&crpc->crp_rpc->crpc_lock);
1239
1240                        lstcon_rpc_get_reply(crpc, &rep);
1241
1242                        list_del_init(&crpc->crp_link);
1243
1244                        lstcon_rpc_put(crpc);
1245                }
1246
1247                if (nd->nd_state != LST_NODE_ACTIVE)
1248                        continue;
1249
1250                intv = (jiffies - nd->nd_stamp) / HZ;
1251                if (intv < nd->nd_timeout / 2)
1252                        continue;
1253
1254                rc = lstcon_rpc_init(nd, SRPC_SERVICE_DEBUG,
1255                                     trans->tas_features, 0, 0, 1, crpc);
1256                if (rc != 0) {
1257                        CERROR("Out of memory\n");
1258                        break;
1259                }
1260
1261                drq = &crpc->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
1262
1263                drq->dbg_sid   = console_session.ses_id;
1264                drq->dbg_flags = 0;
1265
1266                lstcon_rpc_trans_addreq(trans, crpc);
1267                lstcon_rpc_post(crpc);
1268
1269                count++;
1270        }
1271
1272        if (console_session.ses_expired) {
1273                mutex_unlock(&console_session.ses_mutex);
1274                return;
1275        }
1276
1277        CDEBUG(D_NET, "Ping %d nodes in session\n", count);
1278
1279        ptimer->stt_expires = ktime_get_real_seconds() + LST_PING_INTERVAL;
1280        stt_add_timer(ptimer);
1281
1282        mutex_unlock(&console_session.ses_mutex);
1283}
1284
1285int
1286lstcon_rpc_pinger_start(void)
1287{
1288        stt_timer_t *ptimer;
1289        int rc;
1290
1291        LASSERT(list_empty(&console_session.ses_rpc_freelist));
1292        LASSERT(atomic_read(&console_session.ses_rpc_counter) == 0);
1293
1294        rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING,
1295                                   &console_session.ses_ping);
1296        if (rc != 0) {
1297                CERROR("Failed to create console pinger\n");
1298                return rc;
1299        }
1300
1301        ptimer = &console_session.ses_ping_timer;
1302        ptimer->stt_expires = ktime_get_real_seconds() + LST_PING_INTERVAL;
1303
1304        stt_add_timer(ptimer);
1305
1306        return 0;
1307}
1308
1309void
1310lstcon_rpc_pinger_stop(void)
1311{
1312        LASSERT(console_session.ses_shutdown);
1313
1314        stt_del_timer(&console_session.ses_ping_timer);
1315
1316        lstcon_rpc_trans_abort(console_session.ses_ping, -ESHUTDOWN);
1317        lstcon_rpc_trans_stat(console_session.ses_ping, lstcon_trans_stat());
1318        lstcon_rpc_trans_destroy(console_session.ses_ping);
1319
1320        memset(lstcon_trans_stat(), 0, sizeof(lstcon_trans_stat_t));
1321
1322        console_session.ses_ping = NULL;
1323}
1324
1325void
1326lstcon_rpc_cleanup_wait(void)
1327{
1328        lstcon_rpc_trans_t *trans;
1329        lstcon_rpc_t *crpc;
1330        struct list_head *pacer;
1331        struct list_head zlist;
1332
1333        /* Called with hold of global mutex */
1334
1335        LASSERT(console_session.ses_shutdown);
1336
1337        while (!list_empty(&console_session.ses_trans_list)) {
1338                list_for_each(pacer, &console_session.ses_trans_list) {
1339                        trans = list_entry(pacer, lstcon_rpc_trans_t,
1340                                               tas_link);
1341
1342                        CDEBUG(D_NET, "Session closed, wakeup transaction %s\n",
1343                               lstcon_rpc_trans_name(trans->tas_opc));
1344
1345                        wake_up(&trans->tas_waitq);
1346                }
1347
1348                mutex_unlock(&console_session.ses_mutex);
1349
1350                CWARN("Session is shutting down, waiting for termination of transactions\n");
1351                set_current_state(TASK_UNINTERRUPTIBLE);
1352                schedule_timeout(cfs_time_seconds(1));
1353
1354                mutex_lock(&console_session.ses_mutex);
1355        }
1356
1357        spin_lock(&console_session.ses_rpc_lock);
1358
1359        lst_wait_until((atomic_read(&console_session.ses_rpc_counter) == 0),
1360                       console_session.ses_rpc_lock,
1361                       "Network is not accessible or target is down, waiting for %d console RPCs to being recycled\n",
1362                       atomic_read(&console_session.ses_rpc_counter));
1363
1364        list_add(&zlist, &console_session.ses_rpc_freelist);
1365        list_del_init(&console_session.ses_rpc_freelist);
1366
1367        spin_unlock(&console_session.ses_rpc_lock);
1368
1369        while (!list_empty(&zlist)) {
1370                crpc = list_entry(zlist.next, lstcon_rpc_t, crp_link);
1371
1372                list_del(&crpc->crp_link);
1373                LIBCFS_FREE(crpc, sizeof(lstcon_rpc_t));
1374        }
1375}
1376
1377int
1378lstcon_rpc_module_init(void)
1379{
1380        INIT_LIST_HEAD(&console_session.ses_ping_timer.stt_list);
1381        console_session.ses_ping_timer.stt_func = lstcon_rpc_pinger;
1382        console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer;
1383
1384        console_session.ses_ping = NULL;
1385
1386        spin_lock_init(&console_session.ses_rpc_lock);
1387        atomic_set(&console_session.ses_rpc_counter, 0);
1388        INIT_LIST_HEAD(&console_session.ses_rpc_freelist);
1389
1390        return 0;
1391}
1392
1393void
1394lstcon_rpc_module_fini(void)
1395{
1396        LASSERT(list_empty(&console_session.ses_rpc_freelist));
1397        LASSERT(atomic_read(&console_session.ses_rpc_counter) == 0);
1398}
1399