linux/drivers/staging/lustre/lnet/lnet/net_fault.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2014, Intel Corporation.
  24 */
  25/*
  26 * This file is part of Lustre, http://www.lustre.org/
  27 * Lustre is a trademark of Seagate, Inc.
  28 *
  29 * lnet/lnet/net_fault.c
  30 *
  31 * Lustre network fault simulation
  32 *
  33 * Author: liang.zhen@intel.com
  34 */
  35
  36#define DEBUG_SUBSYSTEM S_LNET
  37
  38#include "../../include/linux/lnet/lib-lnet.h"
  39#include "../../include/linux/lnet/lnetctl.h"
  40
  41#define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
  42                                 LNET_GET_BIT | LNET_REPLY_BIT)
  43
/**
 * A drop rule: matching messages are dropped either one-in-N (rate based)
 * or once per random point in a time window (interval based).  The two
 * modes are mutually exclusive (see lnet_drop_rule_add()).
 */
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.drs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	unsigned long		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	unsigned long		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
  65
  66static bool
  67lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
  68{
  69        if (nid == msg_nid || nid == LNET_NID_ANY)
  70                return true;
  71
  72        if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
  73                return false;
  74
  75        /* 255.255.255.255@net is wildcard for all addresses in a network */
  76        return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
  77}
  78
  79static bool
  80lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
  81                      lnet_nid_t dst, unsigned int type, unsigned int portal)
  82{
  83        if (!lnet_fault_nid_match(attr->fa_src, src) ||
  84            !lnet_fault_nid_match(attr->fa_dst, dst))
  85                return false;
  86
  87        if (!(attr->fa_msg_mask & (1 << type)))
  88                return false;
  89
  90        /**
  91         * NB: ACK and REPLY have no portal, but they should have been
  92         * rejected by message mask
  93         */
  94        if (attr->fa_ptl_mask && /* has portal filter */
  95            !(attr->fa_ptl_mask & (1ULL << portal)))
  96                return false;
  97
  98        return true;
  99}
 100
 101static int
 102lnet_fault_attr_validate(struct lnet_fault_attr *attr)
 103{
 104        if (!attr->fa_msg_mask)
 105                attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
 106
 107        if (!attr->fa_ptl_mask) /* no portal filter */
 108                return 0;
 109
 110        /* NB: only PUT and GET can be filtered if portal filter has been set */
 111        attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
 112        if (!attr->fa_msg_mask) {
 113                CDEBUG(D_NET, "can't find valid message type bits %x\n",
 114                       attr->fa_msg_mask);
 115                return -EINVAL;
 116        }
 117        return 0;
 118}
 119
 120static void
 121lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
 122{
 123        /* NB: fs_counter is NOT updated by this function */
 124        switch (type) {
 125        case LNET_MSG_PUT:
 126                stat->fs_put++;
 127                return;
 128        case LNET_MSG_ACK:
 129                stat->fs_ack++;
 130                return;
 131        case LNET_MSG_GET:
 132                stat->fs_get++;
 133                return;
 134        case LNET_MSG_REPLY:
 135                stat->fs_reply++;
 136                return;
 137        }
 138}
 139
 140/**
 141 * LNet message drop simulation
 142 */
 143
 144/**
 145 * Add a new drop rule to LNet
 146 * There is no check for duplicated drop rule, all rules will be checked for
 147 * incoming message.
 148 */
 149static int
 150lnet_drop_rule_add(struct lnet_fault_attr *attr)
 151{
 152        struct lnet_drop_rule *rule;
 153
 154        if (attr->u.drop.da_rate & attr->u.drop.da_interval) {
 155                CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
 156                       attr->u.drop.da_rate, attr->u.drop.da_interval);
 157                return -EINVAL;
 158        }
 159
 160        if (lnet_fault_attr_validate(attr))
 161                return -EINVAL;
 162
 163        CFS_ALLOC_PTR(rule);
 164        if (!rule)
 165                return -ENOMEM;
 166
 167        spin_lock_init(&rule->dr_lock);
 168
 169        rule->dr_attr = *attr;
 170        if (attr->u.drop.da_interval) {
 171                rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
 172                rule->dr_drop_time = cfs_time_shift(cfs_rand() %
 173                                                    attr->u.drop.da_interval);
 174        } else {
 175                rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
 176        }
 177
 178        lnet_net_lock(LNET_LOCK_EX);
 179        list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
 180        lnet_net_unlock(LNET_LOCK_EX);
 181
 182        CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
 183               libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
 184               attr->u.drop.da_rate, attr->u.drop.da_interval);
 185        return 0;
 186}
 187
 188/**
 189 * Remove matched drop rules from lnet, all rules that can match \a src and
 190 * \a dst will be removed.
 191 * If \a src is zero, then all rules have \a dst as destination will be remove
 192 * If \a dst is zero, then all rules have \a src as source will be removed
 193 * If both of them are zero, all rules will be removed
 194 */
 195static int
 196lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
 197{
 198        struct lnet_drop_rule *rule;
 199        struct lnet_drop_rule *tmp;
 200        struct list_head zombies;
 201        int n = 0;
 202
 203        INIT_LIST_HEAD(&zombies);
 204
 205        lnet_net_lock(LNET_LOCK_EX);
 206        list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
 207                if (rule->dr_attr.fa_src != src && src)
 208                        continue;
 209
 210                if (rule->dr_attr.fa_dst != dst && dst)
 211                        continue;
 212
 213                list_move(&rule->dr_link, &zombies);
 214        }
 215        lnet_net_unlock(LNET_LOCK_EX);
 216
 217        list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
 218                CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
 219                       libcfs_nid2str(rule->dr_attr.fa_src),
 220                       libcfs_nid2str(rule->dr_attr.fa_dst),
 221                       rule->dr_attr.u.drop.da_rate,
 222                       rule->dr_attr.u.drop.da_interval);
 223
 224                list_del(&rule->dr_link);
 225                CFS_FREE_PTR(rule);
 226                n++;
 227        }
 228
 229        return n;
 230}
 231
 232/**
 233 * List drop rule at position of \a pos
 234 */
 235static int
 236lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
 237                    struct lnet_fault_stat *stat)
 238{
 239        struct lnet_drop_rule *rule;
 240        int cpt;
 241        int i = 0;
 242        int rc = -ENOENT;
 243
 244        cpt = lnet_net_lock_current();
 245        list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
 246                if (i++ < pos)
 247                        continue;
 248
 249                spin_lock(&rule->dr_lock);
 250                *attr = rule->dr_attr;
 251                *stat = rule->dr_stat;
 252                spin_unlock(&rule->dr_lock);
 253                rc = 0;
 254                break;
 255        }
 256
 257        lnet_net_unlock(cpt);
 258        return rc;
 259}
 260
 261/**
 262 * reset counters for all drop rules
 263 */
 264static void
 265lnet_drop_rule_reset(void)
 266{
 267        struct lnet_drop_rule *rule;
 268        int cpt;
 269
 270        cpt = lnet_net_lock_current();
 271
 272        list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
 273                struct lnet_fault_attr *attr = &rule->dr_attr;
 274
 275                spin_lock(&rule->dr_lock);
 276
 277                memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
 278                if (attr->u.drop.da_rate) {
 279                        rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
 280                } else {
 281                        rule->dr_drop_time = cfs_time_shift(cfs_rand() %
 282                                                attr->u.drop.da_interval);
 283                        rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
 284                }
 285                spin_unlock(&rule->dr_lock);
 286        }
 287
 288        lnet_net_unlock(cpt);
 289}
 290
 291/**
 292 * check source/destination NID, portal, message type and drop rate,
 293 * decide whether should drop this message or not
 294 */
 295static bool
 296drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
 297                lnet_nid_t dst, unsigned int type, unsigned int portal)
 298{
 299        struct lnet_fault_attr *attr = &rule->dr_attr;
 300        bool drop;
 301
 302        if (!lnet_fault_attr_match(attr, src, dst, type, portal))
 303                return false;
 304
 305        /* match this rule, check drop rate now */
 306        spin_lock(&rule->dr_lock);
 307        if (rule->dr_drop_time) { /* time based drop */
 308                unsigned long now = cfs_time_current();
 309
 310                rule->dr_stat.fs_count++;
 311                drop = cfs_time_aftereq(now, rule->dr_drop_time);
 312                if (drop) {
 313                        if (cfs_time_after(now, rule->dr_time_base))
 314                                rule->dr_time_base = now;
 315
 316                        rule->dr_drop_time = rule->dr_time_base +
 317                                             cfs_time_seconds(cfs_rand() %
 318                                                attr->u.drop.da_interval);
 319                        rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
 320
 321                        CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
 322                               libcfs_nid2str(attr->fa_src),
 323                               libcfs_nid2str(attr->fa_dst),
 324                               rule->dr_drop_time);
 325                }
 326
 327        } else { /* rate based drop */
 328                drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
 329
 330                if (!do_div(rule->dr_stat.fs_count, attr->u.drop.da_rate)) {
 331                        rule->dr_drop_at = rule->dr_stat.fs_count +
 332                                           cfs_rand() % attr->u.drop.da_rate;
 333                        CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
 334                               libcfs_nid2str(attr->fa_src),
 335                               libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
 336                }
 337        }
 338
 339        if (drop) { /* drop this message, update counters */
 340                lnet_fault_stat_inc(&rule->dr_stat, type);
 341                rule->dr_stat.u.drop.ds_dropped++;
 342        }
 343
 344        spin_unlock(&rule->dr_lock);
 345        return drop;
 346}
 347
 348/**
 349 * Check if message from \a src to \a dst can match any existed drop rule
 350 */
 351bool
 352lnet_drop_rule_match(lnet_hdr_t *hdr)
 353{
 354        struct lnet_drop_rule *rule;
 355        lnet_nid_t src = le64_to_cpu(hdr->src_nid);
 356        lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
 357        unsigned int typ = le32_to_cpu(hdr->type);
 358        unsigned int ptl = -1;
 359        bool drop = false;
 360        int cpt;
 361
 362        /**
 363         * NB: if Portal is specified, then only PUT and GET will be
 364         * filtered by drop rule
 365         */
 366        if (typ == LNET_MSG_PUT)
 367                ptl = le32_to_cpu(hdr->msg.put.ptl_index);
 368        else if (typ == LNET_MSG_GET)
 369                ptl = le32_to_cpu(hdr->msg.get.ptl_index);
 370
 371        cpt = lnet_net_lock_current();
 372        list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
 373                drop = drop_rule_match(rule, src, dst, typ, ptl);
 374                if (drop)
 375                        break;
 376        }
 377
 378        lnet_net_unlock(cpt);
 379        return drop;
 380}
 381
 382/**
 383 * LNet Delay Simulation
 384 */
 385/** timestamp (second) to send delayed message */
 386#define msg_delay_send           msg_ev.hdr_data
 387
