/* linux/drivers/staging/lustre/lnet/lnet/lib-ptl.c */
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * GPL HEADER END
  17 */
  18/*
  19 * Copyright (c) 2012, 2015, Intel Corporation.
  20 */
  21/*
  22 * This file is part of Lustre, http://www.lustre.org/
  23 * Lustre is a trademark of Sun Microsystems, Inc.
  24 *
  25 * lnet/lnet/lib-ptl.c
  26 *
  27 * portal & match routines
  28 *
  29 * Author: liang@whamcloud.com
  30 */
  31
  32#define DEBUG_SUBSYSTEM S_LNET
  33
  34#include "../../include/linux/lnet/lib-lnet.h"
  35
/*
 * Policy for spreading incoming PUTs across CPU partitions; defaults to
 * hashing routed messages by source NID (LNET_PTL_ROTOR_HASH_RT).
 * NB: add /proc interfaces in upcoming patches
 */
int portal_rotor = LNET_PTL_ROTOR_HASH_RT;
module_param(portal_rotor, int, 0644);
MODULE_PARM_DESC(portal_rotor, "redirect PUTs to different cpu-partitions");
  40
  41static int
  42lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
  43                    __u64 mbits, __u64 ignore_bits)
  44{
  45        struct lnet_portal *ptl = the_lnet.ln_portals[index];
  46        int unique;
  47
  48        unique = !ignore_bits &&
  49                 match_id.nid != LNET_NID_ANY &&
  50                 match_id.pid != LNET_PID_ANY;
  51
  52        LASSERT(!lnet_ptl_is_unique(ptl) || !lnet_ptl_is_wildcard(ptl));
  53
  54        /* prefer to check w/o any lock */
  55        if (likely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl)))
  56                goto match;
  57
  58        /* unset, new portal */
  59        lnet_ptl_lock(ptl);
  60        /* check again with lock */
  61        if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
  62                lnet_ptl_unlock(ptl);
  63                goto match;
  64        }
  65
  66        /* still not set */
  67        if (unique)
  68                lnet_ptl_setopt(ptl, LNET_PTL_MATCH_UNIQUE);
  69        else
  70                lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);
  71
  72        lnet_ptl_unlock(ptl);
  73
  74        return 1;
  75
  76 match:
  77        if ((lnet_ptl_is_unique(ptl) && !unique) ||
  78            (lnet_ptl_is_wildcard(ptl) && unique))
  79                return 0;
  80        return 1;
  81}
  82
/*
 * Mark the match-table of CPT @cpt as enabled and insert @cpt into the
 * sorted ptl_mt_maps[] array of active CPTs for this wildcard portal.
 */
static void
lnet_ptl_enable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
	int i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	mtable->mt_enabled = 1;

	/* insertion sort: append cpt, then bubble it down into place */
	ptl->ptl_mt_maps[ptl->ptl_mt_nmaps] = cpt;
	for (i = ptl->ptl_mt_nmaps - 1; i >= 0; i--) {
		/* cpt must not already be in the map */
		LASSERT(ptl->ptl_mt_maps[i] != cpt);
		if (ptl->ptl_mt_maps[i] < cpt)
			break;

		/* swap to order */
		ptl->ptl_mt_maps[i + 1] = ptl->ptl_mt_maps[i];
		ptl->ptl_mt_maps[i] = cpt;
	}

	ptl->ptl_mt_nmaps++;
}
 107
/*
 * Mark the match-table of CPT @cpt as disabled and remove @cpt from the
 * sorted ptl_mt_maps[] array. A no-op on single-CPT configurations so
 * there is always at least one usable match-table.
 */
static void
lnet_ptl_disable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
	int i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	if (LNET_CPT_NUMBER == 1)
		return; /* never disable the only match-table */

	mtable->mt_enabled = 0;

	LASSERT(ptl->ptl_mt_nmaps > 0 &&
		ptl->ptl_mt_nmaps <= LNET_CPT_NUMBER);

	/* remove it from mt_maps: shift later (sorted) entries down */
	ptl->ptl_mt_nmaps--;
	for (i = 0; i < ptl->ptl_mt_nmaps; i++) {
		if (ptl->ptl_mt_maps[i] >= cpt) /* overwrite it */
			ptl->ptl_mt_maps[i] = ptl->ptl_mt_maps[i + 1];
	}
}
 132
/*
 * Attempt to match the message described by @info against a single MD.
 *
 * Returns a mask of LNET_MATCHMD_* bits:
 *  - NONE (possibly | EXHAUSTED) if the MD does not match,
 *  - DROP if it matches but the payload is too big and TRUNCATE is off,
 *  - OK (possibly | EXHAUSTED) on success, after committing the message
 *    to the MD via lnet_msg_attach_md().
 */
