/* linux/drivers/staging/lustre/lnet/lnet/router.c */
   1/*
   2 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
   3 *
   4 * Copyright (c) 2011, 2015, Intel Corporation.
   5 *
   6 *   This file is part of Portals
   7 *   http://sourceforge.net/projects/sandiaportals/
   8 *
   9 *   Portals is free software; you can redistribute it and/or
  10 *   modify it under the terms of version 2 of the GNU General Public
  11 *   License as published by the Free Software Foundation.
  12 *
  13 *   Portals is distributed in the hope that it will be useful,
  14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16 *   GNU General Public License for more details.
  17 *
  18 */
  19
  20#define DEBUG_SUBSYSTEM S_LNET
  21
  22#include <linux/completion.h>
  23#include <linux/lnet/lib-lnet.h>
  24
  25#define LNET_NRB_TINY_MIN       512     /* min value for each CPT */
  26#define LNET_NRB_TINY           (LNET_NRB_TINY_MIN * 4)
  27#define LNET_NRB_SMALL_MIN      4096    /* min value for each CPT */
  28#define LNET_NRB_SMALL          (LNET_NRB_SMALL_MIN * 4)
  29#define LNET_NRB_SMALL_PAGES    1
  30#define LNET_NRB_LARGE_MIN      256     /* min value for each CPT */
  31#define LNET_NRB_LARGE          (LNET_NRB_LARGE_MIN * 4)
  32#define LNET_NRB_LARGE_PAGES   ((LNET_MTU + PAGE_SIZE - 1) >> \
  33                                 PAGE_SHIFT)
  34
  35static char *forwarding = "";
  36module_param(forwarding, charp, 0444);
  37MODULE_PARM_DESC(forwarding, "Explicitly enable/disable forwarding between networks");
  38
  39static int tiny_router_buffers;
  40module_param(tiny_router_buffers, int, 0444);
  41MODULE_PARM_DESC(tiny_router_buffers, "# of 0 payload messages to buffer in the router");
  42static int small_router_buffers;
  43module_param(small_router_buffers, int, 0444);
  44MODULE_PARM_DESC(small_router_buffers, "# of small (1 page) messages to buffer in the router");
  45static int large_router_buffers;
  46module_param(large_router_buffers, int, 0444);
  47MODULE_PARM_DESC(large_router_buffers, "# of large messages to buffer in the router");
  48static int peer_buffer_credits;
  49module_param(peer_buffer_credits, int, 0444);
  50MODULE_PARM_DESC(peer_buffer_credits, "# router buffer credits per peer");
  51
  52static int auto_down = 1;
  53module_param(auto_down, int, 0444);
  54MODULE_PARM_DESC(auto_down, "Automatically mark peers down on comms error");
  55
  56int
  57lnet_peer_buffer_credits(struct lnet_ni *ni)
  58{
  59        /* NI option overrides LNet default */
  60        if (ni->ni_peerrtrcredits > 0)
  61                return ni->ni_peerrtrcredits;
  62        if (peer_buffer_credits > 0)
  63                return peer_buffer_credits;
  64
  65        /*
  66         * As an approximation, allow this peer the same number of router
  67         * buffers as it is allowed outstanding sends
  68         */
  69        return ni->ni_peertxcredits;
  70}
  71
  72/* forward ref's */
  73static int lnet_router_checker(void *);
  74
  75static int check_routers_before_use;
  76module_param(check_routers_before_use, int, 0444);
  77MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");
  78
  79int avoid_asym_router_failure = 1;
  80module_param(avoid_asym_router_failure, int, 0644);
  81MODULE_PARM_DESC(avoid_asym_router_failure, "Avoid asymmetrical router failures (0 to disable)");
  82
  83static int dead_router_check_interval = 60;
  84module_param(dead_router_check_interval, int, 0644);
  85MODULE_PARM_DESC(dead_router_check_interval, "Seconds between dead router health checks (<= 0 to disable)");
  86
  87static int live_router_check_interval = 60;
  88module_param(live_router_check_interval, int, 0644);
  89MODULE_PARM_DESC(live_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");
  90
  91static int router_ping_timeout = 50;
  92module_param(router_ping_timeout, int, 0644);
  93MODULE_PARM_DESC(router_ping_timeout, "Seconds to wait for the reply to a router health query");
  94
/*
 * Non-zero when the "check_routers_before_use" module parameter is set:
 * peers are then assumed dead until a ping proves otherwise.
 */
int
lnet_peers_start_down(void)
{
	return check_routers_before_use;
}
 100
 101void
 102lnet_notify_locked(struct lnet_peer *lp, int notifylnd, int alive,
 103                   unsigned long when)
 104{
 105        if (time_before(when, lp->lp_timestamp)) { /* out of date information */
 106                CDEBUG(D_NET, "Out of date\n");
 107                return;
 108        }
 109
 110        lp->lp_timestamp = when;                /* update timestamp */
 111        lp->lp_ping_deadline = 0;              /* disable ping timeout */
 112
 113        if (lp->lp_alive_count &&         /* got old news */
 114            (!lp->lp_alive) == (!alive)) {      /* new date for old news */
 115                CDEBUG(D_NET, "Old news\n");
 116                return;
 117        }
 118
 119        /* Flag that notification is outstanding */
 120
 121        lp->lp_alive_count++;
 122        lp->lp_alive = !(!alive);              /* 1 bit! */
 123        lp->lp_notify = 1;
 124        lp->lp_notifylnd |= notifylnd;
 125        if (lp->lp_alive)
 126                lp->lp_ping_feats = LNET_PING_FEAT_INVAL; /* reset */
 127
 128        CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
 129}
 130
/*
 * Drain the pending liveness notification for @lp, invoking the LND's
 * notify hook when requested.  Called with the net lock for lp->lp_cpt
 * held; the lock is dropped around the LND callback and retaken.
 */
static void
lnet_ni_notify_locked(struct lnet_ni *ni, struct lnet_peer *lp)
{
	int alive;
	int notifylnd;

	/*
	 * Notify only in 1 thread at any time to ensure ordered notification.
	 * NB individual events can be missed; the only guarantee is that you
	 * always get the most recent news
	 */
	if (lp->lp_notifying || !ni)
		return;

	lp->lp_notifying = 1;

	while (lp->lp_notify) {
		/* snapshot state and clear the flags before unlocking */
		alive = lp->lp_alive;
		notifylnd = lp->lp_notifylnd;

		lp->lp_notifylnd = 0;
		lp->lp_notify    = 0;

		if (notifylnd && ni->ni_lnd->lnd_notify) {
			lnet_net_unlock(lp->lp_cpt);

			/*
			 * A new notification could happen now; I'll handle it
			 * when control returns to me
			 */
			ni->ni_lnd->lnd_notify(ni, lp->lp_nid, alive);

			lnet_net_lock(lp->lp_cpt);
		}
	}

	lp->lp_notifying = 0;
}
 169
/*
 * Take a router reference on peer @lp.  On the first reference the peer
 * is inserted into the global the_lnet.ln_routers list, which is kept
 * sorted by NID.
 */
static void
lnet_rtr_addref_locked(struct lnet_peer *lp)
{
	LASSERT(lp->lp_refcount > 0);
	LASSERT(lp->lp_rtr_refcount >= 0);

	/* lnet_net_lock must be exclusively locked */
	lp->lp_rtr_refcount++;
	if (lp->lp_rtr_refcount == 1) {
		struct list_head *pos;

		/* a simple insertion sort: scan backwards for the first
		 * router with a smaller NID and insert after it */
		list_for_each_prev(pos, &the_lnet.ln_routers) {
			struct lnet_peer *rtr;

			rtr = list_entry(pos, struct lnet_peer, lp_rtr_list);
			if (rtr->lp_nid < lp->lp_nid)
				break;
		}

		list_add(&lp->lp_rtr_list, pos);
		/* addref for the_lnet.ln_routers */
		lnet_peer_addref_locked(lp);
		the_lnet.ln_routers_version++;
	}
}
 196
/*
 * Drop a router reference on peer @lp.  When the last one goes, the
 * peer leaves the global router list and its router-checker data (if
 * any) is queued on the deathrow list for later cleanup.
 */
static void
lnet_rtr_decref_locked(struct lnet_peer *lp)
{
	LASSERT(lp->lp_refcount > 0);
	LASSERT(lp->lp_rtr_refcount > 0);

	/* lnet_net_lock must be exclusively locked */
	lp->lp_rtr_refcount--;
	if (!lp->lp_rtr_refcount) {
		/* no routes may still reference this gateway */
		LASSERT(list_empty(&lp->lp_routes));

		if (lp->lp_rcd) {
			list_add(&lp->lp_rcd->rcd_list,
				 &the_lnet.ln_rcd_deathrow);
			lp->lp_rcd = NULL;
		}

		list_del(&lp->lp_rtr_list);
		/* decref for the_lnet.ln_routers */
		lnet_peer_decref_locked(lp);
		the_lnet.ln_routers_version++;
	}
}
 220
 221struct lnet_remotenet *
 222lnet_find_net_locked(__u32 net)
 223{
 224        struct lnet_remotenet *rnet;
 225        struct list_head *tmp;
 226        struct list_head *rn_list;
 227
 228        LASSERT(!the_lnet.ln_shutdown);
 229
 230        rn_list = lnet_net2rnethash(net);
 231        list_for_each(tmp, rn_list) {
 232                rnet = list_entry(tmp, struct lnet_remotenet, lrn_list);
 233
 234                if (rnet->lrn_net == net)
 235                        return rnet;
 236        }
 237        return NULL;
 238}
 239
/*
 * Seed the PRNG used for randomized route insertion, exactly once.
 * The seed mixes random bytes, the current time and this node's NIDs.
 */
static void lnet_shuffle_seed(void)
{
	static int seeded;	/* set after the first call; later calls are no-ops */
	__u32 lnd_type, seed[2];
	struct timespec64 ts;
	struct lnet_ni *ni;
	struct list_head *tmp;

	if (seeded)
		return;

	cfs_get_random_bytes(seed, sizeof(seed));

	/*
	 * Nodes with small feet have little entropy
	 * the NID for this node gives the most entropy in the low bits
	 */
	list_for_each(tmp, &the_lnet.ln_nis) {
		ni = list_entry(tmp, struct lnet_ni, ni_list);
		lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));

		/* skip loopback: its NID is the same on every node */
		if (lnd_type != LOLND)
			seed[0] ^= (LNET_NIDADDR(ni->ni_nid) | lnd_type);
	}

	ktime_get_ts64(&ts);
	cfs_srand(ts.tv_sec ^ seed[0], ts.tv_nsec ^ seed[1]);
	seeded = 1;
}
 269
 270/* NB expects LNET_LOCK held */
 271static void
 272lnet_add_route_to_rnet(struct lnet_remotenet *rnet, struct lnet_route *route)
 273{
 274        unsigned int len = 0;
 275        unsigned int offset = 0;
 276        struct list_head *e;
 277
 278        lnet_shuffle_seed();
 279
 280        list_for_each(e, &rnet->lrn_routes) {
 281                len++;
 282        }
 283
 284        /* len+1 positions to add a new entry, also prevents division by 0 */
 285        offset = cfs_rand() % (len + 1);
 286        list_for_each(e, &rnet->lrn_routes) {
 287                if (!offset)
 288                        break;
 289                offset--;
 290        }
 291        list_add(&route->lr_list, e);
 292        list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);
 293
 294        the_lnet.ln_remote_nets_version++;
 295        lnet_rtr_addref_locked(route->lr_gateway);
 296}
 297
