linux/drivers/staging/lustre/lnet/selftest/framework.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lnet/selftest/framework.c
  37 *
  38 * Author: Isaac Huang <isaac@clusterfs.com>
  39 * Author: Liang Zhen  <liangzhen@clusterfs.com>
  40 */
  41
  42#define DEBUG_SUBSYSTEM S_LNET
  43
  44#include "selftest.h"
  45
  46lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
  47
  48static int session_timeout = 100;
  49CFS_MODULE_PARM(session_timeout, "i", int, 0444,
  50                "test session timeout in seconds (100 by default, 0 == never)");
  51
  52static int rpc_timeout = 64;
  53CFS_MODULE_PARM(rpc_timeout, "i", int, 0644,
  54                "rpc timeout in seconds (64 by default, 0 == never)");
  55
  56#define sfw_unpack_id(id)              \
  57do {                                \
  58        __swab64s(&(id).nid);      \
  59        __swab32s(&(id).pid);      \
  60} while (0)
  61
  62#define sfw_unpack_sid(sid)          \
  63do {                                \
  64        __swab64s(&(sid).ses_nid);      \
  65        __swab64s(&(sid).ses_stamp);    \
  66} while (0)
  67
  68#define sfw_unpack_fw_counters(fc)      \
  69do {                                  \
  70        __swab32s(&(fc).running_ms);      \
  71        __swab32s(&(fc).active_batches);  \
  72        __swab32s(&(fc).zombie_sessions); \
  73        __swab32s(&(fc).brw_errors);      \
  74        __swab32s(&(fc).ping_errors);     \
  75} while (0)
  76
  77#define sfw_unpack_rpc_counters(rc)     \
  78do {                                \
  79        __swab32s(&(rc).errors);        \
  80        __swab32s(&(rc).rpcs_sent);     \
  81        __swab32s(&(rc).rpcs_rcvd);     \
  82        __swab32s(&(rc).rpcs_dropped);  \
  83        __swab32s(&(rc).rpcs_expired);  \
  84        __swab64s(&(rc).bulk_get);      \
  85        __swab64s(&(rc).bulk_put);      \
  86} while (0)
  87
  88#define sfw_unpack_lnet_counters(lc)    \
  89do {                                \
  90        __swab32s(&(lc).errors);        \
  91        __swab32s(&(lc).msgs_max);      \
  92        __swab32s(&(lc).msgs_alloc);    \
  93        __swab32s(&(lc).send_count);    \
  94        __swab32s(&(lc).recv_count);    \
  95        __swab32s(&(lc).drop_count);    \
  96        __swab32s(&(lc).route_count);   \
  97        __swab64s(&(lc).send_length);   \
  98        __swab64s(&(lc).recv_length);   \
  99        __swab64s(&(lc).drop_length);   \
 100        __swab64s(&(lc).route_length);  \
 101} while (0)
 102
 103#define sfw_test_active(t)      (atomic_read(&(t)->tsi_nactive) != 0)
 104#define sfw_batch_active(b)     (atomic_read(&(b)->bat_nactive) != 0)
 105
 106struct smoketest_framework {
 107        struct list_head         fw_zombie_rpcs;     /* RPCs to be recycled */
 108        struct list_head         fw_zombie_sessions; /* stopping sessions */
 109        struct list_head         fw_tests;         /* registered test cases */
 110        atomic_t       fw_nzombies;     /* # zombie sessions */
 111        spinlock_t         fw_lock;             /* serialise */
 112        sfw_session_t     *fw_session;          /* _the_ session */
 113        int                fw_shuttingdown;     /* shutdown in progress */
 114        srpc_server_rpc_t *fw_active_srpc;      /* running RPC */
 115} sfw_data;
 116
 117/* forward ref's */
 118int sfw_stop_batch (sfw_batch_t *tsb, int force);
 119void sfw_destroy_session (sfw_session_t *sn);
 120
 121static inline sfw_test_case_t *
 122sfw_find_test_case(int id)
 123{
 124        sfw_test_case_t *tsc;
 125
 126        LASSERT (id <= SRPC_SERVICE_MAX_ID);
 127        LASSERT (id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
 128
 129        list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
 130                if (tsc->tsc_srv_service->sv_id == id)
 131                        return tsc;
 132        }
 133
 134        return NULL;
 135}
 136
 137static int
 138sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops)
 139{
 140        sfw_test_case_t *tsc;
 141
 142        if (sfw_find_test_case(service->sv_id) != NULL) {
 143                CERROR ("Failed to register test %s (%d)\n",
 144                        service->sv_name, service->sv_id);
 145                return -EEXIST;
 146        }
 147
 148        LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
 149        if (tsc == NULL)
 150                return -ENOMEM;
 151
 152        memset(tsc, 0, sizeof(sfw_test_case_t));
 153        tsc->tsc_cli_ops     = cliops;
 154        tsc->tsc_srv_service = service;
 155
 156        list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
 157        return 0;
 158}
 159
 160void
 161sfw_add_session_timer (void)
 162{
 163        sfw_session_t *sn = sfw_data.fw_session;
 164        stt_timer_t   *timer = &sn->sn_timer;
 165
 166        LASSERT (!sfw_data.fw_shuttingdown);
 167
 168        if (sn == NULL || sn->sn_timeout == 0)
 169                return;
 170
 171        LASSERT (!sn->sn_timer_active);
 172
 173        sn->sn_timer_active = 1;
 174        timer->stt_expires = cfs_time_add(sn->sn_timeout,
 175                                          cfs_time_current_sec());
 176        stt_add_timer(timer);
 177        return;
 178}
 179
 180int
 181sfw_del_session_timer (void)
 182{
 183        sfw_session_t *sn = sfw_data.fw_session;
 184
 185        if (sn == NULL || !sn->sn_timer_active)
 186                return 0;
 187
 188        LASSERT (sn->sn_timeout != 0);
 189
 190        if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
 191                sn->sn_timer_active = 0;
 192                return 0;
 193        }
 194
 195        return EBUSY; /* racing with sfw_session_expired() */
 196}
 197
 198/* called with sfw_data.fw_lock held */
 199static void
 200sfw_deactivate_session (void)
 201{
 202        sfw_session_t *sn = sfw_data.fw_session;
 203        int         nactive = 0;
 204        sfw_batch_t   *tsb;
 205        sfw_test_case_t *tsc;
 206
 207        if (sn == NULL) return;
 208
 209        LASSERT (!sn->sn_timer_active);
 210
 211        sfw_data.fw_session = NULL;
 212        atomic_inc(&sfw_data.fw_nzombies);
 213        list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
 214
 215        spin_unlock(&sfw_data.fw_lock);
 216
 217        list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
 218                srpc_abort_service(tsc->tsc_srv_service);
 219        }
 220
 221        spin_lock(&sfw_data.fw_lock);
 222
 223        list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
 224                if (sfw_batch_active(tsb)) {
 225                        nactive++;
 226                        sfw_stop_batch(tsb, 1);
 227                }
 228        }
 229
 230        if (nactive != 0)
 231                return;   /* wait for active batches to stop */
 232
 233        list_del_init(&sn->sn_list);
 234        spin_unlock(&sfw_data.fw_lock);
 235
 236        sfw_destroy_session(sn);
 237
 238        spin_lock(&sfw_data.fw_lock);
 239}
 240
 241
 242void
 243sfw_session_expired (void *data)
 244{
 245        sfw_session_t *sn = data;
 246
 247        spin_lock(&sfw_data.fw_lock);
 248
 249        LASSERT (sn->sn_timer_active);
 250        LASSERT (sn == sfw_data.fw_session);
 251
 252        CWARN ("Session expired! sid: %s-"LPU64", name: %s\n",
 253               libcfs_nid2str(sn->sn_id.ses_nid),
 254               sn->sn_id.ses_stamp, &sn->sn_name[0]);
 255
 256        sn->sn_timer_active = 0;
 257        sfw_deactivate_session();
 258
 259        spin_unlock(&sfw_data.fw_lock);
 260}
 261
 262static inline void
 263sfw_init_session(sfw_session_t *sn, lst_sid_t sid,
 264                 unsigned features, const char *name)
 265{
 266        stt_timer_t *timer = &sn->sn_timer;
 267
 268        memset(sn, 0, sizeof(sfw_session_t));
 269        INIT_LIST_HEAD(&sn->sn_list);
 270        INIT_LIST_HEAD(&sn->sn_batches);
 271        atomic_set(&sn->sn_refcount, 1);        /* +1 for caller */
 272        atomic_set(&sn->sn_brw_errors, 0);
 273        atomic_set(&sn->sn_ping_errors, 0);
 274        strlcpy(&sn->sn_name[0], name, sizeof(sn->sn_name));
 275
 276        sn->sn_timer_active = 0;
 277        sn->sn_id          = sid;
 278        sn->sn_features     = features;
 279        sn->sn_timeout      = session_timeout;
 280        sn->sn_started      = cfs_time_current();
 281
 282        timer->stt_data = sn;
 283        timer->stt_func = sfw_session_expired;
 284        INIT_LIST_HEAD(&timer->stt_list);
 285}
 286
 287/* completion handler for incoming framework RPCs */
 288void
 289sfw_server_rpc_done(struct srpc_server_rpc *rpc)
 290{
 291        struct srpc_service     *sv     = rpc->srpc_scd->scd_svc;
 292        int                     status  = rpc->srpc_status;
 293
 294        CDEBUG (D_NET,
 295                "Incoming framework RPC done: "
 296                "service %s, peer %s, status %s:%d\n",
 297                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
 298                swi_state2str(rpc->srpc_wi.swi_state),
 299                status);
 300
 301        if (rpc->srpc_bulk != NULL)
 302                sfw_free_pages(rpc);
 303        return;
 304}
 305
 306void
 307sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
 308{
 309        LASSERT (rpc->crpc_bulk.bk_niov == 0);
 310        LASSERT (list_empty(&rpc->crpc_list));
 311        LASSERT (atomic_read(&rpc->crpc_refcount) == 0);
 312
 313        CDEBUG (D_NET,
 314                "Outgoing framework RPC done: "
 315                "service %d, peer %s, status %s:%d:%d\n",
 316                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
 317                swi_state2str(rpc->crpc_wi.swi_state),
 318                rpc->crpc_aborted, rpc->crpc_status);
 319
 320        spin_lock(&sfw_data.fw_lock);
 321
 322        /* my callers must finish all RPCs before shutting me down */
 323        LASSERT(!sfw_data.fw_shuttingdown);
 324        list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
 325
 326        spin_unlock(&sfw_data.fw_lock);
 327}
 328
 329sfw_batch_t *
 330sfw_find_batch (lst_bid_t bid)
 331{
 332        sfw_session_t *sn = sfw_data.fw_session;
 333        sfw_batch_t   *bat;
 334
 335        LASSERT (sn != NULL);
 336
 337        list_for_each_entry (bat, &sn->sn_batches, bat_list) {
 338                if (bat->bat_id.bat_id == bid.bat_id)
 339                        return bat;
 340        }
 341
 342        return NULL;
 343}
 344
 345sfw_batch_t *
 346sfw_bid2batch (lst_bid_t bid)
 347{
 348        sfw_session_t *sn = sfw_data.fw_session;
 349        sfw_batch_t   *bat;
 350
 351        LASSERT (sn != NULL);
 352
 353        bat = sfw_find_batch(bid);
 354        if (bat != NULL)
 355                return bat;
 356
 357        LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
 358        if (bat == NULL)
 359                return NULL;
 360
 361        bat->bat_error    = 0;
 362        bat->bat_session  = sn;
 363        bat->bat_id       = bid;
 364        atomic_set(&bat->bat_nactive, 0);
 365        INIT_LIST_HEAD(&bat->bat_tests);
 366
 367        list_add_tail(&bat->bat_list, &sn->sn_batches);
 368        return bat;
 369}
 370
 371int
 372sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
 373{
 374        sfw_session_t  *sn = sfw_data.fw_session;
 375        sfw_counters_t *cnt = &reply->str_fw;
 376        sfw_batch_t    *bat;
 377        struct timeval  tv;
 378
 379        reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
 380
 381        if (request->str_sid.ses_nid == LNET_NID_ANY) {
 382                reply->str_status = EINVAL;
 383                return 0;
 384        }
 385
 386        if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
 387                reply->str_status = ESRCH;
 388                return 0;
 389        }
 390
 391        lnet_counters_get(&reply->str_lnet);
 392        srpc_get_counters(&reply->str_rpc);
 393
 394        /* send over the msecs since the session was started
 395         - with 32 bits to send, this is ~49 days */
 396        cfs_duration_usec(cfs_time_sub(cfs_time_current(),
 397                                       sn->sn_started), &tv);
 398
 399        cnt->running_ms      = (__u32)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
 400        cnt->brw_errors      = atomic_read(&sn->sn_brw_errors);
 401        cnt->ping_errors     = atomic_read(&sn->sn_ping_errors);
 402        cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
 403
 404        cnt->active_batches = 0;
 405        list_for_each_entry (bat, &sn->sn_batches, bat_list) {
 406                if (atomic_read(&bat->bat_nactive) > 0)
 407                        cnt->active_batches++;
 408        }
 409
 410        reply->str_status = 0;
 411        return 0;
 412}
 413
 414int
 415sfw_make_session(srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
 416{
 417        sfw_session_t *sn = sfw_data.fw_session;
 418        srpc_msg_t    *msg = container_of(request, srpc_msg_t,
 419                                          msg_body.mksn_reqst);
 420        int            cplen = 0;
 421
 422        if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
 423                reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
 424                reply->mksn_status = EINVAL;
 425                return 0;
 426        }
 427
 428        if (sn != NULL) {
 429                reply->mksn_status  = 0;
 430                reply->mksn_sid     = sn->sn_id;
 431                reply->mksn_timeout = sn->sn_timeout;
 432
 433                if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) {
 434                        atomic_inc(&sn->sn_refcount);
 435                        return 0;
 436                }
 437
 438                if (!request->mksn_force) {
 439                        reply->mksn_status = EBUSY;
 440                        cplen = strlcpy(&reply->mksn_name[0], &sn->sn_name[0],
 441                                        sizeof(reply->mksn_name));
 442                        if (cplen >= sizeof(reply->mksn_name))
 443                                return -E2BIG;
 444                        return 0;
 445                }
 446        }
 447
 448        /* reject the request if it requires unknown features
 449         * NB: old version will always accept all features because it's not
 450         * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also
 451         * harmless because it will return zero feature to console, and it's
 452         * console's responsibility to make sure all nodes in a session have
 453         * same feature mask. */
 454        if ((msg->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
 455                reply->mksn_status = EPROTO;
 456                return 0;
 457        }
 458
 459        /* brand new or create by force */
 460        LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
 461        if (sn == NULL) {
 462                CERROR ("Dropping RPC (mksn) under memory pressure.\n");
 463                return -ENOMEM;
 464        }
 465
 466        sfw_init_session(sn, request->mksn_sid,
 467                         msg->msg_ses_feats, &request->mksn_name[0]);
 468
 469        spin_lock(&sfw_data.fw_lock);
 470
 471        sfw_deactivate_session();
 472        LASSERT(sfw_data.fw_session == NULL);
 473        sfw_data.fw_session = sn;
 474
 475        spin_unlock(&sfw_data.fw_lock);
 476
 477        reply->mksn_status  = 0;
 478        reply->mksn_sid     = sn->sn_id;
 479        reply->mksn_timeout = sn->sn_timeout;
 480        return 0;
 481}
 482
 483int
 484sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
 485{
 486        sfw_session_t *sn = sfw_data.fw_session;
 487
 488        reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
 489
 490        if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
 491                reply->rmsn_status = EINVAL;
 492                return 0;
 493        }
 494
 495        if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
 496                reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
 497                return 0;
 498        }
 499
 500        if (!atomic_dec_and_test(&sn->sn_refcount)) {
 501                reply->rmsn_status = 0;
 502                return 0;
 503        }
 504
 505        spin_lock(&sfw_data.fw_lock);
 506        sfw_deactivate_session();
 507        spin_unlock(&sfw_data.fw_lock);
 508
 509        reply->rmsn_status = 0;
 510        reply->rmsn_sid    = LST_INVALID_SID;
 511        LASSERT(sfw_data.fw_session == NULL);
 512        return 0;
 513}
 514
 515int
 516sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
 517{
 518        sfw_session_t *sn = sfw_data.fw_session;
 519
 520        if (sn == NULL) {
 521                reply->dbg_status = ESRCH;
 522                reply->dbg_sid    = LST_INVALID_SID;
 523                return 0;
 524        }
 525
 526        reply->dbg_status  = 0;
 527        reply->dbg_sid     = sn->sn_id;
 528        reply->dbg_timeout = sn->sn_timeout;
 529        if (strlcpy(reply->dbg_name, &sn->sn_name[0], sizeof(reply->dbg_name))
 530            >= sizeof(reply->dbg_name))
 531                return -E2BIG;
 532
 533        return 0;
 534}
 535
 536void
 537sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
 538{
 539        sfw_test_unit_t     *tsu = rpc->crpc_priv;
 540        sfw_test_instance_t *tsi = tsu->tsu_instance;
 541
 542        /* Called with hold of tsi->tsi_lock */
 543        LASSERT (list_empty(&rpc->crpc_list));
 544        list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
 545}
 546
 547static inline int
 548sfw_test_buffers(sfw_test_instance_t *tsi)
 549{
 550        struct sfw_test_case    *tsc = sfw_find_test_case(tsi->tsi_service);
 551        struct srpc_service     *svc = tsc->tsc_srv_service;
 552        int                     nbuf;
 553
 554        nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
 555        return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
 556}
 557
 558int
 559sfw_load_test(struct sfw_test_instance *tsi)
 560{
 561        struct sfw_test_case    *tsc;
 562        struct srpc_service     *svc;
 563        int                     nbuf;
 564        int                     rc;
 565
 566        LASSERT(tsi != NULL);
 567        tsc = sfw_find_test_case(tsi->tsi_service);
 568        nbuf = sfw_test_buffers(tsi);
 569        LASSERT(tsc != NULL);
 570        svc = tsc->tsc_srv_service;
 571
 572        if (tsi->tsi_is_client) {
 573                tsi->tsi_ops = tsc->tsc_cli_ops;
 574                return 0;
 575        }
 576
 577        rc = srpc_service_add_buffers(svc, nbuf);
 578        if (rc != 0) {
 579                CWARN("Failed to reserve enough buffers: "
 580                      "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc);
 581                /* NB: this error handler is not strictly correct, because
 582                 * it may release more buffers than already allocated,
 583                 * but it doesn't matter because request portal should
 584                 * be lazy portal and will grow buffers if necessary. */
 585                srpc_service_remove_buffers(svc, nbuf);
 586                return -ENOMEM;
 587        }
 588
 589        CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
 590               nbuf * (srpc_serv_is_framework(svc) ?
 591                       1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
 592        return 0;
 593}
 594
 595void
 596sfw_unload_test(struct sfw_test_instance *tsi)
 597{
 598        struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
 599
 600        LASSERT(tsc != NULL);
 601
 602        if (tsi->tsi_is_client)
 603                return;
 604
 605        /* shrink buffers, because request portal is lazy portal
 606         * which can grow buffers at runtime so we may leave
 607         * some buffers behind, but never mind... */
 608        srpc_service_remove_buffers(tsc->tsc_srv_service,
 609                                    sfw_test_buffers(tsi));
 610        return;
 611}
 612
 613void
 614sfw_destroy_test_instance (sfw_test_instance_t *tsi)
 615{
 616        srpc_client_rpc_t *rpc;
 617        sfw_test_unit_t   *tsu;
 618
 619        if (!tsi->tsi_is_client) goto clean;
 620
 621        tsi->tsi_ops->tso_fini(tsi);
 622
 623        LASSERT (!tsi->tsi_stopping);
 624        LASSERT (list_empty(&tsi->tsi_active_rpcs));
 625        LASSERT (!sfw_test_active(tsi));
 626
 627        while (!list_empty(&tsi->tsi_units)) {
 628                tsu = list_entry(tsi->tsi_units.next,
 629                                     sfw_test_unit_t, tsu_list);
 630                list_del(&tsu->tsu_list);
 631                LIBCFS_FREE(tsu, sizeof(*tsu));
 632        }
 633
 634        while (!list_empty(&tsi->tsi_free_rpcs)) {
 635                rpc = list_entry(tsi->tsi_free_rpcs.next,
 636                                     srpc_client_rpc_t, crpc_list);
 637                list_del(&rpc->crpc_list);
 638                LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
 639        }
 640
 641clean:
 642        sfw_unload_test(tsi);
 643        LIBCFS_FREE(tsi, sizeof(*tsi));
 644        return;
 645}
 646
 647void
 648sfw_destroy_batch (sfw_batch_t *tsb)
 649{
 650        sfw_test_instance_t *tsi;
 651
 652        LASSERT (!sfw_batch_active(tsb));
 653        LASSERT (list_empty(&tsb->bat_list));
 654
 655        while (!list_empty(&tsb->bat_tests)) {
 656                tsi = list_entry(tsb->bat_tests.next,
 657                                     sfw_test_instance_t, tsi_list);
 658                list_del_init(&tsi->tsi_list);
 659                sfw_destroy_test_instance(tsi);
 660        }
 661
 662        LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
 663        return;
 664}
 665
 666void
 667sfw_destroy_session (sfw_session_t *sn)
 668{
 669        sfw_batch_t *batch;
 670
 671        LASSERT (list_empty(&sn->sn_list));
 672        LASSERT (sn != sfw_data.fw_session);
 673
 674        while (!list_empty(&sn->sn_batches)) {
 675                batch = list_entry(sn->sn_batches.next,
 676                                       sfw_batch_t, bat_list);
 677                list_del_init(&batch->bat_list);
 678                sfw_destroy_batch(batch);
 679        }
 680
 681        LIBCFS_FREE(sn, sizeof(*sn));
 682        atomic_dec(&sfw_data.fw_nzombies);
 683        return;
 684}
 685
 686void
 687sfw_unpack_addtest_req(srpc_msg_t *msg)
 688{
 689        srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
 690
 691        LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST);
 692        LASSERT (req->tsr_is_client);
 693
 694        if (msg->msg_magic == SRPC_MSG_MAGIC)
 695                return; /* no flipping needed */
 696
 697        LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
 698
 699        if (req->tsr_service == SRPC_SERVICE_BRW) {
 700                if ((msg->msg_ses_feats & LST_FEAT_BULK_LEN) == 0) {
 701                        test_bulk_req_t *bulk = &req->tsr_u.bulk_v0;
 702
 703                        __swab32s(&bulk->blk_opc);
 704                        __swab32s(&bulk->blk_npg);
 705                        __swab32s(&bulk->blk_flags);
 706
 707                } else {
 708                        test_bulk_req_v1_t *bulk = &req->tsr_u.bulk_v1;
 709
 710                        __swab16s(&bulk->blk_opc);
 711                        __swab16s(&bulk->blk_flags);
 712                        __swab32s(&bulk->blk_offset);
 713                        __swab32s(&bulk->blk_len);
 714                }
 715
 716                return;
 717        }
 718
 719        if (req->tsr_service == SRPC_SERVICE_PING) {
 720                test_ping_req_t *ping = &req->tsr_u.ping;
 721
 722                __swab32s(&ping->png_size);
 723                __swab32s(&ping->png_flags);
 724                return;
 725        }
 726
 727        LBUG ();
 728        return;
 729}
 730
 731int
 732sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
 733{
 734        srpc_msg_t        *msg = &rpc->srpc_reqstbuf->buf_msg;
 735        srpc_test_reqst_t   *req = &msg->msg_body.tes_reqst;
 736        srpc_bulk_t      *bk = rpc->srpc_bulk;
 737        int               ndest = req->tsr_ndest;
 738        sfw_test_unit_t     *tsu;
 739        sfw_test_instance_t *tsi;
 740        int               i;
 741        int               rc;
 742
 743        LIBCFS_ALLOC(tsi, sizeof(*tsi));
 744        if (tsi == NULL) {
 745                CERROR ("Can't allocate test instance for batch: "LPU64"\n",
 746                        tsb->bat_id.bat_id);
 747                return -ENOMEM;
 748        }
 749
 750        memset(tsi, 0, sizeof(*tsi));
 751        spin_lock_init(&tsi->tsi_lock);
 752        atomic_set(&tsi->tsi_nactive, 0);
 753        INIT_LIST_HEAD(&tsi->tsi_units);
 754        INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
 755        INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
 756
 757        tsi->tsi_stopping      = 0;
 758        tsi->tsi_batch   = tsb;
 759        tsi->tsi_loop     = req->tsr_loop;
 760        tsi->tsi_concur = req->tsr_concur;
 761        tsi->tsi_service       = req->tsr_service;
 762        tsi->tsi_is_client     = !!(req->tsr_is_client);
 763        tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);
 764
 765        rc = sfw_load_test(tsi);
 766        if (rc != 0) {
 767                LIBCFS_FREE(tsi, sizeof(*tsi));
 768                return rc;
 769        }
 770
 771        LASSERT (!sfw_batch_active(tsb));
 772
 773        if (!tsi->tsi_is_client) {
 774                /* it's test server, just add it to tsb */
 775                list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
 776                return 0;
 777        }
 778
 779        LASSERT (bk != NULL);
 780        LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest);
 781        LASSERT((unsigned int)bk->bk_len >=
 782                sizeof(lnet_process_id_packed_t) * ndest);
 783
 784        sfw_unpack_addtest_req(msg);
 785        memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
 786
 787        for (i = 0; i < ndest; i++) {
 788                lnet_process_id_packed_t *dests;
 789                lnet_process_id_packed_t  id;
 790                int                    j;
 791
 792                dests = page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
 793                LASSERT (dests != NULL);  /* my pages are within KVM always */
 794                id = dests[i % SFW_ID_PER_PAGE];
 795                if (msg->msg_magic != SRPC_MSG_MAGIC)
 796                        sfw_unpack_id(id);
 797
 798                for (j = 0; j < tsi->tsi_concur; j++) {
 799                        LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
 800                        if (tsu == NULL) {
 801                                rc = -ENOMEM;
 802                                CERROR ("Can't allocate tsu for %d\n",
 803                                        tsi->tsi_service);
 804                                goto error;
 805                        }
 806
 807                        tsu->tsu_dest.nid = id.nid;
 808                        tsu->tsu_dest.pid = id.pid;
 809                        tsu->tsu_instance = tsi;
 810                        tsu->tsu_private  = NULL;
 811                        list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
 812                }
 813        }
 814
 815        rc = tsi->tsi_ops->tso_init(tsi);
 816        if (rc == 0) {
 817                list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
 818                return 0;
 819        }
 820
 821error:
 822        LASSERT (rc != 0);
 823        sfw_destroy_test_instance(tsi);
 824        return rc;
 825}
 826
 827static void
 828sfw_test_unit_done (sfw_test_unit_t *tsu)
 829{
 830        sfw_test_instance_t *tsi = tsu->tsu_instance;
 831        sfw_batch_t      *tsb = tsi->tsi_batch;
 832        sfw_session_t       *sn = tsb->bat_session;
 833
 834        LASSERT (sfw_test_active(tsi));
 835
 836        if (!atomic_dec_and_test(&tsi->tsi_nactive))
 837                return;
 838
 839        /* the test instance is done */
 840        spin_lock(&tsi->tsi_lock);
 841
 842        tsi->tsi_stopping = 0;
 843
 844        spin_unlock(&tsi->tsi_lock);
 845
 846        spin_lock(&sfw_data.fw_lock);
 847
 848        if (!atomic_dec_and_test(&tsb->bat_nactive) ||/* tsb still active */
 849            sn == sfw_data.fw_session) {                  /* sn also active */
 850                spin_unlock(&sfw_data.fw_lock);
 851                return;
 852        }
 853
 854        LASSERT (!list_empty(&sn->sn_list)); /* I'm a zombie! */
 855
 856        list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
 857                if (sfw_batch_active(tsb)) {
 858                        spin_unlock(&sfw_data.fw_lock);
 859                        return;
 860                }
 861        }
 862
 863        list_del_init(&sn->sn_list);
 864        spin_unlock(&sfw_data.fw_lock);
 865
 866        sfw_destroy_session(sn);
 867        return;
 868}
 869
 870void
 871sfw_test_rpc_done (srpc_client_rpc_t *rpc)
 872{
 873        sfw_test_unit_t     *tsu = rpc->crpc_priv;
 874        sfw_test_instance_t *tsi = tsu->tsu_instance;
 875        int               done = 0;
 876
 877        tsi->tsi_ops->tso_done_rpc(tsu, rpc);
 878
 879        spin_lock(&tsi->tsi_lock);
 880
 881        LASSERT (sfw_test_active(tsi));
 882        LASSERT (!list_empty(&rpc->crpc_list));
 883
 884        list_del_init(&rpc->crpc_list);
 885
 886        /* batch is stopping or loop is done or get error */
 887        if (tsi->tsi_stopping ||
 888            tsu->tsu_loop == 0 ||
 889            (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr))
 890                done = 1;
 891
 892        /* dec ref for poster */
 893        srpc_client_rpc_decref(rpc);
 894
 895        spin_unlock(&tsi->tsi_lock);
 896
 897        if (!done) {
 898                swi_schedule_workitem(&tsu->tsu_worker);
 899                return;
 900        }
 901
 902        sfw_test_unit_done(tsu);
 903        return;
 904}
 905
 906int
 907sfw_create_test_rpc(sfw_test_unit_t *tsu, lnet_process_id_t peer,
 908                    unsigned features, int nblk, int blklen,
 909                    srpc_client_rpc_t **rpcpp)
 910{
 911        srpc_client_rpc_t   *rpc = NULL;
 912        sfw_test_instance_t *tsi = tsu->tsu_instance;
 913
 914        spin_lock(&tsi->tsi_lock);
 915
 916        LASSERT (sfw_test_active(tsi));
 917
 918        if (!list_empty(&tsi->tsi_free_rpcs)) {
 919                /* pick request from buffer */
 920                rpc = list_entry(tsi->tsi_free_rpcs.next,
 921                                     srpc_client_rpc_t, crpc_list);
 922                LASSERT (nblk == rpc->crpc_bulk.bk_niov);
 923                list_del_init(&rpc->crpc_list);
 924        }
 925
 926        spin_unlock(&tsi->tsi_lock);
 927
 928        if (rpc == NULL) {
 929                rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
 930                                             blklen, sfw_test_rpc_done,
 931                                             sfw_test_rpc_fini, tsu);
 932        } else {
 933                srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
 934                                     blklen, sfw_test_rpc_done,
 935                                     sfw_test_rpc_fini, tsu);
 936        }
 937
 938        if (rpc == NULL) {
 939                CERROR("Can't create rpc for test %d\n", tsi->tsi_service);
 940                return -ENOMEM;
 941        }
 942
 943        rpc->crpc_reqstmsg.msg_ses_feats = features;
 944        *rpcpp = rpc;
 945
 946        return 0;
 947}
 948
 949int
 950sfw_run_test (swi_workitem_t *wi)
 951{
 952        sfw_test_unit_t     *tsu = wi->swi_workitem.wi_data;
 953        sfw_test_instance_t *tsi = tsu->tsu_instance;
 954        srpc_client_rpc_t   *rpc = NULL;
 955
 956        LASSERT (wi == &tsu->tsu_worker);
 957
 958        if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
 959                LASSERT (rpc == NULL);
 960                goto test_done;
 961        }
 962
 963        LASSERT (rpc != NULL);
 964
 965        spin_lock(&tsi->tsi_lock);
 966
 967        if (tsi->tsi_stopping) {
 968                list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
 969                spin_unlock(&tsi->tsi_lock);
 970                goto test_done;
 971        }
 972
 973        if (tsu->tsu_loop > 0)
 974                tsu->tsu_loop--;
 975
 976        list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
 977        spin_unlock(&tsi->tsi_lock);
 978
 979        rpc->crpc_timeout = rpc_timeout;
 980
 981        spin_lock(&rpc->crpc_lock);
 982        srpc_post_rpc(rpc);
 983        spin_unlock(&rpc->crpc_lock);
 984        return 0;
 985
 986test_done:
 987        /*
 988         * No one can schedule me now since:
 989         * - previous RPC, if any, has done and
 990         * - no new RPC is initiated.
 991         * - my batch is still active; no one can run it again now.
 992         * Cancel pending schedules and prevent future schedule attempts:
 993         */
 994        swi_exit_workitem(wi);
 995        sfw_test_unit_done(tsu);
 996        return 1;
 997}
 998
 999int
