/* linux/drivers/staging/lustre/lnet/lnet/lib-msg.c */
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lnet/lnet/lib-msg.c
  37 *
  38 * Message decoding, parsing and finalizing routines
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_LNET
  42
  43#include "../../include/linux/lnet/lib-lnet.h"
  44
  45void
  46lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
  47{
  48        memset(ev, 0, sizeof(*ev));
  49
  50        ev->status   = 0;
  51        ev->unlinked = 1;
  52        ev->type     = LNET_EVENT_UNLINK;
  53        lnet_md_deconstruct(md, &ev->md);
  54        lnet_md2handle(&ev->md_handle, md);
  55}
  56
  57/*
  58 * Don't need any lock, must be called after lnet_commit_md
  59 */
  60void
  61lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
  62{
  63        lnet_hdr_t *hdr = &msg->msg_hdr;
  64        lnet_event_t *ev  = &msg->msg_ev;
  65
  66        LASSERT(!msg->msg_routing);
  67
  68        ev->type = ev_type;
  69
  70        if (ev_type == LNET_EVENT_SEND) {
  71                /* event for active message */
  72                ev->target.nid    = le64_to_cpu(hdr->dest_nid);
  73                ev->target.pid    = le32_to_cpu(hdr->dest_pid);
  74                ev->initiator.nid = LNET_NID_ANY;
  75                ev->initiator.pid = the_lnet.ln_pid;
  76                ev->sender        = LNET_NID_ANY;
  77        } else {
  78                /* event for passive message */
  79                ev->target.pid    = hdr->dest_pid;
  80                ev->target.nid    = hdr->dest_nid;
  81                ev->initiator.pid = hdr->src_pid;
  82                ev->initiator.nid = hdr->src_nid;
  83                ev->rlength       = hdr->payload_length;
  84                ev->sender        = msg->msg_from;
  85                ev->mlength       = msg->msg_wanted;
  86                ev->offset        = msg->msg_offset;
  87        }
  88
  89        switch (ev_type) {
  90        default:
  91                LBUG();
  92
  93        case LNET_EVENT_PUT: /* passive PUT */
  94                ev->pt_index   = hdr->msg.put.ptl_index;
  95                ev->match_bits = hdr->msg.put.match_bits;
  96                ev->hdr_data   = hdr->msg.put.hdr_data;
  97                return;
  98
  99        case LNET_EVENT_GET: /* passive GET */
 100                ev->pt_index   = hdr->msg.get.ptl_index;
 101                ev->match_bits = hdr->msg.get.match_bits;
 102                ev->hdr_data   = 0;
 103                return;
 104
 105        case LNET_EVENT_ACK: /* ACK */
 106                ev->match_bits = hdr->msg.ack.match_bits;
 107                ev->mlength    = hdr->msg.ack.mlength;
 108                return;
 109
 110        case LNET_EVENT_REPLY: /* REPLY */
 111                return;
 112
 113        case LNET_EVENT_SEND: /* active message */
 114                if (msg->msg_type == LNET_MSG_PUT) {
 115                        ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
 116                        ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
 117                        ev->offset     = le32_to_cpu(hdr->msg.put.offset);
 118                        ev->mlength    =
 119                        ev->rlength    = le32_to_cpu(hdr->payload_length);
 120                        ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);
 121
 122                } else {
 123                        LASSERT(msg->msg_type == LNET_MSG_GET);
 124                        ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
 125                        ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
 126                        ev->mlength    =
 127                        ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
 128                        ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
 129                        ev->hdr_data   = 0;
 130                }
 131                return;
 132        }
 133}
 134
 135void
 136lnet_msg_commit(lnet_msg_t *msg, int cpt)
 137{
 138        struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
 139        lnet_counters_t *counters  = the_lnet.ln_counters[cpt];
 140
 141        /* routed message can be committed for both receiving and sending */
 142        LASSERT(!msg->msg_tx_committed);
 143
 144        if (msg->msg_sending) {
 145                LASSERT(!msg->msg_receiving);
 146
 147                msg->msg_tx_cpt = cpt;
 148                msg->msg_tx_committed = 1;
 149                if (msg->msg_rx_committed) { /* routed message REPLY */
 150                        LASSERT(msg->msg_onactivelist);
 151                        return;
 152                }
 153        } else {
 154                LASSERT(!msg->msg_sending);
 155                msg->msg_rx_cpt = cpt;
 156                msg->msg_rx_committed = 1;
 157        }
 158
 159        LASSERT(!msg->msg_onactivelist);
 160        msg->msg_onactivelist = 1;
 161        list_add(&msg->msg_activelist, &container->msc_active);
 162
 163        counters->msgs_alloc++;
 164        if (counters->msgs_alloc > counters->msgs_max)
 165                counters->msgs_max = counters->msgs_alloc;
 166}
 167