/*
 * Add a route to remote net @net via gateway @gateway with the given
 * hop count and priority.  Returns 0 on success, -EINVAL for a bad
 * net/gateway/hop combination, -EEXIST if @net is local or the route
 * already exists, -ENOMEM/-EHOSTUNREACH on resource or lookup failure.
 */
int
lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
	       unsigned int priority)
{
	struct list_head *e;
	struct lnet_remotenet *rnet;
	struct lnet_remotenet *rnet2;
	struct lnet_route *route;
	struct lnet_ni *ni;
	int add_route;
	int rc;

	CDEBUG(D_NET, "Add route: net %s hops %d priority %u gw %s\n",
	       libcfs_net2str(net), hops, priority, libcfs_nid2str(gateway));

	/* reject wildcard/loopback NIDs, routes into the gateway's own
	 * net, and hop counts outside 1..255 (unless undefined) */
	if (gateway == LNET_NID_ANY ||
	    LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
	    net == LNET_NIDNET(LNET_NID_ANY) ||
	    LNET_NETTYP(net) == LOLND ||
	    LNET_NIDNET(gateway) == net ||
	    (hops != LNET_UNDEFINED_HOPS && (hops < 1 || hops > 255)))
		return -EINVAL;

	if (lnet_islocalnet(net))	/* it's a local network */
		return -EEXIST;

	/* Assume net, route, all new */
	LIBCFS_ALLOC(route, sizeof(*route));
	LIBCFS_ALLOC(rnet, sizeof(*rnet));
	if (!route || !rnet) {
		CERROR("Out of memory creating route %s %d %s\n",
		       libcfs_net2str(net), hops, libcfs_nid2str(gateway));
		if (route)
			LIBCFS_FREE(route, sizeof(*route));
		if (rnet)
			LIBCFS_FREE(rnet, sizeof(*rnet));
		return -ENOMEM;
	}

	INIT_LIST_HEAD(&rnet->lrn_routes);
	rnet->lrn_net = net;
	route->lr_hops = hops;
	route->lr_net = net;
	route->lr_priority = priority;

	lnet_net_lock(LNET_LOCK_EX);

	/* takes a reference on the gateway peer (creating it if needed) */
	rc = lnet_nid2peer_locked(&route->lr_gateway, gateway, LNET_LOCK_EX);
	if (rc) {
		lnet_net_unlock(LNET_LOCK_EX);

		LIBCFS_FREE(route, sizeof(*route));
		LIBCFS_FREE(rnet, sizeof(*rnet));

		if (rc == -EHOSTUNREACH) /* gateway is not on a local net */
			return rc;	/* ignore the route entry */
		CERROR("Error %d creating route %s %d %s\n", rc,
		       libcfs_net2str(net), hops,
		       libcfs_nid2str(gateway));
		return rc;
	}

	LASSERT(!the_lnet.ln_shutdown);

	rnet2 = lnet_find_net_locked(net);
	if (!rnet2) {
		/* new network */
		list_add_tail(&rnet->lrn_list, lnet_net2rnethash(net));
		rnet2 = rnet;
	}

	/* Search for a duplicate route (it's a NOOP if it is) */
	add_route = 1;
	list_for_each(e, &rnet2->lrn_routes) {
		struct lnet_route *route2;

		route2 = list_entry(e, struct lnet_route, lr_list);
		if (route2->lr_gateway == route->lr_gateway) {
			add_route = 0;
			break;
		}

		/* our lookups must be true */
		LASSERT(route2->lr_gateway->lp_nid != gateway);
	}

	if (add_route) {
		lnet_peer_addref_locked(route->lr_gateway); /* +1 for notify */
		lnet_add_route_to_rnet(rnet2, route);

		ni = route->lr_gateway->lp_ni;
		/* drop the lock around the LND notify callback */
		lnet_net_unlock(LNET_LOCK_EX);

		/* XXX Assume alive */
		if (ni->ni_lnd->lnd_notify)
			ni->ni_lnd->lnd_notify(ni, gateway, 1);

		lnet_net_lock(LNET_LOCK_EX);
	}

	/* -1 for notify or !add_route */
	lnet_peer_decref_locked(route->lr_gateway);
	lnet_net_unlock(LNET_LOCK_EX);
	rc = 0;

	/* duplicate: the new route struct was never linked in; free it */
	if (!add_route) {
		rc = -EEXIST;
		LIBCFS_FREE(route, sizeof(*route));
	}

	/* an existing rnet was found, so ours was never linked in */
	if (rnet != rnet2)
		LIBCFS_FREE(rnet, sizeof(*rnet));

	/* indicate to startup the router checker if configured */
	wake_up(&the_lnet.ln_rc_waitq);

	return rc;
}
 416
/*
 * Verify the route table is supportable: every route to a given remote
 * net must use a gateway reached via the same local NI.  Returns 0 on
 * success, or -EINVAL (after logging the offending net and gateways)
 * when two routes to one net go out different local interfaces.
 */
