/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/o2iblnd/o2iblnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "o2iblnd.h"

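/* Retire a completed tx: unmap its buffers, drop its connection ref and
 * return it to its pool, then finalise any LNet messages attached to it.
 * Finalisation is deliberately last so the descriptor is already back in
 * the pool when lnet_finalize() runs. */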
static void
kiblnd_tx_done (lnet_ni_t *ni, kib_tx_t *tx)
{
        lnet_msg_t *lntmsg[2];
        kib_net_t  *net = ni->ni_data;
        int      rc;
        int      i;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);              /* mustn't be queued for sending */
        LASSERT (tx->tx_sending == 0);    /* mustn't be awaiting sent callback */
        LASSERT (!tx->tx_waiting);            /* mustn't be awaiting peer response */
        LASSERT (tx->tx_pool != NULL);

        kiblnd_unmap_tx(ni, tx);

        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn != NULL) {
                LASSERT (ni == tx->tx_conn->ibc_peer->ibp_ni);

                kiblnd_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize(ni, lntmsg[i], rc);
        }
}

void
kiblnd_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int status)
{
        kib_tx_t *tx;

        while (!list_empty (txlist)) {
                tx = list_entry (txlist->next, kib_tx_t, tx_list);

                list_del(&tx->tx_list);
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
                kiblnd_tx_done(ni, tx);
        }
}

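/* Grab a reset tx descriptor from the tx pool of the CPT that owns 'target';
 * returns NULL if the pool is exhausted. */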
static kib_tx_t *
kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
{
        kib_net_t               *net = (kib_net_t *)ni->ni_data;
        struct list_head        *node;
        kib_tx_t                *tx;
        kib_tx_poolset_t        *tps;

        tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
        node = kiblnd_pool_alloc_node(&tps->tps_poolset);
        if (node == NULL)
                return NULL;
        tx = container_of(node, kib_tx_t, tx_list);

        LASSERT (tx->tx_nwrq == 0);
        LASSERT (!tx->tx_queued);
        LASSERT (tx->tx_sending == 0);
        LASSERT (!tx->tx_waiting);
        LASSERT (tx->tx_status == 0);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);
        LASSERT (tx->tx_u.pmr == NULL);
        LASSERT (tx->tx_nfrags == 0);

        return tx;
}

static void
kiblnd_drop_rx(kib_rx_t *rx)
{
        kib_conn_t              *conn   = rx->rx_conn;
        struct kib_sched_info   *sched  = conn->ibc_sched;
        unsigned long           flags;

        spin_lock_irqsave(&sched->ibs_lock, flags);
        LASSERT(conn->ibc_nrx > 0);
        conn->ibc_nrx--;
        spin_unlock_irqrestore(&sched->ibs_lock, flags);

        kiblnd_conn_decref(conn);
}

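/* Post 'rx' for receive. 'credit' says how the receive buffer is accounted:
 * not at all (IBLND_POSTRX_NO_CREDIT), as a credit to return to the peer
 * (IBLND_POSTRX_PEER_CREDIT) or against the reserved pool
 * (IBLND_POSTRX_RSRVD_CREDIT). A failed post closes the connection. */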
int
kiblnd_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t       *conn = rx->rx_conn;
        kib_net_t         *net = conn->ibc_peer->ibp_ni->ni_data;
        struct ib_recv_wr  *bad_wrq = NULL;
        struct ib_mr       *mr;
        int              rc;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (credit == IBLND_POSTRX_NO_CREDIT ||
                 credit == IBLND_POSTRX_PEER_CREDIT ||
                 credit == IBLND_POSTRX_RSRVD_CREDIT);

        mr = kiblnd_find_dma_mr(conn->ibc_hdev, rx->rx_msgaddr, IBLND_MSG_SIZE);
        LASSERT (mr != NULL);

        rx->rx_sge.lkey   = mr->lkey;
        rx->rx_sge.addr   = rx->rx_msgaddr;
        rx->rx_sge.length = IBLND_MSG_SIZE;

        rx->rx_wrq.next = NULL;
        rx->rx_wrq.sg_list = &rx->rx_sge;
        rx->rx_wrq.num_sge = 1;
        rx->rx_wrq.wr_id = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);

        LASSERT (conn->ibc_state >= IBLND_CONN_INIT);
        LASSERT (rx->rx_nob >= 0);            /* not posted */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
                kiblnd_drop_rx(rx);          /* No more posts for this rx */
                return 0;
        }

        rx->rx_nob = -1;                        /* flag posted */

        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
        if (rc != 0) {
                CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
                rx->rx_nob = 0;
        }

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
                return rc;

        if (rc != 0) {
                kiblnd_close_conn(conn, rc);
                kiblnd_drop_rx(rx);          /* No more posts for this rx */
                return rc;
        }

        if (credit == IBLND_POSTRX_NO_CREDIT)
                return 0;

        spin_lock(&conn->ibc_lock);
        if (credit == IBLND_POSTRX_PEER_CREDIT)
                conn->ibc_outstanding_credits++;
        else
                conn->ibc_reserved_credits++;
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
        return 0;
}

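/* Find the active tx that is waiting for a completion of type 'txtype' with
 * a matching cookie; called with ibc_lock held. Returns NULL (after warning)
 * if the cookie matches but the tx isn't in the expected state. */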
static kib_tx_t *
kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (!tx->tx_queued);
                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

static void
kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        int       idle;

        spin_lock(&conn->ibc_lock);

        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie %#llx from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kiblnd_close_conn(conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {              /* success so far */
                if (status < 0) {              /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBLND_MSG_GET_REQ) {
                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
                }
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(ni, tx);
}

static void
kiblnd_send_completion(kib_conn_t *conn, int type, int status, __u64 cookie)
{
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t    *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));

        kiblnd_queue_tx(tx, conn);
}

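/* Dispatch a received message: bank any returned send credits, act on the
 * message type (hand payloads to lnet_parse(), match PUT/GET completions,
 * set up the reply RDMA for a PUT_ACK), close the connection on protocol
 * errors, and finally re-post the rx according to 'post_credit'. */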
static void
kiblnd_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        int        credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int        rc = 0;
        int        rc2;
        int        post_credit;

        LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits,
                libcfs_nid2str(conn->ibc_peer->ibp_nid));

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);

                if (conn->ibc_credits + credits >
                    IBLND_MSG_QUEUE_SIZE(conn->ibc_version)) {
                        rc2 = conn->ibc_credits;
                        spin_unlock(&conn->ibc_lock);

                        CERROR("Bad credits from %s: %d + %d > %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               rc2, credits,
                               IBLND_MSG_QUEUE_SIZE(conn->ibc_version));

                        kiblnd_close_conn(conn, -EPROTO);
                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
                        return;
                }

                conn->ibc_credits += credits;

                /* This ensures the credit taken by NOOP can be returned */
                if (msg->ibm_type == IBLND_MSG_NOOP &&
                    !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
                        conn->ibc_outstanding_credits++;

                spin_unlock(&conn->ibc_lock);
                kiblnd_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBLND message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_NO_CREDIT;
                rc = -EPROTO;
                break;

        case IBLND_MSG_NOOP:
                if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                        break;
                }

                if (credits != 0) /* credit already posted */
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                else          /* a keepalive NOOP */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_IMMEDIATE:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                if (rc < 0)                  /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                  /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_NAK:
                CWARN ("PUT_NACK from %s\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_PUT_ACK:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;

                spin_lock(&conn->ibc_lock);
                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
                                        msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                tx->tx_waiting = 0;     /* clear waiting and queue atomically */
                kiblnd_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBLND_MSG_PUT_DONE:
                post_credit = IBLND_POSTRX_PEER_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_GET_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                  /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_GET_DONE:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                          /* protocol error */
                kiblnd_close_conn(conn, rc);

        if (post_credit != IBLND_POSTRX_DONT_POST)
                kiblnd_post_rx(rx, post_credit);
}

static void
kiblnd_rx_complete (kib_rx_t *rx, int status, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        kib_net_t    *net = ni->ni_data;
        int        rc;
        int        err = -EIO;

        LASSERT (net != NULL);
        LASSERT (rx->rx_nob < 0);              /* was posted */
        rx->rx_nob = 0;                  /* isn't now */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
                goto ignore;

        if (status != IB_WC_SUCCESS) {
                CNETERR("Rx from %s failed: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                goto failed;
        }

        LASSERT (nob >= 0);
        rx->rx_nob = nob;

        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != net->ibn_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* set time last known alive */
        kiblnd_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                rwlock_t  *g_lock = &kiblnd_data.kib_global_lock;
                unsigned long  flags;

                write_lock_irqsave(g_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(g_lock, flags);
                        return;
                }
                write_unlock_irqrestore(g_lock, flags);
        }
        kiblnd_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kiblnd_close_conn(conn, err);
 ignore:
        kiblnd_drop_rx(rx);                  /* Don't re-post rx. */
}

static struct page *
kiblnd_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (is_vmalloc_addr((void *)vaddr)) {
                page = vmalloc_to_page ((void *)vaddr);
                LASSERT (page != NULL);
                return page;
        }
#ifdef CONFIG_HIGHMEM
        if (vaddr >= PKMAP_BASE &&
            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                /* No highmem addresses expected here: highmem pages are
                 * only ever used for bulk (kiov) I/O */
                CERROR("find page for address in highmem\n");
                LBUG();
        }
#endif
        page = virt_to_page (vaddr);
        LASSERT (page != NULL);
        return page;
}

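/* Map the fragments of 'rd' through an FMR pool and collapse them into a
 * single virtually-contiguous fragment. If 'rd' will be sent to the peer
 * (rd != tx->tx_rd) the peer needs the rkey, otherwise the local lkey. */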
static int
kiblnd_fmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, int nob)
{
        kib_hca_dev_t           *hdev;
        __u64                   *pages = tx->tx_pages;
        kib_fmr_poolset_t       *fps;
        int                     npages;
        int                     size;
        int                     cpt;
        int                     rc;
        int                     i;

        LASSERT(tx->tx_pool != NULL);
        LASSERT(tx->tx_pool->tpo_pool.po_owner != NULL);

        hdev  = tx->tx_pool->tpo_hdev;

        for (i = 0, npages = 0; i < rd->rd_nfrags; i++) {
                for (size = 0; size < rd->rd_frags[i].rf_nob;
                               size += hdev->ibh_page_size) {
                        pages[npages++] = (rd->rd_frags[i].rf_addr &
                                           hdev->ibh_page_mask) + size;
                }
        }

        cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;

        fps = net->ibn_fmr_ps[cpt];
        rc = kiblnd_fmr_pool_map(fps, pages, npages, 0, &tx->tx_u.fmr);
        if (rc != 0) {
                CERROR ("Can't map %d pages: %d\n", npages, rc);
                return rc;
        }

        /* If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey */
        rd->rd_key = (rd != tx->tx_rd) ? tx->tx_u.fmr.fmr_pfmr->fmr->rkey :
                                         tx->tx_u.fmr.fmr_pfmr->fmr->lkey;
        rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
        rd->rd_frags[0].rf_nob   = nob;
        rd->rd_nfrags = 1;

        return 0;
}

static int
kiblnd_pmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, int nob)
{
        kib_hca_dev_t           *hdev;
        kib_pmr_poolset_t       *pps;
        __u64                   iova;
        int                     cpt;
        int                     rc;

        LASSERT(tx->tx_pool != NULL);
        LASSERT(tx->tx_pool->tpo_pool.po_owner != NULL);

        hdev = tx->tx_pool->tpo_hdev;

        iova = rd->rd_frags[0].rf_addr & ~hdev->ibh_page_mask;

        cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;

        pps = net->ibn_pmr_ps[cpt];
        rc = kiblnd_pmr_pool_map(pps, hdev, rd, &iova, &tx->tx_u.pmr);
        if (rc != 0) {
                CERROR("Failed to create MR by phybuf: %d\n", rc);
                return rc;
        }

        /* If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey */
        rd->rd_key = (rd != tx->tx_rd) ? tx->tx_u.pmr->pmr_mr->rkey :
                                         tx->tx_u.pmr->pmr_mr->lkey;
        rd->rd_nfrags = 1;
        rd->rd_frags[0].rf_addr = iova;
        rd->rd_frags[0].rf_nob  = nob;

        return 0;
}

void
kiblnd_unmap_tx(lnet_ni_t *ni, kib_tx_t *tx)
{
        kib_net_t  *net = ni->ni_data;

        LASSERT(net != NULL);

        if (net->ibn_fmr_ps != NULL && tx->tx_u.fmr.fmr_pfmr != NULL) {
                kiblnd_fmr_pool_unmap(&tx->tx_u.fmr, tx->tx_status);
                tx->tx_u.fmr.fmr_pfmr = NULL;

        } else if (net->ibn_pmr_ps != NULL && tx->tx_u.pmr != NULL) {
                kiblnd_pmr_pool_unmap(tx->tx_u.pmr);
                tx->tx_u.pmr = NULL;
        }

        if (tx->tx_nfrags != 0) {
                kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
                tx->tx_nfrags = 0;
        }
}

int
kiblnd_map_tx(lnet_ni_t *ni, kib_tx_t *tx,
              kib_rdma_desc_t *rd, int nfrags)
{
        kib_hca_dev_t      *hdev  = tx->tx_pool->tpo_hdev;
        kib_net_t          *net   = ni->ni_data;
        struct ib_mr       *mr    = NULL;
        __u32          nob;
        int              i;

        /* If rd is not tx_rd, it's going to get sent to a peer and I'm the
         * RDMA sink */
        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
        tx->tx_nfrags = nfrags;

        rd->rd_nfrags =
                kiblnd_dma_map_sg(hdev->ibh_ibdev,
                                  tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);

        for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                nob += rd->rd_frags[i].rf_nob;
        }

        /* looking for pre-mapping MR */
        mr = kiblnd_find_rd_dma_mr(hdev, rd);
        if (mr != NULL) {
                /* found pre-mapping MR */
                rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
                return 0;
        }

        if (net->ibn_fmr_ps != NULL)
                return kiblnd_fmr_map_tx(net, tx, rd, nob);
        else if (net->ibn_pmr_ps != NULL)
                return kiblnd_pmr_map_tx(net, tx, rd, nob);

        return -EINVAL;
}

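/* Build tx->tx_frags from a kernel iovec: skip 'offset' bytes, translate
 * each virtual address to its page and fill one scatterlist entry per
 * page-contiguous fragment, then hand off to kiblnd_map_tx(). The kiov
 * variant below does the same for page-based (bulk) buffers. */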
static int
kiblnd_setup_rd_iov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
{
        kib_net_t          *net = ni->ni_data;
        struct page        *page;
        struct scatterlist *sg;
        unsigned long       vaddr;
        int              fragnob;
        int              page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (net != NULL);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kiblnd_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                sg_set_page(sg, page, fragnob, page_offset);
                sg++;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
}

static int
kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        kib_net_t          *net = ni->ni_data;
        struct scatterlist *sg;
        int              fragnob;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (net != NULL);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT (nkiov > 0);

                fragnob = min((int)(kiov->kiov_len - offset), nob);

                sg_set_page(sg, kiov->kiov_page, fragnob,
                            kiov->kiov_offset + offset);
                sg++;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
}

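/* Try to post 'tx' on the connection; called with ibc_lock held, though the
 * lock may be dropped and retaken. Returns -EAGAIN when send work requests
 * or credits are exhausted, 0 on success (or on discarding a redundant
 * NOOP), and -EIO after closing the connection when the post itself fails. */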
static int
kiblnd_post_tx_locked (kib_conn_t *conn, kib_tx_t *tx, int credit)
        __releases(conn->ibc_lock)
        __acquires(conn->ibc_lock)
{
        kib_msg_t        *msg = tx->tx_msg;
        kib_peer_t      *peer = conn->ibc_peer;
        int             ver = conn->ibc_version;
        int             rc;
        int             done;
        struct ib_send_wr *bad_wrq;

        LASSERT (tx->tx_queued);
        /* We rely on this for QP sizing */
        LASSERT (tx->tx_nwrq > 0);
        LASSERT (tx->tx_nwrq <= 1 + IBLND_RDMA_FRAGS(ver));

        LASSERT (credit == 0 || credit == 1);
        LASSERT (conn->ibc_outstanding_credits >= 0);
        LASSERT (conn->ibc_outstanding_credits <= IBLND_MSG_QUEUE_SIZE(ver));
        LASSERT (conn->ibc_credits >= 0);
        LASSERT (conn->ibc_credits <= IBLND_MSG_QUEUE_SIZE(ver));

        if (conn->ibc_nsends_posted == IBLND_CONCURRENT_SENDS(ver)) {
                /* tx completions outstanding... */
                CDEBUG(D_NET, "%s: posted enough\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        if (credit != 0 && conn->ibc_credits == 0) {   /* no credits */
                CDEBUG(D_NET, "%s: no credits\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        if (credit != 0 && !IBLND_OOB_CAPABLE(ver) &&
            conn->ibc_credits == 1 &&   /* last credit reserved */
            msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
                CDEBUG(D_NET, "%s: not using last credit\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        /* NB don't drop ibc_lock before bumping tx_sending */
        list_del(&tx->tx_list);
        tx->tx_queued = 0;

        if (msg->ibm_type == IBLND_MSG_NOOP &&
            (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
             (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
              conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
                /* OK to drop when posted enough NOOPs, since
                 * kiblnd_check_sends will queue NOOP again when
                 * posted NOOPs complete */
                spin_unlock(&conn->ibc_lock);
                kiblnd_tx_done(peer->ibp_ni, tx);
                spin_lock(&conn->ibc_lock);
                CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_noops_posted);
                return 0;
        }

        kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
                        peer->ibp_nid, conn->ibc_incarnation);

        conn->ibc_credits -= credit;
        conn->ibc_outstanding_credits = 0;
        conn->ibc_nsends_posted++;
        if (msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted++;

        /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
         * PUT.  If so, it was first queued here as a PUT_REQ, sent and
         * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
         * and then re-queued here.  It's (just) possible that
         * tx_sending is non-zero if we've not done the tx_complete()
         * from the first send; hence the ++ rather than = below. */
        tx->tx_sending++;
        list_add(&tx->tx_list, &conn->ibc_active_txs);

        /* I'm still holding ibc_lock! */
        if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
                rc = -ECONNABORTED;
        } else if (tx->tx_pool->tpo_pool.po_failed ||
                 conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
                /* close_conn will launch failover */
                rc = -ENETDOWN;
        } else {
                rc = ib_post_send(conn->ibc_cmid->qp,
                                  tx->tx_wrq, &bad_wrq);
        }

        conn->ibc_last_send = jiffies;

        if (rc == 0)
                return 0;

        /* NB credits are transferred in the actual
         * message, which can only be the last work item */
        conn->ibc_credits += credit;
        conn->ibc_outstanding_credits += msg->ibm_credits;
        conn->ibc_nsends_posted--;
        if (msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted--;

        tx->tx_status = rc;
        tx->tx_waiting = 0;
        tx->tx_sending--;

        done = (tx->tx_sending == 0);
        if (done)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                CERROR("Error %d posting transmit to %s\n",
                       rc, libcfs_nid2str(peer->ibp_nid));
        else
                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
                       rc, libcfs_nid2str(peer->ibp_nid));

        kiblnd_close_conn(conn, rc);

        if (done)
                kiblnd_tx_done(peer->ibp_ni, tx);

        spin_lock(&conn->ibc_lock);

        return -EIO;
}

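/* Drain the connection's send queues: promote txs blocked on reserved
 * credits, queue a NOOP if credits need returning, then post txs in
 * priority order (no-credit msgs, NOOPs, normal sends) until something
 * blocks or the queues empty. */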
void
kiblnd_check_sends (kib_conn_t *conn)
{
        int     ver = conn->ibc_version;
        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t  *tx;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                CDEBUG(D_NET, "%s too soon\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <= IBLND_CONCURRENT_SENDS(ver));
        LASSERT (!IBLND_OOB_CAPABLE(ver) ||
                 conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
        LASSERT (conn->ibc_reserved_credits >= 0);

        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        if (kiblnd_need_noop(conn)) {
                spin_unlock(&conn->ibc_lock);

                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
                if (tx != NULL)
                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);
                if (tx != NULL)
                        kiblnd_queue_tx_locked(tx, conn);
        }

        kiblnd_conn_addref(conn); /* 1 ref for me.... (see b21911) */

        for (;;) {
                int credit;

                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        credit = 0;
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                } else if (!list_empty(&conn->ibc_tx_noops)) {
                        LASSERT (!IBLND_OOB_CAPABLE(ver));
                        credit = 1;
                        tx = list_entry(conn->ibc_tx_noops.next,
                                        kib_tx_t, tx_list);
                } else if (!list_empty(&conn->ibc_tx_queue)) {
                        credit = 1;
                        tx = list_entry(conn->ibc_tx_queue.next,
                                        kib_tx_t, tx_list);
                } else
                        break;

                if (kiblnd_post_tx_locked(conn, tx, credit) != 0)
                        break;
        }

        spin_unlock(&conn->ibc_lock);

        kiblnd_conn_decref(conn); /* ...until here */
}

static void
kiblnd_tx_complete (kib_tx_t *tx, int status)
{
        int        failed = (status != IB_WC_SUCCESS);
        kib_conn_t   *conn = tx->tx_conn;
        int        idle;

        LASSERT (tx->tx_sending > 0);

        if (failed) {
                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                        CNETERR("Tx -> %s cookie %#llx sending %d waiting %d: failed %d\n",
                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
                                tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
                                status);

                kiblnd_close_conn(conn, -EIO);
        } else {
                kiblnd_peer_alive(conn->ibc_peer);
        }

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;
        if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted--;

        if (failed) {
                tx->tx_waiting = 0;          /* don't wait for peer */
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&  /* This is the final callback */
               !tx->tx_waiting &&              /* Not waiting for peer */
               !tx->tx_queued;            /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kiblnd_conn_addref(conn);              /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);

        kiblnd_check_sends(conn);

        kiblnd_conn_decref(conn);              /* ...until here */
}

void
kiblnd_init_tx_msg (lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
{
        kib_hca_dev_t     *hdev = tx->tx_pool->tpo_hdev;
        struct ib_sge     *sge = &tx->tx_sge[tx->tx_nwrq];
        struct ib_send_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int             nob = offsetof (kib_msg_t, ibm_u) + body_nob;
        struct ib_mr      *mr;

        LASSERT (tx->tx_nwrq >= 0);
        LASSERT (tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
        LASSERT (nob <= IBLND_MSG_SIZE);

        kiblnd_init_msg(tx->tx_msg, type, body_nob);

        mr = kiblnd_find_dma_mr(hdev, tx->tx_msgaddr, nob);
        LASSERT (mr != NULL);

        sge->lkey   = mr->lkey;
        sge->addr   = tx->tx_msgaddr;
        sge->length = nob;

        memset(wrq, 0, sizeof(*wrq));

        wrq->next       = NULL;
        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
        wrq->sg_list    = sge;
        wrq->num_sge    = 1;
        wrq->opcode     = IB_WR_SEND;
        wrq->send_flags = IB_SEND_SIGNALED;

        tx->tx_nwrq++;
}

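/* Set up the chain of RDMA_WRITE work requests that moves 'resid' bytes
 * from the local source descriptor (tx->tx_rd) into the peer's 'dstrd',
 * one work request per overlapping fragment pair, then append the GET_DONE
 * or PUT_DONE message carrying 'rc' and 'dstcookie' back to the peer. */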
int
kiblnd_init_rdma (kib_conn_t *conn, kib_tx_t *tx, int type,
                  int resid, kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t         *ibmsg = tx->tx_msg;
        kib_rdma_desc_t   *srcrd = tx->tx_rd;
        struct ib_sge     *sge = &tx->tx_sge[0];
        struct ib_send_wr *wrq = &tx->tx_wrq[0];
        int             rc  = resid;
        int             srcidx;
        int             dstidx;
        int             wrknob;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (type == IBLND_MSG_GET_DONE ||
                 type == IBLND_MSG_PUT_DONE);

        srcidx = dstidx = 0;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrags) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrags) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBLND_RDMA_FRAGS(conn->ibc_version)) {
                        CERROR("RDMA too fragmented for %s (%d): "
                               "%d/%d src %d/%d dst frags\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               IBLND_RDMA_FRAGS(conn->ibc_version),
                               srcidx, srcrd->rd_nfrags,
                               dstidx, dstrd->rd_nfrags);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(kiblnd_rd_frag_size(srcrd, srcidx),
                                 kiblnd_rd_frag_size(dstrd, dstidx)), resid);

                sge = &tx->tx_sge[tx->tx_nwrq];
                sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
                sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
                sge->length = wrknob;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->next       = wrq + 1;
                wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
                wrq->sg_list    = sge;
                wrq->num_sge    = 1;
                wrq->opcode     = IB_WR_RDMA_WRITE;
                wrq->send_flags = 0;

                wrq->wr.rdma.remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
                wrq->wr.rdma.rkey        = kiblnd_rd_frag_key(dstrd, dstidx);

                srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
                dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);

                resid -= wrknob;

                tx->tx_nwrq++;
                wrq++;
                sge++;
        }

        if (rc < 0)                          /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
                           type, sizeof (kib_completion_msg_t));

        return rc;
}

void
kiblnd_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
{
        struct list_head   *q;

        LASSERT (tx->tx_nwrq > 0);            /* work items set up */
        LASSERT (!tx->tx_queued);              /* not queued for sending already */
        LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        tx->tx_queued = 1;
        tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * HZ);

        if (tx->tx_conn == NULL) {
                kiblnd_conn_addref(conn);
                tx->tx_conn = conn;
                LASSERT (tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
        } else {
                /* PUT_DONE first attached to conn as a PUT_REQ */
                LASSERT (tx->tx_conn == conn);
                LASSERT (tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
        }

        switch (tx->tx_msg->ibm_type) {
        default:
                LBUG();

        case IBLND_MSG_PUT_REQ:
        case IBLND_MSG_GET_REQ:
                q = &conn->ibc_tx_queue_rsrvd;
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_ACK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                q = &conn->ibc_tx_queue_nocred;
                break;

        case IBLND_MSG_NOOP:
                if (IBLND_OOB_CAPABLE(conn->ibc_version))
                        q = &conn->ibc_tx_queue_nocred;
                else
                        q = &conn->ibc_tx_noops;
                break;

        case IBLND_MSG_IMMEDIATE:
                q = &conn->ibc_tx_queue;
                break;
        }

        list_add_tail(&tx->tx_list, q);
}

void
kiblnd_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kiblnd_queue_tx_locked(tx, conn);
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
}

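/* Bind 'cmid' to a free privileged port (below PROT_SOCK) before resolving
 * the destination address; used when kib_use_priv_port is set. */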
static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
                               struct sockaddr_in *srcaddr,
                               struct sockaddr_in *dstaddr,
                               int timeout_ms)
{
        unsigned short port;
        int rc;

        /* allow the port to be reused */
        rc = rdma_set_reuseaddr(cmid, 1);
        if (rc != 0) {
                CERROR("Unable to set reuse on cmid: %d\n", rc);
                return rc;
        }

        /* look for a free privileged port */
        for (port = PROT_SOCK-1; port > 0; port--) {
                srcaddr->sin_port = htons(port);
                rc = rdma_resolve_addr(cmid,
                                       (struct sockaddr *)srcaddr,
                                       (struct sockaddr *)dstaddr,
                                       timeout_ms);
                if (rc == 0) {
                        CDEBUG(D_NET, "bound to port %hu\n", port);
                        return 0;
                } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
                        CDEBUG(D_NET, "bind to port %hu failed: %d\n",
                               port, rc);
                } else {
                        return rc;
                }
        }

        CERROR("Failed to bind to a free privileged port\n");
        return rc;
}

static void
kiblnd_connect_peer (kib_peer_t *peer)
{
        struct rdma_cm_id *cmid;
        kib_dev_t        *dev;
        kib_net_t        *net = peer->ibp_ni->ni_data;
        struct sockaddr_in srcaddr;
        struct sockaddr_in dstaddr;
        int             rc;

        LASSERT (net != NULL);
        LASSERT (peer->ibp_connecting > 0);

        cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
                                     IB_QPT_RC);

        if (IS_ERR(cmid)) {
                CERROR("Can't create CMID for %s: %ld\n",
                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
                rc = PTR_ERR(cmid);
                goto failed;
        }

        dev = net->ibn_dev;
        memset(&srcaddr, 0, sizeof(srcaddr));
        srcaddr.sin_family = AF_INET;
        srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);

        memset(&dstaddr, 0, sizeof(dstaddr));
        dstaddr.sin_family = AF_INET;
        dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
        dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));

        kiblnd_peer_addref(peer);              /* cmid's ref */

        if (*kiblnd_tunables.kib_use_priv_port) {
                rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
                                         *kiblnd_tunables.kib_timeout * 1000);
        } else {
                rc = rdma_resolve_addr(cmid,
                                       (struct sockaddr *)&srcaddr,
                                       (struct sockaddr *)&dstaddr,
                                       *kiblnd_tunables.kib_timeout * 1000);
        }
        if (rc != 0) {
                /* Can't initiate address resolution:  */
                CERROR("Can't resolve addr for %s: %d\n",
                       libcfs_nid2str(peer->ibp_nid), rc);
                goto failed2;
        }

        LASSERT (cmid->device != NULL);
        CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
               libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
               &dev->ibd_ifip, cmid->device->name);

        return;

 failed2:
        kiblnd_peer_decref(peer);              /* cmid's ref */
        rdma_destroy_id(cmid);
 failed:
        kiblnd_peer_connect_failed(peer, 1, rc);
}

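/* Send 'tx' (which may be NULL if the caller just wants a connection) to
 * 'nid': queue it on an existing connection if there is one, otherwise
 * park it on the peer's queue and initiate a connection, creating the
 * peer first if necessary. */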
void
kiblnd_launch_tx (lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
{
        kib_peer_t      *peer;
        kib_peer_t      *peer2;
        kib_conn_t      *conn;
        rwlock_t        *g_lock = &kiblnd_data.kib_global_lock;
        unsigned long      flags;
        int             rc;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx == NULL || tx->tx_conn == NULL); /* only set when assigned a conn */
        LASSERT (tx == NULL || tx->tx_nwrq > 0);     /* work items have been set up */

        /* First time, just use a read lock since I expect to find my peer
         * connected */
        read_lock_irqsave(g_lock, flags);

        peer = kiblnd_find_peer_locked(nid);
        if (peer != NULL && !list_empty(&peer->ibp_conns)) {
                /* Found a peer with an established connection */
                conn = kiblnd_get_conn_locked(peer);
                kiblnd_conn_addref(conn); /* 1 ref for me... */

                read_unlock_irqrestore(g_lock, flags);

                if (tx != NULL)
                        kiblnd_queue_tx(tx, conn);
                kiblnd_conn_decref(conn); /* ...to here */
                return;
        }

        read_unlock(g_lock);
        /* Re-try with a write lock */
        write_lock(g_lock);

        peer = kiblnd_find_peer_locked(nid);
        if (peer != NULL) {
                if (list_empty(&peer->ibp_conns)) {
                        /* found a peer, but it's still connecting... */
                        LASSERT (peer->ibp_connecting != 0 ||
                                 peer->ibp_accepting != 0);
                        if (tx != NULL)
                                list_add_tail(&tx->tx_list,
                                              &peer->ibp_tx_queue);
                        write_unlock_irqrestore(g_lock, flags);
                } else {
                        conn = kiblnd_get_conn_locked(peer);
                        kiblnd_conn_addref(conn); /* 1 ref for me... */

                        write_unlock_irqrestore(g_lock, flags);

                        if (tx != NULL)
                                kiblnd_queue_tx(tx, conn);
                        kiblnd_conn_decref(conn); /* ...to here */
                }
                return;
        }

        write_unlock_irqrestore(g_lock, flags);

        /* Allocate a peer ready to add to the peer table and retry */
        rc = kiblnd_create_peer(ni, &peer, nid);
        if (rc != 0) {
                CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
                if (tx != NULL) {
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kiblnd_tx_done(ni, tx);
                }
                return;
        }

        write_lock_irqsave(g_lock, flags);

        peer2 = kiblnd_find_peer_locked(nid);
        if (peer2 != NULL) {
                if (list_empty(&peer2->ibp_conns)) {
                        /* found a peer, but it's still connecting... */
                        LASSERT (peer2->ibp_connecting != 0 ||
                                 peer2->ibp_accepting != 0);
                        if (tx != NULL)
                                list_add_tail(&tx->tx_list,
                                              &peer2->ibp_tx_queue);
                        write_unlock_irqrestore(g_lock, flags);
                } else {
                        conn = kiblnd_get_conn_locked(peer2);
                        kiblnd_conn_addref(conn); /* 1 ref for me... */

                        write_unlock_irqrestore(g_lock, flags);

                        if (tx != NULL)
                                kiblnd_queue_tx(tx, conn);
                        kiblnd_conn_decref(conn); /* ...to here */
                }

                kiblnd_peer_decref(peer);
                return;
        }

        /* Brand new peer */
        LASSERT (peer->ibp_connecting == 0);
        peer->ibp_connecting = 1;

        /* always called with a ref on ni, which prevents ni being shutdown */
        LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);

        if (tx != NULL)
                list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);

        kiblnd_peer_addref(peer);
        list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));

        write_unlock_irqrestore(g_lock, flags);

        kiblnd_connect_peer(peer);
        kiblnd_peer_decref(peer);
}

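/* LNet send entry point: pick the wire protocol for 'lntmsg' (IMMEDIATE
 * for payloads that fit in a message buffer, otherwise an RDMA descriptor
 * exchange) and launch the resulting tx. */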
1456int
1457kiblnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1458{
1459        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
1460        int            type = lntmsg->msg_type;
1461        lnet_process_id_t target = lntmsg->msg_target;
1462        int            target_is_router = lntmsg->msg_target_is_router;
1463        int            routing = lntmsg->msg_routing;
1464        unsigned int      payload_niov = lntmsg->msg_niov;
1465        struct iovec     *payload_iov = lntmsg->msg_iov;
1466        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1467        unsigned int      payload_offset = lntmsg->msg_offset;
1468        unsigned int      payload_nob = lntmsg->msg_len;
1469        kib_msg_t       *ibmsg;
1470        kib_tx_t         *tx;
1471        int            nob;
1472        int            rc;
1473
1474        /* NB 'private' is different depending on what we're sending... */
1475
1476        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1477               payload_nob, payload_niov, libcfs_id2str(target));
1478
1479        LASSERT (payload_nob == 0 || payload_niov > 0);
1480        LASSERT (payload_niov <= LNET_MAX_IOV);
1481
1482        /* Thread context */
1483        LASSERT (!in_interrupt());
1484        /* payload is either all vaddrs or all pages */
1485        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1486
1487        switch (type) {
1488        default:
1489                LBUG();
1490                return -EIO;
1491
1492        case LNET_MSG_ACK:
1493                LASSERT (payload_nob == 0);
1494                break;
1495
1496        case LNET_MSG_GET:
1497                if (routing || target_is_router)
1498                        break;            /* send IMMEDIATE */
1499
1500                /* is the REPLY message too small for RDMA? */
1501                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1502                if (nob <= IBLND_MSG_SIZE)
1503                        break;            /* send IMMEDIATE */
1504
1505                tx = kiblnd_get_idle_tx(ni, target.nid);
1506                if (tx == NULL) {
1507                        CERROR("Can't allocate txd for GET to %s\n",
1508                               libcfs_nid2str(target.nid));
1509                        return -ENOMEM;
1510                }
1511
1512                ibmsg = tx->tx_msg;
1513
1514                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1515                        rc = kiblnd_setup_rd_iov(ni, tx,
1516                                                 &ibmsg->ibm_u.get.ibgm_rd,
1517                                                 lntmsg->msg_md->md_niov,
1518                                                 lntmsg->msg_md->md_iov.iov,
1519                                                 0, lntmsg->msg_md->md_length);
1520                else
1521                        rc = kiblnd_setup_rd_kiov(ni, tx,
1522                                                  &ibmsg->ibm_u.get.ibgm_rd,
1523                                                  lntmsg->msg_md->md_niov,
1524                                                  lntmsg->msg_md->md_iov.kiov,
1525                                                  0, lntmsg->msg_md->md_length);
1526                if (rc != 0) {
1527                        CERROR("Can't setup GET sink for %s: %d\n",
1528                               libcfs_nid2str(target.nid), rc);
1529                        kiblnd_tx_done(ni, tx);
1530                        return -EIO;
1531                }
1532
1533                nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[tx->tx_nfrags]);
1534                ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1535                ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1536
1537                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1538
1539                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1540                if (tx->tx_lntmsg[1] == NULL) {
1541                        CERROR("Can't create reply for GET -> %s\n",
1542                               libcfs_nid2str(target.nid));
1543                        kiblnd_tx_done(ni, tx);
1544                        return -EIO;
1545                }
1546
1547                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1548                tx->tx_waiting = 1;          /* waiting for GET_DONE */
1549                kiblnd_launch_tx(ni, tx, target.nid);
1550                return 0;
1551
1552        case LNET_MSG_REPLY:
1553        case LNET_MSG_PUT:
1554                /* Is the payload small enough not to need RDMA? */
1555                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1556                if (nob <= IBLND_MSG_SIZE)
1557                        break;            /* send IMMEDIATE */
1558
1559                tx = kiblnd_get_idle_tx(ni, target.nid);
1560                if (tx == NULL) {
1561                        CERROR("Can't allocate %s txd for %s\n",
1562                               type == LNET_MSG_PUT ? "PUT" : "REPLY",
1563                               libcfs_nid2str(target.nid));
1564                        return -ENOMEM;
1565                }
1566
1567                if (payload_kiov == NULL)
1568                        rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1569                                                 payload_niov, payload_iov,
1570                                                 payload_offset, payload_nob);
1571                else
1572                        rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1573                                                  payload_niov, payload_kiov,
1574                                                  payload_offset, payload_nob);
1575                if (rc != 0) {
1576                        CERROR("Can't setup PUT src for %s: %d\n",
1577                               libcfs_nid2str(target.nid), rc);
1578                        kiblnd_tx_done(ni, tx);
1579                        return -EIO;
1580                }
1581
1582                ibmsg = tx->tx_msg;
1583                ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1584                ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1585                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1586
1587                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1588                tx->tx_waiting = 1;          /* waiting for PUT_{ACK,NAK} */
1589                kiblnd_launch_tx(ni, tx, target.nid);
1590                return 0;
1591        }
1592
1593        /* send IMMEDIATE */
1594
1595        LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1596                 <= IBLND_MSG_SIZE);
1597
1598        tx = kiblnd_get_idle_tx(ni, target.nid);
1599        if (tx == NULL) {
1600                CERROR ("Can't send %d to %s: tx descs exhausted\n",
1601                        type, libcfs_nid2str(target.nid));
1602                return -ENOMEM;
1603        }
1604
1605        ibmsg = tx->tx_msg;
1606        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1607
1608        if (payload_kiov != NULL)
1609                lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg,
1610                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1611                                    payload_niov, payload_kiov,
1612                                    payload_offset, payload_nob);
1613        else
1614                lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg,
1615                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1616                                   payload_niov, payload_iov,
1617                                   payload_offset, payload_nob);
1618
1619        nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1620        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1621
1622        tx->tx_lntmsg[0] = lntmsg;            /* finalise lntmsg on completion */
1623        kiblnd_launch_tx(ni, tx, target.nid);
1624        return 0;
1625}
1626
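/* Service an optimized GET: RDMA-write lntmsg's payload directly into the
 * sink described in the peer's GET_REQ and complete with GET_DONE. */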
1627static void
1628kiblnd_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
1629{
1630        lnet_process_id_t target = lntmsg->msg_target;
1631        unsigned int      niov = lntmsg->msg_niov;
1632        struct iovec     *iov = lntmsg->msg_iov;
1633        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
1634        unsigned int      offset = lntmsg->msg_offset;
1635        unsigned int      nob = lntmsg->msg_len;
1636        kib_tx_t         *tx;
1637        int            rc;
1638
1639        tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1640        if (tx == NULL) {
1641                CERROR("Can't get tx for REPLY to %s\n",
1642                       libcfs_nid2str(target.nid));
1643                goto failed_0;
1644        }
1645
1646        if (nob == 0)
1647                rc = 0;
1648        else if (kiov == NULL)
1649                rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1650                                         niov, iov, offset, nob);
1651        else
1652                rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1653                                          niov, kiov, offset, nob);
1654
1655        if (rc != 0) {
1656                CERROR("Can't setup GET src for %s: %d\n",
1657                       libcfs_nid2str(target.nid), rc);
1658                goto failed_1;
1659        }
1660
1661        rc = kiblnd_init_rdma(rx->rx_conn, tx,
1662                              IBLND_MSG_GET_DONE, nob,
1663                              &rx->rx_msg->ibm_u.get.ibgm_rd,
1664                              rx->rx_msg->ibm_u.get.ibgm_cookie);
1665        if (rc < 0) {
1666                CERROR("Can't setup rdma for GET from %s: %d\n",
1667                       libcfs_nid2str(target.nid), rc);
1668                goto failed_1;
1669        }
1670
1671        if (nob == 0) {
1672                /* No RDMA: local completion may happen now! */
1673                lnet_finalize(ni, lntmsg, 0);
1674        } else {
1675                /* RDMA: lnet_finalize(lntmsg) when it
1676                 * completes */
1677                tx->tx_lntmsg[0] = lntmsg;
1678        }
1679
1680        kiblnd_queue_tx(tx, rx->rx_conn);
1681        return;
1682
1683 failed_1:
1684        kiblnd_tx_done(ni, tx);
1685 failed_0:
1686        lnet_finalize(ni, lntmsg, -EIO);
1687}
1688
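/* kiblnd_recv: deliver a matched message to LNet.  IMMEDIATE payloads are
 * copied straight out of the rx buffer; a PUT_REQ gets a sink set up and a
 * PUT_ACK (or a PUT_NAK if nothing is wanted); a GET_REQ is answered by
 * kiblnd_reply() or failed with GET_DONE/-ENODATA.  The rx buffer is always
 * reposted before returning. */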
1689int
1690kiblnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1691             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1692             unsigned int offset, unsigned int mlen, unsigned int rlen)
1693{
1694        kib_rx_t    *rx = private;
1695        kib_msg_t   *rxmsg = rx->rx_msg;
1696        kib_conn_t  *conn = rx->rx_conn;
1697        kib_tx_t    *tx;
1698        kib_msg_t   *txmsg;
1699        int       nob;
1700        int       post_credit = IBLND_POSTRX_PEER_CREDIT;
1701        int       rc = 0;
1702
1703        LASSERT (mlen <= rlen);
1704        LASSERT (!in_interrupt());
1705        /* Either all pages or all vaddrs */
1706        LASSERT (!(kiov != NULL && iov != NULL));
1707
1708        switch (rxmsg->ibm_type) {
1709        default:
1710                LBUG();
1711
1712        case IBLND_MSG_IMMEDIATE:
1713                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1714                if (nob > rx->rx_nob) {
1715                        CERROR ("Immediate message from %s too big: %d(%d)\n",
1716                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1717                                nob, rx->rx_nob);
1718                        rc = -EPROTO;
1719                        break;
1720                }
1721
1722                if (kiov != NULL)
1723                        lnet_copy_flat2kiov(niov, kiov, offset,
1724                                            IBLND_MSG_SIZE, rxmsg,
1725                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1726                                            mlen);
1727                else
1728                        lnet_copy_flat2iov(niov, iov, offset,
1729                                           IBLND_MSG_SIZE, rxmsg,
1730                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1731                                           mlen);
1732                lnet_finalize (ni, lntmsg, 0);
1733                break;
1734
1735        case IBLND_MSG_PUT_REQ:
1736                if (mlen == 0) {
1737                        lnet_finalize(ni, lntmsg, 0);
1738                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1739                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1740                        break;
1741                }
1742
1743                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1744                if (tx == NULL) {
1745                        CERROR("Can't allocate tx for %s\n",
1746                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
1747                        /* Not replying will break the connection */
1748                        rc = -ENOMEM;
1749                        break;
1750                }
1751
1752                txmsg = tx->tx_msg;
1753                if (kiov == NULL)
1754                        rc = kiblnd_setup_rd_iov(ni, tx,
1755                                                 &txmsg->ibm_u.putack.ibpam_rd,
1756                                                 niov, iov, offset, mlen);
1757                else
1758                        rc = kiblnd_setup_rd_kiov(ni, tx,
1759                                                  &txmsg->ibm_u.putack.ibpam_rd,
1760                                                  niov, kiov, offset, mlen);
1761                if (rc != 0) {
1762                        CERROR("Can't setup PUT sink for %s: %d\n",
1763                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1764                        kiblnd_tx_done(ni, tx);
1765                        /* tell peer it's over */
1766                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1767                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1768                        break;
1769                }
1770
1771                nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[tx->tx_nfrags]);
1772                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1773                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1774
1775                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1776
1777                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1778                tx->tx_waiting = 1;          /* waiting for PUT_DONE */
1779                kiblnd_queue_tx(tx, conn);
1780
1781                /* reposted buffer reserved for PUT_DONE */
1782                post_credit = IBLND_POSTRX_NO_CREDIT;
1783                break;
1784
1785        case IBLND_MSG_GET_REQ:
1786                if (lntmsg != NULL) {
1787                        /* Optimized GET; RDMA lntmsg's payload */
1788                        kiblnd_reply(ni, rx, lntmsg);
1789                } else {
1790                        /* GET didn't match anything */
1791                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1792                                               -ENODATA,
1793                                               rxmsg->ibm_u.get.ibgm_cookie);
1794                }
1795                break;
1796        }
1797
1798        kiblnd_post_rx(rx, post_credit);
1799        return rc;
1800}
1801
1802int
1803kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1804{
1805        struct task_struct *task = kthread_run(fn, arg, "%s", name);
1806
1807        if (IS_ERR(task))
1808                return PTR_ERR(task);
1809
1810        atomic_inc(&kiblnd_data.kib_nthreads);
1811        return 0;
1812}
1813
1814static void
1815kiblnd_thread_fini (void)
1816{
1817        atomic_dec (&kiblnd_data.kib_nthreads);
1818}
1819
1820void
1821kiblnd_peer_alive (kib_peer_t *peer)
1822{
1823        /* This is racy, but everyone's only writing cfs_time_current() */
1824        peer->ibp_last_alive = cfs_time_current();
1825        mb();
1826}
1827
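/* Notify LNet that a peer is dead once its last conn has gone, no other
 * connection attempt is in flight, and an error is pending. */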
1828static void
1829kiblnd_peer_notify (kib_peer_t *peer)
1830{
1831        int        error = 0;
1832        unsigned long    last_alive = 0;
1833        unsigned long flags;
1834
1835        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1836
1837        if (list_empty(&peer->ibp_conns) &&
1838            peer->ibp_accepting == 0 &&
1839            peer->ibp_connecting == 0 &&
1840            peer->ibp_error != 0) {
1841                error = peer->ibp_error;
1842                peer->ibp_error = 0;
1843
1844                last_alive = peer->ibp_last_alive;
1845        }
1846
1847        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1848
1849        if (error != 0)
1850                lnet_notify(peer->ibp_ni,
1851                            peer->ibp_nid, 0, last_alive);
1852}
1853
1854void
1855kiblnd_close_conn_locked (kib_conn_t *conn, int error)
1856{
1857        /* This just does the immediate housekeeping.  'error' is zero for a
1858         * normal shutdown which can happen only after the connection has been
1859         * established.  If the connection is established, schedule the
1860         * connection to be finished off by the connd.  Otherwise the connd is
1861         * already dealing with it (either to set it up or tear it down).
1862         * Caller holds kib_global_lock exclusively in irq context */
1863        kib_peer_t       *peer = conn->ibc_peer;
1864        kib_dev_t       *dev;
1865        unsigned long     flags;
1866
1867        LASSERT (error != 0 || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1868
1869        if (error != 0 && conn->ibc_comms_error == 0)
1870                conn->ibc_comms_error = error;
1871
1872        if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1873                return; /* already being handled  */
1874
1875        if (error == 0 &&
1876            list_empty(&conn->ibc_tx_noops) &&
1877            list_empty(&conn->ibc_tx_queue) &&
1878            list_empty(&conn->ibc_tx_queue_rsrvd) &&
1879            list_empty(&conn->ibc_tx_queue_nocred) &&
1880            list_empty(&conn->ibc_active_txs)) {
1881                CDEBUG(D_NET, "closing conn to %s\n",
1882                       libcfs_nid2str(peer->ibp_nid));
1883        } else {
1884                CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1885                       libcfs_nid2str(peer->ibp_nid), error,
1886                       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1887                       list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1888                       list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1889                       list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1890                       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1891        }
1892
1893        dev = ((kib_net_t *)peer->ibp_ni->ni_data)->ibn_dev;
1894        list_del(&conn->ibc_list);
1895        /* connd (see below) takes over ibc_list's ref */
1896
1897        if (list_empty (&peer->ibp_conns) &&    /* no more conns */
1898            kiblnd_peer_active(peer)) {  /* still in peer table */
1899                kiblnd_unlink_peer_locked(peer);
1900
1901                /* set/clear error on last conn */
1902                peer->ibp_error = conn->ibc_comms_error;
1903        }
1904
1905        kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1906
1907        if (error != 0 &&
1908            kiblnd_dev_can_failover(dev)) {
1909                list_add_tail(&dev->ibd_fail_list,
1910                              &kiblnd_data.kib_failed_devs);
1911                wake_up(&kiblnd_data.kib_failover_waitq);
1912        }
1913
1914        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1915
1916        list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1917        wake_up(&kiblnd_data.kib_connd_waitq);
1918
1919        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1920}
1921
1922void
1923kiblnd_close_conn(kib_conn_t *conn, int error)
1924{
1925        unsigned long flags;
1926
1927        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1928
1929        kiblnd_close_conn_locked(conn, error);
1930
1931        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1932}
1933
1934static void
1935kiblnd_handle_early_rxs(kib_conn_t *conn)
1936{
1937        unsigned long    flags;
1938        kib_rx_t        *rx;
1939
1940        LASSERT(!in_interrupt());
1941        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1942
1943        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1944        while (!list_empty(&conn->ibc_early_rxs)) {
1945                rx = list_entry(conn->ibc_early_rxs.next,
1946                                kib_rx_t, rx_list);
1947                list_del(&rx->rx_list);
1948                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1949
1950                kiblnd_handle_rx(rx);
1951
1952                write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1953        }
1954        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1955}
1956
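/* Fail every tx on 'txs' with -ECONNABORTED.  Descriptors with a send still
 * outstanding are only flagged here; their completion callbacks will finish
 * them.  The rest move to a local zombie list and complete outside the
 * connection lock. */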
1957static void
1958kiblnd_abort_txs(kib_conn_t *conn, struct list_head *txs)
1959{
1960        LIST_HEAD       (zombies);
1961        struct list_head          *tmp;
1962        struct list_head          *nxt;
1963        kib_tx_t            *tx;
1964
1965        spin_lock(&conn->ibc_lock);
1966
1967        list_for_each_safe (tmp, nxt, txs) {
1968                tx = list_entry (tmp, kib_tx_t, tx_list);
1969
1970                if (txs == &conn->ibc_active_txs) {
1971                        LASSERT (!tx->tx_queued);
1972                        LASSERT (tx->tx_waiting ||
1973                                 tx->tx_sending != 0);
1974                } else {
1975                        LASSERT (tx->tx_queued);
1976                }
1977
1978                tx->tx_status = -ECONNABORTED;
1979                tx->tx_waiting = 0;
1980
1981                if (tx->tx_sending == 0) {
1982                        tx->tx_queued = 0;
1983                        list_del (&tx->tx_list);
1984                        list_add (&tx->tx_list, &zombies);
1985                }
1986        }
1987
1988        spin_unlock(&conn->ibc_lock);
1989
1990        kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
1991}
1992
1993static void
1994kiblnd_finalise_conn (kib_conn_t *conn)
1995{
1996        LASSERT (!in_interrupt());
1997        LASSERT (conn->ibc_state > IBLND_CONN_INIT);
1998
1999        kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2000
2001        /* abort_receives moves QP state to IB_QPS_ERR.  This is only required
2002         * for connections that didn't get as far as being connected, because
2003         * rdma_disconnect() does this for free. */
2004        kiblnd_abort_receives(conn);
2005
2006        /* Complete all tx descs not waiting for sends to complete.
2007         * NB we should be safe from RDMA now that the QP has changed state */
2008
2009        kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2010        kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2011        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2012        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2013        kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2014
2015        kiblnd_handle_early_rxs(conn);
2016}
2017
2018void
2019kiblnd_peer_connect_failed (kib_peer_t *peer, int active, int error)
2020{
2021        LIST_HEAD    (zombies);
2022        unsigned long     flags;
2023
2024        LASSERT (error != 0);
2025        LASSERT (!in_interrupt());
2026
2027        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2028
2029        if (active) {
2030                LASSERT (peer->ibp_connecting > 0);
2031                peer->ibp_connecting--;
2032        } else {
2033                LASSERT (peer->ibp_accepting > 0);
2034                peer->ibp_accepting--;
2035        }
2036
2037        if (peer->ibp_connecting != 0 ||
2038            peer->ibp_accepting != 0) {
2039                /* another connection attempt under way... */
2040                write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2041                                        flags);
2042                return;
2043        }
2044
2045        if (list_empty(&peer->ibp_conns)) {
2046                /* Take peer's blocked transmits to complete with error */
2047                list_add(&zombies, &peer->ibp_tx_queue);
2048                list_del_init(&peer->ibp_tx_queue);
2049
2050                if (kiblnd_peer_active(peer))
2051                        kiblnd_unlink_peer_locked(peer);
2052
2053                peer->ibp_error = error;
2054        } else {
2055                /* Can't have blocked transmits if there are connections */
2056                LASSERT (list_empty(&peer->ibp_tx_queue));
2057        }
2058
2059        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2060
2061        kiblnd_peer_notify(peer);
2062
2063        if (list_empty (&zombies))
2064                return;
2065
2066        CNETERR("Deleting messages for %s: connection failed\n",
2067                libcfs_nid2str(peer->ibp_nid));
2068
2069        kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2070}
2071
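/* Complete a connection attempt, active or passive.  On failure the conn is
 * finalised and the peer told; on success the conn becomes ESTABLISHED,
 * stale conns from any previous peer incarnation are closed, and the
 * transmits that queued on the peer while connecting are scheduled. */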
2072void
2073kiblnd_connreq_done(kib_conn_t *conn, int status)
2074{
2075        kib_peer_t      *peer = conn->ibc_peer;
2076        kib_tx_t          *tx;
2077        struct list_head         txs;
2078        unsigned long      flags;
2079        int             active;
2080
2081        active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2082
2083        CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2084               libcfs_nid2str(peer->ibp_nid), active,
2085               conn->ibc_version, status);
2086
2087        LASSERT (!in_interrupt());
2088        LASSERT ((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2089                  peer->ibp_connecting > 0) ||
2090                 (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2091                  peer->ibp_accepting > 0));
2092
2093        LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2094        conn->ibc_connvars = NULL;
2095
2096        if (status != 0) {
2097                /* failed to establish connection */
2098                kiblnd_peer_connect_failed(peer, active, status);
2099                kiblnd_finalise_conn(conn);
2100                return;
2101        }
2102
2103        /* connection established */
2104        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2105
2106        conn->ibc_last_send = jiffies;
2107        kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2108        kiblnd_peer_alive(peer);
2109
2110        /* Add conn to peer's list and nuke any dangling conns from a different
2111         * peer instance... */
2112        kiblnd_conn_addref(conn);              /* +1 ref for ibc_list */
2113        list_add(&conn->ibc_list, &peer->ibp_conns);
2114        if (active)
2115                peer->ibp_connecting--;
2116        else
2117                peer->ibp_accepting--;
2118
2119        if (peer->ibp_version == 0) {
2120                peer->ibp_version     = conn->ibc_version;
2121                peer->ibp_incarnation = conn->ibc_incarnation;
2122        }
2123
2124        if (peer->ibp_version     != conn->ibc_version ||
2125            peer->ibp_incarnation != conn->ibc_incarnation) {
2126                kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2127                                                conn->ibc_incarnation);
2128                peer->ibp_version     = conn->ibc_version;
2129                peer->ibp_incarnation = conn->ibc_incarnation;
2130        }
2131
2132        /* grab pending txs while I have the lock */
2133        list_add(&txs, &peer->ibp_tx_queue);
2134        list_del_init(&peer->ibp_tx_queue);
2135
2136        if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2137            conn->ibc_comms_error != 0) {       /* error has happened already */
2138                lnet_ni_t *ni = peer->ibp_ni;
2139
2140                /* start to shut down connection */
2141                kiblnd_close_conn_locked(conn, -ECONNABORTED);
2142                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2143
2144                kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2145
2146                return;
2147        }
2148
2149        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2150
2151        /* Schedule blocked txs */
2152        spin_lock(&conn->ibc_lock);
2153        while (!list_empty(&txs)) {
2154                tx = list_entry(txs.next, kib_tx_t, tx_list);
2155                list_del(&tx->tx_list);
2156
2157                kiblnd_queue_tx_locked(tx, conn);
2158        }
2159        spin_unlock(&conn->ibc_lock);
2160
2161        kiblnd_check_sends(conn);
2162
2163        /* schedule blocked rxs */
2164        kiblnd_handle_early_rxs(conn);
2165}
2166
2167static void
2168kiblnd_reject(struct rdma_cm_id *cmid, kib_rej_t *rej)
2169{
2170        int       rc;
2171
2172        rc = rdma_reject(cmid, rej, sizeof(*rej));
2173
2174        if (rc != 0)
2175                CWARN("Error %d sending reject\n", rc);
2176}
2177
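/* Handle an incoming connection request: sanity-check the connreq (magic,
 * version, queue depth, max frags, message size), find or create the peer,
 * settle connection races, then rdma_accept() with a CONNACK carrying my
 * connection parameters.  Any failure sends an explicit reject. */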
2178static int
2179kiblnd_passive_connect (struct rdma_cm_id *cmid, void *priv, int priv_nob)
2180{
2181        rwlock_t                *g_lock = &kiblnd_data.kib_global_lock;
2182        kib_msg_t            *reqmsg = priv;
2183        kib_msg_t            *ackmsg;
2184        kib_dev_t            *ibdev;
2185        kib_peer_t          *peer;
2186        kib_peer_t          *peer2;
2187        kib_conn_t          *conn;
2188        lnet_ni_t            *ni  = NULL;
2189        kib_net_t            *net = NULL;
2190        lnet_nid_t           nid;
2191        struct rdma_conn_param cp;
2192        kib_rej_t             rej;
2193        int                 version = IBLND_MSG_VERSION;
2194        unsigned long     flags;
2195        int                 rc;
2196        struct sockaddr_in    *peer_addr;
2197        LASSERT (!in_interrupt());
2198
2199        /* cmid inherits 'context' from the corresponding listener id */
2200        ibdev = (kib_dev_t *)cmid->context;
2201        LASSERT (ibdev != NULL);
2202
2203        memset(&rej, 0, sizeof(rej));
2204        rej.ibr_magic           = IBLND_MSG_MAGIC;
2205        rej.ibr_why               = IBLND_REJECT_FATAL;
2206        rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2207
2208        peer_addr = (struct sockaddr_in *)&(cmid->route.addr.dst_addr);
2209        if (*kiblnd_tunables.kib_require_priv_port &&
2210            ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2211                __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2212                CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2213                       &ip, ntohs(peer_addr->sin_port));
2214                goto failed;
2215        }
2216
2217        if (priv_nob < offsetof(kib_msg_t, ibm_type)) {
2218                CERROR("Short connection request\n");
2219                goto failed;
2220        }
2221
2222        /* Future protocol version compatibility support!  If the
2223         * o2iblnd-specific protocol changes, or when LNET unifies
2224         * protocols over all LNDs, the initial connection will
2225         * negotiate a protocol version.  I trap this here to avoid
2226         * console errors; the reject tells the peer which protocol I
2227         * speak. */
2228        if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2229            reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2230                goto failed;
2231        if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2232            reqmsg->ibm_version != IBLND_MSG_VERSION &&
2233            reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2234                goto failed;
2235        if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2236            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2237            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2238                goto failed;
2239
2240        rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2241        if (rc != 0) {
2242                CERROR("Can't parse connection request: %d\n", rc);
2243                goto failed;
2244        }
2245
2246        nid = reqmsg->ibm_srcnid;
2247        ni  = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2248
2249        if (ni != NULL) {
2250                net = (kib_net_t *)ni->ni_data;
2251                rej.ibr_incarnation = net->ibn_incarnation;
2252        }
2253
2254        if (ni == NULL ||                        /* no matching net */
2255            ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2256            net->ibn_dev != ibdev) {          /* wrong device */
2257                CERROR("Can't accept %s on %s (%s:%d:%pI4h): "
2258                       "bad dst nid %s\n", libcfs_nid2str(nid),
2259                       ni == NULL ? "NA" : libcfs_nid2str(ni->ni_nid),
2260                       ibdev->ibd_ifname, ibdev->ibd_nnets,
2261                       &ibdev->ibd_ifip,
2262                       libcfs_nid2str(reqmsg->ibm_dstnid));
2263
2264                goto failed;
2265        }
2266
2267        /* check time stamp as soon as possible */
2268        if (reqmsg->ibm_dststamp != 0 &&
2269            reqmsg->ibm_dststamp != net->ibn_incarnation) {
2270                CWARN("Stale connection request\n");
2271                rej.ibr_why = IBLND_REJECT_CONN_STALE;
2272                goto failed;
2273        }
2274
2275        /* I can accept peer's version */
2276        version = reqmsg->ibm_version;
2277
2278        if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2279                CERROR("Unexpected connreq msg type: %x from %s\n",
2280                       reqmsg->ibm_type, libcfs_nid2str(nid));
2281                goto failed;
2282        }
2283
2284        if (reqmsg->ibm_u.connparams.ibcp_queue_depth !=
2285            IBLND_MSG_QUEUE_SIZE(version)) {
2286                CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
2287                       libcfs_nid2str(nid), reqmsg->ibm_u.connparams.ibcp_queue_depth,
2288                       IBLND_MSG_QUEUE_SIZE(version));
2289
2290                if (version == IBLND_MSG_VERSION)
2291                        rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2292
2293                goto failed;
2294        }
2295
2296        if (reqmsg->ibm_u.connparams.ibcp_max_frags !=
2297            IBLND_RDMA_FRAGS(version)) {
2298                CERROR("Can't accept %s(version %x): "
2299                       "incompatible max_frags %d (%d wanted)\n",
2300                       libcfs_nid2str(nid), version,
2301                       reqmsg->ibm_u.connparams.ibcp_max_frags,
2302                       IBLND_RDMA_FRAGS(version));
2303
2304                if (version == IBLND_MSG_VERSION)
2305                        rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2306
2307                goto failed;
2308
2309        }
2310
2311        if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2312                CERROR("Can't accept %s: message size %d too big (%d max)\n",
2313                       libcfs_nid2str(nid),
2314                       reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2315                       IBLND_MSG_SIZE);
2316                goto failed;
2317        }
2318
2319        /* assume 'nid' is a new peer; create  */
2320        rc = kiblnd_create_peer(ni, &peer, nid);
2321        if (rc != 0) {
2322                CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2323                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2324                goto failed;
2325        }
2326
2327        write_lock_irqsave(g_lock, flags);
2328
2329        peer2 = kiblnd_find_peer_locked(nid);
2330        if (peer2 != NULL) {
2331                if (peer2->ibp_version == 0) {
2332                        peer2->ibp_version     = version;
2333                        peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2334                }
2335
2336                /* not the peer instance I've been talking to */
2337                if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2338                    peer2->ibp_version     != version) {
2339                        kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2340                        write_unlock_irqrestore(g_lock, flags);
2341
2342                        CWARN("Conn stale %s [old ver: %x, new ver: %x]\n",
2343                              libcfs_nid2str(nid), peer2->ibp_version, version);
2344
2345                        kiblnd_peer_decref(peer);
2346                        rej.ibr_why = IBLND_REJECT_CONN_STALE;
2347                        goto failed;
2348                }
2349
2350                /* tie-break connection race in favour of the higher NID */
2351                if (peer2->ibp_connecting != 0 &&
2352                    nid < ni->ni_nid) {
2353                        write_unlock_irqrestore(g_lock, flags);
2354
2355                        CWARN("Conn race %s\n", libcfs_nid2str(peer2->ibp_nid));
2356
2357                        kiblnd_peer_decref(peer);
2358                        rej.ibr_why = IBLND_REJECT_CONN_RACE;
2359                        goto failed;
2360                }
2361
2362                peer2->ibp_accepting++;
2363                kiblnd_peer_addref(peer2);
2364
2365                write_unlock_irqrestore(g_lock, flags);
2366                kiblnd_peer_decref(peer);
2367                peer = peer2;
2368        } else {
2369                /* Brand new peer */
2370                LASSERT (peer->ibp_accepting == 0);
2371                LASSERT (peer->ibp_version == 0 &&
2372                         peer->ibp_incarnation == 0);
2373
2374                peer->ibp_accepting   = 1;
2375                peer->ibp_version     = version;
2376                peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2377
2378                /* I have a ref on ni that prevents it being shutdown */
2379                LASSERT (net->ibn_shutdown == 0);
2380
2381                kiblnd_peer_addref(peer);
2382                list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2383
2384                write_unlock_irqrestore(g_lock, flags);
2385        }
2386
2387        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT, version);
2388        if (conn == NULL) {
2389                kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2390                kiblnd_peer_decref(peer);
2391                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2392                goto failed;
2393        }
2394
2395        /* conn now "owns" cmid, so I return success from here on to ensure the
2396         * CM callback doesn't destroy cmid. */
2397
2398        conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2399        conn->ibc_credits         = IBLND_MSG_QUEUE_SIZE(version);
2400        conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(version);
2401        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(version)
2402                 <= IBLND_RX_MSGS(version));
2403
2404        ackmsg = &conn->ibc_connvars->cv_msg;
2405        memset(ackmsg, 0, sizeof(*ackmsg));
2406
2407        kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2408                        sizeof(ackmsg->ibm_u.connparams));
2409        ackmsg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2410        ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2411        ackmsg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2412
2413        kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2414
2415        memset(&cp, 0, sizeof(cp));
2416        cp.private_data = ackmsg;
2417        cp.private_data_len    = ackmsg->ibm_nob;
2418        cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2419        cp.initiator_depth     = 0;
2420        cp.flow_control = 1;
2421        cp.retry_count   = *kiblnd_tunables.kib_retry_count;
2422        cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2423
2424        CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2425
2426        rc = rdma_accept(cmid, &cp);
2427        if (rc != 0) {
2428                CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2429                rej.ibr_version = version;
2430                rej.ibr_why     = IBLND_REJECT_FATAL;
2431
2432                kiblnd_reject(cmid, &rej);
2433                kiblnd_connreq_done(conn, rc);
2434                kiblnd_conn_decref(conn);
2435        }
2436
2437        lnet_ni_decref(ni);
2438        return 0;
2439
2440 failed:
2441        if (ni != NULL)
2442                lnet_ni_decref(ni);
2443
2444        rej.ibr_version = version;
2445        rej.ibr_cp.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE(version);
2446        rej.ibr_cp.ibcp_max_frags   = IBLND_RDMA_FRAGS(version);
2447        kiblnd_reject(cmid, &rej);
2448
2449        return -ECONNREFUSED;
2450}
2451
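/* Decide whether a failed active connect is worth another try (stale conn,
 * connection race or version negotiation) and, if it is the only attempt in
 * flight, retry with the version/incarnation the peer reported. */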
2452static void
2453kiblnd_reconnect (kib_conn_t *conn, int version,
2454                  __u64 incarnation, int why, kib_connparams_t *cp)
2455{
2456        kib_peer_t    *peer = conn->ibc_peer;
2457        char      *reason;
2458        int         retry = 0;
2459        unsigned long  flags;
2460
2461        LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2462        LASSERT (peer->ibp_connecting > 0);     /* 'conn' at least */
2463
2464        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2465
2466        /* retry connection if it's still needed and no other connection
2467         * attempts (active or passive) are in progress
2468         * NB: reconnect is still needed even when ibp_tx_queue is
2469         * empty if ibp_version != version because reconnect may be
2470         * initiated by kiblnd_query() */
2471        if ((!list_empty(&peer->ibp_tx_queue) ||
2472             peer->ibp_version != version) &&
2473            peer->ibp_connecting == 1 &&
2474            peer->ibp_accepting == 0) {
2475                retry = 1;
2476                peer->ibp_connecting++;
2477
2478                peer->ibp_version     = version;
2479                peer->ibp_incarnation = incarnation;
2480        }
2481
2482        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2483
2484        if (!retry)
2485                return;
2486
2487        switch (why) {
2488        default:
2489                reason = "Unknown";
2490                break;
2491
2492        case IBLND_REJECT_CONN_STALE:
2493                reason = "stale";
2494                break;
2495
2496        case IBLND_REJECT_CONN_RACE:
2497                reason = "conn race";
2498                break;
2499
2500        case IBLND_REJECT_CONN_UNCOMPAT:
2501                reason = "version negotiation";
2502                break;
2503        }
2504
2505        CNETERR("%s: retrying (%s), %x, %x, "
2506                "queue_dep: %d, max_frag: %d, msg_size: %d\n",
2507                libcfs_nid2str(peer->ibp_nid),
2508                reason, IBLND_MSG_VERSION, version,
2509                cp != NULL ? cp->ibcp_queue_depth  : IBLND_MSG_QUEUE_SIZE(version),
2510                cp != NULL ? cp->ibcp_max_frags    : IBLND_RDMA_FRAGS(version),
2511                cp != NULL ? cp->ibcp_max_msg_size : IBLND_MSG_SIZE);
2512
2513        kiblnd_connect_peer(peer);
2514}
2515
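/* Decode a CM rejection of my active connect.  Consumer-defined rejects
 * carry a kib_rej_t giving the o2iblnd-level reason and may trigger a
 * reconnect; anything else is fatal for this attempt. */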
2516static void
2517kiblnd_rejected (kib_conn_t *conn, int reason, void *priv, int priv_nob)
2518{
2519        kib_peer_t    *peer = conn->ibc_peer;
2520
2521        LASSERT (!in_interrupt());
2522        LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2523
2524        switch (reason) {
2525        case IB_CM_REJ_STALE_CONN:
2526                kiblnd_reconnect(conn, IBLND_MSG_VERSION, 0,
2527                                 IBLND_REJECT_CONN_STALE, NULL);
2528                break;
2529
2530        case IB_CM_REJ_INVALID_SERVICE_ID:
2531                CNETERR("%s rejected: no listener at %d\n",
2532                        libcfs_nid2str(peer->ibp_nid),
2533                        *kiblnd_tunables.kib_service);
2534                break;
2535
2536        case IB_CM_REJ_CONSUMER_DEFINED:
2537                if (priv_nob >= offsetof(kib_rej_t, ibr_padding)) {
2538                        kib_rej_t       *rej     = priv;
2539                        kib_connparams_t *cp      = NULL;
2540                        int            flip     = 0;
2541                        __u64        incarnation = -1;
2542
2543                        /* NB: the default incarnation is -1 because:
2544                         * a) V1 ignores the dst incarnation in the connreq.
2545                         * b) V2 provides an incarnation while rejecting me,
2546                         *    so the -1 will be overwritten.
2547                         *
2548                         * If I try to connect to a V1 peer with the V2
2549                         * protocol, it may reject me and then upgrade to V2.
2550                         * Knowing nothing of the upgrade, I retry with V1;
2551                         * the upgraded V2 peer can then tell I'm talking to
2552                         * the old instance and reject me (incarnation is -1).
2553                         */
2554
2555                        if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2556                            rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2557                                __swab32s(&rej->ibr_magic);
2558                                __swab16s(&rej->ibr_version);
2559                                flip = 1;
2560                        }
2561
2562                        if (priv_nob >= sizeof(kib_rej_t) &&
2563                            rej->ibr_version > IBLND_MSG_VERSION_1) {
2564                                /* priv_nob is always 148 in the current
2565                                 * OFED release, so we still need to check
2566                                 * the version (see IB_CM_REJ_PRIVATE_DATA_SIZE) */
2567                                cp = &rej->ibr_cp;
2568
2569                                if (flip) {
2570                                        __swab64s(&rej->ibr_incarnation);
2571                                        __swab16s(&cp->ibcp_queue_depth);
2572                                        __swab16s(&cp->ibcp_max_frags);
2573                                        __swab32s(&cp->ibcp_max_msg_size);
2574                                }
2575
2576                                incarnation = rej->ibr_incarnation;
2577                        }
2578
2579                        if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2580                            rej->ibr_magic != LNET_PROTO_MAGIC) {
2581                                CERROR("%s rejected: consumer defined fatal error\n",
2582                                       libcfs_nid2str(peer->ibp_nid));
2583                                break;
2584                        }
2585
2586                        if (rej->ibr_version != IBLND_MSG_VERSION &&
2587                            rej->ibr_version != IBLND_MSG_VERSION_1) {
2588                                CERROR("%s rejected: o2iblnd version %x error\n",
2589                                       libcfs_nid2str(peer->ibp_nid),
2590                                       rej->ibr_version);
2591                                break;
2592                        }
2593
2594                        if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2595                            rej->ibr_version == IBLND_MSG_VERSION_1) {
2596                                CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2597                                       libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2598
2599                                if (conn->ibc_version != IBLND_MSG_VERSION_1)
2600                                        rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2601                        }
2602
2603                        switch (rej->ibr_why) {
2604                        case IBLND_REJECT_CONN_RACE:
2605                        case IBLND_REJECT_CONN_STALE:
2606                        case IBLND_REJECT_CONN_UNCOMPAT:
2607                                kiblnd_reconnect(conn, rej->ibr_version,
2608                                                 incarnation, rej->ibr_why, cp);
2609                                break;
2610
2611                        case IBLND_REJECT_MSG_QUEUE_SIZE:
2612                                CERROR("%s rejected: incompatible message queue depth %d, %d\n",
2613                                       libcfs_nid2str(peer->ibp_nid),
2614                                       cp != NULL ? cp->ibcp_queue_depth :
2615                                       IBLND_MSG_QUEUE_SIZE(rej->ibr_version),
2616                                       IBLND_MSG_QUEUE_SIZE(conn->ibc_version));
2617                                break;
2618
2619                        case IBLND_REJECT_RDMA_FRAGS:
2620                                CERROR("%s rejected: incompatible # of RDMA fragments %d, %d\n",
2621                                       libcfs_nid2str(peer->ibp_nid),
2622                                       cp != NULL ? cp->ibcp_max_frags :
2623                                       IBLND_RDMA_FRAGS(rej->ibr_version),
2624                                       IBLND_RDMA_FRAGS(conn->ibc_version));
2625                                break;
2626
2627                        case IBLND_REJECT_NO_RESOURCES:
2628                                CERROR("%s rejected: o2iblnd no resources\n",
2629                                       libcfs_nid2str(peer->ibp_nid));
2630                                break;
2631
2632                        case IBLND_REJECT_FATAL:
2633                                CERROR("%s rejected: o2iblnd fatal error\n",
2634                                       libcfs_nid2str(peer->ibp_nid));
2635                                break;
2636
2637                        default:
2638                                CERROR("%s rejected: o2iblnd reason %d\n",
2639                                       libcfs_nid2str(peer->ibp_nid),
2640                                       rej->ibr_why);
2641                                break;
2642                        }
2643                        break;
2644                }
2645                /* fall through */
2646        default:
2647                CNETERR("%s rejected: reason %d, size %d\n",
2648                        libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2649                break;
2650        }
2651
2652        kiblnd_connreq_done(conn, -ECONNREFUSED);
2653}
2654
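/* Check the CONNACK the peer returned for my active connect: the version
 * and connection parameters must match what I asked for, and the dst
 * nid/stamp must still be mine. */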
2655static void
2656kiblnd_check_connreply (kib_conn_t *conn, void *priv, int priv_nob)
2657{
2658        kib_peer_t    *peer = conn->ibc_peer;
2659        lnet_ni_t     *ni   = peer->ibp_ni;
2660        kib_net_t     *net  = ni->ni_data;
2661        kib_msg_t     *msg  = priv;
2662        int         ver  = conn->ibc_version;
2663        int         rc   = kiblnd_unpack_msg(msg, priv_nob);
2664        unsigned long  flags;
2665
2666        LASSERT (net != NULL);
2667
2668        if (rc != 0) {
2669                CERROR("Can't unpack connack from %s: %d\n",
2670                       libcfs_nid2str(peer->ibp_nid), rc);
2671                goto failed;
2672        }
2673
2674        if (msg->ibm_type != IBLND_MSG_CONNACK) {
2675                CERROR("Unexpected message %d from %s\n",
2676                       msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2677                rc = -EPROTO;
2678                goto failed;
2679        }
2680
2681        if (ver != msg->ibm_version) {
2682                CERROR("%s replied version %x which differs from "
2683                       "requested version %x\n",
2684                       libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2685                rc = -EPROTO;
2686                goto failed;
2687        }
2688
2689        if (msg->ibm_u.connparams.ibcp_queue_depth !=
2690            IBLND_MSG_QUEUE_SIZE(ver)) {
2691                CERROR("%s has incompatible queue depth %d(%d wanted)\n",
2692                       libcfs_nid2str(peer->ibp_nid),
2693                       msg->ibm_u.connparams.ibcp_queue_depth,
2694                       IBLND_MSG_QUEUE_SIZE(ver));
2695                rc = -EPROTO;
2696                goto failed;
2697        }
2698
2699        if (msg->ibm_u.connparams.ibcp_max_frags !=
2700            IBLND_RDMA_FRAGS(ver)) {
2701                CERROR("%s has incompatible max_frags %d (%d wanted)\n",
2702                       libcfs_nid2str(peer->ibp_nid),
2703                       msg->ibm_u.connparams.ibcp_max_frags,
2704                       IBLND_RDMA_FRAGS(ver));
2705                rc = -EPROTO;
2706                goto failed;
2707        }
2708
2709        if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2710                CERROR("%s max message size %d too big (%d max)\n",
2711                       libcfs_nid2str(peer->ibp_nid),
2712                       msg->ibm_u.connparams.ibcp_max_msg_size,
2713                       IBLND_MSG_SIZE);
2714                rc = -EPROTO;
2715                goto failed;
2716        }
2717
2718        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2719        if (msg->ibm_dstnid == ni->ni_nid &&
2720            msg->ibm_dststamp == net->ibn_incarnation)
2721                rc = 0;
2722        else
2723                rc = -ESTALE;
2724        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2725
2726        if (rc != 0) {
2727                CERROR("Bad connection reply from %s, rc = %d, "
2728                       "version: %x max_frags: %d\n",
2729                       libcfs_nid2str(peer->ibp_nid), rc,
2730                       msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2731                goto failed;
2732        }
2733
2734        conn->ibc_incarnation      = msg->ibm_srcstamp;
2735        conn->ibc_credits         =
2736        conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(ver);
2737        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(ver)
2738                 <= IBLND_RX_MSGS(ver));
2739
2740        kiblnd_connreq_done(conn, 0);
2741        return;
2742
2743 failed:
2744        /* NB My QP has already established itself, so I handle anything going
2745         * wrong here by setting ibc_comms_error.
2746         * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2747         * immediately tears it down. */
2748
2749        LASSERT (rc != 0);
2750        conn->ibc_comms_error = rc;
2751        kiblnd_connreq_done(conn, 0);
2752}
2753
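/* The route to the peer is resolved: create the conn, pack a CONNREQ with
 * my connection parameters and rdma_connect().  The conn owns the cmid
 * (and the cmid's ref on the peer) from here on. */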
2754static int
2755kiblnd_active_connect (struct rdma_cm_id *cmid)
2756{
2757        kib_peer_t            *peer = (kib_peer_t *)cmid->context;
2758        kib_conn_t            *conn;
2759        kib_msg_t              *msg;
2760        struct rdma_conn_param   cp;
2761        int                   version;
2762        __u64               incarnation;
2763        unsigned long       flags;
2764        int                   rc;
2765
2766        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2767
2768        incarnation = peer->ibp_incarnation;
2769        version     = (peer->ibp_version == 0) ? IBLND_MSG_VERSION :
2770                                                 peer->ibp_version;
2771
2772        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2773
2774        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT, version);
2775        if (conn == NULL) {
2776                kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2777                kiblnd_peer_decref(peer); /* lose cmid's ref */
2778                return -ENOMEM;
2779        }
2780
2781        /* conn "owns" cmid now, so I return success from here on to ensure the
2782         * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2783         * on peer */
2784
2785        msg = &conn->ibc_connvars->cv_msg;
2786
2787        memset(msg, 0, sizeof(*msg));
2788        kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2789        msg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2790        msg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2791        msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2792
2793        kiblnd_pack_msg(peer->ibp_ni, msg, version,
2794                        0, peer->ibp_nid, incarnation);
2795
2796        memset(&cp, 0, sizeof(cp));
2797        cp.private_data = msg;
2798        cp.private_data_len    = msg->ibm_nob;
2799        cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2800        cp.initiator_depth     = 0;
2801        cp.flow_control = 1;
2802        cp.retry_count   = *kiblnd_tunables.kib_retry_count;
2803        cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2804
2805        LASSERT(cmid->context == (void *)conn);
2806        LASSERT(conn->ibc_cmid == cmid);
2807
2808        rc = rdma_connect(cmid, &cp);
2809        if (rc != 0) {
2810                CERROR("Can't connect to %s: %d\n",
2811                       libcfs_nid2str(peer->ibp_nid), rc);
2812                kiblnd_connreq_done(conn, rc);
2813                kiblnd_conn_decref(conn);
2814        }
2815
2816        return 0;
2817}
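
/*
 * Note the CONNREQ built above never touches a QP send queue: it rides in
 * the CM private data of the connect request.  Roughly, the two halves of
 * that exchange look like this (a sketch; KIBLND_CONN_PARAM() is assumed
 * to return the event's param.conn.private_data):
 *
 *      // active side: pack the message into the connect parameters
 *      cp.private_data     = msg;
 *      cp.private_data_len = msg->ibm_nob;
 *      rc = rdma_connect(cmid, &cp);
 *
 *      // passive side: the same bytes arrive with the CM event and are
 *      // sanity-checked before any RX buffer is involved
 *      rc = kiblnd_passive_connect(cmid,
 *                                  (void *)KIBLND_CONN_PARAM(event),
 *                                  KIBLND_CONN_PARAM_LEN(event));
 *
 * so version negotiation completes before the connection is established.
 */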
2818
2819int
2820kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2821{
2822        kib_peer_t  *peer;
2823        kib_conn_t  *conn;
2824        int       rc;
2825
2826        switch (event->event) {
2827        default:
2828                CERROR("Unexpected event: %d, status: %d\n",
2829                       event->event, event->status);
2830                LBUG();
2831
2832        case RDMA_CM_EVENT_CONNECT_REQUEST:
2833                /* destroy cmid on failure */
2834                rc = kiblnd_passive_connect(cmid,
2835                                            (void *)KIBLND_CONN_PARAM(event),
2836                                            KIBLND_CONN_PARAM_LEN(event));
2837                CDEBUG(D_NET, "connreq: %d\n", rc);
2838                return rc;
2839
2840        case RDMA_CM_EVENT_ADDR_ERROR:
2841                peer = (kib_peer_t *)cmid->context;
2842                CNETERR("%s: ADDR ERROR %d\n",
2843                       libcfs_nid2str(peer->ibp_nid), event->status);
2844                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2845                kiblnd_peer_decref(peer);
2846                return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2847
2848        case RDMA_CM_EVENT_ADDR_RESOLVED:
2849                peer = (kib_peer_t *)cmid->context;
2850
2851                CDEBUG(D_NET, "%s Addr resolved: %d\n",
2852                       libcfs_nid2str(peer->ibp_nid), event->status);
2853
2854                if (event->status != 0) {
2855                        CNETERR("Can't resolve address for %s: %d\n",
2856                                libcfs_nid2str(peer->ibp_nid), event->status);
2857                        rc = event->status;
2858                } else {
2859                        rc = rdma_resolve_route(
2860                                cmid, *kiblnd_tunables.kib_timeout * 1000);
2861                        if (rc == 0)
2862                                return 0;
2863                        /* Can't initiate route resolution */
2864                        CERROR("Can't resolve route for %s: %d\n",
2865                               libcfs_nid2str(peer->ibp_nid), rc);
2866                }
2867                kiblnd_peer_connect_failed(peer, 1, rc);
2868                kiblnd_peer_decref(peer);
2869                return rc;                    /* rc != 0 destroys cmid */
2870
2871        case RDMA_CM_EVENT_ROUTE_ERROR:
2872                peer = (kib_peer_t *)cmid->context;
2873                CNETERR("%s: ROUTE ERROR %d\n",
2874                        libcfs_nid2str(peer->ibp_nid), event->status);
2875                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2876                kiblnd_peer_decref(peer);
2877                return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2878
2879        case RDMA_CM_EVENT_ROUTE_RESOLVED:
2880                peer = (kib_peer_t *)cmid->context;
2881                CDEBUG(D_NET, "%s Route resolved: %d\n",
2882                       libcfs_nid2str(peer->ibp_nid), event->status);
2883
2884                if (event->status == 0)
2885                        return kiblnd_active_connect(cmid);
2886
2887                CNETERR("Can't resolve route for %s: %d\n",
2888                       libcfs_nid2str(peer->ibp_nid), event->status);
2889                kiblnd_peer_connect_failed(peer, 1, event->status);
2890                kiblnd_peer_decref(peer);
2891                return event->status;      /* rc != 0 destroys cmid */
2892
2893        case RDMA_CM_EVENT_UNREACHABLE:
2894                conn = (kib_conn_t *)cmid->context;
2895                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2896                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2897                CNETERR("%s: UNREACHABLE %d\n",
2898                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2899                kiblnd_connreq_done(conn, -ENETDOWN);
2900                kiblnd_conn_decref(conn);
2901                return 0;
2902
2903        case RDMA_CM_EVENT_CONNECT_ERROR:
2904                conn = (kib_conn_t *)cmid->context;
2905                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2906                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2907                CNETERR("%s: CONNECT ERROR %d\n",
2908                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2909                kiblnd_connreq_done(conn, -ENOTCONN);
2910                kiblnd_conn_decref(conn);
2911                return 0;
2912
2913        case RDMA_CM_EVENT_REJECTED:
2914                conn = (kib_conn_t *)cmid->context;
2915                switch (conn->ibc_state) {
2916                default:
2917                        LBUG();
2918
2919                case IBLND_CONN_PASSIVE_WAIT:
2920                        CERROR ("%s: REJECTED %d\n",
2921                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
2922                                event->status);
2923                        kiblnd_connreq_done(conn, -ECONNRESET);
2924                        break;
2925
2926                case IBLND_CONN_ACTIVE_CONNECT:
2927                        kiblnd_rejected(conn, event->status,
2928                                        (void *)KIBLND_CONN_PARAM(event),
2929                                        KIBLND_CONN_PARAM_LEN(event));
2930                        break;
2931                }
2932                kiblnd_conn_decref(conn);
2933                return 0;
2934
2935        case RDMA_CM_EVENT_ESTABLISHED:
2936                conn = (kib_conn_t *)cmid->context;
2937                switch (conn->ibc_state) {
2938                default:
2939                        LBUG();
2940
2941                case IBLND_CONN_PASSIVE_WAIT:
2942                        CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
2943                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
2944                        kiblnd_connreq_done(conn, 0);
2945                        break;
2946
2947                case IBLND_CONN_ACTIVE_CONNECT:
2948                        CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
2949                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
2950                        kiblnd_check_connreply(conn,
2951                                               (void *)KIBLND_CONN_PARAM(event),
2952                                               KIBLND_CONN_PARAM_LEN(event));
2953                        break;
2954                }
2955                /* net keeps its ref on conn! */
2956                return 0;
2957
2958        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
2959                CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
2960                return 0;
2961        case RDMA_CM_EVENT_DISCONNECTED:
2962                conn = (kib_conn_t *)cmid->context;
2963                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
2964                        CERROR("%s DISCONNECTED\n",
2965                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
2966                        kiblnd_connreq_done(conn, -ECONNRESET);
2967                } else {
2968                        kiblnd_close_conn(conn, 0);
2969                }
2970                kiblnd_conn_decref(conn);
2971                cmid->context = NULL;
2972                return 0;
2973
2974        case RDMA_CM_EVENT_DEVICE_REMOVAL:
2975                LCONSOLE_ERROR_MSG(0x131,
2976                                   "Received notification of device removal\n"
2977                                   "Please shut down LNET to allow this to proceed\n");
2978                /* Can't remove network from underneath LNET for now, so I have
2979                 * to ignore this */
2980                return 0;
2981
2982        case RDMA_CM_EVENT_ADDR_CHANGE:
2983                LCONSOLE_INFO("Physical link changed (e.g. HCA/port)\n");
2984                return 0;
2985        }
2986}
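
/*
 * NB the meaning of cmid->context in the handler above changes over a
 * connection's life: while addresses and routes are being resolved it is
 * the kib_peer_t that initiated the connect; once a conn exists (active
 * connect or passive wait) it is the kib_conn_t.  Returning non-zero from
 * any branch tells the RDMA CM to destroy cmid, which is only safe while
 * nothing else owns it -- hence kiblnd_active_connect() returns 0 as soon
 * as conn has taken over cmid.
 */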
2987
2988static int
2989kiblnd_check_txs_locked(kib_conn_t *conn, struct list_head *txs)
2990{
2991        kib_tx_t          *tx;
2992        struct list_head        *ttmp;
2993
2994        list_for_each (ttmp, txs) {
2995                tx = list_entry (ttmp, kib_tx_t, tx_list);
2996
2997                if (txs != &conn->ibc_active_txs) {
2998                        LASSERT (tx->tx_queued);
2999                } else {
3000                        LASSERT (!tx->tx_queued);
3001                        LASSERT (tx->tx_waiting || tx->tx_sending != 0);
3002                }
3003
3004                if (cfs_time_aftereq (jiffies, tx->tx_deadline)) {
3005                        CERROR("Timed out tx: %s, %lu seconds\n",
3006                               kiblnd_queue2str(conn, txs),
3007                               cfs_duration_sec(jiffies - tx->tx_deadline));
3008                        return 1;
3009                }
3010        }
3011
3012        return 0;
3013}
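
/*
 * cfs_time_aftereq() is used rather than a plain '>=' because jiffies
 * wraps.  A minimal sketch of the wrap-safe comparison it is assumed to
 * reduce to (cf. the kernel's time_after_eq()):
 *
 *      static inline int deadline_passed(unsigned long now,
 *                                        unsigned long deadline)
 *      {
 *              // the signed difference is correct across a wrap as long
 *              // as the two times are within LONG_MAX jiffies
 *              return (long)(now - deadline) >= 0;
 *      }
 */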
3014
3015static int
3016kiblnd_conn_timed_out_locked(kib_conn_t *conn)
3017{
3018        return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3019                kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3020                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3021                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3022                kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3023}
3024
3025static void
3026kiblnd_check_conns (int idx)
3027{
3028        LIST_HEAD (closes);
3029        LIST_HEAD (checksends);
3030        struct list_head    *peers = &kiblnd_data.kib_peers[idx];
3031        struct list_head    *ptmp;
3032        kib_peer_t    *peer;
3033        kib_conn_t    *conn;
3034        struct list_head    *ctmp;
3035        unsigned long  flags;
3036
3037        /* NB. We expect to look at all the peers and find none with
3038         * RDMAs to time out, so we just use a shared lock while we
3039         * take a look... */
3040        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3041
3042        list_for_each (ptmp, peers) {
3043                peer = list_entry (ptmp, kib_peer_t, ibp_list);
3044
3045                list_for_each (ctmp, &peer->ibp_conns) {
3046                        int timedout;
3047                        int sendnoop;
3048
3049                        conn = list_entry(ctmp, kib_conn_t, ibc_list);
3050
3051                        LASSERT (conn->ibc_state == IBLND_CONN_ESTABLISHED);
3052
3053                        spin_lock(&conn->ibc_lock);
3054
3055                        sendnoop = kiblnd_need_noop(conn);
3056                        timedout = kiblnd_conn_timed_out_locked(conn);
3057                        if (!sendnoop && !timedout) {
3058                                spin_unlock(&conn->ibc_lock);
3059                                continue;
3060                        }
3061
3062                        if (timedout) {
3063                                CERROR("Timed out RDMA with %s (%lu): "
3064                                       "c: %u, oc: %u, rc: %u\n",
3065                                       libcfs_nid2str(peer->ibp_nid),
3066                                       cfs_duration_sec(cfs_time_current() -
3067                                                        peer->ibp_last_alive),
3068                                       conn->ibc_credits,
3069                                       conn->ibc_outstanding_credits,
3070                                       conn->ibc_reserved_credits);
3071                                list_add(&conn->ibc_connd_list, &closes);
3072                        } else {
3073                                list_add(&conn->ibc_connd_list,
3074                                             &checksends);
3075                        }
3076                        /* +ref for 'closes' or 'checksends' */
3077                        kiblnd_conn_addref(conn);
3078
3079                        spin_unlock(&conn->ibc_lock);
3080                }
3081        }
3082
3083        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3084
3085        /* Handle timeout by closing the whole
3086         * connection. We can only be sure RDMA activity
3087         * has ceased once the QP has been modified. */
3088        while (!list_empty(&closes)) {
3089                conn = list_entry(closes.next,
3090                                      kib_conn_t, ibc_connd_list);
3091                list_del(&conn->ibc_connd_list);
3092                kiblnd_close_conn(conn, -ETIMEDOUT);
3093                kiblnd_conn_decref(conn);
3094        }
3095
3096        /* In case we have enough credits to return via a
3097         * NOOP, but there were no non-blocking tx descs
3098         * free to do it last time... */
3099        while (!list_empty(&checksends)) {
3100                conn = list_entry(checksends.next,
3101                                      kib_conn_t, ibc_connd_list);
3102                list_del(&conn->ibc_connd_list);
3103                kiblnd_check_sends(conn);
3104                kiblnd_conn_decref(conn);
3105        }
3106}
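
/*
 * kiblnd_check_conns() is the usual two-phase scan: phase 1 runs under
 * the shared global lock and only takes references, deferring anything
 * that must block or retake the lock in write mode (kiblnd_close_conn()
 * does the latter); phase 2 drains the private 'closes' and 'checksends'
 * lists with no lock held.  The +ref taken during the scan is what keeps
 * each conn alive between the phases; both drain loops drop it again.
 */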
3107
3108static void
3109kiblnd_disconnect_conn (kib_conn_t *conn)
3110{
3111        LASSERT (!in_interrupt());
3112        LASSERT (current == kiblnd_data.kib_connd);
3113        LASSERT (conn->ibc_state == IBLND_CONN_CLOSING);
3114
3115        rdma_disconnect(conn->ibc_cmid);
3116        kiblnd_finalise_conn(conn);
3117
3118        kiblnd_peer_notify(conn->ibc_peer);
3119}
3120
3121int
3122kiblnd_connd (void *arg)
3123{
3124        wait_queue_t     wait;
3125        unsigned long      flags;
3126        kib_conn_t      *conn;
3127        int             timeout;
3128        int             i;
3129        int             dropped_lock;
3130        int             peer_index = 0;
3131        unsigned long      deadline = jiffies;
3132
3133        cfs_block_allsigs ();
3134
3135        init_waitqueue_entry(&wait, current);
3136        kiblnd_data.kib_connd = current;
3137
3138        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3139
3140        while (!kiblnd_data.kib_shutdown) {
3141
3142                dropped_lock = 0;
3143
3144                if (!list_empty (&kiblnd_data.kib_connd_zombies)) {
3145                        conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3146                                              kib_conn_t,
3147                                              ibc_list);
3148                        list_del(&conn->ibc_list);
3149
3150                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock,
3151                                               flags);
3152                        dropped_lock = 1;
3153
3154                        kiblnd_destroy_conn(conn);
3155
3156                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3157                }
3158
3159                if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3160                        conn = list_entry(kiblnd_data.kib_connd_conns.next,
3161                                              kib_conn_t, ibc_list);
3162                        list_del(&conn->ibc_list);
3163
3164                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock,
3165                                               flags);
3166                        dropped_lock = 1;
3167
3168                        kiblnd_disconnect_conn(conn);
3169                        kiblnd_conn_decref(conn);
3170
3171                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3172                }
3173
3174                /* careful with the jiffy wrap... */
3175                timeout = (int)(deadline - jiffies);
3176                if (timeout <= 0) {
3177                        const int n = 4;
3178                        const int p = 1;
3179                        int       chunk = kiblnd_data.kib_peer_hash_size;
3180
3181                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3182                        dropped_lock = 1;
3183
3184                        /* Time to check for RDMA timeouts on a few more
3185                         * peers: I do checks every 'p' seconds on a
3186                         * proportion of the peer table and I need to check
3187                         * every connection 'n' times within a timeout
3188                         * interval, to ensure I detect a timeout on any
3189                         * connection within (n+1)/n times the timeout
3190                         * interval. */
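                        /*
                         * For example, assuming the module defaults of a
                         * 50s timeout and 101 peer hash buckets: chunk =
                         * 101 * 4 * 1 / 50 == 8 buckets per wakeup, so
                         * the whole table is swept roughly every 13s,
                         * i.e. about n=4 times per timeout interval.
                         */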
3191
3192                        if (*kiblnd_tunables.kib_timeout > n * p)
3193                                chunk = (chunk * n * p) /
3194                                        *kiblnd_tunables.kib_timeout;
3195                        if (chunk == 0)
3196                                chunk = 1;
3197
3198                        for (i = 0; i < chunk; i++) {
3199                                kiblnd_check_conns(peer_index);
3200                                peer_index = (peer_index + 1) %
3201                                             kiblnd_data.kib_peer_hash_size;
3202                        }
3203
3204                        deadline += p * HZ;
3205                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3206                }
3207
3208                if (dropped_lock)
3209                        continue;
3210
3211                /* Nothing to do for 'timeout' */
3212                set_current_state(TASK_INTERRUPTIBLE);
3213                add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3214                spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3215
3216                schedule_timeout(timeout);
3217
3218                remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3219                spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3220        }
3221
3222        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3223
3224        kiblnd_thread_fini();
3225        return 0;
3226}
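
/*
 * The sleep at the bottom of kiblnd_connd() follows the classic
 * no-lost-wakeup ordering: set TASK_INTERRUPTIBLE and join the waitqueue
 * *before* dropping kib_connd_lock, so a wakeup issued by whoever queues
 * work right after the unlock still finds this thread on the queue:
 *
 *      set_current_state(TASK_INTERRUPTIBLE);
 *      add_wait_queue(&waitq, &wait);
 *      spin_unlock_irqrestore(&lock, flags);
 *
 *      schedule_timeout(timeout);
 *
 *      remove_wait_queue(&waitq, &wait);
 *      spin_lock_irqsave(&lock, flags);
 *
 * kiblnd_scheduler() and kiblnd_failover_thread() below use the same idiom.
 */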
3227
3228void
3229kiblnd_qp_event(struct ib_event *event, void *arg)
3230{
3231        kib_conn_t *conn = arg;
3232
3233        switch (event->event) {
3234        case IB_EVENT_COMM_EST:
3235                CDEBUG(D_NET, "%s established\n",
3236                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
3237                return;
3238
3239        default:
3240                CERROR("%s: Async QP event type %d\n",
3241                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3242                return;
3243        }
3244}
3245
3246static void
3247kiblnd_complete (struct ib_wc *wc)
3248{
3249        switch (kiblnd_wreqid2type(wc->wr_id)) {
3250        default:
3251                LBUG();
3252
3253        case IBLND_WID_RDMA:
3254                /* We only get RDMA completion notification if it fails.  All
3255                 * subsequent work items, including the final SEND will fail
3256                 * too.  However we can't print out any more info about the
3257                 * failing RDMA because 'tx' might be back on the idle list or
3258                 * even reused already if we didn't manage to post all our work
3259                 * items */
3260                CNETERR("RDMA (tx: %p) failed: %d\n",
3261                        kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3262                return;
3263
3264        case IBLND_WID_TX:
3265                kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3266                return;
3267
3268        case IBLND_WID_RX:
3269                kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3270                                   wc->byte_len);
3271                return;
3272        }
3273}
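
/*
 * The dispatch above relies on every work request id carrying both a type
 * tag and a descriptor pointer in one __u64.  A sketch of the packing this
 * assumes (descriptors are word-aligned, so the low bits are free):
 *
 *      static inline __u64 pack_wreqid(void *ptr, int type)
 *      {
 *              // type must fit in the pointer's alignment slack
 *              return (__u64)((unsigned long)ptr | type);
 *      }
 *
 * with kiblnd_wreqid2type() masking the low bits back out and
 * kiblnd_wreqid2ptr() clearing them to recover the descriptor.
 */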
3274
3275void
3276kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3277{
3278        /* NB I'm not allowed to schedule this conn once its refcount has
3279         * reached 0.  Since fundamentally I'm racing with scheduler threads
3280         * consuming my CQ I could be called after all completions have
3281         * occurred.  But in this case, ibc_nrx == 0 && ibc_nsends_posted == 0
3282         * and this CQ is about to be destroyed so I NOOP. */
3283        kib_conn_t              *conn = (kib_conn_t *)arg;
3284        struct kib_sched_info   *sched = conn->ibc_sched;
3285        unsigned long           flags;
3286
3287        LASSERT(cq == conn->ibc_cq);
3288
3289        spin_lock_irqsave(&sched->ibs_lock, flags);
3290
3291        conn->ibc_ready = 1;
3292
3293        if (!conn->ibc_scheduled &&
3294            (conn->ibc_nrx > 0 ||
3295             conn->ibc_nsends_posted > 0)) {
3296                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3297                conn->ibc_scheduled = 1;
3298                list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3299
3300                if (waitqueue_active(&sched->ibs_waitq))
3301                        wake_up(&sched->ibs_waitq);
3302        }
3303
3304        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3305}
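
/*
 * ibc_ready and ibc_scheduled cooperate so that a conn sits on a
 * scheduler's list at most once no matter how many CQ callbacks fire:
 * ibc_ready records "there is work", ibc_scheduled records "someone owns
 * the list entry (and its +1 ref)".  The scheduler clears ibc_ready before
 * polling and re-queues the conn if it became ready again meanwhile, so no
 * completion can be stranded.
 */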
3306
3307void
3308kiblnd_cq_event(struct ib_event *event, void *arg)
3309{
3310        kib_conn_t *conn = arg;
3311
3312        CERROR("%s: async CQ event type %d\n",
3313               libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3314}
3315
3316int
3317kiblnd_scheduler(void *arg)
3318{
3319        long                    id = (long)arg;
3320        struct kib_sched_info   *sched;
3321        kib_conn_t              *conn;
3322        wait_queue_t            wait;
3323        unsigned long           flags;
3324        struct ib_wc            wc;
3325        int                     did_something;
3326        int                     busy_loops = 0;
3327        int                     rc;
3328
3329        cfs_block_allsigs();
3330
3331        init_waitqueue_entry(&wait, current);
3332
3333        sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3334
3335        rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3336        if (rc != 0) {
3337                CWARN("Failed to bind on CPT %d; please verify that "
3338                      "all CPUs are healthy and reload modules if "
3339                      "necessary, otherwise your system might be at "
3340                      "risk of low performance\n", sched->ibs_cpt);
3341        }
3342
3343        spin_lock_irqsave(&sched->ibs_lock, flags);
3344
3345        while (!kiblnd_data.kib_shutdown) {
3346                if (busy_loops++ >= IBLND_RESCHED) {
3347                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3348
3349                        cond_resched();
3350                        busy_loops = 0;
3351
3352                        spin_lock_irqsave(&sched->ibs_lock, flags);
3353                }
3354
3355                did_something = 0;
3356
3357                if (!list_empty(&sched->ibs_conns)) {
3358                        conn = list_entry(sched->ibs_conns.next,
3359                                              kib_conn_t, ibc_sched_list);
3360                        /* take over kib_sched_conns' ref on conn... */
3361                        LASSERT(conn->ibc_scheduled);
3362                        list_del(&conn->ibc_sched_list);
3363                        conn->ibc_ready = 0;
3364
3365                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3366
3367                        rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3368                        if (rc == 0) {
3369                                rc = ib_req_notify_cq(conn->ibc_cq,
3370                                                      IB_CQ_NEXT_COMP);
3371                                if (rc < 0) {
3372                                        CWARN("%s: ib_req_notify_cq failed: %d, "
3373                                              "closing connection\n",
3374                                              libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3375                                        kiblnd_close_conn(conn, -EIO);
3376                                        kiblnd_conn_decref(conn);
3377                                        spin_lock_irqsave(&sched->ibs_lock,
3378                                                              flags);
3379                                        continue;
3380                                }
3381
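                                /* Re-poll after arming: a completion that
                                 * slipped in between the empty poll above
                                 * and ib_req_notify_cq() would otherwise be
                                 * missed, since the arm only raises an
                                 * event for completions added after it. */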
3382                                rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3383                        }
3384
3385                        if (rc < 0) {
3386                                CWARN("%s: ib_poll_cq failed: %d, "
3387                                      "closing connection\n",
3388                                      libcfs_nid2str(conn->ibc_peer->ibp_nid),
3389                                      rc);
3390                                kiblnd_close_conn(conn, -EIO);
3391                                kiblnd_conn_decref(conn);
3392                                spin_lock_irqsave(&sched->ibs_lock, flags);
3393                                continue;
3394                        }
3395
3396                        spin_lock_irqsave(&sched->ibs_lock, flags);
3397
3398                        if (rc != 0 || conn->ibc_ready) {
3399                                /* There may be another completion waiting; get
3400                                 * another scheduler to check while I handle
3401                                 * this one... */
3402                                /* +1 ref for sched_conns */
3403                                kiblnd_conn_addref(conn);
3404                                list_add_tail(&conn->ibc_sched_list,
3405                                                  &sched->ibs_conns);
3406                                if (waitqueue_active(&sched->ibs_waitq))
3407                                        wake_up(&sched->ibs_waitq);
3408                        } else {
3409                                conn->ibc_scheduled = 0;
3410                        }
3411
3412                        if (rc != 0) {
3413                                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3414                                kiblnd_complete(&wc);
3415
3416                                spin_lock_irqsave(&sched->ibs_lock, flags);
3417                        }
3418
3419                        kiblnd_conn_decref(conn); /* ...drop my ref from above */
3420                        did_something = 1;
3421                }
3422
3423                if (did_something)
3424                        continue;
3425
3426                set_current_state(TASK_INTERRUPTIBLE);
3427                add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3428                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3429
3430                schedule();
3431                busy_loops = 0;
3432
3433                remove_wait_queue(&sched->ibs_waitq, &wait);
3434                spin_lock_irqsave(&sched->ibs_lock, flags);
3435        }
3436
3437        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3438
3439        kiblnd_thread_fini();
3440        return 0;
3441}
3442
3443int
3444kiblnd_failover_thread(void *arg)
3445{
3446        rwlock_t                *glock = &kiblnd_data.kib_global_lock;
3447        kib_dev_t        *dev;
3448        wait_queue_t     wait;
3449        unsigned long      flags;
3450        int             rc;
3451
3452        LASSERT (*kiblnd_tunables.kib_dev_failover != 0);
3453
3454        cfs_block_allsigs ();
3455
3456        init_waitqueue_entry(&wait, current);
3457        write_lock_irqsave(glock, flags);
3458
3459        while (!kiblnd_data.kib_shutdown) {
3460                int     do_failover = 0;
3461                int     long_sleep;
3462
3463                list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3464                                    ibd_fail_list) {
3465                        if (time_before(cfs_time_current(),
3466                                        dev->ibd_next_failover))
3467                                continue;
3468                        do_failover = 1;
3469                        break;
3470                }
3471
3472                if (do_failover) {
3473                        list_del_init(&dev->ibd_fail_list);
3474                        dev->ibd_failover = 1;
3475                        write_unlock_irqrestore(glock, flags);
3476
3477                        rc = kiblnd_dev_failover(dev);
3478
3479                        write_lock_irqsave(glock, flags);
3480
3481                        LASSERT (dev->ibd_failover);
3482                        dev->ibd_failover = 0;
3483                        if (rc >= 0) { /* Device is OK or failover succeeded */
3484                                dev->ibd_next_failover = cfs_time_shift(3);
3485                                continue;
3486                        }
3487
3488                        /* failover failed; retry later */
3489                        dev->ibd_next_failover =
3490                                cfs_time_shift(min(dev->ibd_failed_failover, 10));
3491                        if (kiblnd_dev_can_failover(dev)) {
3492                                list_add_tail(&dev->ibd_fail_list,
3493                                              &kiblnd_data.kib_failed_devs);
3494                        }
3495
3496                        continue;
3497                }
3498
3499                /* long sleep if no more pending failover */
3500                long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3501
3502                set_current_state(TASK_INTERRUPTIBLE);
3503                add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3504                write_unlock_irqrestore(glock, flags);
3505
3506                rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3507                                                   cfs_time_seconds(1));
3508                remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3509                write_lock_irqsave(glock, flags);
3510
3511                if (!long_sleep || rc != 0)
3512                        continue;
3513
3514                /* On a long sleep, routinely check all active devices:
3515                 * if a device has no active connections and no local
3516                 * SENDs, we could otherwise listen on the wrong HCA
3517                 * forever after a bonding failover. */
3518                list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3519                        if (kiblnd_dev_can_failover(dev)) {
3520                                list_add_tail(&dev->ibd_fail_list,
3521                                              &kiblnd_data.kib_failed_devs);
3522                        }
3523                }
3524        }
3525
3526        write_unlock_irqrestore(glock, flags);
3527
3528        kiblnd_thread_fini();
3529        return 0;
3530}
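
/*
 * Failover retry pacing: a device that is healthy again (rc >= 0) is
 * re-checked after a fixed 3 seconds, while a failed attempt backs off
 * linearly with the failure count, capped at 10 seconds via
 * cfs_time_shift(min(ibd_failed_failover, 10)); the counter itself is
 * assumed to be maintained inside kiblnd_dev_failover().
 */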
3531