linux/drivers/staging/lustre/lnet/lnet/lib-move.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lnet/lnet/lib-move.c
  37 *
  38 * Data movement routines
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_LNET
  42
  43#include <linux/lnet/lib-lnet.h>
  44
   /*
    * Integer module parameter, default 1, exposed with mode 0444.
    * Its description string is just "Reserved"; nothing in the visible part
    * of this file reads the value.
    */
   45static int local_nid_dist_zero = 1;
   46module_param(local_nid_dist_zero, int, 0444);
   47MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
  48
  49int
  50lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
  51{
  52        lnet_test_peer_t  *tp;
  53        struct list_head        *el;
  54        struct list_head        *next;
  55        struct list_head         cull;
  56
  57        LASSERT(the_lnet.ln_init);
  58
  59        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
  60        if (threshold != 0) {
  61                /* Adding a new entry */
  62                LIBCFS_ALLOC(tp, sizeof(*tp));
  63                if (tp == NULL)
  64                        return -ENOMEM;
  65
  66                tp->tp_nid = nid;
  67                tp->tp_threshold = threshold;
  68
  69                lnet_net_lock(0);
  70                list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
  71                lnet_net_unlock(0);
  72                return 0;
  73        }
  74
  75        /* removing entries */
  76        INIT_LIST_HEAD(&cull);
  77
  78        lnet_net_lock(0);
  79
  80        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
  81                tp = list_entry(el, lnet_test_peer_t, tp_list);
  82
  83                if (tp->tp_threshold == 0 ||    /* needs culling anyway */
  84                    nid == LNET_NID_ANY ||       /* removing all entries */
  85                    tp->tp_nid == nid) {          /* matched this one */
  86                        list_del(&tp->tp_list);
  87                        list_add(&tp->tp_list, &cull);
  88                }
  89        }
  90
  91        lnet_net_unlock(0);
  92
  93        while (!list_empty(&cull)) {
  94                tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
  95
  96                list_del(&tp->tp_list);
  97                LIBCFS_FREE(tp, sizeof(*tp));
  98        }
  99        return 0;
 100}
 101
 102static int
 103fail_peer(lnet_nid_t nid, int outgoing)
 104{
 105        lnet_test_peer_t *tp;
 106        struct list_head       *el;
 107        struct list_head       *next;
 108        struct list_head        cull;
 109        int            fail = 0;
 110
 111        INIT_LIST_HEAD(&cull);
 112
 113        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
 114        lnet_net_lock(0);
 115
 116        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
 117                tp = list_entry(el, lnet_test_peer_t, tp_list);
 118
 119                if (tp->tp_threshold == 0) {
 120                        /* zombie entry */
 121                        if (outgoing) {
 122                                /* only cull zombies on outgoing tests,
 123                                 * since we may be at interrupt priority on
 124                                 * incoming messages. */
 125                                list_del(&tp->tp_list);
 126                                list_add(&tp->tp_list, &cull);
 127                        }
 128                        continue;
 129                }
 130
 131                if (tp->tp_nid == LNET_NID_ANY || /* fail every peer */
 132                    nid == tp->tp_nid) {        /* fail this peer */
 133                        fail = 1;
 134
 135                        if (tp->tp_threshold != LNET_MD_THRESH_INF) {
 136                                tp->tp_threshold--;
 137                                if (outgoing &&
 138                                    tp->tp_threshold == 0) {
 139                                        /* see above */
 140                                        list_del(&tp->tp_list);
 141                                        list_add(&tp->tp_list, &cull);
 142                                }
 143                        }
 144                        break;
 145                }
 146        }
 147
 148        lnet_net_unlock(0);
 149
 150        while (!list_empty(&cull)) {
 151                tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
 152                list_del(&tp->tp_list);
 153
 154                LIBCFS_FREE(tp, sizeof(*tp));
 155        }
 156
 157        return fail;
 158}
 159
 160unsigned int
 161lnet_iov_nob(unsigned int niov, struct iovec *iov)
 162{
 163        unsigned int nob = 0;
 164
 165        while (niov-- > 0)
 166                nob += (iov++)->iov_len;
 167
 168        return nob;
 169}
 170EXPORT_SYMBOL(lnet_iov_nob);
 171
 172void
 173lnet_copy_iov2iov(unsigned int ndiov, struct iovec *diov, unsigned int doffset,
 174                   unsigned int nsiov, struct iovec *siov, unsigned int soffset,
 175                   unsigned int nob)
 176{
 177        /* NB diov, siov are READ-ONLY */
 178        unsigned int  this_nob;
 179
 180        if (nob == 0)
 181                return;
 182
 183        /* skip complete frags before 'doffset' */
 184        LASSERT(ndiov > 0);
 185        while (doffset >= diov->iov_len) {
 186                doffset -= diov->iov_len;
 187                diov++;
 188                ndiov--;
 189                LASSERT(ndiov > 0);
 190        }
 191
 192        /* skip complete frags before 'soffset' */
 193        LASSERT(nsiov > 0);
 194        while (soffset >= siov->iov_len) {
 195                soffset -= siov->iov_len;
 196                siov++;
 197                nsiov--;
 198                LASSERT(nsiov > 0);
 199        }
 200
 201        do {
 202                LASSERT(ndiov > 0);
 203                LASSERT(nsiov > 0);
 204                this_nob = MIN(diov->iov_len - doffset,
 205                               siov->iov_len - soffset);
 206                this_nob = MIN(this_nob, nob);
 207
 208                memcpy((char *)diov->iov_base + doffset,
 209                        (char *)siov->iov_base + soffset, this_nob);
 210                nob -= this_nob;
 211
 212                if (diov->iov_len > doffset + this_nob) {
 213                        doffset += this_nob;
 214                } else {
 215                        diov++;
 216                        ndiov--;
 217                        doffset = 0;
 218                }
 219
 220                if (siov->iov_len > soffset + this_nob) {
 221                        soffset += this_nob;
 222                } else {
 223                        siov++;
 224                        nsiov--;
 225                        soffset = 0;
 226                }
 227        } while (nob > 0);
 228}
 229EXPORT_SYMBOL(lnet_copy_iov2iov);
 230
 231int
 232lnet_extract_iov(int dst_niov, struct iovec *dst,
 233                  int src_niov, struct iovec *src,
 234                  unsigned int offset, unsigned int len)
 235{
 236        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
 237         * for exactly 'len' bytes, and return the number of entries.
 238         * NB not destructive to 'src' */
 239        unsigned int    frag_len;
 240        unsigned int    niov;
 241
 242        if (len == 0)                      /* no data => */
 243                return 0;                    /* no frags */
 244
 245        LASSERT(src_niov > 0);
 246        while (offset >= src->iov_len) {      /* skip initial frags */
 247                offset -= src->iov_len;
 248                src_niov--;
 249                src++;
 250                LASSERT(src_niov > 0);
 251        }
 252
 253        niov = 1;
 254        for (;;) {
 255                LASSERT(src_niov > 0);
 256                LASSERT((int)niov <= dst_niov);
 257
 258                frag_len = src->iov_len - offset;
 259                dst->iov_base = ((char *)src->iov_base) + offset;
 260
 261                if (len <= frag_len) {
 262                        dst->iov_len = len;
 263                        return niov;
 264                }
 265
 266                dst->iov_len = frag_len;
 267
 268                len -= frag_len;
 269                dst++;
 270                src++;
 271                niov++;
 272                src_niov--;
 273                offset = 0;
 274        }
 275}
 276EXPORT_SYMBOL(lnet_extract_iov);
 277
 278
 279unsigned int
 280lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
 281{
 282        unsigned int  nob = 0;
 283
 284        while (niov-- > 0)
 285                nob += (kiov++)->kiov_len;
 286
 287        return nob;
 288}
 289EXPORT_SYMBOL(lnet_kiov_nob);
 290
 291void
 292lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
 293                    unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
 294                    unsigned int nob)
 295{
 296        /* NB diov, siov are READ-ONLY */
 297        unsigned int    this_nob;
 298        char       *daddr = NULL;
 299        char       *saddr = NULL;
 300
 301        if (nob == 0)
 302                return;
 303
 304        LASSERT(!in_interrupt());
 305
 306        LASSERT(ndiov > 0);
 307        while (doffset >= diov->kiov_len) {
 308                doffset -= diov->kiov_len;
 309                diov++;
 310                ndiov--;
 311                LASSERT(ndiov > 0);
 312        }
 313
 314        LASSERT(nsiov > 0);
 315        while (soffset >= siov->kiov_len) {
 316                soffset -= siov->kiov_len;
 317                siov++;
 318                nsiov--;
 319                LASSERT(nsiov > 0);
 320        }
 321
 322        do {
 323                LASSERT(ndiov > 0);
 324                LASSERT(nsiov > 0);
 325                this_nob = MIN(diov->kiov_len - doffset,
 326                               siov->kiov_len - soffset);
 327                this_nob = MIN(this_nob, nob);
 328
 329                if (daddr == NULL)
 330                        daddr = ((char *)kmap(diov->kiov_page)) +
 331                                diov->kiov_offset + doffset;
 332                if (saddr == NULL)
 333                        saddr = ((char *)kmap(siov->kiov_page)) +
 334                                siov->kiov_offset + soffset;
 335
 336                /* Vanishing risk of kmap deadlock when mapping 2 pages.
 337                 * However in practice at least one of the kiovs will be mapped
 338                 * kernel pages and the map/unmap will be NOOPs */
 339
 340                memcpy(daddr, saddr, this_nob);
 341                nob -= this_nob;
 342
 343                if (diov->kiov_len > doffset + this_nob) {
 344                        daddr += this_nob;
 345                        doffset += this_nob;
 346                } else {
 347                        kunmap(diov->kiov_page);
 348                        daddr = NULL;
 349                        diov++;
 350                        ndiov--;
 351                        doffset = 0;
 352                }
 353
 354                if (siov->kiov_len > soffset + this_nob) {
 355                        saddr += this_nob;
 356                        soffset += this_nob;
 357                } else {
 358                        kunmap(siov->kiov_page);
 359                        saddr = NULL;
 360                        siov++;
 361                        nsiov--;
 362                        soffset = 0;
 363                }
 364        } while (nob > 0);
 365
 366        if (daddr != NULL)
 367                kunmap(diov->kiov_page);
 368        if (saddr != NULL)
 369                kunmap(siov->kiov_page);
 370}
 371EXPORT_SYMBOL(lnet_copy_kiov2kiov);
 372
 373void
 374lnet_copy_kiov2iov(unsigned int niov, struct iovec *iov, unsigned int iovoffset,
 375                   unsigned int nkiov, lnet_kiov_t *kiov,
 376                   unsigned int kiovoffset, unsigned int nob)
 377{
 378        /* NB iov, kiov are READ-ONLY */
 379        unsigned int    this_nob;
 380        char       *addr = NULL;
 381
 382        if (nob == 0)
 383                return;
 384
 385        LASSERT(!in_interrupt());
 386
 387        LASSERT(niov > 0);
 388        while (iovoffset >= iov->iov_len) {
 389                iovoffset -= iov->iov_len;
 390                iov++;
 391                niov--;
 392                LASSERT(niov > 0);
 393        }
 394
 395        LASSERT(nkiov > 0);
 396        while (kiovoffset >= kiov->kiov_len) {
 397                kiovoffset -= kiov->kiov_len;
 398                kiov++;
 399                nkiov--;
 400                LASSERT(nkiov > 0);
 401        }
 402
 403        do {
 404                LASSERT(niov > 0);
 405                LASSERT(nkiov > 0);
 406                this_nob = MIN(iov->iov_len - iovoffset,
 407                               kiov->kiov_len - kiovoffset);
 408                this_nob = MIN(this_nob, nob);
 409
 410                if (addr == NULL)
 411                        addr = ((char *)kmap(kiov->kiov_page)) +
 412                                kiov->kiov_offset + kiovoffset;
 413
 414                memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
 415                nob -= this_nob;
 416
 417                if (iov->iov_len > iovoffset + this_nob) {
 418                        iovoffset += this_nob;
 419                } else {
 420                        iov++;
 421                        niov--;
 422                        iovoffset = 0;
 423                }
 424
 425                if (kiov->kiov_len > kiovoffset + this_nob) {
 426                        addr += this_nob;
 427                        kiovoffset += this_nob;
 428                } else {
 429                        kunmap(kiov->kiov_page);
 430                        addr = NULL;
 431                        kiov++;
 432                        nkiov--;
 433                        kiovoffset = 0;
 434                }
 435
 436        } while (nob > 0);
 437
 438        if (addr != NULL)
 439                kunmap(kiov->kiov_page);
 440}
 441EXPORT_SYMBOL(lnet_copy_kiov2iov);
 442
 443void
 444lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov,
 445                   unsigned int kiovoffset, unsigned int niov,
 446                   struct iovec *iov, unsigned int iovoffset,
 447                   unsigned int nob)
 448{
 449        /* NB kiov, iov are READ-ONLY */
 450        unsigned int    this_nob;
 451        char       *addr = NULL;
 452
 453        if (nob == 0)
 454                return;
 455
 456        LASSERT(!in_interrupt());
 457
 458        LASSERT(nkiov > 0);
 459        while (kiovoffset >= kiov->kiov_len) {
 460                kiovoffset -= kiov->kiov_len;
 461                kiov++;
 462                nkiov--;
 463                LASSERT(nkiov > 0);
 464        }
 465
 466        LASSERT(niov > 0);
 467        while (iovoffset >= iov->iov_len) {
 468                iovoffset -= iov->iov_len;
 469                iov++;
 470                niov--;
 471                LASSERT(niov > 0);
 472        }
 473
 474        do {
 475                LASSERT(nkiov > 0);
 476                LASSERT(niov > 0);
 477                this_nob = MIN(kiov->kiov_len - kiovoffset,
 478                               iov->iov_len - iovoffset);
 479                this_nob = MIN(this_nob, nob);
 480
 481                if (addr == NULL)
 482                        addr = ((char *)kmap(kiov->kiov_page)) +
 483                                kiov->kiov_offset + kiovoffset;
 484
 485                memcpy(addr, (char *)iov->iov_base + iovoffset, this_nob);
 486                nob -= this_nob;
 487
 488                if (kiov->kiov_len > kiovoffset + this_nob) {
 489                        addr += this_nob;
 490                        kiovoffset += this_nob;
 491                } else {
 492                        kunmap(kiov->kiov_page);
 493                        addr = NULL;
 494                        kiov++;
 495                        nkiov--;
 496                        kiovoffset = 0;
 497                }
 498
 499                if (iov->iov_len > iovoffset + this_nob) {
 500                        iovoffset += this_nob;
 501                } else {
 502                        iov++;
 503                        niov--;
 504                        iovoffset = 0;
 505                }
 506        } while (nob > 0);
 507
 508        if (addr != NULL)
 509                kunmap(kiov->kiov_page);
 510}
 511EXPORT_SYMBOL(lnet_copy_iov2kiov);
 512
 513int
 514lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
 515                   int src_niov, lnet_kiov_t *src,
 516                   unsigned int offset, unsigned int len)
 517{
 518        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
 519         * for exactly 'len' bytes, and return the number of entries.
 520         * NB not destructive to 'src' */
 521        unsigned int    frag_len;
 522        unsigned int    niov;
 523
 524        if (len == 0)                      /* no data => */
 525                return 0;                    /* no frags */
 526
 527        LASSERT(src_niov > 0);
 528        while (offset >= src->kiov_len) {      /* skip initial frags */
 529                offset -= src->kiov_len;
 530                src_niov--;
 531                src++;
 532                LASSERT(src_niov > 0);
 533        }
 534
 535        niov = 1;
 536        for (;;) {
 537                LASSERT(src_niov > 0);
 538                LASSERT((int)niov <= dst_niov);
 539
 540                frag_len = src->kiov_len - offset;
 541                dst->kiov_page = src->kiov_page;
 542                dst->kiov_offset = src->kiov_offset + offset;
 543
 544                if (len <= frag_len) {
 545                        dst->kiov_len = len;
 546                        LASSERT(dst->kiov_offset + dst->kiov_len
 547                                             <= PAGE_CACHE_SIZE);
 548                        return niov;
 549                }
 550
 551                dst->kiov_len = frag_len;
 552                LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_CACHE_SIZE);
 553
 554                len -= frag_len;
 555                dst++;
 556                src++;
 557                niov++;
 558                src_niov--;
 559                offset = 0;
 560        }
 561}
 562EXPORT_SYMBOL(lnet_extract_kiov);
 563
 564void
 565lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
 566             unsigned int offset, unsigned int mlen, unsigned int rlen)
 567{
 568        unsigned int  niov = 0;
 569        struct iovec *iov = NULL;
 570        lnet_kiov_t  *kiov = NULL;
 571        int        rc;
 572
 573        LASSERT(!in_interrupt());
 574        LASSERT(mlen == 0 || msg != NULL);
 575
 576        if (msg != NULL) {
 577                LASSERT(msg->msg_receiving);
 578                LASSERT(!msg->msg_sending);
 579                LASSERT(rlen == msg->msg_len);
 580                LASSERT(mlen <= msg->msg_len);
 581                LASSERT(msg->msg_offset == offset);
 582                LASSERT(msg->msg_wanted == mlen);
 583
 584                msg->msg_receiving = 0;
 585
 586                if (mlen != 0) {
 587                        niov = msg->msg_niov;
 588                        iov  = msg->msg_iov;
 589                        kiov = msg->msg_kiov;
 590
 591                        LASSERT(niov > 0);
 592                        LASSERT((iov == NULL) != (kiov == NULL));
 593                }
 594        }
 595
 596        rc = (ni->ni_lnd->lnd_recv)(ni, private, msg, delayed,
 597                                    niov, iov, kiov, offset, mlen, rlen);
 598        if (rc < 0)
 599                lnet_finalize(ni, msg, rc);
 600}
 601
 602void
 603lnet_setpayloadbuffer(lnet_msg_t *msg)
 604{
 605        lnet_libmd_t *md = msg->msg_md;
 606
 607        LASSERT(msg->msg_len > 0);
 608        LASSERT(!msg->msg_routing);
 609        LASSERT(md != NULL);
 610        LASSERT(msg->msg_niov == 0);
 611        LASSERT(msg->msg_iov == NULL);
 612        LASSERT(msg->msg_kiov == NULL);
 613
 614        msg->msg_niov = md->md_niov;
 615        if ((md->md_options & LNET_MD_KIOV) != 0)
 616                msg->msg_kiov = md->md_iov.kiov;
 617        else
 618                msg->msg_iov = md->md_iov.iov;
 619}
 620
 621void
 622lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
 623               unsigned int offset, unsigned int len)
 624{
 625        msg->msg_type = type;
 626        msg->msg_target = target;
 627        msg->msg_len = len;
 628        msg->msg_offset = offset;
 629
 630        if (len != 0)
 631                lnet_setpayloadbuffer(msg);
 632
 633        memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
 634        msg->msg_hdr.type          = cpu_to_le32(type);
 635        msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
 636        msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
 637        /* src_nid will be set later */
 638        msg->msg_hdr.src_pid    = cpu_to_le32(the_lnet.ln_pid);
 639        msg->msg_hdr.payload_length = cpu_to_le32(len);
 640}
 641
 642void
 643lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
 644{
 645        void   *priv = msg->msg_private;
 646        int     rc;
 647
 648        LASSERT(!in_interrupt());
 649        LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
 650                 (msg->msg_txcredit && msg->msg_peertxcredit));
 651
 652        rc = (ni->ni_lnd->lnd_send)(ni, priv, msg);
 653        if (rc < 0)
 654                lnet_finalize(ni, msg, rc);
 655}
 656
 657int
 658lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
 659{
 660        int     rc;
 661
 662        LASSERT(!msg->msg_sending);
 663        LASSERT(msg->msg_receiving);
 664        LASSERT(!msg->msg_rx_ready_delay);
 665        LASSERT(ni->ni_lnd->lnd_eager_recv != NULL);
 666
 667        msg->msg_rx_ready_delay = 1;
 668        rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
 669                                          &msg->msg_private);
 670        if (rc != 0) {
 671                CERROR("recv from %s / send to %s aborted: "
 672                       "eager_recv failed %d\n",
 673                       libcfs_nid2str(msg->msg_rxpeer->lp_nid),
 674                       libcfs_id2str(msg->msg_target), rc);
 675                LASSERT(rc < 0); /* required by my callers */
 676        }
 677
 678        return rc;
 679}
 680
 681/* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
 682void
 683lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
 684{
 685        cfs_time_t last_alive = 0;
 686
 687        LASSERT(lnet_peer_aliveness_enabled(lp));
 688        LASSERT(ni->ni_lnd->lnd_query != NULL);
 689
 690        lnet_net_unlock(lp->lp_cpt);
 691        (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
 692        lnet_net_lock(lp->lp_cpt);
 693
 694        lp->lp_last_query = cfs_time_current();
 695
 696        if (last_alive != 0) /* NI has updated timestamp */
 697                lp->lp_last_alive = last_alive;
 698}
 699
 700/* NB: always called with lnet_net_lock held */
 701static inline int
 702lnet_peer_is_alive(lnet_peer_t *lp, cfs_time_t now)
 703{
 704        int     alive;
 705        cfs_time_t deadline;
 706
 707        LASSERT(lnet_peer_aliveness_enabled(lp));
 708
 709        /* Trust lnet_notify() if it has more recent aliveness news, but
 710         * ignore the initial assumed death (see lnet_peers_start_down()).
 711         */
 712        if (!lp->lp_alive && lp->lp_alive_count > 0 &&
 713            cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
 714                return 0;
 715
 716        deadline = cfs_time_add(lp->lp_last_alive,
 717                                cfs_time_seconds(lp->lp_ni->ni_peertimeout));
 718        alive = cfs_time_after(deadline, now);
 719
 720        /* Update obsolete lp_alive except for routers assumed to be dead
 721         * initially, because router checker would update aliveness in this
 722         * case, and moreover lp_last_alive at peer creation is assumed.
 723         */
 724        if (alive && !lp->lp_alive &&
 725            !(lnet_isrouter(lp) && lp->lp_alive_count == 0))
 726                lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
 727
 728        return alive;
 729}
 730
 731
 732/* NB: returns 1 when alive, 0 when dead, negative when error;
 733 *     may drop the lnet_net_lock */
 734int
 735lnet_peer_alive_locked(lnet_peer_t *lp)
 736{
 737        cfs_time_t now = cfs_time_current();
 738
 739        if (!lnet_peer_aliveness_enabled(lp))
 740                return -ENODEV;
 741
 742        if (lnet_peer_is_alive(lp, now))
 743                return 1;
 744
 745        /* Peer appears dead, but we should avoid frequent NI queries (at
 746         * most once per lnet_queryinterval seconds). */
 747        if (lp->lp_last_query != 0) {
 748                static const int lnet_queryinterval = 1;
 749
 750                cfs_time_t next_query =
 751                           cfs_time_add(lp->lp_last_query,
 752                                        cfs_time_seconds(lnet_queryinterval));
 753
 754                if (cfs_time_before(now, next_query)) {
 755                        if (lp->lp_alive)
 756                                CWARN("Unexpected aliveness of peer %s: "
 757                                      "%d < %d (%d/%d)\n",
 758                                      libcfs_nid2str(lp->lp_nid),
 759                                      (int)now, (int)next_query,
 760                                      lnet_queryinterval,
 761                                      lp->lp_ni->ni_peertimeout);
 762                        return 0;
 763                }
 764        }
 765
 766        /* query NI for latest aliveness news */
 767        lnet_ni_query_locked(lp->lp_ni, lp);
 768
 769        if (lnet_peer_is_alive(lp, now))
 770                return 1;
 771
 772        lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
 773        return 0;
 774}
 775
 776int
 777lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 778{
 779        /* lnet_send is going to lnet_net_unlock immediately after this,
 780         * so it sets do_send FALSE and I don't do the unlock/send/lock bit.
 781         * I return EAGAIN if msg blocked, EHOSTUNREACH if msg_txpeer
 782         * appears dead, and 0 if sent or OK to send */
 783        struct lnet_peer        *lp = msg->msg_txpeer;
 784        struct lnet_ni          *ni = lp->lp_ni;
 785        struct lnet_tx_queue    *tq;
 786        int                     cpt;
 787
 788        /* non-lnet_send() callers have checked before */
 789        LASSERT(!do_send || msg->msg_tx_delayed);
 790        LASSERT(!msg->msg_receiving);
 791        LASSERT(msg->msg_tx_committed);
 792
 793        cpt = msg->msg_tx_cpt;
 794        tq = ni->ni_tx_queues[cpt];
 795
 796        /* NB 'lp' is always the next hop */
 797        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
 798            lnet_peer_alive_locked(lp) == 0) {
 799                the_lnet.ln_counters[cpt]->drop_count++;
 800                the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
 801                lnet_net_unlock(cpt);
 802
 803                CNETERR("Dropping message for %s: peer not alive\n",
 804                        libcfs_id2str(msg->msg_target));
 805                if (do_send)
 806                        lnet_finalize(ni, msg, -EHOSTUNREACH);
 807
 808                lnet_net_lock(cpt);
 809                return EHOSTUNREACH;
 810        }
 811
 812        if (!msg->msg_peertxcredit) {
 813                LASSERT((lp->lp_txcredits < 0) ==
 814                         !list_empty(&lp->lp_txq));
 815
 816                msg->msg_peertxcredit = 1;
 817                lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
 818                lp->lp_txcredits--;
 819
 820                if (lp->lp_txcredits < lp->lp_mintxcredits)
 821                        lp->lp_mintxcredits = lp->lp_txcredits;
 822
 823                if (lp->lp_txcredits < 0) {
 824                        msg->msg_tx_delayed = 1;
 825                        list_add_tail(&msg->msg_list, &lp->lp_txq);
 826                        return EAGAIN;
 827                }
 828        }
 829
 830        if (!msg->msg_txcredit) {
 831                LASSERT((tq->tq_credits < 0) ==
 832                        !list_empty(&tq->tq_delayed));
 833
 834                msg->msg_txcredit = 1;
 835                tq->tq_credits--;
 836
 837                if (tq->tq_credits < tq->tq_credits_min)
 838                        tq->tq_credits_min = tq->tq_credits;
 839
 840                if (tq->tq_credits < 0) {
 841                        msg->msg_tx_delayed = 1;
 842                        list_add_tail(&msg->msg_list, &tq->tq_delayed);
 843                        return EAGAIN;
 844                }
 845        }
 846
 847        if (do_send) {
 848                lnet_net_unlock(cpt);
 849                lnet_ni_send(ni, msg);
 850                lnet_net_lock(cpt);
 851        }
 852        return 0;
 853}
 854
 855
 856lnet_rtrbufpool_t *
 857lnet_msg2bufpool(lnet_msg_t *msg)
 858{
 859        lnet_rtrbufpool_t       *rbp;
 860        int                     cpt;
 861
 862        LASSERT(msg->msg_rx_committed);
 863
 864        cpt = msg->msg_rx_cpt;
 865        rbp = &the_lnet.ln_rtrpools[cpt][0];
 866
 867        LASSERT(msg->msg_len <= LNET_MTU);
 868        while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_CACHE_SIZE) {
 869                rbp++;
 870                LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
 871        }
 872
 873        return rbp;
 874}
 875
 876int
 877lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
 878{
 879        /* lnet_parse is going to lnet_net_unlock immediately after this, so it
 880         * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
 881         * return EAGAIN if msg blocked and 0 if received or OK to receive */
 882        lnet_peer_t      *lp = msg->msg_rxpeer;
 883        lnet_rtrbufpool_t   *rbp;
 884        lnet_rtrbuf_t       *rb;
 885
 886        LASSERT(msg->msg_iov == NULL);
 887        LASSERT(msg->msg_kiov == NULL);
 888        LASSERT(msg->msg_niov == 0);
 889        LASSERT(msg->msg_routing);
 890        LASSERT(msg->msg_receiving);
 891        LASSERT(!msg->msg_sending);
 892
 893        /* non-lnet_parse callers only receive delayed messages */
 894        LASSERT(!do_recv || msg->msg_rx_delayed);
 895
 896        if (!msg->msg_peerrtrcredit) {
 897                LASSERT((lp->lp_rtrcredits < 0) ==
 898                         !list_empty(&lp->lp_rtrq));
 899
 900                msg->msg_peerrtrcredit = 1;
 901                lp->lp_rtrcredits--;
 902                if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
 903                        lp->lp_minrtrcredits = lp->lp_rtrcredits;
 904
 905                if (lp->lp_rtrcredits < 0) {
 906                        /* must have checked eager_recv before here */
 907                        LASSERT(msg->msg_rx_ready_delay);
 908                        msg->msg_rx_delayed = 1;
 909                        list_add_tail(&msg->msg_list, &lp->lp_rtrq);
 910                        return EAGAIN;
 911                }
 912        }
 913
 914        rbp = lnet_msg2bufpool(msg);
 915
 916        if (!msg->msg_rtrcredit) {
 917                LASSERT((rbp->rbp_credits < 0) ==
 918                         !list_empty(&rbp->rbp_msgs));
 919
 920                msg->msg_rtrcredit = 1;
 921                rbp->rbp_credits--;
 922                if (rbp->rbp_credits < rbp->rbp_mincredits)
 923                        rbp->rbp_mincredits = rbp->rbp_credits;
 924
 925                if (rbp->rbp_credits < 0) {
 926                        /* must have checked eager_recv before here */
 927                        LASSERT(msg->msg_rx_ready_delay);
 928                        msg->msg_rx_delayed = 1;
 929                        list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
 930                        return EAGAIN;
 931                }
 932        }
 933
 934        LASSERT(!list_empty(&rbp->rbp_bufs));
 935        rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
 936        list_del(&rb->rb_list);
 937
 938        msg->msg_niov = rbp->rbp_npages;
 939        msg->msg_kiov = &rb->rb_kiov[0];
 940
 941        if (do_recv) {
 942                int cpt = msg->msg_rx_cpt;
 943
 944                lnet_net_unlock(cpt);
 945                lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
 946                             0, msg->msg_len, msg->msg_len);
 947                lnet_net_lock(cpt);
 948        }
 949        return 0;
 950}
 951
 952void
 953lnet_return_tx_credits_locked(lnet_msg_t *msg)
 954{
 955        lnet_peer_t     *txpeer = msg->msg_txpeer;
 956        lnet_msg_t      *msg2;
 957
 958        if (msg->msg_txcredit) {
 959                struct lnet_ni       *ni = txpeer->lp_ni;
 960                struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
 961
 962                /* give back NI txcredits */
 963                msg->msg_txcredit = 0;
 964
 965                LASSERT((tq->tq_credits < 0) ==
 966                        !list_empty(&tq->tq_delayed));
 967
 968                tq->tq_credits++;
 969                if (tq->tq_credits <= 0) {
 970                        msg2 = list_entry(tq->tq_delayed.next,
 971                                              lnet_msg_t, msg_list);
 972                        list_del(&msg2->msg_list);
 973
 974                        LASSERT(msg2->msg_txpeer->lp_ni == ni);
 975                        LASSERT(msg2->msg_tx_delayed);
 976
 977                        (void) lnet_post_send_locked(msg2, 1);
 978                }
 979        }
 980
 981        if (msg->msg_peertxcredit) {
 982                /* give back peer txcredits */
 983                msg->msg_peertxcredit = 0;
 984
 985                LASSERT((txpeer->lp_txcredits < 0) ==
 986                        !list_empty(&txpeer->lp_txq));
 987
 988                txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
 989                LASSERT(txpeer->lp_txqnob >= 0);
 990
 991                txpeer->lp_txcredits++;
 992                if (txpeer->lp_txcredits <= 0) {
 993                        msg2 = list_entry(txpeer->lp_txq.next,
 994                                              lnet_msg_t, msg_list);
 995                        list_del(&msg2->msg_list);
 996
 997                        LASSERT(msg2->msg_txpeer == txpeer);
 998                        LASSERT(msg2->msg_tx_delayed);
 999
1000                        (void) lnet_post_send_locked(msg2, 1);
1001                }
1002        }
1003
1004        if (txpeer != NULL) {
1005                msg->msg_txpeer = NULL;
1006                lnet_peer_decref_locked(txpeer);
1007        }
1008}
1009
1010void
1011lnet_return_rx_credits_locked(lnet_msg_t *msg)
1012{
1013        lnet_peer_t     *rxpeer = msg->msg_rxpeer;
1014        lnet_msg_t      *msg2;
1015
1016        if (msg->msg_rtrcredit) {
1017                /* give back global router credits */
1018                lnet_rtrbuf_t     *rb;
1019                lnet_rtrbufpool_t *rbp;
1020
1021                /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
1022                 * there until it gets one allocated, or aborts the wait
1023                 * itself */
1024                LASSERT(msg->msg_kiov != NULL);
1025
1026                rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
1027                rbp = rb->rb_pool;
1028                LASSERT(rbp == lnet_msg2bufpool(msg));
1029
1030                msg->msg_kiov = NULL;
1031                msg->msg_rtrcredit = 0;
1032
1033                LASSERT((rbp->rbp_credits < 0) ==
1034                        !list_empty(&rbp->rbp_msgs));
1035                LASSERT((rbp->rbp_credits > 0) ==
1036                        !list_empty(&rbp->rbp_bufs));
1037
1038                list_add(&rb->rb_list, &rbp->rbp_bufs);
1039                rbp->rbp_credits++;
1040                if (rbp->rbp_credits <= 0) {
1041                        msg2 = list_entry(rbp->rbp_msgs.next,
1042                                              lnet_msg_t, msg_list);
1043                        list_del(&msg2->msg_list);
1044
1045                        (void) lnet_post_routed_recv_locked(msg2, 1);
1046                }
1047        }
1048
1049        if (msg->msg_peerrtrcredit) {
1050                /* give back peer router credits */
1051                msg->msg_peerrtrcredit = 0;
1052
1053                LASSERT((rxpeer->lp_rtrcredits < 0) ==
1054                        !list_empty(&rxpeer->lp_rtrq));
1055
1056                rxpeer->lp_rtrcredits++;
1057                if (rxpeer->lp_rtrcredits <= 0) {
1058                        msg2 = list_entry(rxpeer->lp_rtrq.next,
1059                                              lnet_msg_t, msg_list);
1060                        list_del(&msg2->msg_list);
1061
1062                        (void) lnet_post_routed_recv_locked(msg2, 1);
1063                }
1064        }
1065        if (rxpeer != NULL) {
1066                msg->msg_rxpeer = NULL;
1067                lnet_peer_decref_locked(rxpeer);
1068        }
1069}
1070
1071static int
1072lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
1073{
1074        lnet_peer_t *p1 = r1->lr_gateway;
1075        lnet_peer_t *p2 = r2->lr_gateway;
1076
1077        if (r1->lr_priority < r2->lr_priority)
1078                return 1;
1079
1080        if (r1->lr_priority > r2->lr_priority)
1081                return -1;
1082
1083        if (r1->lr_hops < r2->lr_hops)
1084                return 1;
1085
1086        if (r1->lr_hops > r2->lr_hops)
1087                return -1;
1088
1089        if (p1->lp_txqnob < p2->lp_txqnob)
1090                return 1;
1091
1092        if (p1->lp_txqnob > p2->lp_txqnob)
1093                return -1;
1094
1095        if (p1->lp_txcredits > p2->lp_txcredits)
1096                return 1;
1097
1098        if (p1->lp_txcredits < p2->lp_txcredits)
1099                return -1;
1100
1101        if (r1->lr_seq - r2->lr_seq <= 0)
1102                return 1;
1103
1104        return -1;
1105}
1106
1107static lnet_peer_t *
1108lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
1109{
1110        lnet_remotenet_t        *rnet;
1111        lnet_route_t            *rtr;
1112        lnet_route_t            *rtr_best;
1113        lnet_route_t            *rtr_last;
1114        struct lnet_peer        *lp_best;
1115        struct lnet_peer        *lp;
1116        int                     rc;
1117
1118        /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
1119         * rtr_nid nid, otherwise find the best gateway I can use */
1120
1121        rnet = lnet_find_net_locked(LNET_NIDNET(target));
1122        if (rnet == NULL)
1123                return NULL;
1124
1125        lp_best = NULL;
1126        rtr_best = rtr_last = NULL;
1127        list_for_each_entry(rtr, &rnet->lrn_routes, lr_list) {
1128                lp = rtr->lr_gateway;
1129
1130                if (!lp->lp_alive || /* gateway is down */
1131                    ((lp->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) != 0 &&
1132                     rtr->lr_downis != 0)) /* NI to target is down */
1133                        continue;
1134
1135                if (ni != NULL && lp->lp_ni != ni)
1136                        continue;
1137
1138                if (lp->lp_nid == rtr_nid) /* it's pre-determined router */
1139                        return lp;
1140
1141                if (lp_best == NULL) {
1142                        rtr_best = rtr_last = rtr;
1143                        lp_best = lp;
1144                        continue;
1145                }
1146
1147                /* no protection on below fields, but it's harmless */
1148                if (rtr_last->lr_seq - rtr->lr_seq < 0)
1149                        rtr_last = rtr;
1150
1151                rc = lnet_compare_routes(rtr, rtr_best);
1152                if (rc < 0)
1153                        continue;
1154
1155                rtr_best = rtr;
1156                lp_best = lp;
1157        }
1158
1159        /* set sequence number on the best router to the latest sequence + 1
1160         * so we can round-robin all routers, it's race and inaccurate but
1161         * harmless and functional  */
1162        if (rtr_best != NULL)
1163                rtr_best->lr_seq = rtr_last->lr_seq + 1;
1164        return lp_best;
1165}
1166
1167int
1168lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
1169{
1170        lnet_nid_t              dst_nid = msg->msg_target.nid;
1171        struct lnet_ni          *src_ni;
1172        struct lnet_ni          *local_ni;
1173        struct lnet_peer        *lp;
1174        int                     cpt;
1175        int                     cpt2;
1176        int                     rc;
1177
1178        /* NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
1179         * but we might want to use pre-determined router for ACK/REPLY
1180         * in the future */
1181        /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
1182        LASSERT(msg->msg_txpeer == NULL);
1183        LASSERT(!msg->msg_sending);
1184        LASSERT(!msg->msg_target_is_router);
1185        LASSERT(!msg->msg_receiving);
1186
1187        msg->msg_sending = 1;
1188
1189        LASSERT(!msg->msg_tx_committed);
1190        cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
1191 again:
1192        lnet_net_lock(cpt);
1193
1194        if (the_lnet.ln_shutdown) {
1195                lnet_net_unlock(cpt);
1196                return -ESHUTDOWN;
1197        }
1198
1199        if (src_nid == LNET_NID_ANY) {
1200                src_ni = NULL;
1201        } else {
1202                src_ni = lnet_nid2ni_locked(src_nid, cpt);
1203                if (src_ni == NULL) {
1204                        lnet_net_unlock(cpt);
1205                        LCONSOLE_WARN("Can't send to %s: src %s is not a "
1206                                      "local nid\n", libcfs_nid2str(dst_nid),
1207                                      libcfs_nid2str(src_nid));
1208                        return -EINVAL;
1209                }
1210                LASSERT(!msg->msg_routing);
1211        }
1212
1213        /* Is this for someone on a local network? */
1214        local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
1215
1216        if (local_ni != NULL) {
1217                if (src_ni == NULL) {
1218                        src_ni = local_ni;
1219                        src_nid = src_ni->ni_nid;
1220                } else if (src_ni == local_ni) {
1221                        lnet_ni_decref_locked(local_ni, cpt);
1222                } else {
1223                        lnet_ni_decref_locked(local_ni, cpt);
1224                        lnet_ni_decref_locked(src_ni, cpt);
1225                        lnet_net_unlock(cpt);
1226                        LCONSOLE_WARN("No route to %s via from %s\n",
1227                                      libcfs_nid2str(dst_nid),
1228                                      libcfs_nid2str(src_nid));
1229                        return -EINVAL;
1230                }
1231
1232                LASSERT(src_nid != LNET_NID_ANY);
1233                lnet_msg_commit(msg, cpt);
1234
1235                if (!msg->msg_routing)
1236                        msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1237
1238                if (src_ni == the_lnet.ln_loni) {
1239                        /* No send credit hassles with LOLND */
1240                        lnet_net_unlock(cpt);
1241                        lnet_ni_send(src_ni, msg);
1242
1243                        lnet_net_lock(cpt);
1244                        lnet_ni_decref_locked(src_ni, cpt);
1245                        lnet_net_unlock(cpt);
1246                        return 0;
1247                }
1248
1249                rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
1250                /* lp has ref on src_ni; lose mine */
1251                lnet_ni_decref_locked(src_ni, cpt);
1252                if (rc != 0) {
1253                        lnet_net_unlock(cpt);
1254                        LCONSOLE_WARN("Error %d finding peer %s\n", rc,
1255                                      libcfs_nid2str(dst_nid));
1256                        /* ENOMEM or shutting down */
1257                        return rc;
1258                }
1259                LASSERT(lp->lp_ni == src_ni);
1260        } else {
1261                /* sending to a remote network */
1262                lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
1263                if (lp == NULL) {
1264                        if (src_ni != NULL)
1265                                lnet_ni_decref_locked(src_ni, cpt);
1266                        lnet_net_unlock(cpt);
1267
1268                        LCONSOLE_WARN("No route to %s via %s "
1269                                      "(all routers down)\n",
1270                                      libcfs_id2str(msg->msg_target),
1271                                      libcfs_nid2str(src_nid));
1272                        return -EHOSTUNREACH;
1273                }
1274
1275                /* rtr_nid is LNET_NID_ANY or NID of pre-determined router,
1276                 * it's possible that rtr_nid isn't LNET_NID_ANY and lp isn't
1277                 * pre-determined router, this can happen if router table
1278                 * was changed when we release the lock */
1279                if (rtr_nid != lp->lp_nid) {
1280                        cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
1281                        if (cpt2 != cpt) {
1282                                if (src_ni != NULL)
1283                                        lnet_ni_decref_locked(src_ni, cpt);
1284                                lnet_net_unlock(cpt);
1285
1286                                rtr_nid = lp->lp_nid;
1287                                cpt = cpt2;
1288                                goto again;
1289                        }
1290                }
1291
1292                CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1293                       libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
1294                       lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1295
1296                if (src_ni == NULL) {
1297                        src_ni = lp->lp_ni;
1298                        src_nid = src_ni->ni_nid;
1299                } else {
1300                        LASSERT(src_ni == lp->lp_ni);
1301                        lnet_ni_decref_locked(src_ni, cpt);
1302                }
1303
1304                lnet_peer_addref_locked(lp);
1305
1306                LASSERT(src_nid != LNET_NID_ANY);
1307                lnet_msg_commit(msg, cpt);
1308
1309                if (!msg->msg_routing) {
1310                        /* I'm the source and now I know which NI to send on */
1311                        msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1312                }
1313
1314                msg->msg_target_is_router = 1;
1315                msg->msg_target.nid = lp->lp_nid;
1316                msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
1317        }
1318
1319        /* 'lp' is our best choice of peer */
1320
1321        LASSERT(!msg->msg_peertxcredit);
1322        LASSERT(!msg->msg_txcredit);
1323        LASSERT(msg->msg_txpeer == NULL);
1324
1325        msg->msg_txpeer = lp;              /* msg takes my ref on lp */
1326
1327        rc = lnet_post_send_locked(msg, 0);
1328        lnet_net_unlock(cpt);
1329
1330        if (rc == EHOSTUNREACH)
1331                return -EHOSTUNREACH;
1332
1333        if (rc == 0)
1334                lnet_ni_send(src_ni, msg);
1335
1336        return 0;
1337}
1338
1339static void
1340lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
1341{
1342        lnet_net_lock(cpt);
1343        the_lnet.ln_counters[cpt]->drop_count++;
1344        the_lnet.ln_counters[cpt]->drop_length += nob;
1345        lnet_net_unlock(cpt);
1346
1347        lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1348}
1349
1350static void
1351lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
1352{
1353        lnet_hdr_t      *hdr = &msg->msg_hdr;
1354
1355        if (msg->msg_wanted != 0)
1356                lnet_setpayloadbuffer(msg);
1357
1358        lnet_build_msg_event(msg, LNET_EVENT_PUT);
1359
1360        /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
1361         * it back into the ACK during lnet_finalize() */
1362        msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1363                        (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
1364
1365        lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
1366                     msg->msg_offset, msg->msg_wanted, hdr->payload_length);
1367}
1368
1369static int
1370lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1371{
1372        lnet_hdr_t              *hdr = &msg->msg_hdr;
1373        struct lnet_match_info  info;
1374        int                     rc;
1375
1376        /* Convert put fields to host byte order */
1377        hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1378        hdr->msg.put.ptl_index  = le32_to_cpu(hdr->msg.put.ptl_index);
1379        hdr->msg.put.offset     = le32_to_cpu(hdr->msg.put.offset);
1380
1381        info.mi_id.nid  = hdr->src_nid;
1382        info.mi_id.pid  = hdr->src_pid;
1383        info.mi_opc     = LNET_MD_OP_PUT;
1384        info.mi_portal  = hdr->msg.put.ptl_index;
1385        info.mi_rlength = hdr->payload_length;
1386        info.mi_roffset = hdr->msg.put.offset;
1387        info.mi_mbits   = hdr->msg.put.match_bits;
1388
1389        msg->msg_rx_ready_delay = ni->ni_lnd->lnd_eager_recv == NULL;
1390
1391 again:
1392        rc = lnet_ptl_match_md(&info, msg);
1393        switch (rc) {
1394        default:
1395                LBUG();
1396
1397        case LNET_MATCHMD_OK:
1398                lnet_recv_put(ni, msg);
1399                return 0;
1400
1401        case LNET_MATCHMD_NONE:
1402                if (msg->msg_rx_delayed) /* attached on delayed list */
1403                        return 0;
1404
1405                rc = lnet_ni_eager_recv(ni, msg);
1406                if (rc == 0)
1407                        goto again;
1408                /* fall through */
1409
1410        case LNET_MATCHMD_DROP:
1411                CNETERR("Dropping PUT from %s portal %d match "LPU64
1412                        " offset %d length %d: %d\n",
1413                        libcfs_id2str(info.mi_id), info.mi_portal,
1414                        info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
1415
1416                return ENOENT;  /* +ve: OK but no match */
1417        }
1418}
1419
1420static int
1421lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1422{
1423        struct lnet_match_info  info;
1424        lnet_hdr_t              *hdr = &msg->msg_hdr;
1425        lnet_handle_wire_t      reply_wmd;
1426        int                     rc;
1427
1428        /* Convert get fields to host byte order */
1429        hdr->msg.get.match_bits   = le64_to_cpu(hdr->msg.get.match_bits);
1430        hdr->msg.get.ptl_index    = le32_to_cpu(hdr->msg.get.ptl_index);
1431        hdr->msg.get.sink_length  = le32_to_cpu(hdr->msg.get.sink_length);
1432        hdr->msg.get.src_offset   = le32_to_cpu(hdr->msg.get.src_offset);
1433
1434        info.mi_id.nid  = hdr->src_nid;
1435        info.mi_id.pid  = hdr->src_pid;
1436        info.mi_opc     = LNET_MD_OP_GET;
1437        info.mi_portal  = hdr->msg.get.ptl_index;
1438        info.mi_rlength = hdr->msg.get.sink_length;
1439        info.mi_roffset = hdr->msg.get.src_offset;
1440        info.mi_mbits   = hdr->msg.get.match_bits;
1441
1442        rc = lnet_ptl_match_md(&info, msg);
1443        if (rc == LNET_MATCHMD_DROP) {
1444                CNETERR("Dropping GET from %s portal %d match "LPU64
1445                        " offset %d length %d\n",
1446                        libcfs_id2str(info.mi_id), info.mi_portal,
1447                        info.mi_mbits, info.mi_roffset, info.mi_rlength);
1448                return ENOENT;  /* +ve: OK but no match */
1449        }
1450
1451        LASSERT(rc == LNET_MATCHMD_OK);
1452
1453        lnet_build_msg_event(msg, LNET_EVENT_GET);
1454
1455        reply_wmd = hdr->msg.get.return_wmd;
1456
1457        lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
1458                       msg->msg_offset, msg->msg_wanted);
1459
1460        msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1461
1462        if (rdma_get) {
1463                /* The LND completes the REPLY from her recv procedure */
1464                lnet_ni_recv(ni, msg->msg_private, msg, 0,
1465                             msg->msg_offset, msg->msg_len, msg->msg_len);
1466                return 0;
1467        }
1468
1469        lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1470        msg->msg_receiving = 0;
1471
1472        rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
1473        if (rc < 0) {
1474                /* didn't get as far as lnet_ni_send() */
1475                CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1476                       libcfs_nid2str(ni->ni_nid),
1477                       libcfs_id2str(info.mi_id), rc);
1478
1479                lnet_finalize(ni, msg, rc);
1480        }
1481
1482        return 0;
1483}
1484
1485static int
1486lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1487{
1488        void         *private = msg->msg_private;
1489        lnet_hdr_t       *hdr = &msg->msg_hdr;
1490        lnet_process_id_t src = {0};
1491        lnet_libmd_t     *md;
1492        int            rlength;
1493        int            mlength;
1494        int                     cpt;
1495
1496        cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
1497        lnet_res_lock(cpt);
1498
1499        src.nid = hdr->src_nid;
1500        src.pid = hdr->src_pid;
1501
1502        /* NB handles only looked up by creator (no flips) */
1503        md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1504        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1505                CNETERR("%s: Dropping REPLY from %s for %s "
1506                        "MD "LPX64"."LPX64"\n",
1507                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1508                        (md == NULL) ? "invalid" : "inactive",
1509                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
1510                        hdr->msg.reply.dst_wmd.wh_object_cookie);
1511                if (md != NULL && md->md_me != NULL)
1512                        CERROR("REPLY MD also attached to portal %d\n",
1513                               md->md_me->me_portal);
1514
1515                lnet_res_unlock(cpt);
1516                return ENOENT;            /* +ve: OK but no match */
1517        }
1518
1519        LASSERT(md->md_offset == 0);
1520
1521        rlength = hdr->payload_length;
1522        mlength = MIN(rlength, (int)md->md_length);
1523
1524        if (mlength < rlength &&
1525            (md->md_options & LNET_MD_TRUNCATE) == 0) {
1526                CNETERR("%s: Dropping REPLY from %s length %d "
1527                        "for MD "LPX64" would overflow (%d)\n",
1528                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1529                        rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1530                        mlength);
1531                lnet_res_unlock(cpt);
1532                return ENOENT;    /* +ve: OK but no match */
1533        }
1534
1535        CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md "LPX64"\n",
1536               libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1537               mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1538
1539        lnet_msg_attach_md(msg, md, 0, mlength);
1540
1541        if (mlength != 0)
1542                lnet_setpayloadbuffer(msg);
1543
1544        lnet_res_unlock(cpt);
1545
1546        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
1547
1548        lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1549        return 0;
1550}
1551
1552static int
1553lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1554{
1555        lnet_hdr_t       *hdr = &msg->msg_hdr;
1556        lnet_process_id_t src = {0};
1557        lnet_libmd_t     *md;
1558        int                     cpt;
1559
1560        src.nid = hdr->src_nid;
1561        src.pid = hdr->src_pid;
1562
1563        /* Convert ack fields to host byte order */
1564        hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1565        hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1566
1567        cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
1568        lnet_res_lock(cpt);
1569
1570        /* NB handles only looked up by creator (no flips) */
1571        md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1572        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1573                /* Don't moan; this is expected */
1574                CDEBUG(D_NET,
1575                       "%s: Dropping ACK from %s to %s MD "LPX64"."LPX64"\n",
1576                       libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1577                       (md == NULL) ? "invalid" : "inactive",
1578                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
1579                       hdr->msg.ack.dst_wmd.wh_object_cookie);
1580                if (md != NULL && md->md_me != NULL)
1581                        CERROR("Source MD also attached to portal %d\n",
1582                               md->md_me->me_portal);
1583
1584                lnet_res_unlock(cpt);
1585                return ENOENT;            /* +ve! */
1586        }
1587
1588        CDEBUG(D_NET, "%s: ACK from %s into md "LPX64"\n",
1589               libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1590               hdr->msg.ack.dst_wmd.wh_object_cookie);
1591
1592        lnet_msg_attach_md(msg, md, 0, 0);
1593
1594        lnet_res_unlock(cpt);
1595
1596        lnet_build_msg_event(msg, LNET_EVENT_ACK);
1597
1598        lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1599        return 0;
1600}
1601
1602static int
1603lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
1604{
1605        int     rc = 0;
1606
1607        if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
1608            lnet_msg2bufpool(msg)->rbp_credits <= 0) {
1609                if (ni->ni_lnd->lnd_eager_recv == NULL) {
1610                        msg->msg_rx_ready_delay = 1;
1611                } else {
1612                        lnet_net_unlock(msg->msg_rx_cpt);
1613                        rc = lnet_ni_eager_recv(ni, msg);
1614                        lnet_net_lock(msg->msg_rx_cpt);
1615                }
1616        }
1617
1618        if (rc == 0)
1619                rc = lnet_post_routed_recv_locked(msg, 0);
1620        return rc;
1621}
1622
1623char *
1624lnet_msgtyp2str(int type)
1625{
1626        switch (type) {
1627        case LNET_MSG_ACK:
1628                return "ACK";
1629        case LNET_MSG_PUT:
1630                return "PUT";
1631        case LNET_MSG_GET:
1632                return "GET";
1633        case LNET_MSG_REPLY:
1634                return "REPLY";
1635        case LNET_MSG_HELLO:
1636                return "HELLO";
1637        default:
1638                return "<UNKNOWN>";
1639        }
1640}
1641EXPORT_SYMBOL(lnet_msgtyp2str);
1642
1643void
1644lnet_print_hdr(lnet_hdr_t *hdr)
1645{
1646        lnet_process_id_t src = {0};
1647        lnet_process_id_t dst = {0};
1648        char *type_str = lnet_msgtyp2str(hdr->type);
1649
1650        src.nid = hdr->src_nid;
1651        src.pid = hdr->src_pid;
1652
1653        dst.nid = hdr->dest_nid;
1654        dst.pid = hdr->dest_pid;
1655
1656        CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1657        CWARN("    From %s\n", libcfs_id2str(src));
1658        CWARN("    To   %s\n", libcfs_id2str(dst));
1659
1660        switch (hdr->type) {
1661        default:
1662                break;
1663
1664        case LNET_MSG_PUT:
1665                CWARN("    Ptl index %d, ack md "LPX64"."LPX64", "
1666                      "match bits "LPU64"\n",
1667                      hdr->msg.put.ptl_index,
1668                      hdr->msg.put.ack_wmd.wh_interface_cookie,
1669                      hdr->msg.put.ack_wmd.wh_object_cookie,
1670                      hdr->msg.put.match_bits);
1671                CWARN("    Length %d, offset %d, hdr data "LPX64"\n",
1672                      hdr->payload_length, hdr->msg.put.offset,
1673                      hdr->msg.put.hdr_data);
1674                break;
1675
1676        case LNET_MSG_GET:
1677                CWARN("    Ptl index %d, return md "LPX64"."LPX64", "
1678                      "match bits "LPU64"\n", hdr->msg.get.ptl_index,
1679                      hdr->msg.get.return_wmd.wh_interface_cookie,
1680                      hdr->msg.get.return_wmd.wh_object_cookie,
1681                      hdr->msg.get.match_bits);
1682                CWARN("    Length %d, src offset %d\n",
1683                      hdr->msg.get.sink_length,
1684                      hdr->msg.get.src_offset);
1685                break;
1686
1687        case LNET_MSG_ACK:
1688                CWARN("    dst md "LPX64"."LPX64", "
1689                      "manipulated length %d\n",
1690                      hdr->msg.ack.dst_wmd.wh_interface_cookie,
1691                      hdr->msg.ack.dst_wmd.wh_object_cookie,
1692                      hdr->msg.ack.mlength);
1693                break;
1694
1695        case LNET_MSG_REPLY:
1696                CWARN("    dst md "LPX64"."LPX64", "
1697                      "length %d\n",
1698                      hdr->msg.reply.dst_wmd.wh_interface_cookie,
1699                      hdr->msg.reply.dst_wmd.wh_object_cookie,
1700                      hdr->payload_length);
1701        }
1702
1703}
1704
/**
 * Validate a message header received on \a ni and dispatch the message.
 *
 * The header's type, source/destination NID, destination PID and payload
 * length are converted from little-endian.  The payload length is checked
 * against the limit for the message type and the destination NID is checked
 * against \a ni.  A message whose destination is ni->ni_nid is handed to
 * lnet_parse_ack(), lnet_parse_put(), lnet_parse_get() or lnet_parse_reply();
 * any other destination is handed to lnet_parse_forward_locked().
 *
 * \param ni       Interface the message arrived on.
 * \param hdr      Message header; copied into the allocated lnet_msg.
 * \param from_nid NID used for the sender peer lookup and to pick the CPT.
 * \param private  Opaque value stored in msg_private and passed on to
 *                 lnet_ni_recv() / lnet_drop_message().
 * \param rdma_req Passed to lnet_parse_get(); a non-zero value on a GET that
 *                 is not addressed to \a ni is rejected.
 *
 * \retval 0       The message was accepted, or it was dropped through
 *                 lnet_drop_message().
 * \retval -EPROTO Bad message type, payload length or destination NID;
 *                 returned before any lnet_msg is allocated.
 */