int
lnet_check_routes(void)
{
	struct lnet_remotenet *rnet;
	struct lnet_route *route;
	struct lnet_route *route2;
	struct list_head *e1;
	struct list_head *e2;
	int cpt;
	struct list_head *rn_list;
	int i;

	cpt = lnet_net_lock_current();

	for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
		rn_list = &the_lnet.ln_remote_nets_hash[i];
		list_for_each(e1, rn_list) {
			rnet = list_entry(e1, struct lnet_remotenet, lrn_list);

			route2 = NULL;
			list_for_each(e2, &rnet->lrn_routes) {
				lnet_nid_t nid1;
				lnet_nid_t nid2;
				int net;

				route = list_entry(e2, struct lnet_route, lr_list);

				/* first route becomes the reference */
				if (!route2) {
					route2 = route;
					continue;
				}

				if (route->lr_gateway->lp_ni ==
				    route2->lr_gateway->lp_ni)
					continue;

				/* snapshot details before dropping the lock */
				nid1 = route->lr_gateway->lp_nid;
				nid2 = route2->lr_gateway->lp_nid;
				net = rnet->lrn_net;

				lnet_net_unlock(cpt);

				CERROR("Routes to %s via %s and %s not supported\n",
				       libcfs_net2str(net),
				       libcfs_nid2str(nid1),
				       libcfs_nid2str(nid2));
				return -EINVAL;
			}
		}
	}

	lnet_net_unlock(cpt);
	return 0;
}
 471
/*
 * Delete routes to @net via @gw_nid.  Either argument may be a
 * wildcard (LNET_NIDNET(LNET_NID_ANY) / LNET_NID_ANY) to match every
 * net or gateway.  Returns 0 if at least one route was removed,
 * -ENOENT otherwise.
 */
int
lnet_del_route(__u32 net, lnet_nid_t gw_nid)
{
	struct lnet_peer *gateway;
	struct lnet_remotenet *rnet;
	struct lnet_route *route;
	struct list_head *e1;
	struct list_head *e2;
	int rc = -ENOENT;
	struct list_head *rn_list;
	int idx = 0;

	CDEBUG(D_NET, "Del route: net %s : gw %s\n",
	       libcfs_net2str(net), libcfs_nid2str(gw_nid));

	/*
	 * NB Caller may specify either all routes via the given gateway
	 * or a specific route entry actual NIDs)
	 */
	lnet_net_lock(LNET_LOCK_EX);
	if (net == LNET_NIDNET(LNET_NID_ANY))
		rn_list = &the_lnet.ln_remote_nets_hash[0];
	else
		rn_list = lnet_net2rnethash(net);

 again:
	list_for_each(e1, rn_list) {
		rnet = list_entry(e1, struct lnet_remotenet, lrn_list);

		if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
		      net == rnet->lrn_net))
			continue;

		list_for_each(e2, &rnet->lrn_routes) {
			route = list_entry(e2, struct lnet_route, lr_list);

			gateway = route->lr_gateway;
			if (!(gw_nid == LNET_NID_ANY ||
			      gw_nid == gateway->lp_nid))
				continue;

			list_del(&route->lr_list);
			list_del(&route->lr_gwlist);
			the_lnet.ln_remote_nets_version++;

			/* free the remote net too if this was its last route;
			 * NULL rnet marks "still has routes, don't free" */
			if (list_empty(&rnet->lrn_routes))
				list_del(&rnet->lrn_list);
			else
				rnet = NULL;

			lnet_rtr_decref_locked(gateway);
			lnet_peer_decref_locked(gateway);

			/* drop the lock to free memory; the lists may
			 * change meanwhile, so restart the scan */
			lnet_net_unlock(LNET_LOCK_EX);

			LIBCFS_FREE(route, sizeof(*route));

			if (rnet)
				LIBCFS_FREE(rnet, sizeof(*rnet));

			rc = 0;
			lnet_net_lock(LNET_LOCK_EX);
			goto again;
		}
	}

	/* wildcard net: walk each remaining hash chain in turn */
	if (net == LNET_NIDNET(LNET_NID_ANY) &&
	    ++idx < LNET_REMOTE_NETS_HASH_SIZE) {
		rn_list = &the_lnet.ln_remote_nets_hash[idx];
		goto again;
	}
	lnet_net_unlock(LNET_LOCK_EX);

	return rc;
}
 547
/* Remove every configured route (wildcard net and wildcard gateway). */
void
lnet_destroy_routes(void)
{
	lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
}
 553
 554int lnet_get_rtr_pool_cfg(int idx, struct lnet_ioctl_pool_cfg *pool_cfg)
 555{
 556        int i, rc = -ENOENT, j;
 557
 558        if (!the_lnet.ln_rtrpools)
 559                return rc;
 560
 561        for (i = 0; i < LNET_NRBPOOLS; i++) {
 562                struct lnet_rtrbufpool *rbp;
 563
 564                lnet_net_lock(LNET_LOCK_EX);
 565                cfs_percpt_for_each(rbp, j, the_lnet.ln_rtrpools) {
 566                        if (i++ != idx)
 567                                continue;
 568
 569                        pool_cfg->pl_pools[i].pl_npages = rbp[i].rbp_npages;
 570                        pool_cfg->pl_pools[i].pl_nbuffers = rbp[i].rbp_nbuffers;
 571                        pool_cfg->pl_pools[i].pl_credits = rbp[i].rbp_credits;
 572                        pool_cfg->pl_pools[i].pl_mincredits = rbp[i].rbp_mincredits;
 573                        rc = 0;
 574                        break;
 575                }
 576                lnet_net_unlock(LNET_LOCK_EX);
 577        }
 578
 579        lnet_net_lock(LNET_LOCK_EX);
 580        pool_cfg->pl_routing = the_lnet.ln_routing;
 581        lnet_net_unlock(LNET_LOCK_EX);
 582
 583        return rc;
 584}
 585
 586int
 587lnet_get_route(int idx, __u32 *net, __u32 *hops,
 588               lnet_nid_t *gateway, __u32 *alive, __u32 *priority)
 589{
 590        struct list_head *e1;
 591        struct list_head *e2;
 592        struct lnet_remotenet *rnet;
 593        struct lnet_route *route;
 594        int cpt;
 595        int i;
 596        struct list_head *rn_list;
 597
 598        cpt = lnet_net_lock_current();
 599
 600        for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
 601                rn_list = &the_lnet.ln_remote_nets_hash[i];
 602                list_for_each(e1, rn_list) {
 603                        rnet = list_entry(e1, struct lnet_remotenet, lrn_list);
 604
 605                        list_for_each(e2, &rnet->lrn_routes) {
 606                                route = list_entry(e2, struct lnet_route,
 607                                                   lr_list);
 608
 609                                if (!idx--) {
 610                                        *net      = rnet->lrn_net;
 611                                        *hops     = route->lr_hops;
 612                                        *priority = route->lr_priority;
 613                                        *gateway  = route->lr_gateway->lp_nid;
 614                                        *alive = lnet_is_route_alive(route);
 615                                        lnet_net_unlock(cpt);
 616                                        return 0;
 617                                }
 618                        }
 619                }
 620        }
 621
 622        lnet_net_unlock(cpt);
 623        return -ENOENT;
 624}
 625
 626void
 627lnet_swap_pinginfo(struct lnet_ping_info *info)
 628{
 629        int i;
 630        struct lnet_ni_status *stat;
 631
 632        __swab32s(&info->pi_magic);
 633        __swab32s(&info->pi_features);
 634        __swab32s(&info->pi_pid);
 635        __swab32s(&info->pi_nnis);
 636        for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
 637                stat = &info->pi_ni[i];
 638                __swab64s(&stat->ns_nid);
 639                __swab32s(&stat->ns_status);
 640        }
 641}
 642
/**
 * parse router-checker pinginfo, record number of down NIs for remote
 * networks on that router.
 */