/*
 * Undo the send-side commit of @msg and update send statistics.
 *
 * NOTE(review): the caller appears to hold lnet_net_lock() for
 * msg->msg_tx_cpt (see lnet_msg_decommit()) — confirm.
 *
 * On failure (@status != 0) only the tx credits are returned; no
 * counters are updated.  Messages whose msg_type was overwritten while
 * sending an ACK (PUT) or REPLY (GET) have their original type
 * restored here.
 */
static void
lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
{
	lnet_counters_t *counters;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(msg->msg_tx_committed);
	if (status)
		goto out;

	counters = the_lnet.ln_counters[msg->msg_tx_cpt];
	switch (ev->type) {
	default: /* routed message */
		LASSERT(msg->msg_routing);
		LASSERT(msg->msg_rx_committed);
		/* routed messages carry no event: ev->type must be 0 */
		LASSERT(!ev->type);

		counters->route_length += msg->msg_len;
		counters->route_count++;
		goto out;

	case LNET_EVENT_PUT:
		/* should have been decommitted */
		LASSERT(!msg->msg_rx_committed);
		/* overwritten while sending ACK */
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		msg->msg_type = LNET_MSG_PUT; /* fix type */
		break;

	case LNET_EVENT_SEND:
		LASSERT(!msg->msg_rx_committed);
		if (msg->msg_type == LNET_MSG_PUT)
			counters->send_length += msg->msg_len;
		break;

	case LNET_EVENT_GET:
		LASSERT(msg->msg_rx_committed);
		/*
		 * overwritten while sending reply, we should never be
		 * here for optimized GET
		 */
		LASSERT(msg->msg_type == LNET_MSG_REPLY);
		msg->msg_type = LNET_MSG_GET; /* fix type */
		break;
	}

	counters->send_count++;
 out:
	lnet_return_tx_credits_locked(msg);
	msg->msg_tx_committed = 0;
}
 219
/*
 * Undo the receive-side commit of @msg and update receive statistics.
 *
 * NOTE(review): the caller appears to hold lnet_net_lock() for
 * msg->msg_rx_cpt (see lnet_msg_decommit()) — confirm.
 *
 * Must run after any send-side decommit: a routed message is
 * decommitted for sending first (see lnet_msg_decommit()), hence the
 * first assertion below.  On failure (@status != 0) only rx credits
 * are returned; no counters are updated.
 */
static void
lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
{
	lnet_counters_t *counters;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
	LASSERT(msg->msg_rx_committed);

	if (status)
		goto out;

	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
	switch (ev->type) {
	default:
		/* routed messages carry no event: ev->type must be 0 */
		LASSERT(!ev->type);
		LASSERT(msg->msg_routing);
		goto out;

	case LNET_EVENT_ACK:
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		break;

	case LNET_EVENT_GET:
		/*
		 * type is "REPLY" if it's an optimized GET on passive side,
		 * because optimized GET will never be committed for sending,
		 * so message type wouldn't be changed back to "GET" by
		 * lnet_msg_decommit_tx(), see details in lnet_parse_get()
		 */
		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
			msg->msg_type == LNET_MSG_GET);
		/* the GET reply we send counts as outbound payload */
		counters->send_length += msg->msg_wanted;
		break;

	case LNET_EVENT_PUT:
		LASSERT(msg->msg_type == LNET_MSG_PUT);
		break;

	case LNET_EVENT_REPLY:
		/*
		 * type is "GET" if it's an optimized GET on active side,
		 * see details in lnet_create_reply_msg()
		 */
		LASSERT(msg->msg_type == LNET_MSG_GET ||
			msg->msg_type == LNET_MSG_REPLY);
		break;
	}

	counters->recv_count++;
	/* only PUT and REPLY actually deliver payload to a local MD */
	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
		counters->recv_length += msg->msg_wanted;

 out:
	lnet_return_rx_credits_locked(msg);
	msg->msg_rx_committed = 0;
}
 277