1000sfw_run_batch (sfw_batch_t *tsb)
1001{
1002        swi_workitem_t      *wi;
1003        sfw_test_unit_t     *tsu;
1004        sfw_test_instance_t *tsi;
1005
1006        if (sfw_batch_active(tsb)) {
1007                CDEBUG(D_NET, "Batch already active: "LPU64" (%d)\n",
1008                       tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
1009                return 0;
1010        }
1011
1012        list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
1013                if (!tsi->tsi_is_client) /* skip server instances */
1014                        continue;
1015
1016                LASSERT (!tsi->tsi_stopping);
1017                LASSERT (!sfw_test_active(tsi));
1018
1019                atomic_inc(&tsb->bat_nactive);
1020
1021                list_for_each_entry (tsu, &tsi->tsi_units, tsu_list) {
1022                        atomic_inc(&tsi->tsi_nactive);
1023                        tsu->tsu_loop = tsi->tsi_loop;
1024                        wi = &tsu->tsu_worker;
1025                        swi_init_workitem(wi, tsu, sfw_run_test,
1026                                          lst_sched_test[\
1027                                          lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
1028                        swi_schedule_workitem(wi);
1029                }
1030        }
1031
1032        return 0;
1033}
1034
1035int
1036sfw_stop_batch (sfw_batch_t *tsb, int force)
1037{
1038        sfw_test_instance_t *tsi;
1039        srpc_client_rpc_t   *rpc;
1040
1041        if (!sfw_batch_active(tsb)) {
1042                CDEBUG(D_NET, "Batch "LPU64" inactive\n", tsb->bat_id.bat_id);
1043                return 0;
1044        }
1045
1046        list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
1047                spin_lock(&tsi->tsi_lock);
1048
1049                if (!tsi->tsi_is_client ||
1050                    !sfw_test_active(tsi) || tsi->tsi_stopping) {
1051                        spin_unlock(&tsi->tsi_lock);
1052                        continue;
1053                }
1054
1055                tsi->tsi_stopping = 1;
1056
1057                if (!force) {
1058                        spin_unlock(&tsi->tsi_lock);
1059                        continue;
1060                }
1061
1062                /* abort launched rpcs in the test */
1063                list_for_each_entry(rpc, &tsi->tsi_active_rpcs, crpc_list) {
1064                        spin_lock(&rpc->crpc_lock);
1065
1066                        srpc_abort_rpc(rpc, -EINTR);
1067
1068                        spin_unlock(&rpc->crpc_lock);
1069                }
1070
1071                spin_unlock(&tsi->tsi_lock);
1072        }
1073
1074        return 0;
1075}
1076
1077int
1078sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
1079{
1080        sfw_test_instance_t *tsi;
1081
1082        if (testidx < 0)
1083                return -EINVAL;
1084
1085        if (testidx == 0) {
1086                reply->bar_active = atomic_read(&tsb->bat_nactive);
1087                return 0;
1088        }
1089
1090        list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
1091                if (testidx-- > 1)
1092                        continue;
1093
1094                reply->bar_active = atomic_read(&tsi->tsi_nactive);
1095                return 0;
1096        }
1097
1098        return -ENOENT;
1099}
1100
/*
 * Release the bulk descriptor attached to server RPC @rpc via
 * srpc_free_bulk() and clear the pointer so the RPC no longer refers to it.
 */