static int
lnet_try_match_md(lnet_libmd_t *md,
		  struct lnet_match_info *info, struct lnet_msg *msg)
{
	/*
	 * ALWAYS called holding the lnet_res_lock, and can't lnet_res_unlock;
	 * lnet_match_blocked_msg() relies on this to avoid races
	 */
	unsigned int offset;
	unsigned int mlength;
	lnet_me_t *me = md->md_me;

	/* MD exhausted */
	if (lnet_md_exhausted(md))
		return LNET_MATCHMD_NONE | LNET_MATCHMD_EXHAUSTED;

	/* mismatched MD op */
	if (!(md->md_options & info->mi_opc))
		return LNET_MATCHMD_NONE;

	/* mismatched ME nid/pid? */
	if (me->me_match_id.nid != LNET_NID_ANY &&
	    me->me_match_id.nid != info->mi_id.nid)
		return LNET_MATCHMD_NONE;

	if (me->me_match_id.pid != LNET_PID_ANY &&
	    me->me_match_id.pid != info->mi_id.pid)
		return LNET_MATCHMD_NONE;

	/* mismatched ME matchbits? */
	if ((me->me_match_bits ^ info->mi_mbits) & ~me->me_ignore_bits)
		return LNET_MATCHMD_NONE;

	/* Hurrah! This _is_ a match; check it out... */

	/* remote-managed MDs take the offset from the message header */
	if (!(md->md_options & LNET_MD_MANAGE_REMOTE))
		offset = md->md_offset;
	else
		offset = info->mi_roffset;

	if (md->md_options & LNET_MD_MAX_SIZE) {
		mlength = md->md_max_size;
		LASSERT(md->md_offset + mlength <= md->md_length);
	} else {
		mlength = md->md_length - offset;
	}

	if (info->mi_rlength <= mlength) {	/* fits in allowed space */
		mlength = info->mi_rlength;
	} else if (!(md->md_options & LNET_MD_TRUNCATE)) {
		/* this packet _really_ is too big */
		CERROR("Matching packet from %s, match %llu length %d too big: %d left, %d allowed\n",
		       libcfs_id2str(info->mi_id), info->mi_mbits,
		       info->mi_rlength, md->md_length - offset, mlength);

		return LNET_MATCHMD_DROP;
	}

	/* Commit to this ME/MD */
	CDEBUG(D_NET, "Incoming %s index %x from %s of length %d/%d into md %#llx [%d] + %d\n",
	       (info->mi_opc == LNET_MD_OP_PUT) ? "put" : "get",
	       info->mi_portal, libcfs_id2str(info->mi_id), mlength,
	       info->mi_rlength, md->md_lh.lh_cookie, md->md_niov, offset);

	lnet_msg_attach_md(msg, md, offset, mlength);
	md->md_offset = offset + mlength;

	/* room left: the MD can match more messages */
	if (!lnet_md_exhausted(md))
		return LNET_MATCHMD_OK;

	/*
	 * Auto-unlink NOW, so the ME gets unlinked if required.
	 * We bumped md->md_refcount above so the MD just gets flagged
	 * for unlink when it is finalized.
	 */
	if (md->md_flags & LNET_MD_FLAG_AUTO_UNLINK)
		lnet_md_unlink(md);

	return LNET_MATCHMD_OK | LNET_MATCHMD_EXHAUSTED;
}
 213
 214static struct lnet_match_table *
 215lnet_match2mt(struct lnet_portal *ptl, lnet_process_id_t id, __u64 mbits)
 216{
 217        if (LNET_CPT_NUMBER == 1)
 218                return ptl->ptl_mtables[0]; /* the only one */
 219
 220        /* if it's a unique portal, return match-table hashed by NID */
 221        return lnet_ptl_is_unique(ptl) ?
 222               ptl->ptl_mtables[lnet_cpt_of_nid(id.nid)] : NULL;
 223}
 224
 225struct lnet_match_table *
 226lnet_mt_of_attach(unsigned int index, lnet_process_id_t id,
 227                  __u64 mbits, __u64 ignore_bits, lnet_ins_pos_t pos)
 228{
 229        struct lnet_portal *ptl;
 230        struct lnet_match_table *mtable;
 231
 232        /* NB: called w/o lock */
 233        LASSERT(index < the_lnet.ln_nportals);
 234
 235        if (!lnet_ptl_match_type(index, id, mbits, ignore_bits))
 236                return NULL;
 237
 238        ptl = the_lnet.ln_portals[index];
 239
 240        mtable = lnet_match2mt(ptl, id, mbits);
 241        if (mtable) /* unique portal or only one match-table */
 242                return mtable;
 243
 244        /* it's a wildcard portal */
 245        switch (pos) {
 246        default:
 247                return NULL;
 248        case LNET_INS_BEFORE:
 249        case LNET_INS_AFTER:
 250                /*
 251                 * posted by no affinity thread, always hash to specific
 252                 * match-table to avoid buffer stealing which is heavy
 253                 */
 254                return ptl->ptl_mtables[ptl->ptl_index % LNET_CPT_NUMBER];
 255        case LNET_INS_LOCAL:
 256                /* posted by cpu-affinity thread */
 257                return ptl->ptl_mtables[lnet_cpt_current()];
 258        }
 259}
 260