static void
lnet_parse_rc_info(struct lnet_rc_data *rcd)
{
	struct lnet_ping_info *info = rcd->rcd_pinginfo;
	struct lnet_peer *gw = rcd->rcd_gateway;
	struct lnet_route *rte;

	/* only meaningful for a gateway currently believed alive */
	if (!gw->lp_alive)
		return;

	/* reply may arrive in the other byte order; fix it up */
	if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
		lnet_swap_pinginfo(info);

	/* NB always racing with network! */
	if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
		CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
		       libcfs_nid2str(gw->lp_nid), info->pi_magic);
		gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
		return;
	}

	gw->lp_ping_feats = info->pi_features;
	if (!(gw->lp_ping_feats & LNET_PING_FEAT_MASK)) {
		CDEBUG(D_NET, "%s: Unexpected features 0x%x\n",
		       libcfs_nid2str(gw->lp_nid), gw->lp_ping_feats);
		return; /* nothing I can understand */
	}

	if (!(gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS))
		return; /* can't carry NI status info */

	/* for each route via this gateway, decide whether its target
	 * net is reachable through the gateway's NIs */
	list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
		int down = 0;
		int up = 0;
		int i;

		/* routing disabled on the gateway: route is down */
		if (gw->lp_ping_feats & LNET_PING_FEAT_RTE_DISABLED) {
			rte->lr_downis = 1;
			continue;
		}

		for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
			struct lnet_ni_status *stat = &info->pi_ni[i];
			lnet_nid_t nid = stat->ns_nid;

			if (nid == LNET_NID_ANY) {
				CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
				       libcfs_nid2str(gw->lp_nid));
				gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
				return;
			}

			/* loopback NI says nothing about reachability */
			if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
				continue;

			if (stat->ns_status == LNET_NI_STATUS_DOWN) {
				down++;
				continue;
			}

			if (stat->ns_status == LNET_NI_STATUS_UP) {
				if (LNET_NIDNET(nid) == rte->lr_net) {
					up = 1;
					break;
				}
				continue;
			}

			CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
			       libcfs_nid2str(gw->lp_nid), stat->ns_status);
			gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
			return;
		}

		if (up) { /* ignore downed NIs if NI for dest network is up */
			rte->lr_downis = 0;
			continue;
		}
		/**
		 * if @down is zero and this route is single-hop, it means
		 * we can't find NI for target network
		 */
		if (!down && rte->lr_hops == 1)
			down = 1;

		rte->lr_downis = down;
	}
}
 735
/*
 * Event callback for router-checker ping MDs.  Updates the gateway's
 * liveness from SEND/REPLY outcomes and, on a good reply, parses the
 * returned NI status when asymmetric-failure avoidance is enabled.
 */
static void
lnet_router_checker_event(struct lnet_event *event)
{
	struct lnet_rc_data *rcd = event->md.user_ptr;
	struct lnet_peer *lp;

	LASSERT(rcd);

	if (event->unlinked) {
		/* the MD is gone; invalidate the handle for the checker */
		LNetInvalidateMDHandle(&rcd->rcd_mdh);
		return;
	}

	LASSERT(event->type == LNET_EVENT_SEND ||
		event->type == LNET_EVENT_REPLY);

	lp = rcd->rcd_gateway;
	LASSERT(lp);

	/*
	 * NB: it's called with holding lnet_res_lock, we have a few
	 * places need to hold both locks at the same time, please take
	 * care of lock ordering
	 */
	lnet_net_lock(lp->lp_cpt);
	if (!lnet_isrouter(lp) || lp->lp_rcd != rcd) {
		/* ignore if no longer a router or rcd is replaced */
		goto out;
	}

	if (event->type == LNET_EVENT_SEND) {
		lp->lp_ping_notsent = 0;
		/* a clean SEND is not news; wait for the REPLY */
		if (!event->status)
			goto out;
	}

	/* LNET_EVENT_REPLY */
	/*
	 * A successful REPLY means the router is up.  If _any_ comms
	 * to the router fail I assume it's down (this will happen if
	 * we ping alive routers to try to detect router death before
	 * apps get burned).
	 */
	lnet_notify_locked(lp, 1, !event->status, cfs_time_current());

	/*
	 * The router checker will wake up very shortly and do the
	 * actual notification.
	 * XXX If 'lp' stops being a router before then, it will still
	 * have the notification pending!!!
	 */
	if (avoid_asym_router_failure && !event->status)
		lnet_parse_rc_info(rcd);

 out:
	lnet_net_unlock(lp->lp_cpt);
}
 793
/*
 * Block until every known router has reported liveness at least once
 * (lp_alive_count != 0), polling the router list once per second.
 * Only called while the router checker is running.
 */
static void
lnet_wait_known_routerstate(void)
{
	struct lnet_peer *rtr;
	struct list_head *entry;
	int all_known;

	LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);

	for (;;) {
		int cpt = lnet_net_lock_current();

		all_known = 1;
		list_for_each(entry, &the_lnet.ln_routers) {
			rtr = list_entry(entry, struct lnet_peer, lp_rtr_list);

			/* zero alive_count: no news about it yet */
			if (!rtr->lp_alive_count) {
				all_known = 0;
				break;
			}
		}

		lnet_net_unlock(cpt);

		if (all_known)
			return;

		/* sleep a second and re-scan */
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1));
	}
}
 825
 826void
 827lnet_router_ni_update_locked(struct lnet_peer *gw, __u32 net)
 828{
 829        struct lnet_route *rte;
 830
 831        if ((gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS)) {
 832                list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
 833                        if (rte->lr_net == net) {
 834                                rte->lr_downis = 0;
 835                                break;
 836                        }
 837                }
 838        }
 839}
 840
/*
 * Mark local NIs "down" when they have not reported alive within the
 * checker window (ping timeout plus the larger of the live/dead check
 * intervals).  Only called while routing is enabled.
 */
static void
lnet_update_ni_status_locked(void)
{
	struct lnet_ni *ni;
	time64_t now;
	int timeout;

	LASSERT(the_lnet.ln_routing);

	timeout = router_ping_timeout +
		  max(live_router_check_interval, dead_router_check_interval);

	now = ktime_get_real_seconds();
	list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
		/* loopback NI is always implicitly up */
		if (ni->ni_lnd->lnd_type == LOLND)
			continue;

		if (now < ni->ni_last_alive + timeout)
			continue;

		lnet_ni_lock(ni);
		/* re-check with lock */
		if (now < ni->ni_last_alive + timeout) {
			lnet_ni_unlock(ni);
			continue;
		}

		LASSERT(ni->ni_status);

		if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
			CDEBUG(D_NET, "NI(%s:%d) status changed to down\n",
			       libcfs_nid2str(ni->ni_nid), timeout);
			/*
			 * NB: so far, this is the only place to set
			 * NI status to "down"
			 */
			ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
		}
		lnet_ni_unlock(ni);
	}
}
 882
 883static void
 884lnet_destroy_rc_data(struct lnet_rc_data *rcd)
 885{
 886        LASSERT(list_empty(&rcd->rcd_list));
 887        /* detached from network */
 888        LASSERT(LNetMDHandleIsInvalid(rcd->rcd_mdh));
 889
 890        if (rcd->rcd_gateway) {
 891                int cpt = rcd->rcd_gateway->lp_cpt;
 892
 893                lnet_net_lock(cpt);
 894                lnet_peer_decref_locked(rcd->rcd_gateway);
 895                lnet_net_unlock(cpt);
 896        }
 897
 898        if (rcd->rcd_pinginfo)
 899                LIBCFS_FREE(rcd->rcd_pinginfo, LNET_PINGINFO_SIZE);
 900
 901        LIBCFS_FREE(rcd, sizeof(*rcd));
 902}
 903
/*
 * Allocate and attach a router-checker data block (ping buffer + MD)
 * for @gateway.
 *
 * Entered with the net lock for gateway->lp_cpt held; the lock is
 * dropped across the allocations and LNetMDBind() and re-taken before
 * returning, so the router table may change underneath us - hence the
 * re-checks below.
 *
 * Returns the rcd now attached to the gateway (ours, or one another
 * thread raced in), or NULL if none could be attached.  Always returns
 * with the net lock for gateway->lp_cpt held.
 */
