linux/drivers/staging/lustre/lnet/klnds/o2iblnd/o2iblnd_cb.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/o2iblnd/o2iblnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "o2iblnd.h"

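/*
 * Return a completed transmit descriptor to its pool and finalise the
 * LNet message(s) it was carrying.  A tx may carry up to two messages
 * (e.g. a GET request and the REPLY it provokes); both are finalised
 * with the tx status only after the descriptor's mappings have been
 * torn down, so the buffers are no longer visible to the HCA.
 */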
void
kiblnd_tx_done(lnet_ni_t *ni, kib_tx_t *tx)
{
        lnet_msg_t *lntmsg[2];
        kib_net_t  *net = ni->ni_data;
        int         rc;
        int         i;

        LASSERT(net != NULL);
        LASSERT(!in_interrupt());
        LASSERT(!tx->tx_queued);               /* mustn't be queued for sending */
        LASSERT(tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
        LASSERT(!tx->tx_waiting);              /* mustn't be awaiting peer response */
        LASSERT(tx->tx_pool != NULL);

        kiblnd_unmap_tx(ni, tx);

        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn != NULL) {
                LASSERT(ni == tx->tx_conn->ibc_peer->ibp_ni);

                kiblnd_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize(ni, lntmsg[i], rc);
        }
}

void
kiblnd_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int status)
{
        kib_tx_t *tx;

        while (!list_empty(txlist)) {
                tx = list_entry(txlist->next, kib_tx_t, tx_list);

                list_del(&tx->tx_list);
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
                kiblnd_tx_done(ni, tx);
        }
}

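/*
 * Grab a tx descriptor from the per-CPT tx pool selected by the
 * target NID.  Returns NULL if the pool is exhausted; the asserts
 * document the expected "clean" state of a descriptor coming off the
 * free list.
 */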
kib_tx_t *
kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
{
        kib_net_t               *net = (kib_net_t *)ni->ni_data;
        struct list_head        *node;
        kib_tx_t                *tx;
        kib_tx_poolset_t        *tps;

        tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
        node = kiblnd_pool_alloc_node(&tps->tps_poolset);
        if (node == NULL)
                return NULL;
        tx = container_of(node, kib_tx_t, tx_list);

        LASSERT(tx->tx_nwrq == 0);
        LASSERT(!tx->tx_queued);
        LASSERT(tx->tx_sending == 0);
        LASSERT(!tx->tx_waiting);
        LASSERT(tx->tx_status == 0);
        LASSERT(tx->tx_conn == NULL);
        LASSERT(tx->tx_lntmsg[0] == NULL);
        LASSERT(tx->tx_lntmsg[1] == NULL);
        LASSERT(tx->tx_u.pmr == NULL);
        LASSERT(tx->tx_nfrags == 0);

        return tx;
}

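/*
 * Drop an rx that will not be re-posted: decrement the connection's
 * posted-rx count under the scheduler lock and release the rx's
 * reference on the connection.
 */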
void
kiblnd_drop_rx(kib_rx_t *rx)
{
        kib_conn_t              *conn   = rx->rx_conn;
        struct kib_sched_info   *sched  = conn->ibc_sched;
        unsigned long           flags;

        spin_lock_irqsave(&sched->ibs_lock, flags);
        LASSERT(conn->ibc_nrx > 0);
        conn->ibc_nrx--;
        spin_unlock_irqrestore(&sched->ibs_lock, flags);

        kiblnd_conn_decref(conn);
}

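/*
 * Post a receive work request for 'rx'.  'credit' says how the
 * buffer is accounted: IBLND_POSTRX_NO_CREDIT leaves the credit state
 * alone, IBLND_POSTRX_PEER_CREDIT queues a credit to return to the
 * peer, and IBLND_POSTRX_RSRVD_CREDIT replenishes the reserved pool.
 * A posting failure on an established connection closes it.
 */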
int
kiblnd_post_rx(kib_rx_t *rx, int credit)
{
        kib_conn_t         *conn = rx->rx_conn;
        kib_net_t          *net = conn->ibc_peer->ibp_ni->ni_data;
        struct ib_recv_wr  *bad_wrq = NULL;
        struct ib_mr       *mr;
        int                 rc;

        LASSERT(net != NULL);
        LASSERT(!in_interrupt());
        LASSERT(credit == IBLND_POSTRX_NO_CREDIT ||
                credit == IBLND_POSTRX_PEER_CREDIT ||
                credit == IBLND_POSTRX_RSRVD_CREDIT);

        mr = kiblnd_find_dma_mr(conn->ibc_hdev, rx->rx_msgaddr, IBLND_MSG_SIZE);
        LASSERT(mr != NULL);

        rx->rx_sge.lkey   = mr->lkey;
        rx->rx_sge.addr   = rx->rx_msgaddr;
        rx->rx_sge.length = IBLND_MSG_SIZE;

        rx->rx_wrq.next = NULL;
        rx->rx_wrq.sg_list = &rx->rx_sge;
        rx->rx_wrq.num_sge = 1;
        rx->rx_wrq.wr_id = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);

        LASSERT(conn->ibc_state >= IBLND_CONN_INIT);
        LASSERT(rx->rx_nob >= 0);              /* not posted */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
                kiblnd_drop_rx(rx);            /* No more posts for this rx */
                return 0;
        }

        rx->rx_nob = -1;                       /* flag posted */

        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
        if (rc != 0) {
                CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
                rx->rx_nob = 0;
        }

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
                return rc;

        if (rc != 0) {
                kiblnd_close_conn(conn, rc);
                kiblnd_drop_rx(rx);            /* No more posts for this rx */
                return rc;
        }

        if (credit == IBLND_POSTRX_NO_CREDIT)
                return 0;

        spin_lock(&conn->ibc_lock);
        if (credit == IBLND_POSTRX_PEER_CREDIT)
                conn->ibc_outstanding_credits++;
        else
                conn->ibc_reserved_credits++;
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
        return 0;
}

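/*
 * Scan the active tx list (caller holds ibc_lock) for the tx awaiting
 * a peer response with the matching cookie and message type.
 */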
kib_tx_t *
kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT(!tx->tx_queued);
                LASSERT(tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

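/*
 * Match an incoming completion (PUT_NAK, PUT_DONE or GET_DONE)
 * against the tx waiting for it; an unmatched completion is a
 * protocol error and closes the connection.  For a GET, a successful
 * status carries the number of bytes the peer actually moved, which
 * sizes the locally-created REPLY.
 */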
void
kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kiblnd_close_conn(conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {              /* success so far */
                if (status < 0) {              /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBLND_MSG_GET_REQ) {
                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
                }
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(ni, tx);
}

void
kiblnd_send_completion(kib_conn_t *conn, int type, int status, __u64 cookie)
{
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t    *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));

        kiblnd_queue_tx(tx, conn);
}

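/*
 * Dispatch a received message.  Any credits piggy-backed on the
 * message are banked first (a peer returning more credits than the
 * queue depth is a protocol error).  Each message type then selects
 * 'post_credit': whether this rx is re-posted here and, if so, which
 * credit is given back for it.  IMMEDIATE, PUT_REQ and GET_REQ
 * buffers are handed to lnet_parse() and are not re-posted here.
 */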
void
kiblnd_handle_rx(kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc = 0;
        int           rc2;
        int           post_credit;

        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        CDEBUG(D_NET, "Received %x[%d] from %s\n",
               msg->ibm_type, credits,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);

                if (conn->ibc_credits + credits >
                    IBLND_MSG_QUEUE_SIZE(conn->ibc_version)) {
                        rc2 = conn->ibc_credits;
                        spin_unlock(&conn->ibc_lock);

                        CERROR("Bad credits from %s: %d + %d > %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               rc2, credits,
                               IBLND_MSG_QUEUE_SIZE(conn->ibc_version));

                        kiblnd_close_conn(conn, -EPROTO);
                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
                        return;
                }

                conn->ibc_credits += credits;

                /* This ensures the credit taken by NOOP can be returned */
                if (msg->ibm_type == IBLND_MSG_NOOP &&
                    !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
                        conn->ibc_outstanding_credits++;

                spin_unlock(&conn->ibc_lock);
                kiblnd_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBLND message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_NO_CREDIT;
                rc = -EPROTO;
                break;

        case IBLND_MSG_NOOP:
                if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                        break;
                }

                if (credits != 0) /* credit already posted */
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                else              /* a keepalive NOOP */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_IMMEDIATE:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_NAK:
                CWARN("PUT_NACK from %s\n",
                      libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_PUT_ACK:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;

                spin_lock(&conn->ibc_lock);
                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
                                        msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT(tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                tx->tx_waiting = 0;     /* clear waiting and queue atomically */
                kiblnd_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBLND_MSG_PUT_DONE:
                post_credit = IBLND_POSTRX_PEER_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_GET_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_GET_DONE:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kiblnd_close_conn(conn, rc);

        if (post_credit != IBLND_POSTRX_DONT_POST)
                kiblnd_post_rx(rx, post_credit);
}

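/*
 * Completion handler for a receive work request.  Validates the
 * message (length, unpack, matching NIDs and incarnation stamps) and
 * either handles it, parks it on ibc_early_rxs if the connection is
 * still being established, or closes the connection on error.
 */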
void
kiblnd_rx_complete(kib_rx_t *rx, int status, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        kib_net_t    *net = ni->ni_data;
        int           rc;
        int           err = -EIO;

        LASSERT(net != NULL);
        LASSERT(rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                        /* isn't now */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
                goto ignore;

        if (status != IB_WC_SUCCESS) {
                CNETERR("Rx from %s failed: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                goto failed;
        }

        LASSERT(nob >= 0);
        rx->rx_nob = nob;

        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR("Error %d unpacking rx from %s\n",
                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != net->ibn_incarnation) {
                CERROR("Stale rx from %s\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* set time last known alive */
        kiblnd_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                rwlock_t      *g_lock = &kiblnd_data.kib_global_lock;
                unsigned long  flags;

                write_lock_irqsave(g_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(g_lock, flags);
                        return;
                }
                write_unlock_irqrestore(g_lock, flags);
        }
        kiblnd_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kiblnd_close_conn(conn, err);
 ignore:
        kiblnd_drop_rx(rx);                    /* Don't re-post rx. */
}

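/*
 * Translate a kernel virtual address into its struct page.  Vmalloc
 * addresses are handled explicitly; highmem addresses can't appear
 * here since highmem pages are only ever used for bulk (kiov) I/O.
 */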
struct page *
kiblnd_kvaddr_to_page(unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END) {
                page = vmalloc_to_page((void *)vaddr);
                LASSERT(page != NULL);
                return page;
        }
#ifdef CONFIG_HIGHMEM
        if (vaddr >= PKMAP_BASE &&
            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                /* No highmem addresses expected here: highmem pages are
                 * only ever used for bulk (kiov) I/O */
                CERROR("find page for address in highmem\n");
                LBUG();
        }
#endif
        page = virt_to_page(vaddr);
        LASSERT(page != NULL);
        return page;
}

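/*
 * Map the fragments described by 'rd' through an FMR pool, collapsing
 * them into a single virtually-contiguous fragment.  If 'rd' is not
 * the local tx_rd it will be sent to the peer, so the rkey is
 * advertised rather than the lkey.
 */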
static int
kiblnd_fmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, int nob)
{
        kib_hca_dev_t           *hdev;
        __u64                   *pages = tx->tx_pages;
        kib_fmr_poolset_t       *fps;
        int                     npages;
        int                     size;
        int                     cpt;
        int                     rc;
        int                     i;

        LASSERT(tx->tx_pool != NULL);
        LASSERT(tx->tx_pool->tpo_pool.po_owner != NULL);

        hdev = tx->tx_pool->tpo_hdev;

        for (i = 0, npages = 0; i < rd->rd_nfrags; i++) {
                for (size = 0; size < rd->rd_frags[i].rf_nob;
                               size += hdev->ibh_page_size) {
                        pages[npages++] = (rd->rd_frags[i].rf_addr &
                                           hdev->ibh_page_mask) + size;
                }
        }

        cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;

        fps = net->ibn_fmr_ps[cpt];
        rc = kiblnd_fmr_pool_map(fps, pages, npages, 0, &tx->tx_u.fmr);
        if (rc != 0) {
                CERROR("Can't map %d pages: %d\n", npages, rc);
                return rc;
        }

        /* If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey */
        rd->rd_key = (rd != tx->tx_rd) ? tx->tx_u.fmr.fmr_pfmr->fmr->rkey :
                                         tx->tx_u.fmr.fmr_pfmr->fmr->lkey;
        rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
        rd->rd_frags[0].rf_nob   = nob;
        rd->rd_nfrags = 1;

        return 0;
}

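/*
 * As kiblnd_fmr_map_tx(), but using a physical MR pool: register a
 * single MR covering the descriptor and rewrite it as one fragment.
 */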
static int
kiblnd_pmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, int nob)
{
        kib_hca_dev_t           *hdev;
        kib_pmr_poolset_t       *pps;
        __u64                   iova;
        int                     cpt;
        int                     rc;

        LASSERT(tx->tx_pool != NULL);
        LASSERT(tx->tx_pool->tpo_pool.po_owner != NULL);

        hdev = tx->tx_pool->tpo_hdev;

        iova = rd->rd_frags[0].rf_addr & ~hdev->ibh_page_mask;

        cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;

        pps = net->ibn_pmr_ps[cpt];
        rc = kiblnd_pmr_pool_map(pps, hdev, rd, &iova, &tx->tx_u.pmr);
        if (rc != 0) {
                CERROR("Failed to create MR by phybuf: %d\n", rc);
                return rc;
        }

        /* If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey */
        rd->rd_key = (rd != tx->tx_rd) ? tx->tx_u.pmr->pmr_mr->rkey :
                                         tx->tx_u.pmr->pmr_mr->lkey;
        rd->rd_nfrags = 1;
        rd->rd_frags[0].rf_addr = iova;
        rd->rd_frags[0].rf_nob  = nob;

        return 0;
}

void
kiblnd_unmap_tx(lnet_ni_t *ni, kib_tx_t *tx)
{
        kib_net_t  *net = ni->ni_data;

        LASSERT(net != NULL);

        if (net->ibn_fmr_ps != NULL && tx->tx_u.fmr.fmr_pfmr != NULL) {
                kiblnd_fmr_pool_unmap(&tx->tx_u.fmr, tx->tx_status);
                tx->tx_u.fmr.fmr_pfmr = NULL;

        } else if (net->ibn_pmr_ps != NULL && tx->tx_u.pmr != NULL) {
                kiblnd_pmr_pool_unmap(tx->tx_u.pmr);
                tx->tx_u.pmr = NULL;
        }

        if (tx->tx_nfrags != 0) {
                kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
                tx->tx_nfrags = 0;
        }
}

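/*
 * DMA-map a tx's fragment list and fill in its RDMA descriptor.  A
 * pre-mapped global MR is preferred; otherwise fall back to the FMR
 * or PMR pools configured for this net.
 */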
int
kiblnd_map_tx(lnet_ni_t *ni, kib_tx_t *tx,
              kib_rdma_desc_t *rd, int nfrags)
{
        kib_hca_dev_t      *hdev  = tx->tx_pool->tpo_hdev;
        kib_net_t          *net   = ni->ni_data;
        struct ib_mr       *mr    = NULL;
        __u32               nob;
        int                 i;

        /* If rd is not tx_rd, it's going to get sent to a peer and I'm the
         * RDMA sink */
        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
        tx->tx_nfrags = nfrags;

        rd->rd_nfrags =
                kiblnd_dma_map_sg(hdev->ibh_ibdev,
                                  tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);

        for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                nob += rd->rd_frags[i].rf_nob;
        }

        /* look for a pre-mapped MR */
        mr = kiblnd_find_rd_dma_mr(hdev, rd);
        if (mr != NULL) {
                /* found a pre-mapped MR */
                rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
                return 0;
        }

        if (net->ibn_fmr_ps != NULL)
                return kiblnd_fmr_map_tx(net, tx, rd, nob);
        else if (net->ibn_pmr_ps != NULL)
                return kiblnd_pmr_map_tx(net, tx, rd, nob);

        return -EINVAL;
}

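/*
 * Build a scatterlist for an iovec-described payload, skipping
 * 'offset' bytes and clipping each fragment at a page boundary, then
 * map it via kiblnd_map_tx().
 */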
int
kiblnd_setup_rd_iov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
{
        kib_net_t          *net = ni->ni_data;
        struct page        *page;
        struct scatterlist *sg;
        unsigned long       vaddr;
        int                 fragnob;
        int                 page_offset;

        LASSERT(nob > 0);
        LASSERT(niov > 0);
        LASSERT(net != NULL);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT(niov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT(niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kiblnd_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                sg_set_page(sg, page, fragnob, page_offset);
                sg++;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
}

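/*
 * As kiblnd_setup_rd_iov(), but for a page-based (kiov) payload where
 * no address translation is needed.
 */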
int
kiblnd_setup_rd_kiov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                     int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        kib_net_t          *net = ni->ni_data;
        struct scatterlist *sg;
        int                 fragnob;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT(nob > 0);
        LASSERT(nkiov > 0);
        LASSERT(net != NULL);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT(nkiov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT(nkiov > 0);

                fragnob = min((int)(kiov->kiov_len - offset), nob);

                sg_set_page(sg, kiov->kiov_page, fragnob,
                            kiov->kiov_offset + offset);
                sg++;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
}

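/*
 * Try to post 'tx'; called with ibc_lock held.  Returns -EAGAIN
 * (leaving the tx queued) when the send queue is full or flow-control
 * credits have run out; without OOB support the last credit is
 * reserved for a NOOP so returned credits can always be sent back.
 * On success the tx moves to ibc_active_txs; a posting failure closes
 * the connection.
 */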
int
kiblnd_post_tx_locked(kib_conn_t *conn, kib_tx_t *tx, int credit)
{
        kib_msg_t         *msg = tx->tx_msg;
        kib_peer_t        *peer = conn->ibc_peer;
        int                ver = conn->ibc_version;
        int                rc;
        int                done;
        struct ib_send_wr *bad_wrq;

        LASSERT(tx->tx_queued);
        /* We rely on this for QP sizing */
        LASSERT(tx->tx_nwrq > 0);
        LASSERT(tx->tx_nwrq <= 1 + IBLND_RDMA_FRAGS(ver));

        LASSERT(credit == 0 || credit == 1);
        LASSERT(conn->ibc_outstanding_credits >= 0);
        LASSERT(conn->ibc_outstanding_credits <= IBLND_MSG_QUEUE_SIZE(ver));
        LASSERT(conn->ibc_credits >= 0);
        LASSERT(conn->ibc_credits <= IBLND_MSG_QUEUE_SIZE(ver));

        if (conn->ibc_nsends_posted == IBLND_CONCURRENT_SENDS(ver)) {
                /* tx completions outstanding... */
                CDEBUG(D_NET, "%s: posted enough\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        if (credit != 0 && conn->ibc_credits == 0) {   /* no credits */
                CDEBUG(D_NET, "%s: no credits\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        if (credit != 0 && !IBLND_OOB_CAPABLE(ver) &&
            conn->ibc_credits == 1 &&          /* last credit reserved */
            msg->ibm_type != IBLND_MSG_NOOP) { /* for NOOP */
                CDEBUG(D_NET, "%s: not using last credit\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        /* NB don't drop ibc_lock before bumping tx_sending */
        list_del(&tx->tx_list);
        tx->tx_queued = 0;

        if (msg->ibm_type == IBLND_MSG_NOOP &&
            (!kiblnd_need_noop(conn) ||        /* redundant NOOP */
             (IBLND_OOB_CAPABLE(ver) &&        /* posted enough NOOP */
              conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
                /* OK to drop when posted enough NOOPs, since
                 * kiblnd_check_sends will queue NOOP again when
                 * posted NOOPs complete */
                spin_unlock(&conn->ibc_lock);
                kiblnd_tx_done(peer->ibp_ni, tx);
                spin_lock(&conn->ibc_lock);
                CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_noops_posted);
                return 0;
        }

        kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
                        peer->ibp_nid, conn->ibc_incarnation);

        conn->ibc_credits -= credit;
        conn->ibc_outstanding_credits = 0;
        conn->ibc_nsends_posted++;
        if (msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted++;

        /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
         * PUT.  If so, it was first queued here as a PUT_REQ, sent and
         * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
         * and then re-queued here.  It's (just) possible that
         * tx_sending is non-zero if we've not done the tx_complete()
         * from the first send; hence the ++ rather than = below. */
        tx->tx_sending++;
        list_add(&tx->tx_list, &conn->ibc_active_txs);

        /* I'm still holding ibc_lock! */
        if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
                rc = -ECONNABORTED;
        } else if (tx->tx_pool->tpo_pool.po_failed ||
                   conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
                /* close_conn will launch failover */
                rc = -ENETDOWN;
        } else {
                rc = ib_post_send(conn->ibc_cmid->qp,
                                  tx->tx_wrq, &bad_wrq);
        }

        conn->ibc_last_send = jiffies;

        if (rc == 0)
                return 0;

        /* NB credits are transferred in the actual
         * message, which can only be the last work item */
        conn->ibc_credits += credit;
        conn->ibc_outstanding_credits += msg->ibm_credits;
        conn->ibc_nsends_posted--;
        if (msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted--;

        tx->tx_status = rc;
        tx->tx_waiting = 0;
        tx->tx_sending--;

        done = (tx->tx_sending == 0);
        if (done)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                CERROR("Error %d posting transmit to %s\n",
                       rc, libcfs_nid2str(peer->ibp_nid));
        else
                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
                       rc, libcfs_nid2str(peer->ibp_nid));

        kiblnd_close_conn(conn, rc);

        if (done)
                kiblnd_tx_done(peer->ibp_ni, tx);

        spin_lock(&conn->ibc_lock);

        return -EIO;
}

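/*
 * Push out everything that can currently be sent: promote txs waiting
 * on reserved credits, queue a NOOP if credits need returning, then
 * post from the no-credit, NOOP and normal queues until
 * kiblnd_post_tx_locked() says stop.
 */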
void
kiblnd_check_sends(kib_conn_t *conn)
{
        int        ver = conn->ibc_version;
        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t  *tx;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                CDEBUG(D_NET, "%s too soon\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT(conn->ibc_nsends_posted <= IBLND_CONCURRENT_SENDS(ver));
        LASSERT(!IBLND_OOB_CAPABLE(ver) ||
                conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
        LASSERT(conn->ibc_reserved_credits >= 0);

        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        if (kiblnd_need_noop(conn)) {
                spin_unlock(&conn->ibc_lock);

                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
                if (tx != NULL)
                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);
                if (tx != NULL)
                        kiblnd_queue_tx_locked(tx, conn);
        }

        kiblnd_conn_addref(conn); /* 1 ref for me.... (see b21911) */

        for (;;) {
                int credit;

                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        credit = 0;
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                } else if (!list_empty(&conn->ibc_tx_noops)) {
                        LASSERT(!IBLND_OOB_CAPABLE(ver));
                        credit = 1;
                        tx = list_entry(conn->ibc_tx_noops.next,
                                        kib_tx_t, tx_list);
                } else if (!list_empty(&conn->ibc_tx_queue)) {
                        credit = 1;
                        tx = list_entry(conn->ibc_tx_queue.next,
                                        kib_tx_t, tx_list);
                } else
                        break;

                if (kiblnd_post_tx_locked(conn, tx, credit) != 0)
                        break;
        }

        spin_unlock(&conn->ibc_lock);

        kiblnd_conn_decref(conn); /* ...until here */
}

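/*
 * Send-completion handler.  A tx only becomes idle (and is finalised)
 * once its last work request has completed, it is not waiting for a
 * peer response, and it has not been re-queued (PUT_DONE).
 */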
void
kiblnd_tx_complete(kib_tx_t *tx, int status)
{
        int         failed = (status != IB_WC_SUCCESS);
        kib_conn_t *conn = tx->tx_conn;
        int         idle;

        LASSERT(tx->tx_sending > 0);

        if (failed) {
                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                        CNETERR("Tx -> %s cookie "LPX64
                                " sending %d waiting %d: failed %d\n",
                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
                                tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
                                status);

                kiblnd_close_conn(conn, -EIO);
        } else {
                kiblnd_peer_alive(conn->ibc_peer);
        }

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;
        if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted--;

        if (failed) {
                tx->tx_waiting = 0;            /* don't wait for peer */
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&        /* This is the final callback */
               !tx->tx_waiting &&              /* Not waiting for peer */
               !tx->tx_queued;                 /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kiblnd_conn_addref(conn);              /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);

        kiblnd_check_sends(conn);

        kiblnd_conn_decref(conn);              /* ...until here */
}

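/*
 * Finalise the tx's message of 'type' with 'body_nob' payload bytes
 * and append the signalled SEND work request that will carry it.
 */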
void
kiblnd_init_tx_msg(lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
{
        kib_hca_dev_t     *hdev = tx->tx_pool->tpo_hdev;
        struct ib_sge     *sge = &tx->tx_sge[tx->tx_nwrq];
        struct ib_send_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int                nob = offsetof(kib_msg_t, ibm_u) + body_nob;
        struct ib_mr      *mr;

        LASSERT(tx->tx_nwrq >= 0);
        LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
        LASSERT(nob <= IBLND_MSG_SIZE);

        kiblnd_init_msg(tx->tx_msg, type, body_nob);

        mr = kiblnd_find_dma_mr(hdev, tx->tx_msgaddr, nob);
        LASSERT(mr != NULL);

        sge->lkey   = mr->lkey;
        sge->addr   = tx->tx_msgaddr;
        sge->length = nob;

        memset(wrq, 0, sizeof(*wrq));

        wrq->next       = NULL;
        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
        wrq->sg_list    = sge;
        wrq->num_sge    = 1;
        wrq->opcode     = IB_WR_SEND;
        wrq->send_flags = IB_SEND_SIGNALED;

        tx->tx_nwrq++;
}

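/*
 * Build the chain of RDMA_WRITE work requests that move 'resid' bytes
 * from the local source descriptor into the peer's sink descriptor,
 * pairing fragments from each side, then append the PUT_DONE/GET_DONE
 * completion message that reports the final status to the peer.
 */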
int
kiblnd_init_rdma(kib_conn_t *conn, kib_tx_t *tx, int type,
                 int resid, kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t         *ibmsg = tx->tx_msg;
        kib_rdma_desc_t   *srcrd = tx->tx_rd;
        struct ib_sge     *sge = &tx->tx_sge[0];
        struct ib_send_wr *wrq = &tx->tx_wrq[0];
        int                rc  = resid;
        int                srcidx;
        int                dstidx;
        int                wrknob;

        LASSERT(!in_interrupt());
        LASSERT(tx->tx_nwrq == 0);
        LASSERT(type == IBLND_MSG_GET_DONE ||
                type == IBLND_MSG_PUT_DONE);

        srcidx = dstidx = 0;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrags) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrags) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBLND_RDMA_FRAGS(conn->ibc_version)) {
                        CERROR("RDMA too fragmented for %s (%d): "
                               "%d/%d src %d/%d dst frags\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               IBLND_RDMA_FRAGS(conn->ibc_version),
                               srcidx, srcrd->rd_nfrags,
                               dstidx, dstrd->rd_nfrags);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(kiblnd_rd_frag_size(srcrd, srcidx),
                                 kiblnd_rd_frag_size(dstrd, dstidx)), resid);

                sge = &tx->tx_sge[tx->tx_nwrq];
                sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
                sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
                sge->length = wrknob;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->next       = wrq + 1;
                wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
                wrq->sg_list    = sge;
                wrq->num_sge    = 1;
                wrq->opcode     = IB_WR_RDMA_WRITE;
                wrq->send_flags = 0;

                wrq->wr.rdma.remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
                wrq->wr.rdma.rkey        = kiblnd_rd_frag_key(dstrd, dstidx);

                srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
                dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);

                resid -= wrknob;

                tx->tx_nwrq++;
                wrq++;
                sge++;
        }

        if (rc < 0)                    /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
                           type, sizeof(kib_completion_msg_t));

        return rc;
}

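/*
 * Queue a ready-to-send tx (caller holds ibc_lock), stamping its
 * deadline and binding it to the connection.  The message type picks
 * the queue: PUT_REQ/GET_REQ wait on ibc_tx_queue_rsrvd for a
 * reserved credit, completion messages go out credit-free, and NOOPs
 * get their own queue on connections without OOB support.
 */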
void
kiblnd_queue_tx_locked(kib_tx_t *tx, kib_conn_t *conn)
{
        struct list_head *q;

        LASSERT(tx->tx_nwrq > 0);              /* work items set up */
        LASSERT(!tx->tx_queued);               /* not queued for sending already */
        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        tx->tx_queued = 1;
        tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * HZ);

        if (tx->tx_conn == NULL) {
                kiblnd_conn_addref(conn);
                tx->tx_conn = conn;
                LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
        } else {
                /* PUT_DONE first attached to conn as a PUT_REQ */
                LASSERT(tx->tx_conn == conn);
                LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
        }

        switch (tx->tx_msg->ibm_type) {
        default:
                LBUG();

        case IBLND_MSG_PUT_REQ:
        case IBLND_MSG_GET_REQ:
                q = &conn->ibc_tx_queue_rsrvd;
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_ACK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                q = &conn->ibc_tx_queue_nocred;
                break;

        case IBLND_MSG_NOOP:
                if (IBLND_OOB_CAPABLE(conn->ibc_version))
                        q = &conn->ibc_tx_queue_nocred;
                else
                        q = &conn->ibc_tx_noops;
                break;

        case IBLND_MSG_IMMEDIATE:
                q = &conn->ibc_tx_queue;
                break;
        }

        list_add_tail(&tx->tx_list, q);
}

void
kiblnd_queue_tx(kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kiblnd_queue_tx_locked(tx, conn);
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
}

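/*
 * Bind the cm_id to a free privileged port (for peers that require
 * one) and start address resolution, walking down from PROT_SOCK-1
 * until a bind succeeds.
 */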
static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
                               struct sockaddr_in *srcaddr,
                               struct sockaddr_in *dstaddr,
                               int timeout_ms)
{
        unsigned short port;
        int rc;

        /* allow the port to be reused */
        rc = rdma_set_reuseaddr(cmid, 1);
        if (rc != 0) {
                CERROR("Unable to set reuse on cmid: %d\n", rc);
                return rc;
        }

        /* look for a free privileged port */
        for (port = PROT_SOCK-1; port > 0; port--) {
                srcaddr->sin_port = htons(port);
                rc = rdma_resolve_addr(cmid,
                                       (struct sockaddr *)srcaddr,
                                       (struct sockaddr *)dstaddr,
                                       timeout_ms);
                if (rc == 0) {
                        CDEBUG(D_NET, "bound to port %hu\n", port);
                        return 0;
                } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
                        CDEBUG(D_NET, "bind to port %hu failed: %d\n",
                               port, rc);
                } else {
                        return rc;
                }
        }

        CERROR("Failed to bind to a free privileged port\n");
        return rc;
}

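/*
 * Start an active connection attempt: create a cm_id bound to the
 * device's interface address and kick off address resolution; the
 * rest of the handshake continues in kiblnd_cm_callback().
 */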
void
kiblnd_connect_peer(kib_peer_t *peer)
{
        struct rdma_cm_id *cmid;
        kib_dev_t         *dev;
        kib_net_t         *net = peer->ibp_ni->ni_data;
        struct sockaddr_in srcaddr;
        struct sockaddr_in dstaddr;
        int                rc;

        LASSERT(net != NULL);
        LASSERT(peer->ibp_connecting > 0);

        cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
                                     IB_QPT_RC);

        if (IS_ERR(cmid)) {
                CERROR("Can't create CMID for %s: %ld\n",
                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
                rc = PTR_ERR(cmid);
                goto failed;
        }

        dev = net->ibn_dev;
        memset(&srcaddr, 0, sizeof(srcaddr));
        srcaddr.sin_family = AF_INET;
        srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);

        memset(&dstaddr, 0, sizeof(dstaddr));
        dstaddr.sin_family = AF_INET;
        dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
        dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));

        kiblnd_peer_addref(peer);              /* cmid's ref */

        if (*kiblnd_tunables.kib_use_priv_port) {
                rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
                                         *kiblnd_tunables.kib_timeout * 1000);
        } else {
                rc = rdma_resolve_addr(cmid,
                                       (struct sockaddr *)&srcaddr,
                                       (struct sockaddr *)&dstaddr,
                                       *kiblnd_tunables.kib_timeout * 1000);
        }
        if (rc != 0) {
                /* Can't initiate address resolution */
                CERROR("Can't resolve addr for %s: %d\n",
                       libcfs_nid2str(peer->ibp_nid), rc);
                goto failed2;
        }

        LASSERT(cmid->device != NULL);
        CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
               libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
               &dev->ibd_ifip, cmid->device->name);

        return;

 failed2:
        kiblnd_peer_decref(peer);              /* cmid's ref */
        rdma_destroy_id(cmid);
 failed:
        kiblnd_peer_connect_failed(peer, 1, rc);
}

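/*
 * Commit to sending 'tx' to 'nid', creating the peer and initiating a
 * connection if none exists.  The fast path takes only a read lock
 * (peer already connected); otherwise the tx is parked on the peer's
 * ibp_tx_queue until the connection completes.  'tx' may be NULL, in
 * which case only the connection attempt is made.
 */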
void
kiblnd_launch_tx(lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
{
        kib_peer_t    *peer;
        kib_peer_t    *peer2;
        kib_conn_t    *conn;
        rwlock_t      *g_lock = &kiblnd_data.kib_global_lock;
        unsigned long  flags;
        int            rc;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT(tx == NULL || tx->tx_conn == NULL); /* only set when assigned a conn */
        LASSERT(tx == NULL || tx->tx_nwrq > 0);     /* work items have been set up */

        /* First time, just use a read lock since I expect to find my peer
         * connected */
        read_lock_irqsave(g_lock, flags);

        peer = kiblnd_find_peer_locked(nid);
        if (peer != NULL && !list_empty(&peer->ibp_conns)) {
                /* Found a peer with an established connection */
                conn = kiblnd_get_conn_locked(peer);
                kiblnd_conn_addref(conn); /* 1 ref for me... */

                read_unlock_irqrestore(g_lock, flags);

                if (tx != NULL)
                        kiblnd_queue_tx(tx, conn);
                kiblnd_conn_decref(conn); /* ...to here */
                return;
        }

        read_unlock(g_lock);
        /* Re-try with a write lock */
        write_lock(g_lock);

        peer = kiblnd_find_peer_locked(nid);
        if (peer != NULL) {
                if (list_empty(&peer->ibp_conns)) {
                        /* found a peer, but it's still connecting... */
                        LASSERT(peer->ibp_connecting != 0 ||
                                peer->ibp_accepting != 0);
                        if (tx != NULL)
                                list_add_tail(&tx->tx_list,
                                              &peer->ibp_tx_queue);
                        write_unlock_irqrestore(g_lock, flags);
                } else {
                        conn = kiblnd_get_conn_locked(peer);
                        kiblnd_conn_addref(conn); /* 1 ref for me... */

                        write_unlock_irqrestore(g_lock, flags);

                        if (tx != NULL)
                                kiblnd_queue_tx(tx, conn);
                        kiblnd_conn_decref(conn); /* ...to here */
                }
                return;
        }

        write_unlock_irqrestore(g_lock, flags);

        /* Allocate a peer ready to add to the peer table and retry */
        rc = kiblnd_create_peer(ni, &peer, nid);
        if (rc != 0) {
                CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
                if (tx != NULL) {
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kiblnd_tx_done(ni, tx);
                }
                return;
        }

        write_lock_irqsave(g_lock, flags);

        peer2 = kiblnd_find_peer_locked(nid);
        if (peer2 != NULL) {
                if (list_empty(&peer2->ibp_conns)) {
                        /* found a peer, but it's still connecting... */
                        LASSERT(peer2->ibp_connecting != 0 ||
                                peer2->ibp_accepting != 0);
                        if (tx != NULL)
                                list_add_tail(&tx->tx_list,
                                              &peer2->ibp_tx_queue);
                        write_unlock_irqrestore(g_lock, flags);
                } else {
                        conn = kiblnd_get_conn_locked(peer2);
                        kiblnd_conn_addref(conn); /* 1 ref for me... */

                        write_unlock_irqrestore(g_lock, flags);

                        if (tx != NULL)
                                kiblnd_queue_tx(tx, conn);
                        kiblnd_conn_decref(conn); /* ...to here */
                }

                kiblnd_peer_decref(peer);
                return;
        }

        /* Brand new peer */
        LASSERT(peer->ibp_connecting == 0);
        peer->ibp_connecting = 1;

        /* always called with a ref on ni, which prevents ni being shutdown */
        LASSERT(((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);

        if (tx != NULL)
                list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);

        kiblnd_peer_addref(peer);
        list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));

        write_unlock_irqrestore(g_lock, flags);

        kiblnd_connect_peer(peer);
        kiblnd_peer_decref(peer);
}

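/*
 * LND send entry point.  Payloads that fit in a message buffer travel
 * as IMMEDIATE messages; larger GET (and, presumably, PUT) payloads
 * are described by RDMA descriptors so the bulk data moves directly
 * between peer buffers.
 */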
1456int
1457kiblnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1458{
1459        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
1460        int            type = lntmsg->msg_type;
1461        lnet_process_id_t target = lntmsg->msg_target;
1462        int            target_is_router = lntmsg->msg_target_is_router;
1463        int            routing = lntmsg->msg_routing;
1464        unsigned int      payload_niov = lntmsg->msg_niov;
1465        struct iovec     *payload_iov = lntmsg->msg_iov;
1466        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1467        unsigned int      payload_offset = lntmsg->msg_offset;
1468        unsigned int      payload_nob = lntmsg->msg_len;
1469        kib_msg_t       *ibmsg;
1470        kib_tx_t         *tx;
1471        int            nob;
1472        int            rc;
1473
1474        /* NB 'private' is different depending on what we're sending.... */
1475
1476        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1477               payload_nob, payload_niov, libcfs_id2str(target));
1478
1479        LASSERT (payload_nob == 0 || payload_niov > 0);
1480        LASSERT (payload_niov <= LNET_MAX_IOV);
1481
1482        /* Thread context */
1483        LASSERT (!in_interrupt());
1484        /* payload is either all vaddrs or all pages */
1485        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1486
1487        switch (type) {
1488        default:
1489                LBUG();
1490                return -EIO;
1491
1492        case LNET_MSG_ACK:
1493                LASSERT (payload_nob == 0);
1494                break;
1495
1496        case LNET_MSG_GET:
1497                if (routing || target_is_router)
1498                        break;            /* send IMMEDIATE */
1499
1500                /* is the REPLY message too small for RDMA? */
1501                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1502                if (nob <= IBLND_MSG_SIZE)
1503                        break;            /* send IMMEDIATE */
1504
1505                tx = kiblnd_get_idle_tx(ni, target.nid);
1506                if (tx == NULL) {
1507                        CERROR("Can't allocate txd for GET to %s\n",
1508                               libcfs_nid2str(target.nid));
1509                        return -ENOMEM;
1510                }
1511
1512                ibmsg = tx->tx_msg;
1513
1514                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1515                        rc = kiblnd_setup_rd_iov(ni, tx,
1516                                                 &ibmsg->ibm_u.get.ibgm_rd,
1517                                                 lntmsg->msg_md->md_niov,
1518                                                 lntmsg->msg_md->md_iov.iov,
1519                                                 0, lntmsg->msg_md->md_length);
1520                else
1521                        rc = kiblnd_setup_rd_kiov(ni, tx,
1522                                                  &ibmsg->ibm_u.get.ibgm_rd,
1523                                                  lntmsg->msg_md->md_niov,
1524                                                  lntmsg->msg_md->md_iov.kiov,
1525                                                  0, lntmsg->msg_md->md_length);
1526                if (rc != 0) {
1527                        CERROR("Can't setup GET sink for %s: %d\n",
1528                               libcfs_nid2str(target.nid), rc);
1529                        kiblnd_tx_done(ni, tx);
1530                        return -EIO;
1531                }
1532
1533                nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[tx->tx_nfrags]);
1534                ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1535                ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1536
1537                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1538
1539                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1540                if (tx->tx_lntmsg[1] == NULL) {
1541                        CERROR("Can't create reply for GET -> %s\n",
1542                               libcfs_nid2str(target.nid));
1543                        kiblnd_tx_done(ni, tx);
1544                        return -EIO;
1545                }
1546
1547                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1548                tx->tx_waiting = 1;          /* waiting for GET_DONE */
1549                kiblnd_launch_tx(ni, tx, target.nid);
1550                return 0;
1551
1552        case LNET_MSG_REPLY:
1553        case LNET_MSG_PUT:
1554                /* Is the payload small enough not to need RDMA? */
1555                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1556                if (nob <= IBLND_MSG_SIZE)
1557                        break;            /* send IMMEDIATE */
1558
1559                tx = kiblnd_get_idle_tx(ni, target.nid);
1560                if (tx == NULL) {
1561                        CERROR("Can't allocate %s txd for %s\n",
1562                               type == LNET_MSG_PUT ? "PUT" : "REPLY",
1563                               libcfs_nid2str(target.nid));
1564                        return -ENOMEM;
1565                }
1566
1567                if (payload_kiov == NULL)
1568                        rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1569                                                 payload_niov, payload_iov,
1570                                                 payload_offset, payload_nob);
1571                else
1572                        rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1573                                                  payload_niov, payload_kiov,
1574                                                  payload_offset, payload_nob);
1575                if (rc != 0) {
1576                        CERROR("Can't setup PUT src for %s: %d\n",
1577                               libcfs_nid2str(target.nid), rc);
1578                        kiblnd_tx_done(ni, tx);
1579                        return -EIO;
1580                }
1581
1582                ibmsg = tx->tx_msg;
1583                ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1584                ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1585                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1586
1587                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1588                tx->tx_waiting = 1;          /* waiting for PUT_{ACK,NAK} */
1589                kiblnd_launch_tx(ni, tx, target.nid);
1590                return 0;
1591        }
1592
1593        /* send IMMEDIATE */
1594
1595        LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1596                 <= IBLND_MSG_SIZE);
1597
1598        tx = kiblnd_get_idle_tx(ni, target.nid);
1599        if (tx == NULL) {
1600                CERROR ("Can't send %d to %s: tx descs exhausted\n",
1601                        type, libcfs_nid2str(target.nid));
1602                return -ENOMEM;
1603        }
1604
1605        ibmsg = tx->tx_msg;
1606        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1607
1608        if (payload_kiov != NULL)
1609                lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg,
1610                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1611                                    payload_niov, payload_kiov,
1612                                    payload_offset, payload_nob);
1613        else
1614                lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg,
1615                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1616                                   payload_niov, payload_iov,
1617                                   payload_offset, payload_nob);
1618
1619        nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1620        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1621
1622        tx->tx_lntmsg[0] = lntmsg;            /* finalise lntmsg on completion */
1623        kiblnd_launch_tx(ni, tx, target.nid);
1624        return 0;
1625}
1626
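    /* Complete an optimized GET: RDMA lntmsg's payload directly into the
     * sink the peer described in its GET_REQ, then send GET_DONE.  With no
     * payload there is no RDMA and the message can be finalised at once. */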
1627void
1628kiblnd_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
1629{
1630        lnet_process_id_t target = lntmsg->msg_target;
1631        unsigned int      niov = lntmsg->msg_niov;
1632        struct iovec     *iov = lntmsg->msg_iov;
1633        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
1634        unsigned int      offset = lntmsg->msg_offset;
1635        unsigned int      nob = lntmsg->msg_len;
1636        kib_tx_t         *tx;
1637        int               rc;
1638
1639        tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1640        if (tx == NULL) {
1641                CERROR("Can't get tx for REPLY to %s\n",
1642                       libcfs_nid2str(target.nid));
1643                goto failed_0;
1644        }
1645
1646        if (nob == 0)
1647                rc = 0;
1648        else if (kiov == NULL)
1649                rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1650                                         niov, iov, offset, nob);
1651        else
1652                rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1653                                          niov, kiov, offset, nob);
1654
1655        if (rc != 0) {
1656                CERROR("Can't setup GET src for %s: %d\n",
1657                       libcfs_nid2str(target.nid), rc);
1658                goto failed_1;
1659        }
1660
1661        rc = kiblnd_init_rdma(rx->rx_conn, tx,
1662                              IBLND_MSG_GET_DONE, nob,
1663                              &rx->rx_msg->ibm_u.get.ibgm_rd,
1664                              rx->rx_msg->ibm_u.get.ibgm_cookie);
1665        if (rc < 0) {
1666                CERROR("Can't setup rdma for GET from %s: %d\n",
1667                       libcfs_nid2str(target.nid), rc);
1668                goto failed_1;
1669        }
1670
1671        if (nob == 0) {
1672                /* No RDMA: local completion may happen now! */
1673                lnet_finalize(ni, lntmsg, 0);
1674        } else {
1675                /* RDMA: lnet_finalize(lntmsg) when it
1676                 * completes */
1677                tx->tx_lntmsg[0] = lntmsg;
1678        }
1679
1680        kiblnd_queue_tx(tx, rx->rx_conn);
1681        return;
1682
1683 failed_1:
1684        kiblnd_tx_done(ni, tx);
1685 failed_0:
1686        lnet_finalize(ni, lntmsg, -EIO);
1687}
1688
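    /* LND receive entry point: 'private' is the rx being delivered.
     * IMMEDIATE payloads are copied straight out of the message buffer;
     * a PUT_REQ is answered with a PUT_ACK describing the sink so the
     * peer can RDMA the payload; a GET_REQ is answered via kiblnd_reply().
     * The rx buffer is reposted before returning. */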
1689int
1690kiblnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1691             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1692             unsigned int offset, unsigned int mlen, unsigned int rlen)
1693{
1694        kib_rx_t    *rx = private;
1695        kib_msg_t   *rxmsg = rx->rx_msg;
1696        kib_conn_t  *conn = rx->rx_conn;
1697        kib_tx_t    *tx;
1698        kib_msg_t   *txmsg;
1699        int          nob;
1700        int          post_credit = IBLND_POSTRX_PEER_CREDIT;
1701        int          rc = 0;
1702
1703        LASSERT (mlen <= rlen);
1704        LASSERT (!in_interrupt());
1705        /* Either all pages or all vaddrs */
1706        LASSERT (!(kiov != NULL && iov != NULL));
1707
1708        switch (rxmsg->ibm_type) {
1709        default:
1710                LBUG();
1711
1712        case IBLND_MSG_IMMEDIATE:
1713                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1714                if (nob > rx->rx_nob) {
1715                        CERROR ("Immediate message from %s too big: %d(%d)\n",
1716                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1717                                nob, rx->rx_nob);
1718                        rc = -EPROTO;
1719                        break;
1720                }
1721
1722                if (kiov != NULL)
1723                        lnet_copy_flat2kiov(niov, kiov, offset,
1724                                            IBLND_MSG_SIZE, rxmsg,
1725                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1726                                            mlen);
1727                else
1728                        lnet_copy_flat2iov(niov, iov, offset,
1729                                           IBLND_MSG_SIZE, rxmsg,
1730                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1731                                           mlen);
1732                lnet_finalize (ni, lntmsg, 0);
1733                break;
1734
1735        case IBLND_MSG_PUT_REQ:
1736                if (mlen == 0) {
1737                        lnet_finalize(ni, lntmsg, 0);
1738                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1739                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1740                        break;
1741                }
1742
1743                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1744                if (tx == NULL) {
1745                        CERROR("Can't allocate tx for %s\n",
1746                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
1747                        /* Not replying will break the connection */
1748                        rc = -ENOMEM;
1749                        break;
1750                }
1751
1752                txmsg = tx->tx_msg;
1753                if (kiov == NULL)
1754                        rc = kiblnd_setup_rd_iov(ni, tx,
1755                                                 &txmsg->ibm_u.putack.ibpam_rd,
1756                                                 niov, iov, offset, mlen);
1757                else
1758                        rc = kiblnd_setup_rd_kiov(ni, tx,
1759                                                  &txmsg->ibm_u.putack.ibpam_rd,
1760                                                  niov, kiov, offset, mlen);
1761                if (rc != 0) {
1762                        CERROR("Can't setup PUT sink for %s: %d\n",
1763                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1764                        kiblnd_tx_done(ni, tx);
1765                        /* tell peer it's over */
1766                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1767                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1768                        break;
1769                }
1770
1771                nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[tx->tx_nfrags]);
1772                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1773                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1774
1775                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1776
1777                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1778                tx->tx_waiting = 1;          /* waiting for PUT_DONE */
1779                kiblnd_queue_tx(tx, conn);
1780
1781                /* reposted buffer reserved for PUT_DONE */
1782                post_credit = IBLND_POSTRX_NO_CREDIT;
1783                break;
1784
1785        case IBLND_MSG_GET_REQ:
1786                if (lntmsg != NULL) {
1787                        /* Optimized GET; RDMA lntmsg's payload */
1788                        kiblnd_reply(ni, rx, lntmsg);
1789                } else {
1790                        /* GET didn't match anything */
1791                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1792                                               -ENODATA,
1793                                               rxmsg->ibm_u.get.ibgm_cookie);
1794                }
1795                break;
1796        }
1797
1798        kiblnd_post_rx(rx, post_credit);
1799        return rc;
1800}
1801
1802int
1803kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1804{
1805        struct task_struct *task = kthread_run(fn, arg, "%s", name);
1806
1807        if (IS_ERR(task))
1808                return PTR_ERR(task);
1809
1810        atomic_inc(&kiblnd_data.kib_nthreads);
1811        return 0;
1812}
1813
1814void
1815kiblnd_thread_fini (void)
1816{
1817        atomic_dec (&kiblnd_data.kib_nthreads);
1818}
1819
1820void
1821kiblnd_peer_alive (kib_peer_t *peer)
1822{
1823        /* This is racy, but everyone's only writing cfs_time_current() */
1824        peer->ibp_last_alive = cfs_time_current();
1825        mb();
1826}
1827
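    /* Tell LNet the peer is down: once no connections or connection
     * attempts remain, sample and clear ibp_error under the global lock,
     * then call lnet_notify() outside it. */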
1828void
1829kiblnd_peer_notify (kib_peer_t *peer)
1830{
1831        int           error = 0;
1832        cfs_time_t    last_alive = 0;
1833        unsigned long flags;
1834
1835        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1836
1837        if (list_empty(&peer->ibp_conns) &&
1838            peer->ibp_accepting == 0 &&
1839            peer->ibp_connecting == 0 &&
1840            peer->ibp_error != 0) {
1841                error = peer->ibp_error;
1842                peer->ibp_error = 0;
1843
1844                last_alive = peer->ibp_last_alive;
1845        }
1846
1847        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1848
1849        if (error != 0)
1850                lnet_notify(peer->ibp_ni,
1851                            peer->ibp_nid, 0, last_alive);
1852}
1853
1854void
1855kiblnd_close_conn_locked (kib_conn_t *conn, int error)
1856{
1857        /* This just does the immediate housekeeping.  'error' is zero for a
1858         * normal shutdown which can happen only after the connection has been
1859         * established.  If the connection is established, schedule the
1860         * connection to be finished off by the connd.  Otherwise the connd is
1861         * already dealing with it (either to set it up or tear it down).
1862         * Caller holds kib_global_lock exclusively in irq context */
1863        kib_peer_t    *peer = conn->ibc_peer;
1864        kib_dev_t     *dev;
1865        unsigned long  flags;
1866
1867        LASSERT (error != 0 || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1868
1869        if (error != 0 && conn->ibc_comms_error == 0)
1870                conn->ibc_comms_error = error;
1871
1872        if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1873                return; /* already being handled */
1874
1875        if (error == 0 &&
1876            list_empty(&conn->ibc_tx_noops) &&
1877            list_empty(&conn->ibc_tx_queue) &&
1878            list_empty(&conn->ibc_tx_queue_rsrvd) &&
1879            list_empty(&conn->ibc_tx_queue_nocred) &&
1880            list_empty(&conn->ibc_active_txs)) {
1881                CDEBUG(D_NET, "closing conn to %s\n",
1882                       libcfs_nid2str(peer->ibp_nid));
1883        } else {
1884                CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1885                       libcfs_nid2str(peer->ibp_nid), error,
1886                       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1887                       list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1888                       list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1889                       list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1890                       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1891        }
1892
1893        dev = ((kib_net_t *)peer->ibp_ni->ni_data)->ibn_dev;
1894        list_del(&conn->ibc_list);
1895        /* connd (see below) takes over ibc_list's ref */
1896
1897        if (list_empty (&peer->ibp_conns) &&    /* no more conns */
1898            kiblnd_peer_active(peer)) {  /* still in peer table */
1899                kiblnd_unlink_peer_locked(peer);
1900
1901                /* set/clear error on last conn */
1902                peer->ibp_error = conn->ibc_comms_error;
1903        }
1904
1905        kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1906
1907        if (error != 0 &&
1908            kiblnd_dev_can_failover(dev)) {
1909                list_add_tail(&dev->ibd_fail_list,
1910                              &kiblnd_data.kib_failed_devs);
1911                wake_up(&kiblnd_data.kib_failover_waitq);
1912        }
1913
1914        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1915
1916        list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1917        wake_up(&kiblnd_data.kib_connd_waitq);
1918
1919        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1920}
1921
1922void
1923kiblnd_close_conn(kib_conn_t *conn, int error)
1924{
1925        unsigned long flags;
1926
1927        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1928
1929        kiblnd_close_conn_locked(conn, error);
1930
1931        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1932}
1933
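    /* Handle RXs that arrived before the connection was established.  The
     * global lock is dropped around each kiblnd_handle_rx() call and
     * retaken to pop the next rx off ibc_early_rxs. */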
1934void
1935kiblnd_handle_early_rxs(kib_conn_t *conn)
1936{
1937        unsigned long  flags;
1938        kib_rx_t      *rx;
1939
1940        LASSERT(!in_interrupt());
1941        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1942
1943        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1944        while (!list_empty(&conn->ibc_early_rxs)) {
1945                rx = list_entry(conn->ibc_early_rxs.next,
1946                                kib_rx_t, rx_list);
1947                list_del(&rx->rx_list);
1948                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1949
1950                kiblnd_handle_rx(rx);
1951
1952                write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1953        }
1954        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1955}
1956
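    /* Fail every tx on 'txs' with -ECONNABORTED.  TXs with a send still
     * outstanding are left for the completion callback; the rest move to
     * a private zombie list and are finalised outside ibc_lock. */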
1957void
1958kiblnd_abort_txs(kib_conn_t *conn, struct list_head *txs)
1959{
1960        LIST_HEAD(zombies);
1961        struct list_head *tmp;
1962        struct list_head *nxt;
1963        kib_tx_t         *tx;
1964
1965        spin_lock(&conn->ibc_lock);
1966
1967        list_for_each_safe (tmp, nxt, txs) {
1968                tx = list_entry (tmp, kib_tx_t, tx_list);
1969
1970                if (txs == &conn->ibc_active_txs) {
1971                        LASSERT (!tx->tx_queued);
1972                        LASSERT (tx->tx_waiting ||
1973                                 tx->tx_sending != 0);
1974                } else {
1975                        LASSERT (tx->tx_queued);
1976                }
1977
1978                tx->tx_status = -ECONNABORTED;
1979                tx->tx_waiting = 0;
1980
1981                if (tx->tx_sending == 0) {
1982                        tx->tx_queued = 0;
1983                        list_del (&tx->tx_list);
1984                        list_add (&tx->tx_list, &zombies);
1985                }
1986        }
1987
1988        spin_unlock(&conn->ibc_lock);
1989
1990        kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
1991}
1992
1993void
1994kiblnd_finalise_conn (kib_conn_t *conn)
1995{
1996        LASSERT (!in_interrupt());
1997        LASSERT (conn->ibc_state > IBLND_CONN_INIT);
1998
1999        kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2000
2001        /* abort_receives moves QP state to IB_QPS_ERR.  This is only required
2002         * for connections that didn't get as far as being connected, because
2003         * rdma_disconnect() does this for free. */
2004        kiblnd_abort_receives(conn);
2005
2006        /* Complete all tx descs not waiting for sends to complete.
2007         * NB we should be safe from RDMA now that the QP has changed state */
2008
2009        kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2010        kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2011        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2012        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2013        kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2014
2015        kiblnd_handle_early_rxs(conn);
2016}
2017
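    /* A connection attempt (active or passive) failed.  If it was the
     * last one in flight and the peer has no established connection,
     * unlink the peer and fail its blocked transmits with -EHOSTUNREACH. */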
2018void
2019kiblnd_peer_connect_failed (kib_peer_t *peer, int active, int error)
2020{
2021        LIST_HEAD(zombies);
2022        unsigned long flags;
2023
2024        LASSERT (error != 0);
2025        LASSERT (!in_interrupt());
2026
2027        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2028
2029        if (active) {
2030                LASSERT (peer->ibp_connecting > 0);
2031                peer->ibp_connecting--;
2032        } else {
2033                LASSERT (peer->ibp_accepting > 0);
2034                peer->ibp_accepting--;
2035        }
2036
2037        if (peer->ibp_connecting != 0 ||
2038            peer->ibp_accepting != 0) {
2039                /* another connection attempt under way... */
2040                write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2041                                        flags);
2042                return;
2043        }
2044
2045        if (list_empty(&peer->ibp_conns)) {
2046                /* Take peer's blocked transmits to complete with error */
2047                list_add(&zombies, &peer->ibp_tx_queue);
2048                list_del_init(&peer->ibp_tx_queue);
2049
2050                if (kiblnd_peer_active(peer))
2051                        kiblnd_unlink_peer_locked(peer);
2052
2053                peer->ibp_error = error;
2054        } else {
2055                /* Can't have blocked transmits if there are connections */
2056                LASSERT (list_empty(&peer->ibp_tx_queue));
2057        }
2058
2059        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2060
2061        kiblnd_peer_notify(peer);
2062
2063        if (list_empty (&zombies))
2064                return;
2065
2066        CNETERR("Deleting messages for %s: connection failed\n",
2067                libcfs_nid2str(peer->ibp_nid));
2068
2069        kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2070}
2071
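    /* Complete the handshake for both active and passive connects.  On
     * success the conn becomes ESTABLISHED, stale conns from an earlier
     * peer incarnation are closed, and transmits blocked on the peer are
     * queued on the new conn. */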
2072void
2073kiblnd_connreq_done(kib_conn_t *conn, int status)
2074{
2075        kib_peer_t       *peer = conn->ibc_peer;
2076        kib_tx_t         *tx;
2077        struct list_head  txs;
2078        unsigned long     flags;
2079        int               active;
2080
2081        active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2082
2083        CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2084               libcfs_nid2str(peer->ibp_nid), active,
2085               conn->ibc_version, status);
2086
2087        LASSERT (!in_interrupt());
2088        LASSERT ((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2089                  peer->ibp_connecting > 0) ||
2090                 (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2091                  peer->ibp_accepting > 0));
2092
2093        LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2094        conn->ibc_connvars = NULL;
2095
2096        if (status != 0) {
2097                /* failed to establish connection */
2098                kiblnd_peer_connect_failed(peer, active, status);
2099                kiblnd_finalise_conn(conn);
2100                return;
2101        }
2102
2103        /* connection established */
2104        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2105
2106        conn->ibc_last_send = jiffies;
2107        kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2108        kiblnd_peer_alive(peer);
2109
2110        /* Add conn to peer's list and nuke any dangling conns from a different
2111         * peer instance... */
2112        kiblnd_conn_addref(conn);              /* +1 ref for ibc_list */
2113        list_add(&conn->ibc_list, &peer->ibp_conns);
2114        if (active)
2115                peer->ibp_connecting--;
2116        else
2117                peer->ibp_accepting--;
2118
2119        if (peer->ibp_version == 0) {
2120                peer->ibp_version     = conn->ibc_version;
2121                peer->ibp_incarnation = conn->ibc_incarnation;
2122        }
2123
2124        if (peer->ibp_version     != conn->ibc_version ||
2125            peer->ibp_incarnation != conn->ibc_incarnation) {
2126                kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2127                                                conn->ibc_incarnation);
2128                peer->ibp_version     = conn->ibc_version;
2129                peer->ibp_incarnation = conn->ibc_incarnation;
2130        }
2131
2132        /* grab pending txs while I have the lock */
2133        list_add(&txs, &peer->ibp_tx_queue);
2134        list_del_init(&peer->ibp_tx_queue);
2135
2136        if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2137            conn->ibc_comms_error != 0) {       /* error has happened already */
2138                lnet_ni_t *ni = peer->ibp_ni;
2139
2140                /* start to shut down connection */
2141                kiblnd_close_conn_locked(conn, -ECONNABORTED);
2142                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2143
2144                kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2145
2146                return;
2147        }
2148
2149        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2150
2151        /* Schedule blocked txs */
2152        spin_lock(&conn->ibc_lock);
2153        while (!list_empty(&txs)) {
2154                tx = list_entry(txs.next, kib_tx_t, tx_list);
2155                list_del(&tx->tx_list);
2156
2157                kiblnd_queue_tx_locked(tx, conn);
2158        }
2159        spin_unlock(&conn->ibc_lock);
2160
2161        kiblnd_check_sends(conn);
2162
2163        /* schedule blocked rxs */
2164        kiblnd_handle_early_rxs(conn);
2165}
2166
2167void
2168kiblnd_reject(struct rdma_cm_id *cmid, kib_rej_t *rej)
2169{
2170        int rc;
2171
2172        rc = rdma_reject(cmid, rej, sizeof(*rej));
2173
2174        if (rc != 0)
2175                CWARN("Error %d sending reject\n", rc);
2176}
2177
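    /* Handle an incoming connection request: validate the connreq (magic,
     * version, queue depth, frag count), find or create the peer, then
     * accept with a CONNACK carrying my connection parameters.  A non-zero
     * return tells the CM to destroy cmid. */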
2178int
2179kiblnd_passive_connect (struct rdma_cm_id *cmid, void *priv, int priv_nob)
2180{
2181        rwlock_t               *g_lock = &kiblnd_data.kib_global_lock;
2182        kib_msg_t              *reqmsg = priv;
2183        kib_msg_t              *ackmsg;
2184        kib_dev_t              *ibdev;
2185        kib_peer_t             *peer;
2186        kib_peer_t             *peer2;
2187        kib_conn_t             *conn;
2188        lnet_ni_t              *ni  = NULL;
2189        kib_net_t              *net = NULL;
2190        lnet_nid_t              nid;
2191        struct rdma_conn_param  cp;
2192        kib_rej_t               rej;
2193        int                     version = IBLND_MSG_VERSION;
2194        unsigned long           flags;
2195        int                     rc;
2196        struct sockaddr_in     *peer_addr;

2197        LASSERT (!in_interrupt());
2198
2199        /* cmid inherits 'context' from the corresponding listener id */
2200        ibdev = (kib_dev_t *)cmid->context;
2201        LASSERT (ibdev != NULL);
2202
2203        memset(&rej, 0, sizeof(rej));
2204        rej.ibr_magic           = IBLND_MSG_MAGIC;
2205        rej.ibr_why               = IBLND_REJECT_FATAL;
2206        rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2207
2208        peer_addr = (struct sockaddr_in *)&(cmid->route.addr.dst_addr);
2209        if (*kiblnd_tunables.kib_require_priv_port &&
2210            ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2211                __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2212                CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2213                       &ip, ntohs(peer_addr->sin_port));
2214                goto failed;
2215        }
2216
2217        if (priv_nob < offsetof(kib_msg_t, ibm_type)) {
2218                CERROR("Short connection request\n");
2219                goto failed;
2220        }
2221
2222        /* Future protocol version compatibility support!  If the
2223         * o2iblnd-specific protocol changes, or when LNET unifies
2224         * protocols over all LNDs, the initial connection will
2225         * negotiate a protocol version.  I trap this here to avoid
2226         * console errors; the reject tells the peer which protocol I
2227         * speak. */
2228        if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2229            reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2230                goto failed;
2231        if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2232            reqmsg->ibm_version != IBLND_MSG_VERSION &&
2233            reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2234                goto failed;
2235        if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2236            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2237            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2238                goto failed;
2239
2240        rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2241        if (rc != 0) {
2242                CERROR("Can't parse connection request: %d\n", rc);
2243                goto failed;
2244        }
2245
2246        nid = reqmsg->ibm_srcnid;
2247        ni  = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2248
2249        if (ni != NULL) {
2250                net = (kib_net_t *)ni->ni_data;
2251                rej.ibr_incarnation = net->ibn_incarnation;
2252        }
2253
2254        if (ni == NULL ||                        /* no matching net */
2255            ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2256            net->ibn_dev != ibdev) {          /* wrong device */
2257                CERROR("Can't accept %s on %s (%s:%d:%pI4h): "
2258                       "bad dst nid %s\n", libcfs_nid2str(nid),
2259                       ni == NULL ? "NA" : libcfs_nid2str(ni->ni_nid),
2260                       ibdev->ibd_ifname, ibdev->ibd_nnets,
2261                       &ibdev->ibd_ifip,
2262                       libcfs_nid2str(reqmsg->ibm_dstnid));
2263
2264                goto failed;
2265        }
2266
2267        /* check timestamp as soon as possible */
2268        if (reqmsg->ibm_dststamp != 0 &&
2269            reqmsg->ibm_dststamp != net->ibn_incarnation) {
2270                CWARN("Stale connection request\n");
2271                rej.ibr_why = IBLND_REJECT_CONN_STALE;
2272                goto failed;
2273        }
2274
2275        /* I can accept peer's version */
2276        version = reqmsg->ibm_version;
2277
2278        if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2279                CERROR("Unexpected connreq msg type: %x from %s\n",
2280                       reqmsg->ibm_type, libcfs_nid2str(nid));
2281                goto failed;
2282        }
2283
2284        if (reqmsg->ibm_u.connparams.ibcp_queue_depth !=
2285            IBLND_MSG_QUEUE_SIZE(version)) {
2286                CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
2287                       libcfs_nid2str(nid), reqmsg->ibm_u.connparams.ibcp_queue_depth,
2288                       IBLND_MSG_QUEUE_SIZE(version));
2289
2290                if (version == IBLND_MSG_VERSION)
2291                        rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2292
2293                goto failed;
2294        }
2295
2296        if (reqmsg->ibm_u.connparams.ibcp_max_frags !=
2297            IBLND_RDMA_FRAGS(version)) {
2298                CERROR("Can't accept %s(version %x): "
2299                       "incompatible max_frags %d (%d wanted)\n",
2300                       libcfs_nid2str(nid), version,
2301                       reqmsg->ibm_u.connparams.ibcp_max_frags,
2302                       IBLND_RDMA_FRAGS(version));
2303
2304                if (version == IBLND_MSG_VERSION)
2305                        rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2306
2307                goto failed;
2309        }
2310
2311        if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2312                CERROR("Can't accept %s: message size %d too big (%d max)\n",
2313                       libcfs_nid2str(nid),
2314                       reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2315                       IBLND_MSG_SIZE);
2316                goto failed;
2317        }
2318
2319        /* assume 'nid' is a new peer; create one */
2320        rc = kiblnd_create_peer(ni, &peer, nid);
2321        if (rc != 0) {
2322                CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2323                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2324                goto failed;
2325        }
2326
2327        write_lock_irqsave(g_lock, flags);
2328
2329        peer2 = kiblnd_find_peer_locked(nid);
2330        if (peer2 != NULL) {
2331                if (peer2->ibp_version == 0) {
2332                        peer2->ibp_version     = version;
2333                        peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2334                }
2335
2336                /* not the guy I've talked with */
2337                if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2338                    peer2->ibp_version     != version) {
2339                        kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2340                        write_unlock_irqrestore(g_lock, flags);
2341
2342                        CWARN("Conn stale %s [old ver: %x, new ver: %x]\n",
2343                              libcfs_nid2str(nid), peer2->ibp_version, version);
2344
2345                        kiblnd_peer_decref(peer);
2346                        rej.ibr_why = IBLND_REJECT_CONN_STALE;
2347                        goto failed;
2348                }
2349
2350                /* tie-break connection race in favour of the higher NID */
2351                if (peer2->ibp_connecting != 0 &&
2352                    nid < ni->ni_nid) {
2353                        write_unlock_irqrestore(g_lock, flags);
2354
2355                        CWARN("Conn race %s\n", libcfs_nid2str(peer2->ibp_nid));
2356
2357                        kiblnd_peer_decref(peer);
2358                        rej.ibr_why = IBLND_REJECT_CONN_RACE;
2359                        goto failed;
2360                }
2361
2362                peer2->ibp_accepting++;
2363                kiblnd_peer_addref(peer2);
2364
2365                write_unlock_irqrestore(g_lock, flags);
2366                kiblnd_peer_decref(peer);
2367                peer = peer2;
2368        } else {
2369                /* Brand new peer */
2370                LASSERT (peer->ibp_accepting == 0);
2371                LASSERT (peer->ibp_version == 0 &&
2372                         peer->ibp_incarnation == 0);
2373
2374                peer->ibp_accepting   = 1;
2375                peer->ibp_version     = version;
2376                peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2377
2378                /* I have a ref on ni that prevents it being shutdown */
2379                LASSERT (net->ibn_shutdown == 0);
2380
2381                kiblnd_peer_addref(peer);
2382                list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2383
2384                write_unlock_irqrestore(g_lock, flags);
2385        }
2386
2387        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT, version);
2388        if (conn == NULL) {
2389                kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2390                kiblnd_peer_decref(peer);
2391                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2392                goto failed;
2393        }
2394
2395        /* conn now "owns" cmid, so I return success from here on to ensure the
2396         * CM callback doesn't destroy cmid. */
2397
2398        conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2399        conn->ibc_credits         = IBLND_MSG_QUEUE_SIZE(version);
2400        conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(version);
2401        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(version)
2402                 <= IBLND_RX_MSGS(version));
2403
2404        ackmsg = &conn->ibc_connvars->cv_msg;
2405        memset(ackmsg, 0, sizeof(*ackmsg));
2406
2407        kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2408                        sizeof(ackmsg->ibm_u.connparams));
2409        ackmsg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2410        ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2411        ackmsg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2412
2413        kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2414
2415        memset(&cp, 0, sizeof(cp));
2416        cp.private_data        = ackmsg;
2417        cp.private_data_len    = ackmsg->ibm_nob;
2418        cp.responder_resources = 0;  /* No atomic ops or RDMA reads */
2419        cp.initiator_depth     = 0;
2420        cp.flow_control        = 1;
2421        cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2422        cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2423
2424        CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2425
2426        rc = rdma_accept(cmid, &cp);
2427        if (rc != 0) {
2428                CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2429                rej.ibr_version = version;
2430                rej.ibr_why     = IBLND_REJECT_FATAL;
2431
2432                kiblnd_reject(cmid, &rej);
2433                kiblnd_connreq_done(conn, rc);
2434                kiblnd_conn_decref(conn);
2435        }
2436
2437        lnet_ni_decref(ni);
2438        return 0;
2439
2440 failed:
2441        if (ni != NULL)
2442                lnet_ni_decref(ni);
2443
2444        rej.ibr_version = version;
2445        rej.ibr_cp.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE(version);
2446        rej.ibr_cp.ibcp_max_frags   = IBLND_RDMA_FRAGS(version);
2447        kiblnd_reject(cmid, &rej);
2448
2449        return -ECONNREFUSED;
2450}
2451
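    /* The peer rejected my connection attempt for a retryable reason
     * (stale conn, connection race or version negotiation); retry with
     * the version and incarnation it told me about, provided the
     * connection is still wanted and no other attempt is in progress. */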
2452void
2453kiblnd_reconnect (kib_conn_t *conn, int version,
2454                  __u64 incarnation, int why, kib_connparams_t *cp)
2455{
2456        kib_peer_t    *peer = conn->ibc_peer;
2457        char          *reason;
2458        int            retry = 0;
2459        unsigned long  flags;
2460
2461        LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2462        LASSERT (peer->ibp_connecting > 0);     /* 'conn' at least */
2463
2464        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2465
2466        /* retry connection if it's still needed and no other connection
2467         * attempts (active or passive) are in progress
2468         * NB: reconnect is still needed even when ibp_tx_queue is
2469         * empty if ibp_version != version because reconnect may be
2470         * initiated by kiblnd_query() */
2471        if ((!list_empty(&peer->ibp_tx_queue) ||
2472             peer->ibp_version != version) &&
2473            peer->ibp_connecting == 1 &&
2474            peer->ibp_accepting == 0) {
2475                retry = 1;
2476                peer->ibp_connecting++;
2477
2478                peer->ibp_version     = version;
2479                peer->ibp_incarnation = incarnation;
2480        }
2481
2482        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2483
2484        if (!retry)
2485                return;
2486
2487        switch (why) {
2488        default:
2489                reason = "Unknown";
2490                break;
2491
2492        case IBLND_REJECT_CONN_STALE:
2493                reason = "stale";
2494                break;
2495
2496        case IBLND_REJECT_CONN_RACE:
2497                reason = "conn race";
2498                break;
2499
2500        case IBLND_REJECT_CONN_UNCOMPAT:
2501                reason = "version negotiation";
2502                break;
2503        }
2504
2505        CNETERR("%s: retrying (%s), %x, %x, "
2506                "queue_dep: %d, max_frag: %d, msg_size: %d\n",
2507                libcfs_nid2str(peer->ibp_nid),
2508                reason, IBLND_MSG_VERSION, version,
2509                cp != NULL ? cp->ibcp_queue_depth  : IBLND_MSG_QUEUE_SIZE(version),
2510                cp != NULL ? cp->ibcp_max_frags    : IBLND_RDMA_FRAGS(version),
2511                cp != NULL ? cp->ibcp_max_msg_size : IBLND_MSG_SIZE);
2512
2513        kiblnd_connect_peer(peer);
2514}
2515
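    /* Decode an RDMA_CM reject.  Consumer-defined rejects carry a
     * kib_rej_t saying why; retryable reasons are handed to
     * kiblnd_reconnect(), everything else fails the connect. */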
2516void
2517kiblnd_rejected (kib_conn_t *conn, int reason, void *priv, int priv_nob)
2518{
2519        kib_peer_t    *peer = conn->ibc_peer;
2520
2521        LASSERT (!in_interrupt());
2522        LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2523
2524        switch (reason) {
2525        case IB_CM_REJ_STALE_CONN:
2526                kiblnd_reconnect(conn, IBLND_MSG_VERSION, 0,
2527                                 IBLND_REJECT_CONN_STALE, NULL);
2528                break;
2529
2530        case IB_CM_REJ_INVALID_SERVICE_ID:
2531                CNETERR("%s rejected: no listener at %d\n",
2532                        libcfs_nid2str(peer->ibp_nid),
2533                        *kiblnd_tunables.kib_service);
2534                break;
2535
2536        case IB_CM_REJ_CONSUMER_DEFINED:
2537                if (priv_nob >= offsetof(kib_rej_t, ibr_padding)) {
2538                        kib_rej_t        *rej = priv;
2539                        kib_connparams_t *cp = NULL;
2540                        int               flip = 0;
2541                        __u64             incarnation = -1;
2542
2543                        /* NB. default incarnation is -1 because:
2544                         * a) V1 ignores the dst incarnation in the connreq.
2545                         * b) V2 provides an incarnation when rejecting me,
2546                         *    which overwrites the -1.
2547                         *
2548                         * If I connect to a V1 peer with the V2 protocol and
2549                         * it rejects me, then upgrades to V2, I know nothing
2550                         * about the upgrade and retry with V1.  The upgraded
2551                         * V2 peer can then tell I'm talking to its old
2552                         * incarnation and rejects me (incarnation is -1).
2553                         */
2554
2555                        if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2556                            rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2557                                __swab32s(&rej->ibr_magic);
2558                                __swab16s(&rej->ibr_version);
2559                                flip = 1;
2560                        }
2561
2562                        if (priv_nob >= sizeof(kib_rej_t) &&
2563                            rej->ibr_version > IBLND_MSG_VERSION_1) {
2564                                /* priv_nob is always 148 in the current
2565                                 * OFED (IB_CM_REJ_PRIVATE_DATA_SIZE), so
2566                                 * the version must be checked as well. */
2567                                cp = &rej->ibr_cp;
2568
2569                                if (flip) {
2570                                        __swab64s(&rej->ibr_incarnation);
2571                                        __swab16s(&cp->ibcp_queue_depth);
2572                                        __swab16s(&cp->ibcp_max_frags);
2573                                        __swab32s(&cp->ibcp_max_msg_size);
2574                                }
2575
2576                                incarnation = rej->ibr_incarnation;
2577                        }
2578
2579                        if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2580                            rej->ibr_magic != LNET_PROTO_MAGIC) {
2581                                CERROR("%s rejected: consumer defined fatal error\n",
2582                                       libcfs_nid2str(peer->ibp_nid));
2583                                break;
2584                        }
2585
2586                        if (rej->ibr_version != IBLND_MSG_VERSION &&
2587                            rej->ibr_version != IBLND_MSG_VERSION_1) {
2588                                CERROR("%s rejected: o2iblnd version %x error\n",
2589                                       libcfs_nid2str(peer->ibp_nid),
2590                                       rej->ibr_version);
2591                                break;
2592                        }
2593
2594                        if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2595                            rej->ibr_version == IBLND_MSG_VERSION_1) {
2596                                CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2597                                       libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2598
2599                                if (conn->ibc_version != IBLND_MSG_VERSION_1)
2600                                        rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2601                        }
2602
2603                        switch (rej->ibr_why) {
2604                        case IBLND_REJECT_CONN_RACE:
2605                        case IBLND_REJECT_CONN_STALE:
2606                        case IBLND_REJECT_CONN_UNCOMPAT:
2607                                kiblnd_reconnect(conn, rej->ibr_version,
2608                                                 incarnation, rej->ibr_why, cp);
2609                                break;
2610
2611                        case IBLND_REJECT_MSG_QUEUE_SIZE:
                                /* NB cp is NULL if the reject didn't carry
                                 * valid connection parameters */
2612                                CERROR("%s rejected: incompatible message queue depth %d, %d\n",
2613                                       libcfs_nid2str(peer->ibp_nid),
                                       cp != NULL ? cp->ibcp_queue_depth :
                                       IBLND_MSG_QUEUE_SIZE(rej->ibr_version),
2614                                       IBLND_MSG_QUEUE_SIZE(conn->ibc_version));
2615                                break;
2616
2617                        case IBLND_REJECT_RDMA_FRAGS:
2618                                CERROR("%s rejected: incompatible # of RDMA fragments %d, %d\n",
2619                                       libcfs_nid2str(peer->ibp_nid),
                                       cp != NULL ? cp->ibcp_max_frags :
                                       IBLND_RDMA_FRAGS(rej->ibr_version),
2620                                       IBLND_RDMA_FRAGS(conn->ibc_version));
2621                                break;
2622
2623                        case IBLND_REJECT_NO_RESOURCES:
2624                                CERROR("%s rejected: o2iblnd no resources\n",
2625                                       libcfs_nid2str(peer->ibp_nid));
2626                                break;
2627
2628                        case IBLND_REJECT_FATAL:
2629                                CERROR("%s rejected: o2iblnd fatal error\n",
2630                                       libcfs_nid2str(peer->ibp_nid));
2631                                break;
2632
2633                        default:
2634                                CERROR("%s rejected: o2iblnd reason %d\n",
2635                                       libcfs_nid2str(peer->ibp_nid),
2636                                       rej->ibr_why);
2637                                break;
2638                        }
2639                        break;
2640                }
2641                /* fall through */
2642        default:
2643                CNETERR("%s rejected: reason %d, size %d\n",
2644                        libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2645                break;
2646        }
2647
2648        kiblnd_connreq_done(conn, -ECONNREFUSED);
2649}
2650
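    /* Validate the peer's CONNACK before declaring the connection
     * established.  Any problem is recorded in ibc_comms_error, since the
     * QP is already connected by the time the reply is seen. */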
2651void
2652kiblnd_check_connreply (kib_conn_t *conn, void *priv, int priv_nob)
2653{
2654        kib_peer_t    *peer = conn->ibc_peer;
2655        lnet_ni_t     *ni   = peer->ibp_ni;
2656        kib_net_t     *net  = ni->ni_data;
2657        kib_msg_t     *msg  = priv;
2658        int            ver  = conn->ibc_version;
2659        int            rc   = kiblnd_unpack_msg(msg, priv_nob);
2660        unsigned long  flags;
2661
2662        LASSERT (net != NULL);
2663
2664        if (rc != 0) {
2665                CERROR("Can't unpack connack from %s: %d\n",
2666                       libcfs_nid2str(peer->ibp_nid), rc);
2667                goto failed;
2668        }
2669
2670        if (msg->ibm_type != IBLND_MSG_CONNACK) {
2671                CERROR("Unexpected message %d from %s\n",
2672                       msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2673                rc = -EPROTO;
2674                goto failed;
2675        }
2676
2677        if (ver != msg->ibm_version) {
2678                CERROR("%s replied with version %x, but version %x "
2679                       "was requested\n",
2680                       libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2681                rc = -EPROTO;
2682                goto failed;
2683        }
2684
2685        if (msg->ibm_u.connparams.ibcp_queue_depth !=
2686            IBLND_MSG_QUEUE_SIZE(ver)) {
2687                CERROR("%s has incompatible queue depth %d(%d wanted)\n",
2688                       libcfs_nid2str(peer->ibp_nid),
2689                       msg->ibm_u.connparams.ibcp_queue_depth,
2690                       IBLND_MSG_QUEUE_SIZE(ver));
2691                rc = -EPROTO;
2692                goto failed;
2693        }
2694
2695        if (msg->ibm_u.connparams.ibcp_max_frags !=
2696            IBLND_RDMA_FRAGS(ver)) {
2697                CERROR("%s has incompatible max_frags %d (%d wanted)\n",
2698                       libcfs_nid2str(peer->ibp_nid),
2699                       msg->ibm_u.connparams.ibcp_max_frags,
2700                       IBLND_RDMA_FRAGS(ver));
2701                rc = -EPROTO;
2702                goto failed;
2703        }
2704
2705        if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2706                CERROR("%s max message size %d too big (%d max)\n",
2707                       libcfs_nid2str(peer->ibp_nid),
2708                       msg->ibm_u.connparams.ibcp_max_msg_size,
2709                       IBLND_MSG_SIZE);
2710                rc = -EPROTO;
2711                goto failed;
2712        }
2713
2714        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2715        if (msg->ibm_dstnid == ni->ni_nid &&
2716            msg->ibm_dststamp == net->ibn_incarnation)
2717                rc = 0;
2718        else
2719                rc = -ESTALE;
2720        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2721
2722        if (rc != 0) {
2723                CERROR("Bad connection reply from %s, rc = %d, "
2724                       "version: %x max_frags: %d\n",
2725                       libcfs_nid2str(peer->ibp_nid), rc,
2726                       msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2727                goto failed;
2728        }
2729
2730        conn->ibc_incarnation      = msg->ibm_srcstamp;
2731        conn->ibc_credits         =
2732        conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(ver);
2733        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(ver)
2734                 <= IBLND_RX_MSGS(ver));
2735
2736        kiblnd_connreq_done(conn, 0);
2737        return;
2738
2739 failed:
2740        /* NB my QP is already established, so I handle anything going
2741         * wrong here by setting ibc_comms_error:
2742         * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but
2743         * then immediately tears it down. */
2744
2745        LASSERT (rc != 0);
2746        conn->ibc_comms_error = rc;
2747        kiblnd_connreq_done(conn, 0);
2748}
2749
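    /* Address and route are resolved: create the conn and send a CONNREQ
     * carrying my connection parameters via rdma_connect(). */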
2750int
2751kiblnd_active_connect (struct rdma_cm_id *cmid)
2752{
2753        kib_peer_t             *peer = (kib_peer_t *)cmid->context;
2754        kib_conn_t             *conn;
2755        kib_msg_t              *msg;
2756        struct rdma_conn_param  cp;
2757        int                     version;
2758        __u64                   incarnation;
2759        unsigned long           flags;
2760        int                     rc;
2761
2762        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2763
2764        incarnation = peer->ibp_incarnation;
2765        version     = (peer->ibp_version == 0) ? IBLND_MSG_VERSION :
2766                                                 peer->ibp_version;
2767
2768        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2769
2770        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT, version);
2771        if (conn == NULL) {
2772                kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2773                kiblnd_peer_decref(peer); /* lose cmid's ref */
2774                return -ENOMEM;
2775        }
2776
2777        /* conn "owns" cmid now, so I return success from here on to ensure the
2778         * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2779         * on peer */
2780
2781        msg = &conn->ibc_connvars->cv_msg;
2782
2783        memset(msg, 0, sizeof(*msg));
2784        kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2785        msg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2786        msg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2787        msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2788
2789        kiblnd_pack_msg(peer->ibp_ni, msg, version,
2790                        0, peer->ibp_nid, incarnation);
2791
2792        memset(&cp, 0, sizeof(cp));
2793        cp.private_data        = msg;
2794        cp.private_data_len    = msg->ibm_nob;
2795        cp.responder_resources = 0;  /* No atomic ops or RDMA reads */
2796        cp.initiator_depth     = 0;
2797        cp.flow_control        = 1;
2798        cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2799        cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2800
2801        LASSERT(cmid->context == (void *)conn);
2802        LASSERT(conn->ibc_cmid == cmid);
2803
2804        rc = rdma_connect(cmid, &cp);
2805        if (rc != 0) {
2806                CERROR("Can't connect to %s: %d\n",
2807                       libcfs_nid2str(peer->ibp_nid), rc);
2808                kiblnd_connreq_done(conn, rc);
2809                kiblnd_conn_decref(conn);
2810        }
2811
2812        return 0;
2813}
2814
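    /* RDMA CM event dispatcher.  NB a non-zero return causes the CM to
     * destroy cmid, so 0 must be returned once a conn has taken ownership
     * of it. */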
2815int
2816kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2817{
2818        kib_peer_t *peer;
2819        kib_conn_t *conn;
2820        int         rc;
2821
2822        switch (event->event) {
2823        default:
2824                CERROR("Unexpected event: %d, status: %d\n",
2825                       event->event, event->status);
2826                LBUG();
2827
2828        case RDMA_CM_EVENT_CONNECT_REQUEST:
2829                /* destroy cmid on failure */
2830                rc = kiblnd_passive_connect(cmid,
2831                                            (void *)KIBLND_CONN_PARAM(event),
2832                                            KIBLND_CONN_PARAM_LEN(event));
2833                CDEBUG(D_NET, "connreq: %d\n", rc);
2834                return rc;
2835
2836        case RDMA_CM_EVENT_ADDR_ERROR:
2837                peer = (kib_peer_t *)cmid->context;
2838                CNETERR("%s: ADDR ERROR %d\n",
2839                       libcfs_nid2str(peer->ibp_nid), event->status);
2840                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2841                kiblnd_peer_decref(peer);
2842                return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2843
2844        case RDMA_CM_EVENT_ADDR_RESOLVED:
2845                peer = (kib_peer_t *)cmid->context;
2846
2847                CDEBUG(D_NET, "%s Addr resolved: %d\n",
2848                       libcfs_nid2str(peer->ibp_nid), event->status);
2849
2850                if (event->status != 0) {
2851                        CNETERR("Can't resolve address for %s: %d\n",
2852                                libcfs_nid2str(peer->ibp_nid), event->status);
2853                        rc = event->status;
2854                } else {
2855                        rc = rdma_resolve_route(
2856                                cmid, *kiblnd_tunables.kib_timeout * 1000);
2857                        if (rc == 0)
2858                                return 0;
2859                        /* Can't initiate route resolution */
2860                        CERROR("Can't resolve route for %s: %d\n",
2861                               libcfs_nid2str(peer->ibp_nid), rc);
2862                }
2863                kiblnd_peer_connect_failed(peer, 1, rc);
2864                kiblnd_peer_decref(peer);
2865                return rc;                    /* rc != 0 destroys cmid */
2866
2867        case RDMA_CM_EVENT_ROUTE_ERROR:
2868                peer = (kib_peer_t *)cmid->context;
2869                CNETERR("%s: ROUTE ERROR %d\n",
2870                        libcfs_nid2str(peer->ibp_nid), event->status);
2871                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2872                kiblnd_peer_decref(peer);
2873                return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2874
2875        case RDMA_CM_EVENT_ROUTE_RESOLVED:
2876                peer = (kib_peer_t *)cmid->context;
2877                CDEBUG(D_NET, "%s Route resolved: %d\n",
2878                       libcfs_nid2str(peer->ibp_nid), event->status);
2879
2880                if (event->status == 0)
2881                        return kiblnd_active_connect(cmid);
2882
2883                CNETERR("Can't resolve route for %s: %d\n",
2884                       libcfs_nid2str(peer->ibp_nid), event->status);
2885                kiblnd_peer_connect_failed(peer, 1, event->status);
2886                kiblnd_peer_decref(peer);
2887                return event->status;      /* rc != 0 destroys cmid */
2888
2889        case RDMA_CM_EVENT_UNREACHABLE:
2890                conn = (kib_conn_t *)cmid->context;
2891                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2892                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2893                CNETERR("%s: UNREACHABLE %d\n",
2894                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2895                kiblnd_connreq_done(conn, -ENETDOWN);
2896                kiblnd_conn_decref(conn);
2897                return 0;
2898
2899        case RDMA_CM_EVENT_CONNECT_ERROR:
2900                conn = (kib_conn_t *)cmid->context;
2901                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2902                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2903                CNETERR("%s: CONNECT ERROR %d\n",
2904                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2905                kiblnd_connreq_done(conn, -ENOTCONN);
2906                kiblnd_conn_decref(conn);
2907                return 0;
2908
2909        case RDMA_CM_EVENT_REJECTED:
2910                conn = (kib_conn_t *)cmid->context;
2911                switch (conn->ibc_state) {
2912                default:
2913                        LBUG();
2914
2915                case IBLND_CONN_PASSIVE_WAIT:
2916                        CERROR ("%s: REJECTED %d\n",
2917                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
2918                                event->status);
2919                        kiblnd_connreq_done(conn, -ECONNRESET);
2920                        break;
2921
2922                case IBLND_CONN_ACTIVE_CONNECT:
2923                        kiblnd_rejected(conn, event->status,
2924                                        (void *)KIBLND_CONN_PARAM(event),
2925                                        KIBLND_CONN_PARAM_LEN(event));
2926                        break;
2927                }
2928                kiblnd_conn_decref(conn);
2929                return 0;
2930
2931        case RDMA_CM_EVENT_ESTABLISHED:
2932                conn = (kib_conn_t *)cmid->context;
2933                switch (conn->ibc_state) {
2934                default:
2935                        LBUG();
2936
2937                case IBLND_CONN_PASSIVE_WAIT:
2938                        CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
2939                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
2940                        kiblnd_connreq_done(conn, 0);
2941                        break;
2942
2943                case IBLND_CONN_ACTIVE_CONNECT:
2944                        CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
2945                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
2946                        kiblnd_check_connreply(conn,
2947                                               (void *)KIBLND_CONN_PARAM(event),
2948                                               KIBLND_CONN_PARAM_LEN(event));
2949                        break;
2950                }
2951                /* net keeps its ref on conn! */
2952                return 0;
2953
2954        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
2955                CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
2956                return 0;
2957        case RDMA_CM_EVENT_DISCONNECTED:
2958                conn = (kib_conn_t *)cmid->context;
2959                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
2960                        CERROR("%s DISCONNECTED\n",
2961                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
2962                        kiblnd_connreq_done(conn, -ECONNRESET);
2963                } else {
2964                        kiblnd_close_conn(conn, 0);
2965                }
2966                kiblnd_conn_decref(conn);
2967                cmid->context = NULL;
2968                return 0;
2969
2970        case RDMA_CM_EVENT_DEVICE_REMOVAL:
2971                LCONSOLE_ERROR_MSG(0x131,
2972                                   "Received notification of device removal\n"
2973                                   "Please shut down LNET to allow this to proceed\n");
2974                /* Can't remove network from underneath LNET for now, so I have
2975                 * to ignore this */
2976                return 0;
2977
2978        case RDMA_CM_EVENT_ADDR_CHANGE:
2979                LCONSOLE_INFO("Physical link changed (e.g. HCA/port)\n");
2980                return 0;
2981        }
2982}
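
/*
 * Illustrative sketch (disabled, fragment): the contract the callback
 * above lives by.  The RDMA CM destroys the cmid when its event handler
 * returns non-zero, so once a conn has taken ownership of the cmid the
 * handler must record failures in conn state and return 0.  Event names
 * are the real RDMA_CM_EVENT_* values; the rest is an assumed skeleton.
 */
#if 0
static int example_cm_handler(struct rdma_cm_id *cmid,
			      struct rdma_cm_event *event)
{
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_ERROR:
	case RDMA_CM_EVENT_ROUTE_ERROR:
		/* cmid not yet owned by a conn: non-zero lets the CM free it */
		return -EHOSTUNREACH;

	case RDMA_CM_EVENT_ESTABLISHED:
	case RDMA_CM_EVENT_DISCONNECTED:
		/* a conn owns the cmid now: report via conn, keep the cmid */
		return 0;

	default:
		return 0;
	}
}
#endif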
2983
2984static int
2985kiblnd_check_txs_locked(kib_conn_t *conn, struct list_head *txs)
2986{
2987        kib_tx_t         *tx;
2988        struct list_head *ttmp;
2989
2990        list_for_each (ttmp, txs) {
2991                tx = list_entry (ttmp, kib_tx_t, tx_list);
2992
2993                if (txs != &conn->ibc_active_txs) {
2994                        LASSERT (tx->tx_queued);
2995                } else {
2996                        LASSERT (!tx->tx_queued);
2997                        LASSERT (tx->tx_waiting || tx->tx_sending != 0);
2998                }
2999
3000                if (cfs_time_aftereq (jiffies, tx->tx_deadline)) {
3001                        CERROR("Timed out tx: %s, %lu seconds\n",
3002                               kiblnd_queue2str(conn, txs),
3003                               cfs_duration_sec(jiffies - tx->tx_deadline));
3004                        return 1;
3005                }
3006        }
3007
3008        return 0;
3009}
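
/*
 * Illustrative sketch (disabled; user-space): cfs_time_aftereq() above
 * must survive jiffy counter wrap, which a plain '>=' does not.  The
 * idiom is signed interpretation of the unsigned difference, as in the
 * kernel's time_after_eq().
 */
#if 0
#include <assert.h>

static int after_eq(unsigned long a, unsigned long b)
{
	return (long)(a - b) >= 0;	/* wrap-safe comparison */
}

int main(void)
{
	unsigned long deadline = (unsigned long)-10;	/* just before wrap */
	unsigned long now = 5;				/* just after wrap */

	assert(after_eq(now, deadline));	/* correctly "timed out" */
	assert(!(now >= deadline));		/* naive compare gets it wrong */
	return 0;
}
#endif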
3010
3011static int
3012kiblnd_conn_timed_out_locked(kib_conn_t *conn)
3013{
3014        return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3015                kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3016                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3017                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3018                kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3019}
3020
3021void
3022kiblnd_check_conns (int idx)
3023{
3024        LIST_HEAD (closes);
3025        LIST_HEAD (checksends);
3026        struct list_head *peers = &kiblnd_data.kib_peers[idx];
3027        struct list_head *ptmp;
3028        kib_peer_t       *peer;
3029        kib_conn_t       *conn;
3030        struct list_head *ctmp;
3031        unsigned long     flags;
3032
3033        /* NB. We expect to have a look at all the peers and not find any
3034         * RDMAs to time out, so we just use a shared lock while we
3035         * take a look... */
3036        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3037
3038        list_for_each (ptmp, peers) {
3039                peer = list_entry (ptmp, kib_peer_t, ibp_list);
3040
3041                list_for_each (ctmp, &peer->ibp_conns) {
3042                        int timedout;
3043                        int sendnoop;
3044
3045                        conn = list_entry(ctmp, kib_conn_t, ibc_list);
3046
3047                        LASSERT (conn->ibc_state == IBLND_CONN_ESTABLISHED);
3048
3049                        spin_lock(&conn->ibc_lock);
3050
3051                        sendnoop = kiblnd_need_noop(conn);
3052                        timedout = kiblnd_conn_timed_out_locked(conn);
3053                        if (!sendnoop && !timedout) {
3054                                spin_unlock(&conn->ibc_lock);
3055                                continue;
3056                        }
3057
3058                        if (timedout) {
3059                                CERROR("Timed out RDMA with %s (%lu): "
3060                                       "c: %u, oc: %u, rc: %u\n",
3061                                       libcfs_nid2str(peer->ibp_nid),
3062                                       cfs_duration_sec(cfs_time_current() -
3063                                                        peer->ibp_last_alive),
3064                                       conn->ibc_credits,
3065                                       conn->ibc_outstanding_credits,
3066                                       conn->ibc_reserved_credits);
3067                                list_add(&conn->ibc_connd_list, &closes);
3068                        } else {
3069                                list_add(&conn->ibc_connd_list,
3070                                         &checksends);
3071                        }
3072                        /* +ref for 'closes' or 'checksends' */
3073                        kiblnd_conn_addref(conn);
3074
3075                        spin_unlock(&conn->ibc_lock);
3076                }
3077        }
3078
3079        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3080
3081        /* Handle timeout by closing the whole
3082         * connection. We can only be sure RDMA activity
3083         * has ceased once the QP has been modified. */
3084        while (!list_empty(&closes)) {
3085                conn = list_entry(closes.next,
3086                                  kib_conn_t, ibc_connd_list);
3087                list_del(&conn->ibc_connd_list);
3088                kiblnd_close_conn(conn, -ETIMEDOUT);
3089                kiblnd_conn_decref(conn);
3090        }
3091
3092        /* In case we have enough credits to return via a
3093         * NOOP, but there were no non-blocking tx descs
3094         * free to do it last time... */
3095        while (!list_empty(&checksends)) {
3096                conn = list_entry(checksends.next,
3097                                  kib_conn_t, ibc_connd_list);
3098                list_del(&conn->ibc_connd_list);
3099                kiblnd_check_sends(conn);
3100                kiblnd_conn_decref(conn);
3101        }
3102}
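
/*
 * Illustrative sketch (disabled, fragment): kiblnd_check_conns() above
 * uses the common "collect under a shared lock, act after dropping it"
 * pattern.  Closing a conn may sleep and needs stronger locking than the
 * read-side scan holds, so each victim is pinned with a ref and moved to
 * a private list first.  All names here are assumed stand-ins.
 */
#if 0
static void example_scan(void)
{
	LIST_HEAD(victims);
	struct victim *v;
	struct victim *tmp;

	read_lock(&table_lock);
	list_for_each_entry(v, &table, link) {
		if (!is_bad(v))
			continue;
		get_ref(v);			/* pin: must survive unlock */
		list_add(&v->scratch, &victims);
	}
	read_unlock(&table_lock);

	list_for_each_entry_safe(v, tmp, &victims, scratch) {
		list_del(&v->scratch);
		tear_down(v);			/* may sleep / take write locks */
		put_ref(v);
	}
}
#endif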
3103
3104void
3105kiblnd_disconnect_conn (kib_conn_t *conn)
3106{
3107        LASSERT (!in_interrupt());
3108        LASSERT (current == kiblnd_data.kib_connd);
3109        LASSERT (conn->ibc_state == IBLND_CONN_CLOSING);
3110
3111        rdma_disconnect(conn->ibc_cmid);
3112        kiblnd_finalise_conn(conn);
3113
3114        kiblnd_peer_notify(conn->ibc_peer);
3115}
3116
3117int
3118kiblnd_connd (void *arg)
3119{
3120        wait_queue_t  wait;
3121        unsigned long flags;
3122        kib_conn_t   *conn;
3123        int           timeout;
3124        int           i;
3125        int           dropped_lock;
3126        int           peer_index = 0;
3127        unsigned long deadline = jiffies;
3128
3129        cfs_block_allsigs ();
3130
3131        init_waitqueue_entry_current (&wait);
3132        kiblnd_data.kib_connd = current;
3133
3134        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3135
3136        while (!kiblnd_data.kib_shutdown) {
3137
3138                dropped_lock = 0;
3139
3140                if (!list_empty (&kiblnd_data.kib_connd_zombies)) {
3141                        conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3142                                          kib_conn_t,
3143                                          ibc_list);
3144                        list_del(&conn->ibc_list);
3145
3146                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock,
3147                                               flags);
3148                        dropped_lock = 1;
3149
3150                        kiblnd_destroy_conn(conn);
3151
3152                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3153                }
3154
3155                if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3156                        conn = list_entry(kiblnd_data.kib_connd_conns.next,
3157                                          kib_conn_t, ibc_list);
3158                        list_del(&conn->ibc_list);
3159
3160                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock,
3161                                               flags);
3162                        dropped_lock = 1;
3163
3164                        kiblnd_disconnect_conn(conn);
3165                        kiblnd_conn_decref(conn);
3166
3167                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3168                }
3169
3170                /* careful with the jiffy wrap... */
3171                timeout = (int)(deadline - jiffies);
3172                if (timeout <= 0) {
3173                        const int n = 4;
3174                        const int p = 1;
3175                        int       chunk = kiblnd_data.kib_peer_hash_size;
3176
3177                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3178                        dropped_lock = 1;
3179
3180                        /* Time to check for RDMA timeouts on a few more
3181                         * peers: I do checks every 'p' seconds on a
3182                         * proportion of the peer table and I need to check
3183                         * every connection 'n' times within a timeout
3184                         * interval, to ensure I detect a timeout on any
3185                         * connection within (n+1)/n times the timeout
3186                         * interval. */
3187
3188                        if (*kiblnd_tunables.kib_timeout > n * p)
3189                                chunk = (chunk * n * p) /
3190                                        *kiblnd_tunables.kib_timeout;
3191                        if (chunk == 0)
3192                                chunk = 1;
3193
3194                        for (i = 0; i < chunk; i++) {
3195                                kiblnd_check_conns(peer_index);
3196                                peer_index = (peer_index + 1) %
3197                                             kiblnd_data.kib_peer_hash_size;
3198                        }
3199
3200                        deadline += p * HZ;
3201                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3202                }
3203
3204                if (dropped_lock)
3205                        continue;
3206
3207                /* Nothing to do: wait on the queue for up to 'timeout' */
3208                set_current_state(TASK_INTERRUPTIBLE);
3209                add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3210                spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3211
3212                waitq_timedwait(&wait, TASK_INTERRUPTIBLE, timeout);
3213
3214                set_current_state(TASK_RUNNING);
3215                remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3216                spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3217        }
3218
3219        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3220
3221        kiblnd_thread_fini();
3222        return 0;
3223}
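
/*
 * Illustrative sketch (disabled; user-space): the chunk arithmetic in
 * kiblnd_connd() above.  Scanning 'chunk' hash buckets every 'p' seconds
 * sweeps the whole peer table 'n' times per timeout interval, so a dead
 * connection is noticed within about (n+1)/n of the timeout.  The table
 * size and timeout are example values only.
 */
#if 0
#include <stdio.h>

int main(void)
{
	const int n = 4;	/* sweeps per timeout interval */
	const int p = 1;	/* seconds between checks */
	int hash_size = 101;	/* assumed peer hash table size */
	int timeout = 50;	/* assumed *kib_timeout, in seconds */
	int chunk = hash_size;

	if (timeout > n * p)
		chunk = hash_size * n * p / timeout;
	if (chunk == 0)
		chunk = 1;

	/* full table every (hash_size / chunk) * p ~= timeout / n seconds */
	printf("scan %d of %d buckets per check\n", chunk, hash_size);
	return 0;
}
#endif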
3224
3225void
3226kiblnd_qp_event(struct ib_event *event, void *arg)
3227{
3228        kib_conn_t *conn = arg;
3229
3230        switch (event->event) {
3231        case IB_EVENT_COMM_EST:
3232                CDEBUG(D_NET, "%s established\n",
3233                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
3234                return;
3235
3236        default:
3237                CERROR("%s: Async QP event type %d\n",
3238                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3239                return;
3240        }
3241}
3242
3243void
3244kiblnd_complete (struct ib_wc *wc)
3245{
3246        switch (kiblnd_wreqid2type(wc->wr_id)) {
3247        default:
3248                LBUG();
3249
3250        case IBLND_WID_RDMA:
3251                /* We only get RDMA completion notification if it fails.  All
3252                 * subsequent work items, including the final SEND will fail
3253                 * too.  However we can't print out any more info about the
3254                 * failing RDMA because 'tx' might be back on the idle list or
3255                 * even reused already if we didn't manage to post all our work
3256                 * items */
3257                CNETERR("RDMA (tx: %p) failed: %d\n",
3258                        kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3259                return;
3260
3261        case IBLND_WID_TX:
3262                kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3263                return;
3264
3265        case IBLND_WID_RX:
3266                kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3267                                   wc->byte_len);
3268                return;
3269        }
3270}
3271
3272void
3273kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3274{
3275        /* NB I'm not allowed to schedule this conn once its refcount has
3276         * reached 0.  Since fundamentally I'm racing with scheduler threads
3277         * consuming my CQ I could be called after all completions have
3278         * occurred.  But in this case, ibc_nrx == 0 && ibc_nsends_posted == 0
3279         * and this CQ is about to be destroyed so I NOOP. */
3280        kib_conn_t              *conn = (kib_conn_t *)arg;
3281        struct kib_sched_info   *sched = conn->ibc_sched;
3282        unsigned long           flags;
3283
3284        LASSERT(cq == conn->ibc_cq);
3285
3286        spin_lock_irqsave(&sched->ibs_lock, flags);
3287
3288        conn->ibc_ready = 1;
3289
3290        if (!conn->ibc_scheduled &&
3291            (conn->ibc_nrx > 0 ||
3292             conn->ibc_nsends_posted > 0)) {
3293                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3294                conn->ibc_scheduled = 1;
3295                list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3296
3297                if (waitqueue_active(&sched->ibs_waitq))
3298                        wake_up(&sched->ibs_waitq);
3299        }
3300
3301        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3302}
3303
3304void
3305kiblnd_cq_event(struct ib_event *event, void *arg)
3306{
3307        kib_conn_t *conn = arg;
3308
3309        CERROR("%s: async CQ event type %d\n",
3310               libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3311}
3312
3313int
3314kiblnd_scheduler(void *arg)
3315{
3316        long                    id = (long)arg;
3317        struct kib_sched_info   *sched;
3318        kib_conn_t              *conn;
3319        wait_queue_t            wait;
3320        unsigned long           flags;
3321        struct ib_wc            wc;
3322        int                     did_something;
3323        int                     busy_loops = 0;
3324        int                     rc;
3325
3326        cfs_block_allsigs();
3327
3328        init_waitqueue_entry_current(&wait);
3329
3330        sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3331
3332        rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3333        if (rc != 0) {
3334                CWARN("Failed to bind on CPT %d; please verify that all "
3335                      "CPUs are healthy and reload modules if necessary, "
3336                      "otherwise the system may suffer degraded "
3337                      "performance\n", sched->ibs_cpt);
3338        }
3339
3340        spin_lock_irqsave(&sched->ibs_lock, flags);
3341
3342        while (!kiblnd_data.kib_shutdown) {
3343                if (busy_loops++ >= IBLND_RESCHED) {
3344                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3345
3346                        cond_resched();
3347                        busy_loops = 0;
3348
3349                        spin_lock_irqsave(&sched->ibs_lock, flags);
3350                }
3351
3352                did_something = 0;
3353
3354                if (!list_empty(&sched->ibs_conns)) {
3355                        conn = list_entry(sched->ibs_conns.next,
3356                                          kib_conn_t, ibc_sched_list);
3357                        /* take over kib_sched_conns' ref on conn... */
3358                        LASSERT(conn->ibc_scheduled);
3359                        list_del(&conn->ibc_sched_list);
3360                        conn->ibc_ready = 0;
3361
3362                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3363
3364                        rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3365                        if (rc == 0) {
3366                                rc = ib_req_notify_cq(conn->ibc_cq,
3367                                                      IB_CQ_NEXT_COMP);
3368                                if (rc < 0) {
3369                                        CWARN("%s: ib_req_notify_cq failed: %d, "
3370                                              "closing connection\n",
3371                                              libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3372                                        kiblnd_close_conn(conn, -EIO);
3373                                        kiblnd_conn_decref(conn);
3374                                        spin_lock_irqsave(&sched->ibs_lock,
3375                                                          flags);
3376                                        continue;
3377                                }
3378
3379                                rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3380                        }
3381
3382                        if (rc < 0) {
3383                                CWARN("%s: ib_poll_cq failed: %d, "
3384                                      "closing connection\n",
3385                                      libcfs_nid2str(conn->ibc_peer->ibp_nid),
3386                                      rc);
3387                                kiblnd_close_conn(conn, -EIO);
3388                                kiblnd_conn_decref(conn);
3389                                spin_lock_irqsave(&sched->ibs_lock, flags);
3390                                continue;
3391                        }
3392
3393                        spin_lock_irqsave(&sched->ibs_lock, flags);
3394
3395                        if (rc != 0 || conn->ibc_ready) {
3396                                /* There may be another completion waiting; get
3397                                 * another scheduler to check while I handle
3398                                 * this one... */
3399                                /* +1 ref for sched_conns */
3400                                kiblnd_conn_addref(conn);
3401                                list_add_tail(&conn->ibc_sched_list,
3402                                              &sched->ibs_conns);
3403                                if (waitqueue_active(&sched->ibs_waitq))
3404                                        wake_up(&sched->ibs_waitq);
3405                        } else {
3406                                conn->ibc_scheduled = 0;
3407                        }
3408
3409                        if (rc != 0) {
3410                                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3411                                kiblnd_complete(&wc);
3412
3413                                spin_lock_irqsave(&sched->ibs_lock, flags);
3414                        }
3415
3416                        kiblnd_conn_decref(conn); /* ...drop my ref from above */
3417                        did_something = 1;
3418                }
3419
3420                if (did_something)
3421                        continue;
3422
3423                set_current_state(TASK_INTERRUPTIBLE);
3424                add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3425                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3426
3427                waitq_wait(&wait, TASK_INTERRUPTIBLE);
3428                busy_loops = 0;
3429
3430                remove_wait_queue(&sched->ibs_waitq, &wait);
3431                set_current_state(TASK_RUNNING);
3432                spin_lock_irqsave(&sched->ibs_lock, flags);
3433        }
3434
3435        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3436
3437        kiblnd_thread_fini();
3438        return 0;
3439}
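
/*
 * Illustrative sketch (disabled, fragment): the poll / re-arm / re-poll
 * dance in kiblnd_scheduler() above.  ib_req_notify_cq() only signals
 * completions added *after* it is called, so a CQE that lands between
 * an empty poll and the re-arm would be missed without the second poll.
 * wait_for_cq_event() is an assumed stand-in for sleeping until the
 * completion callback runs.
 */
#if 0
for (;;) {
	rc = ib_poll_cq(cq, 1, &wc);
	if (rc < 0)
		goto failed;		/* CQ is broken */
	if (rc > 0)
		break;			/* got a completion: handle it */

	/* empty: arm for the next CQE, then look again to close the race */
	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
	if (rc < 0)
		goto failed;

	rc = ib_poll_cq(cq, 1, &wc);
	if (rc < 0)
		goto failed;
	if (rc > 0)
		break;			/* a CQE raced in before the arm */

	wait_for_cq_event();		/* safe: any new CQE will signal */
}
#endif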
3440
3441int
3442kiblnd_failover_thread(void *arg)
3443{
3444        rwlock_t      *glock = &kiblnd_data.kib_global_lock;
3445        kib_dev_t     *dev;
3446        wait_queue_t   wait;
3447        unsigned long  flags;
3448        int            rc;
3449
3450        LASSERT (*kiblnd_tunables.kib_dev_failover != 0);
3451
3452        cfs_block_allsigs ();
3453
3454        init_waitqueue_entry_current(&wait);
3455        write_lock_irqsave(glock, flags);
3456
3457        while (!kiblnd_data.kib_shutdown) {
3458                int do_failover = 0;
3459                int long_sleep;
3460
3461                list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3462                                    ibd_fail_list) {
3463                        if (cfs_time_before(cfs_time_current(),
3464                                            dev->ibd_next_failover))
3465                                continue;
3466                        do_failover = 1;
3467                        break;
3468                }
3469
3470                if (do_failover) {
3471                        list_del_init(&dev->ibd_fail_list);
3472                        dev->ibd_failover = 1;
3473                        write_unlock_irqrestore(glock, flags);
3474
3475                        rc = kiblnd_dev_failover(dev);
3476
3477                        write_lock_irqsave(glock, flags);
3478
3479                        LASSERT (dev->ibd_failover);
3480                        dev->ibd_failover = 0;
3481                        if (rc >= 0) { /* Device is OK or failover succeeded */
3482                                dev->ibd_next_failover = cfs_time_shift(3);
3483                                continue;
3484                        }
3485
3486                        /* failover failed; retry later */
3487                        dev->ibd_next_failover =
3488                                cfs_time_shift(min(dev->ibd_failed_failover, 10));
3489                        if (kiblnd_dev_can_failover(dev)) {
3490                                list_add_tail(&dev->ibd_fail_list,
3491                                              &kiblnd_data.kib_failed_devs);
3492                        }
3493
3494                        continue;
3495                }
3496
3497                /* long sleep if no more pending failover */
3498                long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3499
3500                set_current_state(TASK_INTERRUPTIBLE);
3501                add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3502                write_unlock_irqrestore(glock, flags);
3503
3504                rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3505                                                   cfs_time_seconds(1));
3506                set_current_state(TASK_RUNNING);
3507                remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3508                write_lock_irqsave(glock, flags);
3509
3510                if (!long_sleep || rc != 0)
3511                        continue;
3512
3513                /* On a long sleep, routinely check all active devices:
3514                 * if a device has no active connections and no local SENDs,
3515                 * we could otherwise listen on the wrong HCA forever after
3516                 * a bonding failover */
3517                list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3518                        if (kiblnd_dev_can_failover(dev)) {
3519                                list_add_tail(&dev->ibd_fail_list,
3520                                              &kiblnd_data.kib_failed_devs);
3521                        }
3522                }
3523        }
3524
3525        write_unlock_irqrestore(glock, flags);
3526
3527        kiblnd_thread_fini();
3528        return 0;
3529}
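
/*
 * Illustrative sketch (disabled; user-space): the retry policy in
 * kiblnd_failover_thread() above.  A healthy device is rechecked after a
 * short fixed delay; a failing one backs off linearly with its failure
 * count, capped so a flapping HCA is still retried within bounded time.
 */
#if 0
#include <stdio.h>

/* delay in seconds before the next failover attempt */
static int next_failover_delay(int rc, int failed_failovers)
{
	if (rc >= 0)
		return 3;	/* device OK or failover succeeded */
	return failed_failovers < 10 ? failed_failovers : 10;
}

int main(void)
{
	printf("healthy: %ds, 4 failures: %ds, 50 failures: %ds\n",
	       next_failover_delay(0, 0),
	       next_failover_delay(-1, 4),
	       next_failover_delay(-1, 50));
	return 0;
}
#endif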
3530