void
sfw_free_pages (srpc_server_rpc_t *rpc)
{
        srpc_free_bulk(rpc->srpc_bulk);
        rpc->srpc_bulk = NULL;
}
1107
1108int
1109sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len,
1110                int sink)
1111{
1112        LASSERT(rpc->srpc_bulk == NULL);
1113        LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
1114
1115        rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink);
1116        if (rpc->srpc_bulk == NULL)
1117                return -ENOMEM;
1118
1119        return 0;
1120}
1121
1122int
1123sfw_add_test (srpc_server_rpc_t *rpc)
1124{
1125        sfw_session_t     *sn = sfw_data.fw_session;
1126        srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1127        srpc_test_reqst_t *request;
1128        int             rc;
1129        sfw_batch_t       *bat;
1130
1131        request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1132        reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1133
1134        if (request->tsr_loop == 0 ||
1135            request->tsr_concur == 0 ||
1136            request->tsr_sid.ses_nid == LNET_NID_ANY ||
1137            request->tsr_ndest > SFW_MAX_NDESTS ||
1138            (request->tsr_is_client && request->tsr_ndest == 0) ||
1139            request->tsr_concur > SFW_MAX_CONCUR ||
1140            request->tsr_service > SRPC_SERVICE_MAX_ID ||
1141            request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1142                reply->tsr_status = EINVAL;
1143                return 0;
1144        }
1145
1146        if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1147            sfw_find_test_case(request->tsr_service) == NULL) {
1148                reply->tsr_status = ENOENT;
1149                return 0;
1150        }
1151
1152        bat = sfw_bid2batch(request->tsr_bid);
1153        if (bat == NULL) {
1154                CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
1155                        rpc->srpc_scd->scd_svc->sv_name,
1156                        libcfs_id2str(rpc->srpc_peer));
1157                return -ENOMEM;
1158        }
1159
1160        if (sfw_batch_active(bat)) {
1161                reply->tsr_status = EBUSY;
1162                return 0;
1163        }
1164
1165        if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1166                /* rpc will be resumed later in sfw_bulk_ready */
1167                int     npg = sfw_id_pages(request->tsr_ndest);
1168                int     len;
1169
1170                if ((sn->sn_features & LST_FEAT_BULK_LEN) == 0) {
1171                        len = npg * PAGE_CACHE_SIZE;
1172
1173                } else  {
1174                        len = sizeof(lnet_process_id_packed_t) *
1175                              request->tsr_ndest;
1176                }
1177
1178                return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1);
1179        }
1180
1181        rc = sfw_add_test_instance(bat, rpc);
1182        CDEBUG (rc == 0 ? D_NET : D_WARNING,
1183                "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1184                rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1185                request->tsr_is_client ? "client" : "server",
1186                request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1187
1188        reply->tsr_status = (rc < 0) ? -rc : rc;
1189        return 0;
1190}
1191
1192int
1193sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1194{
1195        sfw_session_t *sn = sfw_data.fw_session;
1196        int         rc = 0;
1197        sfw_batch_t   *bat;
1198
1199        reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1200
1201        if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1202                reply->bar_status = ESRCH;
1203                return 0;
1204        }
1205
1206        bat = sfw_find_batch(request->bar_bid);
1207        if (bat == NULL) {
1208                reply->bar_status = ENOENT;
1209                return 0;
1210        }
1211
1212        switch (request->bar_opc) {
1213        case SRPC_BATCH_OPC_RUN:
1214                rc = sfw_run_batch(bat);
1215                break;
1216
1217        case SRPC_BATCH_OPC_STOP:
1218                rc = sfw_stop_batch(bat, request->bar_arg);
1219                break;
1220
1221        case SRPC_BATCH_OPC_QUERY:
1222                rc = sfw_query_batch(bat, request->bar_testidx, reply);
1223                break;
1224
1225        default:
1226                return -EINVAL; /* drop it */
1227        }
1228
1229        reply->bar_status = (rc < 0) ? -rc : rc;
1230        return 0;
1231}
1232
1233int
1234sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
1235{
1236        struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1237        srpc_msg_t     *reply   = &rpc->srpc_replymsg;
1238        srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
1239        unsigned        features = LST_FEATS_MASK;
1240        int             rc = 0;
1241
1242        LASSERT(sfw_data.fw_active_srpc == NULL);
1243        LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1244
1245        spin_lock(&sfw_data.fw_lock);
1246
1247        if (sfw_data.fw_shuttingdown) {
1248                spin_unlock(&sfw_data.fw_lock);
1249                return -ESHUTDOWN;
1250        }
1251
1252        /* Remove timer to avoid racing with it or expiring active session */
1253        if (sfw_del_session_timer() != 0) {
1254                CERROR("Dropping RPC (%s) from %s: racing with expiry timer.",
1255                       sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1256                spin_unlock(&sfw_data.fw_lock);
1257                return -EAGAIN;
1258        }
1259
1260        sfw_data.fw_active_srpc = rpc;
1261        spin_unlock(&sfw_data.fw_lock);
1262
1263        sfw_unpack_message(request);
1264        LASSERT(request->msg_type == srpc_service2request(sv->sv_id));
1265
1266        /* rpc module should have checked this */
1267        LASSERT(request->msg_version == SRPC_MSG_VERSION);
1268
1269        if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION &&
1270            sv->sv_id != SRPC_SERVICE_DEBUG) {
1271                sfw_session_t *sn = sfw_data.fw_session;
1272
1273                if (sn != NULL &&
1274                    sn->sn_features != request->msg_ses_feats) {
1275                        CNETERR("Features of framework RPC don't match "
1276                                "features of current session: %x/%x\n",
1277                                request->msg_ses_feats, sn->sn_features);
1278                        reply->msg_body.reply.status = EPROTO;
1279                        reply->msg_body.reply.sid    = sn->sn_id;
1280                        goto out;
1281                }
1282
1283        } else if ((request->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
1284                /* NB: at this point, old version will ignore features and
1285                 * create new session anyway, so console should be able
1286                 * to handle this */
1287                reply->msg_body.reply.status = EPROTO;
1288                goto out;
1289        }
1290
1291        switch(sv->sv_id) {
1292        default:
1293                LBUG ();
1294        case SRPC_SERVICE_TEST:
1295                rc = sfw_add_test(rpc);
1296                break;
1297
1298        case SRPC_SERVICE_BATCH:
1299                rc = sfw_control_batch(&request->msg_body.bat_reqst,
1300                                       &reply->msg_body.bat_reply);
1301                break;
1302
1303        case SRPC_SERVICE_QUERY_STAT:
1304                rc = sfw_get_stats(&request->msg_body.stat_reqst,
1305                                   &reply->msg_body.stat_reply);
1306                break;
1307
1308        case SRPC_SERVICE_DEBUG:
1309                rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1310                                       &reply->msg_body.dbg_reply);
1311                break;
1312
1313        case SRPC_SERVICE_MAKE_SESSION:
1314                rc = sfw_make_session(&request->msg_body.mksn_reqst,
1315                                      &reply->msg_body.mksn_reply);
1316                break;
1317
1318        case SRPC_SERVICE_REMOVE_SESSION:
1319                rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1320                                        &reply->msg_body.rmsn_reply);
1321                break;
1322        }
1323
1324        if (sfw_data.fw_session != NULL)
1325                features = sfw_data.fw_session->sn_features;
1326 out:
1327        reply->msg_ses_feats = features;
1328        rpc->srpc_done = sfw_server_rpc_done;
1329        spin_lock(&sfw_data.fw_lock);
1330
1331        if (!sfw_data.fw_shuttingdown)
1332                sfw_add_session_timer();
1333
1334        sfw_data.fw_active_srpc = NULL;
1335        spin_unlock(&sfw_data.fw_lock);
1336        return rc;
1337}
1338
1339int
1340sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
1341{
1342        struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1343        int                     rc;
1344
1345        LASSERT(rpc->srpc_bulk != NULL);
1346        LASSERT(sv->sv_id == SRPC_SERVICE_TEST);
1347        LASSERT(sfw_data.fw_active_srpc == NULL);
1348        LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1349
1350        spin_lock(&sfw_data.fw_lock);
1351
1352        if (status != 0) {
1353                CERROR("Bulk transfer failed for RPC: "
1354                       "service %s, peer %s, status %d\n",
1355                       sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1356                spin_unlock(&sfw_data.fw_lock);
1357                return -EIO;
1358        }
1359
1360        if (sfw_data.fw_shuttingdown) {
1361                spin_unlock(&sfw_data.fw_lock);
1362                return -ESHUTDOWN;
1363        }
1364
1365        if (sfw_del_session_timer() != 0) {
1366                CERROR("Dropping RPC (%s) from %s: racing with expiry timer",
1367                       sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1368                spin_unlock(&sfw_data.fw_lock);
1369                return -EAGAIN;
1370        }
1371
1372        sfw_data.fw_active_srpc = rpc;
1373        spin_unlock(&sfw_data.fw_lock);
1374
1375        rc = sfw_add_test(rpc);
1376
1377        spin_lock(&sfw_data.fw_lock);
1378
1379        if (!sfw_data.fw_shuttingdown)
1380                sfw_add_session_timer();
1381
1382        sfw_data.fw_active_srpc = NULL;
1383        spin_unlock(&sfw_data.fw_lock);
1384        return rc;
1385}
1386
1387srpc_client_rpc_t *
1388sfw_create_rpc(lnet_process_id_t peer, int service,
1389               unsigned features, int nbulkiov, int bulklen,
1390               void (*done)(srpc_client_rpc_t *), void *priv)
1391{
1392        srpc_client_rpc_t *rpc = NULL;
1393
1394        spin_lock(&sfw_data.fw_lock);
1395
1396        LASSERT (!sfw_data.fw_shuttingdown);
1397        LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1398
1399        if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1400                rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1401                                     srpc_client_rpc_t, crpc_list);
1402                list_del(&rpc->crpc_list);
1403
1404                srpc_init_client_rpc(rpc, peer, service, 0, 0,
1405                                     done, sfw_client_rpc_fini, priv);
1406        }
1407
1408        spin_unlock(&sfw_data.fw_lock);
1409
1410        if (rpc == NULL) {
1411                rpc = srpc_create_client_rpc(peer, service,
1412                                             nbulkiov, bulklen, done,
1413                                             nbulkiov != 0 ?  NULL :
1414                                             sfw_client_rpc_fini,
1415                                             priv);
1416        }
1417
1418        if (rpc != NULL) /* "session" is concept in framework */
1419                rpc->crpc_reqstmsg.msg_ses_feats = features;
1420
1421        return rpc;
1422}
1423
1424void
1425sfw_unpack_message (srpc_msg_t *msg)
1426{
1427        if (msg->msg_magic == SRPC_MSG_MAGIC)
1428                return; /* no flipping needed */
1429
1430        /* srpc module should guarantee I wouldn't get crap */
1431        LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1432
1433        if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1434                srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;
1435
1436                __swab32s(&req->str_type);
1437                __swab64s(&req->str_rpyid);
1438                sfw_unpack_sid(req->str_sid);
1439                return;
1440        }
1441
1442        if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1443                srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1444
1445                __swab32s(&rep->str_status);
1446                sfw_unpack_sid(rep->str_sid);
1447                sfw_unpack_fw_counters(rep->str_fw);
1448                sfw_unpack_rpc_counters(rep->str_rpc);
1449                sfw_unpack_lnet_counters(rep->str_lnet);
1450                return;
1451        }
1452
1453        if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1454                srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;
1455
1456                __swab64s(&req->mksn_rpyid);
1457                __swab32s(&req->mksn_force);
1458                sfw_unpack_sid(req->mksn_sid);
1459                return;
1460        }
1461
1462        if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1463                srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;
1464
1465                __swab32s(&rep->mksn_status);
1466                __swab32s(&rep->mksn_timeout);
1467                sfw_unpack_sid(rep->mksn_sid);
1468                return;
1469        }
1470
1471        if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1472                srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;
1473
1474                __swab64s(&req->rmsn_rpyid);
1475                sfw_unpack_sid(req->rmsn_sid);
1476                return;
1477        }
1478
1479        if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1480                srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;
1481
1482                __swab32s(&rep->rmsn_status);
1483                sfw_unpack_sid(rep->rmsn_sid);
1484                return;
1485        }
1486
1487        if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1488                srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;
1489
1490                __swab64s(&req->dbg_rpyid);
1491                __swab32s(&req->dbg_flags);
1492                sfw_unpack_sid(req->dbg_sid);
1493                return;
1494        }
1495
1496        if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1497                srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;
1498
1499                __swab32s(&rep->dbg_nbatch);
1500                __swab32s(&rep->dbg_timeout);
1501                sfw_unpack_sid(rep->dbg_sid);
1502                return;
1503        }
1504
1505        if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1506                srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;
1507
1508                __swab32s(&req->bar_opc);
1509                __swab64s(&req->bar_rpyid);
1510                __swab32s(&req->bar_testidx);
1511                __swab32s(&req->bar_arg);
1512                sfw_unpack_sid(req->bar_sid);
1513                __swab64s(&req->bar_bid.bat_id);
1514                return;
1515        }
1516
1517        if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1518                srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1519
1520                __swab32s(&rep->bar_status);
1521                sfw_unpack_sid(rep->bar_sid);
1522                return;
1523        }
1524
1525        if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1526                srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
1527
1528                __swab64s(&req->tsr_rpyid);
1529                __swab64s(&req->tsr_bulkid);
1530                __swab32s(&req->tsr_loop);
1531                __swab32s(&req->tsr_ndest);
1532                __swab32s(&req->tsr_concur);
1533                __swab32s(&req->tsr_service);
1534                sfw_unpack_sid(req->tsr_sid);
1535                __swab64s(&req->tsr_bid.bat_id);
1536                return;
1537        }
1538
1539        if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1540                srpc_test_reply_t *rep = &msg->msg_body.tes_reply;
1541
1542                __swab32s(&rep->tsr_status);
1543                sfw_unpack_sid(rep->tsr_sid);
1544                return;
1545        }
1546
1547        if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1548                srpc_join_reqst_t *req = &msg->msg_body.join_reqst;
1549
1550                __swab64s(&req->join_rpyid);
1551                sfw_unpack_sid(req->join_sid);
1552                return;
1553        }
1554
1555        if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1556                srpc_join_reply_t *rep = &msg->msg_body.join_reply;
1557
1558                __swab32s(&rep->join_status);
1559                __swab32s(&rep->join_timeout);
1560                sfw_unpack_sid(rep->join_sid);
1561                return;
1562        }
1563
1564        LBUG ();
1565        return;
1566}
1567
1568void
1569sfw_abort_rpc (srpc_client_rpc_t *rpc)
1570{
1571        LASSERT(atomic_read(&rpc->crpc_refcount) > 0);
1572        LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1573
1574        spin_lock(&rpc->crpc_lock);
1575        srpc_abort_rpc(rpc, -EINTR);
1576        spin_unlock(&rpc->crpc_lock);
1577        return;
1578}
1579
1580void
1581sfw_post_rpc (srpc_client_rpc_t *rpc)
1582{
1583        spin_lock(&rpc->crpc_lock);
1584
1585        LASSERT (!rpc->crpc_closed);
1586        LASSERT (!rpc->crpc_aborted);
1587        LASSERT (list_empty(&rpc->crpc_list));
1588        LASSERT (!sfw_data.fw_shuttingdown);
1589
1590        rpc->crpc_timeout = rpc_timeout;
1591        srpc_post_rpc(rpc);
1592
1593        spin_unlock(&rpc->crpc_lock);
1594        return;
1595}
1596
1597static srpc_service_t sfw_services[] =
1598{
1599        {
1600                /* sv_id */    SRPC_SERVICE_DEBUG,
1601                /* sv_name */  "debug",
1602                0
1603        },
1604        {
1605                /* sv_id */    SRPC_SERVICE_QUERY_STAT,
1606                /* sv_name */  "query stats",
1607                0
1608        },
1609        {
1610                /* sv_id */    SRPC_SERVICE_MAKE_SESSION,
1611                /* sv_name */  "make session",
1612                0
1613        },
1614        {
1615                /* sv_id */    SRPC_SERVICE_REMOVE_SESSION,
1616                /* sv_name */  "remove session",
1617                0
1618        },
1619        {
1620                /* sv_id */    SRPC_SERVICE_BATCH,
1621                /* sv_name */  "batch service",
1622                0
1623        },
1624        {
1625                /* sv_id */    SRPC_SERVICE_TEST,
1626                /* sv_name */  "test service",
1627                0
1628        },
1629        {
1630                /* sv_id */    0,
1631                /* sv_name */  NULL,
1632                0
1633        }
1634};
1635
/*
 * Ping test: client operations, server-side service and their init hooks.
 * Declared here and defined in another translation unit; sfw_startup()
 * calls the init hooks and registers the client/service pair.
 */