/*
 * Fully decommit @msg: undo the send-side commit first, then the
 * receive-side commit, and take the message off the active list.
 *
 * Caller holds lnet_net_lock(@cpt).  A forwarded message may have been
 * committed for receiving on a different CPT than for sending; in that
 * case the net lock is temporarily switched to msg->msg_rx_cpt and
 * restored to @cpt before returning, so the caller's lock state is
 * unchanged.
 */
void
lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
{
	int cpt2 = cpt;

	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
	LASSERT(msg->msg_onactivelist);

	if (msg->msg_tx_committed) { /* always decommit for sending first */
		LASSERT(cpt == msg->msg_tx_cpt);
		lnet_msg_decommit_tx(msg, status);
	}

	if (msg->msg_rx_committed) {
		/* forwarding msg committed for both receiving and sending */
		if (cpt != msg->msg_rx_cpt) {
			lnet_net_unlock(cpt);
			cpt2 = msg->msg_rx_cpt;
			lnet_net_lock(cpt2);
		}
		lnet_msg_decommit_rx(msg, status);
	}

	list_del(&msg->msg_activelist);
	msg->msg_onactivelist = 0;

	/* msgs_alloc is decremented on whichever CPT we ended up locking */
	the_lnet.ln_counters[cpt2]->msgs_alloc--;

	if (cpt2 != cpt) {
		lnet_net_unlock(cpt2);
		lnet_net_lock(cpt);
	}
}
 311
 312void
 313lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
 314                   unsigned int offset, unsigned int mlen)
 315{
 316        /* NB: @offset and @len are only useful for receiving */
 317        /*
 318         * Here, we attach the MD on lnet_msg and mark it busy and
 319         * decrementing its threshold. Come what may, the lnet_msg "owns"
 320         * the MD until a call to lnet_msg_detach_md or lnet_finalize()
 321         * signals completion.
 322         */
 323        LASSERT(!msg->msg_routing);
 324
 325        msg->msg_md = md;
 326        if (msg->msg_receiving) { /* committed for receiving */
 327                msg->msg_offset = offset;
 328                msg->msg_wanted = mlen;
 329        }
 330
 331        md->md_refcount++;
 332        if (md->md_threshold != LNET_MD_THRESH_INF) {
 333                LASSERT(md->md_threshold > 0);
 334                md->md_threshold--;
 335        }
 336
 337        /* build umd in event */
 338        lnet_md2handle(&msg->msg_ev.md_handle, md);
 339        lnet_md_deconstruct(md, &msg->msg_ev.md);
 340}
 341
 342void
 343lnet_msg_detach_md(lnet_msg_t *msg, int status)
 344{
 345        lnet_libmd_t *md = msg->msg_md;
 346        int unlink;
 347
 348        /* Now it's safe to drop my caller's ref */
 349        md->md_refcount--;
 350        LASSERT(md->md_refcount >= 0);
 351
 352        unlink = lnet_md_unlinkable(md);
 353        if (md->md_eq) {
 354                msg->msg_ev.status   = status;
 355                msg->msg_ev.unlinked = unlink;
 356                lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
 357        }
 358
 359        if (unlink)
 360                lnet_md_unlink(md);
 361
 362        msg->msg_md = NULL;
 363}
 364
/*
 * Complete one message pulled off the finalizing queue.
 *
 * Called (and returns) with lnet_net_lock(@cpt) held, but drops and
 * regains it around any follow-on lnet_send().  Two cases trigger a
 * follow-on send: a successful PUT that requested an ACK, and a routed
 * message that has been received but not yet forwarded.
 *
 * Returns 0 when the message was decommitted and freed here; non-zero
 * (the lnet_send() result) when a follow-on send failed after the
 * message was re-committed, in which case the caller must go back to
 * lnet_finalize() so the correct partition gets locked.
 */
