linux/drivers/staging/lustre/lnet/lnet/router.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
   3 *
   4 * Copyright (c) 2011, 2012, Intel Corporation.
   5 *
   6 *   This file is part of Portals
   7 *   http://sourceforge.net/projects/sandiaportals/
   8 *
   9 *   Portals is free software; you can redistribute it and/or
  10 *   modify it under the terms of version 2 of the GNU General Public
  11 *   License as published by the Free Software Foundation.
  12 *
  13 *   Portals is distributed in the hope that it will be useful,
  14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16 *   GNU General Public License for more details.
  17 *
  18 *   You should have received a copy of the GNU General Public License
  19 *   along with Portals; if not, write to the Free Software
  20 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  21 *
  22 */
  23
  24#define DEBUG_SUBSYSTEM S_LNET
  25#include "../../include/linux/lnet/lib-lnet.h"
  26
     /*
      * Router buffer counts.  Each size class has a per-CPT minimum
      * (*_MIN) and an un-suffixed value defined as four times that minimum.
      */
   27#define LNET_NRB_TINY_MIN       512     /* min value for each CPT */
   28#define LNET_NRB_TINY           (LNET_NRB_TINY_MIN * 4)
   29#define LNET_NRB_SMALL_MIN      4096    /* min value for each CPT */
   30#define LNET_NRB_SMALL          (LNET_NRB_SMALL_MIN * 4)
   31#define LNET_NRB_LARGE_MIN      256     /* min value for each CPT */
   32#define LNET_NRB_LARGE          (LNET_NRB_LARGE_MIN * 4)
   33
     /*
      * Module parameters.  All of the parameters in this group are registered
      * with mode 0444, i.e. readable but not writable through sysfs.
      */
     /* String parameter; defaults to the empty string. */
   34static char *forwarding = "";
   35module_param(forwarding, charp, 0444);
   36MODULE_PARM_DESC(forwarding, "Explicitly enable/disable forwarding between networks");
   37
     /* Requested buffer counts per size class; zero-initialised statics. */
   38static int tiny_router_buffers;
   39module_param(tiny_router_buffers, int, 0444);
   40MODULE_PARM_DESC(tiny_router_buffers, "# of 0 payload messages to buffer in the router");
   41static int small_router_buffers;
   42module_param(small_router_buffers, int, 0444);
   43MODULE_PARM_DESC(small_router_buffers, "# of small (1 page) messages to buffer in the router");
   44static int large_router_buffers;
   45module_param(large_router_buffers, int, 0444);
   46MODULE_PARM_DESC(large_router_buffers, "# of large messages to buffer in the router");
     /* LNet-wide default consulted by lnet_peer_buffer_credits() when > 0. */
   47static int peer_buffer_credits;
   48module_param(peer_buffer_credits, int, 0444);
   49MODULE_PARM_DESC(peer_buffer_credits, "# router buffer credits per peer");
   50
     /* Enabled (1) by default. */
   51static int auto_down = 1;
   52module_param(auto_down, int, 0444);
   53MODULE_PARM_DESC(auto_down, "Automatically mark peers down on comms error");
  54
  55int
  56lnet_peer_buffer_credits(lnet_ni_t *ni)
  57{
  58        /* NI option overrides LNet default */
  59        if (ni->ni_peerrtrcredits > 0)
  60                return ni->ni_peerrtrcredits;
  61        if (peer_buffer_credits > 0)
  62                return peer_buffer_credits;
  63
  64        /* As an approximation, allow this peer the same number of router
  65         * buffers as it is allowed outstanding sends */
  66        return ni->ni_peertxcredits;
  67}
  68
   69/* forward declarations */
   70static int lnet_router_checker(void *);
   71
     /* Returned as-is by lnet_peers_start_down(); mode 0444 (read-only). */
   72static int check_routers_before_use;
   73module_param(check_routers_before_use, int, 0444);
   74MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");
   75
     /*
      * The parameters below are registered with mode 0644, so they can also
      * be written through sysfs.
      */
     /* Not static: this symbol has external linkage.  Enabled by default. */
   76int avoid_asym_router_failure = 1;
   77module_param(avoid_asym_router_failure, int, 0644);
   78MODULE_PARM_DESC(avoid_asym_router_failure, "Avoid asymmetrical router failures (0 to disable)");
   79
     /* Defaults to 60 seconds. */
   80static int dead_router_check_interval = 60;
   81module_param(dead_router_check_interval, int, 0644);
   82MODULE_PARM_DESC(dead_router_check_interval, "Seconds between dead router health checks (<= 0 to disable)");
   83
     /* Defaults to 60 seconds. */
   84static int live_router_check_interval = 60;
   85module_param(live_router_check_interval, int, 0644);
   86MODULE_PARM_DESC(live_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");
   87
     /* Defaults to 50 seconds. */
   88static int router_ping_timeout = 50;
   89module_param(router_ping_timeout, int, 0644);
   90MODULE_PARM_DESC(router_ping_timeout, "Seconds to wait for the reply to a router health query");
  91
  92int
  93lnet_peers_start_down(void)
  94{
  95        return check_routers_before_use;
  96}
  97
  98void
  99lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive,
 100                   unsigned long when)
 101{
 102        if (time_before(when, lp->lp_timestamp)) { /* out of date information */
 103                CDEBUG(D_NET, "Out of date\n");
 104                return;
 105        }
 106
 107        lp->lp_timestamp = when;                /* update timestamp */
 108        lp->lp_ping_deadline = 0;              /* disable ping timeout */
 109
 110        if (lp->lp_alive_count != 0 &&    /* got old news */
 111            (!lp->lp_alive) == (!alive)) {      /* new date for old news */
 112                CDEBUG(D_NET, "Old news\n");
 113                return;
 114        }
 115
 116        /* Flag that notification is outstanding */
 117
 118        lp->lp_alive_count++;
 119        lp->lp_alive = !(!alive);              /* 1 bit! */
 120        lp->lp_notify = 1;
 121        lp->lp_notifylnd |= notifylnd;
 122        if (lp->lp_alive)
 123                lp->lp_ping_feats = LNET_PING_FEAT_INVAL; /* reset */
 124
 125        CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
 126}
 127
 128static void
 129lnet_ni_notify_locked(lnet_ni_t *ni, lnet_peer_t *lp)
 130{
 131        int alive;
 132        int notifylnd;
 133
 134        /* Notify only in 1 thread at any time to ensure ordered notification.
 135         * NB individual events can be missed; the only guarantee is that you
 136         * always get the most recent news */
 137
 138        if (lp->lp_notifying || ni == NULL)
 139                return;
 140
 141        lp->lp_notifying = 1;
 142
 143        while (lp->lp_notify) {
 144                alive = lp->lp_alive;
 145                notifylnd = lp->lp_notifylnd;
 146
 147                lp->lp_notifylnd = 0;
 148                lp->lp_notify    = 0;
 149
 150                if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
 151                        lnet_net_unlock(lp->lp_cpt);
 152
 153                        /* A new notification could happen now; I'll handle it
 154                         * when control returns to me */
 155
 156                        (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
 157
 158                        lnet_net_lock(lp->lp_cpt);
 159                }
 160        }
 161
 162        lp->lp_notifying = 0;
 163}
 164
 165static void
 166lnet_rtr_addref_locked(lnet_peer_t *lp)
 167{
 168        LASSERT(lp->lp_refcount > 0);
 169        LASSERT(lp->lp_rtr_refcount >= 0);
 170
 171        /* lnet_net_lock must be exclusively locked */
 172        lp->lp_rtr_refcount++;
 173        if (lp->lp_rtr_refcount == 1) {
 174                struct list_head *pos;
 175
 176                /* a simple insertion sort */
 177                list_for_each_prev(pos, &the_lnet.ln_routers) {
 178                        lnet_peer_t *rtr = list_entry(pos, lnet_peer_t,
 179                                                          lp_rtr_list);
 180
 181                        if (rtr->lp_nid < lp->lp_nid)
 182                                break;
 183                }
 184
 185                list_add(&lp->lp_rtr_list, pos);
 186                /* addref for the_lnet.ln_routers */
 187                lnet_peer_addref_locked(lp);
 188                the_lnet.ln_routers_version++;
 189        }
 190}
 191
 192static void
 193lnet_rtr_decref_locked(lnet_peer_t *lp)
 194{
 195        LASSERT(lp->lp_refcount > 0);
 196        LASSERT(lp->lp_rtr_refcount > 0);
 197
 198        /* lnet_net_lock must be exclusively locked */
 199        lp->lp_rtr_refcount--;
 200        if (lp->lp_rtr_refcount == 0) {
 201                LASSERT(list_empty(&lp->lp_routes));
 202
 203                if (lp->lp_rcd != NULL) {
 204                        list_add(&lp->lp_rcd->rcd_list,
 205                                     &the_lnet.ln_rcd_deathrow);
 206                        lp->lp_rcd = NULL;
 207                }
 208
 209                list_del(&lp->lp_rtr_list);
 210                /* decref for the_lnet.ln_routers */
 211                lnet_peer_decref_locked(lp);
 212                the_lnet.ln_routers_version++;
 213        }
 214}
 215
 216lnet_remotenet_t *
 217lnet_find_net_locked(__u32 net)
 218{
 219        lnet_remotenet_t *rnet;
 220        struct list_head *tmp;
 221        struct list_head *rn_list;
 222
 223        LASSERT(!the_lnet.ln_shutdown);
 224
 225        rn_list = lnet_net2rnethash(net);
 226        list_for_each(tmp, rn_list) {
 227                rnet = list_entry(tmp, lnet_remotenet_t, lrn_list);
 228
 229                if (rnet->lrn_net == net)
 230                        return rnet;
 231        }
 232        return NULL;
 233}
 234
 235static void lnet_shuffle_seed(void)
 236{
 237        static int seeded;
 238        __u32 lnd_type, seed[2];
 239        struct timespec64 ts;
 240        lnet_ni_t *ni;
 241        struct list_head *tmp;
 242
 243        if (seeded)
 244                return;
 245
 246        cfs_get_random_bytes(seed, sizeof(seed));
 247
 248        /* Nodes with small feet have little entropy
 249         * the NID for this node gives the most entropy in the low bits */
 250        list_for_each(tmp, &the_lnet.ln_nis) {
 251                ni = list_entry(tmp, lnet_ni_t, ni_list);
 252                lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));
 253
 254                if (lnd_type != LOLND)
 255                        seed[0] ^= (LNET_NIDADDR(ni->ni_nid) | lnd_type);
 256        }
 257
 258        ktime_get_ts64(&ts);
 259        cfs_srand(ts.tv_sec ^ seed[0], ts.tv_nsec ^ seed[1]);
 260        seeded = 1;
 261}
 262
 263/* NB expects LNET_LOCK held */
 264static void
 265lnet_add_route_to_rnet(lnet_remotenet_t *rnet, lnet_route_t *route)
 266{
 267        unsigned int len = 0;
 268        unsigned int offset = 0;
 269        struct list_head *e;
 270
 271        lnet_shuffle_seed();
 272
 273        list_for_each(e, &rnet->lrn_routes) {
 274                len++;
 275        }
 276
 277        /* len+1 positions to add a new entry, also prevents division by 0 */
 278        offset = cfs_rand() % (len + 1);
 279        list_for_each(e, &rnet->lrn_routes) {
 280                if (offset == 0)
 281                        break;
 282                offset--;
 283        }
 284        list_add(&route->lr_list, e);
 285        list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);
 286
 287        the_lnet.ln_remote_nets_version++;
 288        lnet_rtr_addref_locked(route->lr_gateway);
 289}
 290
 291int
 292lnet_add_route(__u32 net, unsigned int hops, lnet_nid_t gateway,
 293               unsigned int priority)
 294{
 295        struct list_head *e;
 296        lnet_remotenet_t *rnet;
 297        lnet_remotenet_t *rnet2;
 298        lnet_route_t *route;
 299        lnet_ni_t *ni;
 300        int add_route;
 301        int rc;
 302
 303        CDEBUG(D_NET, "Add route: net %s hops %u priority %u gw %s\n",
 304               libcfs_net2str(net), hops, priority, libcfs_nid2str(gateway));
 305
 306        if (gateway == LNET_NID_ANY ||
 307            LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
 308            net == LNET_NIDNET(LNET_NID_ANY) ||
 309            LNET_NETTYP(net) == LOLND ||
 310            LNET_NIDNET(gateway) == net ||
 311            hops < 1 || hops > 255)
 312                return -EINVAL;
 313
 314        if (lnet_islocalnet(net))              /* it's a local network */
 315                return 0;                      /* ignore the route entry */
 316
 317        /* Assume net, route, all new */
 318        LIBCFS_ALLOC(route, sizeof(*route));
 319        LIBCFS_ALLOC(rnet, sizeof(*rnet));
 320        if (route == NULL || rnet == NULL) {
 321                CERROR("Out of memory creating route %s %d %s\n",
 322                       libcfs_net2str(net), hops, libcfs_nid2str(gateway));
 323                if (route != NULL)
 324                        LIBCFS_FREE(route, sizeof(*route));
 325                if (rnet != NULL)
 326                        LIBCFS_FREE(rnet, sizeof(*rnet));
 327                return -ENOMEM;
 328        }
 329
 330        INIT_LIST_HEAD(&rnet->lrn_routes);
 331        rnet->lrn_net = net;
 332        route->lr_hops = hops;
 333        route->lr_net = net;
 334        route->lr_priority = priority;
 335
 336        lnet_net_lock(LNET_LOCK_EX);
 337
 338        rc = lnet_nid2peer_locked(&route->lr_gateway, gateway, LNET_LOCK_EX);
 339        if (rc != 0) {
 340                lnet_net_unlock(LNET_LOCK_EX);
 341
 342                LIBCFS_FREE(route, sizeof(*route));
 343                LIBCFS_FREE(rnet, sizeof(*rnet));
 344
 345                if (rc == -EHOSTUNREACH) /* gateway is not on a local net */
 346                        return 0;       /* ignore the route entry */
 347                CERROR("Error %d creating route %s %d %s\n", rc,
 348                       libcfs_net2str(net), hops,
 349                       libcfs_nid2str(gateway));
 350
 351                return rc;
 352        }
 353
 354        LASSERT(!the_lnet.ln_shutdown);
 355
 356        rnet2 = lnet_find_net_locked(net);
 357        if (rnet2 == NULL) {
 358                /* new network */
 359                list_add_tail(&rnet->lrn_list, lnet_net2rnethash(net));
 360                rnet2 = rnet;
 361        }
 362
 363        /* Search for a duplicate route (it's a NOOP if it is) */
 364        add_route = 1;
 365        list_for_each(e, &rnet2->lrn_routes) {
 366                lnet_route_t *route2 = list_entry(e, lnet_route_t, lr_list);
 367
 368                if (route2->lr_gateway == route->lr_gateway) {
 369                        add_route = 0;
 370                        break;
 371                }
 372
 373                /* our lookups must be true */
 374                LASSERT(route2->lr_gateway->lp_nid != gateway);
 375        }
 376
 377        if (add_route) {
 378                lnet_peer_addref_locked(route->lr_gateway); /* +1 for notify */
 379                lnet_add_route_to_rnet(rnet2, route);
 380
 381                ni = route->lr_gateway->lp_ni;
 382                lnet_net_unlock(LNET_LOCK_EX);
 383
 384                /* XXX Assume alive */
 385                if (ni->ni_lnd->lnd_notify != NULL)
 386                        (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
 387
 388                lnet_net_lock(LNET_LOCK_EX);
 389        }
 390
 391        /* -1 for notify or !add_route */
 392        lnet_peer_decref_locked(route->lr_gateway);
 393        lnet_net_unlock(LNET_LOCK_EX);
 394
 395        if (!add_route)
 396                LIBCFS_FREE(route, sizeof(*route));
 397
 398        if (rnet != rnet2)
 399                LIBCFS_FREE(rnet, sizeof(*rnet));
 400
 401        return 0;
 402}
 403
 404int
 405lnet_check_routes(void)
 406{
 407        lnet_remotenet_t *rnet;
 408        lnet_route_t *route;
 409        lnet_route_t *route2;
 410        struct list_head *e1;
 411        struct list_head *e2;
 412        int cpt;
 413        struct list_head *rn_list;
 414        int i;
 415
 416        cpt = lnet_net_lock_current();
 417
 418        for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
 419                rn_list = &the_lnet.ln_remote_nets_hash[i];
 420                list_for_each(e1, rn_list) {
 421                        rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
 422
 423                        route2 = NULL;
 424                        list_for_each(e2, &rnet->lrn_routes) {
 425                                lnet_nid_t nid1;
 426                                lnet_nid_t nid2;
 427                                int net;
 428
 429                                route = list_entry(e2, lnet_route_t,
 430                                                       lr_list);
 431
 432                                if (route2 == NULL) {
 433                                        route2 = route;
 434                                        continue;
 435                                }
 436
 437                                if (route->lr_gateway->lp_ni ==
 438                                    route2->lr_gateway->lp_ni)
 439                                        continue;
 440
 441                                nid1 = route->lr_gateway->lp_nid;
 442                                nid2 = route2->lr_gateway->lp_nid;
 443                                net = rnet->lrn_net;
 444
 445                                lnet_net_unlock(cpt);
 446
 447                                CERROR("Routes to %s via %s and %s not supported\n",
 448                                       libcfs_net2str(net),
 449                                       libcfs_nid2str(nid1),
 450                                       libcfs_nid2str(nid2));
 451                                return -EINVAL;
 452                        }
 453                }
 454        }
 455
 456        lnet_net_unlock(cpt);
 457        return 0;
 458}
 459
 460int
 461lnet_del_route(__u32 net, lnet_nid_t gw_nid)
 462{
 463        struct lnet_peer *gateway;
 464        lnet_remotenet_t *rnet;
 465        lnet_route_t *route;
 466        struct list_head *e1;
 467        struct list_head *e2;
 468        int rc = -ENOENT;
 469        struct list_head *rn_list;
 470        int idx = 0;
 471
 472        CDEBUG(D_NET, "Del route: net %s : gw %s\n",
 473               libcfs_net2str(net), libcfs_nid2str(gw_nid));
 474
 475        /* NB Caller may specify either all routes via the given gateway
 476         * or a specific route entry actual NIDs) */
 477
 478        lnet_net_lock(LNET_LOCK_EX);
 479        if (net == LNET_NIDNET(LNET_NID_ANY))
 480                rn_list = &the_lnet.ln_remote_nets_hash[0];
 481        else
 482                rn_list = lnet_net2rnethash(net);
 483
 484 again:
 485        list_for_each(e1, rn_list) {
 486                rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
 487
 488                if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
 489                        net == rnet->lrn_net))
 490                        continue;
 491
 492                list_for_each(e2, &rnet->lrn_routes) {
 493                        route = list_entry(e2, lnet_route_t, lr_list);
 494
 495                        gateway = route->lr_gateway;
 496                        if (!(gw_nid == LNET_NID_ANY ||
 497                              gw_nid == gateway->lp_nid))
 498                                continue;
 499
 500                        list_del(&route->lr_list);
 501                        list_del(&route->lr_gwlist);
 502                        the_lnet.ln_remote_nets_version++;
 503
 504                        if (list_empty(&rnet->lrn_routes))
 505                                list_del(&rnet->lrn_list);
 506                        else
 507                                rnet = NULL;
 508
 509                        lnet_rtr_decref_locked(gateway);
 510                        lnet_peer_decref_locked(gateway);
 511
 512                        lnet_net_unlock(LNET_LOCK_EX);
 513
 514                        LIBCFS_FREE(route, sizeof(*route));
 515
 516                        if (rnet != NULL)
 517                                LIBCFS_FREE(rnet, sizeof(*rnet));
 518
 519                        rc = 0;
 520                        lnet_net_lock(LNET_LOCK_EX);
 521                        goto again;
 522                }
 523        }
 524
 525        if (net == LNET_NIDNET(LNET_NID_ANY) &&
 526            ++idx < LNET_REMOTE_NETS_HASH_SIZE) {
 527                rn_list = &the_lnet.ln_remote_nets_hash[idx];
 528                goto again;
 529        }
 530        lnet_net_unlock(LNET_LOCK_EX);
 531
 532        return rc;
 533}
 534
 535void
 536lnet_destroy_routes(void)
 537{
 538        lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
 539}
 540
 541int
 542lnet_get_route(int idx, __u32 *net, __u32 *hops,
 543               lnet_nid_t *gateway, __u32 *alive, __u32 *priority)
 544{
 545        struct list_head *e1;
 546        struct list_head *e2;
 547        lnet_remotenet_t *rnet;
 548        lnet_route_t *route;
 549        int cpt;
 550        int i;
 551        struct list_head *rn_list;
 552
 553        cpt = lnet_net_lock_current();
 554
 555        for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
 556                rn_list = &the_lnet.ln_remote_nets_hash[i];
 557                list_for_each(e1, rn_list) {
 558                        rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
 559
 560                        list_for_each(e2, &rnet->lrn_routes) {
 561                                route = list_entry(e2, lnet_route_t,
 562                                                       lr_list);
 563
 564                                if (idx-- == 0) {
 565                                        *net      = rnet->lrn_net;
 566                                        *hops     = route->lr_hops;
 567                                        *priority = route->lr_priority;
 568                                        *gateway  = route->lr_gateway->lp_nid;
 569                                        *alive    = route->lr_gateway->lp_alive;
 570                                        lnet_net_unlock(cpt);
 571                                        return 0;
 572                                }
 573                        }
 574                }
 575        }
 576
 577        lnet_net_unlock(cpt);
 578        return -ENOENT;
 579}
 580
 581void
 582lnet_swap_pinginfo(lnet_ping_info_t *info)
 583{
 584        int i;
 585        lnet_ni_status_t *stat;
 586
 587        __swab32s(&info->pi_magic);
 588        __swab32s(&info->pi_features);
 589        __swab32s(&info->pi_pid);
 590        __swab32s(&info->pi_nnis);
 591        for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
 592                stat = &info->pi_ni[i];
 593                __swab64s(&stat->ns_nid);
 594                __swab32s(&stat->ns_status);
 595        }
 596}
 597
 598/**
 599 * parse router-checker pinginfo, record number of down NIs for remote
 600 * networks on that router.
 601 */
 602static void
 603lnet_parse_rc_info(lnet_rc_data_t *rcd)
 604{
 605        lnet_ping_info_t *info = rcd->rcd_pinginfo;
 606        struct lnet_peer *gw = rcd->rcd_gateway;
 607        lnet_route_t *rtr;
 608
 609        if (!gw->lp_alive)
 610                return;
 611
 612        if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
 613                lnet_swap_pinginfo(info);
 614
 615        /* NB always racing with network! */
 616        if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
 617                CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
 618                       libcfs_nid2str(gw->lp_nid), info->pi_magic);
 619                gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
 620                return;
 621        }
 622
 623        gw->lp_ping_feats = info->pi_features;
 624        if ((gw->lp_ping_feats & LNET_PING_FEAT_MASK) == 0) {
 625                CDEBUG(D_NET, "%s: Unexpected features 0x%x\n",
 626                       libcfs_nid2str(gw->lp_nid), gw->lp_ping_feats);
 627                return; /* nothing I can understand */
 628        }
 629
 630        if ((gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0)
 631                return; /* can't carry NI status info */
 632
 633        list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
 634                int ptl_status = LNET_NI_STATUS_INVALID;
 635                int down = 0;
 636                int up = 0;
 637                int i;
 638
 639                for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
 640                        lnet_ni_status_t *stat = &info->pi_ni[i];
 641                        lnet_nid_t nid = stat->ns_nid;
 642
 643                        if (nid == LNET_NID_ANY) {
 644                                CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
 645                                       libcfs_nid2str(gw->lp_nid));
 646                                gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
 647                                return;
 648                        }
 649
 650                        if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
 651                                continue;
 652
 653                        if (stat->ns_status == LNET_NI_STATUS_DOWN) {
 654                                if (LNET_NETTYP(LNET_NIDNET(nid)) != PTLLND)
 655                                        down++;
 656                                else if (ptl_status != LNET_NI_STATUS_UP)
 657                                        ptl_status = LNET_NI_STATUS_DOWN;
 658                                continue;
 659                        }
 660
 661                        if (stat->ns_status == LNET_NI_STATUS_UP) {
 662                                if (LNET_NIDNET(nid) == rtr->lr_net) {
 663                                        up = 1;
 664                                        break;
 665                                }
 666                                /* ptl NIs are considered down only when
 667                                 * they're all down */
 668                                if (LNET_NETTYP(LNET_NIDNET(nid)) == PTLLND)
 669                                        ptl_status = LNET_NI_STATUS_UP;
 670                                continue;
 671                        }
 672
 673                        CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
 674                               libcfs_nid2str(gw->lp_nid), stat->ns_status);
 675                        gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
 676                        return;
 677                }
 678
 679                if (up) { /* ignore downed NIs if NI for dest network is up */
 680                        rtr->lr_downis = 0;
 681                        continue;
 682                }
 683                rtr->lr_downis = down + (ptl_status == LNET_NI_STATUS_DOWN);
 684        }
 685}
 686
 687static void
 688lnet_router_checker_event(lnet_event_t *event)
 689{
 690        lnet_rc_data_t *rcd = event->md.user_ptr;
 691        struct lnet_peer *lp;
 692
 693        LASSERT(rcd != NULL);
 694
 695        if (event->unlinked) {
 696                LNetInvalidateHandle(&rcd->rcd_mdh);
 697                return;
 698        }
 699
 700        LASSERT(event->type == LNET_EVENT_SEND ||
 701                event->type == LNET_EVENT_REPLY);
 702
 703        lp = rcd->rcd_gateway;
 704        LASSERT(lp != NULL);
 705
 706         /* NB: it's called with holding lnet_res_lock, we have a few
 707          * places need to hold both locks at the same time, please take
 708          * care of lock ordering */
 709        lnet_net_lock(lp->lp_cpt);
 710        if (!lnet_isrouter(lp) || lp->lp_rcd != rcd) {
 711                /* ignore if no longer a router or rcd is replaced */
 712                goto out;
 713        }
 714
 715        if (event->type == LNET_EVENT_SEND) {
 716                lp->lp_ping_notsent = 0;
 717                if (event->status == 0)
 718                        goto out;
 719        }
 720
 721        /* LNET_EVENT_REPLY */
 722        /* A successful REPLY means the router is up.  If _any_ comms
 723         * to the router fail I assume it's down (this will happen if
 724         * we ping alive routers to try to detect router death before
 725         * apps get burned). */
 726
 727        lnet_notify_locked(lp, 1, (event->status == 0), cfs_time_current());
 728        /* The router checker will wake up very shortly and do the
 729         * actual notification.
 730         * XXX If 'lp' stops being a router before then, it will still
 731         * have the notification pending!!! */
 732
 733        if (avoid_asym_router_failure && event->status == 0)
 734                lnet_parse_rc_info(rcd);
 735
 736 out:
 737        lnet_net_unlock(lp->lp_cpt);
 738}
 739
 740static void
 741lnet_wait_known_routerstate(void)
 742{
 743        lnet_peer_t *rtr;
 744        struct list_head *entry;
 745        int all_known;
 746
 747        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
 748
 749        for (;;) {
 750                int cpt = lnet_net_lock_current();
 751
 752                all_known = 1;
 753                list_for_each(entry, &the_lnet.ln_routers) {
 754                        rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
 755
 756                        if (rtr->lp_alive_count == 0) {
 757                                all_known = 0;
 758                                break;
 759                        }
 760                }
 761
 762                lnet_net_unlock(cpt);
 763
 764                if (all_known)
 765                        return;
 766
 767                set_current_state(TASK_UNINTERRUPTIBLE);
 768                schedule_timeout(cfs_time_seconds(1));
 769        }
 770}
 771
 772void
 773lnet_router_ni_update_locked(lnet_peer_t *gw, __u32 net)
 774{
 775        lnet_route_t *rte;
 776
 777        if ((gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) != 0) {
 778                list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
 779                        if (rte->lr_net == net) {
 780                                rte->lr_downis = 0;
 781                                break;
 782                        }
 783                }
 784        }
 785}
 786
 787static void
 788lnet_update_ni_status_locked(void)
 789{
 790        lnet_ni_t *ni;
 791        time64_t now;
 792        int timeout;
 793
 794        LASSERT(the_lnet.ln_routing);
 795
 796        timeout = router_ping_timeout +
 797                  max(live_router_check_interval, dead_router_check_interval);
 798
 799        now = ktime_get_real_seconds();
 800        list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
 801                if (ni->ni_lnd->lnd_type == LOLND)
 802                        continue;
 803
 804                if (now < ni->ni_last_alive + timeout)
 805                        continue;
 806
 807                lnet_ni_lock(ni);
 808                /* re-check with lock */
 809                if (now < ni->ni_last_alive + timeout) {
 810                        lnet_ni_unlock(ni);
 811                        continue;
 812                }
 813
 814                LASSERT(ni->ni_status != NULL);
 815
 816                if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
 817                        CDEBUG(D_NET, "NI(%s:%d) status changed to down\n",
 818                               libcfs_nid2str(ni->ni_nid), timeout);
 819                        /* NB: so far, this is the only place to set
 820                         * NI status to "down" */
 821                        ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
 822                }
 823                lnet_ni_unlock(ni);
 824        }
 825}
 826
 827static void
 828lnet_destroy_rc_data(lnet_rc_data_t *rcd)
 829{
 830        LASSERT(list_empty(&rcd->rcd_list));
 831        /* detached from network */
 832        LASSERT(LNetHandleIsInvalid(rcd->rcd_mdh));
 833
 834        if (rcd->rcd_gateway != NULL) {
 835                int cpt = rcd->rcd_gateway->lp_cpt;
 836
 837                lnet_net_lock(cpt);
 838                lnet_peer_decref_locked(rcd->rcd_gateway);
 839                lnet_net_unlock(cpt);
 840        }
 841
 842        if (rcd->rcd_pinginfo != NULL)
 843                LIBCFS_FREE(rcd->rcd_pinginfo, LNET_PINGINFO_SIZE);
 844
 845        LIBCFS_FREE(rcd, sizeof(*rcd));
 846}
 847
 848static lnet_rc_data_t *
 849lnet_create_rc_data_locked(lnet_peer_t *gateway)
 850{
 851        lnet_rc_data_t *rcd = NULL;
 852        lnet_ping_info_t *pi;
 853        int rc;
 854        int i;
 855
 856        lnet_net_unlock(gateway->lp_cpt);
 857
 858        LIBCFS_ALLOC(rcd, sizeof(*rcd));
 859        if (rcd == NULL)
 860                goto out;
 861
 862        LNetInvalidateHandle(&rcd->rcd_mdh);
 863        INIT_LIST_HEAD(&rcd->rcd_list);
 864
 865        LIBCFS_ALLOC(pi, LNET_PINGINFO_SIZE);
 866        if (pi == NULL)
 867                goto out;
 868
 869        for (i = 0; i < LNET_MAX_RTR_NIS; i++) {
 870                pi->pi_ni[i].ns_nid = LNET_NID_ANY;
 871                pi->pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
 872        }
 873        rcd->rcd_pinginfo = pi;
 874
 875        LASSERT(!LNetHandleIsInvalid(the_lnet.ln_rc_eqh));
 876        rc = LNetMDBind((lnet_md_t){.start     = pi,
 877                                    .user_ptr  = rcd,
 878                                    .length    = LNET_PINGINFO_SIZE,
 879                                    .threshold = LNET_MD_THRESH_INF,
 880                                    .options   = LNET_MD_TRUNCATE,
 881                                    .eq_handle = the_lnet.ln_rc_eqh},
 882                        LNET_UNLINK,
 883                        &rcd->rcd_mdh);
 884        if (rc < 0) {
 885                CERROR("Can't bind MD: %d\n", rc);
 886                goto out;
 887        }
 888        LASSERT(rc == 0);
 889
 890        lnet_net_lock(gateway->lp_cpt);
 891        /* router table changed or someone has created rcd for this gateway */
 892        if (!lnet_isrouter(gateway) || gateway->lp_rcd != NULL) {
 893                lnet_net_unlock(gateway->lp_cpt);
 894                goto out;
 895        }
 896
 897        lnet_peer_addref_locked(gateway);
 898        rcd->rcd_gateway = gateway;
 899        gateway->lp_rcd = rcd;
 900        gateway->lp_ping_notsent = 0;
 901
 902        return rcd;
 903
 904 out:
 905        if (rcd != NULL) {
 906                if (!LNetHandleIsInvalid(rcd->rcd_mdh)) {
 907                        rc = LNetMDUnlink(rcd->rcd_mdh);
 908                        LASSERT(rc == 0);
 909                }
 910                lnet_destroy_rc_data(rcd);
 911        }
 912
 913        lnet_net_lock(gateway->lp_cpt);
 914        return gateway->lp_rcd;
 915}
 916
 917static int
 918lnet_router_check_interval(lnet_peer_t *rtr)
 919{
 920        int secs;
 921
 922        secs = rtr->lp_alive ? live_router_check_interval :
 923                               dead_router_check_interval;
 924        if (secs < 0)
 925                secs = 0;
 926
 927        return secs;
 928}
 929
 930static void
 931lnet_ping_router_locked(lnet_peer_t *rtr)
 932{
 933        lnet_rc_data_t *rcd = NULL;
 934        unsigned long now = cfs_time_current();
 935        int secs;
 936
 937        lnet_peer_addref_locked(rtr);
 938
 939        if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
 940            cfs_time_after(now, rtr->lp_ping_deadline))
 941                lnet_notify_locked(rtr, 1, 0, now);
 942
 943        /* Run any outstanding notifications */
 944        lnet_ni_notify_locked(rtr->lp_ni, rtr);
 945
 946        if (!lnet_isrouter(rtr) ||
 947            the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
 948                /* router table changed or router checker is shutting down */
 949                lnet_peer_decref_locked(rtr);
 950                return;
 951        }
 952
 953        rcd = rtr->lp_rcd != NULL ?
 954              rtr->lp_rcd : lnet_create_rc_data_locked(rtr);
 955
 956        if (rcd == NULL)
 957                return;
 958
 959        secs = lnet_router_check_interval(rtr);
 960
 961        CDEBUG(D_NET,
 962               "rtr %s %d: deadline %lu ping_notsent %d alive %d alive_count %d lp_ping_timestamp %lu\n",
 963               libcfs_nid2str(rtr->lp_nid), secs,
 964               rtr->lp_ping_deadline, rtr->lp_ping_notsent,
 965               rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
 966
 967        if (secs != 0 && !rtr->lp_ping_notsent &&
 968            cfs_time_after(now, cfs_time_add(rtr->lp_ping_timestamp,
 969                                             cfs_time_seconds(secs)))) {
 970                int rc;
 971                lnet_process_id_t id;
 972                lnet_handle_md_t mdh;
 973
 974                id.nid = rtr->lp_nid;
 975                id.pid = LUSTRE_SRV_LNET_PID;
 976                CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
 977
 978                rtr->lp_ping_notsent   = 1;
 979                rtr->lp_ping_timestamp = now;
 980
 981                mdh = rcd->rcd_mdh;
 982
 983                if (rtr->lp_ping_deadline == 0) {
 984                        rtr->lp_ping_deadline =
 985                                cfs_time_shift(router_ping_timeout);
 986                }
 987
 988                lnet_net_unlock(rtr->lp_cpt);
 989
 990                rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
 991                             LNET_PROTO_PING_MATCHBITS, 0);
 992
 993                lnet_net_lock(rtr->lp_cpt);
 994                if (rc != 0)
 995                        rtr->lp_ping_notsent = 0; /* no event pending */
 996        }
 997
 998        lnet_peer_decref_locked(rtr);
 999}