/*
 * Pick the match-table to search for an incoming message.
 *
 * Unique portals and single-CPT setups have a fixed table. For wildcard
 * portals the choice follows the portal_rotor policy, preferring the
 * current CPT's table when the rotor doesn't apply, and falling back to
 * an enabled table to avoid expensive buffer stealing.
 */
static struct lnet_match_table *
lnet_mt_of_match(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table *mtable;
	struct lnet_portal *ptl;
	unsigned int nmaps;
	unsigned int rotor;
	unsigned int cpt;
	bool routed;

	/* NB: called w/o lock */
	LASSERT(info->mi_portal < the_lnet.ln_nportals);
	ptl = the_lnet.ln_portals[info->mi_portal];

	LASSERT(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl));

	mtable = lnet_match2mt(ptl, info->mi_id, info->mi_mbits);
	if (mtable)
		return mtable;

	/* it's a wildcard portal */
	/* routed: source and destination NIDs are on different networks */
	routed = LNET_NIDNET(msg->msg_hdr.src_nid) !=
		 LNET_NIDNET(msg->msg_hdr.dest_nid);

	if (portal_rotor == LNET_PTL_ROTOR_OFF ||
	    (portal_rotor != LNET_PTL_ROTOR_ON && !routed)) {
		/* rotor doesn't apply: prefer the local CPT if enabled */
		cpt = lnet_cpt_current();
		if (ptl->ptl_mtables[cpt]->mt_enabled)
			return ptl->ptl_mtables[cpt];
	}

	rotor = ptl->ptl_rotor++; /* get round-robin factor */
	if (portal_rotor == LNET_PTL_ROTOR_HASH_RT && routed)
		cpt = lnet_cpt_of_nid(msg->msg_hdr.src_nid);
	else
		cpt = rotor % LNET_CPT_NUMBER;

	if (!ptl->ptl_mtables[cpt]->mt_enabled) {
		/* is there any active entry for this portal? */
		nmaps = ptl->ptl_mt_nmaps;
		/* map to an active mtable to avoid heavy "stealing" */
		if (nmaps) {
			/*
			 * NB: there is possibility that ptl_mt_maps is being
			 * changed because we are not under protection of
			 * lnet_ptl_lock, but it shouldn't hurt anything
			 */
			cpt = ptl->ptl_mt_maps[rotor % nmaps];
		}
	}

	return ptl->ptl_mtables[cpt];
}
 314
 315static int
 316lnet_mt_test_exhausted(struct lnet_match_table *mtable, int pos)
 317{
 318        __u64 *bmap;
 319        int i;
 320
 321        if (!lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
 322                return 0;
 323
 324        if (pos < 0) { /* check all bits */
 325                for (i = 0; i < LNET_MT_EXHAUSTED_BMAP; i++) {
 326                        if (mtable->mt_exhausted[i] != (__u64)(-1))
 327                                return 0;
 328                }
 329                return 1;
 330        }
 331
 332        LASSERT(pos <= LNET_MT_HASH_IGNORE);
 333        /* mtable::mt_mhash[pos] is marked as exhausted or not */
 334        bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
 335        pos &= (1 << LNET_MT_BITS_U64) - 1;
 336
 337        return (*bmap & (1ULL << pos));
 338}
 339
 340static void
 341lnet_mt_set_exhausted(struct lnet_match_table *mtable, int pos, int exhausted)
 342{
 343        __u64 *bmap;
 344
 345        LASSERT(lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]));
 346        LASSERT(pos <= LNET_MT_HASH_IGNORE);
 347
 348        /* set mtable::mt_mhash[pos] as exhausted/non-exhausted */
 349        bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
 350        pos &= (1 << LNET_MT_BITS_U64) - 1;
 351
 352        if (!exhausted)
 353                *bmap &= ~(1ULL << pos);
 354        else
 355                *bmap |= 1ULL << pos;
 356}
 357
 358struct list_head *
 359lnet_mt_match_head(struct lnet_match_table *mtable,
 360                   lnet_process_id_t id, __u64 mbits)
 361{
 362        struct lnet_portal *ptl = the_lnet.ln_portals[mtable->mt_portal];
 363        unsigned long hash = mbits;
 364
 365        if (!lnet_ptl_is_wildcard(ptl)) {
 366                hash += id.nid + id.pid;
 367
 368                LASSERT(lnet_ptl_is_unique(ptl));
 369                hash = hash_long(hash, LNET_MT_HASH_BITS);
 370        }
 371        return &mtable->mt_mhash[hash & LNET_MT_HASH_MASK];
 372}
 373