static int
lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
{
	lnet_handle_wire_t ack_wmd;
	int rc;
	int status = msg->msg_ev.status;

	LASSERT(msg->msg_onactivelist);

	if (!status && msg->msg_ack) {
		/* Only send an ACK if the PUT completed successfully */

		lnet_msg_decommit(msg, cpt, 0);

		msg->msg_ack = 0;
		lnet_net_unlock(cpt);

		LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
		LASSERT(!msg->msg_routing);

		/* save the ACK destination before msg_hdr is re-built */
		ack_wmd = msg->msg_hdr.msg.put.ack_wmd;

		lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);

		msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
		msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
		msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);

		/*
		 * NB: we probably want to use NID of msg::msg_from as 3rd
		 * parameter (router NID) if it's routed message
		 */
		rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);

		lnet_net_lock(cpt);
		/*
		 * NB: message is committed for sending, we should return
		 * on success because LND will finalize this message later.
		 *
		 * Also, there is possibility that message is committed for
		 * sending and also failed before delivering to LND,
		 * i.e: ENOMEM, in that case we can't fall through either
		 * because CPT for sending can be different with CPT for
		 * receiving, so we should return back to lnet_finalize()
		 * to make sure we are locking the correct partition.
		 */
		return rc;

	} else if (!status &&   /* OK so far */
		   (msg->msg_routing && !msg->msg_sending)) {
		/* not forwarded */
		LASSERT(!msg->msg_receiving);   /* called back recv already */
		lnet_net_unlock(cpt);

		rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);

		lnet_net_lock(cpt);
		/*
		 * NB: message is committed for sending, we should return
		 * on success because LND will finalize this message later.
		 *
		 * Also, there is possibility that message is committed for
		 * sending and also failed before delivering to LND,
		 * i.e: ENOMEM, in that case we can't fall through either:
		 * - The rule is message must decommit for sending first if
		 *   the it's committed for both sending and receiving
		 * - CPT for sending can be different with CPT for receiving,
		 *   so we should return back to lnet_finalize() to make
		 *   sure we are locking the correct partition.
		 */
		return rc;
	}

	lnet_msg_decommit(msg, cpt, status);
	lnet_msg_free(msg);
	return 0;
}
 442
/*
 * Terminate @msg: deliver its completion event (with @status), detach
 * its MD, decommit it and free it.
 *
 * Not callable from interrupt context (asserted below).  @ni may be
 * NULL-tolerant here — it is unused in this function's body.
 *
 * A thread that finds all finalizer slots busy just queues the message
 * on msc_finalizing and returns; one of the threads already finalizing
 * on this CPT will drain it.  This bounds recursion: completing a
 * message can trigger a new send whose failure re-enters finalization.
 */
void
lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
	struct lnet_msg_container *container;
	int my_slot;
	int cpt;
	int rc;
	int i;

	LASSERT(!in_interrupt());

	if (!msg)
		return;
#if 0
	CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
	       lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
	       msg->msg_target_is_router ? "t" : "",
	       msg->msg_routing ? "X" : "",
	       msg->msg_ack ? "A" : "",
	       msg->msg_sending ? "S" : "",
	       msg->msg_receiving ? "R" : "",
	       msg->msg_delayed ? "d" : "",
	       msg->msg_txcredit ? "C" : "",
	       msg->msg_peertxcredit ? "c" : "",
	       msg->msg_rtrcredit ? "F" : "",
	       msg->msg_peerrtrcredit ? "f" : "",
	       msg->msg_onactivelist ? "!" : "",
	       !msg->msg_txpeer ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
	       !msg->msg_rxpeer ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
