linux/drivers/staging/lustre/lnet/lnet/router.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
   3 *
   4 * Copyright (c) 2011, 2012, Intel Corporation.
   5 *
   6 *   This file is part of Portals
   7 *   http://sourceforge.net/projects/sandiaportals/
   8 *
   9 *   Portals is free software; you can redistribute it and/or
  10 *   modify it under the terms of version 2 of the GNU General Public
  11 *   License as published by the Free Software Foundation.
  12 *
  13 *   Portals is distributed in the hope that it will be useful,
  14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16 *   GNU General Public License for more details.
  17 *
  18 *   You should have received a copy of the GNU General Public License
  19 *   along with Portals; if not, write to the Free Software
  20 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  21 *
  22 */
  23
  24#define DEBUG_SUBSYSTEM S_LNET
  25#include <linux/lnet/lib-lnet.h>
  26
  27#if  defined(LNET_ROUTER)
  28
  29#define LNET_NRB_TINY_MIN       512     /* min value for each CPT */
  30#define LNET_NRB_TINY           (LNET_NRB_TINY_MIN * 4)
  31#define LNET_NRB_SMALL_MIN      4096    /* min value for each CPT */
  32#define LNET_NRB_SMALL          (LNET_NRB_SMALL_MIN * 4)
  33#define LNET_NRB_LARGE_MIN      256     /* min value for each CPT */
  34#define LNET_NRB_LARGE          (LNET_NRB_LARGE_MIN * 4)
  35
  36static char *forwarding = "";
  37module_param(forwarding, charp, 0444);
  38MODULE_PARM_DESC(forwarding, "Explicitly enable/disable forwarding between networks");
  39
  40static int tiny_router_buffers;
  41module_param(tiny_router_buffers, int, 0444);
  42MODULE_PARM_DESC(tiny_router_buffers, "# of 0 payload messages to buffer in the router");
  43static int small_router_buffers;
  44module_param(small_router_buffers, int, 0444);
  45MODULE_PARM_DESC(small_router_buffers, "# of small (1 page) messages to buffer in the router");
  46static int large_router_buffers;
  47module_param(large_router_buffers, int, 0444);
  48MODULE_PARM_DESC(large_router_buffers, "# of large messages to buffer in the router");
  49static int peer_buffer_credits = 0;
  50module_param(peer_buffer_credits, int, 0444);
  51MODULE_PARM_DESC(peer_buffer_credits, "# router buffer credits per peer");
  52
  53static int auto_down = 1;
  54module_param(auto_down, int, 0444);
  55MODULE_PARM_DESC(auto_down, "Automatically mark peers down on comms error");
  56
  57int
  58lnet_peer_buffer_credits(lnet_ni_t *ni)
  59{
  60        /* NI option overrides LNet default */
  61        if (ni->ni_peerrtrcredits > 0)
  62                return ni->ni_peerrtrcredits;
  63        if (peer_buffer_credits > 0)
  64                return peer_buffer_credits;
  65
  66        /* As an approximation, allow this peer the same number of router
  67         * buffers as it is allowed outstanding sends */
  68        return ni->ni_peertxcredits;
  69}
  70
  71/* forward ref's */
  72static int lnet_router_checker(void *);
  73#else
  74
  75int
  76lnet_peer_buffer_credits(lnet_ni_t *ni)
  77{
  78        return 0;
  79}
  80
  81#endif
  82
  83static int check_routers_before_use = 0;
  84module_param(check_routers_before_use, int, 0444);
  85MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");
  86
  87static int avoid_asym_router_failure = 1;
  88module_param(avoid_asym_router_failure, int, 0644);
  89MODULE_PARM_DESC(avoid_asym_router_failure, "Avoid asymmetrical router failures (0 to disable)");
  90
  91static int dead_router_check_interval = 60;
  92module_param(dead_router_check_interval, int, 0644);
  93MODULE_PARM_DESC(dead_router_check_interval, "Seconds between dead router health checks (<= 0 to disable)");
  94
  95static int live_router_check_interval = 60;
  96module_param(live_router_check_interval, int, 0644);
  97MODULE_PARM_DESC(live_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");
  98
  99static int router_ping_timeout = 50;
 100module_param(router_ping_timeout, int, 0644);
 101MODULE_PARM_DESC(router_ping_timeout, "Seconds to wait for the reply to a router health query");
 102
 103int
 104lnet_peers_start_down(void)
 105{
 106        return check_routers_before_use;
 107}
 108
 109void
 110lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive, cfs_time_t when)
 111{
 112        if (cfs_time_before(when, lp->lp_timestamp)) { /* out of date information */
 113                CDEBUG(D_NET, "Out of date\n");
 114                return;
 115        }
 116
 117        lp->lp_timestamp = when;                /* update timestamp */
 118        lp->lp_ping_deadline = 0;              /* disable ping timeout */
 119
 120        if (lp->lp_alive_count != 0 &&    /* got old news */
 121            (!lp->lp_alive) == (!alive)) {      /* new date for old news */
 122                CDEBUG(D_NET, "Old news\n");
 123                return;
 124        }
 125
 126        /* Flag that notification is outstanding */
 127
 128        lp->lp_alive_count++;
 129        lp->lp_alive = !(!alive);              /* 1 bit! */
 130        lp->lp_notify = 1;
 131        lp->lp_notifylnd |= notifylnd;
 132        if (lp->lp_alive)
 133                lp->lp_ping_feats = LNET_PING_FEAT_INVAL; /* reset */
 134
 135        CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
 136}
 137
 138void
 139lnet_ni_notify_locked(lnet_ni_t *ni, lnet_peer_t *lp)
 140{
 141        int     alive;
 142        int     notifylnd;
 143
 144        /* Notify only in 1 thread at any time to ensure ordered notification.
 145         * NB individual events can be missed; the only guarantee is that you
 146         * always get the most recent news */
 147
 148        if (lp->lp_notifying)
 149                return;
 150
 151        lp->lp_notifying = 1;
 152
 153        while (lp->lp_notify) {
 154                alive     = lp->lp_alive;
 155                notifylnd = lp->lp_notifylnd;
 156
 157                lp->lp_notifylnd = 0;
 158                lp->lp_notify    = 0;
 159
 160                if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
 161                        lnet_net_unlock(lp->lp_cpt);
 162
 163                        /* A new notification could happen now; I'll handle it
 164                         * when control returns to me */
 165
 166                        (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
 167
 168                        lnet_net_lock(lp->lp_cpt);
 169                }
 170        }
 171
 172        lp->lp_notifying = 0;
 173}
 174
 175
 176static void
 177lnet_rtr_addref_locked(lnet_peer_t *lp)
 178{
 179        LASSERT(lp->lp_refcount > 0);
 180        LASSERT(lp->lp_rtr_refcount >= 0);
 181
 182        /* lnet_net_lock must be exclusively locked */
 183        lp->lp_rtr_refcount++;
 184        if (lp->lp_rtr_refcount == 1) {
 185                struct list_head *pos;
 186
 187                /* a simple insertion sort */
 188                list_for_each_prev(pos, &the_lnet.ln_routers) {
 189                        lnet_peer_t *rtr = list_entry(pos, lnet_peer_t,
 190                                                          lp_rtr_list);
 191
 192                        if (rtr->lp_nid < lp->lp_nid)
 193                                break;
 194                }
 195
 196                list_add(&lp->lp_rtr_list, pos);
 197                /* addref for the_lnet.ln_routers */
 198                lnet_peer_addref_locked(lp);
 199                the_lnet.ln_routers_version++;
 200        }
 201}
 202
 203static void
 204lnet_rtr_decref_locked(lnet_peer_t *lp)
 205{
 206        LASSERT(lp->lp_refcount > 0);
 207        LASSERT(lp->lp_rtr_refcount > 0);
 208
 209        /* lnet_net_lock must be exclusively locked */
 210        lp->lp_rtr_refcount--;
 211        if (lp->lp_rtr_refcount == 0) {
 212                LASSERT(list_empty(&lp->lp_routes));
 213
 214                if (lp->lp_rcd != NULL) {
 215                        list_add(&lp->lp_rcd->rcd_list,
 216                                     &the_lnet.ln_rcd_deathrow);
 217                        lp->lp_rcd = NULL;
 218                }
 219
 220                list_del(&lp->lp_rtr_list);
 221                /* decref for the_lnet.ln_routers */
 222                lnet_peer_decref_locked(lp);
 223                the_lnet.ln_routers_version++;
 224        }
 225}
 226
 227lnet_remotenet_t *
 228lnet_find_net_locked (__u32 net)
 229{
 230        lnet_remotenet_t        *rnet;
 231        struct list_head                *tmp;
 232        struct list_head                *rn_list;
 233
 234        LASSERT(!the_lnet.ln_shutdown);
 235
 236        rn_list = lnet_net2rnethash(net);
 237        list_for_each(tmp, rn_list) {
 238                rnet = list_entry(tmp, lnet_remotenet_t, lrn_list);
 239
 240                if (rnet->lrn_net == net)
 241                        return rnet;
 242        }
 243        return NULL;
 244}
 245
 246static void lnet_shuffle_seed(void)
 247{
 248        static int seeded = 0;
 249        int lnd_type, seed[2];
 250        struct timeval tv;
 251        lnet_ni_t *ni;
 252        struct list_head *tmp;
 253
 254        if (seeded)
 255                return;
 256
 257        cfs_get_random_bytes(seed, sizeof(seed));
 258
 259        /* Nodes with small feet have little entropy
 260         * the NID for this node gives the most entropy in the low bits */
 261        list_for_each(tmp, &the_lnet.ln_nis) {
 262                ni = list_entry(tmp, lnet_ni_t, ni_list);
 263                lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));
 264
 265                if (lnd_type != LOLND)
 266                        seed[0] ^= (LNET_NIDADDR(ni->ni_nid) | lnd_type);
 267        }
 268
 269        do_gettimeofday(&tv);
 270        cfs_srand(tv.tv_sec ^ seed[0], tv.tv_usec ^ seed[1]);
 271        seeded = 1;
 272        return;
 273}
 274
 275/* NB expects LNET_LOCK held */
 276void
 277lnet_add_route_to_rnet (lnet_remotenet_t *rnet, lnet_route_t *route)
 278{
 279        unsigned int      len = 0;
 280        unsigned int      offset = 0;
 281        struct list_head       *e;
 282
 283        lnet_shuffle_seed();
 284
 285        list_for_each (e, &rnet->lrn_routes) {
 286                len++;
 287        }
 288
 289        /* len+1 positions to add a new entry, also prevents division by 0 */
 290        offset = cfs_rand() % (len + 1);
 291        list_for_each (e, &rnet->lrn_routes) {
 292                if (offset == 0)
 293                        break;
 294                offset--;
 295        }
 296        list_add(&route->lr_list, e);
 297        list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);
 298
 299        the_lnet.ln_remote_nets_version++;
 300        lnet_rtr_addref_locked(route->lr_gateway);
 301}
 302
 303int
 304lnet_add_route(__u32 net, unsigned int hops, lnet_nid_t gateway,
 305               unsigned int priority)
 306{
 307        struct list_head          *e;
 308        lnet_remotenet_t    *rnet;
 309        lnet_remotenet_t    *rnet2;
 310        lnet_route_t    *route;
 311        lnet_ni_t          *ni;
 312        int               add_route;
 313        int               rc;
 314
 315        CDEBUG(D_NET, "Add route: net %s hops %u priority %u gw %s\n",
 316               libcfs_net2str(net), hops, priority, libcfs_nid2str(gateway));
 317
 318        if (gateway == LNET_NID_ANY ||
 319            LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
 320            net == LNET_NIDNET(LNET_NID_ANY) ||
 321            LNET_NETTYP(net) == LOLND ||
 322            LNET_NIDNET(gateway) == net ||
 323            hops < 1 || hops > 255)
 324                return (-EINVAL);
 325
 326        if (lnet_islocalnet(net))              /* it's a local network */
 327                return 0;                      /* ignore the route entry */
 328
 329        /* Assume net, route, all new */
 330        LIBCFS_ALLOC(route, sizeof(*route));
 331        LIBCFS_ALLOC(rnet, sizeof(*rnet));
 332        if (route == NULL || rnet == NULL) {
 333                CERROR("Out of memory creating route %s %d %s\n",
 334                       libcfs_net2str(net), hops, libcfs_nid2str(gateway));
 335                if (route != NULL)
 336                        LIBCFS_FREE(route, sizeof(*route));
 337                if (rnet != NULL)
 338                        LIBCFS_FREE(rnet, sizeof(*rnet));
 339                return -ENOMEM;
 340        }
 341
 342        INIT_LIST_HEAD(&rnet->lrn_routes);
 343        rnet->lrn_net = net;
 344        route->lr_hops = hops;
 345        route->lr_net = net;
 346        route->lr_priority = priority;
 347
 348        lnet_net_lock(LNET_LOCK_EX);
 349
 350        rc = lnet_nid2peer_locked(&route->lr_gateway, gateway, LNET_LOCK_EX);
 351        if (rc != 0) {
 352                lnet_net_unlock(LNET_LOCK_EX);
 353
 354                LIBCFS_FREE(route, sizeof(*route));
 355                LIBCFS_FREE(rnet, sizeof(*rnet));
 356
 357                if (rc == -EHOSTUNREACH) { /* gateway is not on a local net */
 358                        return 0;       /* ignore the route entry */
 359                } else {
 360                        CERROR("Error %d creating route %s %d %s\n", rc,
 361                               libcfs_net2str(net), hops,
 362                               libcfs_nid2str(gateway));
 363                }
 364                return rc;
 365        }
 366
 367        LASSERT (!the_lnet.ln_shutdown);
 368
 369        rnet2 = lnet_find_net_locked(net);
 370        if (rnet2 == NULL) {
 371                /* new network */
 372                list_add_tail(&rnet->lrn_list, lnet_net2rnethash(net));
 373                rnet2 = rnet;
 374        }
 375
 376        /* Search for a duplicate route (it's a NOOP if it is) */
 377        add_route = 1;
 378        list_for_each (e, &rnet2->lrn_routes) {
 379                lnet_route_t *route2 = list_entry(e, lnet_route_t, lr_list);
 380
 381                if (route2->lr_gateway == route->lr_gateway) {
 382                        add_route = 0;
 383                        break;
 384                }
 385
 386                /* our lookups must be true */
 387                LASSERT (route2->lr_gateway->lp_nid != gateway);
 388        }
 389
 390        if (add_route) {
 391                lnet_peer_addref_locked(route->lr_gateway); /* +1 for notify */
 392                lnet_add_route_to_rnet(rnet2, route);
 393
 394                ni = route->lr_gateway->lp_ni;
 395                lnet_net_unlock(LNET_LOCK_EX);
 396
 397                /* XXX Assume alive */
 398                if (ni->ni_lnd->lnd_notify != NULL)
 399                        (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
 400
 401                lnet_net_lock(LNET_LOCK_EX);
 402        }
 403
 404        /* -1 for notify or !add_route */
 405        lnet_peer_decref_locked(route->lr_gateway);
 406        lnet_net_unlock(LNET_LOCK_EX);
 407
 408        if (!add_route)
 409                LIBCFS_FREE(route, sizeof(*route));
 410
 411        if (rnet != rnet2)
 412                LIBCFS_FREE(rnet, sizeof(*rnet));
 413
 414        return 0;
 415}
 416
 417int
 418lnet_check_routes(void)
 419{
 420        lnet_remotenet_t        *rnet;
 421        lnet_route_t            *route;
 422        lnet_route_t            *route2;
 423        struct list_head                *e1;
 424        struct list_head                *e2;
 425        int                     cpt;
 426        struct list_head                *rn_list;
 427        int                     i;
 428
 429        cpt = lnet_net_lock_current();
 430
 431        for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
 432                rn_list = &the_lnet.ln_remote_nets_hash[i];
 433                list_for_each(e1, rn_list) {
 434                        rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
 435
 436                        route2 = NULL;
 437                        list_for_each(e2, &rnet->lrn_routes) {
 438                                lnet_nid_t      nid1;
 439                                lnet_nid_t      nid2;
 440                                int             net;
 441
 442                                route = list_entry(e2, lnet_route_t,
 443                                                       lr_list);
 444
 445                                if (route2 == NULL) {
 446                                        route2 = route;
 447                                        continue;
 448                                }
 449
 450                                if (route->lr_gateway->lp_ni ==
 451                                    route2->lr_gateway->lp_ni)
 452                                        continue;
 453
 454                                nid1 = route->lr_gateway->lp_nid;
 455                                nid2 = route2->lr_gateway->lp_nid;
 456                                net = rnet->lrn_net;
 457
 458                                lnet_net_unlock(cpt);
 459
 460                                CERROR("Routes to %s via %s and %s not "
 461                                       "supported\n",
 462                                       libcfs_net2str(net),
 463                                       libcfs_nid2str(nid1),
 464                                       libcfs_nid2str(nid2));
 465                                return -EINVAL;
 466                        }
 467                }
 468        }
 469
 470        lnet_net_unlock(cpt);
 471        return 0;
 472}
 473
 474int
 475lnet_del_route(__u32 net, lnet_nid_t gw_nid)
 476{
 477        struct lnet_peer        *gateway;
 478        lnet_remotenet_t        *rnet;
 479        lnet_route_t            *route;
 480        struct list_head                *e1;
 481        struct list_head                *e2;
 482        int                     rc = -ENOENT;
 483        struct list_head                *rn_list;
 484        int                     idx = 0;
 485
 486        CDEBUG(D_NET, "Del route: net %s : gw %s\n",
 487               libcfs_net2str(net), libcfs_nid2str(gw_nid));
 488
 489        /* NB Caller may specify either all routes via the given gateway
 490         * or a specific route entry actual NIDs) */
 491
 492        lnet_net_lock(LNET_LOCK_EX);
 493        if (net == LNET_NIDNET(LNET_NID_ANY))
 494                rn_list = &the_lnet.ln_remote_nets_hash[0];
 495        else
 496                rn_list = lnet_net2rnethash(net);
 497
 498 again:
 499        list_for_each(e1, rn_list) {
 500                rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
 501
 502                if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
 503                        net == rnet->lrn_net))
 504                        continue;
 505
 506                list_for_each(e2, &rnet->lrn_routes) {
 507                        route = list_entry(e2, lnet_route_t, lr_list);
 508
 509                        gateway = route->lr_gateway;
 510                        if (!(gw_nid == LNET_NID_ANY ||
 511                              gw_nid == gateway->lp_nid))
 512                                continue;
 513
 514                        list_del(&route->lr_list);
 515                        list_del(&route->lr_gwlist);
 516                        the_lnet.ln_remote_nets_version++;
 517
 518                        if (list_empty(&rnet->lrn_routes))
 519                                list_del(&rnet->lrn_list);
 520                        else
 521                                rnet = NULL;
 522
 523                        lnet_rtr_decref_locked(gateway);
 524                        lnet_peer_decref_locked(gateway);
 525
 526                        lnet_net_unlock(LNET_LOCK_EX);
 527
 528                        LIBCFS_FREE(route, sizeof(*route));
 529
 530                        if (rnet != NULL)
 531                                LIBCFS_FREE(rnet, sizeof(*rnet));
 532
 533                        rc = 0;
 534                        lnet_net_lock(LNET_LOCK_EX);
 535                        goto again;
 536                }
 537        }
 538
 539        if (net == LNET_NIDNET(LNET_NID_ANY) &&
 540            ++idx < LNET_REMOTE_NETS_HASH_SIZE) {
 541                rn_list = &the_lnet.ln_remote_nets_hash[idx];
 542                goto again;
 543        }
 544        lnet_net_unlock(LNET_LOCK_EX);
 545
 546        return rc;
 547}
 548
 549void
 550lnet_destroy_routes (void)
 551{
 552        lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
 553}
 554
 555int
 556lnet_get_route(int idx, __u32 *net, __u32 *hops,
 557               lnet_nid_t *gateway, __u32 *alive, __u32 *priority)
 558{
 559        struct list_head                *e1;
 560        struct list_head                *e2;
 561        lnet_remotenet_t        *rnet;
 562        lnet_route_t            *route;
 563        int                     cpt;
 564        int                     i;
 565        struct list_head                *rn_list;
 566
 567        cpt = lnet_net_lock_current();
 568
 569        for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
 570                rn_list = &the_lnet.ln_remote_nets_hash[i];
 571                list_for_each(e1, rn_list) {
 572                        rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
 573
 574                        list_for_each(e2, &rnet->lrn_routes) {
 575                                route = list_entry(e2, lnet_route_t,
 576                                                       lr_list);
 577
 578                                if (idx-- == 0) {
 579                                        *net      = rnet->lrn_net;
 580                                        *hops     = route->lr_hops;
 581                                        *priority = route->lr_priority;
 582                                        *gateway  = route->lr_gateway->lp_nid;
 583                                        *alive    = route->lr_gateway->lp_alive;
 584                                        lnet_net_unlock(cpt);
 585                                        return 0;
 586                                }
 587                        }
 588                }
 589        }
 590
 591        lnet_net_unlock(cpt);
 592        return -ENOENT;
 593}
 594
 595void
 596lnet_swap_pinginfo(lnet_ping_info_t *info)
 597{
 598        int            i;
 599        lnet_ni_status_t *stat;
 600
 601        __swab32s(&info->pi_magic);
 602        __swab32s(&info->pi_features);
 603        __swab32s(&info->pi_pid);
 604        __swab32s(&info->pi_nnis);
 605        for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
 606                stat = &info->pi_ni[i];
 607                __swab64s(&stat->ns_nid);
 608                __swab32s(&stat->ns_status);
 609        }
 610        return;
 611}
 612
 613/**
 614 * parse router-checker pinginfo, record number of down NIs for remote
 615 * networks on that router.
 616 */
 617static void
 618lnet_parse_rc_info(lnet_rc_data_t *rcd)
 619{
 620        lnet_ping_info_t        *info = rcd->rcd_pinginfo;
 621        struct lnet_peer        *gw   = rcd->rcd_gateway;
 622        lnet_route_t            *rtr;
 623
 624        if (!gw->lp_alive)
 625                return;
 626
 627        if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
 628                lnet_swap_pinginfo(info);
 629
 630        /* NB always racing with network! */
 631        if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
 632                CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
 633                       libcfs_nid2str(gw->lp_nid), info->pi_magic);
 634                gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
 635                return;
 636        }
 637
 638        gw->lp_ping_feats = info->pi_features;
 639        if ((gw->lp_ping_feats & LNET_PING_FEAT_MASK) == 0) {
 640                CDEBUG(D_NET, "%s: Unexpected features 0x%x\n",
 641                       libcfs_nid2str(gw->lp_nid), gw->lp_ping_feats);
 642                return; /* nothing I can understand */
 643        }
 644
 645        if ((gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0)
 646                return; /* can't carry NI status info */
 647
 648        list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
 649                int     ptl_status = LNET_NI_STATUS_INVALID;
 650                int     down = 0;
 651                int     up = 0;
 652                int     i;
 653
 654                for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
 655                        lnet_ni_status_t *stat = &info->pi_ni[i];
 656                        lnet_nid_t       nid = stat->ns_nid;
 657
 658                        if (nid == LNET_NID_ANY) {
 659                                CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
 660                                       libcfs_nid2str(gw->lp_nid));
 661                                gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
 662                                return;
 663                        }
 664
 665                        if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
 666                                continue;
 667
 668                        if (stat->ns_status == LNET_NI_STATUS_DOWN) {
 669                                if (LNET_NETTYP(LNET_NIDNET(nid)) != PTLLND)
 670                                        down++;
 671                                else if (ptl_status != LNET_NI_STATUS_UP)
 672                                        ptl_status = LNET_NI_STATUS_DOWN;
 673                                continue;
 674                        }
 675
 676                        if (stat->ns_status == LNET_NI_STATUS_UP) {
 677                                if (LNET_NIDNET(nid) == rtr->lr_net) {
 678                                        up = 1;
 679                                        break;
 680                                }
 681                                /* ptl NIs are considered down only when
 682                                 * they're all down */
 683                                if (LNET_NETTYP(LNET_NIDNET(nid)) == PTLLND)
 684                                        ptl_status = LNET_NI_STATUS_UP;
 685                                continue;
 686                        }
 687
 688                        CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
 689                               libcfs_nid2str(gw->lp_nid), stat->ns_status);
 690                        gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
 691                        return;
 692                }
 693
 694                if (up) { /* ignore downed NIs if NI for dest network is up */
 695                        rtr->lr_downis = 0;
 696                        continue;
 697                }
 698                rtr->lr_downis = down + (ptl_status == LNET_NI_STATUS_DOWN);
 699        }
 700}
 701
 702static void
 703lnet_router_checker_event(lnet_event_t *event)
 704{
 705        lnet_rc_data_t          *rcd = event->md.user_ptr;
 706        struct lnet_peer        *lp;
 707
 708        LASSERT(rcd != NULL);
 709
 710        if (event->unlinked) {
 711                LNetInvalidateHandle(&rcd->rcd_mdh);
 712                return;
 713        }
 714
 715        LASSERT(event->type == LNET_EVENT_SEND ||
 716                event->type == LNET_EVENT_REPLY);
 717
 718        lp = rcd->rcd_gateway;
 719        LASSERT(lp != NULL);
 720
 721         /* NB: it's called with holding lnet_res_lock, we have a few
 722          * places need to hold both locks at the same time, please take
 723          * care of lock ordering */
 724        lnet_net_lock(lp->lp_cpt);
 725        if (!lnet_isrouter(lp) || lp->lp_rcd != rcd) {
 726                /* ignore if no longer a router or rcd is replaced */
 727                goto out;
 728        }
 729
 730        if (event->type == LNET_EVENT_SEND) {
 731                lp->lp_ping_notsent = 0;
 732                if (event->status == 0)
 733                        goto out;
 734        }
 735
 736        /* LNET_EVENT_REPLY */
 737        /* A successful REPLY means the router is up.  If _any_ comms
 738         * to the router fail I assume it's down (this will happen if
 739         * we ping alive routers to try to detect router death before
 740         * apps get burned). */
 741
 742        lnet_notify_locked(lp, 1, (event->status == 0), cfs_time_current());
 743        /* The router checker will wake up very shortly and do the
 744         * actual notification.
 745         * XXX If 'lp' stops being a router before then, it will still
 746         * have the notification pending!!! */
 747
 748        if (avoid_asym_router_failure && event->status == 0)
 749                lnet_parse_rc_info(rcd);
 750
 751 out:
 752        lnet_net_unlock(lp->lp_cpt);
 753}
 754
 755void
 756lnet_wait_known_routerstate(void)
 757{
 758        lnet_peer_t      *rtr;
 759        struct list_head          *entry;
 760        int               all_known;
 761
 762        LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
 763
 764        for (;;) {
 765                int     cpt = lnet_net_lock_current();
 766
 767                all_known = 1;
 768                list_for_each (entry, &the_lnet.ln_routers) {
 769                        rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
 770
 771                        if (rtr->lp_alive_count == 0) {
 772                                all_known = 0;
 773                                break;
 774                        }
 775                }
 776
 777                lnet_net_unlock(cpt);
 778
 779                if (all_known)
 780                        return;
 781
 782                cfs_pause(cfs_time_seconds(1));
 783        }
 784}
 785
 786void
 787lnet_update_ni_status_locked(void)
 788{
 789        lnet_ni_t       *ni;
 790        long            now;
 791        int             timeout;
 792
 793        LASSERT(the_lnet.ln_routing);
 794
 795        timeout = router_ping_timeout +
 796                  MAX(live_router_check_interval, dead_router_check_interval);
 797
 798        now = cfs_time_current_sec();
 799        list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
 800                if (ni->ni_lnd->lnd_type == LOLND)
 801                        continue;
 802
 803                if (now < ni->ni_last_alive + timeout)
 804                        continue;
 805
 806                lnet_ni_lock(ni);
 807                /* re-check with lock */
 808                if (now < ni->ni_last_alive + timeout) {
 809                        lnet_ni_unlock(ni);
 810                        continue;
 811                }
 812
 813                LASSERT(ni->ni_status != NULL);
 814
 815                if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
 816                        CDEBUG(D_NET, "NI(%s:%d) status changed to down\n",
 817                               libcfs_nid2str(ni->ni_nid), timeout);
 818                        /* NB: so far, this is the only place to set
 819                         * NI status to "down" */
 820                        ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
 821                }
 822                lnet_ni_unlock(ni);
 823        }
 824}
 825
 826void
 827lnet_destroy_rc_data(lnet_rc_data_t *rcd)
 828{
 829        LASSERT(list_empty(&rcd->rcd_list));
 830        /* detached from network */
 831        LASSERT(LNetHandleIsInvalid(rcd->rcd_mdh));
 832
 833        if (rcd->rcd_gateway != NULL) {
 834                int cpt = rcd->rcd_gateway->lp_cpt;
 835
 836                lnet_net_lock(cpt);
 837                lnet_peer_decref_locked(rcd->rcd_gateway);
 838                lnet_net_unlock(cpt);
 839        }
 840
 841        if (rcd->rcd_pinginfo != NULL)
 842                LIBCFS_FREE(rcd->rcd_pinginfo, LNET_PINGINFO_SIZE);
 843
 844        LIBCFS_FREE(rcd, sizeof(*rcd));
 845}
 846
 847lnet_rc_data_t *
 848lnet_create_rc_data_locked(lnet_peer_t *gateway)
 849{
 850        lnet_rc_data_t          *rcd = NULL;
 851        lnet_ping_info_t        *pi;
 852        int                     rc;
 853        int                     i;
 854
 855        lnet_net_unlock(gateway->lp_cpt);
 856
 857        LIBCFS_ALLOC(rcd, sizeof(*rcd));
 858        if (rcd == NULL)
 859                goto out;
 860
 861        LNetInvalidateHandle(&rcd->rcd_mdh);
 862        INIT_LIST_HEAD(&rcd->rcd_list);
 863
 864        LIBCFS_ALLOC(pi, LNET_PINGINFO_SIZE);
 865        if (pi == NULL)
 866                goto out;
 867
 868        memset(pi, 0, LNET_PINGINFO_SIZE);
 869        for (i = 0; i < LNET_MAX_RTR_NIS; i++) {
 870                pi->pi_ni[i].ns_nid = LNET_NID_ANY;
 871                pi->pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
 872        }
 873        rcd->rcd_pinginfo = pi;
 874
 875        LASSERT (!LNetHandleIsInvalid(the_lnet.ln_rc_eqh));
 876        rc = LNetMDBind((lnet_md_t){.start     = pi,
 877                                    .user_ptr  = rcd,
 878                                    .length    = LNET_PINGINFO_SIZE,
 879                                    .threshold = LNET_MD_THRESH_INF,
 880                                    .options   = LNET_MD_TRUNCATE,
 881                                    .eq_handle = the_lnet.ln_rc_eqh},
 882                        LNET_UNLINK,
 883                        &rcd->rcd_mdh);
 884        if (rc < 0) {
 885                CERROR("Can't bind MD: %d\n", rc);
 886                goto out;
 887        }
 888        LASSERT(rc == 0);
 889
 890        lnet_net_lock(gateway->lp_cpt);
 891        /* router table changed or someone has created rcd for this gateway */
 892        if (!lnet_isrouter(gateway) || gateway->lp_rcd != NULL) {
 893                lnet_net_unlock(gateway->lp_cpt);
 894                goto out;
 895        }
 896
 897        lnet_peer_addref_locked(gateway);
 898        rcd->rcd_gateway = gateway;
 899        gateway->lp_rcd = rcd;
 900        gateway->lp_ping_notsent = 0;
 901
 902        return rcd;
 903
 904 out:
 905        if (rcd != NULL) {
 906                if (!LNetHandleIsInvalid(rcd->rcd_mdh)) {
 907                        rc = LNetMDUnlink(rcd->rcd_mdh);
 908                        LASSERT(rc == 0);
 909                }
 910                lnet_destroy_rc_data(rcd);
 911        }
 912
 913        lnet_net_lock(gateway->lp_cpt);
 914        return gateway->lp_rcd;
 915}
 916
 917static int
 918lnet_router_check_interval (lnet_peer_t *rtr)
 919{
 920        int secs;
 921
 922        secs = rtr->lp_alive ? live_router_check_interval :
 923                               dead_router_check_interval;
 924        if (secs < 0)
 925                secs = 0;
 926
 927        return secs;
 928}
 929
 930static void
 931lnet_ping_router_locked (lnet_peer_t *rtr)
 932{
 933        lnet_rc_data_t *rcd = NULL;
 934        cfs_time_t      now = cfs_time_current();
 935        int          secs;
 936
 937        lnet_peer_addref_locked(rtr);
 938
 939        if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
 940            cfs_time_after(now, rtr->lp_ping_deadline))
 941                lnet_notify_locked(rtr, 1, 0, now);
 942
 943        /* Run any outstanding notifications */
 944        lnet_ni_notify_locked(rtr->lp_ni, rtr);
 945
 946        if (!lnet_isrouter(rtr) ||
 947            the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
 948                /* router table changed or router checker is shutting down */
 949                lnet_peer_decref_locked(rtr);
 950                return;
 951        }
 952
 953        rcd = rtr->lp_rcd != NULL ?
 954              rtr->lp_rcd : lnet_create_rc_data_locked(rtr);
 955
 956        if (rcd == NULL)
 957                return;
 958
 959        secs = lnet_router_check_interval(rtr);
 960
 961        CDEBUG(D_NET,
 962               "rtr %s %d: deadline %lu ping_notsent %d alive %d "
 963               "alive_count %d lp_ping_timestamp %lu\n",
 964               libcfs_nid2str(rtr->lp_nid), secs,
 965               rtr->lp_ping_deadline, rtr->lp_ping_notsent,
 966               rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
 967
 968        if (secs != 0 && !rtr->lp_ping_notsent &&
 969            cfs_time_after(now, cfs_time_add(rtr->lp_ping_timestamp,
 970                                             cfs_time_seconds(secs)))) {
 971                int            rc;
 972                lnet_process_id_t id;
 973                lnet_handle_md_t  mdh;
 974
 975                id.nid = rtr->lp_nid;
 976                id.pid = LUSTRE_SRV_LNET_PID;
 977                CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
 978
 979                rtr->lp_ping_notsent   = 1;
 980                rtr->lp_ping_timestamp = now;
 981
 982                mdh = rcd->rcd_mdh;
 983
 984                if (rtr->lp_ping_deadline == 0) {
 985                        rtr->lp_ping_deadline =
 986                                cfs_time_shift(router_ping_timeout);
 987                }
 988
 989                lnet_net_unlock(rtr->lp_cpt);
 990
 991                rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
 992                             LNET_PROTO_PING_MATCHBITS, 0);
 993
 994                lnet_net_lock(rtr->lp_cpt);
 995                if (rc != 0)
 996                        rtr->lp_ping_notsent = 0; /* no event pending */
 997        }
 998
 999        lnet_peer_decref_locked(rtr);