/*
 * Search @mtable for an ME/MD matching the message described by @info.
 *
 * MEs with ignore bits are checked first (they live in the extra
 * LNET_MT_HASH_IGNORE bucket), then the bucket selected by the match
 * bits. Returns an LNET_MATCHMD_* mask: a FINISH result (OK/DROP) if a
 * decision was reached, otherwise NONE or DROP, possibly OR'd with
 * EXHAUSTED when every bucket of a wildcard table is used up.
 */
int
lnet_mt_match_md(struct lnet_match_table *mtable,
		 struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct list_head *head;
	lnet_me_t *me;
	lnet_me_t *tmp;
	int exhausted = 0;
	int rc;

	/* any ME with ignore bits? */
	if (!list_empty(&mtable->mt_mhash[LNET_MT_HASH_IGNORE]))
		head = &mtable->mt_mhash[LNET_MT_HASH_IGNORE];
	else
		head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
 again:
	/* NB: only wildcard portal needs to return LNET_MATCHMD_EXHAUSTED */
	if (lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
		exhausted = LNET_MATCHMD_EXHAUSTED;

	list_for_each_entry_safe(me, tmp, head, me_list) {
		/* ME attached but MD not attached yet */
		if (!me->me_md)
			continue;

		LASSERT(me == me->me_md->md_me);

		rc = lnet_try_match_md(me->me_md, info, msg);
		if (!(rc & LNET_MATCHMD_EXHAUSTED))
			exhausted = 0; /* mlist is not empty */

		if (rc & LNET_MATCHMD_FINISH) {
			/*
			 * don't return EXHAUSTED bit because we don't know
			 * whether the mlist is empty or not
			 */
			return rc & ~LNET_MATCHMD_EXHAUSTED;
		}
	}

	if (exhausted == LNET_MATCHMD_EXHAUSTED) { /* @head is exhausted */
		/* mark this bucket; table is exhausted only if all are */
		lnet_mt_set_exhausted(mtable, head - mtable->mt_mhash, 1);
		if (!lnet_mt_test_exhausted(mtable, -1))
			exhausted = 0;
	}

	if (!exhausted && head == &mtable->mt_mhash[LNET_MT_HASH_IGNORE]) {
		head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
		goto again; /* re-check MEs w/o ignore-bits */
	}

	/* no match: GETs and non-lazy portals drop, lazy portals delay */
	if (info->mi_opc == LNET_MD_OP_GET ||
	    !lnet_ptl_is_lazy(the_lnet.ln_portals[info->mi_portal]))
		return exhausted | LNET_MATCHMD_DROP;

	return exhausted | LNET_MATCHMD_NONE;
}
 431
 432static int
 433lnet_ptl_match_early(struct lnet_portal *ptl, struct lnet_msg *msg)
 434{
 435        int rc;
 436
 437        /*
 438         * message arrived before any buffer posting on this portal,
 439         * simply delay or drop this message
 440         */
 441        if (likely(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)))
 442                return 0;
 443
 444        lnet_ptl_lock(ptl);
 445        /* check it again with hold of lock */
 446        if (lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)) {
 447                lnet_ptl_unlock(ptl);
 448                return 0;
 449        }
 450
 451        if (lnet_ptl_is_lazy(ptl)) {
 452                if (msg->msg_rx_ready_delay) {
 453                        msg->msg_rx_delayed = 1;
 454                        list_add_tail(&msg->msg_list,
 455                                      &ptl->ptl_msg_delayed);
 456                }
 457                rc = LNET_MATCHMD_NONE;
 458        } else {
 459                rc = LNET_MATCHMD_DROP;
 460        }
 461
 462        lnet_ptl_unlock(ptl);
 463        return rc;
 464}
 465
/*
 * Steal a buffer for @msg from other CPTs' match-tables, delaying or
 * dropping the message if nothing can be stolen.
 *
 * While on ptl_msg_stealing, @msg may be matched concurrently by
 * lnet_ptl_attach_md(); an empty msg_list detects that case.
 */
static int
lnet_ptl_match_delay(struct lnet_portal *ptl,
		     struct lnet_match_info *info, struct lnet_msg *msg)
{
	int first = ptl->ptl_mt_maps[0]; /* read w/o lock */
	int rc = 0;
	int i;