#endif
	msg->msg_ev.status = status;

	/* detach the MD (if any) under the resource lock of its CPT */
	if (msg->msg_md) {
		cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);

		lnet_res_lock(cpt);
		lnet_msg_detach_md(msg, status);
		lnet_res_unlock(cpt);
	}

 again:
	rc = 0;
	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
		/* not committed to network yet */
		LASSERT(!msg->msg_onactivelist);
		lnet_msg_free(msg);
		return;
	}

	/*
	 * NB: routed message can be committed for both receiving and sending,
	 * we should finalize in LIFO order and keep counters correct.
	 * (finalize sending first then finalize receiving)
	 */
	cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
	lnet_net_lock(cpt);

	container = the_lnet.ln_msg_containers[cpt];
	list_add_tail(&msg->msg_list, &container->msc_finalizing);

	/*
	 * Recursion breaker.  Don't complete the message here if I am (or
	 * enough other threads are) already completing messages
	 */
	my_slot = -1;
	for (i = 0; i < container->msc_nfinalizers; i++) {
		if (container->msc_finalizers[i] == current)
			break;

		if (my_slot < 0 && !container->msc_finalizers[i])
			my_slot = i;
	}

	/* already finalizing on this CPT, or no free slot: leave it queued */
	if (i < container->msc_nfinalizers || my_slot < 0) {
		lnet_net_unlock(cpt);
		return;
	}

	container->msc_finalizers[my_slot] = current;

	while (!list_empty(&container->msc_finalizing)) {
		msg = list_entry(container->msc_finalizing.next,
				 lnet_msg_t, msg_list);

		list_del(&msg->msg_list);

		/*
		 * NB drops and regains the lnet lock if it actually does
		 * anything, so my finalizing friends can chomp along too
		 */
		rc = lnet_complete_msg_locked(msg, cpt);
		if (rc)
			break;
	}

	if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
		lnet_net_unlock(cpt);
		lnet_delay_rule_check();
		lnet_net_lock(cpt);
	}

	container->msc_finalizers[my_slot] = NULL;
	lnet_net_unlock(cpt);

	/* non-zero rc: a follow-on send failed; finalize that msg afresh */
	if (rc)
		goto again;
}
EXPORT_SYMBOL(lnet_finalize);
 551
 552void
 553lnet_msg_container_cleanup(struct lnet_msg_container *container)
 554{
 555        int count = 0;
 556
 557        if (!container->msc_init)
 558                return;
 559
 560        while (!list_empty(&container->msc_active)) {
 561                lnet_msg_t *msg = list_entry(container->msc_active.next,
 562                                             lnet_msg_t, msg_activelist);
 563
 564                LASSERT(msg->msg_onactivelist);
 565                msg->msg_onactivelist = 0;
 566                list_del(&msg->msg_activelist);
 567                lnet_msg_free(msg);
 568                count++;
 569        }
 570
 571        if (count > 0)
 572                CERROR("%d active msg on exit\n", count);
 573
 574        if (container->msc_finalizers) {
 575                LIBCFS_FREE(container->msc_finalizers,
 576                            container->msc_nfinalizers *
 577                            sizeof(*container->msc_finalizers));
 578                container->msc_finalizers = NULL;
 579        }
 580        container->msc_init = 0;
 581}
 582
 583int
 584lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 585{
 586        container->msc_init = 1;
 587
 588        INIT_LIST_HEAD(&container->msc_active);
 589        INIT_LIST_HEAD(&container->msc_finalizing);
 590
 591        /* number of CPUs */
 592        container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
 593
 594        LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
 595                         container->msc_nfinalizers *
 596                         sizeof(*container->msc_finalizers));
 597
 598        if (!container->msc_finalizers) {
 599                CERROR("Failed to allocate message finalizers\n");
 600                lnet_msg_container_cleanup(container);
 601                return -ENOMEM;
 602        }
 603
 604        return 0;
 605}
 606
 607void
 608lnet_msg_containers_destroy(void)
 609{
 610        struct lnet_msg_container *container;
 611        int i;
 612
 613        if (!the_lnet.ln_msg_containers)
 614                return;
 615
 616        cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
 617                lnet_msg_container_cleanup(container);
 618
 619        cfs_percpt_free(the_lnet.ln_msg_containers);
 620        the_lnet.ln_msg_containers = NULL;
 621}
 622
 623int
 624lnet_msg_containers_create(void)
 625{
 626        struct lnet_msg_container *container;
 627        int rc;
 628        int i;
 629
 630        the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
 631                                                      sizeof(*container));
 632
 633        if (!the_lnet.ln_msg_containers) {
 634                CERROR("Failed to allocate cpu-partition data for network\n");
 635                return -ENOMEM;
 636        }
 637
 638        cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
 639                rc = lnet_msg_container_setup(container, i);
 640                if (rc) {
 641                        lnet_msg_containers_destroy();
 642                        return rc;
 643                }
 644        }
 645
 646        return 0;
 647}
 648