static struct lnet_rc_data *
lnet_create_rc_data_locked(struct lnet_peer *gateway)
{
	struct lnet_rc_data *rcd = NULL;
	struct lnet_ping_info *pi;
	struct lnet_md md;
	int rc;
	int i;

	/* drop the lock: the allocations below may sleep */
	lnet_net_unlock(gateway->lp_cpt);

	LIBCFS_ALLOC(rcd, sizeof(*rcd));
	if (!rcd)
		goto out;

	LNetInvalidateMDHandle(&rcd->rcd_mdh);
	INIT_LIST_HEAD(&rcd->rcd_list);

	LIBCFS_ALLOC(pi, LNET_PINGINFO_SIZE);
	if (!pi)
		goto out;

	/* mark every NI slot invalid until a ping reply fills it in */
	for (i = 0; i < LNET_MAX_RTR_NIS; i++) {
		pi->pi_ni[i].ns_nid = LNET_NID_ANY;
		pi->pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
	}
	rcd->rcd_pinginfo = pi;

	/* bind the ping buffer to the router checker's event queue */
	md.start = pi;
	md.user_ptr = rcd;
	md.length = LNET_PINGINFO_SIZE;
	md.threshold = LNET_MD_THRESH_INF;
	md.options = LNET_MD_TRUNCATE;
	md.eq_handle = the_lnet.ln_rc_eqh;

	LASSERT(!LNetEQHandleIsInvalid(the_lnet.ln_rc_eqh));
	rc = LNetMDBind(md, LNET_UNLINK, &rcd->rcd_mdh);
	if (rc < 0) {
		CERROR("Can't bind MD: %d\n", rc);
		goto out;
	}
	LASSERT(!rc);

	lnet_net_lock(gateway->lp_cpt);
	/* router table changed or someone has created rcd for this gateway */
	if (!lnet_isrouter(gateway) || gateway->lp_rcd) {
		lnet_net_unlock(gateway->lp_cpt);
		goto out;
	}

	/* the rcd holds a reference on its gateway */
	lnet_peer_addref_locked(gateway);
	rcd->rcd_gateway = gateway;
	gateway->lp_rcd = rcd;
	gateway->lp_ping_notsent = 0;

	return rcd;

 out:
	if (rcd) {
		/* undo the bind before destroying the half-built rcd */
		if (!LNetMDHandleIsInvalid(rcd->rcd_mdh)) {
			rc = LNetMDUnlink(rcd->rcd_mdh);
			LASSERT(!rc);
		}
		lnet_destroy_rc_data(rcd);
	}

	/* re-take the lock; return whatever rcd the gateway has now */
	lnet_net_lock(gateway->lp_cpt);
	return gateway->lp_rcd;
}
 973
 974static int
 975lnet_router_check_interval(struct lnet_peer *rtr)
 976{
 977        int secs;
 978
 979        secs = rtr->lp_alive ? live_router_check_interval :
 980                               dead_router_check_interval;
 981        if (secs < 0)
 982                secs = 0;
 983
 984        return secs;
 985}
 986
 987static void
 988lnet_ping_router_locked(struct lnet_peer *rtr)
 989{
 990        struct lnet_rc_data *rcd = NULL;
 991        unsigned long now = cfs_time_current();
 992        int secs;
 993
 994        lnet_peer_addref_locked(rtr);
 995
 996        if (rtr->lp_ping_deadline && /* ping timed out? */
 997            cfs_time_after(now, rtr->lp_ping_deadline))
 998                lnet_notify_locked(rtr, 1, 0, now);
 999
1000        /* Run any outstanding notifications */
1001        lnet_ni_notify_locked(rtr->lp_ni, rtr);
1002
1003        if (!lnet_isrouter(rtr) ||
1004            the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
1005                /* router table changed or router checker is shutting down */
1006                lnet_peer_decref_locked(rtr);
1007                return;
1008        }
1009
1010        rcd = rtr->lp_rcd ?
1011              rtr->lp_rcd : lnet_create_rc_data_locked(rtr);
1012
1013        if (!rcd)
1014                return;
1015
1016        secs = lnet_router_check_interval(rtr);
1017
1018        CDEBUG(D_NET,
1019               "rtr %s %d: deadline %lu ping_notsent %d alive %d alive_count %d lp_ping_timestamp %lu\n",
1020               libcfs_nid2str(rtr->lp_nid), secs,
1021               rtr->lp_ping_deadline, rtr->lp_ping_notsent,
1022               rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
1023
1024        if (secs && !rtr->lp_ping_notsent &&
1025            cfs_time_after(now, cfs_time_add(rtr->lp_ping_timestamp,
1026                                             cfs_time_seconds(secs)))) {
1027                int rc;
1028                struct lnet_process_id id;
1029                struct lnet_handle_md mdh;
1030
1031                id.nid = rtr->lp_nid;
1032                id.pid = LNET_PID_LUSTRE;
1033                CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
1034
1035                rtr->lp_ping_notsent   = 1;
1036                rtr->lp_ping_timestamp = now;
1037
1038                mdh = rcd->rcd_mdh;
1039
1040                if (!rtr->lp_ping_deadline) {
1041                        rtr->lp_ping_deadline =
1042                                cfs_time_shift(router_ping_timeout);
1043                }
1044
1045                lnet_net_unlock(rtr->lp_cpt);
1046
1047                rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
1048                             LNET_PROTO_PING_MATCHBITS, 0);
1049
1050                lnet_net_lock(rtr->lp_cpt);
1051                if (rc)
1052                        rtr->lp_ping_notsent = 0; /* no event pending */
1053        }
1054
1055        lnet_peer_decref_locked(rtr);
1056}
1057
/*
 * Start the router checker: allocate its event queue and spawn the
 * "router_checker" kthread.
 *
 * Returns 0 on success; -EINVAL if check_routers_before_use is set
 * without a dead_router_check_interval; -ENOMEM if the EQ or thread
 * could not be created.  When check_routers_before_use is set, blocks
 * until the aliveness of every known router has been determined.
 */
int
lnet_router_checker_start(void)
{
	struct task_struct *task;
	int rc;
	int eqsz = 0;	/* only reported in the EQ error message below */

	LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);

	if (check_routers_before_use &&
	    dead_router_check_interval <= 0) {
		LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be set if 'check_routers_before_use' is set\n");
		return -EINVAL;
	}

	init_completion(&the_lnet.ln_rc_signal);

	rc = LNetEQAlloc(0, lnet_router_checker_event, &the_lnet.ln_rc_eqh);
	if (rc) {
		CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
		return -ENOMEM;
	}

	the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
	task = kthread_run(lnet_router_checker, NULL, "router_checker");
	if (IS_ERR(task)) {
		rc = PTR_ERR(task);
		CERROR("Can't start router checker thread: %d\n", rc);
		/* block until event callback signals exit */
		/*
		 * NOTE(review): if kthread_run() failed the thread never ran;
		 * it is not obvious from this file what completes
		 * ln_rc_signal on this path - verify against the EQ callback.
		 */
		wait_for_completion(&the_lnet.ln_rc_signal);
		rc = LNetEQFree(the_lnet.ln_rc_eqh);
		LASSERT(!rc);
		the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
		return -ENOMEM;
	}

	if (check_routers_before_use) {
		/*
		 * Note that a helpful side-effect of pinging all known routers
		 * at startup is that it makes them drop stale connections they
		 * may have to a previous instance of me.
		 */
		lnet_wait_known_routerstate();
	}

	return 0;
}
1105
/*
 * Stop the router checker thread and free its event queue.
 *
 * No-op if the checker was never started.  Moves the state to STOPPING,
 * wakes the thread, then blocks until the thread has signalled
 * completion and set the state to SHUTDOWN.
 */
void
lnet_router_checker_stop(void)
{
	int rc;

	if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
		return;

	LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
	the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
	/* wakeup the RC thread if it's sleeping */
	wake_up(&the_lnet.ln_rc_waitq);

	/* block until event callback signals exit */
	wait_for_completion(&the_lnet.ln_rc_signal);
	LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);

	rc = LNetEQFree(the_lnet.ln_rc_eqh);
	LASSERT(!rc);
}
1126
/*
 * Unlink and free router-checker data blocks queued for destruction.
 *
 * rcds move through two stages: "deathrow" (MD still bound; the unlink
 * is issued here) and "zombie" (waiting for the unlink event to
 * invalidate the MD handle, after which the rcd can be freed).  If the
 * checker is no longer running, every router's rcd is detached and
 * pruned as well.
 *
 * @wait_unlink: when non-zero, keep sleeping (1/4s at a time) until all
 * zombies have been unlinked and destroyed; otherwise free only what
 * is already unlinked and return.
 */