	/**
	 * Steal buffer from other CPTs, and delay msg if nothing to
	 * steal. This function is more expensive than a regular
	 * match, but we don't expect it can happen a lot. The return
	 * code contains one of LNET_MATCHMD_OK, LNET_MATCHMD_DROP, or
	 * LNET_MATCHMD_NONE.
	 */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	for (i = 0; i < LNET_CPT_NUMBER; i++) {
		struct lnet_match_table *mtable;
		int cpt;

		cpt = (first + i) % LNET_CPT_NUMBER;
		mtable = ptl->ptl_mtables[cpt];
		/* skip disabled tables, but always visit the last CPT */
		if (i && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled)
			continue;

		lnet_res_lock(cpt);
		lnet_ptl_lock(ptl);

		if (!i) {
			/* The first try, add to stealing list. */
			list_add_tail(&msg->msg_list,
				      &ptl->ptl_msg_stealing);
		}

		if (!list_empty(&msg->msg_list)) {
			/* On stealing list. */
			rc = lnet_mt_match_md(mtable, info, msg);

			if ((rc & LNET_MATCHMD_EXHAUSTED) &&
			    mtable->mt_enabled)
				lnet_ptl_disable_mt(ptl, cpt);

			if (rc & LNET_MATCHMD_FINISH) {
				/* Match found, remove from stealing list. */
				list_del_init(&msg->msg_list);
			} else if (i == LNET_CPT_NUMBER - 1 ||	/* (1) */
				   !ptl->ptl_mt_nmaps ||	/* (2) */
				   (ptl->ptl_mt_nmaps == 1 &&	/* (3) */
				    ptl->ptl_mt_maps[0] == cpt)) {
				/**
				 * No match found, and this is either
				 * (1) the last cpt to check, or
				 * (2) there is no active cpt, or
				 * (3) this is the only active cpt.
				 * There is nothing to steal: delay or
				 * drop the message.
				 */
				list_del_init(&msg->msg_list);

				if (lnet_ptl_is_lazy(ptl)) {
					msg->msg_rx_delayed = 1;
					list_add_tail(&msg->msg_list,
						      &ptl->ptl_msg_delayed);
					rc = LNET_MATCHMD_NONE;
				} else {
					rc = LNET_MATCHMD_DROP;
				}
			} else {
				/* Do another iteration. */
				rc = 0;
			}
		} else {
			/**
			 * No longer on stealing list: another thread
			 * matched the message in lnet_ptl_attach_md().
			 * We are now expected to handle the message.
			 */
			rc = !msg->msg_md ?
			     LNET_MATCHMD_DROP : LNET_MATCHMD_OK;
		}

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(cpt);

		/**
		 * Note that test (1) above ensures that we always
		 * exit the loop through this break statement.
		 *
		 * LNET_MATCHMD_NONE means msg was added to the
		 * delayed queue, and we may no longer reference it
		 * after lnet_ptl_unlock() and lnet_res_unlock().
		 */
		if (rc & (LNET_MATCHMD_FINISH | LNET_MATCHMD_NONE))
			break;
	}

	return rc;
}
 565
/*
 * Entry point for matching an incoming PUT/GET against a portal.
 *
 * Returns LNET_MATCHMD_OK (matched), LNET_MATCHMD_DROP, or
 * LNET_MATCHMD_NONE (message queued on a lazy portal's delayed list);
 * the internal EXHAUSTED bit is stripped before returning.
 */