1705int
1706lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
1707           void *private, int rdma_req)
1708{
1709        int             rc = 0;
1710        int             cpt;
1711        int             for_me;
1712        struct lnet_msg *msg;
1713        lnet_pid_t     dest_pid;
1714        lnet_nid_t     dest_nid;
1715        lnet_nid_t     src_nid;
1716        __u32     payload_length;
1717        __u32     type;
1718
1719        LASSERT(!in_interrupt());
1720
        /* pull the fields needed below out of the little-endian header */
1721        type = le32_to_cpu(hdr->type);
1722        src_nid = le64_to_cpu(hdr->src_nid);
1723        dest_nid = le64_to_cpu(hdr->dest_nid);
1724        dest_pid = le32_to_cpu(hdr->dest_pid);
1725        payload_length = le32_to_cpu(hdr->payload_length);
1726
        /* for_me: the destination is the NI the message arrived on */
1727        for_me = (ni->ni_nid == dest_nid);
1728        cpt = lnet_cpt_of_nid(from_nid);
1729
        /* check the payload length against what the message type allows */
1730        switch (type) {
1731        case LNET_MSG_ACK:
1732        case LNET_MSG_GET:
1733                if (payload_length > 0) {
1734                        CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1735                               libcfs_nid2str(from_nid),
1736                               libcfs_nid2str(src_nid),
1737                               lnet_msgtyp2str(type), payload_length);
1738                        return -EPROTO;
1739                }
1740                break;
1741
1742        case LNET_MSG_PUT:
1743        case LNET_MSG_REPLY:
                /* limit is LNET_MAX_PAYLOAD when for_me, LNET_MTU otherwise */
1744                if (payload_length >
1745                   (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1746                        CERROR("%s, src %s: bad %s payload %d "
1747                               "(%d max expected)\n",
1748                               libcfs_nid2str(from_nid),
1749                               libcfs_nid2str(src_nid),
1750                               lnet_msgtyp2str(type),
1751                               payload_length,
1752                               for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1753                        return -EPROTO;
1754                }
1755                break;
1756
1757        default:
1758                CERROR("%s, src %s: Bad message type 0x%x\n",
1759                       libcfs_nid2str(from_nid),
1760                       libcfs_nid2str(src_nid), type);
1761                return -EPROTO;
1762        }
1763
        /* with routing enabled, refresh ni_last_alive when its value differs
         * from the current second */
1764        if (the_lnet.ln_routing &&
1765            ni->ni_last_alive != cfs_time_current_sec()) {
1766                lnet_ni_lock(ni);
1767
1768                /* NB: so far here is the only place to set NI status to "up" */
1769                ni->ni_last_alive = cfs_time_current_sec();
1770                if (ni->ni_status != NULL &&
1771                    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
1772                        ni->ni_status->ns_status = LNET_NI_STATUS_UP;
1773                lnet_ni_unlock(ni);
1774        }
1775
1776        /* Regard a bad destination NID as a protocol error.  Senders should
1777         * know what they're doing; if they don't they're misconfigured, buggy
1778         * or malicious so we chop them off at the knees :) */
1779
1780        if (!for_me) {
1781                if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
1782                        /* should have gone direct */
1783                        CERROR("%s, src %s: Bad dest nid %s "
1784                                "(should have been sent direct)\n",
1785                                libcfs_nid2str(from_nid),
1786                                libcfs_nid2str(src_nid),
1787                                libcfs_nid2str(dest_nid));
1788                        return -EPROTO;
1789                }
1790
1791                if (lnet_islocalnid(dest_nid)) {
1792                        /* dest is another local NI; sender should have used
1793                         * this node's NID on its own network */
1794                        CERROR("%s, src %s: Bad dest nid %s "
1795                                "(it's my nid but on a different network)\n",
1796                                libcfs_nid2str(from_nid),
1797                                libcfs_nid2str(src_nid),
1798                                libcfs_nid2str(dest_nid));
1799                        return -EPROTO;
1800                }
1801
1802                if (rdma_req && type == LNET_MSG_GET) {
1803                        CERROR("%s, src %s: Bad optimized GET for %s "
1804                                "(final destination must be me)\n",
1805                                libcfs_nid2str(from_nid),
1806                                libcfs_nid2str(src_nid),
1807                                libcfs_nid2str(dest_nid));
1808                        return -EPROTO;
1809                }
1810
                /* not a protocol error: the message is dropped and 0 is
                 * returned */
1811                if (!the_lnet.ln_routing) {
1812                        CERROR("%s, src %s: Dropping message for %s "
1813                                "(routing not enabled)\n",
1814                                libcfs_nid2str(from_nid),
1815                                libcfs_nid2str(src_nid),
1816                                libcfs_nid2str(dest_nid));
1817                        goto drop;
1818                }
1819        }
1820
1821        /* Message looks OK; we're not going to return an error, so we MUST
1822         * call back lnd_recv() come what may... */
1823
1824        if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
1825            fail_peer(src_nid, 0)) {         /* shall we now? */
1826                CERROR("%s, src %s: Dropping %s to simulate failure\n",
1827                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1828                       lnet_msgtyp2str(type));
1829                goto drop;
1830        }
1831
1832        msg = lnet_msg_alloc();
1833        if (msg == NULL) {
1834                CERROR("%s, src %s: Dropping %s (out of memory)\n",
1835                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1836                       lnet_msgtyp2str(type));
1837                goto drop;
1838        }
1839
1840        /* msg zeroed in lnet_msg_alloc;
1841         * i.e. flags all clear, pointers NULL etc
1842         */
1843
1844        msg->msg_type = type;
1845        msg->msg_private = private;
1846        msg->msg_receiving = 1;
1847        msg->msg_len = msg->msg_wanted = payload_length;
1848        msg->msg_offset = 0;
1849        msg->msg_hdr = *hdr;
1850        /* for building message event */
1851        msg->msg_from = from_nid;
1852        if (!for_me) {
                /* to be forwarded: the copied header is left as received and
                 * only the target and routing flag are recorded */
1853                msg->msg_target.pid     = dest_pid;
1854                msg->msg_target.nid     = dest_nid;
1855                msg->msg_routing        = 1;
1856
1857        } else {
1858                /* convert common msg->hdr fields to host byteorder */
1859                msg->msg_hdr.type       = type;
1860                msg->msg_hdr.src_nid    = src_nid;
1861                msg->msg_hdr.src_pid    = le32_to_cpu(msg->msg_hdr.src_pid);
1862                msg->msg_hdr.dest_nid   = dest_nid;
1863                msg->msg_hdr.dest_pid   = dest_pid;
1864                msg->msg_hdr.payload_length = payload_length;
1865        }
1866
        /* look up the sending peer and commit the message under the net lock */
1867        lnet_net_lock(cpt);
1868        rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
1869        if (rc != 0) {
1870                lnet_net_unlock(cpt);
1871                CERROR("%s, src %s: Dropping %s "
1872                       "(error %d looking up sender)\n",
1873                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1874                       lnet_msgtyp2str(type), rc);
                /* msg was never committed, so it is freed directly instead
                 * of going through lnet_finalize() */
1875                lnet_msg_free(msg);
1876                goto drop;
1877        }
1878
1879        lnet_msg_commit(msg, cpt);
1880
1881        if (!for_me) {
1882                rc = lnet_parse_forward_locked(ni, msg);
1883                lnet_net_unlock(cpt);
1884
                /* rc < 0: finalize and drop; rc == 0: receive now; a positive
                 * rc returns without calling lnet_ni_recv() here */
1885                if (rc < 0)
1886                        goto free_drop;
1887                if (rc == 0) {
1888                        lnet_ni_recv(ni, msg->msg_private, msg, 0,
1889                                     0, payload_length, payload_length);
1890                }
1891                return 0;
1892        }
1893
1894        lnet_net_unlock(cpt);
1895
        /* message is for this NI: hand it to the per-type handler */
1896        switch (type) {
1897        case LNET_MSG_ACK:
1898                rc = lnet_parse_ack(ni, msg);
1899                break;
1900        case LNET_MSG_PUT:
1901                rc = lnet_parse_put(ni, msg);
1902                break;
1903        case LNET_MSG_GET:
1904                rc = lnet_parse_get(ni, msg, rdma_req);
1905                break;
1906        case LNET_MSG_REPLY:
1907                rc = lnet_parse_reply(ni, msg);
1908                break;
1909        default:
                /* unreachable: unknown types were rejected by the first
                 * switch above */
1910                LASSERT(0);
1911                rc = -EPROTO;
1912                goto free_drop;  /* prevent an unused label if !kernel */
1913        }
1914
1915        if (rc == 0)
1916                return 0;
1917
        /* NB positive ENOENT is the only non-zero result expected from the
         * handlers above */
1918        LASSERT(rc == ENOENT);
1919
        /* free_drop: finalize a committed message with rc, then drop */
1920 free_drop:
1921        LASSERT(msg->msg_md == NULL);
1922        lnet_finalize(ni, msg, rc);
1923
        /* drop: reached by every path that discards the message after the
         * -EPROTO checks; always returns 0 */
1924 drop:
1925        lnet_drop_message(ni, cpt, private, payload_length);
1926        return 0;
1927}
1928EXPORT_SYMBOL(lnet_parse);
1929
1930void
1931lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
1932{
1933        while (!list_empty(head)) {
1934                lnet_process_id_t       id = {0};
1935                lnet_msg_t              *msg;
1936
1937                msg = list_entry(head->next, lnet_msg_t, msg_list);
1938                list_del(&msg->msg_list);
1939
1940                id.nid = msg->msg_hdr.src_nid;
1941                id.pid = msg->msg_hdr.src_pid;
1942
1943                LASSERT(msg->msg_md == NULL);
1944                LASSERT(msg->msg_rx_delayed);
1945                LASSERT(msg->msg_rxpeer != NULL);
1946                LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1947
1948                CWARN("Dropping delayed PUT from %s portal %d match "LPU64
1949                      " offset %d length %d: %s\n",
1950                      libcfs_id2str(id),
1951                      msg->msg_hdr.msg.put.ptl_index,
1952                      msg->msg_hdr.msg.put.match_bits,
1953                      msg->msg_hdr.msg.put.offset,
1954                      msg->msg_hdr.payload_length, reason);
1955
1956                /* NB I can't drop msg's ref on msg_rxpeer until after I've
1957                 * called lnet_drop_message(), so I just hang onto msg as well
1958                 * until that's done */
1959
1960                lnet_drop_message(msg->msg_rxpeer->lp_ni,
1961                                  msg->msg_rxpeer->lp_cpt,
1962                                  msg->msg_private, msg->msg_len);
1963                /*
1964                 * NB: message will not generate event because w/o attached MD,
1965                 * but we still should give error code so lnet_msg_decommit()
1966                 * can skip counters operations and other checks.
1967                 */
1968                lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
1969        }
1970}
1971
1972void
1973lnet_recv_delayed_msg_list(struct list_head *head)
1974{
1975        while (!list_empty(head)) {
1976                lnet_msg_t        *msg;
1977                lnet_process_id_t  id;
1978
1979                msg = list_entry(head->next, lnet_msg_t, msg_list);
1980                list_del(&msg->msg_list);
1981
1982                /* md won't disappear under me, since each msg
1983                 * holds a ref on it */
1984
1985                id.nid = msg->msg_hdr.src_nid;
1986                id.pid = msg->msg_hdr.src_pid;
1987
1988                LASSERT(msg->msg_rx_delayed);
1989                LASSERT(msg->msg_md != NULL);
1990                LASSERT(msg->msg_rxpeer != NULL);
1991                LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1992
1993                CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
1994                       "match "LPU64" offset %d length %d.\n",
1995                        libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
1996                        msg->msg_hdr.msg.put.match_bits,
1997                        msg->msg_hdr.msg.put.offset,
1998                        msg->msg_hdr.payload_length);
1999
2000                lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
2001        }
2002}
2003
2004/**
2005 * Initiate an asynchronous PUT operation.
2006 *
2007 * There are several events associated with a PUT: completion of the send on
2008 * the initiator node (LNET_EVENT_SEND), and when the send completes
2009 * successfully, the receipt of an acknowledgment (LNET_EVENT_ACK) indicating
2010 * that the operation was accepted by the target. The event LNET_EVENT_PUT is
2011 * used at the target node to indicate the completion of incoming data
2012 * delivery.
2013 *
2014 * The local events will be logged in the EQ associated with the MD pointed to
2015 * by \a mdh handle. Using a MD without an associated EQ results in these
2016 * events being discarded. In this case, the caller must have another
2017 * mechanism (e.g., a higher level protocol) for determining when it is safe
2018 * to modify the memory region associated with the MD.
2019 *
2020 * Note that LNet does not guarantee the order of LNET_EVENT_SEND and
2021 * LNET_EVENT_ACK, though intuitively ACK should happen after SEND.
2022 *
2023 * \param self Indicates the NID of a local interface through which to send
2024 * the PUT request. Use LNET_NID_ANY to let LNet choose one by itself.
2025 * \param mdh A handle for the MD that describes the memory to be sent. The MD
2026 * must be "free floating" (See LNetMDBind()).
2027 * \param ack Controls whether an acknowledgment is requested.
2028 * Acknowledgments are only sent when they are requested by the initiating
2029 * process and the target MD enables them.
2030 * \param target A process identifier for the target process.
2031 * \param portal The index in the \a target's portal table.
2032 * \param match_bits The match bits to use for MD selection at the target
2033 * process.
2034 * \param offset The offset into the target MD (only used when the target
2035 * MD has the LNET_MD_MANAGE_REMOTE option set).
2036 * \param hdr_data 64 bits of user data that can be included in the message
2037 * header. This data is written to an event queue entry at the target if an
2038 * EQ is present on the matching MD.
2039 *
2040 * \retval  0      Success, and only in this case events will be generated
2041 * and logged to EQ (if it exists).
2042 * \retval -EIO    Simulated failure.
2043 * \retval -ENOMEM Memory allocation failure.
2044 * \retval -ENOENT Invalid MD object.
2045 *
2046 * \see lnet_event_t::hdr_data and lnet_event_kind_t.
2047 */
2048int
2049LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
2050        lnet_process_id_t target, unsigned int portal,
2051        __u64 match_bits, unsigned int offset,
2052        __u64 hdr_data)
2053{
2054        struct lnet_msg         *msg;
2055        struct lnet_libmd       *md;
2056        int                     cpt;
2057        int                     rc;
2058
2059        LASSERT(the_lnet.ln_init);
2060        LASSERT(the_lnet.ln_refcount > 0);
2061
2062        if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2063            fail_peer(target.nid, 1)) { /* shall we now? */
2064                CERROR("Dropping PUT to %s: simulated failure\n",
2065                       libcfs_id2str(target));
2066                return -EIO;
2067        }
2068
2069        msg = lnet_msg_alloc();
2070        if (msg == NULL) {
2071                CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2072                       libcfs_id2str(target));
2073                return -ENOMEM;
2074        }
2075        msg->msg_vmflush = !!memory_pressure_get();
2076
2077        cpt = lnet_cpt_of_cookie(mdh.cookie);
2078        lnet_res_lock(cpt);
2079
2080        md = lnet_handle2md(&mdh);
2081        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2082                CERROR("Dropping PUT ("LPU64":%d:%s): MD (%d) invalid\n",
2083                       match_bits, portal, libcfs_id2str(target),
2084                       md == NULL ? -1 : md->md_threshold);
2085                if (md != NULL && md->md_me != NULL)
2086                        CERROR("Source MD also attached to portal %d\n",
2087                               md->md_me->me_portal);
2088                lnet_res_unlock(cpt);
2089
2090                lnet_msg_free(msg);
2091                return -ENOENT;
2092        }
2093
2094        CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2095
2096        lnet_msg_attach_md(msg, md, 0, 0);
2097
2098        lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2099
2100        msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2101        msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2102        msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2103        msg->msg_hdr.msg.put.hdr_data = hdr_data;
2104
2105        /* NB handles only looked up by creator (no flips) */
2106        if (ack == LNET_ACK_REQ) {
2107                msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2108                        the_lnet.ln_interface_cookie;
2109                msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2110                        md->md_lh.lh_cookie;
2111        } else {
2112                msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2113                        LNET_WIRE_HANDLE_COOKIE_NONE;
2114                msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2115                        LNET_WIRE_HANDLE_COOKIE_NONE;
2116        }
2117
2118        lnet_res_unlock(cpt);
2119
2120        lnet_build_msg_event(msg, LNET_EVENT_SEND);
2121
2122        rc = lnet_send(self, msg, LNET_NID_ANY);
2123        if (rc != 0) {
2124                CNETERR("Error sending PUT to %s: %d\n",
2125                       libcfs_id2str(target), rc);
2126                lnet_finalize(NULL, msg, rc);
2127        }
2128
2129        /* completion will be signalled by an event */
2130        return 0;
2131}
2132EXPORT_SYMBOL(LNetPut);
2133
2134lnet_msg_t *
2135lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2136{
2137        /* The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
2138         * returns a msg for the LND to pass to lnet_finalize() when the sink
2139         * data has been received.
2140         *
2141         * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
2142         * lnet_finalize() is called on it, so the LND must call this first */
2143
2144        struct lnet_msg         *msg = lnet_msg_alloc();
2145        struct lnet_libmd       *getmd = getmsg->msg_md;
2146        lnet_process_id_t       peer_id = getmsg->msg_target;
2147        int                     cpt;
2148
2149        LASSERT(!getmsg->msg_target_is_router);
2150        LASSERT(!getmsg->msg_routing);
2151
2152        cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2153        lnet_res_lock(cpt);
2154
2155        LASSERT(getmd->md_refcount > 0);
2156
2157        if (msg == NULL) {
2158                CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2159                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2160                goto drop;
2161        }
2162
2163        if (getmd->md_threshold == 0) {
2164                CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2165                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2166                        getmd);
2167                lnet_res_unlock(cpt);
2168                goto drop;
2169        }
2170
2171        LASSERT(getmd->md_offset == 0);
2172
2173        CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2174               libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2175
2176        /* setup information for lnet_build_msg_event */
2177        msg->msg_from = peer_id.nid;
2178        msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
2179        msg->msg_hdr.src_nid = peer_id.nid;
2180        msg->msg_hdr.payload_length = getmd->md_length;
2181        msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
2182
2183        lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2184        lnet_res_unlock(cpt);
2185
2186        cpt = lnet_cpt_of_nid(peer_id.nid);
2187
2188        lnet_net_lock(cpt);
2189        lnet_msg_commit(msg, cpt);
2190        lnet_net_unlock(cpt);
2191
2192        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2193
2194        return msg;
2195
2196 drop:
2197        cpt = lnet_cpt_of_nid(peer_id.nid);
2198
2199        lnet_net_lock(cpt);
2200        the_lnet.ln_counters[cpt]->drop_count++;
2201        the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2202        lnet_net_unlock(cpt);
2203
2204        if (msg != NULL)
2205                lnet_msg_free(msg);
2206
2207        return NULL;
2208}
2209EXPORT_SYMBOL(lnet_create_reply_msg);
2210
2211void
2212lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2213{
2214        /* Set the REPLY length, now the RDMA that elides the REPLY message has
2215         * completed and I know it. */
2216        LASSERT(reply != NULL);
2217        LASSERT(reply->msg_type == LNET_MSG_GET);
2218        LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
2219
2220        /* NB I trusted my peer to RDMA.  If she tells me she's written beyond
2221         * the end of my buffer, I might as well be dead. */
2222        LASSERT(len <= reply->msg_ev.mlength);
2223
2224        reply->msg_ev.mlength = len;
2225}
2226EXPORT_SYMBOL(lnet_set_reply_msg_len);
2227
2228/**
2229 * Initiate an asynchronous GET operation.
2230 *
2231 * On the initiator node, an LNET_EVENT_SEND is logged when the GET request
2232 * is sent, and an LNET_EVENT_REPLY is logged when the data returned from
2233 * the target node in the REPLY has been written to local MD.
2234 *
2235 * On the target node, an LNET_EVENT_GET is logged when the GET request
2236 * arrives and is accepted into a MD.
2237 *
2238 * \param self,target,portal,match_bits,offset See the discussion in LNetPut().
2239 * \param mdh A handle for the MD that describes the memory into which the
2240 * requested data will be received. The MD must be "free floating"
2241 * (See LNetMDBind()).
2242 *
2243 * \retval  0      Success, and only in this case events will be generated
2244 * and logged to EQ (if it exists) of the MD.
2245 * \retval -EIO    Simulated failure.
2246 * \retval -ENOMEM Memory allocation failure.
2247 * \retval -ENOENT Invalid MD object.
2248 */
2249int
2250LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
2251        lnet_process_id_t target, unsigned int portal,
2252        __u64 match_bits, unsigned int offset)
2253{
2254        struct lnet_msg         *msg;
2255        struct lnet_libmd       *md;
2256        int                     cpt;
2257        int                     rc;
2258
2259        LASSERT(the_lnet.ln_init);
2260        LASSERT(the_lnet.ln_refcount > 0);
2261
2262        if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2263            fail_peer(target.nid, 1)) {   /* shall we now? */
2264                CERROR("Dropping GET to %s: simulated failure\n",
2265                       libcfs_id2str(target));
2266                return -EIO;
2267        }
2268
2269        msg = lnet_msg_alloc();
2270        if (msg == NULL) {
2271                CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2272                       libcfs_id2str(target));
2273                return -ENOMEM;
2274        }
2275
2276        cpt = lnet_cpt_of_cookie(mdh.cookie);
2277        lnet_res_lock(cpt);
2278
2279        md = lnet_handle2md(&mdh);
2280        if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2281                CERROR("Dropping GET ("LPU64":%d:%s): MD (%d) invalid\n",
2282                       match_bits, portal, libcfs_id2str(target),
2283                       md == NULL ? -1 : md->md_threshold);
2284                if (md != NULL && md->md_me != NULL)
2285                        CERROR("REPLY MD also attached to portal %d\n",
2286                               md->md_me->me_portal);
2287
2288                lnet_res_unlock(cpt);
2289
2290                lnet_msg_free(msg);
2291
2292                return -ENOENT;
2293        }
2294
2295        CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2296
2297        lnet_msg_attach_md(msg, md, 0, 0);
2298
2299        lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2300
2301        msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2302        msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2303        msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2304        msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2305
2306        /* NB handles only looked up by creator (no flips) */
2307        msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
2308                the_lnet.ln_interface_cookie;
2309        msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
2310                md->md_lh.lh_cookie;
2311
2312        lnet_res_unlock(cpt);
2313
2314        lnet_build_msg_event(msg, LNET_EVENT_SEND);
2315
2316        rc = lnet_send(self, msg, LNET_NID_ANY);
2317        if (rc < 0) {
2318                CNETERR("Error sending GET to %s: %d\n",
2319                       libcfs_id2str(target), rc);
2320                lnet_finalize(NULL, msg, rc);
2321        }
2322
2323        /* completion will be signalled by an event */
2324        return 0;
2325}
2326EXPORT_SYMBOL(LNetGet);
2327
2328/**
2329 * Calculate distance to node at \a dstnid.
2330 *
2331 * \param dstnid Target NID.
2332 * \param srcnidp If not NULL, NID of the local interface to reach \a dstnid
2333 * is saved here.
2334 * \param orderp If not NULL, order of the route to reach \a dstnid is saved
2335 * here.
2336 *
2337 * \retval 0 If \a dstnid belongs to a local interface, and reserved option
2338 * local_nid_dist_zero is set, which is the default.
2339 * \retval positives Distance to target NID, i.e. number of hops plus one.
2340 * \retval -EHOSTUNREACH If \a dstnid is not reachable.
2341 */
2342int
2343LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2344{
2345        struct list_head                *e;
2346        struct lnet_ni          *ni;
2347        lnet_remotenet_t        *rnet;
2348        __u32                   dstnet = LNET_NIDNET(dstnid);
2349        int                     hops;
2350        int                     cpt;
2351        __u32                   order = 2;
2352        struct list_head                *rn_list;
2353
2354        /* if !local_nid_dist_zero, I don't return a distance of 0 ever
2355         * (when lustre sees a distance of 0, it substitutes 0@lo), so I
2356         * keep order 0 free for 0@lo and order 1 free for a local NID
2357         * match */
2358
2359        LASSERT(the_lnet.ln_init);
2360        LASSERT(the_lnet.ln_refcount > 0);
2361
2362        cpt = lnet_net_lock_current();
2363
2364        list_for_each(e, &the_lnet.ln_nis) {
2365                ni = list_entry(e, lnet_ni_t, ni_list);
2366
2367                if (ni->ni_nid == dstnid) {
2368                        if (srcnidp != NULL)
2369                                *srcnidp = dstnid;
2370                        if (orderp != NULL) {
2371                                if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2372                                        *orderp = 0;
2373                                else
2374                                        *orderp = 1;
2375                        }
2376                        lnet_net_unlock(cpt);
2377
2378                        return local_nid_dist_zero ? 0 : 1;
2379                }
2380
2381                if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2382                        if (srcnidp != NULL)
2383                                *srcnidp = ni->ni_nid;
2384                        if (orderp != NULL)
2385                                *orderp = order;
2386                        lnet_net_unlock(cpt);
2387                        return 1;
2388                }
2389
2390                order++;
2391        }
2392
2393        rn_list = lnet_net2rnethash(dstnet);
2394        list_for_each(e, rn_list) {
2395                rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2396
2397                if (rnet->lrn_net == dstnet) {
2398                        lnet_route_t *route;
2399                        lnet_route_t *shortest = NULL;
2400
2401                        LASSERT(!list_empty(&rnet->lrn_routes));
2402
2403                        list_for_each_entry(route, &rnet->lrn_routes,
2404                                                lr_list) {
2405                                if (shortest == NULL ||
2406                                    route->lr_hops < shortest->lr_hops)
2407                                        shortest = route;
2408                        }
2409
2410                        LASSERT(shortest != NULL);
2411                        hops = shortest->lr_hops;
2412                        if (srcnidp != NULL)
2413                                *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
2414                        if (orderp != NULL)
2415                                *orderp = order;
2416                        lnet_net_unlock(cpt);
2417                        return hops + 1;
2418                }
2419                order++;
2420        }
2421
2422        lnet_net_unlock(cpt);
2423        return -EHOSTUNREACH;
2424}
2425EXPORT_SYMBOL(LNetDist);
2426
2427/**
2428 * Set the number of asynchronous messages expected from a target process.
2429 *
2430 * This function is only meaningful for userspace callers. It's a no-op when
2431 * called from kernel.
2432 *
2433 * Asynchronous messages are those that can come from a target when the
2434 * userspace process is not waiting for IO to complete; e.g., AST callbacks
2435 * from Lustre servers. Specifying the expected number of such messages
2436 * allows them to be eagerly received when user process is not running in
2437 * LNet; otherwise network errors may occur.
2438 *
2439 * \param id Process ID of the target process.
2440 * \param nasync Number of asynchronous messages expected from the target.
2441 *
2442 * \return 0 on success, and an error code otherwise.
2443 */
2444int
2445LNetSetAsync(lnet_process_id_t id, int nasync)
2446{
2447        return 0;
2448}
2449EXPORT_SYMBOL(LNetSetAsync);
2450