static void
lnet_prune_rc_data(int wait_unlink)
{
	struct lnet_rc_data *rcd;
	struct lnet_rc_data *tmp;
	struct lnet_peer *lp;
	struct list_head head;
	int i = 2;	/* power-of-two throttle for the console warning */

	/* fast path: running normally with nothing queued */
	if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
		   list_empty(&the_lnet.ln_rcd_deathrow) &&
		   list_empty(&the_lnet.ln_rcd_zombie)))
		return;

	INIT_LIST_HEAD(&head);

	lnet_net_lock(LNET_LOCK_EX);

	if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
		/* router checker is stopping, prune all */
		list_for_each_entry(lp, &the_lnet.ln_routers,
				    lp_rtr_list) {
			if (!lp->lp_rcd)
				continue;

			LASSERT(list_empty(&lp->lp_rcd->rcd_list));
			list_add(&lp->lp_rcd->rcd_list,
				 &the_lnet.ln_rcd_deathrow);
			lp->lp_rcd = NULL;
		}
	}

	/* unlink all RCDs on deathrow list */
	list_splice_init(&the_lnet.ln_rcd_deathrow, &head);

	if (!list_empty(&head)) {
		/* drop the lock across the LNetMDUnlink() calls */
		lnet_net_unlock(LNET_LOCK_EX);

		list_for_each_entry(rcd, &head, rcd_list)
			LNetMDUnlink(rcd->rcd_mdh);

		lnet_net_lock(LNET_LOCK_EX);
	}

	/* unlinked rcds wait on the zombie list for their unlink event */
	list_splice_init(&head, &the_lnet.ln_rcd_zombie);

	/* release all zombie RCDs */
	while (!list_empty(&the_lnet.ln_rcd_zombie)) {
		/* collect zombies whose MD handle is already invalidated */
		list_for_each_entry_safe(rcd, tmp, &the_lnet.ln_rcd_zombie,
					 rcd_list) {
			if (LNetMDHandleIsInvalid(rcd->rcd_mdh))
				list_move(&rcd->rcd_list, &head);
		}

		wait_unlink = wait_unlink &&
			      !list_empty(&the_lnet.ln_rcd_zombie);

		lnet_net_unlock(LNET_LOCK_EX);

		while (!list_empty(&head)) {
			rcd = list_entry(head.next,
					 struct lnet_rc_data, rcd_list);
			list_del_init(&rcd->rcd_list);
			lnet_destroy_rc_data(rcd);
		}

		if (!wait_unlink)
			return;

		i++;
		/* log at console level only on every 2^k-th iteration */
		CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
		       "Waiting for rc buffers to unlink\n");
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1) / 4);

		lnet_net_lock(LNET_LOCK_EX);
	}

	lnet_net_unlock(LNET_LOCK_EX);
}
1207
1208/*
1209 * This function is called to check if the RC should block indefinitely.
1210 * It's called from lnet_router_checker() as well as being passed to
1211 * wait_event_interruptible() to avoid the lost wake_up problem.
1212 *
1213 * When it's called from wait_event_interruptible() it is necessary to
1214 * also not sleep if the rc state is not running to avoid a deadlock
1215 * when the system is shutting down
1216 */
1217static inline bool
1218lnet_router_checker_active(void)
1219{
1220        if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING)
1221                return true;
1222
1223        /*
1224         * Router Checker thread needs to run when routing is enabled in
1225         * order to call lnet_update_ni_status_locked()
1226         */
1227        if (the_lnet.ln_routing)
1228                return true;
1229
1230        return !list_empty(&the_lnet.ln_routers) &&
1231                (live_router_check_interval > 0 ||
1232                 dead_router_check_interval > 0);
1233}
1234
/*
 * Main loop of the router checker kthread.
 *
 * While running: pings every known router (restarting the scan whenever
 * the router list version changes under us), refreshes local NI status
 * if routing is enabled, prunes finished rc_data, then sleeps for up to
 * a second - or indefinitely when there is nothing to check (see
 * lnet_router_checker_active()).  On shutdown it waits for all rc_data
 * unlinks before signalling completion.
 */
static int
lnet_router_checker(void *arg)
{
	struct lnet_peer *rtr;
	struct list_head *entry;

	cfs_block_allsigs();

	while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
		__u64 version;
		int cpt;
		int cpt2;

		cpt = lnet_net_lock_current();
rescan:
		/* snapshot the list version to detect concurrent changes */
		version = the_lnet.ln_routers_version;

		list_for_each(entry, &the_lnet.ln_routers) {
			rtr = list_entry(entry, struct lnet_peer, lp_rtr_list);

			/* switch to the lock partition of this router's NID */
			cpt2 = lnet_cpt_of_nid_locked(rtr->lp_nid);
			if (cpt != cpt2) {
				lnet_net_unlock(cpt);
				cpt = cpt2;
				lnet_net_lock(cpt);
				/* the routers list has changed */
				if (version != the_lnet.ln_routers_version)
					goto rescan;
			}

			lnet_ping_router_locked(rtr);

			/* NB dropped lock */
			if (version != the_lnet.ln_routers_version) {
				/* the routers list has changed */
				goto rescan;
			}
		}

		if (the_lnet.ln_routing)
			lnet_update_ni_status_locked();

		lnet_net_unlock(cpt);

		lnet_prune_rc_data(0); /* don't wait for UNLINK */

		/*
		 * Call schedule_timeout() here always adds 1 to load average
		 * because kernel counts # active tasks as nr_running
		 * + nr_uninterruptible.
		 */
		/*
		 * if there are any routes then wakeup every second.  If
		 * there are no routes then sleep indefinitely until woken
		 * up by a user adding a route
		 */
		if (!lnet_router_checker_active())
			wait_event_interruptible(the_lnet.ln_rc_waitq,
						 lnet_router_checker_active());
		else
			wait_event_interruptible_timeout(the_lnet.ln_rc_waitq,
							 false,
							 cfs_time_seconds(1));
	}

	lnet_prune_rc_data(1); /* wait for UNLINK */

	the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
	complete(&the_lnet.ln_rc_signal);
	/* The unlink event callback will signal final completion */
	return 0;
}
1307
1308void
1309lnet_destroy_rtrbuf(struct lnet_rtrbuf *rb, int npages)
1310{
1311        int sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);
1312
1313        while (--npages >= 0)
1314                __free_page(rb->rb_kiov[npages].bv_page);
1315
1316        LIBCFS_FREE(rb, sz);
1317}
1318
1319static struct lnet_rtrbuf *
1320lnet_new_rtrbuf(struct lnet_rtrbufpool *rbp, int cpt)
1321{
1322        int npages = rbp->rbp_npages;
1323        int sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);
1324        struct page *page;
1325        struct lnet_rtrbuf *rb;
1326        int i;
1327
1328        LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
1329        if (!rb)
1330                return NULL;
1331
1332        rb->rb_pool = rbp;
1333
1334        for (i = 0; i < npages; i++) {
1335                page = alloc_pages_node(
1336                                cfs_cpt_spread_node(lnet_cpt_table(), cpt),
1337                                GFP_KERNEL | __GFP_ZERO, 0);
1338                if (!page) {
1339                        while (--i >= 0)
1340                                __free_page(rb->rb_kiov[i].bv_page);
1341
1342                        LIBCFS_FREE(rb, sz);
1343                        return NULL;
1344                }
1345
1346                rb->rb_kiov[i].bv_len = PAGE_SIZE;
1347                rb->rb_kiov[i].bv_offset = 0;
1348                rb->rb_kiov[i].bv_page = page;
1349        }
1350
1351        return rb;
1352}
1353
1354static void
1355lnet_rtrpool_free_bufs(struct lnet_rtrbufpool *rbp, int cpt)
1356{
1357        int npages = rbp->rbp_npages;
1358        struct list_head tmp;
1359        struct lnet_rtrbuf *rb;
1360        struct lnet_rtrbuf *temp;
1361
1362        if (!rbp->rbp_nbuffers) /* not initialized or already freed */
1363                return;
1364
1365        INIT_LIST_HEAD(&tmp);
1366
1367        lnet_net_lock(cpt);
1368        lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
1369        list_splice_init(&rbp->rbp_bufs, &tmp);
1370        rbp->rbp_req_nbuffers = 0;
1371        rbp->rbp_nbuffers = 0;
1372        rbp->rbp_credits = 0;
1373        rbp->rbp_mincredits = 0;
1374        lnet_net_unlock(cpt);
1375
1376        /* Free buffers on the free list. */
1377        list_for_each_entry_safe(rb, temp, &tmp, rb_list) {
1378                list_del(&rb->rb_list);
1379                lnet_destroy_rtrbuf(rb, npages);
1380        }
1381}
1382
/*
 * Resize router buffer pool @rbp (CPT @cpt) to @nbufs buffers.
 *
 * Shrinking only lowers rbp_req_nbuffers; surplus buffers are freed
 * lazily as they return to the free list and credits adjust with them.
 * Growing allocates the new buffers on a private list first and splices
 * them in only if every allocation succeeds, then reschedules messages
 * blocked waiting for buffer credits.
 *
 * Returns 0 on success, or -ENOMEM (with rbp_req_nbuffers restored)
 * when an allocation fails.
 */