/**
 * A delay rule: matching messages are queued on \a dl_msg_list and sent
 * later, either one-in-N (rate based) or per time window (interval based).
 */
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	unsigned long		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	unsigned long		dl_time_base;
	/** jiffies to send the next delayed message */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
 419
/**
 * Shared state for the single delay daemon thread (see
 * lnet_delay_rule_daemon); one global instance, delay_dd.
 */
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps at here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) wait at here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};
 436
 437static struct delay_daemon_data delay_dd;
 438
 439static unsigned long
 440round_timeout(unsigned long timeout)
 441{
 442        return cfs_time_seconds((unsigned int)
 443                        cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
 444}
 445
 446static void
 447delay_rule_decref(struct lnet_delay_rule *rule)
 448{
 449        if (atomic_dec_and_test(&rule->dl_refcount)) {
 450                LASSERT(list_empty(&rule->dl_sched_link));
 451                LASSERT(list_empty(&rule->dl_msg_list));
 452                LASSERT(list_empty(&rule->dl_link));
 453
 454                CFS_FREE_PTR(rule);
 455        }
 456}
 457
 458/**
 459 * check source/destination NID, portal, message type and delay rate,
 460 * decide whether should delay this message or not
 461 */
 462static bool
 463delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
 464                 lnet_nid_t dst, unsigned int type, unsigned int portal,
 465                 struct lnet_msg *msg)
 466{
 467        struct lnet_fault_attr *attr = &rule->dl_attr;
 468        bool delay;
 469
 470        if (!lnet_fault_attr_match(attr, src, dst, type, portal))
 471                return false;
 472
 473        /* match this rule, check delay rate now */
 474        spin_lock(&rule->dl_lock);
 475        if (rule->dl_delay_time) { /* time based delay */
 476                unsigned long now = cfs_time_current();
 477
 478                rule->dl_stat.fs_count++;
 479                delay = cfs_time_aftereq(now, rule->dl_delay_time);
 480                if (delay) {
 481                        if (cfs_time_after(now, rule->dl_time_base))
 482                                rule->dl_time_base = now;
 483
 484                        rule->dl_delay_time = rule->dl_time_base +
 485                                             cfs_time_seconds(cfs_rand() %
 486                                                attr->u.delay.la_interval);
 487                        rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
 488
 489                        CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
 490                               libcfs_nid2str(attr->fa_src),
 491                               libcfs_nid2str(attr->fa_dst),
 492                               rule->dl_delay_time);
 493                }
 494
 495        } else { /* rate based delay */
 496                delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
 497                /* generate the next random rate sequence */
 498                if (!do_div(rule->dl_stat.fs_count, attr->u.delay.la_rate)) {
 499                        rule->dl_delay_at = rule->dl_stat.fs_count +
 500                                            cfs_rand() % attr->u.delay.la_rate;
 501                        CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
 502                               libcfs_nid2str(attr->fa_src),
 503                               libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
 504                }
 505        }
 506
 507        if (!delay) {
 508                spin_unlock(&rule->dl_lock);
 509                return false;
 510        }
 511
 512        /* delay this message, update counters */
 513        lnet_fault_stat_inc(&rule->dl_stat, type);
 514        rule->dl_stat.u.delay.ls_delayed++;
 515
 516        list_add_tail(&msg->msg_list, &rule->dl_msg_list);
 517        msg->msg_delay_send = round_timeout(
 518                        cfs_time_shift(attr->u.delay.la_latency));
 519        if (rule->dl_msg_send == -1) {
 520                rule->dl_msg_send = msg->msg_delay_send;
 521                mod_timer(&rule->dl_timer, rule->dl_msg_send);
 522        }
 523
 524        spin_unlock(&rule->dl_lock);
 525        return true;
 526}
 527
 528/**
 529 * check if \a msg can match any Delay Rule, receiving of this message
 530 * will be delayed if there is a match.
 531 */
 532bool
 533lnet_delay_rule_match_locked(lnet_hdr_t *hdr, struct lnet_msg *msg)
 534{
 535        struct lnet_delay_rule *rule;
 536        lnet_nid_t src = le64_to_cpu(hdr->src_nid);
 537        lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
 538        unsigned int typ = le32_to_cpu(hdr->type);
 539        unsigned int ptl = -1;
 540
 541        /* NB: called with hold of lnet_net_lock */
 542
 543        /**
 544         * NB: if Portal is specified, then only PUT and GET will be
 545         * filtered by delay rule
 546         */
 547        if (typ == LNET_MSG_PUT)
 548                ptl = le32_to_cpu(hdr->msg.put.ptl_index);
 549        else if (typ == LNET_MSG_GET)
 550                ptl = le32_to_cpu(hdr->msg.get.ptl_index);
 551
 552        list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
 553                if (delay_rule_match(rule, src, dst, typ, ptl, msg))
 554                        return true;
 555        }
 556
 557        return false;
 558}
 559
 560/** check out delayed messages for send */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long now = cfs_time_current();

	/* lockless fast path: nothing is due yet and we're not draining */
	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	/*
	 * Messages are appended with a fixed per-rule latency, so the list
	 * is effectively ordered by send time: stop at the first message
	 * that is not due yet (unless \a all asks to drain everything).
	 */
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		/* nothing pending: disarm timer, mark it idle with -1 */
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/*
		 * dequeued some timedout messages, update timer for the
		 * next delayed message on rule
		 */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}
 597