int
lnet_ptl_match_md(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table *mtable;
	struct lnet_portal *ptl;
	int rc;

	CDEBUG(D_NET, "Request from %s of length %d into portal %d MB=%#llx\n",
	       libcfs_id2str(info->mi_id), info->mi_rlength, info->mi_portal,
	       info->mi_mbits);

	if (info->mi_portal >= the_lnet.ln_nportals) {
		CERROR("Invalid portal %d not in [0-%d]\n",
		       info->mi_portal, the_lnet.ln_nportals);
		return LNET_MATCHMD_DROP;
	}

	ptl = the_lnet.ln_portals[info->mi_portal];
	rc = lnet_ptl_match_early(ptl, msg);
	if (rc) /* matched or delayed early message */
		return rc;

	mtable = lnet_mt_of_match(info, msg);
	lnet_res_lock(mtable->mt_cpt);

	if (the_lnet.ln_shutdown) {
		rc = LNET_MATCHMD_DROP;
		goto out1;
	}

	rc = lnet_mt_match_md(mtable, info, msg);
	if ((rc & LNET_MATCHMD_EXHAUSTED) && mtable->mt_enabled) {
		/* table used up: stop routing new messages to it */
		lnet_ptl_lock(ptl);
		lnet_ptl_disable_mt(ptl, mtable->mt_cpt);
		lnet_ptl_unlock(ptl);
	}

	if (rc & LNET_MATCHMD_FINISH)	/* matched or dropping */
		goto out1;

	if (!msg->msg_rx_ready_delay)
		goto out1;

	LASSERT(lnet_ptl_is_lazy(ptl));
	LASSERT(!msg->msg_rx_delayed);

	/* NB: we don't expect "delay" can happen a lot */
	if (lnet_ptl_is_unique(ptl) || LNET_CPT_NUMBER == 1) {
		/* nothing to steal: queue on the delayed list directly */
		lnet_ptl_lock(ptl);

		msg->msg_rx_delayed = 1;
		list_add_tail(&msg->msg_list, &ptl->ptl_msg_delayed);

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(mtable->mt_cpt);
		rc = LNET_MATCHMD_NONE;
	} else  {
		/* wildcard portal: try stealing from other CPTs */
		lnet_res_unlock(mtable->mt_cpt);
		rc = lnet_ptl_match_delay(ptl, info, msg);
	}

	/* LNET_MATCHMD_NONE means msg was added to the delay queue */
	if (rc & LNET_MATCHMD_NONE) {
		CDEBUG(D_NET,
		       "Delaying %s from %s ptl %d MB %#llx off %d len %d\n",
		       info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
		       libcfs_id2str(info->mi_id), info->mi_portal,
		       info->mi_mbits, info->mi_roffset, info->mi_rlength);
	}
	goto out0;
 out1:
	lnet_res_unlock(mtable->mt_cpt);
 out0:
	/* EXHAUSTED bit is only meaningful for internal functions */
	return rc & ~LNET_MATCHMD_EXHAUSTED;
}
 642
 643void
 644lnet_ptl_detach_md(lnet_me_t *me, lnet_libmd_t *md)
 645{
 646        LASSERT(me->me_md == md && md->md_me == me);
 647
 648        me->me_md = NULL;
 649        md->md_me = NULL;
 650}
 651
/*
 * Attach a brand-new MD to @me and replay blocked messages against it:
 * first the stealing list (threads in lnet_ptl_match_delay() finish
 * those), then the delayed list. Matched delayed messages move to
 * @matches, dropped ones to @drops; the caller completes them after the
 * locks are released.
 *
 * called with lnet_res_lock held
 */
void
lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
		   struct list_head *matches, struct list_head *drops)
{
	struct lnet_portal *ptl = the_lnet.ln_portals[me->me_portal];
	struct lnet_match_table *mtable;
	struct list_head *head;
	lnet_msg_t *tmp;
	lnet_msg_t *msg;
	int exhausted = 0;
	int cpt;

	LASSERT(!md->md_refcount); /* a brand new MD */

	me->me_md = md;
	md->md_me = me;

	cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
	mtable = ptl->ptl_mtables[cpt];

	/* fast exit: no blocked messages and bucket not exhausted */
	if (list_empty(&ptl->ptl_msg_stealing) &&
	    list_empty(&ptl->ptl_msg_delayed) &&
	    !lnet_mt_test_exhausted(mtable, me->me_pos))
		return;

	lnet_ptl_lock(ptl);
	head = &ptl->ptl_msg_stealing;
 again:
	list_for_each_entry_safe(msg, tmp, head, msg_list) {
		struct lnet_match_info info;
		lnet_hdr_t *hdr;
		int rc;

		LASSERT(msg->msg_rx_delayed || head == &ptl->ptl_msg_stealing);

		/* rebuild match info from the blocked PUT's header */
		hdr = &msg->msg_hdr;
		info.mi_id.nid	= hdr->src_nid;
		info.mi_id.pid	= hdr->src_pid;
		info.mi_opc	= LNET_MD_OP_PUT;
		info.mi_portal	= hdr->msg.put.ptl_index;
		info.mi_rlength	= hdr->payload_length;
		info.mi_roffset	= hdr->msg.put.offset;
		info.mi_mbits	= hdr->msg.put.match_bits;

		rc = lnet_try_match_md(md, &info, msg);

		exhausted = (rc & LNET_MATCHMD_EXHAUSTED);
		if (rc & LNET_MATCHMD_NONE) {
			if (exhausted)
				break;
			continue;
		}

		/* Hurrah! This _is_ a match */
		LASSERT(rc & LNET_MATCHMD_FINISH);
		list_del_init(&msg->msg_list);

		if (head == &ptl->ptl_msg_stealing) {
			if (exhausted)
				break;
			/* stealing thread will handle the message */
			continue;
		}

		if (rc & LNET_MATCHMD_OK) {
			list_add_tail(&msg->msg_list, matches);

			CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
			       libcfs_id2str(info.mi_id),
			       info.mi_portal, info.mi_mbits,
			       info.mi_roffset, info.mi_rlength);
		} else {
			list_add_tail(&msg->msg_list, drops);
		}