static int
lnet_rtrpool_adjust_bufs(struct lnet_rtrbufpool *rbp, int nbufs, int cpt)
{
	struct list_head rb_list;
	struct lnet_rtrbuf *rb;
	int num_rb;
	int num_buffers = 0;
	int old_req_nbufs;
	int npages = rbp->rbp_npages;

	lnet_net_lock(cpt);
	/*
	 * If we are called for less buffers than already in the pool, we
	 * just lower the req_nbuffers number and excess buffers will be
	 * thrown away as they are returned to the free list.  Credits
	 * then get adjusted as well.
	 * If we already have enough buffers allocated to serve the
	 * increase requested, then we can treat that the same way as we
	 * do the decrease.
	 */
	num_rb = nbufs - rbp->rbp_nbuffers;
	if (nbufs <= rbp->rbp_req_nbuffers || num_rb <= 0) {
		rbp->rbp_req_nbuffers = nbufs;
		lnet_net_unlock(cpt);
		return 0;
	}
	/*
	 * store the older value of rbp_req_nbuffers and then set it to
	 * the new request to prevent lnet_return_rx_credits_locked() from
	 * freeing buffers that we need to keep around
	 */
	old_req_nbufs = rbp->rbp_req_nbuffers;
	rbp->rbp_req_nbuffers = nbufs;
	lnet_net_unlock(cpt);

	INIT_LIST_HEAD(&rb_list);

	/*
	 * allocate the buffers on a local list first.  If all buffers are
	 * allocated successfully then join this list to the rbp buffer
	 * list. If not then free all allocated buffers.
	 */
	while (num_rb-- > 0) {
		rb = lnet_new_rtrbuf(rbp, cpt);
		if (!rb) {
			CERROR("Failed to allocate %d route bufs of %d pages\n",
			       nbufs, npages);

			/* roll back the request so lazy freeing resumes */
			lnet_net_lock(cpt);
			rbp->rbp_req_nbuffers = old_req_nbufs;
			lnet_net_unlock(cpt);

			goto failed;
		}

		list_add(&rb->rb_list, &rb_list);
		num_buffers++;
	}

	lnet_net_lock(cpt);

	list_splice_tail(&rb_list, &rbp->rbp_bufs);
	rbp->rbp_nbuffers += num_buffers;
	rbp->rbp_credits += num_buffers;
	rbp->rbp_mincredits = rbp->rbp_credits;
	/*
	 * We need to schedule blocked msg using the newly
	 * added buffers.
	 */
	while (!list_empty(&rbp->rbp_bufs) &&
	       !list_empty(&rbp->rbp_msgs))
		lnet_schedule_blocked_locked(rbp);

	lnet_net_unlock(cpt);

	return 0;

failed:
	/* destroy the partially-built local list */
	while (!list_empty(&rb_list)) {
		rb = list_entry(rb_list.next, struct lnet_rtrbuf, rb_list);
		list_del(&rb->rb_list);
		lnet_destroy_rtrbuf(rb, npages);
	}

	return -ENOMEM;
}
1469
/*
 * One-time initialization of a router buffer pool: empty message and
 * buffer lists, @npages pages per buffer, and no credits until buffers
 * are added by lnet_rtrpool_adjust_bufs().
 */
static void
lnet_rtrpool_init(struct lnet_rtrbufpool *rbp, int npages)
{
	INIT_LIST_HEAD(&rbp->rbp_msgs);
	INIT_LIST_HEAD(&rbp->rbp_bufs);

	rbp->rbp_npages = npages;
	rbp->rbp_credits = 0;
	rbp->rbp_mincredits = 0;
}
1480
1481void
1482lnet_rtrpools_free(int keep_pools)
1483{
1484        struct lnet_rtrbufpool *rtrp;
1485        int i;
1486
1487        if (!the_lnet.ln_rtrpools) /* uninitialized or freed */
1488                return;
1489
1490        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1491                lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
1492                lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
1493                lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
1494        }
1495
1496        if (!keep_pools) {
1497                cfs_percpt_free(the_lnet.ln_rtrpools);
1498                the_lnet.ln_rtrpools = NULL;
1499        }
1500}
1501
1502static int
1503lnet_nrb_tiny_calculate(void)
1504{
1505        int nrbs = LNET_NRB_TINY;
1506
1507        if (tiny_router_buffers < 0) {
1508                LCONSOLE_ERROR_MSG(0x10c,
1509                                   "tiny_router_buffers=%d invalid when routing enabled\n",
1510                                   tiny_router_buffers);
1511                return -EINVAL;
1512        }
1513
1514        if (tiny_router_buffers > 0)
1515                nrbs = tiny_router_buffers;
1516
1517        nrbs /= LNET_CPT_NUMBER;
1518        return max(nrbs, LNET_NRB_TINY_MIN);
1519}
1520
1521static int
1522lnet_nrb_small_calculate(void)
1523{
1524        int nrbs = LNET_NRB_SMALL;
1525
1526        if (small_router_buffers < 0) {
1527                LCONSOLE_ERROR_MSG(0x10c,
1528                                   "small_router_buffers=%d invalid when routing enabled\n",
1529                                   small_router_buffers);
1530                return -EINVAL;
1531        }
1532
1533        if (small_router_buffers > 0)
1534                nrbs = small_router_buffers;
1535
1536        nrbs /= LNET_CPT_NUMBER;
1537        return max(nrbs, LNET_NRB_SMALL_MIN);
1538}
1539
1540static int
1541lnet_nrb_large_calculate(void)
1542{
1543        int nrbs = LNET_NRB_LARGE;
1544
1545        if (large_router_buffers < 0) {
1546                LCONSOLE_ERROR_MSG(0x10c,
1547                                   "large_router_buffers=%d invalid when routing enabled\n",
1548                                   large_router_buffers);
1549                return -EINVAL;
1550        }
1551
1552        if (large_router_buffers > 0)
1553                nrbs = large_router_buffers;
1554
1555        nrbs /= LNET_CPT_NUMBER;
1556        return max(nrbs, LNET_NRB_LARGE_MIN);
1557}
1558
1559int
1560lnet_rtrpools_alloc(int im_a_router)
1561{
1562        struct lnet_rtrbufpool *rtrp;
1563        int nrb_tiny;
1564        int nrb_small;
1565        int nrb_large;
1566        int rc;
1567        int i;
1568
1569        if (!strcmp(forwarding, "")) {
1570                /* not set either way */
1571                if (!im_a_router)
1572                        return 0;
1573        } else if (!strcmp(forwarding, "disabled")) {
1574                /* explicitly disabled */
1575                return 0;
1576        } else if (!strcmp(forwarding, "enabled")) {
1577                /* explicitly enabled */
1578        } else {
1579                LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either 'enabled' or 'disabled'\n");
1580                return -EINVAL;
1581        }
1582
1583        nrb_tiny = lnet_nrb_tiny_calculate();
1584        if (nrb_tiny < 0)
1585                return -EINVAL;
1586
1587        nrb_small = lnet_nrb_small_calculate();
1588        if (nrb_small < 0)
1589                return -EINVAL;
1590
1591        nrb_large = lnet_nrb_large_calculate();
1592        if (nrb_large < 0)
1593                return -EINVAL;
1594
1595        the_lnet.ln_rtrpools = cfs_percpt_alloc(lnet_cpt_table(),
1596                                                LNET_NRBPOOLS *
1597                                                sizeof(struct lnet_rtrbufpool));
1598        if (!the_lnet.ln_rtrpools) {
1599                LCONSOLE_ERROR_MSG(0x10c,
1600                                   "Failed to initialize router buffe pool\n");
1601                return -ENOMEM;
1602        }
1603
1604        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1605                lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
1606                rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
1607                                              nrb_tiny, i);
1608                if (rc)
1609                        goto failed;
1610
1611                lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
1612                                  LNET_NRB_SMALL_PAGES);
1613                rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
1614                                              nrb_small, i);
1615                if (rc)
1616                        goto failed;
1617
1618                lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
1619                                  LNET_NRB_LARGE_PAGES);
1620                rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
1621                                              nrb_large, i);
1622                if (rc)
1623                        goto failed;
1624        }
1625
1626        lnet_net_lock(LNET_LOCK_EX);
1627        the_lnet.ln_routing = 1;
1628        lnet_net_unlock(LNET_LOCK_EX);
1629
1630        return 0;
1631
1632 failed:
1633        lnet_rtrpools_free(0);
1634        return rc;
1635}
1636
/*
 * Apply new buffer counts to every CPT's tiny/small/large pools.
 *
 * A negative value leaves that pool class untouched; otherwise the
 * corresponding module parameter is updated and each per-CPT pool is
 * resized via lnet_rtrpool_adjust_bufs().  Returns 0 or the first
 * resize error; changes already made are not rolled back.
 */
