linux/drivers/staging/lustre/lnet/klnds/o2iblnd/o2iblnd_cb.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2012, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 *
  32 * lnet/klnds/o2iblnd/o2iblnd_cb.c
  33 *
  34 * Author: Eric Barton <eric@bartonsoftware.com>
  35 */
  36
  37#include "o2iblnd.h"
  38
  39#define MAX_CONN_RACES_BEFORE_ABORT 20
  40
  41static void kiblnd_peer_alive(struct kib_peer *peer);
  42static void kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error);
  43static void kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx,
  44                               int type, int body_nob);
  45static int kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
  46                            int resid, struct kib_rdma_desc *dstrd,
  47                            __u64 dstcookie);
  48static void kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn);
  49static void kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn);
  50static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx);
  51static void kiblnd_check_sends_locked(struct kib_conn *conn);
  52
  53static void
  54kiblnd_tx_done(lnet_ni_t *ni, struct kib_tx *tx)
  55{
  56        lnet_msg_t *lntmsg[2];
  57        struct kib_net *net = ni->ni_data;
  58        int rc;
  59        int i;
  60
  61        LASSERT(net);
  62        LASSERT(!in_interrupt());
  63        LASSERT(!tx->tx_queued);               /* mustn't be queued for sending */
  64        LASSERT(!tx->tx_sending);         /* mustn't be awaiting sent callback */
  65        LASSERT(!tx->tx_waiting);             /* mustn't be awaiting peer response */
  66        LASSERT(tx->tx_pool);
  67
  68        kiblnd_unmap_tx(ni, tx);
  69
  70        /* tx may have up to 2 lnet msgs to finalise */
  71        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
  72        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
  73        rc = tx->tx_status;
  74
  75        if (tx->tx_conn) {
  76                LASSERT(ni == tx->tx_conn->ibc_peer->ibp_ni);
  77
  78                kiblnd_conn_decref(tx->tx_conn);
  79                tx->tx_conn = NULL;
  80        }
  81
  82        tx->tx_nwrq = 0;
  83        tx->tx_status = 0;
  84
  85        kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
  86
  87        /* delay finalize until my descs have been freed */
  88        for (i = 0; i < 2; i++) {
  89                if (!lntmsg[i])
  90                        continue;
  91
  92                lnet_finalize(ni, lntmsg[i], rc);
  93        }
  94}
  95
  96void
  97kiblnd_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int status)
  98{
  99        struct kib_tx *tx;
 100
 101        while (!list_empty(txlist)) {
 102                tx = list_entry(txlist->next, struct kib_tx, tx_list);
 103
 104                list_del(&tx->tx_list);
 105                /* complete now */
 106                tx->tx_waiting = 0;
 107                tx->tx_status = status;
 108                kiblnd_tx_done(ni, tx);
 109        }
 110}
 111
 112static struct kib_tx *
 113kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
 114{
 115        struct kib_net *net = (struct kib_net *)ni->ni_data;
 116        struct list_head *node;
 117        struct kib_tx *tx;
 118        struct kib_tx_poolset *tps;
 119
 120        tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
 121        node = kiblnd_pool_alloc_node(&tps->tps_poolset);
 122        if (!node)
 123                return NULL;
 124        tx = list_entry(node, struct kib_tx, tx_list);
 125
 126        LASSERT(!tx->tx_nwrq);
 127        LASSERT(!tx->tx_queued);
 128        LASSERT(!tx->tx_sending);
 129        LASSERT(!tx->tx_waiting);
 130        LASSERT(!tx->tx_status);
 131        LASSERT(!tx->tx_conn);
 132        LASSERT(!tx->tx_lntmsg[0]);
 133        LASSERT(!tx->tx_lntmsg[1]);
 134        LASSERT(!tx->tx_nfrags);
 135
 136        return tx;
 137}
 138
 139static void
 140kiblnd_drop_rx(struct kib_rx *rx)
 141{
 142        struct kib_conn *conn = rx->rx_conn;
 143        struct kib_sched_info *sched = conn->ibc_sched;
 144        unsigned long flags;
 145
 146        spin_lock_irqsave(&sched->ibs_lock, flags);
 147        LASSERT(conn->ibc_nrx > 0);
 148        conn->ibc_nrx--;
 149        spin_unlock_irqrestore(&sched->ibs_lock, flags);
 150
 151        kiblnd_conn_decref(conn);
 152}
 153
 154int
 155kiblnd_post_rx(struct kib_rx *rx, int credit)
 156{
 157        struct kib_conn *conn = rx->rx_conn;
 158        struct kib_net *net = conn->ibc_peer->ibp_ni->ni_data;
 159        struct ib_recv_wr *bad_wrq = NULL;
 160        struct ib_mr *mr = conn->ibc_hdev->ibh_mrs;
 161        int rc;
 162
 163        LASSERT(net);
 164        LASSERT(!in_interrupt());
 165        LASSERT(credit == IBLND_POSTRX_NO_CREDIT ||
 166                credit == IBLND_POSTRX_PEER_CREDIT ||
 167                credit == IBLND_POSTRX_RSRVD_CREDIT);
 168        LASSERT(mr);
 169
 170        rx->rx_sge.lkey   = mr->lkey;
 171        rx->rx_sge.addr   = rx->rx_msgaddr;
 172        rx->rx_sge.length = IBLND_MSG_SIZE;
 173
 174        rx->rx_wrq.next    = NULL;
 175        rx->rx_wrq.sg_list = &rx->rx_sge;
 176        rx->rx_wrq.num_sge = 1;
 177        rx->rx_wrq.wr_id   = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);
 178
 179        LASSERT(conn->ibc_state >= IBLND_CONN_INIT);
 180        LASSERT(rx->rx_nob >= 0);             /* not posted */
 181
 182        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
 183                kiblnd_drop_rx(rx);          /* No more posts for this rx */
 184                return 0;
 185        }
 186
 187        rx->rx_nob = -1;                        /* flag posted */
 188
 189        /* NB: need an extra reference after ib_post_recv because we don't
 190         * own this rx (and rx::rx_conn) anymore, LU-5678.
 191         */
 192        kiblnd_conn_addref(conn);
 193        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
 194        if (unlikely(rc)) {
 195                CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
 196                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
 197                rx->rx_nob = 0;
 198        }
 199
 200        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
 201                goto out;
 202
 203        if (unlikely(rc)) {
 204                kiblnd_close_conn(conn, rc);
 205                kiblnd_drop_rx(rx);          /* No more posts for this rx */
 206                goto out;
 207        }
 208
 209        if (credit == IBLND_POSTRX_NO_CREDIT)
 210                goto out;
 211
 212        spin_lock(&conn->ibc_lock);
 213        if (credit == IBLND_POSTRX_PEER_CREDIT)
 214                conn->ibc_outstanding_credits++;
 215        else
 216                conn->ibc_reserved_credits++;
 217        kiblnd_check_sends_locked(conn);
 218        spin_unlock(&conn->ibc_lock);
 219
 220out:
 221        kiblnd_conn_decref(conn);
 222        return rc;
 223}
 224
 225static struct kib_tx *
 226kiblnd_find_waiting_tx_locked(struct kib_conn *conn, int txtype, __u64 cookie)
 227{
 228        struct list_head *tmp;
 229
 230        list_for_each(tmp, &conn->ibc_active_txs) {
 231                struct kib_tx *tx = list_entry(tmp, struct kib_tx, tx_list);
 232
 233                LASSERT(!tx->tx_queued);
 234                LASSERT(tx->tx_sending || tx->tx_waiting);
 235
 236                if (tx->tx_cookie != cookie)
 237                        continue;
 238
 239                if (tx->tx_waiting &&
 240                    tx->tx_msg->ibm_type == txtype)
 241                        return tx;
 242
 243                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
 244                      tx->tx_waiting ? "" : "NOT ",
 245                      tx->tx_msg->ibm_type, txtype);
 246        }
 247        return NULL;
 248}
 249
 250static void
 251kiblnd_handle_completion(struct kib_conn *conn, int txtype, int status, __u64 cookie)
 252{
 253        struct kib_tx *tx;
 254        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
 255        int idle;
 256
 257        spin_lock(&conn->ibc_lock);
 258
 259        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
 260        if (!tx) {
 261                spin_unlock(&conn->ibc_lock);
 262
 263                CWARN("Unmatched completion type %x cookie %#llx from %s\n",
 264                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 265                kiblnd_close_conn(conn, -EPROTO);
 266                return;
 267        }
 268
 269        if (!tx->tx_status) {          /* success so far */
 270                if (status < 0) /* failed? */
 271                        tx->tx_status = status;
 272                else if (txtype == IBLND_MSG_GET_REQ)
 273                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
 274        }
 275
 276        tx->tx_waiting = 0;
 277
 278        idle = !tx->tx_queued && !tx->tx_sending;
 279        if (idle)
 280                list_del(&tx->tx_list);
 281
 282        spin_unlock(&conn->ibc_lock);
 283
 284        if (idle)
 285                kiblnd_tx_done(ni, tx);
 286}
 287
 288static void
 289kiblnd_send_completion(struct kib_conn *conn, int type, int status, __u64 cookie)
 290{
 291        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
 292        struct kib_tx *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
 293
 294        if (!tx) {
 295                CERROR("Can't get tx for completion %x for %s\n",
 296                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 297                return;
 298        }
 299
 300        tx->tx_msg->ibm_u.completion.ibcm_status = status;
 301        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
 302        kiblnd_init_tx_msg(ni, tx, type, sizeof(struct kib_completion_msg));
 303
 304        kiblnd_queue_tx(tx, conn);
 305}
 306
 307static void
 308kiblnd_handle_rx(struct kib_rx *rx)
 309{
 310        struct kib_msg *msg = rx->rx_msg;
 311        struct kib_conn *conn = rx->rx_conn;
 312        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
 313        int credits = msg->ibm_credits;
 314        struct kib_tx *tx;
 315        int rc = 0;
 316        int rc2;
 317        int post_credit;
 318
 319        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
 320
 321        CDEBUG(D_NET, "Received %x[%d] from %s\n",
 322               msg->ibm_type, credits,
 323               libcfs_nid2str(conn->ibc_peer->ibp_nid));
 324
 325        if (credits) {
 326                /* Have I received credits that will let me send? */
 327                spin_lock(&conn->ibc_lock);
 328
 329                if (conn->ibc_credits + credits >
 330                    conn->ibc_queue_depth) {
 331                        rc2 = conn->ibc_credits;
 332                        spin_unlock(&conn->ibc_lock);
 333
 334                        CERROR("Bad credits from %s: %d + %d > %d\n",
 335                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
 336                               rc2, credits, conn->ibc_queue_depth);
 337
 338                        kiblnd_close_conn(conn, -EPROTO);
 339                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
 340                        return;
 341                }
 342
 343                conn->ibc_credits += credits;
 344
 345                /* This ensures the credit taken by NOOP can be returned */
 346                if (msg->ibm_type == IBLND_MSG_NOOP &&
 347                    !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
 348                        conn->ibc_outstanding_credits++;
 349
 350                kiblnd_check_sends_locked(conn);
 351                spin_unlock(&conn->ibc_lock);
 352        }
 353
 354        switch (msg->ibm_type) {
 355        default:
 356                CERROR("Bad IBLND message type %x from %s\n",
 357                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 358                post_credit = IBLND_POSTRX_NO_CREDIT;
 359                rc = -EPROTO;
 360                break;
 361
 362        case IBLND_MSG_NOOP:
 363                if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
 364                        post_credit = IBLND_POSTRX_NO_CREDIT;
 365                        break;
 366                }
 367
 368                if (credits) /* credit already posted */
 369                        post_credit = IBLND_POSTRX_NO_CREDIT;
 370                else          /* a keepalive NOOP */
 371                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 372                break;
 373
 374        case IBLND_MSG_IMMEDIATE:
 375                post_credit = IBLND_POSTRX_DONT_POST;
 376                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
 377                                msg->ibm_srcnid, rx, 0);
 378                if (rc < 0)                  /* repost on error */
 379                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 380                break;
 381
 382        case IBLND_MSG_PUT_REQ:
 383                post_credit = IBLND_POSTRX_DONT_POST;
 384                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
 385                                msg->ibm_srcnid, rx, 1);
 386                if (rc < 0)                  /* repost on error */
 387                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 388                break;
 389
 390        case IBLND_MSG_PUT_NAK:
 391                CWARN("PUT_NACK from %s\n",
 392                      libcfs_nid2str(conn->ibc_peer->ibp_nid));
 393                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
 394                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
 395                                         msg->ibm_u.completion.ibcm_status,
 396                                         msg->ibm_u.completion.ibcm_cookie);
 397                break;
 398
 399        case IBLND_MSG_PUT_ACK:
 400                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
 401
 402                spin_lock(&conn->ibc_lock);
 403                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
 404                                                   msg->ibm_u.putack.ibpam_src_cookie);
 405                if (tx)
 406                        list_del(&tx->tx_list);
 407                spin_unlock(&conn->ibc_lock);
 408
 409                if (!tx) {
 410                        CERROR("Unmatched PUT_ACK from %s\n",
 411                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
 412                        rc = -EPROTO;
 413                        break;
 414                }
 415
 416                LASSERT(tx->tx_waiting);
 417                /*
 418                 * CAVEAT EMPTOR: I could be racing with tx_complete, but...
 419                 * (a) I can overwrite tx_msg since my peer has received it!
 420                 * (b) tx_waiting set tells tx_complete() it's not done.
 421                 */
 422                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */
 423
 424                rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
 425                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
 426                                       &msg->ibm_u.putack.ibpam_rd,
 427                                       msg->ibm_u.putack.ibpam_dst_cookie);
 428                if (rc2 < 0)
 429                        CERROR("Can't setup rdma for PUT to %s: %d\n",
 430                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);
 431
 432                spin_lock(&conn->ibc_lock);
 433                tx->tx_waiting = 0;     /* clear waiting and queue atomically */
 434                kiblnd_queue_tx_locked(tx, conn);
 435                spin_unlock(&conn->ibc_lock);
 436                break;
 437
 438        case IBLND_MSG_PUT_DONE:
 439                post_credit = IBLND_POSTRX_PEER_CREDIT;
 440                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
 441                                         msg->ibm_u.completion.ibcm_status,
 442                                         msg->ibm_u.completion.ibcm_cookie);
 443                break;
 444
 445        case IBLND_MSG_GET_REQ:
 446                post_credit = IBLND_POSTRX_DONT_POST;
 447                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
 448                                msg->ibm_srcnid, rx, 1);
 449                if (rc < 0)                  /* repost on error */
 450                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 451                break;
 452
 453        case IBLND_MSG_GET_DONE:
 454                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
 455                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
 456                                         msg->ibm_u.completion.ibcm_status,
 457                                         msg->ibm_u.completion.ibcm_cookie);
 458                break;
 459        }
 460
 461        if (rc < 0)                          /* protocol error */
 462                kiblnd_close_conn(conn, rc);
 463
 464        if (post_credit != IBLND_POSTRX_DONT_POST)
 465                kiblnd_post_rx(rx, post_credit);
 466}
 467
 468static void
 469kiblnd_rx_complete(struct kib_rx *rx, int status, int nob)
 470{
 471        struct kib_msg *msg = rx->rx_msg;
 472        struct kib_conn *conn = rx->rx_conn;
 473        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
 474        struct kib_net *net = ni->ni_data;
 475        int rc;
 476        int err = -EIO;
 477
 478        LASSERT(net);
 479        LASSERT(rx->rx_nob < 0);               /* was posted */
 480        rx->rx_nob = 0;                  /* isn't now */
 481
 482        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
 483                goto ignore;
 484
 485        if (status != IB_WC_SUCCESS) {
 486                CNETERR("Rx from %s failed: %d\n",
 487                        libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
 488                goto failed;
 489        }
 490
 491        LASSERT(nob >= 0);
 492        rx->rx_nob = nob;
 493
 494        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
 495        if (rc) {
 496                CERROR("Error %d unpacking rx from %s\n",
 497                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 498                goto failed;
 499        }
 500
 501        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
 502            msg->ibm_dstnid != ni->ni_nid ||
 503            msg->ibm_srcstamp != conn->ibc_incarnation ||
 504            msg->ibm_dststamp != net->ibn_incarnation) {
 505                CERROR("Stale rx from %s\n",
 506                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
 507                err = -ESTALE;
 508                goto failed;
 509        }
 510
 511        /* set time last known alive */
 512        kiblnd_peer_alive(conn->ibc_peer);
 513
 514        /* racing with connection establishment/teardown! */
 515
 516        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
 517                rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
 518                unsigned long flags;
 519
 520                write_lock_irqsave(g_lock, flags);
 521                /* must check holding global lock to eliminate race */
 522                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
 523                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
 524                        write_unlock_irqrestore(g_lock, flags);
 525                        return;
 526                }
 527                write_unlock_irqrestore(g_lock, flags);
 528        }
 529        kiblnd_handle_rx(rx);
 530        return;
 531
 532 failed:
 533        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
 534        kiblnd_close_conn(conn, err);
 535 ignore:
 536        kiblnd_drop_rx(rx);                  /* Don't re-post rx. */
 537}
 538
 /*
  * Translate kernel virtual address @vaddr into its struct page.
  *
  * vmalloc addresses are resolved with vmalloc_to_page(), everything else
  * with virt_to_page().  With CONFIG_HIGHMEM, an address inside the pkmap
  * window is a bug.  The result is asserted to be non-NULL.
  */
 static struct page *
 kiblnd_kvaddr_to_page(unsigned long vaddr)
 {
         struct page *page;

         if (!is_vmalloc_addr((void *)vaddr)) {
 #ifdef CONFIG_HIGHMEM
                 if (vaddr >= PKMAP_BASE &&
                     vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                         /* highmem pages are only ever used for bulk (kiov) I/O */
                         CERROR("find page for address in highmem\n");
                         LBUG();
                 }
 #endif
                 page = virt_to_page(vaddr);
         } else {
                 page = vmalloc_to_page((void *)vaddr);
         }

         LASSERT(page);
         return page;
 }
 561
 562static int
 563kiblnd_fmr_map_tx(struct kib_net *net, struct kib_tx *tx, struct kib_rdma_desc *rd, __u32 nob)
 564{
 565        struct kib_hca_dev *hdev;
 566        struct kib_fmr_poolset *fps;
 567        int cpt;
 568        int rc;
 569
 570        LASSERT(tx->tx_pool);
 571        LASSERT(tx->tx_pool->tpo_pool.po_owner);
 572
 573        hdev = tx->tx_pool->tpo_hdev;
 574        cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;
 575
 576        fps = net->ibn_fmr_ps[cpt];
 577        rc = kiblnd_fmr_pool_map(fps, tx, rd, nob, 0, &tx->fmr);
 578        if (rc) {
 579                CERROR("Can't map %u bytes: %d\n", nob, rc);
 580                return rc;
 581        }
 582
 583        /*
 584         * If rd is not tx_rd, it's going to get sent to a peer, who will need
 585         * the rkey
 586         */
 587        rd->rd_key = tx->fmr.fmr_key;
 588        rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
 589        rd->rd_frags[0].rf_nob = nob;
 590        rd->rd_nfrags = 1;
 591
 592        return 0;
 593}
 594
 595static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx)
 596{
 597        struct kib_net *net = ni->ni_data;
 598
 599        LASSERT(net);
 600
 601        if (net->ibn_fmr_ps)
 602                kiblnd_fmr_pool_unmap(&tx->fmr, tx->tx_status);
 603
 604        if (tx->tx_nfrags) {
 605                kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
 606                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
 607                tx->tx_nfrags = 0;
 608        }
 609}
 610
 611static int kiblnd_map_tx(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
 612                         int nfrags)
 613{
 614        struct kib_net *net = ni->ni_data;
 615        struct kib_hca_dev *hdev = net->ibn_dev->ibd_hdev;
 616        struct ib_mr *mr    = NULL;
 617        __u32 nob;
 618        int i;
 619
 620        /*
 621         * If rd is not tx_rd, it's going to get sent to a peer and I'm the
 622         * RDMA sink
 623         */
 624        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
 625        tx->tx_nfrags = nfrags;
 626
 627        rd->rd_nfrags = kiblnd_dma_map_sg(hdev->ibh_ibdev, tx->tx_frags,
 628                                          tx->tx_nfrags, tx->tx_dmadir);
 629
 630        for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
 631                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
 632                        hdev->ibh_ibdev, &tx->tx_frags[i]);
 633                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
 634                        hdev->ibh_ibdev, &tx->tx_frags[i]);
 635                nob += rd->rd_frags[i].rf_nob;
 636        }
 637
 638        mr = kiblnd_find_rd_dma_mr(ni, rd, tx->tx_conn ?
 639                                   tx->tx_conn->ibc_max_frags : -1);
 640        if (mr) {
 641                /* found pre-mapping MR */
 642                rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
 643                return 0;
 644        }
 645
 646        if (net->ibn_fmr_ps)
 647                return kiblnd_fmr_map_tx(net, tx, rd, nob);
 648
 649        return -EINVAL;
 650}
 651
 652static int
 653kiblnd_setup_rd_iov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
 654                    unsigned int niov, const struct kvec *iov, int offset, int nob)
 655{
 656        struct kib_net *net = ni->ni_data;
 657        struct page *page;
 658        struct scatterlist *sg;
 659        unsigned long vaddr;
 660        int fragnob;
 661        int page_offset;
 662
 663        LASSERT(nob > 0);
 664        LASSERT(niov > 0);
 665        LASSERT(net);
 666
 667        while (offset >= iov->iov_len) {
 668                offset -= iov->iov_len;
 669                niov--;
 670                iov++;
 671                LASSERT(niov > 0);
 672        }
 673
 674        sg = tx->tx_frags;
 675        do {
 676                LASSERT(niov > 0);
 677
 678                vaddr = ((unsigned long)iov->iov_base) + offset;
 679                page_offset = vaddr & (PAGE_SIZE - 1);
 680                page = kiblnd_kvaddr_to_page(vaddr);
 681                if (!page) {
 682                        CERROR("Can't find page\n");
 683                        return -EFAULT;
 684                }
 685
 686                fragnob = min((int)(iov->iov_len - offset), nob);
 687                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);
 688
 689                sg_set_page(sg, page, fragnob, page_offset);
 690                sg = sg_next(sg);
 691                if (!sg) {
 692                        CERROR("lacking enough sg entries to map tx\n");
 693                        return -EFAULT;
 694                }
 695
 696                if (offset + fragnob < iov->iov_len) {
 697                        offset += fragnob;
 698                } else {
 699                        offset = 0;
 700                        iov++;
 701                        niov--;
 702                }
 703                nob -= fragnob;
 704        } while (nob > 0);
 705
 706        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
 707}
 708
 709static int
 710kiblnd_setup_rd_kiov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
 711                     int nkiov, const lnet_kiov_t *kiov, int offset, int nob)
 712{
 713        struct kib_net *net = ni->ni_data;
 714        struct scatterlist *sg;
 715        int fragnob;
 716
 717        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
 718
 719        LASSERT(nob > 0);
 720        LASSERT(nkiov > 0);
 721        LASSERT(net);
 722
 723        while (offset >= kiov->bv_len) {
 724                offset -= kiov->bv_len;
 725                nkiov--;
 726                kiov++;
 727                LASSERT(nkiov > 0);
 728        }
 729
 730        sg = tx->tx_frags;
 731        do {
 732                LASSERT(nkiov > 0);
 733
 734                fragnob = min((int)(kiov->bv_len - offset), nob);
 735
 736                sg_set_page(sg, kiov->bv_page, fragnob,
 737                            kiov->bv_offset + offset);
 738                sg = sg_next(sg);
 739                if (!sg) {
 740                        CERROR("lacking enough sg entries to map tx\n");
 741                        return -EFAULT;
 742                }
 743
 744                offset = 0;
 745                kiov++;
 746                nkiov--;
 747                nob -= fragnob;
 748        } while (nob > 0);
 749
 750        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
 751}
 752
 753static int
 754kiblnd_post_tx_locked(struct kib_conn *conn, struct kib_tx *tx, int credit)
 755        __must_hold(&conn->ibc_lock)
 756{
 757        struct kib_msg *msg = tx->tx_msg;
 758        struct kib_peer *peer = conn->ibc_peer;
 759        struct lnet_ni *ni = peer->ibp_ni;
 760        int ver = conn->ibc_version;
 761        int rc;
 762        int done;
 763
 764        LASSERT(tx->tx_queued);
 765        /* We rely on this for QP sizing */
 766        LASSERT(tx->tx_nwrq > 0);
 767
 768        LASSERT(!credit || credit == 1);
 769        LASSERT(conn->ibc_outstanding_credits >= 0);
 770        LASSERT(conn->ibc_outstanding_credits <= conn->ibc_queue_depth);
 771        LASSERT(conn->ibc_credits >= 0);
 772        LASSERT(conn->ibc_credits <= conn->ibc_queue_depth);
 773
 774        if (conn->ibc_nsends_posted == kiblnd_concurrent_sends(ver, ni)) {
 775                /* tx completions outstanding... */
 776                CDEBUG(D_NET, "%s: posted enough\n",
 777                       libcfs_nid2str(peer->ibp_nid));
 778                return -EAGAIN;
 779        }
 780
 781        if (credit && !conn->ibc_credits) {   /* no credits */
 782                CDEBUG(D_NET, "%s: no credits\n",
 783                       libcfs_nid2str(peer->ibp_nid));
 784                return -EAGAIN;
 785        }
 786
 787        if (credit && !IBLND_OOB_CAPABLE(ver) &&
 788            conn->ibc_credits == 1 &&   /* last credit reserved */
 789            msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
 790                CDEBUG(D_NET, "%s: not using last credit\n",
 791                       libcfs_nid2str(peer->ibp_nid));
 792                return -EAGAIN;
 793        }
 794
 795        /* NB don't drop ibc_lock before bumping tx_sending */
 796        list_del(&tx->tx_list);
 797        tx->tx_queued = 0;
 798
 799        if (msg->ibm_type == IBLND_MSG_NOOP &&
 800            (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
 801             (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
 802              conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
 803                /*
 804                 * OK to drop when posted enough NOOPs, since
 805                 * kiblnd_check_sends_locked will queue NOOP again when
 806                 * posted NOOPs complete
 807                 */
 808                spin_unlock(&conn->ibc_lock);
 809                kiblnd_tx_done(peer->ibp_ni, tx);
 810                spin_lock(&conn->ibc_lock);
 811                CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
 812                       libcfs_nid2str(peer->ibp_nid),
 813                       conn->ibc_noops_posted);
 814                return 0;
 815        }
 816
 817        kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
 818                        peer->ibp_nid, conn->ibc_incarnation);
 819
 820        conn->ibc_credits -= credit;
 821        conn->ibc_outstanding_credits = 0;
 822        conn->ibc_nsends_posted++;
 823        if (msg->ibm_type == IBLND_MSG_NOOP)
 824                conn->ibc_noops_posted++;
 825
 826        /*
 827         * CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
 828         * PUT.  If so, it was first queued here as a PUT_REQ, sent and
 829         * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
 830         * and then re-queued here.  It's (just) possible that
 831         * tx_sending is non-zero if we've not done the tx_complete()
 832         * from the first send; hence the ++ rather than = below.
 833         */
 834        tx->tx_sending++;
 835        list_add(&tx->tx_list, &conn->ibc_active_txs);
 836
 837        /* I'm still holding ibc_lock! */
 838        if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
 839                rc = -ECONNABORTED;
 840        } else if (tx->tx_pool->tpo_pool.po_failed ||
 841                 conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
 842                /* close_conn will launch failover */
 843                rc = -ENETDOWN;
 844        } else {
 845                struct kib_fast_reg_descriptor *frd = tx->fmr.fmr_frd;
 846                struct ib_send_wr *bad = &tx->tx_wrq[tx->tx_nwrq - 1].wr;
 847                struct ib_send_wr *wrq = &tx->tx_wrq[0].wr;
 848
 849                if (frd) {
 850                        if (!frd->frd_valid) {
 851                                wrq = &frd->frd_inv_wr;
 852                                wrq->next = &frd->frd_fastreg_wr.wr;
 853                        } else {
 854                                wrq = &frd->frd_fastreg_wr.wr;
 855                        }
 856                        frd->frd_fastreg_wr.wr.next = &tx->tx_wrq[0].wr;
 857                }
 858
 859                LASSERTF(bad->wr_id == kiblnd_ptr2wreqid(tx, IBLND_WID_TX),
 860                         "bad wr_id %llx, opc %d, flags %d, peer: %s\n",
 861                         bad->wr_id, bad->opcode, bad->send_flags,
 862                         libcfs_nid2str(conn->ibc_peer->ibp_nid));
 863                bad = NULL;
 864                rc = ib_post_send(conn->ibc_cmid->qp, wrq, &bad);
 865        }
 866
 867        conn->ibc_last_send = jiffies;
 868
 869        if (!rc)
 870                return 0;
 871
 872        /*
 873         * NB credits are transferred in the actual
 874         * message, which can only be the last work item
 875         */
 876        conn->ibc_credits += credit;
 877        conn->ibc_outstanding_credits += msg->ibm_credits;
 878        conn->ibc_nsends_posted--;
 879        if (msg->ibm_type == IBLND_MSG_NOOP)
 880                conn->ibc_noops_posted--;
 881
 882        tx->tx_status = rc;
 883        tx->tx_waiting = 0;
 884        tx->tx_sending--;
 885
 886        done = !tx->tx_sending;
 887        if (done)
 888                list_del(&tx->tx_list);
 889
 890        spin_unlock(&conn->ibc_lock);
 891
 892        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
 893                CERROR("Error %d posting transmit to %s\n",
 894                       rc, libcfs_nid2str(peer->ibp_nid));
 895        else
 896                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
 897                       rc, libcfs_nid2str(peer->ibp_nid));
 898
 899        kiblnd_close_conn(conn, rc);
 900
 901        if (done)
 902                kiblnd_tx_done(peer->ibp_ni, tx);
 903
 904        spin_lock(&conn->ibc_lock);
 905
 906        return -EIO;
 907}
 908
 909static void
 910kiblnd_check_sends_locked(struct kib_conn *conn)
 911{
 912        int ver = conn->ibc_version;
 913        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
 914        struct kib_tx *tx;
 915
 916        /* Don't send anything until after the connection is established */
 917        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
 918                CDEBUG(D_NET, "%s too soon\n",
 919                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
 920                return;
 921        }
 922
 923        LASSERT(conn->ibc_nsends_posted <= kiblnd_concurrent_sends(ver, ni));
 924        LASSERT(!IBLND_OOB_CAPABLE(ver) ||
 925                conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
 926        LASSERT(conn->ibc_reserved_credits >= 0);
 927
 928        while (conn->ibc_reserved_credits > 0 &&
 929               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
 930                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
 931                                struct kib_tx, tx_list);
 932                list_del(&tx->tx_list);
 933                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
 934                conn->ibc_reserved_credits--;
 935        }
 936
 937        if (kiblnd_need_noop(conn)) {
 938                spin_unlock(&conn->ibc_lock);
 939
 940                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
 941                if (tx)
 942                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);
 943
 944                spin_lock(&conn->ibc_lock);
 945                if (tx)
 946                        kiblnd_queue_tx_locked(tx, conn);
 947        }
 948
 949        for (;;) {
 950                int credit;
 951
 952                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
 953                        credit = 0;
 954                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
 955                                        struct kib_tx, tx_list);
 956                } else if (!list_empty(&conn->ibc_tx_noops)) {
 957                        LASSERT(!IBLND_OOB_CAPABLE(ver));
 958                        credit = 1;
 959                        tx = list_entry(conn->ibc_tx_noops.next,
 960                                        struct kib_tx, tx_list);
 961                } else if (!list_empty(&conn->ibc_tx_queue)) {
 962                        credit = 1;
 963                        tx = list_entry(conn->ibc_tx_queue.next,
 964                                        struct kib_tx, tx_list);
 965                } else {
 966                        break;
 967                }
 968
 969                if (kiblnd_post_tx_locked(conn, tx, credit))
 970                        break;
 971        }
 972}
 973
 974static void
 975kiblnd_tx_complete(struct kib_tx *tx, int status)
 976{
 977        int failed = (status != IB_WC_SUCCESS);
 978        struct kib_conn *conn = tx->tx_conn;
 979        int idle;
 980
 981        LASSERT(tx->tx_sending > 0);
 982
 983        if (failed) {
 984                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
 985                        CNETERR("Tx -> %s cookie %#llx sending %d waiting %d: failed %d\n",
 986                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
 987                                tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
 988                                status);
 989
 990                kiblnd_close_conn(conn, -EIO);
 991        } else {
 992                kiblnd_peer_alive(conn->ibc_peer);
 993        }
 994
 995        spin_lock(&conn->ibc_lock);
 996
 997        /*
 998         * I could be racing with rdma completion.  Whoever makes 'tx' idle
 999         * gets to free it, which also drops its ref on 'conn'.
1000         */
1001        tx->tx_sending--;
1002        conn->ibc_nsends_posted--;
1003        if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
1004                conn->ibc_noops_posted--;
1005
1006        if (failed) {
1007                tx->tx_waiting = 0;          /* don't wait for peer */
1008                tx->tx_status = -EIO;
1009        }
1010
1011        idle = !tx->tx_sending &&        /* This is the final callback */
1012               !tx->tx_waiting &&              /* Not waiting for peer */
1013               !tx->tx_queued;            /* Not re-queued (PUT_DONE) */
1014        if (idle)
1015                list_del(&tx->tx_list);
1016
1017        kiblnd_check_sends_locked(conn);
1018        spin_unlock(&conn->ibc_lock);
1019
1020        if (idle)
1021                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
1022}
1023
1024static void
1025kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx, int type, int body_nob)
1026{
1027        struct kib_hca_dev *hdev = tx->tx_pool->tpo_hdev;
1028        struct ib_sge *sge = &tx->tx_sge[tx->tx_nwrq];
1029        struct ib_rdma_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
1030        int nob = offsetof(struct kib_msg, ibm_u) + body_nob;
1031        struct ib_mr *mr = hdev->ibh_mrs;
1032
1033        LASSERT(tx->tx_nwrq >= 0);
1034        LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
1035        LASSERT(nob <= IBLND_MSG_SIZE);
1036        LASSERT(mr);
1037
1038        kiblnd_init_msg(tx->tx_msg, type, body_nob);
1039
1040        sge->lkey   = mr->lkey;
1041        sge->addr   = tx->tx_msgaddr;
1042        sge->length = nob;
1043
1044        memset(wrq, 0, sizeof(*wrq));
1045
1046        wrq->wr.next       = NULL;
1047        wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
1048        wrq->wr.sg_list    = sge;
1049        wrq->wr.num_sge    = 1;
1050        wrq->wr.opcode     = IB_WR_SEND;
1051        wrq->wr.send_flags = IB_SEND_SIGNALED;
1052
1053        tx->tx_nwrq++;
1054}
1055
1056static int
1057kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
1058                 int resid, struct kib_rdma_desc *dstrd, __u64 dstcookie)
1059{
1060        struct kib_msg *ibmsg = tx->tx_msg;
1061        struct kib_rdma_desc *srcrd = tx->tx_rd;
1062        struct ib_sge *sge = &tx->tx_sge[0];
1063        struct ib_rdma_wr *wrq, *next;
1064        int rc  = resid;
1065        int srcidx = 0;
1066        int dstidx = 0;
1067        int wrknob;
1068
1069        LASSERT(!in_interrupt());
1070        LASSERT(!tx->tx_nwrq);
1071        LASSERT(type == IBLND_MSG_GET_DONE ||
1072                type == IBLND_MSG_PUT_DONE);
1073
1074        if (kiblnd_rd_size(srcrd) > conn->ibc_max_frags << PAGE_SHIFT) {
1075                CERROR("RDMA is too large for peer %s (%d), src size: %d dst size: %d\n",
1076                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
1077                       conn->ibc_max_frags << PAGE_SHIFT,
1078                       kiblnd_rd_size(srcrd), kiblnd_rd_size(dstrd));
1079                rc = -EMSGSIZE;
1080                goto too_big;
1081        }
1082
1083        while (resid > 0) {
1084                if (srcidx >= srcrd->rd_nfrags) {
1085                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
1086                        rc = -EPROTO;
1087                        break;
1088                }
1089
1090                if (dstidx == dstrd->rd_nfrags) {
1091                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
1092                        rc = -EPROTO;
1093                        break;
1094                }
1095
1096                if (tx->tx_nwrq >= IBLND_MAX_RDMA_FRAGS) {
1097                        CERROR("RDMA has too many fragments for peer %s (%d), src idx/frags: %d/%d dst idx/frags: %d/%d\n",
1098                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
1099                               IBLND_MAX_RDMA_FRAGS,
1100                               srcidx, srcrd->rd_nfrags,
1101                               dstidx, dstrd->rd_nfrags);
1102                        rc = -EMSGSIZE;
1103                        break;
1104                }
1105
1106                wrknob = min(min(kiblnd_rd_frag_size(srcrd, srcidx),
1107                                 kiblnd_rd_frag_size(dstrd, dstidx)),
1108                             (__u32)resid);
1109
1110                sge = &tx->tx_sge[tx->tx_nwrq];
1111                sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
1112                sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
1113                sge->length = wrknob;
1114
1115                wrq = &tx->tx_wrq[tx->tx_nwrq];
1116                next = wrq + 1;
1117
1118                wrq->wr.next       = &next->wr;
1119                wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
1120                wrq->wr.sg_list    = sge;
1121                wrq->wr.num_sge    = 1;
1122                wrq->wr.opcode     = IB_WR_RDMA_WRITE;
1123                wrq->wr.send_flags = 0;
1124
1125                wrq->remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
1126                wrq->rkey        = kiblnd_rd_frag_key(dstrd, dstidx);
1127
1128                srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
1129                dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);
1130
1131                resid -= wrknob;
1132
1133                tx->tx_nwrq++;
1134                wrq++;
1135                sge++;
1136        }
1137too_big:
1138        if (rc < 0)                          /* no RDMA if completing with failure */
1139                tx->tx_nwrq = 0;
1140
1141        ibmsg->ibm_u.completion.ibcm_status = rc;
1142        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
1143        kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
1144                           type, sizeof(struct kib_completion_msg));
1145
1146        return rc;
1147}
1148
1149static void
1150kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn)
1151{
1152        struct list_head *q;
1153
1154        LASSERT(tx->tx_nwrq > 0);             /* work items set up */
1155        LASSERT(!tx->tx_queued);               /* not queued for sending already */
1156        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1157
1158        tx->tx_queued = 1;
1159        tx->tx_deadline = jiffies +
1160                          msecs_to_jiffies(*kiblnd_tunables.kib_timeout *
1161                                           MSEC_PER_SEC);
1162
1163        if (!tx->tx_conn) {
1164                kiblnd_conn_addref(conn);
1165                tx->tx_conn = conn;
1166                LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
1167        } else {
1168                /* PUT_DONE first attached to conn as a PUT_REQ */
1169                LASSERT(tx->tx_conn == conn);
1170                LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
1171        }
1172
1173        switch (tx->tx_msg->ibm_type) {
1174        default:
1175                LBUG();
1176
1177        case IBLND_MSG_PUT_REQ:
1178        case IBLND_MSG_GET_REQ:
1179                q = &conn->ibc_tx_queue_rsrvd;
1180                break;
1181
1182        case IBLND_MSG_PUT_NAK:
1183        case IBLND_MSG_PUT_ACK:
1184        case IBLND_MSG_PUT_DONE:
1185        case IBLND_MSG_GET_DONE:
1186                q = &conn->ibc_tx_queue_nocred;
1187                break;
1188
1189        case IBLND_MSG_NOOP:
1190                if (IBLND_OOB_CAPABLE(conn->ibc_version))
1191                        q = &conn->ibc_tx_queue_nocred;
1192                else
1193                        q = &conn->ibc_tx_noops;
1194                break;
1195
1196        case IBLND_MSG_IMMEDIATE:
1197                q = &conn->ibc_tx_queue;
1198                break;
1199        }
1200
1201        list_add_tail(&tx->tx_list, q);
1202}
1203
1204static void
1205kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn)
1206{
1207        spin_lock(&conn->ibc_lock);
1208        kiblnd_queue_tx_locked(tx, conn);
1209        kiblnd_check_sends_locked(conn);
1210        spin_unlock(&conn->ibc_lock);
1211}
1212
1213static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
1214                               struct sockaddr_in *srcaddr,
1215                               struct sockaddr_in *dstaddr,
1216                               int timeout_ms)
1217{
1218        unsigned short port;
1219        int rc;
1220
1221        /* allow the port to be reused */
1222        rc = rdma_set_reuseaddr(cmid, 1);
1223        if (rc) {
1224                CERROR("Unable to set reuse on cmid: %d\n", rc);
1225                return rc;
1226        }
1227
1228        /* look for a free privileged port */
1229        for (port = PROT_SOCK - 1; port > 0; port--) {
1230                srcaddr->sin_port = htons(port);
1231                rc = rdma_resolve_addr(cmid,
1232                                       (struct sockaddr *)srcaddr,
1233                                       (struct sockaddr *)dstaddr,
1234                                       timeout_ms);
1235                if (!rc) {
1236                        CDEBUG(D_NET, "bound to port %hu\n", port);
1237                        return 0;
1238                } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
1239                        CDEBUG(D_NET, "bind to port %hu failed: %d\n",
1240                               port, rc);
1241                } else {
1242                        return rc;
1243                }
1244        }
1245
1246        CERROR("Failed to bind to a free privileged port\n");
1247        return rc;
1248}
1249
1250static void
1251kiblnd_connect_peer(struct kib_peer *peer)
1252{
1253        struct rdma_cm_id *cmid;
1254        struct kib_dev *dev;
1255        struct kib_net *net = peer->ibp_ni->ni_data;
1256        struct sockaddr_in srcaddr;
1257        struct sockaddr_in dstaddr;
1258        int rc;
1259
1260        LASSERT(net);
1261        LASSERT(peer->ibp_connecting > 0);
1262        LASSERT(!peer->ibp_reconnecting);
1263
1264        cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
1265                                     IB_QPT_RC);
1266
1267        if (IS_ERR(cmid)) {
1268                CERROR("Can't create CMID for %s: %ld\n",
1269                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
1270                rc = PTR_ERR(cmid);
1271                goto failed;
1272        }
1273
1274        dev = net->ibn_dev;
1275        memset(&srcaddr, 0, sizeof(srcaddr));
1276        srcaddr.sin_family = AF_INET;
1277        srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);
1278
1279        memset(&dstaddr, 0, sizeof(dstaddr));
1280        dstaddr.sin_family = AF_INET;
1281        dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
1282        dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));
1283
1284        kiblnd_peer_addref(peer);              /* cmid's ref */
1285
1286        if (*kiblnd_tunables.kib_use_priv_port) {
1287                rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
1288                                         *kiblnd_tunables.kib_timeout * 1000);
1289        } else {
1290                rc = rdma_resolve_addr(cmid,
1291                                       (struct sockaddr *)&srcaddr,
1292                                       (struct sockaddr *)&dstaddr,
1293                                       *kiblnd_tunables.kib_timeout * 1000);
1294        }
1295        if (rc) {
1296                /* Can't initiate address resolution:  */
1297                CERROR("Can't resolve addr for %s: %d\n",
1298                       libcfs_nid2str(peer->ibp_nid), rc);
1299                goto failed2;
1300        }
1301
1302        LASSERT(cmid->device);
1303        CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
1304               libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
1305               &dev->ibd_ifip, cmid->device->name);
1306
1307        return;
1308
1309 failed2:
1310        kiblnd_peer_connect_failed(peer, 1, rc);
1311        kiblnd_peer_decref(peer);              /* cmid's ref */
1312        rdma_destroy_id(cmid);
1313        return;
1314 failed:
1315        kiblnd_peer_connect_failed(peer, 1, rc);
1316}
1317
1318bool
1319kiblnd_reconnect_peer(struct kib_peer *peer)
1320{
1321        rwlock_t *glock = &kiblnd_data.kib_global_lock;
1322        char *reason = NULL;
1323        struct list_head txs;
1324        unsigned long flags;
1325
1326        INIT_LIST_HEAD(&txs);
1327
1328        write_lock_irqsave(glock, flags);
1329        if (!peer->ibp_reconnecting) {
1330                if (peer->ibp_accepting)
1331                        reason = "accepting";
1332                else if (peer->ibp_connecting)
1333                        reason = "connecting";
1334                else if (!list_empty(&peer->ibp_conns))
1335                        reason = "connected";
1336                else /* connected then closed */
1337                        reason = "closed";
1338
1339                goto no_reconnect;
1340        }
1341
1342        LASSERT(!peer->ibp_accepting && !peer->ibp_connecting &&
1343                list_empty(&peer->ibp_conns));
1344        peer->ibp_reconnecting = 0;
1345
1346        if (!kiblnd_peer_active(peer)) {
1347                list_splice_init(&peer->ibp_tx_queue, &txs);
1348                reason = "unlinked";
1349                goto no_reconnect;
1350        }
1351
1352        peer->ibp_connecting++;
1353        peer->ibp_reconnected++;
1354        write_unlock_irqrestore(glock, flags);
1355
1356        kiblnd_connect_peer(peer);
1357        return true;
1358
1359no_reconnect:
1360        write_unlock_irqrestore(glock, flags);
1361
1362        CWARN("Abort reconnection of %s: %s\n",
1363              libcfs_nid2str(peer->ibp_nid), reason);
1364        kiblnd_txlist_done(peer->ibp_ni, &txs, -ECONNABORTED);
1365        return false;
1366}
1367
1368void
1369kiblnd_launch_tx(lnet_ni_t *ni, struct kib_tx *tx, lnet_nid_t nid)
1370{
1371        struct kib_peer *peer;
1372        struct kib_peer *peer2;
1373        struct kib_conn *conn;
1374        rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
1375        unsigned long flags;
1376        int rc;
1377
1378        /*
1379         * If I get here, I've committed to send, so I complete the tx with
1380         * failure on any problems
1381         */
1382        LASSERT(!tx || !tx->tx_conn); /* only set when assigned a conn */
1383        LASSERT(!tx || tx->tx_nwrq > 0);     /* work items have been set up */
1384
1385        /*
1386         * First time, just use a read lock since I expect to find my peer
1387         * connected
1388         */
1389        read_lock_irqsave(g_lock, flags);
1390
1391        peer = kiblnd_find_peer_locked(nid);
1392        if (peer && !list_empty(&peer->ibp_conns)) {
1393                /* Found a peer with an established connection */
1394                conn = kiblnd_get_conn_locked(peer);
1395                kiblnd_conn_addref(conn); /* 1 ref for me... */
1396
1397                read_unlock_irqrestore(g_lock, flags);
1398
1399                if (tx)
1400                        kiblnd_queue_tx(tx, conn);
1401                kiblnd_conn_decref(conn); /* ...to here */
1402                return;
1403        }
1404
1405        read_unlock(g_lock);
1406        /* Re-try with a write lock */
1407        write_lock(g_lock);
1408
1409        peer = kiblnd_find_peer_locked(nid);
1410        if (peer) {
1411                if (list_empty(&peer->ibp_conns)) {
1412                        /* found a peer, but it's still connecting... */
1413                        LASSERT(kiblnd_peer_connecting(peer));
1414                        if (tx)
1415                                list_add_tail(&tx->tx_list,
1416                                              &peer->ibp_tx_queue);
1417                        write_unlock_irqrestore(g_lock, flags);
1418                } else {
1419                        conn = kiblnd_get_conn_locked(peer);
1420                        kiblnd_conn_addref(conn); /* 1 ref for me... */
1421
1422                        write_unlock_irqrestore(g_lock, flags);
1423
1424                        if (tx)
1425                                kiblnd_queue_tx(tx, conn);
1426                        kiblnd_conn_decref(conn); /* ...to here */
1427                }
1428                return;
1429        }
1430
1431        write_unlock_irqrestore(g_lock, flags);
1432
1433        /* Allocate a peer ready to add to the peer table and retry */
1434        rc = kiblnd_create_peer(ni, &peer, nid);
1435        if (rc) {
1436                CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1437                if (tx) {
1438                        tx->tx_status = -EHOSTUNREACH;
1439                        tx->tx_waiting = 0;
1440                        kiblnd_tx_done(ni, tx);
1441                }
1442                return;
1443        }
1444
1445        write_lock_irqsave(g_lock, flags);
1446
1447        peer2 = kiblnd_find_peer_locked(nid);
1448        if (peer2) {
1449                if (list_empty(&peer2->ibp_conns)) {
1450                        /* found a peer, but it's still connecting... */
1451                        LASSERT(kiblnd_peer_connecting(peer2));
1452                        if (tx)
1453                                list_add_tail(&tx->tx_list,
1454                                              &peer2->ibp_tx_queue);
1455                        write_unlock_irqrestore(g_lock, flags);
1456                } else {
1457                        conn = kiblnd_get_conn_locked(peer2);
1458                        kiblnd_conn_addref(conn); /* 1 ref for me... */
1459
1460                        write_unlock_irqrestore(g_lock, flags);
1461
1462                        if (tx)
1463                                kiblnd_queue_tx(tx, conn);
1464                        kiblnd_conn_decref(conn); /* ...to here */
1465                }
1466
1467                kiblnd_peer_decref(peer);
1468                return;
1469        }
1470
1471        /* Brand new peer */
1472        LASSERT(!peer->ibp_connecting);
1473        peer->ibp_connecting = 1;
1474
1475        /* always called with a ref on ni, which prevents ni being shutdown */
1476        LASSERT(!((struct kib_net *)ni->ni_data)->ibn_shutdown);
1477
1478        if (tx)
1479                list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1480
1481        kiblnd_peer_addref(peer);
1482        list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1483
1484        write_unlock_irqrestore(g_lock, flags);
1485
1486        kiblnd_connect_peer(peer);
1487        kiblnd_peer_decref(peer);
1488}
1489
1490int
1491kiblnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1492{
1493        lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1494        int type = lntmsg->msg_type;
1495        lnet_process_id_t target = lntmsg->msg_target;
1496        int target_is_router = lntmsg->msg_target_is_router;
1497        int routing = lntmsg->msg_routing;
1498        unsigned int payload_niov = lntmsg->msg_niov;
1499        struct kvec *payload_iov = lntmsg->msg_iov;
1500        lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1501        unsigned int payload_offset = lntmsg->msg_offset;
1502        unsigned int payload_nob = lntmsg->msg_len;
1503        struct iov_iter from;
1504        struct kib_msg *ibmsg;
1505        struct kib_rdma_desc  *rd;
1506        struct kib_tx *tx;
1507        int nob;
1508        int rc;
1509
1510        /* NB 'private' is different depending on what we're sending.... */
1511
1512        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1513               payload_nob, payload_niov, libcfs_id2str(target));
1514
1515        LASSERT(!payload_nob || payload_niov > 0);
1516        LASSERT(payload_niov <= LNET_MAX_IOV);
1517
1518        /* Thread context */
1519        LASSERT(!in_interrupt());
1520        /* payload is either all vaddrs or all pages */
1521        LASSERT(!(payload_kiov && payload_iov));
1522
1523        if (payload_kiov)
1524                iov_iter_bvec(&from, ITER_BVEC | WRITE,
1525                              payload_kiov, payload_niov,
1526                              payload_nob + payload_offset);
1527        else
1528                iov_iter_kvec(&from, ITER_KVEC | WRITE,
1529                              payload_iov, payload_niov,
1530                              payload_nob + payload_offset);
1531
1532        iov_iter_advance(&from, payload_offset);
1533
1534        switch (type) {
1535        default:
1536                LBUG();
1537                return -EIO;
1538
1539        case LNET_MSG_ACK:
1540                LASSERT(!payload_nob);
1541                break;
1542
1543        case LNET_MSG_GET:
1544                if (routing || target_is_router)
1545                        break;            /* send IMMEDIATE */
1546
1547                /* is the REPLY message too small for RDMA? */
1548                nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1549                if (nob <= IBLND_MSG_SIZE)
1550                        break;            /* send IMMEDIATE */
1551
1552                tx = kiblnd_get_idle_tx(ni, target.nid);
1553                if (!tx) {
1554                        CERROR("Can't allocate txd for GET to %s\n",
1555                               libcfs_nid2str(target.nid));
1556                        return -ENOMEM;
1557                }
1558
1559                ibmsg = tx->tx_msg;
1560                rd = &ibmsg->ibm_u.get.ibgm_rd;
1561                if (!(lntmsg->msg_md->md_options & LNET_MD_KIOV))
1562                        rc = kiblnd_setup_rd_iov(ni, tx, rd,
1563                                                 lntmsg->msg_md->md_niov,
1564                                                 lntmsg->msg_md->md_iov.iov,
1565                                                 0, lntmsg->msg_md->md_length);
1566                else
1567                        rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1568                                                  lntmsg->msg_md->md_niov,
1569                                                  lntmsg->msg_md->md_iov.kiov,
1570                                                  0, lntmsg->msg_md->md_length);
1571                if (rc) {
1572                        CERROR("Can't setup GET sink for %s: %d\n",
1573                               libcfs_nid2str(target.nid), rc);
1574                        kiblnd_tx_done(ni, tx);
1575                        return -EIO;
1576                }
1577
1578                nob = offsetof(struct kib_get_msg, ibgm_rd.rd_frags[rd->rd_nfrags]);
1579                ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1580                ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1581
1582                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1583
1584                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1585                if (!tx->tx_lntmsg[1]) {
1586                        CERROR("Can't create reply for GET -> %s\n",
1587                               libcfs_nid2str(target.nid));
1588                        kiblnd_tx_done(ni, tx);
1589                        return -EIO;
1590                }
1591
1592                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1593                tx->tx_waiting = 1;          /* waiting for GET_DONE */
1594                kiblnd_launch_tx(ni, tx, target.nid);
1595                return 0;
1596
1597        case LNET_MSG_REPLY:
1598        case LNET_MSG_PUT:
1599                /* Is the payload small enough not to need RDMA? */
1600                nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob]);
1601                if (nob <= IBLND_MSG_SIZE)
1602                        break;            /* send IMMEDIATE */
1603
1604                tx = kiblnd_get_idle_tx(ni, target.nid);
1605                if (!tx) {
1606                        CERROR("Can't allocate %s txd for %s\n",
1607                               type == LNET_MSG_PUT ? "PUT" : "REPLY",
1608                               libcfs_nid2str(target.nid));
1609                        return -ENOMEM;
1610                }
1611
1612                if (!payload_kiov)
1613                        rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1614                                                 payload_niov, payload_iov,
1615                                                 payload_offset, payload_nob);
1616                else
1617                        rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1618                                                  payload_niov, payload_kiov,
1619                                                  payload_offset, payload_nob);
1620                if (rc) {
1621                        CERROR("Can't setup PUT src for %s: %d\n",
1622                               libcfs_nid2str(target.nid), rc);
1623                        kiblnd_tx_done(ni, tx);
1624                        return -EIO;
1625                }
1626
1627                ibmsg = tx->tx_msg;
1628                ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1629                ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1630                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(struct kib_putreq_msg));
1631
1632                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1633                tx->tx_waiting = 1;          /* waiting for PUT_{ACK,NAK} */
1634                kiblnd_launch_tx(ni, tx, target.nid);
1635                return 0;
1636        }
1637
1638        /* send IMMEDIATE */
1639
1640        LASSERT(offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob])
1641                 <= IBLND_MSG_SIZE);
1642
1643        tx = kiblnd_get_idle_tx(ni, target.nid);
1644        if (!tx) {
1645                CERROR("Can't send %d to %s: tx descs exhausted\n",
1646                       type, libcfs_nid2str(target.nid));
1647                return -ENOMEM;
1648        }
1649
1650        ibmsg = tx->tx_msg;
1651        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1652
1653        copy_from_iter(&ibmsg->ibm_u.immediate.ibim_payload, IBLND_MSG_SIZE,
1654                       &from);
1655        nob = offsetof(struct kib_immediate_msg, ibim_payload[payload_nob]);
1656        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1657
1658        tx->tx_lntmsg[0] = lntmsg;            /* finalise lntmsg on completion */
1659        kiblnd_launch_tx(ni, tx, target.nid);
1660        return 0;
1661}
1662
1663static void
1664kiblnd_reply(lnet_ni_t *ni, struct kib_rx *rx, lnet_msg_t *lntmsg)
1665{
1666        lnet_process_id_t target = lntmsg->msg_target;
1667        unsigned int niov = lntmsg->msg_niov;
1668        struct kvec *iov = lntmsg->msg_iov;
1669        lnet_kiov_t *kiov = lntmsg->msg_kiov;
1670        unsigned int offset = lntmsg->msg_offset;
1671        unsigned int nob = lntmsg->msg_len;
1672        struct kib_tx *tx;
1673        int rc;
1674
1675        tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1676        if (!tx) {
1677                CERROR("Can't get tx for REPLY to %s\n",
1678                       libcfs_nid2str(target.nid));
1679                goto failed_0;
1680        }
1681
1682        if (!nob)
1683                rc = 0;
1684        else if (!kiov)
1685                rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1686                                         niov, iov, offset, nob);
1687        else
1688                rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1689                                          niov, kiov, offset, nob);
1690
1691        if (rc) {
1692                CERROR("Can't setup GET src for %s: %d\n",
1693                       libcfs_nid2str(target.nid), rc);
1694                goto failed_1;
1695        }
1696
1697        rc = kiblnd_init_rdma(rx->rx_conn, tx,
1698                              IBLND_MSG_GET_DONE, nob,
1699                              &rx->rx_msg->ibm_u.get.ibgm_rd,
1700                              rx->rx_msg->ibm_u.get.ibgm_cookie);
1701        if (rc < 0) {
1702                CERROR("Can't setup rdma for GET from %s: %d\n",
1703                       libcfs_nid2str(target.nid), rc);
1704                goto failed_1;
1705        }
1706
1707        if (!nob) {
1708                /* No RDMA: local completion may happen now! */
1709                lnet_finalize(ni, lntmsg, 0);
1710        } else {
1711                /* RDMA: lnet_finalize(lntmsg) when it completes */
1712                tx->tx_lntmsg[0] = lntmsg;
1713        }
1714
1715        kiblnd_queue_tx(tx, rx->rx_conn);
1716        return;
1717
1718 failed_1:
1719        kiblnd_tx_done(ni, tx);
1720 failed_0:
1721        lnet_finalize(ni, lntmsg, -EIO);
1722}
1723
1724int
1725kiblnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1726            struct iov_iter *to, unsigned int rlen)
1727{
1728        struct kib_rx *rx = private;
1729        struct kib_msg *rxmsg = rx->rx_msg;
1730        struct kib_conn *conn = rx->rx_conn;
1731        struct kib_tx *tx;
1732        int nob;
1733        int post_credit = IBLND_POSTRX_PEER_CREDIT;
1734        int rc = 0;
1735
1736        LASSERT(iov_iter_count(to) <= rlen);
1737        LASSERT(!in_interrupt());
1738        /* Either all pages or all vaddrs */
1739
1740        switch (rxmsg->ibm_type) {
1741        default:
1742                LBUG();
1743
1744        case IBLND_MSG_IMMEDIATE:
1745                nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[rlen]);
1746                if (nob > rx->rx_nob) {
1747                        CERROR("Immediate message from %s too big: %d(%d)\n",
1748                               libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1749                               nob, rx->rx_nob);
1750                        rc = -EPROTO;
1751                        break;
1752                }
1753
1754                copy_to_iter(&rxmsg->ibm_u.immediate.ibim_payload,
1755                             IBLND_MSG_SIZE, to);
1756                lnet_finalize(ni, lntmsg, 0);
1757                break;
1758
1759        case IBLND_MSG_PUT_REQ: {
1760                struct kib_msg  *txmsg;
1761                struct kib_rdma_desc *rd;
1762
1763                if (!iov_iter_count(to)) {
1764                        lnet_finalize(ni, lntmsg, 0);
1765                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1766                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1767                        break;
1768                }
1769
1770                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1771                if (!tx) {
1772                        CERROR("Can't allocate tx for %s\n",
1773                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
1774                        /* Not replying will break the connection */
1775                        rc = -ENOMEM;
1776                        break;
1777                }
1778
1779                txmsg = tx->tx_msg;
1780                rd = &txmsg->ibm_u.putack.ibpam_rd;
1781                if (!(to->type & ITER_BVEC))
1782                        rc = kiblnd_setup_rd_iov(ni, tx, rd,
1783                                                 to->nr_segs, to->kvec,
1784                                                 to->iov_offset,
1785                                                 iov_iter_count(to));
1786                else
1787                        rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1788                                                  to->nr_segs, to->bvec,
1789                                                  to->iov_offset,
1790                                                  iov_iter_count(to));
1791                if (rc) {
1792                        CERROR("Can't setup PUT sink for %s: %d\n",
1793                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1794                        kiblnd_tx_done(ni, tx);
1795                        /* tell peer it's over */
1796                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1797                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1798                        break;
1799                }
1800
1801                nob = offsetof(struct kib_putack_msg, ibpam_rd.rd_frags[rd->rd_nfrags]);
1802                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1803                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1804
1805                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1806
1807                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1808                tx->tx_waiting = 1;          /* waiting for PUT_DONE */
1809                kiblnd_queue_tx(tx, conn);
1810
1811                /* reposted buffer reserved for PUT_DONE */
1812                post_credit = IBLND_POSTRX_NO_CREDIT;
1813                break;
1814                }
1815
1816        case IBLND_MSG_GET_REQ:
1817                if (lntmsg) {
1818                        /* Optimized GET; RDMA lntmsg's payload */
1819                        kiblnd_reply(ni, rx, lntmsg);
1820                } else {
1821                        /* GET didn't match anything */
1822                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1823                                               -ENODATA,
1824                                               rxmsg->ibm_u.get.ibgm_cookie);
1825                }
1826                break;
1827        }
1828
1829        kiblnd_post_rx(rx, post_credit);
1830        return rc;
1831}
1832
1833int
1834kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1835{
1836        struct task_struct *task = kthread_run(fn, arg, "%s", name);
1837
1838        if (IS_ERR(task))
1839                return PTR_ERR(task);
1840
1841        atomic_inc(&kiblnd_data.kib_nthreads);
1842        return 0;
1843}
1844
1845static void
1846kiblnd_thread_fini(void)
1847{
1848        atomic_dec(&kiblnd_data.kib_nthreads);
1849}
1850
1851static void
1852kiblnd_peer_alive(struct kib_peer *peer)
1853{
1854        /* This is racy, but everyone's only writing cfs_time_current() */
1855        peer->ibp_last_alive = cfs_time_current();
1856        mb();
1857}
1858
1859static void
1860kiblnd_peer_notify(struct kib_peer *peer)
1861{
1862        int error = 0;
1863        unsigned long last_alive = 0;
1864        unsigned long flags;
1865
1866        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1867
1868        if (kiblnd_peer_idle(peer) && peer->ibp_error) {
1869                error = peer->ibp_error;
1870                peer->ibp_error = 0;
1871
1872                last_alive = peer->ibp_last_alive;
1873        }
1874
1875        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1876
1877        if (error)
1878                lnet_notify(peer->ibp_ni,
1879                            peer->ibp_nid, 0, last_alive);
1880}
1881
1882void
1883kiblnd_close_conn_locked(struct kib_conn *conn, int error)
1884{
1885        /*
1886         * This just does the immediate housekeeping. 'error' is zero for a
1887         * normal shutdown which can happen only after the connection has been
1888         * established.  If the connection is established, schedule the
1889         * connection to be finished off by the connd. Otherwise the connd is
1890         * already dealing with it (either to set it up or tear it down).
1891         * Caller holds kib_global_lock exclusively in irq context
1892         */
1893        struct kib_peer *peer = conn->ibc_peer;
1894        struct kib_dev *dev;
1895        unsigned long flags;
1896
1897        LASSERT(error || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1898
1899        if (error && !conn->ibc_comms_error)
1900                conn->ibc_comms_error = error;
1901
1902        if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1903                return; /* already being handled  */
1904
1905        if (!error &&
1906            list_empty(&conn->ibc_tx_noops) &&
1907            list_empty(&conn->ibc_tx_queue) &&
1908            list_empty(&conn->ibc_tx_queue_rsrvd) &&
1909            list_empty(&conn->ibc_tx_queue_nocred) &&
1910            list_empty(&conn->ibc_active_txs)) {
1911                CDEBUG(D_NET, "closing conn to %s\n",
1912                       libcfs_nid2str(peer->ibp_nid));
1913        } else {
1914                CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1915                        libcfs_nid2str(peer->ibp_nid), error,
1916                        list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1917                        list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1918                        list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1919                        list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1920                        list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1921        }
1922
1923        dev = ((struct kib_net *)peer->ibp_ni->ni_data)->ibn_dev;
1924        list_del(&conn->ibc_list);
1925        /* connd (see below) takes over ibc_list's ref */
1926
1927        if (list_empty(&peer->ibp_conns) &&    /* no more conns */
1928            kiblnd_peer_active(peer)) {  /* still in peer table */
1929                kiblnd_unlink_peer_locked(peer);
1930
1931                /* set/clear error on last conn */
1932                peer->ibp_error = conn->ibc_comms_error;
1933        }
1934
1935        kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1936
1937        if (error &&
1938            kiblnd_dev_can_failover(dev)) {
1939                list_add_tail(&dev->ibd_fail_list,
1940                              &kiblnd_data.kib_failed_devs);
1941                wake_up(&kiblnd_data.kib_failover_waitq);
1942        }
1943
1944        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1945
1946        list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1947        wake_up(&kiblnd_data.kib_connd_waitq);
1948
1949        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1950}
1951
1952void
1953kiblnd_close_conn(struct kib_conn *conn, int error)
1954{
1955        unsigned long flags;
1956
1957        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1958
1959        kiblnd_close_conn_locked(conn, error);
1960
1961        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1962}
1963
1964static void
1965kiblnd_handle_early_rxs(struct kib_conn *conn)
1966{
1967        unsigned long flags;
1968        struct kib_rx *rx;
1969        struct kib_rx *tmp;
1970
1971        LASSERT(!in_interrupt());
1972        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1973
1974        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1975        list_for_each_entry_safe(rx, tmp, &conn->ibc_early_rxs, rx_list) {
1976                list_del(&rx->rx_list);
1977                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1978
1979                kiblnd_handle_rx(rx);
1980
1981                write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1982        }
1983        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1984}
1985
1986static void
1987kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
1988{
1989        LIST_HEAD(zombies);
1990        struct list_head *tmp;
1991        struct list_head *nxt;
1992        struct kib_tx *tx;
1993
1994        spin_lock(&conn->ibc_lock);
1995
1996        list_for_each_safe(tmp, nxt, txs) {
1997                tx = list_entry(tmp, struct kib_tx, tx_list);
1998
1999                if (txs == &conn->ibc_active_txs) {
2000                        LASSERT(!tx->tx_queued);
2001                        LASSERT(tx->tx_waiting || tx->tx_sending);
2002                } else {
2003                        LASSERT(tx->tx_queued);
2004                }
2005
2006                tx->tx_status = -ECONNABORTED;
2007                tx->tx_waiting = 0;
2008
2009                if (!tx->tx_sending) {
2010                        tx->tx_queued = 0;
2011                        list_del(&tx->tx_list);
2012                        list_add(&tx->tx_list, &zombies);
2013                }
2014        }
2015
2016        spin_unlock(&conn->ibc_lock);
2017
2018        kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
2019}
2020
2021static void
2022kiblnd_finalise_conn(struct kib_conn *conn)
2023{
2024        LASSERT(!in_interrupt());
2025        LASSERT(conn->ibc_state > IBLND_CONN_INIT);
2026
2027        kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2028
2029        /*
2030         * abort_receives moves QP state to IB_QPS_ERR.  This is only required
2031         * for connections that didn't get as far as being connected, because
2032         * rdma_disconnect() does this for free.
2033         */
2034        kiblnd_abort_receives(conn);
2035
2036        /*
2037         * Complete all tx descs not waiting for sends to complete.
2038         * NB we should be safe from RDMA now that the QP has changed state
2039         */
2040        kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2041        kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2042        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2043        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2044        kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2045
2046        kiblnd_handle_early_rxs(conn);
2047}
2048
2049static void
2050kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error)
2051{
2052        LIST_HEAD(zombies);
2053        unsigned long flags;
2054
2055        LASSERT(error);
2056        LASSERT(!in_interrupt());
2057
2058        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2059
2060        if (active) {
2061                LASSERT(peer->ibp_connecting > 0);
2062                peer->ibp_connecting--;
2063        } else {
2064                LASSERT(peer->ibp_accepting > 0);
2065                peer->ibp_accepting--;
2066        }
2067
2068        if (kiblnd_peer_connecting(peer)) {
2069                /* another connection attempt under way... */
2070                write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2071                                        flags);
2072                return;
2073        }
2074
2075        peer->ibp_reconnected = 0;
2076        if (list_empty(&peer->ibp_conns)) {
2077                /* Take peer's blocked transmits to complete with error */
2078                list_add(&zombies, &peer->ibp_tx_queue);
2079                list_del_init(&peer->ibp_tx_queue);
2080
2081                if (kiblnd_peer_active(peer))
2082                        kiblnd_unlink_peer_locked(peer);
2083
2084                peer->ibp_error = error;
2085        } else {
2086                /* Can't have blocked transmits if there are connections */
2087                LASSERT(list_empty(&peer->ibp_tx_queue));
2088        }
2089
2090        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2091
2092        kiblnd_peer_notify(peer);
2093
2094        if (list_empty(&zombies))
2095                return;
2096
2097        CNETERR("Deleting messages for %s: connection failed\n",
2098                libcfs_nid2str(peer->ibp_nid));
2099
2100        kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2101}
2102
2103static void
2104kiblnd_connreq_done(struct kib_conn *conn, int status)
2105{
2106        struct kib_peer *peer = conn->ibc_peer;
2107        struct kib_tx *tx;
2108        struct kib_tx *tmp;
2109        struct list_head txs;
2110        unsigned long flags;
2111        int active;
2112
2113        active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2114
2115        CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2116               libcfs_nid2str(peer->ibp_nid), active,
2117               conn->ibc_version, status);
2118
2119        LASSERT(!in_interrupt());
2120        LASSERT((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2121                 peer->ibp_connecting > 0) ||
2122                 (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2123                 peer->ibp_accepting > 0));
2124
2125        LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2126        conn->ibc_connvars = NULL;
2127
2128        if (status) {
2129                /* failed to establish connection */
2130                kiblnd_peer_connect_failed(peer, active, status);
2131                kiblnd_finalise_conn(conn);
2132                return;
2133        }
2134
2135        /* connection established */
2136        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2137
2138        conn->ibc_last_send = jiffies;
2139        kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2140        kiblnd_peer_alive(peer);
2141
2142        /*
2143         * Add conn to peer's list and nuke any dangling conns from a different
2144         * peer instance...
2145         */
2146        kiblnd_conn_addref(conn);              /* +1 ref for ibc_list */
2147        list_add(&conn->ibc_list, &peer->ibp_conns);
2148        peer->ibp_reconnected = 0;
2149        if (active)
2150                peer->ibp_connecting--;
2151        else
2152                peer->ibp_accepting--;
2153
2154        if (!peer->ibp_version) {
2155                peer->ibp_version     = conn->ibc_version;
2156                peer->ibp_incarnation = conn->ibc_incarnation;
2157        }
2158
2159        if (peer->ibp_version     != conn->ibc_version ||
2160            peer->ibp_incarnation != conn->ibc_incarnation) {
2161                kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2162                                                conn->ibc_incarnation);
2163                peer->ibp_version     = conn->ibc_version;
2164                peer->ibp_incarnation = conn->ibc_incarnation;
2165        }
2166
2167        /* grab pending txs while I have the lock */
2168        list_add(&txs, &peer->ibp_tx_queue);
2169        list_del_init(&peer->ibp_tx_queue);
2170
2171        if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2172            conn->ibc_comms_error) {       /* error has happened already */
2173                lnet_ni_t *ni = peer->ibp_ni;
2174
2175                /* start to shut down connection */
2176                kiblnd_close_conn_locked(conn, -ECONNABORTED);
2177                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2178
2179                kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2180
2181                return;
2182        }
2183
2184        /*
2185         * +1 ref for myself, this connection is visible to other threads
2186         * now, refcount of peer:ibp_conns can be released by connection
2187         * close from either a different thread, or the calling of
2188         * kiblnd_check_sends_locked() below. See bz21911 for details.
2189         */
2190        kiblnd_conn_addref(conn);
2191        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2192
2193        /* Schedule blocked txs */
2194        spin_lock(&conn->ibc_lock);
2195        list_for_each_entry_safe(tx, tmp, &txs, tx_list) {
2196                list_del(&tx->tx_list);
2197
2198                kiblnd_queue_tx_locked(tx, conn);
2199        }
2200        kiblnd_check_sends_locked(conn);
2201        spin_unlock(&conn->ibc_lock);
2202
2203        /* schedule blocked rxs */
2204        kiblnd_handle_early_rxs(conn);
2205
2206        kiblnd_conn_decref(conn);
2207}
2208
2209static void
2210kiblnd_reject(struct rdma_cm_id *cmid, struct kib_rej *rej)
2211{
2212        int rc;
2213
2214        rc = rdma_reject(cmid, rej, sizeof(*rej));
2215
2216        if (rc)
2217                CWARN("Error %d sending reject\n", rc);
2218}
2219
2220static int
2221kiblnd_passive_connect(struct rdma_cm_id *cmid, void *priv, int priv_nob)
2222{
2223        rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
2224        struct kib_msg *reqmsg = priv;
2225        struct kib_msg *ackmsg;
2226        struct kib_dev *ibdev;
2227        struct kib_peer *peer;
2228        struct kib_peer *peer2;
2229        struct kib_conn *conn;
2230        lnet_ni_t *ni  = NULL;
2231        struct kib_net *net = NULL;
2232        lnet_nid_t nid;
2233        struct rdma_conn_param cp;
2234        struct kib_rej rej;
2235        int version = IBLND_MSG_VERSION;
2236        unsigned long flags;
2237        int max_frags;
2238        int rc;
2239        struct sockaddr_in *peer_addr;
2240
2241        LASSERT(!in_interrupt());
2242
2243        /* cmid inherits 'context' from the corresponding listener id */
2244        ibdev = (struct kib_dev *)cmid->context;
2245        LASSERT(ibdev);
2246
2247        memset(&rej, 0, sizeof(rej));
2248        rej.ibr_magic = IBLND_MSG_MAGIC;
2249        rej.ibr_why = IBLND_REJECT_FATAL;
2250        rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2251
2252        peer_addr = (struct sockaddr_in *)&cmid->route.addr.dst_addr;
2253        if (*kiblnd_tunables.kib_require_priv_port &&
2254            ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2255                __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2256
2257                CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2258                       &ip, ntohs(peer_addr->sin_port));
2259                goto failed;
2260        }
2261
2262        if (priv_nob < offsetof(struct kib_msg, ibm_type)) {
2263                CERROR("Short connection request\n");
2264                goto failed;
2265        }
2266
2267        /*
2268         * Future protocol version compatibility support!  If the
2269         * o2iblnd-specific protocol changes, or when LNET unifies
2270         * protocols over all LNDs, the initial connection will
2271         * negotiate a protocol version.  I trap this here to avoid
2272         * console errors; the reject tells the peer which protocol I
2273         * speak.
2274         */
2275        if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2276            reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2277                goto failed;
2278        if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2279            reqmsg->ibm_version != IBLND_MSG_VERSION &&
2280            reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2281                goto failed;
2282        if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2283            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2284            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2285                goto failed;
2286
2287        rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2288        if (rc) {
2289                CERROR("Can't parse connection request: %d\n", rc);
2290                goto failed;
2291        }
2292
2293        nid = reqmsg->ibm_srcnid;
2294        ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2295
2296        if (ni) {
2297                net = (struct kib_net *)ni->ni_data;
2298                rej.ibr_incarnation = net->ibn_incarnation;
2299        }
2300
2301        if (!ni ||                       /* no matching net */
2302            ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2303            net->ibn_dev != ibdev) {          /* wrong device */
2304                CERROR("Can't accept conn from %s on %s (%s:%d:%pI4h): bad dst nid %s\n",
2305                       libcfs_nid2str(nid),
2306                       !ni ? "NA" : libcfs_nid2str(ni->ni_nid),
2307                       ibdev->ibd_ifname, ibdev->ibd_nnets,
2308                       &ibdev->ibd_ifip,
2309                       libcfs_nid2str(reqmsg->ibm_dstnid));
2310
2311                goto failed;
2312        }
2313
2314       /* check time stamp as soon as possible */
2315        if (reqmsg->ibm_dststamp &&
2316            reqmsg->ibm_dststamp != net->ibn_incarnation) {
2317                CWARN("Stale connection request\n");
2318                rej.ibr_why = IBLND_REJECT_CONN_STALE;
2319                goto failed;
2320        }
2321
2322        /* I can accept peer's version */
2323        version = reqmsg->ibm_version;
2324
2325        if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2326                CERROR("Unexpected connreq msg type: %x from %s\n",
2327                       reqmsg->ibm_type, libcfs_nid2str(nid));
2328                goto failed;
2329        }
2330
2331        if (reqmsg->ibm_u.connparams.ibcp_queue_depth >
2332            kiblnd_msg_queue_size(version, ni)) {
2333                CERROR("Can't accept conn from %s, queue depth too large: %d (<=%d wanted)\n",
2334                       libcfs_nid2str(nid),
2335                       reqmsg->ibm_u.connparams.ibcp_queue_depth,
2336                       kiblnd_msg_queue_size(version, ni));
2337
2338                if (version == IBLND_MSG_VERSION)
2339                        rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2340
2341                goto failed;
2342        }
2343
2344        max_frags = reqmsg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2345        if (max_frags > kiblnd_rdma_frags(version, ni)) {
2346                CWARN("Can't accept conn from %s (version %x): max message size %d is too large (%d wanted)\n",
2347                      libcfs_nid2str(nid), version, max_frags,
2348                      kiblnd_rdma_frags(version, ni));
2349
2350                if (version >= IBLND_MSG_VERSION)
2351                        rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2352
2353                goto failed;
2354        } else if (max_frags < kiblnd_rdma_frags(version, ni) &&
2355                   !net->ibn_fmr_ps) {
2356                CWARN("Can't accept conn from %s (version %x): max message size %d incompatible without FMR pool (%d wanted)\n",
2357                      libcfs_nid2str(nid), version, max_frags,
2358                      kiblnd_rdma_frags(version, ni));
2359
2360                if (version == IBLND_MSG_VERSION)
2361                        rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2362
2363                goto failed;
2364        }
2365
2366        if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2367                CERROR("Can't accept %s: message size %d too big (%d max)\n",
2368                       libcfs_nid2str(nid),
2369                       reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2370                       IBLND_MSG_SIZE);
2371                goto failed;
2372        }
2373
2374        /* assume 'nid' is a new peer; create  */
2375        rc = kiblnd_create_peer(ni, &peer, nid);
2376        if (rc) {
2377                CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2378                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2379                goto failed;
2380        }
2381
2382        /* We have validated the peer's parameters so use those */
2383        peer->ibp_max_frags = max_frags;
2384        peer->ibp_queue_depth = reqmsg->ibm_u.connparams.ibcp_queue_depth;
2385
2386        write_lock_irqsave(g_lock, flags);
2387
2388        peer2 = kiblnd_find_peer_locked(nid);
2389        if (peer2) {
2390                if (!peer2->ibp_version) {
2391                        peer2->ibp_version     = version;
2392                        peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2393                }
2394
2395                /* not the guy I've talked with */
2396                if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2397                    peer2->ibp_version     != version) {
2398                        kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2399
2400                        if (kiblnd_peer_active(peer2)) {
2401                                peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2402                                peer2->ibp_version = version;
2403                        }
2404                        write_unlock_irqrestore(g_lock, flags);
2405
2406                        CWARN("Conn stale %s version %x/%x incarnation %llu/%llu\n",
2407                              libcfs_nid2str(nid), peer2->ibp_version, version,
2408                              peer2->ibp_incarnation, reqmsg->ibm_srcstamp);
2409
2410                        kiblnd_peer_decref(peer);
2411                        rej.ibr_why = IBLND_REJECT_CONN_STALE;
2412                        goto failed;
2413                }
2414
2415                /*
2416                 * Tie-break connection race in favour of the higher NID.
2417                 * If we keep running into a race condition multiple times,
2418                 * we have to assume that the connection attempt with the
2419                 * higher NID is stuck in a connecting state and will never
2420                 * recover.  As such, we pass through this if-block and let
2421                 * the lower NID connection win so we can move forward.
2422                 */
2423                if (peer2->ibp_connecting &&
2424                    nid < ni->ni_nid && peer2->ibp_races <
2425                    MAX_CONN_RACES_BEFORE_ABORT) {
2426                        peer2->ibp_races++;
2427                        write_unlock_irqrestore(g_lock, flags);
2428
2429                        CDEBUG(D_NET, "Conn race %s\n",
2430                               libcfs_nid2str(peer2->ibp_nid));
2431
2432                        kiblnd_peer_decref(peer);
2433                        rej.ibr_why = IBLND_REJECT_CONN_RACE;
2434                        goto failed;
2435                }
2436                if (peer2->ibp_races >= MAX_CONN_RACES_BEFORE_ABORT)
2437                        CNETERR("Conn race %s: unresolved after %d attempts, letting lower NID win\n",
2438                                libcfs_nid2str(peer2->ibp_nid),
2439                                MAX_CONN_RACES_BEFORE_ABORT);
2440                /**
2441                 * passive connection is allowed even this peer is waiting for
2442                 * reconnection.
2443                 */
2444                peer2->ibp_reconnecting = 0;
2445                peer2->ibp_races = 0;
2446                peer2->ibp_accepting++;
2447                kiblnd_peer_addref(peer2);
2448
2449                /**
2450                 * Race with kiblnd_launch_tx (active connect) to create peer
2451                 * so copy validated parameters since we now know what the
2452                 * peer's limits are
2453                 */
2454                peer2->ibp_max_frags = peer->ibp_max_frags;
2455                peer2->ibp_queue_depth = peer->ibp_queue_depth;
2456
2457                write_unlock_irqrestore(g_lock, flags);
2458                kiblnd_peer_decref(peer);
2459                peer = peer2;
2460        } else {
2461                /* Brand new peer */
2462                LASSERT(!peer->ibp_accepting);
2463                LASSERT(!peer->ibp_version &&
2464                        !peer->ibp_incarnation);
2465
2466                peer->ibp_accepting   = 1;
2467                peer->ibp_version     = version;
2468                peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2469
2470                /* I have a ref on ni that prevents it being shutdown */
2471                LASSERT(!net->ibn_shutdown);
2472
2473                kiblnd_peer_addref(peer);
2474                list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2475
2476                write_unlock_irqrestore(g_lock, flags);
2477        }
2478
2479        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT,
2480                                  version);
2481        if (!conn) {
2482                kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2483                kiblnd_peer_decref(peer);
2484                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2485                goto failed;
2486        }
2487
2488        /*
2489         * conn now "owns" cmid, so I return success from here on to ensure the
2490         * CM callback doesn't destroy cmid.
2491         */
2492        conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2493        conn->ibc_credits          = conn->ibc_queue_depth;
2494        conn->ibc_reserved_credits = conn->ibc_queue_depth;
2495        LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2496                IBLND_OOB_MSGS(version) <= IBLND_RX_MSGS(conn));
2497
2498        ackmsg = &conn->ibc_connvars->cv_msg;
2499        memset(ackmsg, 0, sizeof(*ackmsg));
2500
2501        kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2502                        sizeof(ackmsg->ibm_u.connparams));
2503        ackmsg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2504        ackmsg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2505        ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2506
2507        kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2508
2509        memset(&cp, 0, sizeof(cp));
2510        cp.private_data = ackmsg;
2511        cp.private_data_len = ackmsg->ibm_nob;
2512        cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2513        cp.initiator_depth = 0;
2514        cp.flow_control = 1;
2515        cp.retry_count = *kiblnd_tunables.kib_retry_count;
2516        cp.rnr_retry_count = *kiblnd_tunables.kib_rnr_retry_count;
2517
2518        CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2519
2520        rc = rdma_accept(cmid, &cp);
2521        if (rc) {
2522                CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2523                rej.ibr_version = version;
2524                rej.ibr_why     = IBLND_REJECT_FATAL;
2525
2526                kiblnd_reject(cmid, &rej);
2527                kiblnd_connreq_done(conn, rc);
2528                kiblnd_conn_decref(conn);
2529        }
2530
2531        lnet_ni_decref(ni);
2532        return 0;
2533
2534 failed:
2535        if (ni) {
2536                rej.ibr_cp.ibcp_queue_depth = kiblnd_msg_queue_size(version, ni);
2537                rej.ibr_cp.ibcp_max_frags = kiblnd_rdma_frags(version, ni);
2538                lnet_ni_decref(ni);
2539        }
2540
2541        rej.ibr_version             = version;
2542        kiblnd_reject(cmid, &rej);
2543
2544        return -ECONNREFUSED;
2545}
2546
2547static void
2548kiblnd_check_reconnect(struct kib_conn *conn, int version,
2549                       __u64 incarnation, int why, struct kib_connparams *cp)
2550{
2551        rwlock_t *glock = &kiblnd_data.kib_global_lock;
2552        struct kib_peer *peer = conn->ibc_peer;
2553        char *reason;
2554        int msg_size = IBLND_MSG_SIZE;
2555        int frag_num = -1;
2556        int queue_dep = -1;
2557        bool reconnect;
2558        unsigned long flags;
2559
2560        LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2561        LASSERT(peer->ibp_connecting > 0);     /* 'conn' at least */
2562        LASSERT(!peer->ibp_reconnecting);
2563
2564        if (cp) {
2565                msg_size = cp->ibcp_max_msg_size;
2566                frag_num        = cp->ibcp_max_frags << IBLND_FRAG_SHIFT;
2567                queue_dep = cp->ibcp_queue_depth;
2568        }
2569
2570        write_lock_irqsave(glock, flags);
2571        /**
2572         * retry connection if it's still needed and no other connection
2573         * attempts (active or passive) are in progress
2574         * NB: reconnect is still needed even when ibp_tx_queue is
2575         * empty if ibp_version != version because reconnect may be
2576         * initiated by kiblnd_query()
2577         */
2578        reconnect = (!list_empty(&peer->ibp_tx_queue) ||
2579                     peer->ibp_version != version) &&
2580                    peer->ibp_connecting == 1 &&
2581                    !peer->ibp_accepting;
2582        if (!reconnect) {
2583                reason = "no need";
2584                goto out;
2585        }
2586
2587        switch (why) {
2588        default:
2589                reason = "Unknown";
2590                break;
2591
2592        case IBLND_REJECT_RDMA_FRAGS: {
2593                struct lnet_ioctl_config_lnd_tunables *tunables;
2594
2595                if (!cp) {
2596                        reason = "can't negotiate max frags";
2597                        goto out;
2598                }
2599                tunables = peer->ibp_ni->ni_lnd_tunables;
2600                if (!tunables->lt_tun_u.lt_o2ib.lnd_map_on_demand) {
2601                        reason = "map_on_demand must be enabled";
2602                        goto out;
2603                }
2604                if (conn->ibc_max_frags <= frag_num) {
2605                        reason = "unsupported max frags";
2606                        goto out;
2607                }
2608
2609                peer->ibp_max_frags = frag_num;
2610                reason = "rdma fragments";
2611                break;
2612        }
2613        case IBLND_REJECT_MSG_QUEUE_SIZE:
2614                if (!cp) {
2615                        reason = "can't negotiate queue depth";
2616                        goto out;
2617                }
2618                if (conn->ibc_queue_depth <= queue_dep) {
2619                        reason = "unsupported queue depth";
2620                        goto out;
2621                }
2622
2623                peer->ibp_queue_depth = queue_dep;
2624                reason = "queue depth";
2625                break;
2626
2627        case IBLND_REJECT_CONN_STALE:
2628                reason = "stale";
2629                break;
2630
2631        case IBLND_REJECT_CONN_RACE:
2632                reason = "conn race";
2633                break;
2634
2635        case IBLND_REJECT_CONN_UNCOMPAT:
2636                reason = "version negotiation";
2637                break;
2638        }
2639
2640        conn->ibc_reconnect = 1;
2641        peer->ibp_reconnecting = 1;
2642        peer->ibp_version = version;
2643        if (incarnation)
2644                peer->ibp_incarnation = incarnation;
2645out:
2646        write_unlock_irqrestore(glock, flags);
2647
2648        CNETERR("%s: %s (%s), %x, %x, msg_size: %d, queue_depth: %d/%d, max_frags: %d/%d\n",
2649                libcfs_nid2str(peer->ibp_nid),
2650                reconnect ? "reconnect" : "don't reconnect",
2651                reason, IBLND_MSG_VERSION, version, msg_size,
2652                conn->ibc_queue_depth, queue_dep,
2653                conn->ibc_max_frags, frag_num);
2654        /**
2655         * if conn::ibc_reconnect is TRUE, connd will reconnect to the peer
2656         * while destroying the zombie
2657         */
2658}
2659
2660static void
2661kiblnd_rejected(struct kib_conn *conn, int reason, void *priv, int priv_nob)
2662{
2663        struct kib_peer *peer = conn->ibc_peer;
2664
2665        LASSERT(!in_interrupt());
2666        LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2667
2668        switch (reason) {
2669        case IB_CM_REJ_STALE_CONN:
2670                kiblnd_check_reconnect(conn, IBLND_MSG_VERSION, 0,
2671                                       IBLND_REJECT_CONN_STALE, NULL);
2672                break;
2673
2674        case IB_CM_REJ_INVALID_SERVICE_ID:
2675                CNETERR("%s rejected: no listener at %d\n",
2676                        libcfs_nid2str(peer->ibp_nid),
2677                        *kiblnd_tunables.kib_service);
2678                break;
2679
2680        case IB_CM_REJ_CONSUMER_DEFINED:
2681                if (priv_nob >= offsetof(struct kib_rej, ibr_padding)) {
2682                        struct kib_rej *rej = priv;
2683                        struct kib_connparams *cp = NULL;
2684                        int flip = 0;
2685                        __u64 incarnation = -1;
2686
2687                        /* NB. default incarnation is -1 because:
2688                         * a) V1 will ignore dst incarnation in connreq.
2689                         * b) V2 will provide incarnation while rejecting me,
2690                         *    -1 will be overwrote.
2691                         *
2692                         * if I try to connect to a V1 peer with V2 protocol,
2693                         * it rejected me then upgrade to V2, I have no idea
2694                         * about the upgrading and try to reconnect with V1,
2695                         * in this case upgraded V2 can find out I'm trying to
2696                         * talk to the old guy and reject me(incarnation is -1).
2697                         */
2698
2699                        if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2700                            rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2701                                __swab32s(&rej->ibr_magic);
2702                                __swab16s(&rej->ibr_version);
2703                                flip = 1;
2704                        }
2705
2706                        if (priv_nob >= sizeof(struct kib_rej) &&
2707                            rej->ibr_version > IBLND_MSG_VERSION_1) {
2708                                /*
2709                                 * priv_nob is always 148 in current version
2710                                 * of OFED, so we still need to check version.
2711                                 * (define of IB_CM_REJ_PRIVATE_DATA_SIZE)
2712                                 */
2713                                cp = &rej->ibr_cp;
2714
2715                                if (flip) {
2716                                        __swab64s(&rej->ibr_incarnation);
2717                                        __swab16s(&cp->ibcp_queue_depth);
2718                                        __swab16s(&cp->ibcp_max_frags);
2719                                        __swab32s(&cp->ibcp_max_msg_size);
2720                                }
2721
2722                                incarnation = rej->ibr_incarnation;
2723                        }
2724
2725                        if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2726                            rej->ibr_magic != LNET_PROTO_MAGIC) {
2727                                CERROR("%s rejected: consumer defined fatal error\n",
2728                                       libcfs_nid2str(peer->ibp_nid));
2729                                break;
2730                        }
2731
2732                        if (rej->ibr_version != IBLND_MSG_VERSION &&
2733                            rej->ibr_version != IBLND_MSG_VERSION_1) {
2734                                CERROR("%s rejected: o2iblnd version %x error\n",
2735                                       libcfs_nid2str(peer->ibp_nid),
2736                                       rej->ibr_version);
2737                                break;
2738                        }
2739
2740                        if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2741                            rej->ibr_version == IBLND_MSG_VERSION_1) {
2742                                CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2743                                       libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2744
2745                                if (conn->ibc_version != IBLND_MSG_VERSION_1)
2746                                        rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2747                        }
2748
2749                        switch (rej->ibr_why) {
2750                        case IBLND_REJECT_CONN_RACE:
2751                        case IBLND_REJECT_CONN_STALE:
2752                        case IBLND_REJECT_CONN_UNCOMPAT:
2753                        case IBLND_REJECT_MSG_QUEUE_SIZE:
2754                        case IBLND_REJECT_RDMA_FRAGS:
2755                                kiblnd_check_reconnect(conn, rej->ibr_version,
2756                                                       incarnation,
2757                                                       rej->ibr_why, cp);
2758                                break;
2759
2760                        case IBLND_REJECT_NO_RESOURCES:
2761                                CERROR("%s rejected: o2iblnd no resources\n",
2762                                       libcfs_nid2str(peer->ibp_nid));
2763                                break;
2764
2765                        case IBLND_REJECT_FATAL:
2766                                CERROR("%s rejected: o2iblnd fatal error\n",
2767                                       libcfs_nid2str(peer->ibp_nid));
2768                                break;
2769
2770                        default:
2771                                CERROR("%s rejected: o2iblnd reason %d\n",
2772                                       libcfs_nid2str(peer->ibp_nid),
2773                                       rej->ibr_why);
2774                                break;
2775                        }
2776                        break;
2777                }
2778                /* fall through */
2779        default:
2780                CNETERR("%s rejected: reason %d, size %d\n",
2781                        libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2782                break;
2783        }
2784
2785        kiblnd_connreq_done(conn, -ECONNREFUSED);
2786}
2787
2788static void
2789kiblnd_check_connreply(struct kib_conn *conn, void *priv, int priv_nob)
2790{
2791        struct kib_peer *peer = conn->ibc_peer;
2792        lnet_ni_t *ni = peer->ibp_ni;
2793        struct kib_net *net = ni->ni_data;
2794        struct kib_msg *msg = priv;
2795        int ver = conn->ibc_version;
2796        int rc = kiblnd_unpack_msg(msg, priv_nob);
2797        unsigned long flags;
2798
2799        LASSERT(net);
2800
2801        if (rc) {
2802                CERROR("Can't unpack connack from %s: %d\n",
2803                       libcfs_nid2str(peer->ibp_nid), rc);
2804                goto failed;
2805        }
2806
2807        if (msg->ibm_type != IBLND_MSG_CONNACK) {
2808                CERROR("Unexpected message %d from %s\n",
2809                       msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2810                rc = -EPROTO;
2811                goto failed;
2812        }
2813
2814        if (ver != msg->ibm_version) {
2815                CERROR("%s replied version %x is different with requested version %x\n",
2816                       libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2817                rc = -EPROTO;
2818                goto failed;
2819        }
2820
2821        if (msg->ibm_u.connparams.ibcp_queue_depth >
2822            conn->ibc_queue_depth) {
2823                CERROR("%s has incompatible queue depth %d (<=%d wanted)\n",
2824                       libcfs_nid2str(peer->ibp_nid),
2825                       msg->ibm_u.connparams.ibcp_queue_depth,
2826                       conn->ibc_queue_depth);
2827                rc = -EPROTO;
2828                goto failed;
2829        }
2830
2831        if ((msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT) >
2832            conn->ibc_max_frags) {
2833                CERROR("%s has incompatible max_frags %d (<=%d wanted)\n",
2834                       libcfs_nid2str(peer->ibp_nid),
2835                       msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT,
2836                       conn->ibc_max_frags);
2837                rc = -EPROTO;
2838                goto failed;
2839        }
2840
2841        if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2842                CERROR("%s max message size %d too big (%d max)\n",
2843                       libcfs_nid2str(peer->ibp_nid),
2844                       msg->ibm_u.connparams.ibcp_max_msg_size,
2845                       IBLND_MSG_SIZE);
2846                rc = -EPROTO;
2847                goto failed;
2848        }
2849
2850        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2851        if (msg->ibm_dstnid == ni->ni_nid &&
2852            msg->ibm_dststamp == net->ibn_incarnation)
2853                rc = 0;
2854        else
2855                rc = -ESTALE;
2856        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2857
2858        if (rc) {
2859                CERROR("Bad connection reply from %s, rc = %d, version: %x max_frags: %d\n",
2860                       libcfs_nid2str(peer->ibp_nid), rc,
2861                       msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2862                goto failed;
2863        }
2864
2865        conn->ibc_incarnation = msg->ibm_srcstamp;
2866        conn->ibc_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2867        conn->ibc_reserved_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2868        conn->ibc_queue_depth = msg->ibm_u.connparams.ibcp_queue_depth;
2869        conn->ibc_max_frags = msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2870        LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2871                IBLND_OOB_MSGS(ver) <= IBLND_RX_MSGS(conn));
2872
2873        kiblnd_connreq_done(conn, 0);
2874        return;
2875
2876 failed:
2877        /*
2878         * NB My QP has already established itself, so I handle anything going
2879         * wrong here by setting ibc_comms_error.
2880         * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2881         * immediately tears it down.
2882         */
2883        LASSERT(rc);
2884        conn->ibc_comms_error = rc;
2885        kiblnd_connreq_done(conn, 0);
2886}
2887
2888static int
2889kiblnd_active_connect(struct rdma_cm_id *cmid)
2890{
2891        struct kib_peer *peer = (struct kib_peer *)cmid->context;
2892        struct kib_conn *conn;
2893        struct kib_msg *msg;
2894        struct rdma_conn_param cp;
2895        int version;
2896        __u64 incarnation;
2897        unsigned long flags;
2898        int rc;
2899
2900        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2901
2902        incarnation = peer->ibp_incarnation;
2903        version = !peer->ibp_version ? IBLND_MSG_VERSION :
2904                                       peer->ibp_version;
2905
2906        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2907
2908        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT,
2909                                  version);
2910        if (!conn) {
2911                kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2912                kiblnd_peer_decref(peer); /* lose cmid's ref */
2913                return -ENOMEM;
2914        }
2915
2916        /*
2917         * conn "owns" cmid now, so I return success from here on to ensure the
2918         * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2919         * on peer
2920         */
2921        msg = &conn->ibc_connvars->cv_msg;
2922
2923        memset(msg, 0, sizeof(*msg));
2924        kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2925        msg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2926        msg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2927        msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2928
2929        kiblnd_pack_msg(peer->ibp_ni, msg, version,
2930                        0, peer->ibp_nid, incarnation);
2931
2932        memset(&cp, 0, sizeof(cp));
2933        cp.private_data = msg;
2934        cp.private_data_len    = msg->ibm_nob;
2935        cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2936        cp.initiator_depth     = 0;
2937        cp.flow_control        = 1;
2938        cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2939        cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2940
2941        LASSERT(cmid->context == (void *)conn);
2942        LASSERT(conn->ibc_cmid == cmid);
2943
2944        rc = rdma_connect(cmid, &cp);
2945        if (rc) {
2946                CERROR("Can't connect to %s: %d\n",
2947                       libcfs_nid2str(peer->ibp_nid), rc);
2948                kiblnd_connreq_done(conn, rc);
2949                kiblnd_conn_decref(conn);
2950        }
2951
2952        return 0;
2953}
2954
2955int
2956kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2957{
2958        struct kib_peer *peer;
2959        struct kib_conn *conn;
2960        int rc;
2961
2962        switch (event->event) {
2963        default:
2964                CERROR("Unexpected event: %d, status: %d\n",
2965                       event->event, event->status);
2966                LBUG();
2967
2968        case RDMA_CM_EVENT_CONNECT_REQUEST:
2969                /* destroy cmid on failure */
2970                rc = kiblnd_passive_connect(cmid,
2971                                            (void *)KIBLND_CONN_PARAM(event),
2972                                            KIBLND_CONN_PARAM_LEN(event));
2973                CDEBUG(D_NET, "connreq: %d\n", rc);
2974                return rc;
2975
2976        case RDMA_CM_EVENT_ADDR_ERROR:
2977                peer = (struct kib_peer *)cmid->context;
2978                CNETERR("%s: ADDR ERROR %d\n",
2979                        libcfs_nid2str(peer->ibp_nid), event->status);
2980                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2981                kiblnd_peer_decref(peer);
2982                return -EHOSTUNREACH;      /* rc destroys cmid */
2983
2984        case RDMA_CM_EVENT_ADDR_RESOLVED:
2985                peer = (struct kib_peer *)cmid->context;
2986
2987                CDEBUG(D_NET, "%s Addr resolved: %d\n",
2988                       libcfs_nid2str(peer->ibp_nid), event->status);
2989
2990                if (event->status) {
2991                        CNETERR("Can't resolve address for %s: %d\n",
2992                                libcfs_nid2str(peer->ibp_nid), event->status);
2993                        rc = event->status;
2994                } else {
2995                        rc = rdma_resolve_route(
2996                                cmid, *kiblnd_tunables.kib_timeout * 1000);
2997                        if (!rc)
2998                                return 0;
2999                        /* Can't initiate route resolution */
3000                        CERROR("Can't resolve route for %s: %d\n",
3001                               libcfs_nid2str(peer->ibp_nid), rc);
3002                }
3003                kiblnd_peer_connect_failed(peer, 1, rc);
3004                kiblnd_peer_decref(peer);
3005                return rc;                    /* rc destroys cmid */
3006
3007        case RDMA_CM_EVENT_ROUTE_ERROR:
3008                peer = (struct kib_peer *)cmid->context;
3009                CNETERR("%s: ROUTE ERROR %d\n",
3010                        libcfs_nid2str(peer->ibp_nid), event->status);
3011                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
3012                kiblnd_peer_decref(peer);
3013                return -EHOSTUNREACH;      /* rc destroys cmid */
3014
3015        case RDMA_CM_EVENT_ROUTE_RESOLVED:
3016                peer = (struct kib_peer *)cmid->context;
3017                CDEBUG(D_NET, "%s Route resolved: %d\n",
3018                       libcfs_nid2str(peer->ibp_nid), event->status);
3019
3020                if (!event->status)
3021                        return kiblnd_active_connect(cmid);
3022
3023                CNETERR("Can't resolve route for %s: %d\n",
3024                        libcfs_nid2str(peer->ibp_nid), event->status);
3025                kiblnd_peer_connect_failed(peer, 1, event->status);
3026                kiblnd_peer_decref(peer);
3027                return event->status;      /* rc destroys cmid */
3028
3029        case RDMA_CM_EVENT_UNREACHABLE:
3030                conn = (struct kib_conn *)cmid->context;
3031                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3032                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3033                CNETERR("%s: UNREACHABLE %d\n",
3034                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3035                kiblnd_connreq_done(conn, -ENETDOWN);
3036                kiblnd_conn_decref(conn);
3037                return 0;
3038
3039        case RDMA_CM_EVENT_CONNECT_ERROR:
3040                conn = (struct kib_conn *)cmid->context;
3041                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3042                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3043                CNETERR("%s: CONNECT ERROR %d\n",
3044                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3045                kiblnd_connreq_done(conn, -ENOTCONN);
3046                kiblnd_conn_decref(conn);
3047                return 0;
3048
3049        case RDMA_CM_EVENT_REJECTED:
3050                conn = (struct kib_conn *)cmid->context;
3051                switch (conn->ibc_state) {
3052                default:
3053                        LBUG();
3054
3055                case IBLND_CONN_PASSIVE_WAIT:
3056                        CERROR("%s: REJECTED %d\n",
3057                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
3058                               event->status);
3059                        kiblnd_connreq_done(conn, -ECONNRESET);
3060                        break;
3061
3062                case IBLND_CONN_ACTIVE_CONNECT:
3063                        kiblnd_rejected(conn, event->status,
3064                                        (void *)KIBLND_CONN_PARAM(event),
3065                                        KIBLND_CONN_PARAM_LEN(event));
3066                        break;
3067                }
3068                kiblnd_conn_decref(conn);
3069                return 0;
3070
3071        case RDMA_CM_EVENT_ESTABLISHED:
3072                conn = (struct kib_conn *)cmid->context;
3073                switch (conn->ibc_state) {
3074                default:
3075                        LBUG();
3076
3077                case IBLND_CONN_PASSIVE_WAIT:
3078                        CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
3079                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
3080                        kiblnd_connreq_done(conn, 0);
3081                        break;
3082
3083                case IBLND_CONN_ACTIVE_CONNECT:
3084                        CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
3085                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
3086                        kiblnd_check_connreply(conn,
3087                                               (void *)KIBLND_CONN_PARAM(event),
3088                                               KIBLND_CONN_PARAM_LEN(event));
3089                        break;
3090                }
3091                /* net keeps its ref on conn! */
3092                return 0;
3093
3094        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
3095                CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
3096                return 0;
3097        case RDMA_CM_EVENT_DISCONNECTED:
3098                conn = (struct kib_conn *)cmid->context;
3099                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
3100                        CERROR("%s DISCONNECTED\n",
3101                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
3102                        kiblnd_connreq_done(conn, -ECONNRESET);
3103                } else {
3104                        kiblnd_close_conn(conn, 0);
3105                }
3106                kiblnd_conn_decref(conn);
3107                cmid->context = NULL;
3108                return 0;
3109
3110        case RDMA_CM_EVENT_DEVICE_REMOVAL:
3111                LCONSOLE_ERROR_MSG(0x131,
3112                                   "Received notification of device removal\n"
3113                                   "Please shutdown LNET to allow this to proceed\n");
3114                /*
3115                 * Can't remove network from underneath LNET for now, so I have
3116                 * to ignore this
3117                 */
3118                return 0;
3119
3120        case RDMA_CM_EVENT_ADDR_CHANGE:
3121                LCONSOLE_INFO("Physical link changed (eg hca/port)\n");
3122                return 0;
3123        }
3124}
3125
3126static int
3127kiblnd_check_txs_locked(struct kib_conn *conn, struct list_head *txs)
3128{
3129        struct kib_tx *tx;
3130        struct list_head *ttmp;
3131
3132        list_for_each(ttmp, txs) {
3133                tx = list_entry(ttmp, struct kib_tx, tx_list);
3134
3135                if (txs != &conn->ibc_active_txs) {
3136                        LASSERT(tx->tx_queued);
3137                } else {
3138                        LASSERT(!tx->tx_queued);
3139                        LASSERT(tx->tx_waiting || tx->tx_sending);
3140                }
3141
3142                if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
3143                        CERROR("Timed out tx: %s, %lu seconds\n",
3144                               kiblnd_queue2str(conn, txs),
3145                               cfs_duration_sec(jiffies - tx->tx_deadline));
3146                        return 1;
3147                }
3148        }
3149
3150        return 0;
3151}
3152
3153static int
3154kiblnd_conn_timed_out_locked(struct kib_conn *conn)
3155{
3156        return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3157                kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3158                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3159                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3160                kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3161}
3162
3163static void
3164kiblnd_check_conns(int idx)
3165{
3166        LIST_HEAD(closes);
3167        LIST_HEAD(checksends);
3168        struct list_head *peers = &kiblnd_data.kib_peers[idx];
3169        struct list_head *ptmp;
3170        struct kib_peer *peer;
3171        struct kib_conn *conn;
3172        struct kib_conn *temp;
3173        struct kib_conn *tmp;
3174        struct list_head *ctmp;
3175        unsigned long flags;
3176
3177        /*
3178         * NB. We expect to have a look at all the peers and not find any
3179         * RDMAs to time out, so we just use a shared lock while we
3180         * take a look...
3181         */
3182        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3183
3184        list_for_each(ptmp, peers) {
3185                peer = list_entry(ptmp, struct kib_peer, ibp_list);
3186
3187                list_for_each(ctmp, &peer->ibp_conns) {
3188                        int timedout;
3189                        int sendnoop;
3190
3191                        conn = list_entry(ctmp, struct kib_conn, ibc_list);
3192
3193                        LASSERT(conn->ibc_state == IBLND_CONN_ESTABLISHED);
3194
3195                        spin_lock(&conn->ibc_lock);
3196
3197                        sendnoop = kiblnd_need_noop(conn);
3198                        timedout = kiblnd_conn_timed_out_locked(conn);
3199                        if (!sendnoop && !timedout) {
3200                                spin_unlock(&conn->ibc_lock);
3201                                continue;
3202                        }
3203
3204                        if (timedout) {
3205                                CERROR("Timed out RDMA with %s (%lu): c: %u, oc: %u, rc: %u\n",
3206                                       libcfs_nid2str(peer->ibp_nid),
3207                                       cfs_duration_sec(cfs_time_current() -
3208                                                        peer->ibp_last_alive),
3209                                       conn->ibc_credits,
3210                                       conn->ibc_outstanding_credits,
3211                                       conn->ibc_reserved_credits);
3212                                list_add(&conn->ibc_connd_list, &closes);
3213                        } else {
3214                                list_add(&conn->ibc_connd_list, &checksends);
3215                        }
3216                        /* +ref for 'closes' or 'checksends' */
3217                        kiblnd_conn_addref(conn);
3218
3219                        spin_unlock(&conn->ibc_lock);
3220                }
3221        }
3222
3223        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3224
3225        /*
3226         * Handle timeout by closing the whole
3227         * connection. We can only be sure RDMA activity
3228         * has ceased once the QP has been modified.
3229         */
3230        list_for_each_entry_safe(conn, tmp, &closes, ibc_connd_list) {
3231                list_del(&conn->ibc_connd_list);
3232                kiblnd_close_conn(conn, -ETIMEDOUT);
3233                kiblnd_conn_decref(conn);
3234        }
3235
3236        /*
3237         * In case we have enough credits to return via a
3238         * NOOP, but there were no non-blocking tx descs
3239         * free to do it last time...
3240         */
3241        list_for_each_entry_safe(conn, temp, &checksends, ibc_connd_list) {
3242                list_del(&conn->ibc_connd_list);
3243
3244                spin_lock(&conn->ibc_lock);
3245                kiblnd_check_sends_locked(conn);
3246                spin_unlock(&conn->ibc_lock);
3247
3248                kiblnd_conn_decref(conn);
3249        }
3250}
3251
3252static void
3253kiblnd_disconnect_conn(struct kib_conn *conn)
3254{
3255        LASSERT(!in_interrupt());
3256        LASSERT(current == kiblnd_data.kib_connd);
3257        LASSERT(conn->ibc_state == IBLND_CONN_CLOSING);
3258
3259        rdma_disconnect(conn->ibc_cmid);
3260        kiblnd_finalise_conn(conn);
3261
3262        kiblnd_peer_notify(conn->ibc_peer);
3263}
3264
3265/**
3266 * High-water for reconnection to the same peer, reconnection attempt should
3267 * be delayed after trying more than KIB_RECONN_HIGH_RACE.
3268 */
3269#define KIB_RECONN_HIGH_RACE    10
3270/**
3271 * Allow connd to take a break and handle other things after consecutive
 * reconnection attempts.
3273 */
3274#define KIB_RECONN_BREAK        100
3275
3276int
3277kiblnd_connd(void *arg)
3278{
3279        spinlock_t *lock= &kiblnd_data.kib_connd_lock;
3280        wait_queue_t wait;
3281        unsigned long flags;
3282        struct kib_conn *conn;
3283        int timeout;
3284        int i;
3285        int dropped_lock;
3286        int peer_index = 0;
3287        unsigned long deadline = jiffies;
3288
3289        cfs_block_allsigs();
3290
3291        init_waitqueue_entry(&wait, current);
3292        kiblnd_data.kib_connd = current;
3293
3294        spin_lock_irqsave(lock, flags);
3295
3296        while (!kiblnd_data.kib_shutdown) {
3297                int reconn = 0;
3298
3299                dropped_lock = 0;
3300
3301                if (!list_empty(&kiblnd_data.kib_connd_zombies)) {
3302                        struct kib_peer *peer = NULL;
3303
3304                        conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3305                                          struct kib_conn, ibc_list);
3306                        list_del(&conn->ibc_list);
3307                        if (conn->ibc_reconnect) {
3308                                peer = conn->ibc_peer;
3309                                kiblnd_peer_addref(peer);
3310                        }
3311
3312                        spin_unlock_irqrestore(lock, flags);
3313                        dropped_lock = 1;
3314
3315                        kiblnd_destroy_conn(conn, !peer);
3316
3317                        spin_lock_irqsave(lock, flags);
3318                        if (!peer)
3319                                continue;
3320
3321                        conn->ibc_peer = peer;
3322                        if (peer->ibp_reconnected < KIB_RECONN_HIGH_RACE)
3323                                list_add_tail(&conn->ibc_list,
3324                                              &kiblnd_data.kib_reconn_list);
3325                        else
3326                                list_add_tail(&conn->ibc_list,
3327                                              &kiblnd_data.kib_reconn_wait);
3328                }
3329
3330                if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3331                        conn = list_entry(kiblnd_data.kib_connd_conns.next,
3332                                          struct kib_conn, ibc_list);
3333                        list_del(&conn->ibc_list);
3334
3335                        spin_unlock_irqrestore(lock, flags);
3336                        dropped_lock = 1;
3337
3338                        kiblnd_disconnect_conn(conn);
3339                        kiblnd_conn_decref(conn);
3340
3341                        spin_lock_irqsave(lock, flags);
3342                }
3343
3344                while (reconn < KIB_RECONN_BREAK) {
3345                        if (kiblnd_data.kib_reconn_sec !=
3346                            ktime_get_real_seconds()) {
3347                                kiblnd_data.kib_reconn_sec = ktime_get_real_seconds();
3348                                list_splice_init(&kiblnd_data.kib_reconn_wait,
3349                                                 &kiblnd_data.kib_reconn_list);
3350                        }
3351
3352                        if (list_empty(&kiblnd_data.kib_reconn_list))
3353                                break;
3354
3355                        conn = list_entry(kiblnd_data.kib_reconn_list.next,
3356                                          struct kib_conn, ibc_list);
3357                        list_del(&conn->ibc_list);
3358
3359                        spin_unlock_irqrestore(lock, flags);
3360                        dropped_lock = 1;
3361
3362                        reconn += kiblnd_reconnect_peer(conn->ibc_peer);
3363                        kiblnd_peer_decref(conn->ibc_peer);
3364                        LIBCFS_FREE(conn, sizeof(*conn));
3365
3366                        spin_lock_irqsave(lock, flags);
3367                }
3368
3369                /* careful with the jiffy wrap... */
3370                timeout = (int)(deadline - jiffies);
3371                if (timeout <= 0) {
3372                        const int n = 4;
3373                        const int p = 1;
3374                        int chunk = kiblnd_data.kib_peer_hash_size;
3375
3376                        spin_unlock_irqrestore(lock, flags);
3377                        dropped_lock = 1;
3378
3379                        /*
3380                         * Time to check for RDMA timeouts on a few more
3381                         * peers: I do checks every 'p' seconds on a
3382                         * proportion of the peer table and I need to check
3383                         * every connection 'n' times within a timeout
3384                         * interval, to ensure I detect a timeout on any
3385                         * connection within (n+1)/n times the timeout
3386                         * interval.
3387                         */
3388                        if (*kiblnd_tunables.kib_timeout > n * p)
3389                                chunk = (chunk * n * p) /
3390                                        *kiblnd_tunables.kib_timeout;
3391                        if (!chunk)
3392                                chunk = 1;
3393
3394                        for (i = 0; i < chunk; i++) {
3395                                kiblnd_check_conns(peer_index);
3396                                peer_index = (peer_index + 1) %
3397                                             kiblnd_data.kib_peer_hash_size;
3398                        }
3399
3400                        deadline += msecs_to_jiffies(p * MSEC_PER_SEC);
3401                        spin_lock_irqsave(lock, flags);
3402                }
3403
3404                if (dropped_lock)
3405                        continue;
3406
3407                /* Nothing to do for 'timeout'  */
3408                set_current_state(TASK_INTERRUPTIBLE);
3409                add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3410                spin_unlock_irqrestore(lock, flags);
3411
3412                schedule_timeout(timeout);
3413
3414                remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3415                spin_lock_irqsave(lock, flags);
3416        }
3417
3418        spin_unlock_irqrestore(lock, flags);
3419
3420        kiblnd_thread_fini();
3421        return 0;
3422}
3423
3424void
3425kiblnd_qp_event(struct ib_event *event, void *arg)
3426{
3427        struct kib_conn *conn = arg;
3428
3429        switch (event->event) {
3430        case IB_EVENT_COMM_EST:
3431                CDEBUG(D_NET, "%s established\n",
3432                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
3433                /*
3434                 * We received a packet but connection isn't established
3435                 * probably handshake packet was lost, so free to
3436                 * force make connection established
3437                 */
3438                rdma_notify(conn->ibc_cmid, IB_EVENT_COMM_EST);
3439                return;
3440
3441        default:
3442                CERROR("%s: Async QP event type %d\n",
3443                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3444                return;
3445        }
3446}
3447
3448static void
3449kiblnd_complete(struct ib_wc *wc)
3450{
3451        switch (kiblnd_wreqid2type(wc->wr_id)) {
3452        default:
3453                LBUG();
3454
3455        case IBLND_WID_MR:
3456                if (wc->status != IB_WC_SUCCESS &&
3457                    wc->status != IB_WC_WR_FLUSH_ERR)
3458                        CNETERR("FastReg failed: %d\n", wc->status);
3459                break;
3460
3461        case IBLND_WID_RDMA:
3462                /*
3463                 * We only get RDMA completion notification if it fails.  All
3464                 * subsequent work items, including the final SEND will fail
3465                 * too.  However we can't print out any more info about the
3466                 * failing RDMA because 'tx' might be back on the idle list or
3467                 * even reused already if we didn't manage to post all our work
3468                 * items
3469                 */
3470                CNETERR("RDMA (tx: %p) failed: %d\n",
3471                        kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3472                return;
3473
3474        case IBLND_WID_TX:
3475                kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3476                return;
3477
3478        case IBLND_WID_RX:
3479                kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3480                                   wc->byte_len);
3481                return;
3482        }
3483}
3484
3485void
3486kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3487{
3488        /*
3489         * NB I'm not allowed to schedule this conn once its refcount has
3490         * reached 0.  Since fundamentally I'm racing with scheduler threads
3491         * consuming my CQ I could be called after all completions have
3492         * occurred.  But in this case, !ibc_nrx && !ibc_nsends_posted
3493         * and this CQ is about to be destroyed so I NOOP.
3494         */
3495        struct kib_conn *conn = arg;
3496        struct kib_sched_info *sched = conn->ibc_sched;
3497        unsigned long flags;
3498
3499        LASSERT(cq == conn->ibc_cq);
3500
3501        spin_lock_irqsave(&sched->ibs_lock, flags);
3502
3503        conn->ibc_ready = 1;
3504
3505        if (!conn->ibc_scheduled &&
3506            (conn->ibc_nrx > 0 ||
3507             conn->ibc_nsends_posted > 0)) {
3508                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3509                conn->ibc_scheduled = 1;
3510                list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3511
3512                if (waitqueue_active(&sched->ibs_waitq))
3513                        wake_up(&sched->ibs_waitq);
3514        }
3515
3516        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3517}
3518
3519void
3520kiblnd_cq_event(struct ib_event *event, void *arg)
3521{
3522        struct kib_conn *conn = arg;
3523
3524        CERROR("%s: async CQ event type %d\n",
3525               libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3526}
3527
3528int
3529kiblnd_scheduler(void *arg)
3530{
3531        long id = (long)arg;
3532        struct kib_sched_info *sched;
3533        struct kib_conn *conn;
3534        wait_queue_t wait;
3535        unsigned long flags;
3536        struct ib_wc wc;
3537        int did_something;
3538        int busy_loops = 0;
3539        int rc;
3540
3541        cfs_block_allsigs();
3542
3543        init_waitqueue_entry(&wait, current);
3544
3545        sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3546
3547        rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3548        if (rc) {
3549                CWARN("Failed to bind on CPT %d, please verify whether all CPUs are healthy and reload modules if necessary, otherwise your system might under risk of low performance\n",
3550                      sched->ibs_cpt);
3551        }
3552
3553        spin_lock_irqsave(&sched->ibs_lock, flags);
3554
3555        while (!kiblnd_data.kib_shutdown) {
3556                if (busy_loops++ >= IBLND_RESCHED) {
3557                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3558
3559                        cond_resched();
3560                        busy_loops = 0;
3561
3562                        spin_lock_irqsave(&sched->ibs_lock, flags);
3563                }
3564
3565                did_something = 0;
3566
3567                if (!list_empty(&sched->ibs_conns)) {
3568                        conn = list_entry(sched->ibs_conns.next, struct kib_conn,
3569                                          ibc_sched_list);
3570                        /* take over kib_sched_conns' ref on conn... */
3571                        LASSERT(conn->ibc_scheduled);
3572                        list_del(&conn->ibc_sched_list);
3573                        conn->ibc_ready = 0;
3574
3575                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3576
3577                        wc.wr_id = IBLND_WID_INVAL;
3578
3579                        rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3580                        if (!rc) {
3581                                rc = ib_req_notify_cq(conn->ibc_cq,
3582                                                      IB_CQ_NEXT_COMP);
3583                                if (rc < 0) {
3584                                        CWARN("%s: ib_req_notify_cq failed: %d, closing connection\n",
3585                                              libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3586                                        kiblnd_close_conn(conn, -EIO);
3587                                        kiblnd_conn_decref(conn);
3588                                        spin_lock_irqsave(&sched->ibs_lock,
3589                                                          flags);
3590                                        continue;
3591                                }
3592
3593                                rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3594                        }
3595
3596                        if (unlikely(rc > 0 && wc.wr_id == IBLND_WID_INVAL)) {
3597                                LCONSOLE_ERROR("ib_poll_cq (rc: %d) returned invalid wr_id, opcode %d, status: %d, vendor_err: %d, conn: %s status: %d\nplease upgrade firmware and OFED or contact vendor.\n",
3598                                               rc, wc.opcode, wc.status,
3599                                               wc.vendor_err,
3600                                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
3601                                               conn->ibc_state);
3602                                rc = -EINVAL;
3603                        }
3604
3605                        if (rc < 0) {
3606                                CWARN("%s: ib_poll_cq failed: %d, closing connection\n",
3607                                      libcfs_nid2str(conn->ibc_peer->ibp_nid),
3608                                      rc);
3609                                kiblnd_close_conn(conn, -EIO);
3610                                kiblnd_conn_decref(conn);
3611                                spin_lock_irqsave(&sched->ibs_lock, flags);
3612                                continue;
3613                        }
3614
3615                        spin_lock_irqsave(&sched->ibs_lock, flags);
3616
3617                        if (rc || conn->ibc_ready) {
3618                                /*
3619                                 * There may be another completion waiting; get
3620                                 * another scheduler to check while I handle
3621                                 * this one...
3622                                 */
3623                                /* +1 ref for sched_conns */
3624                                kiblnd_conn_addref(conn);
3625                                list_add_tail(&conn->ibc_sched_list,
3626                                              &sched->ibs_conns);
3627                                if (waitqueue_active(&sched->ibs_waitq))
3628                                        wake_up(&sched->ibs_waitq);
3629                        } else {
3630                                conn->ibc_scheduled = 0;
3631                        }
3632
3633                        if (rc) {
3634                                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3635                                kiblnd_complete(&wc);
3636
3637                                spin_lock_irqsave(&sched->ibs_lock, flags);
3638                        }
3639
3640                        kiblnd_conn_decref(conn); /* ...drop my ref from above */
3641                        did_something = 1;
3642                }
3643
3644                if (did_something)
3645                        continue;
3646
3647                set_current_state(TASK_INTERRUPTIBLE);
3648                add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3649                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3650
3651                schedule();
3652                busy_loops = 0;
3653
3654                remove_wait_queue(&sched->ibs_waitq, &wait);
3655                spin_lock_irqsave(&sched->ibs_lock, flags);
3656        }
3657
3658        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3659
3660        kiblnd_thread_fini();
3661        return 0;
3662}
3663
3664int
3665kiblnd_failover_thread(void *arg)
3666{
3667        rwlock_t *glock = &kiblnd_data.kib_global_lock;
3668        struct kib_dev *dev;
3669        wait_queue_t wait;
3670        unsigned long flags;
3671        int rc;
3672
3673        LASSERT(*kiblnd_tunables.kib_dev_failover);
3674
3675        cfs_block_allsigs();
3676
3677        init_waitqueue_entry(&wait, current);
3678        write_lock_irqsave(glock, flags);
3679
3680        while (!kiblnd_data.kib_shutdown) {
3681                int do_failover = 0;
3682                int long_sleep;
3683
3684                list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3685                                    ibd_fail_list) {
3686                        if (time_before(cfs_time_current(),
3687                                        dev->ibd_next_failover))
3688                                continue;
3689                        do_failover = 1;
3690                        break;
3691                }
3692
3693                if (do_failover) {
3694                        list_del_init(&dev->ibd_fail_list);
3695                        dev->ibd_failover = 1;
3696                        write_unlock_irqrestore(glock, flags);
3697
3698                        rc = kiblnd_dev_failover(dev);
3699
3700                        write_lock_irqsave(glock, flags);
3701
3702                        LASSERT(dev->ibd_failover);
3703                        dev->ibd_failover = 0;
3704                        if (rc >= 0) { /* Device is OK or failover succeed */
3705                                dev->ibd_next_failover = cfs_time_shift(3);
3706                                continue;
3707                        }
3708
3709                        /* failed to failover, retry later */
3710                        dev->ibd_next_failover =
3711                                cfs_time_shift(min(dev->ibd_failed_failover, 10));
3712                        if (kiblnd_dev_can_failover(dev)) {
3713                                list_add_tail(&dev->ibd_fail_list,
3714                                              &kiblnd_data.kib_failed_devs);
3715                        }
3716
3717                        continue;
3718                }
3719
3720                /* long sleep if no more pending failover */
3721                long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3722
3723                set_current_state(TASK_INTERRUPTIBLE);
3724                add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3725                write_unlock_irqrestore(glock, flags);
3726
3727                rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3728                                                   cfs_time_seconds(1));
3729                remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3730                write_lock_irqsave(glock, flags);
3731
3732                if (!long_sleep || rc)
3733                        continue;
3734
3735                /*
3736                 * have a long sleep, routine check all active devices,
3737                 * we need checking like this because if there is not active
3738                 * connection on the dev and no SEND from local, we may listen
3739                 * on wrong HCA for ever while there is a bonding failover
3740                 */
3741                list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3742                        if (kiblnd_dev_can_failover(dev)) {
3743                                list_add_tail(&dev->ibd_fail_list,
3744                                              &kiblnd_data.kib_failed_devs);
3745                        }
3746                }
3747        }
3748
3749        write_unlock_irqrestore(glock, flags);
3750
3751        kiblnd_thread_fini();
3752        return 0;
3753}
3754