		if (exhausted)
			break;
	}

	/* finished the stealing list with room left: do the delayed list */
	if (!exhausted && head == &ptl->ptl_msg_stealing) {
		head = &ptl->ptl_msg_delayed;
		goto again;
	}

	if (lnet_ptl_is_wildcard(ptl) && !exhausted) {
		/* the new MD has capacity: un-exhaust its bucket/table */
		lnet_mt_set_exhausted(mtable, me->me_pos, 0);
		if (!mtable->mt_enabled)
			lnet_ptl_enable_mt(ptl, cpt);
	}

	lnet_ptl_unlock(ptl);
}
 745
 746static void
 747lnet_ptl_cleanup(struct lnet_portal *ptl)
 748{
 749        struct lnet_match_table *mtable;
 750        int i;
 751
 752        if (!ptl->ptl_mtables) /* uninitialized portal */
 753                return;
 754
 755        LASSERT(list_empty(&ptl->ptl_msg_delayed));
 756        LASSERT(list_empty(&ptl->ptl_msg_stealing));
 757        cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
 758                struct list_head *mhash;
 759                lnet_me_t *me;
 760                int j;
 761
 762                if (!mtable->mt_mhash) /* uninitialized match-table */
 763                        continue;
 764
 765                mhash = mtable->mt_mhash;
 766                /* cleanup ME */
 767                for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++) {
 768                        while (!list_empty(&mhash[j])) {
 769                                me = list_entry(mhash[j].next,
 770                                                lnet_me_t, me_list);
 771                                CERROR("Active ME %p on exit\n", me);
 772                                list_del(&me->me_list);
 773                                lnet_me_free(me);
 774                        }
 775                }
 776                /* the extra entry is for MEs with ignore bits */
 777                LIBCFS_FREE(mhash, sizeof(*mhash) * (LNET_MT_HASH_SIZE + 1));
 778        }
 779
 780        cfs_percpt_free(ptl->ptl_mtables);
 781        ptl->ptl_mtables = NULL;
 782}
 783
 784static int
 785lnet_ptl_setup(struct lnet_portal *ptl, int index)
 786{
 787        struct lnet_match_table *mtable;
 788        struct list_head *mhash;
 789        int i;
 790        int j;
 791
 792        ptl->ptl_mtables = cfs_percpt_alloc(lnet_cpt_table(),
 793                                            sizeof(struct lnet_match_table));
 794        if (!ptl->ptl_mtables) {
 795                CERROR("Failed to create match table for portal %d\n", index);
 796                return -ENOMEM;
 797        }
 798
 799        ptl->ptl_index = index;
 800        INIT_LIST_HEAD(&ptl->ptl_msg_delayed);
 801        INIT_LIST_HEAD(&ptl->ptl_msg_stealing);
 802        spin_lock_init(&ptl->ptl_lock);
 803        cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
 804                /* the extra entry is for MEs with ignore bits */
 805                LIBCFS_CPT_ALLOC(mhash, lnet_cpt_table(), i,
 806                                 sizeof(*mhash) * (LNET_MT_HASH_SIZE + 1));
 807                if (!mhash) {
 808                        CERROR("Failed to create match hash for portal %d\n",
 809                               index);
 810                        goto failed;
 811                }
 812
 813                memset(&mtable->mt_exhausted[0], -1,
 814                       sizeof(mtable->mt_exhausted[0]) *
 815                       LNET_MT_EXHAUSTED_BMAP);
 816                mtable->mt_mhash = mhash;
 817                for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++)
 818                        INIT_LIST_HEAD(&mhash[j]);
 819
 820                mtable->mt_portal = index;
 821                mtable->mt_cpt = i;
 822        }
 823
 824        return 0;
 825 failed:
 826        lnet_ptl_cleanup(ptl);
 827        return -ENOMEM;
 828}
 829
 830void
 831lnet_portals_destroy(void)
 832{
 833        int i;
 834
 835        if (!the_lnet.ln_portals)
 836                return;
 837
 838        for (i = 0; i < the_lnet.ln_nportals; i++)
 839                lnet_ptl_cleanup(the_lnet.ln_portals[i]);
 840
 841        cfs_array_free(the_lnet.ln_portals);
 842        the_lnet.ln_portals = NULL;
 843}
 844
 845int
 846lnet_portals_create(void)
 847{
 848        int size;
 849        int i;
 850
 851        size = offsetof(struct lnet_portal, ptl_mt_maps[LNET_CPT_NUMBER]);
 852
 853        the_lnet.ln_nportals = MAX_PORTALS;
 854        the_lnet.ln_portals = cfs_array_alloc(the_lnet.ln_nportals, size);
 855        if (!the_lnet.ln_portals) {
 856                CERROR("Failed to allocate portals table\n");
 857                return -ENOMEM;
 858        }
 859
 860        for (i = 0; i < the_lnet.ln_nportals; i++) {
 861                if (lnet_ptl_setup(the_lnet.ln_portals[i], i)) {
 862                        lnet_portals_destroy();
 863                        return -ENOMEM;
 864                }
 865        }
 866
 867        return 0;
 868}
 869
 870/**
 871 * Turn on the lazy portal attribute. Use with caution!
 872 *
 873 * This portal attribute only affects incoming PUT requests to the portal,
 874 * and is off by default. By default, if there's no matching MD for an
 875 * incoming PUT request, it is simply dropped. With the lazy attribute on,
 876 * such requests are queued indefinitely until either a matching MD is
 877 * posted to the portal or the lazy attribute is turned off.
 878 *
 879 * It would prevent dropped requests, however it should be regarded as the
 880 * last line of defense - i.e. users must keep a close watch on active
 881 * buffers on a lazy portal and once it becomes too low post more buffers as
 882 * soon as possible. This is because delayed requests usually have detrimental
 883 * effects on underlying network connections. A few delayed requests often
 884 * suffice to bring an underlying connection to a complete halt, due to flow
 885 * control mechanisms.
 886 *
 887 * There's also a DOS attack risk. If users don't post match-all MDs on a
 888 * lazy portal, a malicious peer can easily stop a service by sending some
 889 * PUT requests with match bits that won't match any MD. A routed server is
 890 * especially vulnerable since the connections to its neighbor routers are
 891 * shared among all clients.
 892 *
 893 * \param portal Index of the portal to enable the lazy attribute on.
 894 *
 895 * \retval 0       On success.
 896 * \retval -EINVAL If \a portal is not a valid index.
 897 */
 898int
 899LNetSetLazyPortal(int portal)
 900{
 901        struct lnet_portal *ptl;
 902
 903        if (portal < 0 || portal >= the_lnet.ln_nportals)
 904                return -EINVAL;
 905
 906        CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
 907        ptl = the_lnet.ln_portals[portal];
 908
 909        lnet_res_lock(LNET_LOCK_EX);
 910        lnet_ptl_lock(ptl);
 911
 912        lnet_ptl_setopt(ptl, LNET_PTL_LAZY);
 913
 914        lnet_ptl_unlock(ptl);
 915        lnet_res_unlock(LNET_LOCK_EX);
 916
 917        return 0;
 918}
 919EXPORT_SYMBOL(LNetSetLazyPortal);
 920
 921int
 922lnet_clear_lazy_portal(struct lnet_ni *ni, int portal, char *reason)
 923{
 924        struct lnet_portal *ptl;
 925        LIST_HEAD(zombies);
 926
 927        if (portal < 0 || portal >= the_lnet.ln_nportals)
 928                return -EINVAL;
 929
 930        ptl = the_lnet.ln_portals[portal];
 931
 932        lnet_res_lock(LNET_LOCK_EX);
 933        lnet_ptl_lock(ptl);
 934
 935        if (!lnet_ptl_is_lazy(ptl)) {
 936                lnet_ptl_unlock(ptl);
 937                lnet_res_unlock(LNET_LOCK_EX);
 938                return 0;
 939        }
 940
 941        if (ni) {
 942                struct lnet_msg *msg, *tmp;
 943
 944                /* grab all messages which are on the NI passed in */
 945                list_for_each_entry_safe(msg, tmp, &ptl->ptl_msg_delayed,
 946                                         msg_list) {
 947                        if (msg->msg_rxpeer->lp_ni == ni)
 948                                list_move(&msg->msg_list, &zombies);
 949                }
 950        } else {
 951                if (the_lnet.ln_shutdown)
 952                        CWARN("Active lazy portal %d on exit\n", portal);
 953                else
 954                        CDEBUG(D_NET, "clearing portal %d lazy\n", portal);
 955
 956                /* grab all the blocked messages atomically */
 957                list_splice_init(&ptl->ptl_msg_delayed, &zombies);
 958
 959                lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
 960        }
 961
 962        lnet_ptl_unlock(ptl);
 963        lnet_res_unlock(LNET_LOCK_EX);
 964
 965        lnet_drop_delayed_msg_list(&zombies, reason);
 966
 967        return 0;
 968}
 969
 970/**
 971 * Turn off the lazy portal attribute. Delayed requests on the portal,
 972 * if any, will be all dropped when this function returns.
 973 *
 974 * \param portal Index of the portal to disable the lazy attribute on.
 975 *
 976 * \retval 0       On success.
 977 * \retval -EINVAL If \a portal is not a valid index.
 978 */
 979int
 980LNetClearLazyPortal(int portal)
 981{
 982        return lnet_clear_lazy_portal(NULL, portal,
 983                                      "Clearing lazy portal attr");
 984}
 985EXPORT_SYMBOL(LNetClearLazyPortal);
 986