static int
lnet_rtrpools_adjust_helper(int tiny, int small, int large)
{
	int nrb = 0;
	int rc = 0;
	int i;
	struct lnet_rtrbufpool *rtrp;

	/*
	 * If the provided values for each buffer pool are different than the
	 * configured values, we need to take action.
	 */
	if (tiny >= 0) {
		tiny_router_buffers = tiny;
		nrb = lnet_nrb_tiny_calculate();
		cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
			rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
						      nrb, i);
			if (rc)
				return rc;
		}
	}
	if (small >= 0) {
		small_router_buffers = small;
		nrb = lnet_nrb_small_calculate();
		cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
			rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
						      nrb, i);
			if (rc)
				return rc;
		}
	}
	if (large >= 0) {
		large_router_buffers = large;
		nrb = lnet_nrb_large_calculate();
		cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
			rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
						      nrb, i);
			if (rc)
				return rc;
		}
	}

	return 0;
}
1682
1683int
1684lnet_rtrpools_adjust(int tiny, int small, int large)
1685{
1686        /*
1687         * this function doesn't revert the changes if adding new buffers
1688         * failed.  It's up to the user space caller to revert the
1689         * changes.
1690         */
1691        if (!the_lnet.ln_routing)
1692                return 0;
1693
1694        return lnet_rtrpools_adjust_helper(tiny, small, large);
1695}
1696
/*
 * Turn routing on: allocate the buffer pools on first use, or re-grow
 * previously kept pools, then set ln_routing and clear the
 * "routing disabled" ping feature bit so peers see us as a router.
 *
 * Returns 0 on success (or if routing was already on), otherwise the
 * pool allocation/resize error.
 */
int
lnet_rtrpools_enable(void)
{
	int rc = 0;

	if (the_lnet.ln_routing)
		return 0;

	if (!the_lnet.ln_rtrpools)
		/*
		 * If routing is turned off, and we have never
		 * initialized the pools before, just call the
		 * standard buffer pool allocation routine as
		 * if we are just configuring this for the first
		 * time.
		 */
		rc = lnet_rtrpools_alloc(1);
	else
		rc = lnet_rtrpools_adjust_helper(0, 0, 0);
	if (rc)
		return rc;

	/* publish the change: routing on, feature bit cleared */
	lnet_net_lock(LNET_LOCK_EX);
	the_lnet.ln_routing = 1;

	the_lnet.ln_ping_info->pi_features &= ~LNET_PING_FEAT_RTE_DISABLED;
	lnet_net_unlock(LNET_LOCK_EX);

	return rc;
}
1727
/*
 * Turn routing off: clear ln_routing, advertise RTE_DISABLED in the
 * ping feature bits, reset the buffer-count module parameters, and
 * drain the pools.  The pool structures themselves are kept
 * (lnet_rtrpools_free(1)) so routing can be re-enabled later.
 */
void
lnet_rtrpools_disable(void)
{
	if (!the_lnet.ln_routing)
		return;

	lnet_net_lock(LNET_LOCK_EX);
	the_lnet.ln_routing = 0;
	the_lnet.ln_ping_info->pi_features |= LNET_PING_FEAT_RTE_DISABLED;

	tiny_router_buffers = 0;
	small_router_buffers = 0;
	large_router_buffers = 0;
	lnet_net_unlock(LNET_LOCK_EX);
	lnet_rtrpools_free(1);
}
1744
/*
 * Process an aliveness notification for peer @nid, either from an LND
 * (@ni set) or from userspace (@ni NULL).
 *
 * @alive: non-zero if the peer is reported up.
 * @when:  time the aliveness was observed.  Future-dated notifications
 *         are rejected, and an LND "down" report is clamped to the
 *         peer's last known alive time (see comment below).
 *
 * Returns 0 on success, or when the peer is unknown or auto-down is
 * off; -EINVAL for cross-net or future-dated notifications; -ESHUTDOWN
 * if LNet is shutting down.
 */
int
lnet_notify(struct lnet_ni *ni, lnet_nid_t nid, int alive, unsigned long when)
{
	struct lnet_peer *lp = NULL;
	unsigned long now = cfs_time_current();
	int cpt = lnet_cpt_of_nid(nid);

	LASSERT(!in_interrupt());

	CDEBUG(D_NET, "%s notifying %s: %s\n",
	       !ni ? "userspace" : libcfs_nid2str(ni->ni_nid),
	       libcfs_nid2str(nid),
	       alive ? "up" : "down");

	/* an LND may only report peers on its own network */
	if (ni &&
	    LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
		CWARN("Ignoring notification of %s %s by %s (different net)\n",
		      libcfs_nid2str(nid), alive ? "birth" : "death",
		      libcfs_nid2str(ni->ni_nid));
		return -EINVAL;
	}

	/* can't do predictions... */
	if (cfs_time_after(when, now)) {
		CWARN("Ignoring prediction from %s of %s %s %ld seconds in the future\n",
		      !ni ? "userspace" : libcfs_nid2str(ni->ni_nid),
		      libcfs_nid2str(nid), alive ? "up" : "down",
		      cfs_duration_sec(cfs_time_sub(when, now)));
		return -EINVAL;
	}

	if (ni && !alive &&          /* LND telling me she's down */
	    !auto_down) {                      /* auto-down disabled */
		CDEBUG(D_NET, "Auto-down disabled\n");
		return 0;
	}

	lnet_net_lock(cpt);

	if (the_lnet.ln_shutdown) {
		lnet_net_unlock(cpt);
		return -ESHUTDOWN;
	}

	lp = lnet_find_peer_locked(the_lnet.ln_peer_tables[cpt], nid);
	if (!lp) {
		/* nid not found */
		lnet_net_unlock(cpt);
		CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
		return 0;
	}

	/*
	 * We can't fully trust LND on reporting exact peer last_alive
	 * if he notifies us about dead peer. For example ksocklnd can
	 * call us with when == _time_when_the_node_was_booted_ if
	 * no connections were successfully established
	 */
	if (ni && !alive && when < lp->lp_last_alive)
		when = lp->lp_last_alive;

	/* second arg flags a userspace (non-LND) notification */
	lnet_notify_locked(lp, !ni, alive, when);

	if (ni)
		lnet_ni_notify_locked(ni, lp);

	/* drop the lookup reference from lnet_find_peer_locked() */
	lnet_peer_decref_locked(lp);

	lnet_net_unlock(cpt);
	return 0;
}
1816EXPORT_SYMBOL(lnet_notify);
1817