linux/drivers/staging/lustre/lnet/lnet/lib-move.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lnet/lnet/lib-move.c
  37 *
  38 * Data movement routines
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_LNET
  42
  43#include "../../include/linux/lnet/lib-lnet.h"
  44
/*
 * Integer module parameter, initialised to 1 and registered with
 * permission bits 0444.  Its description string is just "Reserved";
 * nothing in the visible part of this file reads it.
 */
static int local_nid_dist_zero = 1;
module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
  48
  49int
  50lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
  51{
  52        lnet_test_peer_t *tp;
  53        struct list_head *el;
  54        struct list_head *next;
  55        struct list_head cull;
  56
  57        LASSERT(the_lnet.ln_init);
  58
  59        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
  60        if (threshold != 0) {
  61                /* Adding a new entry */
  62                LIBCFS_ALLOC(tp, sizeof(*tp));
  63                if (tp == NULL)
  64                        return -ENOMEM;
  65
  66                tp->tp_nid = nid;
  67                tp->tp_threshold = threshold;
  68
  69                lnet_net_lock(0);
  70                list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
  71                lnet_net_unlock(0);
  72                return 0;
  73        }
  74
  75        /* removing entries */
  76        INIT_LIST_HEAD(&cull);
  77
  78        lnet_net_lock(0);
  79
  80        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
  81                tp = list_entry(el, lnet_test_peer_t, tp_list);
  82
  83                if (tp->tp_threshold == 0 ||    /* needs culling anyway */
  84                    nid == LNET_NID_ANY ||       /* removing all entries */
  85                    tp->tp_nid == nid) {          /* matched this one */
  86                        list_del(&tp->tp_list);
  87                        list_add(&tp->tp_list, &cull);
  88                }
  89        }
  90
  91        lnet_net_unlock(0);
  92
  93        while (!list_empty(&cull)) {
  94                tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
  95
  96                list_del(&tp->tp_list);
  97                LIBCFS_FREE(tp, sizeof(*tp));
  98        }
  99        return 0;
 100}
 101
 102static int
 103fail_peer(lnet_nid_t nid, int outgoing)
 104{
 105        lnet_test_peer_t *tp;
 106        struct list_head *el;
 107        struct list_head *next;
 108        struct list_head cull;
 109        int fail = 0;
 110
 111        INIT_LIST_HEAD(&cull);
 112
 113        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
 114        lnet_net_lock(0);
 115
 116        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
 117                tp = list_entry(el, lnet_test_peer_t, tp_list);
 118
 119                if (tp->tp_threshold == 0) {
 120                        /* zombie entry */
 121                        if (outgoing) {
 122                                /* only cull zombies on outgoing tests,
 123                                 * since we may be at interrupt priority on
 124                                 * incoming messages. */
 125                                list_del(&tp->tp_list);
 126                                list_add(&tp->tp_list, &cull);
 127                        }
 128                        continue;
 129                }
 130
 131                if (tp->tp_nid == LNET_NID_ANY || /* fail every peer */
 132                    nid == tp->tp_nid) {        /* fail this peer */
 133                        fail = 1;
 134
 135                        if (tp->tp_threshold != LNET_MD_THRESH_INF) {
 136                                tp->tp_threshold--;
 137                                if (outgoing &&
 138                                    tp->tp_threshold == 0) {
 139                                        /* see above */
 140                                        list_del(&tp->tp_list);
 141                                        list_add(&tp->tp_list, &cull);
 142                                }
 143                        }
 144                        break;
 145                }
 146        }
 147
 148        lnet_net_unlock(0);
 149
 150        while (!list_empty(&cull)) {
 151                tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
 152                list_del(&tp->tp_list);
 153
 154                LIBCFS_FREE(tp, sizeof(*tp));
 155        }
 156
 157        return fail;
 158}
 159
 160unsigned int
 161lnet_iov_nob(unsigned int niov, struct kvec *iov)
 162{
 163        unsigned int nob = 0;
 164
 165        while (niov-- > 0)
 166                nob += (iov++)->iov_len;
 167
 168        return nob;
 169}
 170EXPORT_SYMBOL(lnet_iov_nob);
 171
 172void
 173lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
 174                   unsigned int nsiov, struct kvec *siov, unsigned int soffset,
 175                   unsigned int nob)
 176{
 177        /* NB diov, siov are READ-ONLY */
 178        unsigned int this_nob;
 179
 180        if (nob == 0)
 181                return;
 182
 183        /* skip complete frags before 'doffset' */
 184        LASSERT(ndiov > 0);
 185        while (doffset >= diov->iov_len) {
 186                doffset -= diov->iov_len;
 187                diov++;
 188                ndiov--;
 189                LASSERT(ndiov > 0);
 190        }
 191
 192        /* skip complete frags before 'soffset' */
 193        LASSERT(nsiov > 0);
 194        while (soffset >= siov->iov_len) {
 195                soffset -= siov->iov_len;
 196                siov++;
 197                nsiov--;
 198                LASSERT(nsiov > 0);
 199        }
 200
 201        do {
 202                LASSERT(ndiov > 0);
 203                LASSERT(nsiov > 0);
 204                this_nob = min(diov->iov_len - doffset,
 205                               siov->iov_len - soffset);
 206                this_nob = min(this_nob, nob);
 207
 208                memcpy((char *)diov->iov_base + doffset,
 209                        (char *)siov->iov_base + soffset, this_nob);
 210                nob -= this_nob;
 211
 212                if (diov->iov_len > doffset + this_nob) {
 213                        doffset += this_nob;
 214                } else {
 215                        diov++;
 216                        ndiov--;
 217                        doffset = 0;
 218                }
 219
 220                if (siov->iov_len > soffset + this_nob) {
 221                        soffset += this_nob;
 222                } else {
 223                        siov++;
 224                        nsiov--;
 225                        soffset = 0;
 226                }
 227        } while (nob > 0);
 228}
 229EXPORT_SYMBOL(lnet_copy_iov2iov);
 230
 231int
 232lnet_extract_iov(int dst_niov, struct kvec *dst,
 233                  int src_niov, struct kvec *src,
 234                  unsigned int offset, unsigned int len)
 235{
 236        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
 237         * for exactly 'len' bytes, and return the number of entries.
 238         * NB not destructive to 'src' */
 239        unsigned int frag_len;
 240        unsigned int niov;
 241
 242        if (len == 0)                      /* no data => */
 243                return 0;                    /* no frags */
 244
 245        LASSERT(src_niov > 0);
 246        while (offset >= src->iov_len) {      /* skip initial frags */
 247                offset -= src->iov_len;
 248                src_niov--;
 249                src++;
 250                LASSERT(src_niov > 0);
 251        }
 252
 253        niov = 1;
 254        for (;;) {
 255                LASSERT(src_niov > 0);
 256                LASSERT((int)niov <= dst_niov);
 257
 258                frag_len = src->iov_len - offset;
 259                dst->iov_base = ((char *)src->iov_base) + offset;
 260
 261                if (len <= frag_len) {
 262                        dst->iov_len = len;
 263                        return niov;
 264                }
 265
 266                dst->iov_len = frag_len;
 267
 268                len -= frag_len;
 269                dst++;
 270                src++;
 271                niov++;
 272                src_niov--;
 273                offset = 0;
 274        }
 275}
 276EXPORT_SYMBOL(lnet_extract_iov);
 277
 278
 279unsigned int
 280lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
 281{
 282        unsigned int nob = 0;
 283
 284        while (niov-- > 0)
 285                nob += (kiov++)->kiov_len;
 286
 287        return nob;
 288}
 289EXPORT_SYMBOL(lnet_kiov_nob);
 290
 291void
 292lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
 293                    unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
 294                    unsigned int nob)
 295{
 296        /* NB diov, siov are READ-ONLY */
 297        unsigned int this_nob;
 298        char *daddr = NULL;
 299        char *saddr = NULL;
 300
 301        if (nob == 0)
 302                return;
 303
 304        LASSERT(!in_interrupt());
 305
 306        LASSERT(ndiov > 0);
 307        while (doffset >= diov->kiov_len) {
 308                doffset -= diov->kiov_len;
 309                diov++;
 310                ndiov--;
 311                LASSERT(ndiov > 0);
 312        }
 313
 314        LASSERT(nsiov > 0);
 315        while (soffset >= siov->kiov_len) {
 316                soffset -= siov->kiov_len;
 317                siov++;
 318                nsiov--;
 319                LASSERT(nsiov > 0);
 320        }
 321
 322        do {
 323                LASSERT(ndiov > 0);
 324                LASSERT(nsiov > 0);
 325                this_nob = min(diov->kiov_len - doffset,
 326                               siov->kiov_len - soffset);
 327                this_nob = min(this_nob, nob);
 328
 329                if (daddr == NULL)
 330                        daddr = ((char *)kmap(diov->kiov_page)) +
 331                                diov->kiov_offset + doffset;
 332                if (saddr == NULL)
 333                        saddr = ((char *)kmap(siov->kiov_page)) +
 334                                siov->kiov_offset + soffset;
 335
 336                /* Vanishing risk of kmap deadlock when mapping 2 pages.
 337                 * However in practice at least one of the kiovs will be mapped
 338                 * kernel pages and the map/unmap will be NOOPs */
 339
 340                memcpy(daddr, saddr, this_nob);
 341                nob -= this_nob;
 342
 343                if (diov->kiov_len > doffset + this_nob) {
 344                        daddr += this_nob;
 345                        doffset += this_nob;
 346                } else {
 347                        kunmap(diov->kiov_page);
 348                        daddr = NULL;
 349                        diov++;
 350                        ndiov--;
 351                        doffset = 0;
 352                }
 353
 354                if (siov->kiov_len > soffset + this_nob) {
 355                        saddr += this_nob;
 356                        soffset += this_nob;
 357                } else {
 358                        kunmap(siov->kiov_page);
 359                        saddr = NULL;
 360                        siov++;
 361                        nsiov--;
 362                        soffset = 0;
 363                }
 364        } while (nob > 0);
 365
 366        if (daddr != NULL)
 367                kunmap(diov->kiov_page);
 368        if (saddr != NULL)
 369                kunmap(siov->kiov_page);
 370}
 371EXPORT_SYMBOL(lnet_copy_kiov2kiov);
 372
 373void
 374lnet_copy_kiov2iov(unsigned int niov, struct kvec *iov, unsigned int iovoffset,
 375                   unsigned int nkiov, lnet_kiov_t *kiov,
 376                   unsigned int kiovoffset, unsigned int nob)
 377{
 378        /* NB iov, kiov are READ-ONLY */
 379        unsigned int this_nob;
 380        char *addr = NULL;
 381
 382        if (nob == 0)
 383                return;
 384
 385        LASSERT(!in_interrupt());
 386
 387        LASSERT(niov > 0);
 388        while (iovoffset >= iov->iov_len) {
 389                iovoffset -= iov->iov_len;
 390                iov++;
 391                niov--;
 392                LASSERT(niov > 0);
 393        }
 394
 395        LASSERT(nkiov > 0);
 396        while (kiovoffset >= kiov->kiov_len) {
 397                kiovoffset -= kiov->kiov_len;
 398                kiov++;
 399                nkiov--;
 400                LASSERT(nkiov > 0);
 401        }
 402
 403        do {
 404                LASSERT(niov > 0);
 405                LASSERT(nkiov > 0);
 406                this_nob = min(iov->iov_len - iovoffset,
 407                               (__kernel_size_t) kiov->kiov_len - kiovoffset);
 408                this_nob = min(this_nob, nob);
 409
 410                if (addr == NULL)
 411                        addr = ((char *)kmap(kiov->kiov_page)) +
 412                                kiov->kiov_offset + kiovoffset;
 413
 414                memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
 415                nob -= this_nob;
 416
 417                if (iov->iov_len > iovoffset + this_nob) {
 418                        iovoffset += this_nob;
 419                } else {
 420                        iov++;
 421                        niov--;
 422                        iovoffset = 0;
 423                }
 424
 425                if (kiov->kiov_len > kiovoffset + this_nob) {
 426                        addr += this_nob;
 427                        kiovoffset += this_nob;
 428                } else {
 429                        kunmap(kiov->kiov_page);
 430                        addr = NULL;
 431                        kiov++;
 432                        nkiov--;
 433                        kiovoffset = 0;
 434                }
 435
 436        } while (nob > 0);
 437
 438        if (addr != NULL)
 439                kunmap(kiov->kiov_page);
 440}
 441EXPORT_SYMBOL(lnet_copy_kiov2iov);
 442
 443void
 444lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov,
 445                   unsigned int kiovoffset, unsigned int niov,
 446                   struct kvec *iov, unsigned int iovoffset,
 447                   unsigned int nob)
 448{
 449        /* NB kiov, iov are READ-ONLY */
 450        unsigned int this_nob;
 451        char *addr = NULL;
 452
 453        if (nob == 0)
 454                return;
 455
 456        LASSERT(!in_interrupt());
 457
 458        LASSERT(nkiov > 0);
 459        while (kiovoffset >= kiov->kiov_len) {
 460                kiovoffset -= kiov->kiov_len;
 461                kiov++;
 462                nkiov--;
 463                LASSERT(nkiov > 0);
 464        }
 465
 466        LASSERT(niov > 0);
 467        while (iovoffset >= iov->iov_len) {
 468                iovoffset -= iov->iov_len;
 469                iov++;
 470                niov--;
 471                LASSERT(niov > 0);
 472        }
 473
 474        do {
 475                LASSERT(nkiov > 0);
 476                LASSERT(niov > 0);
 477                this_nob = min((__kernel_size_t) kiov->kiov_len - kiovoffset,
 478                               iov->iov_len - iovoffset);
 479                this_nob = min(this_nob, nob);
 480
 481                if (addr == NULL)
 482                        addr = ((char *)kmap(kiov->kiov_page)) +
 483                                kiov->kiov_offset + kiovoffset;
 484
 485                memcpy(addr, (char *)iov->iov_base + iovoffset, this_nob);
 486                nob -= this_nob;
 487
 488                if (kiov->kiov_len > kiovoffset + this_nob) {
 489                        addr += this_nob;
 490                        kiovoffset += this_nob;
 491                } else {
 492                        kunmap(kiov->kiov_page);
 493                        addr = NULL;
 494                        kiov++;
 495                        nkiov--;
 496                        kiovoffset = 0;
 497                }
 498
 499                if (iov->iov_len > iovoffset + this_nob) {
 500                        iovoffset += this_nob;
 501                } else {
 502                        iov++;
 503                        niov--;
 504                        iovoffset = 0;
 505                }
 506        } while (nob > 0);
 507
 508        if (addr != NULL)
 509                kunmap(kiov->kiov_page);
 510}
 511EXPORT_SYMBOL(lnet_copy_iov2kiov);
 512
 513int
 514lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
 515                   int src_niov, lnet_kiov_t *src,
 516                   unsigned int offset, unsigned int len)
 517{
 518        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
 519         * for exactly 'len' bytes, and return the number of entries.
 520         * NB not destructive to 'src' */
 521        unsigned int frag_len;
 522        unsigned int niov;
 523
 524        if (len == 0)                      /* no data => */
 525                return 0;                    /* no frags */
 526
 527        LASSERT(src_niov > 0);
 528        while (offset >= src->kiov_len) {      /* skip initial frags */
 529                offset -= src->kiov_len;
 530                src_niov--;
 531                src++;
 532                LASSERT(src_niov > 0);
 533        }
 534
 535        niov = 1;
 536        for (;;) {
 537                LASSERT(src_niov > 0);
 538                LASSERT((int)niov <= dst_niov);
 539
 540                frag_len = src->kiov_len - offset;
 541                dst->kiov_page = src->kiov_page;
 542                dst->kiov_offset = src->kiov_offset + offset;
 543
 544                if (len <= frag_len) {
 545                        dst->kiov_len = len;
 546                        LASSERT(dst->kiov_offset + dst->kiov_len
 547                                             <= PAGE_CACHE_SIZE);
 548                        return niov;
 549                }
 550
 551                dst->kiov_len = frag_len;
 552                LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_CACHE_SIZE);
 553
 554                len -= frag_len;
 555                dst++;
 556                src++;
 557                niov++;
 558                src_niov--;
 559                offset = 0;
 560        }
 561}
 562EXPORT_SYMBOL(lnet_extract_kiov);
 563
 564static void
 565lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
 566             unsigned int offset, unsigned int mlen, unsigned int rlen)
 567{
 568        unsigned int niov = 0;
 569        struct kvec *iov = NULL;
 570        lnet_kiov_t *kiov = NULL;
 571        int rc;
 572
 573        LASSERT(!in_interrupt());
 574        LASSERT(mlen == 0 || msg != NULL);
 575
 576        if (msg != NULL) {
 577                LASSERT(msg->msg_receiving);
 578                LASSERT(!msg->msg_sending);
 579                LASSERT(rlen == msg->msg_len);
 580                LASSERT(mlen <= msg->msg_len);
 581                LASSERT(msg->msg_offset == offset);
 582                LASSERT(msg->msg_wanted == mlen);
 583
 584                msg->msg_receiving = 0;
 585
 586                if (mlen != 0) {
 587                        niov = msg->msg_niov;
 588                        iov  = msg->msg_iov;
 589                        kiov = msg->msg_kiov;
 590
 591                        LASSERT(niov > 0);
 592                        LASSERT((iov == NULL) != (kiov == NULL));
 593                }
 594        }
 595
 596        rc = (ni->ni_lnd->lnd_recv)(ni, private, msg, delayed,
 597                                    niov, iov, kiov, offset, mlen, rlen);
 598        if (rc < 0)
 599                lnet_finalize(ni, msg, rc);
 600}
 601
 602static void
 603lnet_setpayloadbuffer(lnet_msg_t *msg)
 604{
 605        lnet_libmd_t *md = msg->msg_md;
 606
 607        LASSERT(msg->msg_len > 0);
 608        LASSERT(!msg->msg_routing);
 609        LASSERT(md != NULL);
 610        LASSERT(msg->msg_niov == 0);
 611        LASSERT(msg->msg_iov == NULL);
 612        LASSERT(msg->msg_kiov == NULL);
 613
 614        msg->msg_niov = md->md_niov;
 615        if ((md->md_options & LNET_MD_KIOV) != 0)
 616                msg->msg_kiov = md->md_iov.kiov;
 617        else
 618                msg->msg_iov = md->md_iov.iov;
 619}
 620
 621void
 622lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
 623               unsigned int offset, unsigned int len)
 624{
 625        msg->msg_type = type;
 626        msg->msg_target = target;
 627        msg->msg_len = len;
 628        msg->msg_offset = offset;
 629
 630        if (len != 0)
 631                lnet_setpayloadbuffer(msg);
 632
 633        memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
 634        msg->msg_hdr.type          = cpu_to_le32(type);
 635        msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
 636        msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
 637        /* src_nid will be set later */
 638        msg->msg_hdr.src_pid    = cpu_to_le32(the_lnet.ln_pid);
 639        msg->msg_hdr.payload_length = cpu_to_le32(len);
 640}
 641
 642static void
 643lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
 644{
 645        void *priv = msg->msg_private;
 646        int rc;
 647
 648        LASSERT(!in_interrupt());
 649        LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
 650                 (msg->msg_txcredit && msg->msg_peertxcredit));
 651
 652        rc = (ni->ni_lnd->lnd_send)(ni, priv, msg);
 653        if (rc < 0)
 654                lnet_finalize(ni, msg, rc);
 655}
 656
 657static int
 658lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
 659{
 660        int rc;
 661
 662        LASSERT(!msg->msg_sending);
 663        LASSERT(msg->msg_receiving);
 664        LASSERT(!msg->msg_rx_ready_delay);
 665        LASSERT(ni->ni_lnd->lnd_eager_recv != NULL);
 666
 667        msg->msg_rx_ready_delay = 1;
 668        rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
 669                                          &msg->msg_private);
 670        if (rc != 0) {
 671                CERROR("recv from %s / send to %s aborted: eager_recv failed %d\n",
 672                       libcfs_nid2str(msg->msg_rxpeer->lp_nid),
 673                       libcfs_id2str(msg->msg_target), rc);
 674                LASSERT(rc < 0); /* required by my callers */
 675        }
 676
 677        return rc;
 678}
 679
 680/* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
 681static void
 682lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
 683{
 684        unsigned long last_alive = 0;
 685
 686        LASSERT(lnet_peer_aliveness_enabled(lp));
 687        LASSERT(ni->ni_lnd->lnd_query != NULL);
 688
 689        lnet_net_unlock(lp->lp_cpt);
 690        (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
 691        lnet_net_lock(lp->lp_cpt);
 692
 693        lp->lp_last_query = cfs_time_current();
 694
 695        if (last_alive != 0) /* NI has updated timestamp */
 696                lp->lp_last_alive = last_alive;
 697}
 698
 699/* NB: always called with lnet_net_lock held */
 700static inline int
 701lnet_peer_is_alive(lnet_peer_t *lp, unsigned long now)
 702{
 703        int alive;
 704        unsigned long deadline;
 705
 706        LASSERT(lnet_peer_aliveness_enabled(lp));
 707
 708        /* Trust lnet_notify() if it has more recent aliveness news, but
 709         * ignore the initial assumed death (see lnet_peers_start_down()).
 710         */
 711        if (!lp->lp_alive && lp->lp_alive_count > 0 &&
 712            cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
 713                return 0;
 714
 715        deadline = cfs_time_add(lp->lp_last_alive,
 716                                cfs_time_seconds(lp->lp_ni->ni_peertimeout));
 717        alive = cfs_time_after(deadline, now);
 718
 719        /* Update obsolete lp_alive except for routers assumed to be dead
 720         * initially, because router checker would update aliveness in this
 721         * case, and moreover lp_last_alive at peer creation is assumed.
 722         */
 723        if (alive && !lp->lp_alive &&
 724            !(lnet_isrouter(lp) && lp->lp_alive_count == 0))
 725                lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
 726
 727        return alive;
 728}
 729
 730
 731/* NB: returns 1 when alive, 0 when dead, negative when error;
 732 *     may drop the lnet_net_lock */
 733static int
 734lnet_peer_alive_locked(lnet_peer_t *lp)
 735{
 736        unsigned long now = cfs_time_current();
 737
 738        if (!lnet_peer_aliveness_enabled(lp))
 739                return -ENODEV;
 740
 741        if (lnet_peer_is_alive(lp, now))
 742                return 1;
 743
 744        /* Peer appears dead, but we should avoid frequent NI queries (at
 745         * most once per lnet_queryinterval seconds). */
 746        if (lp->lp_last_query != 0) {
 747                static const int lnet_queryinterval = 1;
 748
 749                unsigned long next_query =
 750                           cfs_time_add(lp->lp_last_query,
 751                                        cfs_time_seconds(lnet_queryinterval));
 752
 753                if (time_before(now, next_query)) {
 754                        if (lp->lp_alive)
 755                                CWARN("Unexpected aliveness of peer %s: %d < %d (%d/%d)\n",
 756                                      libcfs_nid2str(lp->lp_nid),
 757                                      (int)now, (int)next_query,
 758                                      lnet_queryinterval,
 759                                      lp->lp_ni->ni_peertimeout);
 760                        return 0;
 761                }
 762        }
 763
 764        /* query NI for latest aliveness news */
 765        lnet_ni_query_locked(lp->lp_ni, lp);
 766
 767        if (lnet_peer_is_alive(lp, now))
 768                return 1;
 769
 770        lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
 771        return 0;
 772}
 773
 774/**
 775 * \param msg The message to be sent.
 776 * \param do_send True if lnet_ni_send() should be called in this function.
 777 *        lnet_send() is going to lnet_net_unlock immediately after this, so
 778 *        it sets do_send FALSE and I don't do the unlock/send/lock bit.
 779 *
 780 * \retval 0 If \a msg sent or OK to send.
 781 * \retval EAGAIN If \a msg blocked for credit.
 782 * \retval EHOSTUNREACH If the next hop of the message appears dead.
 783 * \retval ECANCELED If the MD of the message has been unlinked.
 784 */
 785static int
 786lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 787{
 788        lnet_peer_t *lp = msg->msg_txpeer;
 789        lnet_ni_t *ni = lp->lp_ni;
 790        int cpt = msg->msg_tx_cpt;
 791        struct lnet_tx_queue *tq = ni->ni_tx_queues[cpt];
 792
 793        /* non-lnet_send() callers have checked before */
 794        LASSERT(!do_send || msg->msg_tx_delayed);
 795        LASSERT(!msg->msg_receiving);
 796        LASSERT(msg->msg_tx_committed);
 797
 798        /* NB 'lp' is always the next hop */
 799        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
 800            lnet_peer_alive_locked(lp) == 0) {
 801                the_lnet.ln_counters[cpt]->drop_count++;
 802                the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
 803                lnet_net_unlock(cpt);
 804
 805                CNETERR("Dropping message for %s: peer not alive\n",
 806                        libcfs_id2str(msg->msg_target));
 807                if (do_send)
 808                        lnet_finalize(ni, msg, -EHOSTUNREACH);
 809
 810                lnet_net_lock(cpt);
 811                return EHOSTUNREACH;
 812        }
 813
 814        if (msg->msg_md != NULL &&
 815            (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED) != 0) {
 816                lnet_net_unlock(cpt);
 817
 818                CNETERR("Aborting message for %s: LNetM[DE]Unlink() already called on the MD/ME.\n",
 819                        libcfs_id2str(msg->msg_target));
 820                if (do_send)
 821                        lnet_finalize(ni, msg, -ECANCELED);
 822
 823                lnet_net_lock(cpt);
 824                return ECANCELED;
 825        }
 826
 827        if (!msg->msg_peertxcredit) {
 828                LASSERT((lp->lp_txcredits < 0) ==
 829                         !list_empty(&lp->lp_txq));
 830
 831                msg->msg_peertxcredit = 1;
 832                lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
 833                lp->lp_txcredits--;
 834
 835                if (lp->lp_txcredits < lp->lp_mintxcredits)
 836                        lp->lp_mintxcredits = lp->lp_txcredits;
 837
 838                if (lp->lp_txcredits < 0) {
 839                        msg->msg_tx_delayed = 1;
 840                        list_add_tail(&msg->msg_list, &lp->lp_txq);
 841                        return EAGAIN;
 842                }
 843        }
 844
 845        if (!msg->msg_txcredit) {
 846                LASSERT((tq->tq_credits < 0) ==
 847                        !list_empty(&tq->tq_delayed));
 848
 849                msg->msg_txcredit = 1;
 850                tq->tq_credits--;
 851
 852                if (tq->tq_credits < tq->tq_credits_min)
 853                        tq->tq_credits_min = tq->tq_credits;
 854
 855                if (tq->tq_credits < 0) {
 856                        msg->msg_tx_delayed = 1;
 857                        list_add_tail(&msg->msg_list, &tq->tq_delayed);
 858                        return EAGAIN;
 859                }
 860        }
 861
 862        if (do_send) {
 863                lnet_net_unlock(cpt);
 864                lnet_ni_send(ni, msg);
 865                lnet_net_lock(cpt);
 866        }
 867        return 0;
 868}
 869
 870
 871static lnet_rtrbufpool_t *
 872lnet_msg2bufpool(lnet_msg_t *msg)
 873{
 874        lnet_rtrbufpool_t *rbp;
 875        int cpt;
 876
 877        LASSERT(msg->msg_rx_committed);
 878
 879        cpt = msg->msg_rx_cpt;
 880        rbp = &the_lnet.ln_rtrpools[cpt][0];
 881
 882        LASSERT(msg->msg_len <= LNET_MTU);
 883        while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_CACHE_SIZE) {
 884                rbp++;
 885                LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
 886        }
 887
 888        return rbp;
 889}
 890
 891static int
 892lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
 893{
 894        /* lnet_parse is going to lnet_net_unlock immediately after this, so it
 895         * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
 896         * return EAGAIN if msg blocked and 0 if received or OK to receive */
 897        lnet_peer_t *lp = msg->msg_rxpeer;
 898        lnet_rtrbufpool_t *rbp;
 899        lnet_rtrbuf_t *rb;
 900
 901        LASSERT(msg->msg_iov == NULL);
 902        LASSERT(msg->msg_kiov == NULL);
 903        LASSERT(msg->msg_niov == 0);
 904        LASSERT(msg->msg_routing);
 905        LASSERT(msg->msg_receiving);
 906        LASSERT(!msg->msg_sending);
 907
 908        /* non-lnet_parse callers only receive delayed messages */
 909        LASSERT(!do_recv || msg->msg_rx_delayed);
 910
 911        if (!msg->msg_peerrtrcredit) {
 912                LASSERT((lp->lp_rtrcredits < 0) ==
 913                         !list_empty(&lp->lp_rtrq));
 914
 915                msg->msg_peerrtrcredit = 1;
 916                lp->lp_rtrcredits--;
 917                if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
 918                        lp->lp_minrtrcredits = lp->lp_rtrcredits;
 919
 920                if (lp->lp_rtrcredits < 0) {
 921                        /* must have checked eager_recv before here */
 922                        LASSERT(msg->msg_rx_ready_delay);
 923                        msg->msg_rx_delayed = 1;
 924                        list_add_tail(&msg->msg_list, &lp->lp_rtrq);
 925                        return EAGAIN;
 926                }
 927        }
 928
 929        rbp = lnet_msg2bufpool(msg);
 930
 931        if (!msg->msg_rtrcredit) {
 932                LASSERT((rbp->rbp_credits < 0) ==
 933                         !list_empty(&rbp->rbp_msgs));
 934
 935                msg->msg_rtrcredit = 1;
 936                rbp->rbp_credits--;
 937                if (rbp->rbp_credits < rbp->rbp_mincredits)
 938                        rbp->rbp_mincredits = rbp->rbp_credits;
 939
 940                if (rbp->rbp_credits < 0) {
 941                        /* must have checked eager_recv before here */
 942                        LASSERT(msg->msg_rx_ready_delay);
 943                        msg->msg_rx_delayed = 1;
 944                        list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
 945                        return EAGAIN;
 946                }
 947        }
 948
 949        LASSERT(!list_empty(&rbp->rbp_bufs));
 950        rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
 951        list_del(&rb->rb_list);
 952
 953        msg->msg_niov = rbp->rbp_npages;
 954        msg->msg_kiov = &rb->rb_kiov[0];
 955
 956        if (do_recv) {
 957                int cpt = msg->msg_rx_cpt;
 958
 959                lnet_net_unlock(cpt);
 960                lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
 961                             0, msg->msg_len, msg->msg_len);
 962                lnet_net_lock(cpt);
 963        }
 964        return 0;
 965}
 966
 967void
 968lnet_return_tx_credits_locked(lnet_msg_t *msg)
 969{
 970        lnet_peer_t *txpeer = msg->msg_txpeer;
 971        lnet_msg_t *msg2;
 972
 973        if (msg->msg_txcredit) {
 974                struct lnet_ni *ni = txpeer->lp_ni;
 975                struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
 976
 977                /* give back NI txcredits */
 978                msg->msg_txcredit = 0;
 979
 980                LASSERT((tq->tq_credits < 0) ==
 981                        !list_empty(&tq->tq_delayed));
 982
 983                tq->tq_credits++;
 984                if (tq->tq_credits <= 0) {
 985                        msg2 = list_entry(tq->tq_delayed.next,
 986                                              lnet_msg_t, msg_list);
 987                        list_del(&msg2->msg_list);
 988
 989                        LASSERT(msg2->msg_txpeer->lp_ni == ni);
 990                        LASSERT(msg2->msg_tx_delayed);
 991
 992                        (void) lnet_post_send_locked(msg2, 1);
 993                }
 994        }
 995
 996        if (msg->msg_peertxcredit) {
 997                /* give back peer txcredits */
 998                msg->msg_peertxcredit = 0;
 999
1000                LASSERT((txpeer->lp_txcredits < 0) ==
1001                        !list_empty(&txpeer->lp_txq));
1002
1003                txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
1004                LASSERT(txpeer->lp_txqnob >= 0);
1005
1006                txpeer->lp_txcredits++;
1007                if (txpeer->lp_txcredits <= 0) {
1008                        msg2 = list_entry(txpeer->lp_txq.next,
1009                                              lnet_msg_t, msg_list);
1010                        list_del(&msg2->msg_list);
1011
1012                        LASSERT(msg2->msg_txpeer == txpeer);
1013                        LASSERT(msg2->msg_tx_delayed);
1014
1015                        (void) lnet_post_send_locked(msg2, 1);
1016                }
1017        }
1018
1019        if (txpeer != NULL) {
1020                msg->msg_txpeer = NULL;
1021                lnet_peer_decref_locked(txpeer);
1022        }
1023}
1024
1025void
1026lnet_return_rx_credits_locked(lnet_msg_t *msg)
1027{
1028        lnet_peer_t *rxpeer = msg->msg_rxpeer;
1029        lnet_msg_t *msg2;
1030
1031        if (msg->msg_rtrcredit) {
1032                /* give back global router credits */
1033                lnet_rtrbuf_t *rb;
1034                lnet_rtrbufpool_t *rbp;
1035
1036                /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
1037                 * there until it gets one allocated, or aborts the wait
1038                 * itself */
1039                LASSERT(msg->msg_kiov != NULL);
1040
1041                rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
1042                rbp = rb->rb_pool;
1043                LASSERT(rbp == lnet_msg2bufpool(msg));
1044
1045                msg->msg_kiov = NULL;
1046                msg->msg_rtrcredit = 0;
1047
1048                LASSERT((rbp->rbp_credits < 0) ==
1049                        !list_empty(&rbp->rbp_msgs));
1050                LASSERT((rbp->rbp_credits > 0) ==
1051                        !list_empty(&rbp->rbp_bufs));
1052
1053                list_add(&rb->rb_list, &rbp->rbp_bufs);
1054                rbp->rbp_credits++;
1055                if (rbp->rbp_credits <= 0) {
1056                        msg2 = list_entry(rbp->rbp_msgs.next,
1057                                              lnet_msg_t, msg_list);
1058                        list_del(&msg2->msg_list);
1059
1060                        (void) lnet_post_routed_recv_locked(msg2, 1);
1061                }
1062        }
1063
1064        if (msg->msg_peerrtrcredit) {
1065                /* give back peer router credits */
1066                msg->msg_peerrtrcredit = 0;
1067
1068                LASSERT((rxpeer->lp_rtrcredits < 0) ==
1069                        !list_empty(&rxpeer->lp_rtrq));
1070
1071                rxpeer->lp_rtrcredits++;
1072                if (rxpeer->lp_rtrcredits <= 0) {
1073                        msg2 = list_entry(rxpeer->lp_rtrq.next,
1074                                              lnet_msg_t, msg_list);
1075                        list_del(&msg2->msg_list);
1076
1077                        (void) lnet_post_routed_recv_locked(msg2, 1);
1078                }
1079        }
1080        if (rxpeer != NULL) {
1081                msg->msg_rxpeer = NULL;
1082                lnet_peer_decref_locked(rxpeer);
1083        }
1084}
1085
1086static int
1087lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
1088{
1089        lnet_peer_t *p1 = r1->lr_gateway;
1090        lnet_peer_t *p2 = r2->lr_gateway;
1091
1092        if (r1->lr_priority < r2->lr_priority)
1093                return 1;
1094
1095        if (r1->lr_priority > r2->lr_priority)
1096                return -1;
1097
1098        if (r1->lr_hops < r2->lr_hops)
1099                return 1;
1100
1101        if (r1->lr_hops > r2->lr_hops)
1102                return -1;
1103
1104        if (p1->lp_txqnob < p2->lp_txqnob)
1105                return 1;
1106
1107        if (p1->lp_txqnob > p2->lp_txqnob)
1108                return -1;
1109
1110        if (p1->lp_txcredits > p2->lp_txcredits)
1111                return 1;
1112
1113        if (p1->lp_txcredits < p2->lp_txcredits)
1114                return -1;
1115
1116        if (r1->lr_seq - r2->lr_seq <= 0)
1117                return 1;
1118
1119        return -1;
1120}
1121
1122static lnet_peer_t *
1123lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
1124{
1125        lnet_remotenet_t *rnet;
1126        lnet_route_t *rtr;
1127        lnet_route_t *rtr_best;
1128        lnet_route_t *rtr_last;
1129        struct lnet_peer *lp_best;
1130        struct lnet_peer *lp;
1131        int rc;
1132
1133        /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
1134         * rtr_nid nid, otherwise find the best gateway I can use */
1135
1136        rnet = lnet_find_net_locked(LNET_NIDNET(target));
1137        if (rnet == NULL)
1138                return NULL;
1139
1140        lp_best = NULL;
1141        rtr_best = rtr_last = NULL;
1142        list_for_each_entry(rtr, &rnet->lrn_routes, lr_list) {
1143                lp = rtr->lr_gateway;
1144
1145                if (!lp->lp_alive || /* gateway is down */
1146                    ((lp->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) != 0 &&
1147                     rtr->lr_downis != 0)) /* NI to target is down */
1148                        continue;
1149
1150                if (ni != NULL && lp->lp_ni != ni)
1151                        continue;
1152
1153                if (lp->lp_nid == rtr_nid) /* it's pre-determined router */
1154                        return lp;
1155
1156                if (lp_best == NULL) {
1157                        rtr_best = rtr_last = rtr;
1158                        lp_best = lp;
1159                        continue;
1160                }
1161
1162                /* no protection on below fields, but it's harmless */
1163                if (rtr_last->lr_seq - rtr->lr_seq < 0)
1164                        rtr_last = rtr;
1165
1166                rc = lnet_compare_routes(rtr, rtr_best);
1167                if (rc < 0)
1168                        continue;
1169
1170                rtr_best = rtr;
1171                lp_best = lp;
1172        }
1173
1174        /* set sequence number on the best router to the latest sequence + 1
1175         * so we can round-robin all routers, it's race and inaccurate but
1176         * harmless and functional  */
1177        if (rtr_best != NULL)
1178                rtr_best->lr_seq = rtr_last->lr_seq + 1;
1179        return lp_best;
1180}
1181
1182int
1183lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
1184{
1185        lnet_nid_t dst_nid = msg->msg_target.nid;
1186        struct lnet_ni *src_ni;
1187        struct lnet_ni *local_ni;
1188        struct lnet_peer *lp;
1189        int cpt;
1190        int cpt2;
1191        int rc;
1192
1193        /* NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
1194         * but we might want to use pre-determined router for ACK/REPLY
1195         * in the future */
1196        /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
1197        LASSERT(msg->msg_txpeer == NULL);
1198        LASSERT(!msg->msg_sending);
1199        LASSERT(!msg->msg_target_is_router);
1200        LASSERT(!msg->msg_receiving);
1201
1202        msg->msg_sending = 1;
1203
1204        LASSERT(!msg->msg_tx_committed);
1205        cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
1206 again:
1207        lnet_net_lock(cpt);
1208
1209        if (the_lnet.ln_shutdown) {
1210                lnet_net_unlock(cpt);
1211                return -ESHUTDOWN;
1212        }
1213
1214        if (src_nid == LNET_NID_ANY) {
1215                src_ni = NULL;
1216        } else {
1217                src_ni = lnet_nid2ni_locked(src_nid, cpt);
1218                if (src_ni == NULL) {
1219                        lnet_net_unlock(cpt);
1220                        LCONSOLE_WARN("Can't send to %s: src %s is not a local nid\n",
1221                                      libcfs_nid2str(dst_nid),
1222                                      libcfs_nid2str(src_nid));
1223                        return -EINVAL;
1224                }
1225                LASSERT(!msg->msg_routing);
1226        }
1227
1228        /* Is this for someone on a local network? */
1229        local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
1230
1231        if (local_ni != NULL) {
1232                if (src_ni == NULL) {
1233                        src_ni = local_ni;
1234                        src_nid = src_ni->ni_nid;
1235                } else if (src_ni == local_ni) {
1236                        lnet_ni_decref_locked(local_ni, cpt);
1237                } else {
1238                        lnet_ni_decref_locked(local_ni, cpt);
1239                        lnet_ni_decref_locked(src_ni, cpt);
1240                        lnet_net_unlock(cpt);
1241                        LCONSOLE_WARN("No route to %s via from %s\n",
1242                                      libcfs_nid2str(dst_nid),
1243                                      libcfs_nid2str(src_nid));
1244                        return -EINVAL;
1245                }
1246
1247                LASSERT(src_nid != LNET_NID_ANY);
1248                lnet_msg_commit(msg, cpt);
1249
1250                if (!msg->msg_routing)
1251                        msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1252
1253                if (src_ni == the_lnet.ln_loni) {
1254                        /* No send credit hassles with LOLND */
1255                        lnet_net_unlock(cpt);
1256                        lnet_ni_send(src_ni, msg);
1257
1258                        lnet_net_lock(cpt);
1259                        lnet_ni_decref_locked(src_ni, cpt);
1260                        lnet_net_unlock(cpt);
1261                        return 0;
1262                }
1263
1264                rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
1265                /* lp has ref on src_ni; lose mine */
1266                lnet_ni_decref_locked(src_ni, cpt);
1267                if (rc != 0) {
1268                        lnet_net_unlock(cpt);
1269                        LCONSOLE_WARN("Error %d finding peer %s\n", rc,
1270                                      libcfs_nid2str(dst_nid));
1271                        /* ENOMEM or shutting down */
1272                        return rc;
1273                }
1274                LASSERT(lp->lp_ni == src_ni);
1275        } else {
1276                /* sending to a remote network */
1277                lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
1278                if (lp == NULL) {
1279                        if (src_ni != NULL)
1280                                lnet_ni_decref_locked(src_ni, cpt);
1281                        lnet_net_unlock(cpt);
1282
1283                        LCONSOLE_WARN("No route to %s via %s (all routers down)\n",
1284                                      libcfs_id2str(msg->msg_target),
1285                                      libcfs_nid2str(src_nid));
1286                        return -EHOSTUNREACH;
1287                }
1288
1289                /* rtr_nid is LNET_NID_ANY or NID of pre-determined router,
1290                 * it's possible that rtr_nid isn't LNET_NID_ANY and lp isn't
1291                 * pre-determined router, this can happen if router table
1292                 * was changed when we release the lock */
1293                if (rtr_nid != lp->lp_nid) {
1294                        cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
1295                        if (cpt2 != cpt) {
1296                                if (src_ni != NULL)
1297                                        lnet_ni_decref_locked(src_ni, cpt);
1298                                lnet_net_unlock(cpt);
1299
1300                                rtr_nid = lp->lp_nid;
1301                                cpt = cpt2;
1302                                goto again;
1303                        }
1304                }
1305
1306                CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1307                       libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
1308                       lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1309
1310                if (src_ni == NULL) {
1311                        src_ni = lp->lp_ni;
1312                        src_nid = src_ni->ni_nid;
1313                } else {
1314                        LASSERT(src_ni == lp->lp_ni);
1315                        lnet_ni_decref_locked(src_ni, cpt);
1316                }
1317
1318                lnet_peer_addref_locked(lp);
1319
1320                LASSERT(src_nid != LNET_NID_ANY);
1321                lnet_msg_commit(msg, cpt);
1322
1323                if (!msg->msg_routing) {
1324                        /* I'm the source and now I know which NI to send on */
1325                        msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1326                }
1327
1328                msg->msg_target_is_router = 1;
1329                msg->msg_target.nid = lp->lp_nid;
1330                msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
1331        }
1332
1333        /* 'lp' is our best choice of peer */
1334
1335        LASSERT(!msg->msg_peertxcredit);
1336        LASSERT(!msg->msg_txcredit);
1337        LASSERT(msg->msg_txpeer == NULL);
1338
1339        msg->msg_txpeer = lp;              /* msg takes my ref on lp */
1340
1341        rc = lnet_post_send_locked(msg, 0);
1342        lnet_net_unlock(cpt);
1343
1344        if (rc == EHOSTUNREACH || rc == ECANCELED)
1345                return -rc;
1346
1347        if (rc == 0)
1348                lnet_ni_send(src_ni, msg);
1349
1350        return 0; /* rc == 0 or EAGAIN */
1351}
1352
1353static void
1354lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
1355{
1356        lnet_net_lock(cpt);
1357        the_lnet.ln_counters[cpt]->drop_count++;
1358        the_lnet.ln_counters[cpt]->drop_length += nob;
1359        lnet_net_unlock(cpt);
1360
1361        lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1362}
1363
1364static void
1365lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
1366{
1367        lnet_hdr_t *hdr = &msg->msg_hdr;
1368
1369        if (msg->msg_wanted != 0)
1370                lnet_setpayloadbuffer(msg);
1371
1372        lnet_build_msg_event(msg, LNET_EVENT_PUT);
1373
1374        /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
1375         * it back into the ACK during lnet_finalize() */
1376        msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1377                        (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
1378
1379        lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
1380                     msg->msg_offset, msg->msg_wanted, hdr->payload_length);
1381}
1382
1383static int
1384lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1385{
1386        lnet_hdr_t *hdr = &msg->msg_hdr;
1387        struct lnet_match_info info;
1388        int rc;
1389
1390        /* Convert put fields to host byte order */
1391        hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1392        hdr->msg.put.ptl_index  = le32_to_cpu(hdr->msg.put.ptl_index);
1393        hdr->msg.put.offset     = le32_to_cpu(hdr->msg.put.offset);
1394
1395        info.mi_id.nid  = hdr->src_nid;
1396        info.mi_id.pid  = hdr->src_pid;
1397        info.mi_opc     = LNET_MD_OP_PUT;
1398        info.mi_portal  = hdr->msg.put.ptl_index;
1399        info.mi_rlength = hdr->payload_length;
1400        info.mi_roffset = hdr->msg.put.offset;
1401        info.mi_mbits   = hdr->msg.put.match_bits;
1402
1403        msg->msg_rx_ready_delay = ni->ni_lnd->lnd_eager_recv == NULL;
1404
1405 again:
1406        rc = lnet_ptl_match_md(&info, msg);
1407        switch (rc) {
1408        default:
1409                LBUG();
1410
1411        case LNET_MATCHMD_OK:
1412                lnet_recv_put(ni, msg);
1413                return 0;
1414
1415        case LNET_MATCHMD_NONE:
1416                if (msg->msg_rx_delayed) /* attached on delayed list */
1417                        return 0;
1418
1419                rc = lnet_ni_eager_recv(ni, msg);
1420                if (rc == 0)
1421                        goto again;
1422                /* fall through */
1423
1424        case LNET_MATCHMD_DROP:
1425                CNETERR("Dropping PUT from %s portal %d match %llu offset %d length %d: %d\n",
1426                        libcfs_id2str(info.mi_id), info.mi_portal,
1427                        info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
1428
1429                return ENOENT;  /* +ve: OK but no match */
1430        }
1431}
1432
1433static int
1434lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1435{
1436        struct lnet_match_info info;
1437        lnet_hdr_t *hdr = &msg->msg_hdr;
1438        lnet_handle_wire_t reply_wmd;
1439        int rc;
1440
1441        /* Convert get fields to host byte order */
1442        hdr->msg.get.match_bits  = le64_to_cpu(hdr->msg.get.match_bits);
1443        hdr->msg.get.ptl_index   = le32_to_cpu(hdr->msg.get.ptl_index);
1444        hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1445        hdr->msg.get.src_offset  = le32_to_cpu(hdr->msg.get.src_offset);
1446
1447        info.mi_id.nid  = hdr->src_nid;
1448        info.mi_id.pid  = hdr->src_pid;
1449        info.mi_opc     = LNET_MD_OP_GET;
1450        info.mi_portal  = hdr->msg.get.ptl_index;
1451        info.mi_rlength = hdr->msg.get.sink_length;
1452        info.mi_roffset = hdr->msg.get.src_offset;
1453        info.mi_mbits   = hdr->msg.get.match_bits;
1454
1455        rc = lnet_ptl_match_md(&info, msg);
1456        if (rc == LNET_MATCHMD_DROP) {
1457                CNETERR("Dropping GET from %s portal %d match %llu offset %d length %d\n",
1458                        libcfs_id2str(info.mi_id), info.mi_portal,
1459                        info.mi_mbits, info.mi_roffset, info.mi_rlength);
1460                return ENOENT;  /* +ve: OK but no match */
1461        }
1462
1463        LASSERT(rc == LNET_MATCHMD_OK);
1464
1465        lnet_build_msg_event(msg, LNET_EVENT_GET);
1466
1467        reply_wmd = hdr->msg.get.return_wmd;
1468
1469        lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
1470                       msg->msg_offset, msg->msg_wanted);
1471
1472        msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1473
1474        if (rdma_get) {
1475                /* The LND completes the REPLY from her recv procedure */
1476                lnet_ni_recv(ni, msg->msg_private, msg, 0,
1477                             msg->msg_offset, msg->msg_len, msg->msg_len);
1478                return 0;
1479        }
1480
1481        lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1482        msg->msg_receiving = 0;
1483
1484        rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
1485        if (rc < 0) {
1486                /* didn't get as far as lnet_ni_send() */
1487                CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1488                       libcfs_nid2str(ni->ni_nid),
1489                       libcfs_id2str(info.mi_id), rc);
1490
1491                lnet_finalize(ni, msg, rc);
1492        }
1493
1494        return 0;
1495}
1496
1497static int
1498lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1499{
1500        void *private = msg->msg_private;
1501        lnet_hdr_t *hdr = &msg->msg_hdr;
1502        lnet_process_id_t src = {0};
1503        lnet_libmd_t *md;
1504        int rlength;
1505        int mlength;
1506        int cpt;
1507
1508        cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
1509        lnet_res_lock(cpt);
1510
1511        src.nid = hdr->src_nid;
1512        src.pid = hdr->src_pid;
1513
1514        /* NB handles only looked up by creator (no flips) */
1515        md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1516        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1517                CNETERR("%s: Dropping REPLY from %s for %s MD %#llx.%#llx\n",
1518                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1519                        (md == NULL) ? "invalid" : "inactive",
1520                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
1521                        hdr->msg.reply.dst_wmd.wh_object_cookie);
1522                if (md != NULL && md->md_me != NULL)
1523                        CERROR("REPLY MD also attached to portal %d\n",
1524                               md->md_me->me_portal);
1525
1526                lnet_res_unlock(cpt);
1527                return ENOENT;            /* +ve: OK but no match */
1528        }
1529
1530        LASSERT(md->md_offset == 0);
1531
1532        rlength = hdr->payload_length;
1533        mlength = min_t(uint, rlength, md->md_length);
1534
1535        if (mlength < rlength &&
1536            (md->md_options & LNET_MD_TRUNCATE) == 0) {
1537                CNETERR("%s: Dropping REPLY from %s length %d for MD %#llx would overflow (%d)\n",
1538                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1539                        rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1540                        mlength);
1541                lnet_res_unlock(cpt);
1542                return ENOENT;    /* +ve: OK but no match */
1543        }
1544
1545        CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
1546               libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1547               mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1548
1549        lnet_msg_attach_md(msg, md, 0, mlength);
1550
1551        if (mlength != 0)
1552                lnet_setpayloadbuffer(msg);
1553
1554        lnet_res_unlock(cpt);
1555
1556        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
1557
1558        lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1559        return 0;
1560}
1561
1562static int
1563lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1564{
1565        lnet_hdr_t *hdr = &msg->msg_hdr;
1566        lnet_process_id_t src = {0};
1567        lnet_libmd_t *md;
1568        int cpt;
1569
1570        src.nid = hdr->src_nid;
1571        src.pid = hdr->src_pid;
1572
1573        /* Convert ack fields to host byte order */
1574        hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1575        hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1576
1577        cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
1578        lnet_res_lock(cpt);
1579
1580        /* NB handles only looked up by creator (no flips) */
1581        md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1582        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1583                /* Don't moan; this is expected */
1584                CDEBUG(D_NET,
1585                       "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
1586                       libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1587                       (md == NULL) ? "invalid" : "inactive",
1588                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
1589                       hdr->msg.ack.dst_wmd.wh_object_cookie);
1590                if (md != NULL && md->md_me != NULL)
1591                        CERROR("Source MD also attached to portal %d\n",
1592                               md->md_me->me_portal);
1593
1594                lnet_res_unlock(cpt);
1595                return ENOENT;            /* +ve! */
1596        }
1597
1598        CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
1599               libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1600               hdr->msg.ack.dst_wmd.wh_object_cookie);
1601
1602        lnet_msg_attach_md(msg, md, 0, 0);
1603
1604        lnet_res_unlock(cpt);
1605
1606        lnet_build_msg_event(msg, LNET_EVENT_ACK);
1607
1608        lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1609        return 0;
1610}
1611
1612static int
1613lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
1614{
1615        int rc = 0;
1616
1617        if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
1618            lnet_msg2bufpool(msg)->rbp_credits <= 0) {
1619                if (ni->ni_lnd->lnd_eager_recv == NULL) {
1620                        msg->msg_rx_ready_delay = 1;
1621                } else {
1622                        lnet_net_unlock(msg->msg_rx_cpt);
1623                        rc = lnet_ni_eager_recv(ni, msg);
1624                        lnet_net_lock(msg->msg_rx_cpt);
1625                }
1626        }
1627
1628        if (rc == 0)
1629                rc = lnet_post_routed_recv_locked(msg, 0);
1630        return rc;
1631}
1632
1633char *
1634lnet_msgtyp2str(int type)
1635{
1636        switch (type) {
1637        case LNET_MSG_ACK:
1638                return "ACK";
1639        case LNET_MSG_PUT:
1640                return "PUT";
1641        case LNET_MSG_GET:
1642                return "GET";
1643        case LNET_MSG_REPLY:
1644                return "REPLY";
1645        case LNET_MSG_HELLO:
1646                return "HELLO";
1647        default:
1648                return "<UNKNOWN>";
1649        }
1650}
1651EXPORT_SYMBOL(lnet_msgtyp2str);
1652
1653void
1654lnet_print_hdr(lnet_hdr_t *hdr)
1655{
1656        lnet_process_id_t src = {0};
1657        lnet_process_id_t dst = {0};
1658        char *type_str = lnet_msgtyp2str(hdr->type);
1659
1660        src.nid = hdr->src_nid;
1661        src.pid = hdr->src_pid;
1662
1663        dst.nid = hdr->dest_nid;
1664        dst.pid = hdr->dest_pid;
1665
1666        CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1667        CWARN("    From %s\n", libcfs_id2str(src));
1668        CWARN("    To   %s\n", libcfs_id2str(dst));
1669
1670        switch (hdr->type) {
1671        default:
1672                break;
1673
1674        case LNET_MSG_PUT:
1675                CWARN("    Ptl index %d, ack md %#llx.%#llx, match bits %llu\n",
1676                      hdr->msg.put.ptl_index,
1677                      hdr->msg.put.ack_wmd.wh_interface_cookie,
1678                      hdr->msg.put.ack_wmd.wh_object_cookie,
1679                      hdr->msg.put.match_bits);
1680                CWARN("    Length %d, offset %d, hdr data %#llx\n",
1681                      hdr->payload_length, hdr->msg.put.offset,
1682                      hdr->msg.put.hdr_data);
1683                break;
1684
1685        case LNET_MSG_GET:
1686                CWARN("    Ptl index %d, return md %#llx.%#llx, match bits %llu\n",
1687                      hdr->msg.get.ptl_index,
1688                      hdr->msg.get.return_wmd.wh_interface_cookie,
1689                      hdr->msg.get.return_wmd.wh_object_cookie,
1690                      hdr->msg.get.match_bits);
1691                CWARN("    Length %d, src offset %d\n",
1692                      hdr->msg.get.sink_length,
1693                      hdr->msg.get.src_offset);
1694                break;
1695
1696        case LNET_MSG_ACK:
1697                CWARN("    dst md %#llx.%#llx, manipulated length %d\n",
1698                      hdr->msg.ack.dst_wmd.wh_interface_cookie,
1699                      hdr->msg.ack.dst_wmd.wh_object_cookie,
1700                      hdr->msg.ack.mlength);
1701                break;
1702
1703        case LNET_MSG_REPLY:
1704                CWARN("    dst md %#llx.%#llx, length %d\n",
1705                      hdr->msg.reply.dst_wmd.wh_interface_cookie,
1706                      hdr->msg.reply.dst_wmd.wh_object_cookie,
1707                      hdr->payload_length);
1708        }
1709
1710}
1711
1712int
1713lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
1714           void *private, int rdma_req)
1715{
1716        int rc = 0;
1717        int cpt;
1718        int for_me;
1719        struct lnet_msg *msg;
1720        lnet_pid_t dest_pid;
1721        lnet_nid_t dest_nid;
1722        lnet_nid_t src_nid;
1723        __u32 payload_length;
1724        __u32 type;
1725
1726        LASSERT(!in_interrupt());
1727
1728        type = le32_to_cpu(hdr->type);
1729        src_nid = le64_to_cpu(hdr->src_nid);
1730        dest_nid = le64_to_cpu(hdr->dest_nid);
1731        dest_pid = le32_to_cpu(hdr->dest_pid);
1732        payload_length = le32_to_cpu(hdr->payload_length);
1733
1734        for_me = (ni->ni_nid == dest_nid);
1735        cpt = lnet_cpt_of_nid(from_nid);
1736
1737        switch (type) {
1738        case LNET_MSG_ACK:
1739        case LNET_MSG_GET:
1740                if (payload_length > 0) {
1741                        CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1742                               libcfs_nid2str(from_nid),
1743                               libcfs_nid2str(src_nid),
1744                               lnet_msgtyp2str(type), payload_length);
1745                        return -EPROTO;
1746                }
1747                break;
1748
1749        case LNET_MSG_PUT:
1750        case LNET_MSG_REPLY:
1751                if (payload_length >
1752                   (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1753                        CERROR("%s, src %s: bad %s payload %d (%d max expected)\n",
1754                               libcfs_nid2str(from_nid),
1755                               libcfs_nid2str(src_nid),
1756                               lnet_msgtyp2str(type),
1757                               payload_length,
1758                               for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1759                        return -EPROTO;
1760                }
1761                break;
1762
1763        default:
1764                CERROR("%s, src %s: Bad message type 0x%x\n",
1765                       libcfs_nid2str(from_nid),
1766                       libcfs_nid2str(src_nid), type);
1767                return -EPROTO;
1768        }
1769
1770        if (the_lnet.ln_routing &&
1771            ni->ni_last_alive != get_seconds()) {
1772                lnet_ni_lock(ni);
1773
1774                /* NB: so far here is the only place to set NI status to "up */
1775                ni->ni_last_alive = get_seconds();
1776                if (ni->ni_status != NULL &&
1777                    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
1778                        ni->ni_status->ns_status = LNET_NI_STATUS_UP;
1779                lnet_ni_unlock(ni);
1780        }
1781
1782        /* Regard a bad destination NID as a protocol error.  Senders should
1783         * know what they're doing; if they don't they're misconfigured, buggy
1784         * or malicious so we chop them off at the knees :) */
1785
1786        if (!for_me) {
1787                if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
1788                        /* should have gone direct */
1789                        CERROR("%s, src %s: Bad dest nid %s (should have been sent direct)\n",
1790                               libcfs_nid2str(from_nid),
1791                               libcfs_nid2str(src_nid),
1792                               libcfs_nid2str(dest_nid));
1793                        return -EPROTO;
1794                }
1795
1796                if (lnet_islocalnid(dest_nid)) {
1797                        /* dest is another local NI; sender should have used
1798                         * this node's NID on its own network */
1799                        CERROR("%s, src %s: Bad dest nid %s (it's my nid but on a different network)\n",
1800                               libcfs_nid2str(from_nid),
1801                               libcfs_nid2str(src_nid),
1802                               libcfs_nid2str(dest_nid));
1803                        return -EPROTO;
1804                }
1805
1806                if (rdma_req && type == LNET_MSG_GET) {
1807                        CERROR("%s, src %s: Bad optimized GET for %s (final destination must be me)\n",
1808                               libcfs_nid2str(from_nid),
1809                               libcfs_nid2str(src_nid),
1810                               libcfs_nid2str(dest_nid));
1811                        return -EPROTO;
1812                }
1813
1814                if (!the_lnet.ln_routing) {
1815                        CERROR("%s, src %s: Dropping message for %s (routing not enabled)\n",
1816                               libcfs_nid2str(from_nid),
1817                               libcfs_nid2str(src_nid),
1818                               libcfs_nid2str(dest_nid));
1819                        goto drop;
1820                }
1821        }
1822
1823        /* Message looks OK; we're not going to return an error, so we MUST
1824         * call back lnd_recv() come what may... */
1825
1826        if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
1827            fail_peer(src_nid, 0)) {         /* shall we now? */
1828                CERROR("%s, src %s: Dropping %s to simulate failure\n",
1829                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1830                       lnet_msgtyp2str(type));
1831                goto drop;
1832        }
1833
1834        msg = lnet_msg_alloc();
1835        if (msg == NULL) {
1836                CERROR("%s, src %s: Dropping %s (out of memory)\n",
1837                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1838                       lnet_msgtyp2str(type));
1839                goto drop;
1840        }
1841
1842        /* msg zeroed in lnet_msg_alloc;
1843         * i.e. flags all clear, pointers NULL etc
1844         */
1845
1846        msg->msg_type = type;
1847        msg->msg_private = private;
1848        msg->msg_receiving = 1;
1849        msg->msg_len = msg->msg_wanted = payload_length;
1850        msg->msg_offset = 0;
1851        msg->msg_hdr = *hdr;
1852        /* for building message event */
1853        msg->msg_from = from_nid;
1854        if (!for_me) {
1855                msg->msg_target.pid     = dest_pid;
1856                msg->msg_target.nid     = dest_nid;
1857                msg->msg_routing        = 1;
1858
1859        } else {
1860                /* convert common msg->hdr fields to host byteorder */
1861                msg->msg_hdr.type       = type;
1862                msg->msg_hdr.src_nid    = src_nid;
1863                msg->msg_hdr.src_pid    = le32_to_cpu(msg->msg_hdr.src_pid);
1864                msg->msg_hdr.dest_nid   = dest_nid;
1865                msg->msg_hdr.dest_pid   = dest_pid;
1866                msg->msg_hdr.payload_length = payload_length;
1867        }
1868
1869        lnet_net_lock(cpt);
1870        rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
1871        if (rc != 0) {
1872                lnet_net_unlock(cpt);
1873                CERROR("%s, src %s: Dropping %s (error %d looking up sender)\n",
1874                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1875                       lnet_msgtyp2str(type), rc);
1876                lnet_msg_free(msg);
1877                goto drop;
1878        }
1879
1880        if (lnet_isrouter(msg->msg_rxpeer)) {
1881                lnet_peer_set_alive(msg->msg_rxpeer);
1882                if (avoid_asym_router_failure &&
1883                    LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
1884                        /* received a remote message from router, update
1885                         * remote NI status on this router.
1886                         * NB: multi-hop routed message will be ignored.
1887                         */
1888                        lnet_router_ni_update_locked(msg->msg_rxpeer,
1889                                                     LNET_NIDNET(src_nid));
1890                }
1891        }
1892
1893        lnet_msg_commit(msg, cpt);
1894
1895        if (!for_me) {
1896                rc = lnet_parse_forward_locked(ni, msg);
1897                lnet_net_unlock(cpt);
1898
1899                if (rc < 0)
1900                        goto free_drop;
1901                if (rc == 0) {
1902                        lnet_ni_recv(ni, msg->msg_private, msg, 0,
1903                                     0, payload_length, payload_length);
1904                }
1905                return 0;
1906        }
1907
1908        lnet_net_unlock(cpt);
1909
1910        switch (type) {
1911        case LNET_MSG_ACK:
1912                rc = lnet_parse_ack(ni, msg);
1913                break;
1914        case LNET_MSG_PUT:
1915                rc = lnet_parse_put(ni, msg);
1916                break;
1917        case LNET_MSG_GET:
1918                rc = lnet_parse_get(ni, msg, rdma_req);
1919                break;
1920        case LNET_MSG_REPLY:
1921                rc = lnet_parse_reply(ni, msg);
1922                break;
1923        default:
1924                LASSERT(0);
1925                rc = -EPROTO;
1926                goto free_drop;  /* prevent an unused label if !kernel */
1927        }
1928
1929        if (rc == 0)
1930                return 0;
1931
1932        LASSERT(rc == ENOENT);
1933
1934 free_drop:
1935        LASSERT(msg->msg_md == NULL);
1936        lnet_finalize(ni, msg, rc);
1937
1938 drop:
1939        lnet_drop_message(ni, cpt, private, payload_length);
1940        return 0;
1941}
1942EXPORT_SYMBOL(lnet_parse);
1943
1944void
1945lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
1946{
1947        while (!list_empty(head)) {
1948                lnet_process_id_t id = {0};
1949                lnet_msg_t *msg;
1950
1951                msg = list_entry(head->next, lnet_msg_t, msg_list);
1952                list_del(&msg->msg_list);
1953
1954                id.nid = msg->msg_hdr.src_nid;
1955                id.pid = msg->msg_hdr.src_pid;
1956
1957                LASSERT(msg->msg_md == NULL);
1958                LASSERT(msg->msg_rx_delayed);
1959                LASSERT(msg->msg_rxpeer != NULL);
1960                LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1961
1962                CWARN("Dropping delayed PUT from %s portal %d match %llu offset %d length %d: %s\n",
1963                      libcfs_id2str(id),
1964                      msg->msg_hdr.msg.put.ptl_index,
1965                      msg->msg_hdr.msg.put.match_bits,
1966                      msg->msg_hdr.msg.put.offset,
1967                      msg->msg_hdr.payload_length, reason);
1968
1969                /* NB I can't drop msg's ref on msg_rxpeer until after I've
1970                 * called lnet_drop_message(), so I just hang onto msg as well
1971                 * until that's done */
1972
1973                lnet_drop_message(msg->msg_rxpeer->lp_ni,
1974                                  msg->msg_rxpeer->lp_cpt,
1975                                  msg->msg_private, msg->msg_len);
1976                /*
1977                 * NB: message will not generate event because w/o attached MD,
1978                 * but we still should give error code so lnet_msg_decommit()
1979                 * can skip counters operations and other checks.
1980                 */
1981                lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
1982        }
1983}
1984
1985void
1986lnet_recv_delayed_msg_list(struct list_head *head)
1987{
1988        while (!list_empty(head)) {
1989                lnet_msg_t *msg;
1990                lnet_process_id_t id;
1991
1992                msg = list_entry(head->next, lnet_msg_t, msg_list);
1993                list_del(&msg->msg_list);
1994
1995                /* md won't disappear under me, since each msg
1996                 * holds a ref on it */
1997
1998                id.nid = msg->msg_hdr.src_nid;
1999                id.pid = msg->msg_hdr.src_pid;
2000
2001                LASSERT(msg->msg_rx_delayed);
2002                LASSERT(msg->msg_md != NULL);
2003                LASSERT(msg->msg_rxpeer != NULL);
2004                LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2005
2006                CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
2007                       libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
2008                       msg->msg_hdr.msg.put.match_bits,
2009                       msg->msg_hdr.msg.put.offset,
2010                       msg->msg_hdr.payload_length);
2011
2012                lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
2013        }
2014}
2015
2016/**
2017 * Initiate an asynchronous PUT operation.
2018 *
2019 * There are several events associated with a PUT: completion of the send on
2020 * the initiator node (LNET_EVENT_SEND), and when the send completes
2021 * successfully, the receipt of an acknowledgment (LNET_EVENT_ACK) indicating
2022 * that the operation was accepted by the target. The event LNET_EVENT_PUT is
2023 * used at the target node to indicate the completion of incoming data
2024 * delivery.
2025 *
2026 * The local events will be logged in the EQ associated with the MD pointed to
2027 * by \a mdh handle. Using a MD without an associated EQ results in these
2028 * events being discarded. In this case, the caller must have another
2029 * mechanism (e.g., a higher level protocol) for determining when it is safe
2030 * to modify the memory region associated with the MD.
2031 *
2032 * Note that LNet does not guarantee the order of LNET_EVENT_SEND and
2033 * LNET_EVENT_ACK, though intuitively ACK should happen after SEND.
2034 *
2035 * \param self Indicates the NID of a local interface through which to send
2036 * the PUT request. Use LNET_NID_ANY to let LNet choose one by itself.
2037 * \param mdh A handle for the MD that describes the memory to be sent. The MD
2038 * must be "free floating" (See LNetMDBind()).
2039 * \param ack Controls whether an acknowledgment is requested.
2040 * Acknowledgments are only sent when they are requested by the initiating
2041 * process and the target MD enables them.
2042 * \param target A process identifier for the target process.
2043 * \param portal The index in the \a target's portal table.
2044 * \param match_bits The match bits to use for MD selection at the target
2045 * process.
2046 * \param offset The offset into the target MD (only used when the target
2047 * MD has the LNET_MD_MANAGE_REMOTE option set).
2048 * \param hdr_data 64 bits of user data that can be included in the message
2049 * header. This data is written to an event queue entry at the target if an
2050 * EQ is present on the matching MD.
2051 *
2052 * \retval  0      Success, and only in this case events will be generated
2053 * and logged to EQ (if it exists).
2054 * \retval -EIO    Simulated failure.
2055 * \retval -ENOMEM Memory allocation failure.
2056 * \retval -ENOENT Invalid MD object.
2057 *
2058 * \see lnet_event_t::hdr_data and lnet_event_kind_t.
2059 */
2060int
2061LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
2062        lnet_process_id_t target, unsigned int portal,
2063        __u64 match_bits, unsigned int offset,
2064        __u64 hdr_data)
2065{
2066        struct lnet_msg *msg;
2067        struct lnet_libmd *md;
2068        int cpt;
2069        int rc;
2070
2071        LASSERT(the_lnet.ln_init);
2072        LASSERT(the_lnet.ln_refcount > 0);
2073
2074        if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2075            fail_peer(target.nid, 1)) { /* shall we now? */
2076                CERROR("Dropping PUT to %s: simulated failure\n",
2077                       libcfs_id2str(target));
2078                return -EIO;
2079        }
2080
2081        msg = lnet_msg_alloc();
2082        if (msg == NULL) {
2083                CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2084                       libcfs_id2str(target));
2085                return -ENOMEM;
2086        }
2087        msg->msg_vmflush = !!memory_pressure_get();
2088
2089        cpt = lnet_cpt_of_cookie(mdh.cookie);
2090        lnet_res_lock(cpt);
2091
2092        md = lnet_handle2md(&mdh);
2093        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2094                CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
2095                       match_bits, portal, libcfs_id2str(target),
2096                       md == NULL ? -1 : md->md_threshold);
2097                if (md != NULL && md->md_me != NULL)
2098                        CERROR("Source MD also attached to portal %d\n",
2099                               md->md_me->me_portal);
2100                lnet_res_unlock(cpt);
2101
2102                lnet_msg_free(msg);
2103                return -ENOENT;
2104        }
2105
2106        CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2107
2108        lnet_msg_attach_md(msg, md, 0, 0);
2109
2110        lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2111
2112        msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2113        msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2114        msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2115        msg->msg_hdr.msg.put.hdr_data = hdr_data;
2116
2117        /* NB handles only looked up by creator (no flips) */
2118        if (ack == LNET_ACK_REQ) {
2119                msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2120                        the_lnet.ln_interface_cookie;
2121                msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2122                        md->md_lh.lh_cookie;
2123        } else {
2124                msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2125                        LNET_WIRE_HANDLE_COOKIE_NONE;
2126                msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2127                        LNET_WIRE_HANDLE_COOKIE_NONE;
2128        }
2129
2130        lnet_res_unlock(cpt);
2131
2132        lnet_build_msg_event(msg, LNET_EVENT_SEND);
2133
2134        rc = lnet_send(self, msg, LNET_NID_ANY);
2135        if (rc != 0) {
2136                CNETERR("Error sending PUT to %s: %d\n",
2137                       libcfs_id2str(target), rc);
2138                lnet_finalize(NULL, msg, rc);
2139        }
2140
2141        /* completion will be signalled by an event */
2142        return 0;
2143}
2144EXPORT_SYMBOL(LNetPut);
2145
2146lnet_msg_t *
2147lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2148{
2149        /* The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
2150         * returns a msg for the LND to pass to lnet_finalize() when the sink
2151         * data has been received.
2152         *
2153         * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
2154         * lnet_finalize() is called on it, so the LND must call this first */
2155
2156        struct lnet_msg *msg = lnet_msg_alloc();
2157        struct lnet_libmd *getmd = getmsg->msg_md;
2158        lnet_process_id_t peer_id = getmsg->msg_target;
2159        int cpt;
2160
2161        LASSERT(!getmsg->msg_target_is_router);
2162        LASSERT(!getmsg->msg_routing);
2163
2164        cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2165        lnet_res_lock(cpt);
2166
2167        LASSERT(getmd->md_refcount > 0);
2168
2169        if (msg == NULL) {
2170                CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2171                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2172                goto drop;
2173        }
2174
2175        if (getmd->md_threshold == 0) {
2176                CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2177                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2178                        getmd);
2179                lnet_res_unlock(cpt);
2180                goto drop;
2181        }
2182
2183        LASSERT(getmd->md_offset == 0);
2184
2185        CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2186               libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2187
2188        /* setup information for lnet_build_msg_event */
2189        msg->msg_from = peer_id.nid;
2190        msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
2191        msg->msg_hdr.src_nid = peer_id.nid;
2192        msg->msg_hdr.payload_length = getmd->md_length;
2193        msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
2194
2195        lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2196        lnet_res_unlock(cpt);
2197
2198        cpt = lnet_cpt_of_nid(peer_id.nid);
2199
2200        lnet_net_lock(cpt);
2201        lnet_msg_commit(msg, cpt);
2202        lnet_net_unlock(cpt);
2203
2204        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2205
2206        return msg;
2207
2208 drop:
2209        cpt = lnet_cpt_of_nid(peer_id.nid);
2210
2211        lnet_net_lock(cpt);
2212        the_lnet.ln_counters[cpt]->drop_count++;
2213        the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2214        lnet_net_unlock(cpt);
2215
2216        if (msg != NULL)
2217                lnet_msg_free(msg);
2218
2219        return NULL;
2220}
2221EXPORT_SYMBOL(lnet_create_reply_msg);
2222
2223void
2224lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2225{
2226        /* Set the REPLY length, now the RDMA that elides the REPLY message has
2227         * completed and I know it. */
2228        LASSERT(reply != NULL);
2229        LASSERT(reply->msg_type == LNET_MSG_GET);
2230        LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
2231
2232        /* NB I trusted my peer to RDMA.  If she tells me she's written beyond
2233         * the end of my buffer, I might as well be dead. */
2234        LASSERT(len <= reply->msg_ev.mlength);
2235
2236        reply->msg_ev.mlength = len;
2237}
2238EXPORT_SYMBOL(lnet_set_reply_msg_len);
2239
2240/**
2241 * Initiate an asynchronous GET operation.
2242 *
2243 * On the initiator node, an LNET_EVENT_SEND is logged when the GET request
2244 * is sent, and an LNET_EVENT_REPLY is logged when the data returned from
2245 * the target node in the REPLY has been written to local MD.
2246 *
2247 * On the target node, an LNET_EVENT_GET is logged when the GET request
2248 * arrives and is accepted into a MD.
2249 *
2250 * \param self,target,portal,match_bits,offset See the discussion in LNetPut().
2251 * \param mdh A handle for the MD that describes the memory into which the
2252 * requested data will be received. The MD must be "free floating"
2253 * (See LNetMDBind()).
2254 *
2255 * \retval  0      Success, and only in this case events will be generated
2256 * and logged to EQ (if it exists) of the MD.
2257 * \retval -EIO    Simulated failure.
2258 * \retval -ENOMEM Memory allocation failure.
2259 * \retval -ENOENT Invalid MD object.
2260 */
2261int
2262LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
2263        lnet_process_id_t target, unsigned int portal,
2264        __u64 match_bits, unsigned int offset)
2265{
2266        struct lnet_msg *msg;
2267        struct lnet_libmd *md;
2268        int cpt;
2269        int rc;
2270
2271        LASSERT(the_lnet.ln_init);
2272        LASSERT(the_lnet.ln_refcount > 0);
2273
2274        if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2275            fail_peer(target.nid, 1)) {   /* shall we now? */
2276                CERROR("Dropping GET to %s: simulated failure\n",
2277                       libcfs_id2str(target));
2278                return -EIO;
2279        }
2280
2281        msg = lnet_msg_alloc();
2282        if (msg == NULL) {
2283                CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2284                       libcfs_id2str(target));
2285                return -ENOMEM;
2286        }
2287
2288        cpt = lnet_cpt_of_cookie(mdh.cookie);
2289        lnet_res_lock(cpt);
2290
2291        md = lnet_handle2md(&mdh);
2292        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2293                CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
2294                       match_bits, portal, libcfs_id2str(target),
2295                       md == NULL ? -1 : md->md_threshold);
2296                if (md != NULL && md->md_me != NULL)
2297                        CERROR("REPLY MD also attached to portal %d\n",
2298                               md->md_me->me_portal);
2299
2300                lnet_res_unlock(cpt);
2301
2302                lnet_msg_free(msg);
2303                return -ENOENT;
2304        }
2305
2306        CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2307
2308        lnet_msg_attach_md(msg, md, 0, 0);
2309
2310        lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2311
2312        msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2313        msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2314        msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2315        msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2316
2317        /* NB handles only looked up by creator (no flips) */
2318        msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
2319                the_lnet.ln_interface_cookie;
2320        msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
2321                md->md_lh.lh_cookie;
2322
2323        lnet_res_unlock(cpt);
2324
2325        lnet_build_msg_event(msg, LNET_EVENT_SEND);
2326
2327        rc = lnet_send(self, msg, LNET_NID_ANY);
2328        if (rc < 0) {
2329                CNETERR("Error sending GET to %s: %d\n",
2330                       libcfs_id2str(target), rc);
2331                lnet_finalize(NULL, msg, rc);
2332        }
2333
2334        /* completion will be signalled by an event */
2335        return 0;
2336}
2337EXPORT_SYMBOL(LNetGet);
2338
2339/**
2340 * Calculate distance to node at \a dstnid.
2341 *
2342 * \param dstnid Target NID.
2343 * \param srcnidp If not NULL, NID of the local interface to reach \a dstnid
2344 * is saved here.
2345 * \param orderp If not NULL, order of the route to reach \a dstnid is saved
2346 * here.
2347 *
2348 * \retval 0 If \a dstnid belongs to a local interface, and reserved option
2349 * local_nid_dist_zero is set, which is the default.
2350 * \retval positives Distance to target NID, i.e. number of hops plus one.
2351 * \retval -EHOSTUNREACH If \a dstnid is not reachable.
2352 */
2353int
2354LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2355{
2356        struct list_head *e;
2357        struct lnet_ni *ni;
2358        lnet_remotenet_t *rnet;
2359        __u32 dstnet = LNET_NIDNET(dstnid);
2360        int hops;
2361        int cpt;
2362        __u32 order = 2;
2363        struct list_head *rn_list;
2364
2365        /* if !local_nid_dist_zero, I don't return a distance of 0 ever
2366         * (when lustre sees a distance of 0, it substitutes 0@lo), so I
2367         * keep order 0 free for 0@lo and order 1 free for a local NID
2368         * match */
2369
2370        LASSERT(the_lnet.ln_init);
2371        LASSERT(the_lnet.ln_refcount > 0);
2372
2373        cpt = lnet_net_lock_current();
2374
2375        list_for_each(e, &the_lnet.ln_nis) {
2376                ni = list_entry(e, lnet_ni_t, ni_list);
2377
2378                if (ni->ni_nid == dstnid) {
2379                        if (srcnidp != NULL)
2380                                *srcnidp = dstnid;
2381                        if (orderp != NULL) {
2382                                if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2383                                        *orderp = 0;
2384                                else
2385                                        *orderp = 1;
2386                        }
2387                        lnet_net_unlock(cpt);
2388
2389                        return local_nid_dist_zero ? 0 : 1;
2390                }
2391
2392                if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2393                        if (srcnidp != NULL)
2394                                *srcnidp = ni->ni_nid;
2395                        if (orderp != NULL)
2396                                *orderp = order;
2397                        lnet_net_unlock(cpt);
2398                        return 1;
2399                }
2400
2401                order++;
2402        }
2403
2404        rn_list = lnet_net2rnethash(dstnet);
2405        list_for_each(e, rn_list) {
2406                rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2407
2408                if (rnet->lrn_net == dstnet) {
2409                        lnet_route_t *route;
2410                        lnet_route_t *shortest = NULL;
2411
2412                        LASSERT(!list_empty(&rnet->lrn_routes));
2413
2414                        list_for_each_entry(route, &rnet->lrn_routes,
2415                                                lr_list) {
2416                                if (shortest == NULL ||
2417                                    route->lr_hops < shortest->lr_hops)
2418                                        shortest = route;
2419                        }
2420
2421                        LASSERT(shortest != NULL);
2422                        hops = shortest->lr_hops;
2423                        if (srcnidp != NULL)
2424                                *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
2425                        if (orderp != NULL)
2426                                *orderp = order;
2427                        lnet_net_unlock(cpt);
2428                        return hops + 1;
2429                }
2430                order++;
2431        }
2432
2433        lnet_net_unlock(cpt);
2434        return -EHOSTUNREACH;
2435}
2436EXPORT_SYMBOL(LNetDist);
2437