1000
1001int
1002lnet_router_checker_start(void)
1003{
1004        int rc;
1005        int eqsz;
1006
1007        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1008
1009        if (check_routers_before_use &&
1010            dead_router_check_interval <= 0) {
1011                LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be set if 'check_routers_before_use' is set\n");
1012                return -EINVAL;
1013        }
1014
1015        if (!the_lnet.ln_routing &&
1016            live_router_check_interval <= 0 &&
1017            dead_router_check_interval <= 0)
1018                return 0;
1019
1020        sema_init(&the_lnet.ln_rc_signal, 0);
1021        /* EQ size doesn't matter; the callback is guaranteed to get every
1022         * event */
1023        eqsz = 0;
1024        rc = LNetEQAlloc(eqsz, lnet_router_checker_event,
1025                         &the_lnet.ln_rc_eqh);
1026        if (rc != 0) {
1027                CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
1028                return -ENOMEM;
1029        }
1030
1031        the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
1032        rc = PTR_ERR(kthread_run(lnet_router_checker,
1033                                 NULL, "router_checker"));
1034        if (IS_ERR_VALUE(rc)) {
1035                CERROR("Can't start router checker thread: %d\n", rc);
1036                /* block until event callback signals exit */
1037                down(&the_lnet.ln_rc_signal);
1038                rc = LNetEQFree(the_lnet.ln_rc_eqh);
1039                LASSERT(rc == 0);
1040                the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1041                return -ENOMEM;
1042        }
1043
1044        if (check_routers_before_use) {
1045                /* Note that a helpful side-effect of pinging all known routers
1046                 * at startup is that it makes them drop stale connections they
1047                 * may have to a previous instance of me. */
1048                lnet_wait_known_routerstate();
1049        }
1050
1051        return 0;
1052}
1053
1054void
1055lnet_router_checker_stop(void)
1056{
1057        int rc;
1058
1059        if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
1060                return;
1061
1062        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1063        the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
1064
1065        /* block until event callback signals exit */
1066        down(&the_lnet.ln_rc_signal);
1067        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1068
1069        rc = LNetEQFree(the_lnet.ln_rc_eqh);
1070        LASSERT(rc == 0);
1071}
1072
1073static void
1074lnet_prune_rc_data(int wait_unlink)
1075{
1076        lnet_rc_data_t *rcd;
1077        lnet_rc_data_t *tmp;
1078        lnet_peer_t *lp;
1079        struct list_head head;
1080        int i = 2;
1081
1082        if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
1083                   list_empty(&the_lnet.ln_rcd_deathrow) &&
1084                   list_empty(&the_lnet.ln_rcd_zombie)))
1085                return;
1086
1087        INIT_LIST_HEAD(&head);
1088
1089        lnet_net_lock(LNET_LOCK_EX);
1090
1091        if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
1092                /* router checker is stopping, prune all */
1093                list_for_each_entry(lp, &the_lnet.ln_routers,
1094                                        lp_rtr_list) {
1095                        if (lp->lp_rcd == NULL)
1096                                continue;
1097
1098                        LASSERT(list_empty(&lp->lp_rcd->rcd_list));
1099                        list_add(&lp->lp_rcd->rcd_list,
1100                                     &the_lnet.ln_rcd_deathrow);
1101                        lp->lp_rcd = NULL;
1102                }
1103        }
1104
1105        /* unlink all RCDs on deathrow list */
1106        list_splice_init(&the_lnet.ln_rcd_deathrow, &head);
1107
1108        if (!list_empty(&head)) {
1109                lnet_net_unlock(LNET_LOCK_EX);
1110
1111                list_for_each_entry(rcd, &head, rcd_list)
1112                        LNetMDUnlink(rcd->rcd_mdh);
1113
1114                lnet_net_lock(LNET_LOCK_EX);
1115        }
1116
1117        list_splice_init(&head, &the_lnet.ln_rcd_zombie);
1118
1119        /* release all zombie RCDs */
1120        while (!list_empty(&the_lnet.ln_rcd_zombie)) {
1121                list_for_each_entry_safe(rcd, tmp, &the_lnet.ln_rcd_zombie,
1122                                             rcd_list) {
1123                        if (LNetHandleIsInvalid(rcd->rcd_mdh))
1124                                list_move(&rcd->rcd_list, &head);
1125                }
1126
1127                wait_unlink = wait_unlink &&
1128                              !list_empty(&the_lnet.ln_rcd_zombie);
1129
1130                lnet_net_unlock(LNET_LOCK_EX);
1131
1132                while (!list_empty(&head)) {
1133                        rcd = list_entry(head.next,
1134                                             lnet_rc_data_t, rcd_list);
1135                        list_del_init(&rcd->rcd_list);
1136                        lnet_destroy_rc_data(rcd);
1137                }
1138
1139                if (!wait_unlink)
1140                        return;
1141
1142                i++;
1143                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
1144                       "Waiting for rc buffers to unlink\n");
1145                set_current_state(TASK_UNINTERRUPTIBLE);
1146                schedule_timeout(cfs_time_seconds(1) / 4);
1147
1148                lnet_net_lock(LNET_LOCK_EX);
1149        }
1150
1151        lnet_net_unlock(LNET_LOCK_EX);
1152}
1153
1154static int
1155lnet_router_checker(void *arg)
1156{
1157        lnet_peer_t *rtr;
1158        struct list_head *entry;
1159
1160        cfs_block_allsigs();
1161
1162        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1163
1164        while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
1165                __u64 version;
1166                int cpt;
1167                int cpt2;
1168
1169                cpt = lnet_net_lock_current();
1170rescan:
1171                version = the_lnet.ln_routers_version;
1172
1173                list_for_each(entry, &the_lnet.ln_routers) {
1174                        rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
1175
1176                        cpt2 = lnet_cpt_of_nid_locked(rtr->lp_nid);
1177                        if (cpt != cpt2) {
1178                                lnet_net_unlock(cpt);
1179                                cpt = cpt2;
1180                                lnet_net_lock(cpt);
1181                                /* the routers list has changed */
1182                                if (version != the_lnet.ln_routers_version)
1183                                        goto rescan;
1184                        }
1185
1186                        lnet_ping_router_locked(rtr);
1187
1188                        /* NB dropped lock */
1189                        if (version != the_lnet.ln_routers_version) {
1190                                /* the routers list has changed */
1191                                goto rescan;
1192                        }
1193                }
1194
1195                if (the_lnet.ln_routing)
1196                        lnet_update_ni_status_locked();
1197
1198                lnet_net_unlock(cpt);
1199
1200                lnet_prune_rc_data(0); /* don't wait for UNLINK */
1201
1202                /* Call schedule_timeout() here always adds 1 to load average
1203                 * because kernel counts # active tasks as nr_running
1204                 * + nr_uninterruptible. */
1205                set_current_state(TASK_INTERRUPTIBLE);
1206                schedule_timeout(cfs_time_seconds(1));
1207        }
1208
1209        LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING);
1210
1211        lnet_prune_rc_data(1); /* wait for UNLINK */
1212
1213        the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1214        up(&the_lnet.ln_rc_signal);
1215        /* The unlink event callback will signal final completion */
1216        return 0;
1217}
1218
1219static void
1220lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
1221{
1222        int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1223
1224        while (--npages >= 0)
1225                __free_page(rb->rb_kiov[npages].kiov_page);
1226
1227        LIBCFS_FREE(rb, sz);
1228}
1229
1230static lnet_rtrbuf_t *
1231lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
1232{
1233        int npages = rbp->rbp_npages;
1234        int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1235        struct page *page;
1236        lnet_rtrbuf_t *rb;
1237        int i;
1238
1239        LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
1240        if (rb == NULL)
1241                return NULL;
1242
1243        rb->rb_pool = rbp;
1244
1245        for (i = 0; i < npages; i++) {
1246                page = alloc_pages_node(
1247                                cfs_cpt_spread_node(lnet_cpt_table(), cpt),
1248                                GFP_KERNEL | __GFP_ZERO, 0);
1249                if (page == NULL) {
1250                        while (--i >= 0)
1251                                __free_page(rb->rb_kiov[i].kiov_page);
1252
1253                        LIBCFS_FREE(rb, sz);
1254                        return NULL;
1255                }
1256
1257                rb->rb_kiov[i].kiov_len = PAGE_CACHE_SIZE;
1258                rb->rb_kiov[i].kiov_offset = 0;
1259                rb->rb_kiov[i].kiov_page = page;
1260        }
1261
1262        return rb;
1263}
1264
1265static void
1266lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
1267{
1268        int npages = rbp->rbp_npages;
1269        int nbuffers = 0;
1270        lnet_rtrbuf_t *rb;
1271
1272        if (rbp->rbp_nbuffers == 0) /* not initialized or already freed */
1273                return;
1274
1275        LASSERT(list_empty(&rbp->rbp_msgs));
1276        LASSERT(rbp->rbp_credits == rbp->rbp_nbuffers);
1277
1278        while (!list_empty(&rbp->rbp_bufs)) {
1279                LASSERT(rbp->rbp_credits > 0);
1280
1281                rb = list_entry(rbp->rbp_bufs.next,
1282                                    lnet_rtrbuf_t, rb_list);
1283                list_del(&rb->rb_list);
1284                lnet_destroy_rtrbuf(rb, npages);
1285                nbuffers++;
1286        }
1287
1288        LASSERT(rbp->rbp_nbuffers == nbuffers);
1289        LASSERT(rbp->rbp_credits == nbuffers);
1290
1291        rbp->rbp_nbuffers = rbp->rbp_credits = 0;
1292}
1293
1294static int
1295lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
1296{
1297        lnet_rtrbuf_t *rb;
1298        int i;
1299
1300        if (rbp->rbp_nbuffers != 0) {
1301                LASSERT(rbp->rbp_nbuffers == nbufs);
1302                return 0;
1303        }
1304
1305        for (i = 0; i < nbufs; i++) {
1306                rb = lnet_new_rtrbuf(rbp, cpt);
1307
1308                if (rb == NULL) {
1309                        CERROR("Failed to allocate %d router bufs of %d pages\n",
1310                               nbufs, rbp->rbp_npages);
1311                        return -ENOMEM;
1312                }
1313
1314                rbp->rbp_nbuffers++;
1315                rbp->rbp_credits++;
1316                rbp->rbp_mincredits++;
1317                list_add(&rb->rb_list, &rbp->rbp_bufs);
1318
1319                /* No allocation "under fire" */
1320                /* Otherwise we'd need code to schedule blocked msgs etc */
1321                LASSERT(!the_lnet.ln_routing);
1322        }
1323
1324        LASSERT(rbp->rbp_credits == nbufs);
1325        return 0;
1326}
1327
1328static void
1329lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
1330{
1331        INIT_LIST_HEAD(&rbp->rbp_msgs);
1332        INIT_LIST_HEAD(&rbp->rbp_bufs);
1333
1334        rbp->rbp_npages = npages;
1335        rbp->rbp_credits = 0;
1336        rbp->rbp_mincredits = 0;
1337}
1338
1339void
1340lnet_rtrpools_free(void)
1341{
1342        lnet_rtrbufpool_t *rtrp;
1343        int i;
1344
1345        if (the_lnet.ln_rtrpools == NULL) /* uninitialized or freed */
1346                return;
1347
1348        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1349                lnet_rtrpool_free_bufs(&rtrp[0]);
1350                lnet_rtrpool_free_bufs(&rtrp[1]);
1351                lnet_rtrpool_free_bufs(&rtrp[2]);
1352        }
1353
1354        cfs_percpt_free(the_lnet.ln_rtrpools);
1355        the_lnet.ln_rtrpools = NULL;
1356}
1357
1358static int
1359lnet_nrb_tiny_calculate(int npages)
1360{
1361        int nrbs = LNET_NRB_TINY;
1362
1363        if (tiny_router_buffers < 0) {
1364                LCONSOLE_ERROR_MSG(0x10c,
1365                                   "tiny_router_buffers=%d invalid when routing enabled\n",
1366                                   tiny_router_buffers);
1367                return -1;
1368        }
1369
1370        if (tiny_router_buffers > 0)
1371                nrbs = tiny_router_buffers;
1372
1373        nrbs /= LNET_CPT_NUMBER;
1374        return max(nrbs, LNET_NRB_TINY_MIN);
1375}
1376
1377static int
1378lnet_nrb_small_calculate(int npages)
1379{
1380        int nrbs = LNET_NRB_SMALL;
1381
1382        if (small_router_buffers < 0) {
1383                LCONSOLE_ERROR_MSG(0x10c,
1384                                   "small_router_buffers=%d invalid when routing enabled\n",
1385                                   small_router_buffers);
1386                return -1;
1387        }
1388
1389        if (small_router_buffers > 0)
1390                nrbs = small_router_buffers;
1391
1392        nrbs /= LNET_CPT_NUMBER;
1393        return max(nrbs, LNET_NRB_SMALL_MIN);
1394}
1395
1396static int
1397lnet_nrb_large_calculate(int npages)
1398{
1399        int nrbs = LNET_NRB_LARGE;
1400
1401        if (large_router_buffers < 0) {
1402                LCONSOLE_ERROR_MSG(0x10c,
1403                                   "large_router_buffers=%d invalid when routing enabled\n",
1404                                   large_router_buffers);
1405                return -1;
1406        }
1407
1408        if (large_router_buffers > 0)
1409                nrbs = large_router_buffers;
1410
1411        nrbs /= LNET_CPT_NUMBER;
1412        return max(nrbs, LNET_NRB_LARGE_MIN);
1413}
1414
1415int
1416lnet_rtrpools_alloc(int im_a_router)
1417{
1418        lnet_rtrbufpool_t *rtrp;
1419        int large_pages;
1420        int small_pages = 1;
1421        int nrb_tiny;
1422        int nrb_small;
1423        int nrb_large;
1424        int rc;
1425        int i;
1426
1427        large_pages = (LNET_MTU + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
1428
1429        if (!strcmp(forwarding, "")) {
1430                /* not set either way */
1431                if (!im_a_router)
1432                        return 0;
1433        } else if (!strcmp(forwarding, "disabled")) {
1434                /* explicitly disabled */
1435                return 0;
1436        } else if (!strcmp(forwarding, "enabled")) {
1437                /* explicitly enabled */
1438        } else {
1439                LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either 'enabled' or 'disabled'\n");
1440                return -EINVAL;
1441        }
1442
1443        nrb_tiny = lnet_nrb_tiny_calculate(0);
1444        if (nrb_tiny < 0)
1445                return -EINVAL;
1446
1447        nrb_small = lnet_nrb_small_calculate(small_pages);
1448        if (nrb_small < 0)
1449                return -EINVAL;
1450
1451        nrb_large = lnet_nrb_large_calculate(large_pages);
1452        if (nrb_large < 0)
1453                return -EINVAL;
1454
1455        the_lnet.ln_rtrpools = cfs_percpt_alloc(lnet_cpt_table(),
1456                                                LNET_NRBPOOLS *
1457                                                sizeof(lnet_rtrbufpool_t));
1458        if (the_lnet.ln_rtrpools == NULL) {
1459                LCONSOLE_ERROR_MSG(0x10c,
1460                                   "Failed to initialize router buffe pool\n");
1461                return -ENOMEM;
1462        }
1463
1464        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1465                lnet_rtrpool_init(&rtrp[0], 0);
1466                rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i);
1467                if (rc != 0)
1468                        goto failed;
1469
1470                lnet_rtrpool_init(&rtrp[1], small_pages);
1471                rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i);
1472                if (rc != 0)
1473                        goto failed;
1474
1475                lnet_rtrpool_init(&rtrp[2], large_pages);
1476                rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i);
1477                if (rc != 0)
1478                        goto failed;
1479        }
1480
1481        lnet_net_lock(LNET_LOCK_EX);
1482        the_lnet.ln_routing = 1;
1483        lnet_net_unlock(LNET_LOCK_EX);
1484
1485        return 0;
1486
1487 failed:
1488        lnet_rtrpools_free();
1489        return rc;
1490}
1491
1492int
1493lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, unsigned long when)
1494{
1495        struct lnet_peer *lp = NULL;
1496        unsigned long now = cfs_time_current();
1497        int cpt = lnet_cpt_of_nid(nid);
1498
1499        LASSERT(!in_interrupt());
1500
1501        CDEBUG(D_NET, "%s notifying %s: %s\n",
1502                (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1503                libcfs_nid2str(nid),
1504                alive ? "up" : "down");
1505
1506        if (ni != NULL &&
1507            LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
1508                CWARN("Ignoring notification of %s %s by %s (different net)\n",
1509                        libcfs_nid2str(nid), alive ? "birth" : "death",
1510                        libcfs_nid2str(ni->ni_nid));
1511                return -EINVAL;
1512        }
1513
1514        /* can't do predictions... */
1515        if (cfs_time_after(when, now)) {
1516                CWARN("Ignoring prediction from %s of %s %s %ld seconds in the future\n",
1517                      (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1518                      libcfs_nid2str(nid), alive ? "up" : "down",
1519                      cfs_duration_sec(cfs_time_sub(when, now)));
1520                return -EINVAL;
1521        }
1522
1523        if (ni != NULL && !alive &&          /* LND telling me she's down */
1524            !auto_down) {                      /* auto-down disabled */
1525                CDEBUG(D_NET, "Auto-down disabled\n");
1526                return 0;
1527        }
1528
1529        lnet_net_lock(cpt);
1530
1531        if (the_lnet.ln_shutdown) {
1532                lnet_net_unlock(cpt);
1533                return -ESHUTDOWN;
1534        }
1535
1536        lp = lnet_find_peer_locked(the_lnet.ln_peer_tables[cpt], nid);
1537        if (lp == NULL) {
1538                /* nid not found */
1539                lnet_net_unlock(cpt);
1540                CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
1541                return 0;
1542        }
1543
1544        /* We can't fully trust LND on reporting exact peer last_alive
1545         * if he notifies us about dead peer. For example ksocklnd can
1546         * call us with when == _time_when_the_node_was_booted_ if
1547         * no connections were successfully established */
1548        if (ni != NULL && !alive && when < lp->lp_last_alive)
1549                when = lp->lp_last_alive;
1550
1551        lnet_notify_locked(lp, ni == NULL, alive, when);
1552
1553        lnet_ni_notify_locked(ni, lp);
1554
1555        lnet_peer_decref_locked(lp);
1556
1557        lnet_net_unlock(cpt);
1558        return 0;
1559}
1560EXPORT_SYMBOL(lnet_notify);
1561