/**
 * Deliver (or cancel, when \a drop is true) every message on \a msg_list.
 * Messages that fail to be delivered/forwarded are dropped and finalized
 * with the failure code.
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer);

		ni = msg->msg_rxpeer->lp_ni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			/* rule is being shut down: cancel the message */
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			/* locally terminated message: deliver it now */
			rc = lnet_parse_local(ni, msg);
			if (!rc)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fallthrough: message consumed, like WAIT */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* failure/cancel path: drop the payload and finalize */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(ni, msg, rc);
	}
}
 643
 644/**
 645 * Process delayed messages for scheduled rules
 646 * This function can either be called by delay_rule_daemon, or by lnet_finalise
 647 */
 648void
 649lnet_delay_rule_check(void)
 650{
 651        struct lnet_delay_rule *rule;
 652        struct list_head msgs;
 653
 654        INIT_LIST_HEAD(&msgs);
 655        while (1) {
 656                if (list_empty(&delay_dd.dd_sched_rules))
 657                        break;
 658
 659                spin_lock_bh(&delay_dd.dd_lock);
 660                if (list_empty(&delay_dd.dd_sched_rules)) {
 661                        spin_unlock_bh(&delay_dd.dd_lock);
 662                        break;
 663                }
 664
 665                rule = list_entry(delay_dd.dd_sched_rules.next,
 666                                  struct lnet_delay_rule, dl_sched_link);
 667                list_del_init(&rule->dl_sched_link);
 668                spin_unlock_bh(&delay_dd.dd_lock);
 669
 670                delayed_msg_check(rule, false, &msgs);
 671                delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
 672        }
 673
 674        if (!list_empty(&msgs))
 675                delayed_msg_process(&msgs, false);
 676}
 677
 678/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	/* handshake: tell lnet_delay_rule_add() that we are up */
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		/* sleep until a rule is scheduled or we're asked to stop */
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	/* handshake: lnet_delay_rule_del() waits for dd_stopped */
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
 699
 700static void
 701delay_timer_cb(unsigned long arg)
 702{
 703        struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
 704
 705        spin_lock_bh(&delay_dd.dd_lock);
 706        if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
 707                atomic_inc(&rule->dl_refcount);
 708                list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
 709                wake_up(&delay_dd.dd_waitq);
 710        }
 711        spin_unlock_bh(&delay_dd.dd_lock);
 712}
 713
 714/**
 715 * Add a new delay rule to LNet
 716 * There is no check for duplicated delay rule, all rules will be checked for
 717 * incoming message.
 718 */
 719int
 720lnet_delay_rule_add(struct lnet_fault_attr *attr)
 721{
 722        struct lnet_delay_rule *rule;
 723        int rc = 0;
 724
 725        if (attr->u.delay.la_rate & attr->u.delay.la_interval) {
 726                CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
 727                       attr->u.delay.la_rate, attr->u.delay.la_interval);
 728                return -EINVAL;
 729        }
 730
 731        if (!attr->u.delay.la_latency) {
 732                CDEBUG(D_NET, "delay latency cannot be zero\n");
 733                return -EINVAL;
 734        }
 735
 736        if (lnet_fault_attr_validate(attr))
 737                return -EINVAL;
 738
 739        CFS_ALLOC_PTR(rule);
 740        if (!rule)
 741                return -ENOMEM;
 742
 743        mutex_lock(&delay_dd.dd_mutex);
 744        if (!delay_dd.dd_running) {
 745                struct task_struct *task;
 746
 747                /**
 748                 *  NB: although LND threads will process delayed message
 749                 * in lnet_finalize, but there is no guarantee that LND
 750                 * threads will be waken up if no other message needs to
 751                 * be handled.
 752                 * Only one daemon thread, performance is not the concern
 753                 * of this simualation module.
 754                 */
 755                task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
 756                if (IS_ERR(task)) {
 757                        rc = PTR_ERR(task);
 758                        goto failed;
 759                }
 760                wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
 761        }
 762
 763        init_timer(&rule->dl_timer);
 764        rule->dl_timer.function = delay_timer_cb;
 765        rule->dl_timer.data = (unsigned long)rule;
 766
 767        spin_lock_init(&rule->dl_lock);
 768        INIT_LIST_HEAD(&rule->dl_msg_list);
 769        INIT_LIST_HEAD(&rule->dl_sched_link);
 770
 771        rule->dl_attr = *attr;
 772        if (attr->u.delay.la_interval) {
 773                rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
 774                rule->dl_delay_time = cfs_time_shift(cfs_rand() %
 775                                                     attr->u.delay.la_interval);
 776        } else {
 777                rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
 778        }
 779
 780        rule->dl_msg_send = -1;
 781
 782        lnet_net_lock(LNET_LOCK_EX);
 783        atomic_set(&rule->dl_refcount, 1);
 784        list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
 785        lnet_net_unlock(LNET_LOCK_EX);
 786
 787        CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
 788               libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
 789               attr->u.delay.la_rate);
 790
 791        mutex_unlock(&delay_dd.dd_mutex);
 792        return 0;
 793failed:
 794        mutex_unlock(&delay_dd.dd_mutex);
 795        CFS_FREE_PTR(rule);
 796        return rc;
 797}
 798
 799/**
 800 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
 801 * and \a dst are zero, all rules will be removed, otherwise only matched rules
 802 * will be removed.
 803 * If \a src is zero, then all rules have \a dst as destination will be remove
 804 * If \a dst is zero, then all rules have \a src as source will be removed
 805 *
 806 * When a delay rule is removed, all delayed messages of this rule will be
 807 * processed immediately.
 808 */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	/* on shutdown remove every rule regardless of src/dst */
	if (shutdown) {
		src = 0;
		dst = 0;
	}

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		/* quiesce the timer before draining this rule's messages */
		del_timer_sync(&rule->dl_timer);
		/* all = true: flush every delayed message of this rule */
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rule, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		/* wait for the daemon's dd_stopped handshake */
		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	/* messages are cancelled on shutdown, delivered otherwise */
	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}
 875
 876/**
 877 * List Delay Rule at position of \a pos
 878 */
 879int
 880lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
 881                     struct lnet_fault_stat *stat)
 882{
 883        struct lnet_delay_rule *rule;
 884        int cpt;
 885        int i = 0;
 886        int rc = -ENOENT;
 887
 888        cpt = lnet_net_lock_current();
 889        list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
 890                if (i++ < pos)
 891                        continue;
 892
 893                spin_lock(&rule->dl_lock);
 894                *attr = rule->dl_attr;
 895                *stat = rule->dl_stat;
 896                spin_unlock(&rule->dl_lock);
 897                rc = 0;
 898                break;
 899        }
 900
 901        lnet_net_unlock(cpt);
 902        return rc;
 903}
 904
 905/**
 906 * reset counters for all Delay Rules
 907 */
 908void
 909lnet_delay_rule_reset(void)
 910{
 911        struct lnet_delay_rule *rule;
 912        int cpt;
 913
 914        cpt = lnet_net_lock_current();
 915
 916        list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
 917                struct lnet_fault_attr *attr = &rule->dl_attr;
 918
 919                spin_lock(&rule->dl_lock);
 920
 921                memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
 922                if (attr->u.delay.la_rate) {
 923                        rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
 924                } else {
 925                        rule->dl_delay_time = cfs_time_shift(cfs_rand() %
 926                                                attr->u.delay.la_interval);
 927                        rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
 928                }
 929                spin_unlock(&rule->dl_lock);
 930        }
 931
 932        lnet_net_unlock(cpt);
 933}
 934
 935int
 936lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
 937{
 938        struct lnet_fault_attr *attr;
 939        struct lnet_fault_stat *stat;
 940
 941        attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
 942
 943        switch (opc) {
 944        default:
 945                return -EINVAL;
 946
 947        case LNET_CTL_DROP_ADD:
 948                if (!attr)
 949                        return -EINVAL;
 950
 951                return lnet_drop_rule_add(attr);
 952
 953        case LNET_CTL_DROP_DEL:
 954                if (!attr)
 955                        return -EINVAL;
 956
 957                data->ioc_count = lnet_drop_rule_del(attr->fa_src,
 958                                                     attr->fa_dst);
 959                return 0;
 960
 961        case LNET_CTL_DROP_RESET:
 962                lnet_drop_rule_reset();
 963                return 0;
 964
 965        case LNET_CTL_DROP_LIST:
 966                stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
 967                if (!attr || !stat)
 968                        return -EINVAL;
 969
 970                return lnet_drop_rule_list(data->ioc_count, attr, stat);
 971
 972        case LNET_CTL_DELAY_ADD:
 973                if (!attr)
 974                        return -EINVAL;
 975
 976                return lnet_delay_rule_add(attr);
 977
 978        case LNET_CTL_DELAY_DEL:
 979                if (!attr)
 980                        return -EINVAL;
 981
 982                data->ioc_count = lnet_delay_rule_del(attr->fa_src,
 983                                                      attr->fa_dst, false);
 984                return 0;
 985
 986        case LNET_CTL_DELAY_RESET:
 987                lnet_delay_rule_reset();
 988                return 0;
 989
 990        case LNET_CTL_DELAY_LIST:
 991                stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
 992                if (!attr || !stat)
 993                        return -EINVAL;
 994
 995                return lnet_delay_rule_list(data->ioc_count, attr, stat);
 996        }
 997}
 998
 999int
1000lnet_fault_init(void)
1001{
1002        CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1003        CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1004        CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1005        CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1006
1007        mutex_init(&delay_dd.dd_mutex);
1008        spin_lock_init(&delay_dd.dd_lock);
1009        init_waitqueue_head(&delay_dd.dd_waitq);
1010        init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1011        INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1012
1013        return 0;
1014}
1015
void
lnet_fault_fini(void)
{
        /* wildcard (src = 0, dst = 0) removes every rule; shutdown = true
         * makes lnet_delay_rule_del ignore its filters and is forwarded
         * to delayed_msg_process for the queued messages
         */
        lnet_drop_rule_del(0, 0);
        lnet_delay_rule_del(0, 0, true);

        /* all rule lists must be empty once the wildcards ran */
        LASSERT(list_empty(&the_lnet.ln_drop_rules));
        LASSERT(list_empty(&the_lnet.ln_delay_rules));
        LASSERT(list_empty(&delay_dd.dd_sched_rules));
}
1026