1000        return;
1001}
1002
1003int
1004lnet_router_checker_start(void)
1005{
1006        int       rc;
1007        int       eqsz;
1008
1009        LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1010
1011        if (check_routers_before_use &&
1012            dead_router_check_interval <= 0) {
1013                LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be"
1014                                   " set if 'check_routers_before_use' is set"
1015                                   "\n");
1016                return -EINVAL;
1017        }
1018
1019        if (!the_lnet.ln_routing &&
1020            live_router_check_interval <= 0 &&
1021            dead_router_check_interval <= 0)
1022                return 0;
1023
1024        sema_init(&the_lnet.ln_rc_signal, 0);
1025        /* EQ size doesn't matter; the callback is guaranteed to get every
1026         * event */
1027        eqsz = 0;
1028        rc = LNetEQAlloc(eqsz, lnet_router_checker_event,
1029                         &the_lnet.ln_rc_eqh);
1030        if (rc != 0) {
1031                CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
1032                return -ENOMEM;
1033        }
1034
1035        the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
1036        rc = PTR_ERR(kthread_run(lnet_router_checker,
1037                                 NULL, "router_checker"));
1038        if (IS_ERR_VALUE(rc)) {
1039                CERROR("Can't start router checker thread: %d\n", rc);
1040                /* block until event callback signals exit */
1041                down(&the_lnet.ln_rc_signal);
1042                rc = LNetEQFree(the_lnet.ln_rc_eqh);
1043                LASSERT(rc == 0);
1044                the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1045                return -ENOMEM;
1046        }
1047
1048        if (check_routers_before_use) {
1049                /* Note that a helpful side-effect of pinging all known routers
1050                 * at startup is that it makes them drop stale connections they
1051                 * may have to a previous instance of me. */
1052                lnet_wait_known_routerstate();
1053        }
1054
1055        return 0;
1056}
1057
1058void
1059lnet_router_checker_stop (void)
1060{
1061        int rc;
1062
1063        if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
1064                return;
1065
1066        LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1067        the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
1068
1069        /* block until event callback signals exit */
1070        down(&the_lnet.ln_rc_signal);
1071        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1072
1073        rc = LNetEQFree(the_lnet.ln_rc_eqh);
1074        LASSERT (rc == 0);
1075        return;
1076}
1077
1078static void
1079lnet_prune_rc_data(int wait_unlink)
1080{
1081        lnet_rc_data_t          *rcd;
1082        lnet_rc_data_t          *tmp;
1083        lnet_peer_t             *lp;
1084        struct list_head                head;
1085        int                     i = 2;
1086
1087        if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
1088                   list_empty(&the_lnet.ln_rcd_deathrow) &&
1089                   list_empty(&the_lnet.ln_rcd_zombie)))
1090                return;
1091
1092        INIT_LIST_HEAD(&head);
1093
1094        lnet_net_lock(LNET_LOCK_EX);
1095
1096        if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
1097                /* router checker is stopping, prune all */
1098                list_for_each_entry(lp, &the_lnet.ln_routers,
1099                                        lp_rtr_list) {
1100                        if (lp->lp_rcd == NULL)
1101                                continue;
1102
1103                        LASSERT(list_empty(&lp->lp_rcd->rcd_list));
1104                        list_add(&lp->lp_rcd->rcd_list,
1105                                     &the_lnet.ln_rcd_deathrow);
1106                        lp->lp_rcd = NULL;
1107                }
1108        }
1109
1110        /* unlink all RCDs on deathrow list */
1111        list_splice_init(&the_lnet.ln_rcd_deathrow, &head);
1112
1113        if (!list_empty(&head)) {
1114                lnet_net_unlock(LNET_LOCK_EX);
1115
1116                list_for_each_entry(rcd, &head, rcd_list)
1117                        LNetMDUnlink(rcd->rcd_mdh);
1118
1119                lnet_net_lock(LNET_LOCK_EX);
1120        }
1121
1122        list_splice_init(&head, &the_lnet.ln_rcd_zombie);
1123
1124        /* release all zombie RCDs */
1125        while (!list_empty(&the_lnet.ln_rcd_zombie)) {
1126                list_for_each_entry_safe(rcd, tmp, &the_lnet.ln_rcd_zombie,
1127                                             rcd_list) {
1128                        if (LNetHandleIsInvalid(rcd->rcd_mdh))
1129                                list_move(&rcd->rcd_list, &head);
1130                }
1131
1132                wait_unlink = wait_unlink &&
1133                              !list_empty(&the_lnet.ln_rcd_zombie);
1134
1135                lnet_net_unlock(LNET_LOCK_EX);
1136
1137                while (!list_empty(&head)) {
1138                        rcd = list_entry(head.next,
1139                                             lnet_rc_data_t, rcd_list);
1140                        list_del_init(&rcd->rcd_list);
1141                        lnet_destroy_rc_data(rcd);
1142                }
1143
1144                if (!wait_unlink)
1145                        return;
1146
1147                i++;
1148                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
1149                       "Waiting for rc buffers to unlink\n");
1150                cfs_pause(cfs_time_seconds(1) / 4);
1151
1152                lnet_net_lock(LNET_LOCK_EX);
1153        }
1154
1155        lnet_net_unlock(LNET_LOCK_EX);
1156}
1157
1158
1159#if  defined(LNET_ROUTER)
1160
1161static int
1162lnet_router_checker(void *arg)
1163{
1164        lnet_peer_t       *rtr;
1165        struct list_head        *entry;
1166
1167        cfs_block_allsigs();
1168
1169        LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1170
1171        while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
1172                __u64   version;
1173                int     cpt;
1174                int     cpt2;
1175
1176                cpt = lnet_net_lock_current();
1177rescan:
1178                version = the_lnet.ln_routers_version;
1179
1180                list_for_each(entry, &the_lnet.ln_routers) {
1181                        rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
1182
1183                        cpt2 = lnet_cpt_of_nid_locked(rtr->lp_nid);
1184                        if (cpt != cpt2) {
1185                                lnet_net_unlock(cpt);
1186                                cpt = cpt2;
1187                                lnet_net_lock(cpt);
1188                                /* the routers list has changed */
1189                                if (version != the_lnet.ln_routers_version)
1190                                        goto rescan;
1191                        }
1192
1193                        lnet_ping_router_locked(rtr);
1194
1195                        /* NB dropped lock */
1196                        if (version != the_lnet.ln_routers_version) {
1197                                /* the routers list has changed */
1198                                goto rescan;
1199                        }
1200                }
1201
1202                if (the_lnet.ln_routing)
1203                        lnet_update_ni_status_locked();
1204
1205                lnet_net_unlock(cpt);
1206
1207                lnet_prune_rc_data(0); /* don't wait for UNLINK */
1208
1209                /* Call cfs_pause() here always adds 1 to load average
1210                 * because kernel counts # active tasks as nr_running
1211                 * + nr_uninterruptible. */
1212                schedule_timeout_and_set_state(TASK_INTERRUPTIBLE,
1213                                                   cfs_time_seconds(1));
1214        }
1215
1216        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING);
1217
1218        lnet_prune_rc_data(1); /* wait for UNLINK */
1219
1220        the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1221        up(&the_lnet.ln_rc_signal);
1222        /* The unlink event callback will signal final completion */
1223        return 0;
1224}
1225
1226void
1227lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
1228{
1229        int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1230
1231        while (--npages >= 0)
1232                __free_page(rb->rb_kiov[npages].kiov_page);
1233
1234        LIBCFS_FREE(rb, sz);
1235}
1236
1237lnet_rtrbuf_t *
1238lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
1239{
1240        int         npages = rbp->rbp_npages;
1241        int         sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1242        struct page   *page;
1243        lnet_rtrbuf_t *rb;
1244        int         i;
1245
1246        LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
1247        if (rb == NULL)
1248                return NULL;
1249
1250        rb->rb_pool = rbp;
1251
1252        for (i = 0; i < npages; i++) {
1253                page = alloc_pages_node(
1254                                cfs_cpt_spread_node(lnet_cpt_table(), cpt),
1255                                __GFP_ZERO | GFP_IOFS, 0);
1256                if (page == NULL) {
1257                        while (--i >= 0)
1258                                __free_page(rb->rb_kiov[i].kiov_page);
1259
1260                        LIBCFS_FREE(rb, sz);
1261                        return NULL;
1262                }
1263
1264                rb->rb_kiov[i].kiov_len = PAGE_CACHE_SIZE;
1265                rb->rb_kiov[i].kiov_offset = 0;
1266                rb->rb_kiov[i].kiov_page = page;
1267        }
1268
1269        return rb;
1270}
1271
1272void
1273lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
1274{
1275        int             npages = rbp->rbp_npages;
1276        int             nbuffers = 0;
1277        lnet_rtrbuf_t   *rb;
1278
1279        if (rbp->rbp_nbuffers == 0) /* not initialized or already freed */
1280                return;
1281
1282        LASSERT (list_empty(&rbp->rbp_msgs));
1283        LASSERT (rbp->rbp_credits == rbp->rbp_nbuffers);
1284
1285        while (!list_empty(&rbp->rbp_bufs)) {
1286                LASSERT (rbp->rbp_credits > 0);
1287
1288                rb = list_entry(rbp->rbp_bufs.next,
1289                                    lnet_rtrbuf_t, rb_list);
1290                list_del(&rb->rb_list);
1291                lnet_destroy_rtrbuf(rb, npages);
1292                nbuffers++;
1293        }
1294
1295        LASSERT (rbp->rbp_nbuffers == nbuffers);
1296        LASSERT (rbp->rbp_credits == nbuffers);
1297
1298        rbp->rbp_nbuffers = rbp->rbp_credits = 0;
1299}
1300
1301int
1302lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
1303{
1304        lnet_rtrbuf_t *rb;
1305        int         i;
1306
1307        if (rbp->rbp_nbuffers != 0) {
1308                LASSERT (rbp->rbp_nbuffers == nbufs);
1309                return 0;
1310        }
1311
1312        for (i = 0; i < nbufs; i++) {
1313                rb = lnet_new_rtrbuf(rbp, cpt);
1314
1315                if (rb == NULL) {
1316                        CERROR("Failed to allocate %d router bufs of %d pages\n",
1317                               nbufs, rbp->rbp_npages);
1318                        return -ENOMEM;
1319                }
1320
1321                rbp->rbp_nbuffers++;
1322                rbp->rbp_credits++;
1323                rbp->rbp_mincredits++;
1324                list_add(&rb->rb_list, &rbp->rbp_bufs);
1325
1326                /* No allocation "under fire" */
1327                /* Otherwise we'd need code to schedule blocked msgs etc */
1328                LASSERT (!the_lnet.ln_routing);
1329        }
1330
1331        LASSERT (rbp->rbp_credits == nbufs);
1332        return 0;
1333}
1334
1335void
1336lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
1337{
1338        INIT_LIST_HEAD(&rbp->rbp_msgs);
1339        INIT_LIST_HEAD(&rbp->rbp_bufs);
1340
1341        rbp->rbp_npages = npages;
1342        rbp->rbp_credits = 0;
1343        rbp->rbp_mincredits = 0;
1344}
1345
1346void
1347lnet_rtrpools_free(void)
1348{
1349        lnet_rtrbufpool_t *rtrp;
1350        int               i;
1351
1352        if (the_lnet.ln_rtrpools == NULL) /* uninitialized or freed */
1353                return;
1354
1355        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1356                lnet_rtrpool_free_bufs(&rtrp[0]);
1357                lnet_rtrpool_free_bufs(&rtrp[1]);
1358                lnet_rtrpool_free_bufs(&rtrp[2]);
1359        }
1360
1361        cfs_percpt_free(the_lnet.ln_rtrpools);
1362        the_lnet.ln_rtrpools = NULL;
1363}
1364
1365static int
1366lnet_nrb_tiny_calculate(int npages)
1367{
1368        int     nrbs = LNET_NRB_TINY;
1369
1370        if (tiny_router_buffers < 0) {
1371                LCONSOLE_ERROR_MSG(0x10c,
1372                                   "tiny_router_buffers=%d invalid when "
1373                                   "routing enabled\n", tiny_router_buffers);
1374                return -1;
1375        }
1376
1377        if (tiny_router_buffers > 0)
1378                nrbs = tiny_router_buffers;
1379
1380        nrbs /= LNET_CPT_NUMBER;
1381        return max(nrbs, LNET_NRB_TINY_MIN);
1382}
1383
1384static int
1385lnet_nrb_small_calculate(int npages)
1386{
1387        int     nrbs = LNET_NRB_SMALL;
1388
1389        if (small_router_buffers < 0) {
1390                LCONSOLE_ERROR_MSG(0x10c,
1391                                   "small_router_buffers=%d invalid when "
1392                                   "routing enabled\n", small_router_buffers);
1393                return -1;
1394        }
1395
1396        if (small_router_buffers > 0)
1397                nrbs = small_router_buffers;
1398
1399        nrbs /= LNET_CPT_NUMBER;
1400        return max(nrbs, LNET_NRB_SMALL_MIN);
1401}
1402
1403static int
1404lnet_nrb_large_calculate(int npages)
1405{
1406        int     nrbs = LNET_NRB_LARGE;
1407
1408        if (large_router_buffers < 0) {
1409                LCONSOLE_ERROR_MSG(0x10c,
1410                                   "large_router_buffers=%d invalid when "
1411                                   "routing enabled\n", large_router_buffers);
1412                return -1;
1413        }
1414
1415        if (large_router_buffers > 0)
1416                nrbs = large_router_buffers;
1417
1418        nrbs /= LNET_CPT_NUMBER;
1419        return max(nrbs, LNET_NRB_LARGE_MIN);
1420}
1421
1422int
1423lnet_rtrpools_alloc(int im_a_router)
1424{
1425        lnet_rtrbufpool_t *rtrp;
1426        int     large_pages = (LNET_MTU + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
1427        int     small_pages = 1;
1428        int     nrb_tiny;
1429        int     nrb_small;
1430        int     nrb_large;
1431        int     rc;
1432        int     i;
1433
1434        if (!strcmp(forwarding, "")) {
1435                /* not set either way */
1436                if (!im_a_router)
1437                        return 0;
1438        } else if (!strcmp(forwarding, "disabled")) {
1439                /* explicitly disabled */
1440                return 0;
1441        } else if (!strcmp(forwarding, "enabled")) {
1442                /* explicitly enabled */
1443        } else {
1444                LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
1445                                   "'enabled' or 'disabled'\n");
1446                return -EINVAL;
1447        }
1448
1449        nrb_tiny = lnet_nrb_tiny_calculate(0);
1450        if (nrb_tiny < 0)
1451                return -EINVAL;
1452
1453        nrb_small = lnet_nrb_small_calculate(small_pages);
1454        if (nrb_small < 0)
1455                return -EINVAL;
1456
1457        nrb_large = lnet_nrb_large_calculate(large_pages);
1458        if (nrb_large < 0)
1459                return -EINVAL;
1460
1461        the_lnet.ln_rtrpools = cfs_percpt_alloc(lnet_cpt_table(),
1462                                                LNET_NRBPOOLS *
1463                                                sizeof(lnet_rtrbufpool_t));
1464        if (the_lnet.ln_rtrpools == NULL) {
1465                LCONSOLE_ERROR_MSG(0x10c,
1466                                   "Failed to initialize router buffe pool\n");
1467                return -ENOMEM;
1468        }
1469
1470        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1471                lnet_rtrpool_init(&rtrp[0], 0);
1472                rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i);
1473                if (rc != 0)
1474                        goto failed;
1475
1476                lnet_rtrpool_init(&rtrp[1], small_pages);
1477                rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i);
1478                if (rc != 0)
1479                        goto failed;
1480
1481                lnet_rtrpool_init(&rtrp[2], large_pages);
1482                rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i);
1483                if (rc != 0)
1484                        goto failed;
1485        }
1486
1487        lnet_net_lock(LNET_LOCK_EX);
1488        the_lnet.ln_routing = 1;
1489        lnet_net_unlock(LNET_LOCK_EX);
1490
1491        return 0;
1492
1493 failed:
1494        lnet_rtrpools_free();
1495        return rc;
1496}
1497
1498int
1499lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1500{
1501        struct lnet_peer        *lp = NULL;
1502        cfs_time_t              now = cfs_time_current();
1503        int                     cpt = lnet_cpt_of_nid(nid);
1504
1505        LASSERT (!in_interrupt ());
1506
1507        CDEBUG (D_NET, "%s notifying %s: %s\n",
1508                (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1509                libcfs_nid2str(nid),
1510                alive ? "up" : "down");
1511
1512        if (ni != NULL &&
1513            LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
1514                CWARN ("Ignoring notification of %s %s by %s (different net)\n",
1515                        libcfs_nid2str(nid), alive ? "birth" : "death",
1516                        libcfs_nid2str(ni->ni_nid));
1517                return -EINVAL;
1518        }
1519
1520        /* can't do predictions... */
1521        if (cfs_time_after(when, now)) {
1522                CWARN ("Ignoring prediction from %s of %s %s "
1523                       "%ld seconds in the future\n",
1524                       (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1525                       libcfs_nid2str(nid), alive ? "up" : "down",
1526                       cfs_duration_sec(cfs_time_sub(when, now)));
1527                return -EINVAL;
1528        }
1529
1530        if (ni != NULL && !alive &&          /* LND telling me she's down */
1531            !auto_down) {                      /* auto-down disabled */
1532                CDEBUG(D_NET, "Auto-down disabled\n");
1533                return 0;
1534        }
1535
1536        lnet_net_lock(cpt);
1537
1538        if (the_lnet.ln_shutdown) {
1539                lnet_net_unlock(cpt);
1540                return -ESHUTDOWN;
1541        }
1542
1543        lp = lnet_find_peer_locked(the_lnet.ln_peer_tables[cpt], nid);
1544        if (lp == NULL) {
1545                /* nid not found */
1546                lnet_net_unlock(cpt);
1547                CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
1548                return 0;
1549        }
1550
1551        /* We can't fully trust LND on reporting exact peer last_alive
1552         * if he notifies us about dead peer. For example ksocklnd can
1553         * call us with when == _time_when_the_node_was_booted_ if
1554         * no connections were successfully established */
1555        if (ni != NULL && !alive && when < lp->lp_last_alive)
1556                when = lp->lp_last_alive;
1557
1558        lnet_notify_locked(lp, ni == NULL, alive, when);
1559
1560        lnet_ni_notify_locked(ni, lp);
1561
1562        lnet_peer_decref_locked(lp);
1563
1564        lnet_net_unlock(cpt);
1565        return 0;
1566}
1567EXPORT_SYMBOL(lnet_notify);
1568
/*
 * Nothing to fetch in this build variant; kept as an empty function so
 * callers do not need their own conditional.
 */
void
lnet_get_tunables (void)
{
}
1574
1575#else
1576
1577int
1578lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1579{
1580        return -EOPNOTSUPP;
1581}
1582
1583void
1584lnet_router_checker (void)
1585{
1586        static time_t last = 0;
1587        static int    running = 0;
1588
1589        time_t      now = cfs_time_current_sec();
1590        int            interval = now - last;
1591        int            rc;
1592        __u64        version;
1593        lnet_peer_t      *rtr;
1594
1595        /* It's no use to call me again within a sec - all intervals and
1596         * timeouts are measured in seconds */
1597        if (last != 0 && interval < 2)
1598                return;
1599
1600        if (last != 0 &&
1601            interval > MAX(live_router_check_interval,
1602                           dead_router_check_interval))
1603                CNETERR("Checker(%d/%d) not called for %d seconds\n",
1604                        live_router_check_interval, dead_router_check_interval,
1605                        interval);
1606
1607        LASSERT(LNET_CPT_NUMBER == 1);
1608
1609        lnet_net_lock(0);
1610        LASSERT(!running); /* recursion check */
1611        running = 1;
1612        lnet_net_unlock(0);
1613
1614        last = now;
1615
1616        if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING)
1617                lnet_prune_rc_data(0); /* unlink all rcd and nowait */
1618
1619        /* consume all pending events */
1620        while (1) {
1621                int       i;
1622                lnet_event_t ev;
1623
1624                /* NB ln_rc_eqh must be the 1st in 'eventqs' otherwise the
1625                 * recursion breaker in LNetEQPoll would fail */
1626                rc = LNetEQPoll(&the_lnet.ln_rc_eqh, 1, 0, &ev, &i);
1627                if (rc == 0)   /* no event pending */
1628                        break;
1629
1630                /* NB a lost SENT prevents me from pinging a router again */
1631                if (rc == -EOVERFLOW) {
1632                        CERROR("Dropped an event!!!\n");
1633                        abort();
1634                }
1635
1636                LASSERT (rc == 1);
1637
1638                lnet_router_checker_event(&ev);
1639        }
1640
1641        if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING) {
1642                lnet_prune_rc_data(1); /* release rcd */
1643                the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1644                running = 0;
1645                return;
1646        }
1647
1648        LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1649
1650        lnet_net_lock(0);
1651
1652        version = the_lnet.ln_routers_version;
1653        list_for_each_entry (rtr, &the_lnet.ln_routers, lp_rtr_list) {
1654                lnet_ping_router_locked(rtr);
1655                LASSERT (version == the_lnet.ln_routers_version);
1656        }
1657
1658        lnet_net_unlock(0);
1659
1660        running = 0; /* lock only needed for the recursion check */
1661        return;
1662}
1663
1664/* NB lnet_peers_start_down depends on me,
1665 * so must be called before any peer creation */
1666void
1667lnet_get_tunables (void)
1668{
1669        char *s;
1670
1671        s = getenv("LNET_ROUTER_PING_TIMEOUT");
1672        if (s != NULL) router_ping_timeout = atoi(s);
1673
1674        s = getenv("LNET_LIVE_ROUTER_CHECK_INTERVAL");
1675        if (s != NULL) live_router_check_interval = atoi(s);
1676
1677        s = getenv("LNET_DEAD_ROUTER_CHECK_INTERVAL");
1678        if (s != NULL) dead_router_check_interval = atoi(s);
1679
1680        /* This replaces old lnd_notify mechanism */
1681        check_routers_before_use = 1;
1682        if (dead_router_check_interval <= 0)
1683                dead_router_check_interval = 30;
1684}
1685
/* No router buffer pools exist in this build variant: nothing to free. */
void
lnet_rtrpools_free(void)
{
        /* intentionally empty */
}
1690
/*
 * No router buffer pools exist in this build variant: the argument is
 * ignored and success (0) is always reported.
 */
int
lnet_rtrpools_alloc(int im_a_arouter)
{
        (void)im_a_arouter;

        return 0;
}
1696
1697#endif
1698