extern sfw_test_client_ops_t ping_test_client;
extern srpc_service_t   ping_test_service;
extern void ping_init_test_client(void);
extern void ping_init_test_service(void);

/*
 * Brw test: same set of objects as for the ping test above, used by
 * sfw_startup() in the same way.
 */
extern sfw_test_client_ops_t brw_test_client;
extern srpc_service_t   brw_test_service;
extern void brw_init_test_client(void);
extern void brw_init_test_service(void);
1645
1646
1647int
1648sfw_startup (void)
1649{
1650        int           i;
1651        int           rc;
1652        int           error;
1653        srpc_service_t  *sv;
1654        sfw_test_case_t *tsc;
1655
1656
1657        if (session_timeout < 0) {
1658                CERROR ("Session timeout must be non-negative: %d\n",
1659                        session_timeout);
1660                return -EINVAL;
1661        }
1662
1663        if (rpc_timeout < 0) {
1664                CERROR ("RPC timeout must be non-negative: %d\n",
1665                        rpc_timeout);
1666                return -EINVAL;
1667        }
1668
1669        if (session_timeout == 0)
1670                CWARN ("Zero session_timeout specified "
1671                       "- test sessions never expire.\n");
1672
1673        if (rpc_timeout == 0)
1674                CWARN ("Zero rpc_timeout specified "
1675                       "- test RPC never expire.\n");
1676
1677        memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1678
1679        sfw_data.fw_session     = NULL;
1680        sfw_data.fw_active_srpc = NULL;
1681        spin_lock_init(&sfw_data.fw_lock);
1682        atomic_set(&sfw_data.fw_nzombies, 0);
1683        INIT_LIST_HEAD(&sfw_data.fw_tests);
1684        INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1685        INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1686
1687        brw_init_test_client();
1688        brw_init_test_service();
1689        rc = sfw_register_test(&brw_test_service, &brw_test_client);
1690        LASSERT (rc == 0);
1691
1692        ping_init_test_client();
1693        ping_init_test_service();
1694        rc = sfw_register_test(&ping_test_service, &ping_test_client);
1695        LASSERT (rc == 0);
1696
1697        error = 0;
1698        list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
1699                sv = tsc->tsc_srv_service;
1700
1701                rc = srpc_add_service(sv);
1702                LASSERT (rc != -EBUSY);
1703                if (rc != 0) {
1704                        CWARN ("Failed to add %s service: %d\n",
1705                               sv->sv_name, rc);
1706                        error = rc;
1707                }
1708        }
1709
1710        for (i = 0; ; i++) {
1711                sv = &sfw_services[i];
1712                if (sv->sv_name == NULL) break;
1713
1714                sv->sv_bulk_ready = NULL;
1715                sv->sv_handler    = sfw_handle_server_rpc;
1716                sv->sv_wi_total   = SFW_FRWK_WI_MAX;
1717                if (sv->sv_id == SRPC_SERVICE_TEST)
1718                        sv->sv_bulk_ready = sfw_bulk_ready;
1719
1720                rc = srpc_add_service(sv);
1721                LASSERT (rc != -EBUSY);
1722                if (rc != 0) {
1723                        CWARN ("Failed to add %s service: %d\n",
1724                               sv->sv_name, rc);
1725                        error = rc;
1726                }
1727
1728                /* about to sfw_shutdown, no need to add buffer */
1729                if (error) continue;
1730
1731                rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
1732                if (rc != 0) {
1733                        CWARN("Failed to reserve enough buffers: "
1734                              "service %s, %d needed: %d\n",
1735                              sv->sv_name, sv->sv_wi_total, rc);
1736                        error = -ENOMEM;
1737                }
1738        }
1739
1740        if (error != 0)
1741                sfw_shutdown();
1742        return error;
1743}
1744
1745void
1746sfw_shutdown (void)
1747{
1748        srpc_service_t  *sv;
1749        sfw_test_case_t *tsc;
1750        int              i;
1751
1752        spin_lock(&sfw_data.fw_lock);
1753
1754        sfw_data.fw_shuttingdown = 1;
1755        lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
1756                       "waiting for active RPC to finish.\n");
1757
1758        if (sfw_del_session_timer() != 0)
1759                lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
1760                               "waiting for session timer to explode.\n");
1761
1762        sfw_deactivate_session();
1763        lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0,
1764                       sfw_data.fw_lock,
1765                       "waiting for %d zombie sessions to die.\n",
1766                       atomic_read(&sfw_data.fw_nzombies));
1767
1768        spin_unlock(&sfw_data.fw_lock);
1769
1770        for (i = 0; ; i++) {
1771                sv = &sfw_services[i];
1772                if (sv->sv_name == NULL)
1773                        break;
1774
1775                srpc_shutdown_service(sv);
1776                srpc_remove_service(sv);
1777        }
1778
1779        list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
1780                sv = tsc->tsc_srv_service;
1781                srpc_shutdown_service(sv);
1782                srpc_remove_service(sv);
1783        }
1784
1785        while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1786                srpc_client_rpc_t *rpc;
1787
1788                rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1789                                     srpc_client_rpc_t, crpc_list);
1790                list_del(&rpc->crpc_list);
1791
1792                LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1793        }
1794
1795        for (i = 0; ; i++) {
1796                sv = &sfw_services[i];
1797                if (sv->sv_name == NULL)
1798                        break;
1799
1800                srpc_wait_service_shutdown(sv);
1801        }
1802
1803        while (!list_empty(&sfw_data.fw_tests)) {
1804                tsc = list_entry(sfw_data.fw_tests.next,
1805                                     sfw_test_case_t, tsc_list);
1806
1807                srpc_wait_service_shutdown(tsc->tsc_srv_service);
1808
1809                list_del(&tsc->tsc_list);
1810                LIBCFS_FREE(tsc, sizeof(*tsc));
1811        }
1812
1813        return;
1814}
1815