linux/drivers/staging/lustre/lnet/klnds/o2iblnd/o2iblnd_cb.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2012, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 *
  32 * lnet/klnds/o2iblnd/o2iblnd_cb.c
  33 *
  34 * Author: Eric Barton <eric@bartonsoftware.com>
  35 */
  36
  37#include "o2iblnd.h"
  38
  39#define MAX_CONN_RACES_BEFORE_ABORT 20
  40
  41static void kiblnd_peer_alive(struct kib_peer *peer);
  42static void kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error);
  43static void kiblnd_init_tx_msg(struct lnet_ni *ni, struct kib_tx *tx,
  44                               int type, int body_nob);
  45static int kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
  46                            int resid, struct kib_rdma_desc *dstrd,
  47                            __u64 dstcookie);
  48static void kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn);
  49static void kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn);
  50static void kiblnd_unmap_tx(struct lnet_ni *ni, struct kib_tx *tx);
  51static void kiblnd_check_sends_locked(struct kib_conn *conn);
  52
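/*
 * Release a finished tx: unmap its DMA buffers, drop its connection
 * reference, return it to its pool, and only then finalise the (up to
 * two) LNet messages attached to it.
 */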
  53static void
  54kiblnd_tx_done(struct lnet_ni *ni, struct kib_tx *tx)
  55{
  56        struct lnet_msg *lntmsg[2];
  57        struct kib_net *net = ni->ni_data;
  58        int rc;
  59        int i;
  60
  61        LASSERT(net);
  62        LASSERT(!in_interrupt());
  63        LASSERT(!tx->tx_queued);               /* mustn't be queued for sending */
  64        LASSERT(!tx->tx_sending);         /* mustn't be awaiting sent callback */
  65        LASSERT(!tx->tx_waiting);             /* mustn't be awaiting peer response */
  66        LASSERT(tx->tx_pool);
  67
  68        kiblnd_unmap_tx(ni, tx);
  69
  70        /* tx may have up to 2 lnet msgs to finalise */
  71        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
  72        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
  73        rc = tx->tx_status;
  74
  75        if (tx->tx_conn) {
  76                LASSERT(ni == tx->tx_conn->ibc_peer->ibp_ni);
  77
  78                kiblnd_conn_decref(tx->tx_conn);
  79                tx->tx_conn = NULL;
  80        }
  81
  82        tx->tx_nwrq = 0;
  83        tx->tx_status = 0;
  84
  85        kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
  86
  87        /* delay finalize until my descs have been freed */
  88        for (i = 0; i < 2; i++) {
  89                if (!lntmsg[i])
  90                        continue;
  91
  92                lnet_finalize(ni, lntmsg[i], rc);
  93        }
  94}
  95
  96void
  97kiblnd_txlist_done(struct lnet_ni *ni, struct list_head *txlist, int status)
  98{
  99        struct kib_tx *tx;
 100
 101        while (!list_empty(txlist)) {
 102                tx = list_entry(txlist->next, struct kib_tx, tx_list);
 103
 104                list_del(&tx->tx_list);
 105                /* complete now */
 106                tx->tx_waiting = 0;
 107                tx->tx_status = status;
 108                kiblnd_tx_done(ni, tx);
 109        }
 110}
 111
 112static struct kib_tx *
 113kiblnd_get_idle_tx(struct lnet_ni *ni, lnet_nid_t target)
 114{
 115        struct kib_net *net = (struct kib_net *)ni->ni_data;
 116        struct list_head *node;
 117        struct kib_tx *tx;
 118        struct kib_tx_poolset *tps;
 119
 120        tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
 121        node = kiblnd_pool_alloc_node(&tps->tps_poolset);
 122        if (!node)
 123                return NULL;
 124        tx = list_entry(node, struct kib_tx, tx_list);
 125
 126        LASSERT(!tx->tx_nwrq);
 127        LASSERT(!tx->tx_queued);
 128        LASSERT(!tx->tx_sending);
 129        LASSERT(!tx->tx_waiting);
 130        LASSERT(!tx->tx_status);
 131        LASSERT(!tx->tx_conn);
 132        LASSERT(!tx->tx_lntmsg[0]);
 133        LASSERT(!tx->tx_lntmsg[1]);
 134        LASSERT(!tx->tx_nfrags);
 135
 136        return tx;
 137}
 138
 139static void
 140kiblnd_drop_rx(struct kib_rx *rx)
 141{
 142        struct kib_conn *conn = rx->rx_conn;
 143        struct kib_sched_info *sched = conn->ibc_sched;
 144        unsigned long flags;
 145
 146        spin_lock_irqsave(&sched->ibs_lock, flags);
 147        LASSERT(conn->ibc_nrx > 0);
 148        conn->ibc_nrx--;
 149        spin_unlock_irqrestore(&sched->ibs_lock, flags);
 150
 151        kiblnd_conn_decref(conn);
 152}
 153
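/*
 * Repost a receive buffer on the connection's queue pair.  'credit'
 * selects whether the repost returns a credit to the peer, tops up the
 * reserved credits, or returns no credit at all.  An extra connection
 * reference is held across ib_post_recv() because the rx is no longer
 * owned once it has been posted (LU-5678).
 */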
 154int
 155kiblnd_post_rx(struct kib_rx *rx, int credit)
 156{
 157        struct kib_conn *conn = rx->rx_conn;
 158        struct kib_net *net = conn->ibc_peer->ibp_ni->ni_data;
 159        struct ib_recv_wr *bad_wrq = NULL;
 160        int rc;
 161
 162        LASSERT(net);
 163        LASSERT(!in_interrupt());
 164        LASSERT(credit == IBLND_POSTRX_NO_CREDIT ||
 165                credit == IBLND_POSTRX_PEER_CREDIT ||
 166                credit == IBLND_POSTRX_RSRVD_CREDIT);
 167
 168        rx->rx_sge.lkey   = conn->ibc_hdev->ibh_pd->local_dma_lkey;
 169        rx->rx_sge.addr   = rx->rx_msgaddr;
 170        rx->rx_sge.length = IBLND_MSG_SIZE;
 171
 172        rx->rx_wrq.next    = NULL;
 173        rx->rx_wrq.sg_list = &rx->rx_sge;
 174        rx->rx_wrq.num_sge = 1;
 175        rx->rx_wrq.wr_id   = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);
 176
 177        LASSERT(conn->ibc_state >= IBLND_CONN_INIT);
 178        LASSERT(rx->rx_nob >= 0);             /* not posted */
 179
 180        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
 181                kiblnd_drop_rx(rx);          /* No more posts for this rx */
 182                return 0;
 183        }
 184
 185        rx->rx_nob = -1;                        /* flag posted */
 186
 187        /* NB: need an extra reference after ib_post_recv because we don't
 188         * own this rx (and rx::rx_conn) anymore, LU-5678.
 189         */
 190        kiblnd_conn_addref(conn);
 191        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
 192        if (unlikely(rc)) {
 193                CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
 194                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
 195                rx->rx_nob = 0;
 196        }
 197
 198        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
 199                goto out;
 200
 201        if (unlikely(rc)) {
 202                kiblnd_close_conn(conn, rc);
 203                kiblnd_drop_rx(rx);          /* No more posts for this rx */
 204                goto out;
 205        }
 206
 207        if (credit == IBLND_POSTRX_NO_CREDIT)
 208                goto out;
 209
 210        spin_lock(&conn->ibc_lock);
 211        if (credit == IBLND_POSTRX_PEER_CREDIT)
 212                conn->ibc_outstanding_credits++;
 213        else
 214                conn->ibc_reserved_credits++;
 215        kiblnd_check_sends_locked(conn);
 216        spin_unlock(&conn->ibc_lock);
 217
 218out:
 219        kiblnd_conn_decref(conn);
 220        return rc;
 221}
 222
 223static struct kib_tx *
 224kiblnd_find_waiting_tx_locked(struct kib_conn *conn, int txtype, __u64 cookie)
 225{
 226        struct list_head *tmp;
 227
 228        list_for_each(tmp, &conn->ibc_active_txs) {
 229                struct kib_tx *tx = list_entry(tmp, struct kib_tx, tx_list);
 230
 231                LASSERT(!tx->tx_queued);
 232                LASSERT(tx->tx_sending || tx->tx_waiting);
 233
 234                if (tx->tx_cookie != cookie)
 235                        continue;
 236
 237                if (tx->tx_waiting &&
 238                    tx->tx_msg->ibm_type == txtype)
 239                        return tx;
 240
 241                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
 242                      tx->tx_waiting ? "" : "NOT ",
 243                      tx->tx_msg->ibm_type, txtype);
 244        }
 245        return NULL;
 246}
 247
 248static void
 249kiblnd_handle_completion(struct kib_conn *conn, int txtype, int status, __u64 cookie)
 250{
 251        struct kib_tx *tx;
 252        struct lnet_ni *ni = conn->ibc_peer->ibp_ni;
 253        int idle;
 254
 255        spin_lock(&conn->ibc_lock);
 256
 257        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
 258        if (!tx) {
 259                spin_unlock(&conn->ibc_lock);
 260
 261                CWARN("Unmatched completion type %x cookie %#llx from %s\n",
 262                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 263                kiblnd_close_conn(conn, -EPROTO);
 264                return;
 265        }
 266
 267        if (!tx->tx_status) {          /* success so far */
 268                if (status < 0) /* failed? */
 269                        tx->tx_status = status;
 270                else if (txtype == IBLND_MSG_GET_REQ)
 271                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
 272        }
 273
 274        tx->tx_waiting = 0;
 275
 276        idle = !tx->tx_queued && !tx->tx_sending;
 277        if (idle)
 278                list_del(&tx->tx_list);
 279
 280        spin_unlock(&conn->ibc_lock);
 281
 282        if (idle)
 283                kiblnd_tx_done(ni, tx);
 284}
 285
 286static void
 287kiblnd_send_completion(struct kib_conn *conn, int type, int status, __u64 cookie)
 288{
 289        struct lnet_ni *ni = conn->ibc_peer->ibp_ni;
 290        struct kib_tx *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
 291
 292        if (!tx) {
 293                CERROR("Can't get tx for completion %x for %s\n",
 294                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 295                return;
 296        }
 297
 298        tx->tx_msg->ibm_u.completion.ibcm_status = status;
 299        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
 300        kiblnd_init_tx_msg(ni, tx, type, sizeof(struct kib_completion_msg));
 301
 302        kiblnd_queue_tx(tx, conn);
 303}
 304
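/*
 * Handle a message received on an established connection: absorb any
 * send credits it carries, then dispatch on the message type (NOOP,
 * immediate data, PUT/GET requests and their ACK/NAK/DONE completions).
 * Protocol errors close the connection; otherwise the rx is reposted
 * with the credit class appropriate to the message type.
 */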
 305static void
 306kiblnd_handle_rx(struct kib_rx *rx)
 307{
 308        struct kib_msg *msg = rx->rx_msg;
 309        struct kib_conn *conn = rx->rx_conn;
 310        struct lnet_ni *ni = conn->ibc_peer->ibp_ni;
 311        int credits = msg->ibm_credits;
 312        struct kib_tx *tx;
 313        int rc = 0;
 314        int rc2;
 315        int post_credit;
 316
 317        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
 318
 319        CDEBUG(D_NET, "Received %x[%d] from %s\n",
 320               msg->ibm_type, credits,
 321               libcfs_nid2str(conn->ibc_peer->ibp_nid));
 322
 323        if (credits) {
 324                /* Have I received credits that will let me send? */
 325                spin_lock(&conn->ibc_lock);
 326
 327                if (conn->ibc_credits + credits >
 328                    conn->ibc_queue_depth) {
 329                        rc2 = conn->ibc_credits;
 330                        spin_unlock(&conn->ibc_lock);
 331
 332                        CERROR("Bad credits from %s: %d + %d > %d\n",
 333                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
 334                               rc2, credits, conn->ibc_queue_depth);
 335
 336                        kiblnd_close_conn(conn, -EPROTO);
 337                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
 338                        return;
 339                }
 340
 341                conn->ibc_credits += credits;
 342
 343                /* This ensures the credit taken by NOOP can be returned */
 344                if (msg->ibm_type == IBLND_MSG_NOOP &&
 345                    !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
 346                        conn->ibc_outstanding_credits++;
 347
 348                kiblnd_check_sends_locked(conn);
 349                spin_unlock(&conn->ibc_lock);
 350        }
 351
 352        switch (msg->ibm_type) {
 353        default:
 354                CERROR("Bad IBLND message type %x from %s\n",
 355                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 356                post_credit = IBLND_POSTRX_NO_CREDIT;
 357                rc = -EPROTO;
 358                break;
 359
 360        case IBLND_MSG_NOOP:
 361                if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
 362                        post_credit = IBLND_POSTRX_NO_CREDIT;
 363                        break;
 364                }
 365
 366                if (credits) /* credit already posted */
 367                        post_credit = IBLND_POSTRX_NO_CREDIT;
 368                else          /* a keepalive NOOP */
 369                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 370                break;
 371
 372        case IBLND_MSG_IMMEDIATE:
 373                post_credit = IBLND_POSTRX_DONT_POST;
 374                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
 375                                msg->ibm_srcnid, rx, 0);
 376                if (rc < 0)                  /* repost on error */
 377                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 378                break;
 379
 380        case IBLND_MSG_PUT_REQ:
 381                post_credit = IBLND_POSTRX_DONT_POST;
 382                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
 383                                msg->ibm_srcnid, rx, 1);
 384                if (rc < 0)                  /* repost on error */
 385                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 386                break;
 387
 388        case IBLND_MSG_PUT_NAK:
 389                CWARN("PUT_NACK from %s\n",
 390                      libcfs_nid2str(conn->ibc_peer->ibp_nid));
 391                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
 392                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
 393                                         msg->ibm_u.completion.ibcm_status,
 394                                         msg->ibm_u.completion.ibcm_cookie);
 395                break;
 396
 397        case IBLND_MSG_PUT_ACK:
 398                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
 399
 400                spin_lock(&conn->ibc_lock);
 401                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
 402                                                   msg->ibm_u.putack.ibpam_src_cookie);
 403                if (tx)
 404                        list_del(&tx->tx_list);
 405                spin_unlock(&conn->ibc_lock);
 406
 407                if (!tx) {
 408                        CERROR("Unmatched PUT_ACK from %s\n",
 409                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
 410                        rc = -EPROTO;
 411                        break;
 412                }
 413
 414                LASSERT(tx->tx_waiting);
 415                /*
 416                 * CAVEAT EMPTOR: I could be racing with tx_complete, but...
 417                 * (a) I can overwrite tx_msg since my peer has received it!
 418                 * (b) tx_waiting set tells tx_complete() it's not done.
 419                 */
 420                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */
 421
 422                rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
 423                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
 424                                       &msg->ibm_u.putack.ibpam_rd,
 425                                       msg->ibm_u.putack.ibpam_dst_cookie);
 426                if (rc2 < 0)
 427                        CERROR("Can't setup rdma for PUT to %s: %d\n",
 428                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);
 429
 430                spin_lock(&conn->ibc_lock);
 431                tx->tx_waiting = 0;     /* clear waiting and queue atomically */
 432                kiblnd_queue_tx_locked(tx, conn);
 433                spin_unlock(&conn->ibc_lock);
 434                break;
 435
 436        case IBLND_MSG_PUT_DONE:
 437                post_credit = IBLND_POSTRX_PEER_CREDIT;
 438                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
 439                                         msg->ibm_u.completion.ibcm_status,
 440                                         msg->ibm_u.completion.ibcm_cookie);
 441                break;
 442
 443        case IBLND_MSG_GET_REQ:
 444                post_credit = IBLND_POSTRX_DONT_POST;
 445                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
 446                                msg->ibm_srcnid, rx, 1);
 447                if (rc < 0)                  /* repost on error */
 448                        post_credit = IBLND_POSTRX_PEER_CREDIT;
 449                break;
 450
 451        case IBLND_MSG_GET_DONE:
 452                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
 453                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
 454                                         msg->ibm_u.completion.ibcm_status,
 455                                         msg->ibm_u.completion.ibcm_cookie);
 456                break;
 457        }
 458
 459        if (rc < 0)                          /* protocol error */
 460                kiblnd_close_conn(conn, rc);
 461
 462        if (post_credit != IBLND_POSTRX_DONT_POST)
 463                kiblnd_post_rx(rx, post_credit);
 464}
 465
 466static void
 467kiblnd_rx_complete(struct kib_rx *rx, int status, int nob)
 468{
 469        struct kib_msg *msg = rx->rx_msg;
 470        struct kib_conn *conn = rx->rx_conn;
 471        struct lnet_ni *ni = conn->ibc_peer->ibp_ni;
 472        struct kib_net *net = ni->ni_data;
 473        int rc;
 474        int err = -EIO;
 475
 476        LASSERT(net);
 477        LASSERT(rx->rx_nob < 0);               /* was posted */
 478        rx->rx_nob = 0;                  /* isn't now */
 479
 480        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
 481                goto ignore;
 482
 483        if (status != IB_WC_SUCCESS) {
 484                CNETERR("Rx from %s failed: %d\n",
 485                        libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
 486                goto failed;
 487        }
 488
 489        LASSERT(nob >= 0);
 490        rx->rx_nob = nob;
 491
 492        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
 493        if (rc) {
 494                CERROR("Error %d unpacking rx from %s\n",
 495                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 496                goto failed;
 497        }
 498
 499        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
 500            msg->ibm_dstnid != ni->ni_nid ||
 501            msg->ibm_srcstamp != conn->ibc_incarnation ||
 502            msg->ibm_dststamp != net->ibn_incarnation) {
 503                CERROR("Stale rx from %s\n",
 504                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
 505                err = -ESTALE;
 506                goto failed;
 507        }
 508
 509        /* set time last known alive */
 510        kiblnd_peer_alive(conn->ibc_peer);
 511
 512        /* racing with connection establishment/teardown! */
 513
 514        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
 515                rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
 516                unsigned long flags;
 517
 518                write_lock_irqsave(g_lock, flags);
 519                /* must check holding global lock to eliminate race */
 520                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
 521                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
 522                        write_unlock_irqrestore(g_lock, flags);
 523                        return;
 524                }
 525                write_unlock_irqrestore(g_lock, flags);
 526        }
 527        kiblnd_handle_rx(rx);
 528        return;
 529
 530 failed:
 531        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
 532        kiblnd_close_conn(conn, err);
 533 ignore:
 534        kiblnd_drop_rx(rx);                  /* Don't re-post rx. */
 535}
 536
 537static struct page *
 538kiblnd_kvaddr_to_page(unsigned long vaddr)
 539{
 540        struct page *page;
 541
 542        if (is_vmalloc_addr((void *)vaddr)) {
 543                page = vmalloc_to_page((void *)vaddr);
 544                LASSERT(page);
 545                return page;
 546        }
 547#ifdef CONFIG_HIGHMEM
 548        if (vaddr >= PKMAP_BASE &&
 549            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
  550                /* No highmem page expected here: highmem is only used for bulk (kiov) I/O */
 551                CERROR("find page for address in highmem\n");
 552                LBUG();
 553        }
 554#endif
 555        page = virt_to_page(vaddr);
 556        LASSERT(page);
 557        return page;
 558}
 559
 560static int
 561kiblnd_fmr_map_tx(struct kib_net *net, struct kib_tx *tx, struct kib_rdma_desc *rd, __u32 nob)
 562{
 563        struct kib_hca_dev *hdev;
 564        struct kib_fmr_poolset *fps;
 565        int cpt;
 566        int rc;
 567
 568        LASSERT(tx->tx_pool);
 569        LASSERT(tx->tx_pool->tpo_pool.po_owner);
 570
 571        hdev = tx->tx_pool->tpo_hdev;
 572        cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;
 573
 574        fps = net->ibn_fmr_ps[cpt];
 575        rc = kiblnd_fmr_pool_map(fps, tx, rd, nob, 0, &tx->fmr);
 576        if (rc) {
 577                CERROR("Can't map %u bytes: %d\n", nob, rc);
 578                return rc;
 579        }
 580
 581        /*
 582         * If rd is not tx_rd, it's going to get sent to a peer, who will need
 583         * the rkey
 584         */
 585        rd->rd_key = tx->fmr.fmr_key;
 586        rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
 587        rd->rd_frags[0].rf_nob = nob;
 588        rd->rd_nfrags = 1;
 589
 590        return 0;
 591}
 592
 593static void kiblnd_unmap_tx(struct lnet_ni *ni, struct kib_tx *tx)
 594{
 595        struct kib_net *net = ni->ni_data;
 596
 597        LASSERT(net);
 598
 599        if (net->ibn_fmr_ps)
 600                kiblnd_fmr_pool_unmap(&tx->fmr, tx->tx_status);
 601
 602        if (tx->tx_nfrags) {
 603                kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
 604                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
 605                tx->tx_nfrags = 0;
 606        }
 607}
 608
 609static int kiblnd_map_tx(struct lnet_ni *ni, struct kib_tx *tx,
 610                         struct kib_rdma_desc *rd, int nfrags)
 611{
 612        struct kib_net *net = ni->ni_data;
 613        struct kib_hca_dev *hdev = net->ibn_dev->ibd_hdev;
 614        __u32 nob;
 615        int i;
 616
 617        /*
 618         * If rd is not tx_rd, it's going to get sent to a peer and I'm the
 619         * RDMA sink
 620         */
 621        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
 622        tx->tx_nfrags = nfrags;
 623
 624        rd->rd_nfrags = kiblnd_dma_map_sg(hdev->ibh_ibdev, tx->tx_frags,
 625                                          tx->tx_nfrags, tx->tx_dmadir);
 626
 627        for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
 628                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
 629                        hdev->ibh_ibdev, &tx->tx_frags[i]);
 630                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
 631                        hdev->ibh_ibdev, &tx->tx_frags[i]);
 632                nob += rd->rd_frags[i].rf_nob;
 633        }
 634
 635        if (net->ibn_fmr_ps)
 636                return kiblnd_fmr_map_tx(net, tx, rd, nob);
 637
 638        return -EINVAL;
 639}
 640
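/*
 * Translate 'nob' bytes at 'offset' within a kvec payload into
 * scatterlist entries in tx->tx_frags, then let kiblnd_map_tx() DMA-map
 * them and build the RDMA descriptor; kiblnd_setup_rd_kiov() below does
 * the same for page (bio_vec) payloads.
 */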
 641static int
 642kiblnd_setup_rd_iov(struct lnet_ni *ni, struct kib_tx *tx,
 643                    struct kib_rdma_desc *rd, unsigned int niov,
 644                    const struct kvec *iov, int offset, int nob)
 645{
 646        struct kib_net *net = ni->ni_data;
 647        struct page *page;
 648        struct scatterlist *sg;
 649        unsigned long vaddr;
 650        int fragnob;
 651        int page_offset;
 652
 653        LASSERT(nob > 0);
 654        LASSERT(niov > 0);
 655        LASSERT(net);
 656
 657        while (offset >= iov->iov_len) {
 658                offset -= iov->iov_len;
 659                niov--;
 660                iov++;
 661                LASSERT(niov > 0);
 662        }
 663
 664        sg = tx->tx_frags;
 665        do {
 666                LASSERT(niov > 0);
 667
 668                vaddr = ((unsigned long)iov->iov_base) + offset;
 669                page_offset = vaddr & (PAGE_SIZE - 1);
 670                page = kiblnd_kvaddr_to_page(vaddr);
 671                if (!page) {
 672                        CERROR("Can't find page\n");
 673                        return -EFAULT;
 674                }
 675
 676                fragnob = min((int)(iov->iov_len - offset), nob);
 677                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);
 678
 679                sg_set_page(sg, page, fragnob, page_offset);
 680                sg = sg_next(sg);
 681                if (!sg) {
 682                        CERROR("lacking enough sg entries to map tx\n");
 683                        return -EFAULT;
 684                }
 685
 686                if (offset + fragnob < iov->iov_len) {
 687                        offset += fragnob;
 688                } else {
 689                        offset = 0;
 690                        iov++;
 691                        niov--;
 692                }
 693                nob -= fragnob;
 694        } while (nob > 0);
 695
 696        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
 697}
 698
 699static int
 700kiblnd_setup_rd_kiov(struct lnet_ni *ni, struct kib_tx *tx,
 701                     struct kib_rdma_desc *rd, int nkiov,
 702                     const struct bio_vec *kiov, int offset, int nob)
 703{
 704        struct kib_net *net = ni->ni_data;
 705        struct scatterlist *sg;
 706        int fragnob;
 707
 708        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
 709
 710        LASSERT(nob > 0);
 711        LASSERT(nkiov > 0);
 712        LASSERT(net);
 713
 714        while (offset >= kiov->bv_len) {
 715                offset -= kiov->bv_len;
 716                nkiov--;
 717                kiov++;
 718                LASSERT(nkiov > 0);
 719        }
 720
 721        sg = tx->tx_frags;
 722        do {
 723                LASSERT(nkiov > 0);
 724
 725                fragnob = min((int)(kiov->bv_len - offset), nob);
 726
 727                sg_set_page(sg, kiov->bv_page, fragnob,
 728                            kiov->bv_offset + offset);
 729                sg = sg_next(sg);
 730                if (!sg) {
 731                        CERROR("lacking enough sg entries to map tx\n");
 732                        return -EFAULT;
 733                }
 734
 735                offset = 0;
 736                kiov++;
 737                nkiov--;
 738                nob -= fragnob;
 739        } while (nob > 0);
 740
 741        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
 742}
 743
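/*
 * Try to post one queued tx on the connection's queue pair; called and
 * returns with ibc_lock held.  Returns -EAGAIN (leaving the tx queued)
 * when the send queue is full, when a send credit is needed but none is
 * available, or when the last credit must be kept for a NOOP; drops a
 * NOOP that has become redundant; closes the connection and returns
 * -EIO if ib_post_send() fails.
 */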
 744static int
 745kiblnd_post_tx_locked(struct kib_conn *conn, struct kib_tx *tx, int credit)
 746        __must_hold(&conn->ibc_lock)
 747{
 748        struct kib_msg *msg = tx->tx_msg;
 749        struct kib_peer *peer = conn->ibc_peer;
 750        struct lnet_ni *ni = peer->ibp_ni;
 751        int ver = conn->ibc_version;
 752        int rc;
 753        int done;
 754
 755        LASSERT(tx->tx_queued);
 756        /* We rely on this for QP sizing */
 757        LASSERT(tx->tx_nwrq > 0);
 758
 759        LASSERT(!credit || credit == 1);
 760        LASSERT(conn->ibc_outstanding_credits >= 0);
 761        LASSERT(conn->ibc_outstanding_credits <= conn->ibc_queue_depth);
 762        LASSERT(conn->ibc_credits >= 0);
 763        LASSERT(conn->ibc_credits <= conn->ibc_queue_depth);
 764
 765        if (conn->ibc_nsends_posted == kiblnd_concurrent_sends(ver, ni)) {
 766                /* tx completions outstanding... */
 767                CDEBUG(D_NET, "%s: posted enough\n",
 768                       libcfs_nid2str(peer->ibp_nid));
 769                return -EAGAIN;
 770        }
 771
 772        if (credit && !conn->ibc_credits) {   /* no credits */
 773                CDEBUG(D_NET, "%s: no credits\n",
 774                       libcfs_nid2str(peer->ibp_nid));
 775                return -EAGAIN;
 776        }
 777
 778        if (credit && !IBLND_OOB_CAPABLE(ver) &&
 779            conn->ibc_credits == 1 &&   /* last credit reserved */
 780            msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
 781                CDEBUG(D_NET, "%s: not using last credit\n",
 782                       libcfs_nid2str(peer->ibp_nid));
 783                return -EAGAIN;
 784        }
 785
 786        /* NB don't drop ibc_lock before bumping tx_sending */
 787        list_del(&tx->tx_list);
 788        tx->tx_queued = 0;
 789
 790        if (msg->ibm_type == IBLND_MSG_NOOP &&
 791            (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
 792             (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
 793              conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
 794                /*
 795                 * OK to drop when posted enough NOOPs, since
 796                 * kiblnd_check_sends_locked will queue NOOP again when
 797                 * posted NOOPs complete
 798                 */
 799                spin_unlock(&conn->ibc_lock);
 800                kiblnd_tx_done(peer->ibp_ni, tx);
 801                spin_lock(&conn->ibc_lock);
 802                CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
 803                       libcfs_nid2str(peer->ibp_nid),
 804                       conn->ibc_noops_posted);
 805                return 0;
 806        }
 807
 808        kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
 809                        peer->ibp_nid, conn->ibc_incarnation);
 810
 811        conn->ibc_credits -= credit;
 812        conn->ibc_outstanding_credits = 0;
 813        conn->ibc_nsends_posted++;
 814        if (msg->ibm_type == IBLND_MSG_NOOP)
 815                conn->ibc_noops_posted++;
 816
 817        /*
 818         * CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
 819         * PUT.  If so, it was first queued here as a PUT_REQ, sent and
 820         * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
 821         * and then re-queued here.  It's (just) possible that
 822         * tx_sending is non-zero if we've not done the tx_complete()
 823         * from the first send; hence the ++ rather than = below.
 824         */
 825        tx->tx_sending++;
 826        list_add(&tx->tx_list, &conn->ibc_active_txs);
 827
 828        /* I'm still holding ibc_lock! */
 829        if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
 830                rc = -ECONNABORTED;
 831        } else if (tx->tx_pool->tpo_pool.po_failed ||
 832                 conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
 833                /* close_conn will launch failover */
 834                rc = -ENETDOWN;
 835        } else {
 836                struct kib_fast_reg_descriptor *frd = tx->fmr.fmr_frd;
 837                struct ib_send_wr *bad = &tx->tx_wrq[tx->tx_nwrq - 1].wr;
 838                struct ib_send_wr *wrq = &tx->tx_wrq[0].wr;
 839
 840                if (frd) {
 841                        if (!frd->frd_valid) {
 842                                wrq = &frd->frd_inv_wr;
 843                                wrq->next = &frd->frd_fastreg_wr.wr;
 844                        } else {
 845                                wrq = &frd->frd_fastreg_wr.wr;
 846                        }
 847                        frd->frd_fastreg_wr.wr.next = &tx->tx_wrq[0].wr;
 848                }
 849
 850                LASSERTF(bad->wr_id == kiblnd_ptr2wreqid(tx, IBLND_WID_TX),
 851                         "bad wr_id %llx, opc %d, flags %d, peer: %s\n",
 852                         bad->wr_id, bad->opcode, bad->send_flags,
 853                         libcfs_nid2str(conn->ibc_peer->ibp_nid));
 854                bad = NULL;
 855                rc = ib_post_send(conn->ibc_cmid->qp, wrq, &bad);
 856        }
 857
 858        conn->ibc_last_send = jiffies;
 859
 860        if (!rc)
 861                return 0;
 862
 863        /*
 864         * NB credits are transferred in the actual
 865         * message, which can only be the last work item
 866         */
 867        conn->ibc_credits += credit;
 868        conn->ibc_outstanding_credits += msg->ibm_credits;
 869        conn->ibc_nsends_posted--;
 870        if (msg->ibm_type == IBLND_MSG_NOOP)
 871                conn->ibc_noops_posted--;
 872
 873        tx->tx_status = rc;
 874        tx->tx_waiting = 0;
 875        tx->tx_sending--;
 876
 877        done = !tx->tx_sending;
 878        if (done)
 879                list_del(&tx->tx_list);
 880
 881        spin_unlock(&conn->ibc_lock);
 882
 883        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
 884                CERROR("Error %d posting transmit to %s\n",
 885                       rc, libcfs_nid2str(peer->ibp_nid));
 886        else
 887                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
 888                       rc, libcfs_nid2str(peer->ibp_nid));
 889
 890        kiblnd_close_conn(conn, rc);
 891
 892        if (done)
 893                kiblnd_tx_done(peer->ibp_ni, tx);
 894
 895        spin_lock(&conn->ibc_lock);
 896
 897        return -EIO;
 898}
 899
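/*
 * Drain the send queues while credits and send-queue space allow:
 * promote txs waiting for reserved credits, queue a NOOP if one is
 * needed to return credits (dropping ibc_lock briefly to allocate it),
 * then keep posting from the no-credit, NOOP and normal queues until
 * kiblnd_post_tx_locked() refuses.  Called with ibc_lock held.
 */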
 900static void
 901kiblnd_check_sends_locked(struct kib_conn *conn)
 902{
 903        int ver = conn->ibc_version;
 904        struct lnet_ni *ni = conn->ibc_peer->ibp_ni;
 905        struct kib_tx *tx;
 906
 907        /* Don't send anything until after the connection is established */
 908        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
 909                CDEBUG(D_NET, "%s too soon\n",
 910                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
 911                return;
 912        }
 913
 914        LASSERT(conn->ibc_nsends_posted <= kiblnd_concurrent_sends(ver, ni));
 915        LASSERT(!IBLND_OOB_CAPABLE(ver) ||
 916                conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
 917        LASSERT(conn->ibc_reserved_credits >= 0);
 918
 919        while (conn->ibc_reserved_credits > 0 &&
 920               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
 921                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
 922                                struct kib_tx, tx_list);
 923                list_del(&tx->tx_list);
 924                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
 925                conn->ibc_reserved_credits--;
 926        }
 927
 928        if (kiblnd_need_noop(conn)) {
 929                spin_unlock(&conn->ibc_lock);
 930
 931                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
 932                if (tx)
 933                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);
 934
 935                spin_lock(&conn->ibc_lock);
 936                if (tx)
 937                        kiblnd_queue_tx_locked(tx, conn);
 938        }
 939
 940        for (;;) {
 941                int credit;
 942
 943                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
 944                        credit = 0;
 945                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
 946                                        struct kib_tx, tx_list);
 947                } else if (!list_empty(&conn->ibc_tx_noops)) {
 948                        LASSERT(!IBLND_OOB_CAPABLE(ver));
 949                        credit = 1;
 950                        tx = list_entry(conn->ibc_tx_noops.next,
 951                                        struct kib_tx, tx_list);
 952                } else if (!list_empty(&conn->ibc_tx_queue)) {
 953                        credit = 1;
 954                        tx = list_entry(conn->ibc_tx_queue.next,
 955                                        struct kib_tx, tx_list);
 956                } else {
 957                        break;
 958                }
 959
 960                if (kiblnd_post_tx_locked(conn, tx, credit))
 961                        break;
 962        }
 963}
 964
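/*
 * Send completion handler: decrement the tx's sending count, close the
 * connection on failure, and free the tx once it is neither sending,
 * waiting for the peer, nor re-queued (PUT_DONE).
 */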
 965static void
 966kiblnd_tx_complete(struct kib_tx *tx, int status)
 967{
 968        int failed = (status != IB_WC_SUCCESS);
 969        struct kib_conn *conn = tx->tx_conn;
 970        int idle;
 971
 972        LASSERT(tx->tx_sending > 0);
 973
 974        if (failed) {
 975                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
 976                        CNETERR("Tx -> %s cookie %#llx sending %d waiting %d: failed %d\n",
 977                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
 978                                tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
 979                                status);
 980
 981                kiblnd_close_conn(conn, -EIO);
 982        } else {
 983                kiblnd_peer_alive(conn->ibc_peer);
 984        }
 985
 986        spin_lock(&conn->ibc_lock);
 987
 988        /*
 989         * I could be racing with rdma completion.  Whoever makes 'tx' idle
 990         * gets to free it, which also drops its ref on 'conn'.
 991         */
 992        tx->tx_sending--;
 993        conn->ibc_nsends_posted--;
 994        if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
 995                conn->ibc_noops_posted--;
 996
 997        if (failed) {
 998                tx->tx_waiting = 0;          /* don't wait for peer */
 999                tx->tx_status = -EIO;
1000        }
1001
1002        idle = !tx->tx_sending &&        /* This is the final callback */
1003               !tx->tx_waiting &&              /* Not waiting for peer */
1004               !tx->tx_queued;            /* Not re-queued (PUT_DONE) */
1005        if (idle)
1006                list_del(&tx->tx_list);
1007
1008        kiblnd_check_sends_locked(conn);
1009        spin_unlock(&conn->ibc_lock);
1010
1011        if (idle)
1012                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
1013}
1014
1015static void
1016kiblnd_init_tx_msg(struct lnet_ni *ni, struct kib_tx *tx, int type,
1017                   int body_nob)
1018{
1019        struct kib_hca_dev *hdev = tx->tx_pool->tpo_hdev;
1020        struct ib_sge *sge = &tx->tx_sge[tx->tx_nwrq];
1021        struct ib_rdma_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
1022        int nob = offsetof(struct kib_msg, ibm_u) + body_nob;
1023
1024        LASSERT(tx->tx_nwrq >= 0);
1025        LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
1026        LASSERT(nob <= IBLND_MSG_SIZE);
1027
1028        kiblnd_init_msg(tx->tx_msg, type, body_nob);
1029
1030        sge->lkey   = hdev->ibh_pd->local_dma_lkey;
1031        sge->addr   = tx->tx_msgaddr;
1032        sge->length = nob;
1033
1034        memset(wrq, 0, sizeof(*wrq));
1035
1036        wrq->wr.next       = NULL;
1037        wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
1038        wrq->wr.sg_list    = sge;
1039        wrq->wr.num_sge    = 1;
1040        wrq->wr.opcode     = IB_WR_SEND;
1041        wrq->wr.send_flags = IB_SEND_SIGNALED;
1042
1043        tx->tx_nwrq++;
1044}
1045
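/*
 * Build the chain of RDMA WRITE work requests that moves 'resid' bytes
 * from this tx's source descriptor into the peer's descriptor 'dstrd',
 * then append the GET_DONE/PUT_DONE completion message carrying
 * 'dstcookie'.  Fails with -EMSGSIZE if the transfer is too large or
 * needs more than IBLND_MAX_RDMA_FRAGS fragments, and -EPROTO if either
 * descriptor runs out of fragments early.
 */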
1046static int
1047kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
1048                 int resid, struct kib_rdma_desc *dstrd, __u64 dstcookie)
1049{
1050        struct kib_msg *ibmsg = tx->tx_msg;
1051        struct kib_rdma_desc *srcrd = tx->tx_rd;
1052        struct ib_sge *sge = &tx->tx_sge[0];
1053        struct ib_rdma_wr *wrq, *next;
1054        int rc  = resid;
1055        int srcidx = 0;
1056        int dstidx = 0;
1057        int wrknob;
1058
1059        LASSERT(!in_interrupt());
1060        LASSERT(!tx->tx_nwrq);
1061        LASSERT(type == IBLND_MSG_GET_DONE ||
1062                type == IBLND_MSG_PUT_DONE);
1063
1064        if (kiblnd_rd_size(srcrd) > conn->ibc_max_frags << PAGE_SHIFT) {
1065                CERROR("RDMA is too large for peer %s (%d), src size: %d dst size: %d\n",
1066                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
1067                       conn->ibc_max_frags << PAGE_SHIFT,
1068                       kiblnd_rd_size(srcrd), kiblnd_rd_size(dstrd));
1069                rc = -EMSGSIZE;
1070                goto too_big;
1071        }
1072
1073        while (resid > 0) {
1074                if (srcidx >= srcrd->rd_nfrags) {
1075                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
1076                        rc = -EPROTO;
1077                        break;
1078                }
1079
1080                if (dstidx == dstrd->rd_nfrags) {
1081                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
1082                        rc = -EPROTO;
1083                        break;
1084                }
1085
1086                if (tx->tx_nwrq >= IBLND_MAX_RDMA_FRAGS) {
1087                        CERROR("RDMA has too many fragments for peer %s (%d), src idx/frags: %d/%d dst idx/frags: %d/%d\n",
1088                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
1089                               IBLND_MAX_RDMA_FRAGS,
1090                               srcidx, srcrd->rd_nfrags,
1091                               dstidx, dstrd->rd_nfrags);
1092                        rc = -EMSGSIZE;
1093                        break;
1094                }
1095
1096                wrknob = min3(kiblnd_rd_frag_size(srcrd, srcidx),
1097                              kiblnd_rd_frag_size(dstrd, dstidx),
1098                              (__u32)resid);
1099
1100                sge = &tx->tx_sge[tx->tx_nwrq];
1101                sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
1102                sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
1103                sge->length = wrknob;
1104
1105                wrq = &tx->tx_wrq[tx->tx_nwrq];
1106                next = wrq + 1;
1107
1108                wrq->wr.next       = &next->wr;
1109                wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
1110                wrq->wr.sg_list    = sge;
1111                wrq->wr.num_sge    = 1;
1112                wrq->wr.opcode     = IB_WR_RDMA_WRITE;
1113                wrq->wr.send_flags = 0;
1114
1115                wrq->remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
1116                wrq->rkey        = kiblnd_rd_frag_key(dstrd, dstidx);
1117
1118                srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
1119                dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);
1120
1121                resid -= wrknob;
1122
1123                tx->tx_nwrq++;
1124                wrq++;
1125                sge++;
1126        }
1127too_big:
1128        if (rc < 0)                          /* no RDMA if completing with failure */
1129                tx->tx_nwrq = 0;
1130
1131        ibmsg->ibm_u.completion.ibcm_status = rc;
1132        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
1133        kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
1134                           type, sizeof(struct kib_completion_msg));
1135
1136        return rc;
1137}
1138
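/*
 * Attach a fully initialised tx to the connection (taking a connection
 * reference on first attach and stamping its deadline) and append it to
 * the send queue matching its message type: reserved-credit, no-credit,
 * NOOP or normal.  Called with ibc_lock held.
 */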
1139static void
1140kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn)
1141{
1142        struct list_head *q;
1143
1144        LASSERT(tx->tx_nwrq > 0);             /* work items set up */
1145        LASSERT(!tx->tx_queued);               /* not queued for sending already */
1146        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1147
1148        tx->tx_queued = 1;
1149        tx->tx_deadline = jiffies +
1150                          msecs_to_jiffies(*kiblnd_tunables.kib_timeout *
1151                                           MSEC_PER_SEC);
1152
1153        if (!tx->tx_conn) {
1154                kiblnd_conn_addref(conn);
1155                tx->tx_conn = conn;
1156                LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
1157        } else {
1158                /* PUT_DONE first attached to conn as a PUT_REQ */
1159                LASSERT(tx->tx_conn == conn);
1160                LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
1161        }
1162
1163        switch (tx->tx_msg->ibm_type) {
1164        default:
1165                LBUG();
1166
1167        case IBLND_MSG_PUT_REQ:
1168        case IBLND_MSG_GET_REQ:
1169                q = &conn->ibc_tx_queue_rsrvd;
1170                break;
1171
1172        case IBLND_MSG_PUT_NAK:
1173        case IBLND_MSG_PUT_ACK:
1174        case IBLND_MSG_PUT_DONE:
1175        case IBLND_MSG_GET_DONE:
1176                q = &conn->ibc_tx_queue_nocred;
1177                break;
1178
1179        case IBLND_MSG_NOOP:
1180                if (IBLND_OOB_CAPABLE(conn->ibc_version))
1181                        q = &conn->ibc_tx_queue_nocred;
1182                else
1183                        q = &conn->ibc_tx_noops;
1184                break;
1185
1186        case IBLND_MSG_IMMEDIATE:
1187                q = &conn->ibc_tx_queue;
1188                break;
1189        }
1190
1191        list_add_tail(&tx->tx_list, q);
1192}
1193
1194static void
1195kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn)
1196{
1197        spin_lock(&conn->ibc_lock);
1198        kiblnd_queue_tx_locked(tx, conn);
1199        kiblnd_check_sends_locked(conn);
1200        spin_unlock(&conn->ibc_lock);
1201}
1202
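/*
 * Bind the cm_id to a free privileged source port (scanning down from
 * PROT_SOCK - 1) and start address resolution towards 'dstaddr'; used
 * when *kiblnd_tunables.kib_use_priv_port is set.
 */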
1203static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
1204                               struct sockaddr_in *srcaddr,
1205                               struct sockaddr_in *dstaddr,
1206                               int timeout_ms)
1207{
1208        unsigned short port;
1209        int rc;
1210
1211        /* allow the port to be reused */
1212        rc = rdma_set_reuseaddr(cmid, 1);
1213        if (rc) {
1214                CERROR("Unable to set reuse on cmid: %d\n", rc);
1215                return rc;
1216        }
1217
1218        /* look for a free privileged port */
1219        for (port = PROT_SOCK - 1; port > 0; port--) {
1220                srcaddr->sin_port = htons(port);
1221                rc = rdma_resolve_addr(cmid,
1222                                       (struct sockaddr *)srcaddr,
1223                                       (struct sockaddr *)dstaddr,
1224                                       timeout_ms);
1225                if (!rc) {
1226                        CDEBUG(D_NET, "bound to port %hu\n", port);
1227                        return 0;
1228                } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
1229                        CDEBUG(D_NET, "bind to port %hu failed: %d\n",
1230                               port, rc);
1231                } else {
1232                        return rc;
1233                }
1234        }
1235
1236        CERROR("Failed to bind to a free privileged port\n");
1237        return rc;
1238}
1239
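/*
 * Start an active connect: create an RDMA CM id for the peer, bind it to
 * the local interface address and begin address resolution; the rest of
 * the handshake is driven from kiblnd_cm_callback().
 */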
1240static void
1241kiblnd_connect_peer(struct kib_peer *peer)
1242{
1243        struct rdma_cm_id *cmid;
1244        struct kib_dev *dev;
1245        struct kib_net *net = peer->ibp_ni->ni_data;
1246        struct sockaddr_in srcaddr;
1247        struct sockaddr_in dstaddr;
1248        int rc;
1249
1250        LASSERT(net);
1251        LASSERT(peer->ibp_connecting > 0);
1252        LASSERT(!peer->ibp_reconnecting);
1253
1254        cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
1255                                     IB_QPT_RC);
1256
1257        if (IS_ERR(cmid)) {
1258                CERROR("Can't create CMID for %s: %ld\n",
1259                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
1260                rc = PTR_ERR(cmid);
1261                goto failed;
1262        }
1263
1264        dev = net->ibn_dev;
1265        memset(&srcaddr, 0, sizeof(srcaddr));
1266        srcaddr.sin_family = AF_INET;
1267        srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);
1268
1269        memset(&dstaddr, 0, sizeof(dstaddr));
1270        dstaddr.sin_family = AF_INET;
1271        dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
1272        dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));
1273
1274        kiblnd_peer_addref(peer);              /* cmid's ref */
1275
1276        if (*kiblnd_tunables.kib_use_priv_port) {
1277                rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
1278                                         *kiblnd_tunables.kib_timeout * 1000);
1279        } else {
1280                rc = rdma_resolve_addr(cmid,
1281                                       (struct sockaddr *)&srcaddr,
1282                                       (struct sockaddr *)&dstaddr,
1283                                       *kiblnd_tunables.kib_timeout * 1000);
1284        }
1285        if (rc) {
 1286                /* Can't initiate address resolution */
1287                CERROR("Can't resolve addr for %s: %d\n",
1288                       libcfs_nid2str(peer->ibp_nid), rc);
1289                goto failed2;
1290        }
1291
1292        LASSERT(cmid->device);
1293        CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
1294               libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
1295               &dev->ibd_ifip, cmid->device->name);
1296
1297        return;
1298
1299 failed2:
1300        kiblnd_peer_connect_failed(peer, 1, rc);
1301        kiblnd_peer_decref(peer);              /* cmid's ref */
1302        rdma_destroy_id(cmid);
1303        return;
1304 failed:
1305        kiblnd_peer_connect_failed(peer, 1, rc);
1306}
1307
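/*
 * Retry an active connect for a peer marked for reconnection.  The
 * attempt is abandoned (returning false) if the peer is meanwhile
 * accepting, connecting, connected or has been unlinked from the peer
 * table; in the unlinked case any queued txs are completed with
 * -ECONNABORTED.
 */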
1308bool
1309kiblnd_reconnect_peer(struct kib_peer *peer)
1310{
1311        rwlock_t *glock = &kiblnd_data.kib_global_lock;
1312        char *reason = NULL;
1313        struct list_head txs;
1314        unsigned long flags;
1315
1316        INIT_LIST_HEAD(&txs);
1317
1318        write_lock_irqsave(glock, flags);
1319        if (!peer->ibp_reconnecting) {
1320                if (peer->ibp_accepting)
1321                        reason = "accepting";
1322                else if (peer->ibp_connecting)
1323                        reason = "connecting";
1324                else if (!list_empty(&peer->ibp_conns))
1325                        reason = "connected";
1326                else /* connected then closed */
1327                        reason = "closed";
1328
1329                goto no_reconnect;
1330        }
1331
1332        LASSERT(!peer->ibp_accepting && !peer->ibp_connecting &&
1333                list_empty(&peer->ibp_conns));
1334        peer->ibp_reconnecting = 0;
1335
1336        if (!kiblnd_peer_active(peer)) {
1337                list_splice_init(&peer->ibp_tx_queue, &txs);
1338                reason = "unlinked";
1339                goto no_reconnect;
1340        }
1341
1342        peer->ibp_connecting++;
1343        peer->ibp_reconnected++;
1344        write_unlock_irqrestore(glock, flags);
1345
1346        kiblnd_connect_peer(peer);
1347        return true;
1348
1349no_reconnect:
1350        write_unlock_irqrestore(glock, flags);
1351
1352        CWARN("Abort reconnection of %s: %s\n",
1353              libcfs_nid2str(peer->ibp_nid), reason);
1354        kiblnd_txlist_done(peer->ibp_ni, &txs, -ECONNABORTED);
1355        return false;
1356}
1357
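/*
 * Route a tx (or just a connection attempt when tx is NULL) towards
 * 'nid': use an established connection if one exists, queue behind an
 * in-progress connect, or create a new peer record and initiate a
 * connection.  The send is committed by this point, so any failure
 * completes the tx with an error.
 */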
1358void
1359kiblnd_launch_tx(struct lnet_ni *ni, struct kib_tx *tx, lnet_nid_t nid)
1360{
1361        struct kib_peer *peer;
1362        struct kib_peer *peer2;
1363        struct kib_conn *conn;
1364        rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
1365        unsigned long flags;
1366        int rc;
1367
1368        /*
1369         * If I get here, I've committed to send, so I complete the tx with
1370         * failure on any problems
1371         */
1372        LASSERT(!tx || !tx->tx_conn); /* only set when assigned a conn */
1373        LASSERT(!tx || tx->tx_nwrq > 0);     /* work items have been set up */
1374
1375        /*
1376         * First time, just use a read lock since I expect to find my peer
1377         * connected
1378         */
1379        read_lock_irqsave(g_lock, flags);
1380
1381        peer = kiblnd_find_peer_locked(nid);
1382        if (peer && !list_empty(&peer->ibp_conns)) {
1383                /* Found a peer with an established connection */
1384                conn = kiblnd_get_conn_locked(peer);
1385                kiblnd_conn_addref(conn); /* 1 ref for me... */
1386
1387                read_unlock_irqrestore(g_lock, flags);
1388
1389                if (tx)
1390                        kiblnd_queue_tx(tx, conn);
1391                kiblnd_conn_decref(conn); /* ...to here */
1392                return;
1393        }
1394
1395        read_unlock(g_lock);
1396        /* Re-try with a write lock */
1397        write_lock(g_lock);
1398
1399        peer = kiblnd_find_peer_locked(nid);
1400        if (peer) {
1401                if (list_empty(&peer->ibp_conns)) {
1402                        /* found a peer, but it's still connecting... */
1403                        LASSERT(kiblnd_peer_connecting(peer));
1404                        if (tx)
1405                                list_add_tail(&tx->tx_list,
1406                                              &peer->ibp_tx_queue);
1407                        write_unlock_irqrestore(g_lock, flags);
1408                } else {
1409                        conn = kiblnd_get_conn_locked(peer);
1410                        kiblnd_conn_addref(conn); /* 1 ref for me... */
1411
1412                        write_unlock_irqrestore(g_lock, flags);
1413
1414                        if (tx)
1415                                kiblnd_queue_tx(tx, conn);
1416                        kiblnd_conn_decref(conn); /* ...to here */
1417                }
1418                return;
1419        }
1420
1421        write_unlock_irqrestore(g_lock, flags);
1422
1423        /* Allocate a peer ready to add to the peer table and retry */
1424        rc = kiblnd_create_peer(ni, &peer, nid);
1425        if (rc) {
1426                CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1427                if (tx) {
1428                        tx->tx_status = -EHOSTUNREACH;
1429                        tx->tx_waiting = 0;
1430                        kiblnd_tx_done(ni, tx);
1431                }
1432                return;
1433        }
1434
1435        write_lock_irqsave(g_lock, flags);
1436
1437        peer2 = kiblnd_find_peer_locked(nid);
1438        if (peer2) {
1439                if (list_empty(&peer2->ibp_conns)) {
1440                        /* found a peer, but it's still connecting... */
1441                        LASSERT(kiblnd_peer_connecting(peer2));
1442                        if (tx)
1443                                list_add_tail(&tx->tx_list,
1444                                              &peer2->ibp_tx_queue);
1445                        write_unlock_irqrestore(g_lock, flags);
1446                } else {
1447                        conn = kiblnd_get_conn_locked(peer2);
1448                        kiblnd_conn_addref(conn); /* 1 ref for me... */
1449
1450                        write_unlock_irqrestore(g_lock, flags);
1451
1452                        if (tx)
1453                                kiblnd_queue_tx(tx, conn);
1454                        kiblnd_conn_decref(conn); /* ...to here */
1455                }
1456
1457                kiblnd_peer_decref(peer);
1458                return;
1459        }
1460
1461        /* Brand new peer */
1462        LASSERT(!peer->ibp_connecting);
1463        peer->ibp_connecting = 1;
1464
1465        /* always called with a ref on ni, which prevents ni being shutdown */
1466        LASSERT(!((struct kib_net *)ni->ni_data)->ibn_shutdown);
1467
1468        if (tx)
1469                list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1470
1471        kiblnd_peer_addref(peer);
1472        list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1473
1474        write_unlock_irqrestore(g_lock, flags);
1475
1476        kiblnd_connect_peer(peer);
1477        kiblnd_peer_decref(peer);
1478}
1479
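/*
 * LNet send entry point: depending on the message type and payload size,
 * set up RDMA descriptors for a bulk GET/PUT transfer or fall back to an
 * IMMEDIATE message that carries the payload inline.
 */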
1480int
1481kiblnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg)
1482{
1483        struct lnet_hdr *hdr = &lntmsg->msg_hdr;
1484        int type = lntmsg->msg_type;
1485        struct lnet_process_id target = lntmsg->msg_target;
1486        int target_is_router = lntmsg->msg_target_is_router;
1487        int routing = lntmsg->msg_routing;
1488        unsigned int payload_niov = lntmsg->msg_niov;
1489        struct kvec *payload_iov = lntmsg->msg_iov;
1490        struct bio_vec *payload_kiov = lntmsg->msg_kiov;
1491        unsigned int payload_offset = lntmsg->msg_offset;
1492        unsigned int payload_nob = lntmsg->msg_len;
1493        struct iov_iter from;
1494        struct kib_msg *ibmsg;
1495        struct kib_rdma_desc  *rd;
1496        struct kib_tx *tx;
1497        int nob;
1498        int rc;
1499
1500        /* NB 'private' is different depending on what we're sending.... */
1501
1502        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1503               payload_nob, payload_niov, libcfs_id2str(target));
1504
1505        LASSERT(!payload_nob || payload_niov > 0);
1506        LASSERT(payload_niov <= LNET_MAX_IOV);
1507
1508        /* Thread context */
1509        LASSERT(!in_interrupt());
1510        /* payload is either all vaddrs or all pages */
1511        LASSERT(!(payload_kiov && payload_iov));
1512
1513        if (payload_kiov)
1514                iov_iter_bvec(&from, ITER_BVEC | WRITE,
1515                              payload_kiov, payload_niov,
1516                              payload_nob + payload_offset);
1517        else
1518                iov_iter_kvec(&from, ITER_KVEC | WRITE,
1519                              payload_iov, payload_niov,
1520                              payload_nob + payload_offset);
1521
1522        iov_iter_advance(&from, payload_offset);
1523
1524        switch (type) {
1525        default:
1526                LBUG();
1527                return -EIO;
1528
1529        case LNET_MSG_ACK:
1530                LASSERT(!payload_nob);
1531                break;
1532
1533        case LNET_MSG_GET:
1534                if (routing || target_is_router)
1535                        break;            /* send IMMEDIATE */
1536
1537                /* is the REPLY message too small for RDMA? */
1538                nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1539                if (nob <= IBLND_MSG_SIZE)
1540                        break;            /* send IMMEDIATE */
1541
1542                tx = kiblnd_get_idle_tx(ni, target.nid);
1543                if (!tx) {
1544                        CERROR("Can't allocate txd for GET to %s\n",
1545                               libcfs_nid2str(target.nid));
1546                        return -ENOMEM;
1547                }
1548
1549                ibmsg = tx->tx_msg;
1550                rd = &ibmsg->ibm_u.get.ibgm_rd;
1551                if (!(lntmsg->msg_md->md_options & LNET_MD_KIOV))
1552                        rc = kiblnd_setup_rd_iov(ni, tx, rd,
1553                                                 lntmsg->msg_md->md_niov,
1554                                                 lntmsg->msg_md->md_iov.iov,
1555                                                 0, lntmsg->msg_md->md_length);
1556                else
1557                        rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1558                                                  lntmsg->msg_md->md_niov,
1559                                                  lntmsg->msg_md->md_iov.kiov,
1560                                                  0, lntmsg->msg_md->md_length);
1561                if (rc) {
1562                        CERROR("Can't setup GET sink for %s: %d\n",
1563                               libcfs_nid2str(target.nid), rc);
1564                        kiblnd_tx_done(ni, tx);
1565                        return -EIO;
1566                }
1567
1568                nob = offsetof(struct kib_get_msg, ibgm_rd.rd_frags[rd->rd_nfrags]);
1569                ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1570                ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1571
1572                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1573
1574                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1575                if (!tx->tx_lntmsg[1]) {
1576                        CERROR("Can't create reply for GET -> %s\n",
1577                               libcfs_nid2str(target.nid));
1578                        kiblnd_tx_done(ni, tx);
1579                        return -EIO;
1580                }
1581
1582                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1583                tx->tx_waiting = 1;          /* waiting for GET_DONE */
1584                kiblnd_launch_tx(ni, tx, target.nid);
1585                return 0;
1586
1587        case LNET_MSG_REPLY:
1588        case LNET_MSG_PUT:
1589                /* Is the payload small enough not to need RDMA? */
1590                nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob]);
1591                if (nob <= IBLND_MSG_SIZE)
1592                        break;            /* send IMMEDIATE */
1593
1594                tx = kiblnd_get_idle_tx(ni, target.nid);
1595                if (!tx) {
1596                        CERROR("Can't allocate %s txd for %s\n",
1597                               type == LNET_MSG_PUT ? "PUT" : "REPLY",
1598                               libcfs_nid2str(target.nid));
1599                        return -ENOMEM;
1600                }
1601
1602                if (!payload_kiov)
1603                        rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1604                                                 payload_niov, payload_iov,
1605                                                 payload_offset, payload_nob);
1606                else
1607                        rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1608                                                  payload_niov, payload_kiov,
1609                                                  payload_offset, payload_nob);
1610                if (rc) {
1611                        CERROR("Can't setup PUT src for %s: %d\n",
1612                               libcfs_nid2str(target.nid), rc);
1613                        kiblnd_tx_done(ni, tx);
1614                        return -EIO;
1615                }
1616
1617                ibmsg = tx->tx_msg;
1618                ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1619                ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1620                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(struct kib_putreq_msg));
1621
1622                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1623                tx->tx_waiting = 1;          /* waiting for PUT_{ACK,NAK} */
1624                kiblnd_launch_tx(ni, tx, target.nid);
1625                return 0;
1626        }
1627
1628        /* send IMMEDIATE */
1629
1630        LASSERT(offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob])
1631                 <= IBLND_MSG_SIZE);
1632
1633        tx = kiblnd_get_idle_tx(ni, target.nid);
1634        if (!tx) {
1635                CERROR("Can't send %d to %s: tx descs exhausted\n",
1636                       type, libcfs_nid2str(target.nid));
1637                return -ENOMEM;
1638        }
1639
1640        ibmsg = tx->tx_msg;
1641        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1642
1643        rc = copy_from_iter(&ibmsg->ibm_u.immediate.ibim_payload, payload_nob,
1644                            &from);
1645        if (rc != payload_nob) {
1646                kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
1647                return -EFAULT;
1648        }
1649
1650        nob = offsetof(struct kib_immediate_msg, ibim_payload[payload_nob]);
1651        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1652
1653        tx->tx_lntmsg[0] = lntmsg;            /* finalise lntmsg on completion */
1654        kiblnd_launch_tx(ni, tx, target.nid);
1655        return 0;
1656}
1657
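    /*
     * Reply to an optimized GET: map the local payload (if any) as the RDMA
     * source and push it to the peer's advertised sink with a GET_DONE
     * message.  With no payload the RDMA is skipped and lntmsg can be
     * finalised immediately.
     */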
1658static void
1659kiblnd_reply(struct lnet_ni *ni, struct kib_rx *rx, struct lnet_msg *lntmsg)
1660{
1661        struct lnet_process_id target = lntmsg->msg_target;
1662        unsigned int niov = lntmsg->msg_niov;
1663        struct kvec *iov = lntmsg->msg_iov;
1664        struct bio_vec *kiov = lntmsg->msg_kiov;
1665        unsigned int offset = lntmsg->msg_offset;
1666        unsigned int nob = lntmsg->msg_len;
1667        struct kib_tx *tx;
1668        int rc;
1669
1670        tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1671        if (!tx) {
1672                CERROR("Can't get tx for REPLY to %s\n",
1673                       libcfs_nid2str(target.nid));
1674                goto failed_0;
1675        }
1676
1677        if (!nob)
1678                rc = 0;
1679        else if (!kiov)
1680                rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1681                                         niov, iov, offset, nob);
1682        else
1683                rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1684                                          niov, kiov, offset, nob);
1685
1686        if (rc) {
1687                CERROR("Can't setup GET src for %s: %d\n",
1688                       libcfs_nid2str(target.nid), rc);
1689                goto failed_1;
1690        }
1691
1692        rc = kiblnd_init_rdma(rx->rx_conn, tx,
1693                              IBLND_MSG_GET_DONE, nob,
1694                              &rx->rx_msg->ibm_u.get.ibgm_rd,
1695                              rx->rx_msg->ibm_u.get.ibgm_cookie);
1696        if (rc < 0) {
1697                CERROR("Can't setup rdma for GET from %s: %d\n",
1698                       libcfs_nid2str(target.nid), rc);
1699                goto failed_1;
1700        }
1701
1702        if (!nob) {
1703                /* No RDMA: local completion may happen now! */
1704                lnet_finalize(ni, lntmsg, 0);
1705        } else {
1706                /* RDMA: lnet_finalize(lntmsg) when it completes */
1707                tx->tx_lntmsg[0] = lntmsg;
1708        }
1709
1710        kiblnd_queue_tx(tx, rx->rx_conn);
1711        return;
1712
1713 failed_1:
1714        kiblnd_tx_done(ni, tx);
1715 failed_0:
1716        lnet_finalize(ni, lntmsg, -EIO);
1717}
1718
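    /*
     * Receive path for the o2ib LND.  IMMEDIATE payloads are copied straight
     * into the destination iterator; a PUT_REQ is answered with a PUT_ACK
     * (or PUT_NAK) advertising the local sink; a GET_REQ either RDMAs the
     * matched reply via kiblnd_reply() or returns GET_DONE(-ENODATA).
     * The rx buffer is reposted before returning.
     */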
1719int
1720kiblnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg,
1721            int delayed, struct iov_iter *to, unsigned int rlen)
1722{
1723        struct kib_rx *rx = private;
1724        struct kib_msg *rxmsg = rx->rx_msg;
1725        struct kib_conn *conn = rx->rx_conn;
1726        struct kib_tx *tx;
1727        int nob;
1728        int post_credit = IBLND_POSTRX_PEER_CREDIT;
1729        int rc = 0;
1730
1731        LASSERT(iov_iter_count(to) <= rlen);
1732        LASSERT(!in_interrupt());
1733        /* Either all pages or all vaddrs */
1734
1735        switch (rxmsg->ibm_type) {
1736        default:
1737                LBUG();
1738
1739        case IBLND_MSG_IMMEDIATE:
1740                nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[rlen]);
1741                if (nob > rx->rx_nob) {
1742                        CERROR("Immediate message from %s too big: %d(%d)\n",
1743                               libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1744                               nob, rx->rx_nob);
1745                        rc = -EPROTO;
1746                        break;
1747                }
1748
1749                rc = copy_to_iter(&rxmsg->ibm_u.immediate.ibim_payload, rlen,
1750                                  to);
1751                if (rc != rlen) {
1752                        rc = -EFAULT;
1753                        break;
1754                }
1755
1756                rc = 0;
1757                lnet_finalize(ni, lntmsg, 0);
1758                break;
1759
1760        case IBLND_MSG_PUT_REQ: {
1761                struct kib_msg  *txmsg;
1762                struct kib_rdma_desc *rd;
1763
1764                if (!iov_iter_count(to)) {
1765                        lnet_finalize(ni, lntmsg, 0);
1766                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1767                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1768                        break;
1769                }
1770
1771                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1772                if (!tx) {
1773                        CERROR("Can't allocate tx for %s\n",
1774                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
1775                        /* Not replying will break the connection */
1776                        rc = -ENOMEM;
1777                        break;
1778                }
1779
1780                txmsg = tx->tx_msg;
1781                rd = &txmsg->ibm_u.putack.ibpam_rd;
1782                if (!(to->type & ITER_BVEC))
1783                        rc = kiblnd_setup_rd_iov(ni, tx, rd,
1784                                                 to->nr_segs, to->kvec,
1785                                                 to->iov_offset,
1786                                                 iov_iter_count(to));
1787                else
1788                        rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1789                                                  to->nr_segs, to->bvec,
1790                                                  to->iov_offset,
1791                                                  iov_iter_count(to));
1792                if (rc) {
1793                        CERROR("Can't setup PUT sink for %s: %d\n",
1794                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1795                        kiblnd_tx_done(ni, tx);
1796                        /* tell peer it's over */
1797                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1798                                               rxmsg->ibm_u.putreq.ibprm_cookie);
1799                        break;
1800                }
1801
1802                nob = offsetof(struct kib_putack_msg, ibpam_rd.rd_frags[rd->rd_nfrags]);
1803                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1804                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1805
1806                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1807
1808                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1809                tx->tx_waiting = 1;          /* waiting for PUT_DONE */
1810                kiblnd_queue_tx(tx, conn);
1811
1812                /* reposted buffer reserved for PUT_DONE */
1813                post_credit = IBLND_POSTRX_NO_CREDIT;
1814                break;
1815                }
1816
1817        case IBLND_MSG_GET_REQ:
1818                if (lntmsg) {
1819                        /* Optimized GET; RDMA lntmsg's payload */
1820                        kiblnd_reply(ni, rx, lntmsg);
1821                } else {
1822                        /* GET didn't match anything */
1823                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1824                                               -ENODATA,
1825                                               rxmsg->ibm_u.get.ibgm_cookie);
1826                }
1827                break;
1828        }
1829
1830        kiblnd_post_rx(rx, post_credit);
1831        return rc;
1832}
1833
1834int
1835kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1836{
1837        struct task_struct *task = kthread_run(fn, arg, "%s", name);
1838
1839        if (IS_ERR(task))
1840                return PTR_ERR(task);
1841
1842        atomic_inc(&kiblnd_data.kib_nthreads);
1843        return 0;
1844}
1845
1846static void
1847kiblnd_thread_fini(void)
1848{
1849        atomic_dec(&kiblnd_data.kib_nthreads);
1850}
1851
1852static void
1853kiblnd_peer_alive(struct kib_peer *peer)
1854{
1855        /* This is racy, but everyone's only writing cfs_time_current() */
1856        peer->ibp_last_alive = cfs_time_current();
1857        mb();
1858}
1859
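    /*
     * If the peer is now idle and has a pending error, report the failure
     * (with its last-alive time) to LNet via lnet_notify().
     */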
1860static void
1861kiblnd_peer_notify(struct kib_peer *peer)
1862{
1863        int error = 0;
1864        unsigned long last_alive = 0;
1865        unsigned long flags;
1866
1867        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1868
1869        if (kiblnd_peer_idle(peer) && peer->ibp_error) {
1870                error = peer->ibp_error;
1871                peer->ibp_error = 0;
1872
1873                last_alive = peer->ibp_last_alive;
1874        }
1875
1876        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1877
1878        if (error)
1879                lnet_notify(peer->ibp_ni,
1880                            peer->ibp_nid, 0, last_alive);
1881}
1882
1883void
1884kiblnd_close_conn_locked(struct kib_conn *conn, int error)
1885{
1886        /*
1887         * This just does the immediate housekeeping. 'error' is zero for a
1888         * normal shutdown which can happen only after the connection has been
1889         * established.  If the connection is established, schedule the
1890         * connection to be finished off by the connd. Otherwise the connd is
1891         * already dealing with it (either to set it up or tear it down).
1892         * Caller holds kib_global_lock exclusively in irq context
1893         */
1894        struct kib_peer *peer = conn->ibc_peer;
1895        struct kib_dev *dev;
1896        unsigned long flags;
1897
1898        LASSERT(error || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1899
1900        if (error && !conn->ibc_comms_error)
1901                conn->ibc_comms_error = error;
1902
1903        if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1904                return; /* already being handled  */
1905
1906        if (!error &&
1907            list_empty(&conn->ibc_tx_noops) &&
1908            list_empty(&conn->ibc_tx_queue) &&
1909            list_empty(&conn->ibc_tx_queue_rsrvd) &&
1910            list_empty(&conn->ibc_tx_queue_nocred) &&
1911            list_empty(&conn->ibc_active_txs)) {
1912                CDEBUG(D_NET, "closing conn to %s\n",
1913                       libcfs_nid2str(peer->ibp_nid));
1914        } else {
1915                CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1916                        libcfs_nid2str(peer->ibp_nid), error,
1917                        list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1918                        list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1919                        list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1920                        list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1921                        list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1922        }
1923
1924        dev = ((struct kib_net *)peer->ibp_ni->ni_data)->ibn_dev;
1925        list_del(&conn->ibc_list);
1926        /* connd (see below) takes over ibc_list's ref */
1927
1928        if (list_empty(&peer->ibp_conns) &&    /* no more conns */
1929            kiblnd_peer_active(peer)) {  /* still in peer table */
1930                kiblnd_unlink_peer_locked(peer);
1931
1932                /* set/clear error on last conn */
1933                peer->ibp_error = conn->ibc_comms_error;
1934        }
1935
1936        kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1937
1938        if (error &&
1939            kiblnd_dev_can_failover(dev)) {
1940                list_add_tail(&dev->ibd_fail_list,
1941                              &kiblnd_data.kib_failed_devs);
1942                wake_up(&kiblnd_data.kib_failover_waitq);
1943        }
1944
1945        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1946
1947        list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1948        wake_up(&kiblnd_data.kib_connd_waitq);
1949
1950        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1951}
1952
1953void
1954kiblnd_close_conn(struct kib_conn *conn, int error)
1955{
1956        unsigned long flags;
1957
1958        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1959
1960        kiblnd_close_conn_locked(conn, error);
1961
1962        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1963}
1964
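    /*
     * Process RX messages that arrived before the connection reached the
     * ESTABLISHED state; they were parked on ibc_early_rxs and are handled
     * here with the global lock dropped around each one.
     */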
1965static void
1966kiblnd_handle_early_rxs(struct kib_conn *conn)
1967{
1968        unsigned long flags;
1969        struct kib_rx *rx;
1970        struct kib_rx *tmp;
1971
1972        LASSERT(!in_interrupt());
1973        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1974
1975        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1976        list_for_each_entry_safe(rx, tmp, &conn->ibc_early_rxs, rx_list) {
1977                list_del(&rx->rx_list);
1978                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1979
1980                kiblnd_handle_rx(rx);
1981
1982                write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1983        }
1984        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1985}
1986
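    /*
     * Fail every tx on the given queue with -ECONNABORTED.  Descriptors with
     * sends still outstanding are left for the completion handler; the rest
     * are collected and finalised via kiblnd_txlist_done().
     */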
1987static void
1988kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
1989{
1990        LIST_HEAD(zombies);
1991        struct list_head *tmp;
1992        struct list_head *nxt;
1993        struct kib_tx *tx;
1994
1995        spin_lock(&conn->ibc_lock);
1996
1997        list_for_each_safe(tmp, nxt, txs) {
1998                tx = list_entry(tmp, struct kib_tx, tx_list);
1999
2000                if (txs == &conn->ibc_active_txs) {
2001                        LASSERT(!tx->tx_queued);
2002                        LASSERT(tx->tx_waiting || tx->tx_sending);
2003                } else {
2004                        LASSERT(tx->tx_queued);
2005                }
2006
2007                tx->tx_status = -ECONNABORTED;
2008                tx->tx_waiting = 0;
2009
2010                if (!tx->tx_sending) {
2011                        tx->tx_queued = 0;
2012                        list_del(&tx->tx_list);
2013                        list_add(&tx->tx_list, &zombies);
2014                }
2015        }
2016
2017        spin_unlock(&conn->ibc_lock);
2018
2019        kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
2020}
2021
2022static void
2023kiblnd_finalise_conn(struct kib_conn *conn)
2024{
2025        LASSERT(!in_interrupt());
2026        LASSERT(conn->ibc_state > IBLND_CONN_INIT);
2027
2028        kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2029
2030        /*
2031         * abort_receives moves QP state to IB_QPS_ERR.  This is only required
2032         * for connections that didn't get as far as being connected, because
2033         * rdma_disconnect() does this for free.
2034         */
2035        kiblnd_abort_receives(conn);
2036
2037        /*
2038         * Complete all tx descs not waiting for sends to complete.
2039         * NB we should be safe from RDMA now that the QP has changed state
2040         */
2041        kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2042        kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2043        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2044        kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2045        kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2046
2047        kiblnd_handle_early_rxs(conn);
2048}
2049
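    /*
     * A connection attempt (active or passive) has failed.  Drop the
     * corresponding connecting/accepting count and, if no other attempt is
     * in progress and the peer has no connections, unlink the peer and
     * complete its blocked transmits with -EHOSTUNREACH.
     */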
2050static void
2051kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error)
2052{
2053        LIST_HEAD(zombies);
2054        unsigned long flags;
2055
2056        LASSERT(error);
2057        LASSERT(!in_interrupt());
2058
2059        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2060
2061        if (active) {
2062                LASSERT(peer->ibp_connecting > 0);
2063                peer->ibp_connecting--;
2064        } else {
2065                LASSERT(peer->ibp_accepting > 0);
2066                peer->ibp_accepting--;
2067        }
2068
2069        if (kiblnd_peer_connecting(peer)) {
2070                /* another connection attempt under way... */
2071                write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2072                                        flags);
2073                return;
2074        }
2075
2076        peer->ibp_reconnected = 0;
2077        if (list_empty(&peer->ibp_conns)) {
2078                /* Take peer's blocked transmits to complete with error */
2079                list_add(&zombies, &peer->ibp_tx_queue);
2080                list_del_init(&peer->ibp_tx_queue);
2081
2082                if (kiblnd_peer_active(peer))
2083                        kiblnd_unlink_peer_locked(peer);
2084
2085                peer->ibp_error = error;
2086        } else {
2087                /* Can't have blocked transmits if there are connections */
2088                LASSERT(list_empty(&peer->ibp_tx_queue));
2089        }
2090
2091        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2092
2093        kiblnd_peer_notify(peer);
2094
2095        if (list_empty(&zombies))
2096                return;
2097
2098        CNETERR("Deleting messages for %s: connection failed\n",
2099                libcfs_nid2str(peer->ibp_nid));
2100
2101        kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2102}
2103
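    /*
     * The connection handshake has completed or failed.  On success the conn
     * moves to ESTABLISHED, is added to the peer's conn list, stale conns
     * from an older incarnation are closed, and any transmits queued on the
     * peer are scheduled; on failure the conn is finalised.
     */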
2104static void
2105kiblnd_connreq_done(struct kib_conn *conn, int status)
2106{
2107        struct kib_peer *peer = conn->ibc_peer;
2108        struct kib_tx *tx;
2109        struct kib_tx *tmp;
2110        struct list_head txs;
2111        unsigned long flags;
2112        int active;
2113
2114        active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2115
2116        CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2117               libcfs_nid2str(peer->ibp_nid), active,
2118               conn->ibc_version, status);
2119
2120        LASSERT(!in_interrupt());
2121        LASSERT((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2122                 peer->ibp_connecting > 0) ||
2123                 (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2124                 peer->ibp_accepting > 0));
2125
2126        LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2127        conn->ibc_connvars = NULL;
2128
2129        if (status) {
2130                /* failed to establish connection */
2131                kiblnd_peer_connect_failed(peer, active, status);
2132                kiblnd_finalise_conn(conn);
2133                return;
2134        }
2135
2136        /* connection established */
2137        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2138
2139        conn->ibc_last_send = jiffies;
2140        kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2141        kiblnd_peer_alive(peer);
2142
2143        /*
2144         * Add conn to peer's list and nuke any dangling conns from a different
2145         * peer instance...
2146         */
2147        kiblnd_conn_addref(conn);              /* +1 ref for ibc_list */
2148        list_add(&conn->ibc_list, &peer->ibp_conns);
2149        peer->ibp_reconnected = 0;
2150        if (active)
2151                peer->ibp_connecting--;
2152        else
2153                peer->ibp_accepting--;
2154
2155        if (!peer->ibp_version) {
2156                peer->ibp_version     = conn->ibc_version;
2157                peer->ibp_incarnation = conn->ibc_incarnation;
2158        }
2159
2160        if (peer->ibp_version     != conn->ibc_version ||
2161            peer->ibp_incarnation != conn->ibc_incarnation) {
2162                kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2163                                                conn->ibc_incarnation);
2164                peer->ibp_version     = conn->ibc_version;
2165                peer->ibp_incarnation = conn->ibc_incarnation;
2166        }
2167
2168        /* grab pending txs while I have the lock */
2169        list_add(&txs, &peer->ibp_tx_queue);
2170        list_del_init(&peer->ibp_tx_queue);
2171
2172        if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2173            conn->ibc_comms_error) {       /* error has happened already */
2174                struct lnet_ni *ni = peer->ibp_ni;
2175
2176                /* start to shut down connection */
2177                kiblnd_close_conn_locked(conn, -ECONNABORTED);
2178                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2179
2180                kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2181
2182                return;
2183        }
2184
2185        /*
2186         * +1 ref for myself: this connection is now visible to other
2187         * threads, so the ref held for peer::ibp_conns can be released by
2188         * a connection close from a different thread, or by the call to
2189         * kiblnd_check_sends_locked() below.  See bz21911 for details.
2190         */
2191        kiblnd_conn_addref(conn);
2192        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2193
2194        /* Schedule blocked txs */
2195        spin_lock(&conn->ibc_lock);
2196        list_for_each_entry_safe(tx, tmp, &txs, tx_list) {
2197                list_del(&tx->tx_list);
2198
2199                kiblnd_queue_tx_locked(tx, conn);
2200        }
2201        kiblnd_check_sends_locked(conn);
2202        spin_unlock(&conn->ibc_lock);
2203
2204        /* schedule blocked rxs */
2205        kiblnd_handle_early_rxs(conn);
2206
2207        kiblnd_conn_decref(conn);
2208}
2209
2210static void
2211kiblnd_reject(struct rdma_cm_id *cmid, struct kib_rej *rej)
2212{
2213        int rc;
2214
2215        rc = rdma_reject(cmid, rej, sizeof(*rej));
2216
2217        if (rc)
2218                CWARN("Error %d sending reject\n", rc);
2219}
2220
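    /*
     * Handle an incoming CONNREQ on the listener cmid: validate the request
     * (magic, version, NIDs, queue depth, max_frags), create or look up the
     * peer, resolve connection races in favour of the higher NID, then
     * create a passive conn and rdma_accept() with a CONNACK.  Any failure
     * is answered with a reject carrying the reason.
     */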
2221static int
2222kiblnd_passive_connect(struct rdma_cm_id *cmid, void *priv, int priv_nob)
2223{
2224        rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
2225        struct kib_msg *reqmsg = priv;
2226        struct kib_msg *ackmsg;
2227        struct kib_dev *ibdev;
2228        struct kib_peer *peer;
2229        struct kib_peer *peer2;
2230        struct kib_conn *conn;
2231        struct lnet_ni *ni  = NULL;
2232        struct kib_net *net = NULL;
2233        lnet_nid_t nid;
2234        struct rdma_conn_param cp;
2235        struct kib_rej rej;
2236        int version = IBLND_MSG_VERSION;
2237        unsigned long flags;
2238        int max_frags;
2239        int rc;
2240        struct sockaddr_in *peer_addr;
2241
2242        LASSERT(!in_interrupt());
2243
2244        /* cmid inherits 'context' from the corresponding listener id */
2245        ibdev = (struct kib_dev *)cmid->context;
2246        LASSERT(ibdev);
2247
2248        memset(&rej, 0, sizeof(rej));
2249        rej.ibr_magic = IBLND_MSG_MAGIC;
2250        rej.ibr_why = IBLND_REJECT_FATAL;
2251        rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2252
2253        peer_addr = (struct sockaddr_in *)&cmid->route.addr.dst_addr;
2254        if (*kiblnd_tunables.kib_require_priv_port &&
2255            ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2256                __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2257
2258                CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2259                       &ip, ntohs(peer_addr->sin_port));
2260                goto failed;
2261        }
2262
2263        if (priv_nob < offsetof(struct kib_msg, ibm_type)) {
2264                CERROR("Short connection request\n");
2265                goto failed;
2266        }
2267
2268        /*
2269         * Future protocol version compatibility support!  If the
2270         * o2iblnd-specific protocol changes, or when LNET unifies
2271         * protocols over all LNDs, the initial connection will
2272         * negotiate a protocol version.  I trap this here to avoid
2273         * console errors; the reject tells the peer which protocol I
2274         * speak.
2275         */
2276        if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2277            reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2278                goto failed;
2279        if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2280            reqmsg->ibm_version != IBLND_MSG_VERSION &&
2281            reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2282                goto failed;
2283        if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2284            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2285            reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2286                goto failed;
2287
2288        rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2289        if (rc) {
2290                CERROR("Can't parse connection request: %d\n", rc);
2291                goto failed;
2292        }
2293
2294        nid = reqmsg->ibm_srcnid;
2295        ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2296
2297        if (ni) {
2298                net = (struct kib_net *)ni->ni_data;
2299                rej.ibr_incarnation = net->ibn_incarnation;
2300        }
2301
2302        if (!ni ||                       /* no matching net */
2303            ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2304            net->ibn_dev != ibdev) {          /* wrong device */
2305                CERROR("Can't accept conn from %s on %s (%s:%d:%pI4h): bad dst nid %s\n",
2306                       libcfs_nid2str(nid),
2307                       !ni ? "NA" : libcfs_nid2str(ni->ni_nid),
2308                       ibdev->ibd_ifname, ibdev->ibd_nnets,
2309                       &ibdev->ibd_ifip,
2310                       libcfs_nid2str(reqmsg->ibm_dstnid));
2311
2312                goto failed;
2313        }
2314
2315        /* check time stamp as soon as possible */
2316        if (reqmsg->ibm_dststamp &&
2317            reqmsg->ibm_dststamp != net->ibn_incarnation) {
2318                CWARN("Stale connection request\n");
2319                rej.ibr_why = IBLND_REJECT_CONN_STALE;
2320                goto failed;
2321        }
2322
2323        /* I can accept peer's version */
2324        version = reqmsg->ibm_version;
2325
2326        if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2327                CERROR("Unexpected connreq msg type: %x from %s\n",
2328                       reqmsg->ibm_type, libcfs_nid2str(nid));
2329                goto failed;
2330        }
2331
2332        if (reqmsg->ibm_u.connparams.ibcp_queue_depth >
2333            kiblnd_msg_queue_size(version, ni)) {
2334                CERROR("Can't accept conn from %s, queue depth too large: %d (<=%d wanted)\n",
2335                       libcfs_nid2str(nid),
2336                       reqmsg->ibm_u.connparams.ibcp_queue_depth,
2337                       kiblnd_msg_queue_size(version, ni));
2338
2339                if (version == IBLND_MSG_VERSION)
2340                        rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2341
2342                goto failed;
2343        }
2344
2345        max_frags = reqmsg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2346        if (max_frags > kiblnd_rdma_frags(version, ni)) {
2347                CWARN("Can't accept conn from %s (version %x): max_frags %d too large (%d wanted)\n",
2348                      libcfs_nid2str(nid), version, max_frags,
2349                      kiblnd_rdma_frags(version, ni));
2350
2351                if (version >= IBLND_MSG_VERSION)
2352                        rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2353
2354                goto failed;
2355        } else if (max_frags < kiblnd_rdma_frags(version, ni) &&
2356                   !net->ibn_fmr_ps) {
2357                CWARN("Can't accept conn from %s (version %x): max_frags %d incompatible without FMR pool (%d wanted)\n",
2358                      libcfs_nid2str(nid), version, max_frags,
2359                      kiblnd_rdma_frags(version, ni));
2360
2361                if (version == IBLND_MSG_VERSION)
2362                        rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2363
2364                goto failed;
2365        }
2366
2367        if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2368                CERROR("Can't accept %s: message size %d too big (%d max)\n",
2369                       libcfs_nid2str(nid),
2370                       reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2371                       IBLND_MSG_SIZE);
2372                goto failed;
2373        }
2374
2375        /* assume 'nid' is a new peer; create one */
2376        rc = kiblnd_create_peer(ni, &peer, nid);
2377        if (rc) {
2378                CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2379                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2380                goto failed;
2381        }
2382
2383        /* We have validated the peer's parameters so use those */
2384        peer->ibp_max_frags = max_frags;
2385        peer->ibp_queue_depth = reqmsg->ibm_u.connparams.ibcp_queue_depth;
2386
2387        write_lock_irqsave(g_lock, flags);
2388
2389        peer2 = kiblnd_find_peer_locked(nid);
2390        if (peer2) {
2391                if (!peer2->ibp_version) {
2392                        peer2->ibp_version     = version;
2393                        peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2394                }
2395
2396                /* not the guy I've talked with */
2397                if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2398                    peer2->ibp_version     != version) {
2399                        kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2400
2401                        if (kiblnd_peer_active(peer2)) {
2402                                peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2403                                peer2->ibp_version = version;
2404                        }
2405                        write_unlock_irqrestore(g_lock, flags);
2406
2407                        CWARN("Conn stale %s version %x/%x incarnation %llu/%llu\n",
2408                              libcfs_nid2str(nid), peer2->ibp_version, version,
2409                              peer2->ibp_incarnation, reqmsg->ibm_srcstamp);
2410
2411                        kiblnd_peer_decref(peer);
2412                        rej.ibr_why = IBLND_REJECT_CONN_STALE;
2413                        goto failed;
2414                }
2415
2416                /*
2417                 * Tie-break connection race in favour of the higher NID.
2418                 * If we keep running into a race condition multiple times,
2419                 * we have to assume that the connection attempt with the
2420                 * higher NID is stuck in a connecting state and will never
2421                 * recover.  As such, we pass through this if-block and let
2422                 * the lower NID connection win so we can move forward.
2423                 */
2424                if (peer2->ibp_connecting &&
2425                    nid < ni->ni_nid && peer2->ibp_races <
2426                    MAX_CONN_RACES_BEFORE_ABORT) {
2427                        peer2->ibp_races++;
2428                        write_unlock_irqrestore(g_lock, flags);
2429
2430                        CDEBUG(D_NET, "Conn race %s\n",
2431                               libcfs_nid2str(peer2->ibp_nid));
2432
2433                        kiblnd_peer_decref(peer);
2434                        rej.ibr_why = IBLND_REJECT_CONN_RACE;
2435                        goto failed;
2436                }
2437                if (peer2->ibp_races >= MAX_CONN_RACES_BEFORE_ABORT)
2438                        CNETERR("Conn race %s: unresolved after %d attempts, letting lower NID win\n",
2439                                libcfs_nid2str(peer2->ibp_nid),
2440                                MAX_CONN_RACES_BEFORE_ABORT);
2441                /**
2442                 * A passive connection is allowed even while this peer is
2443                 * waiting to reconnect.
2444                 */
2445                peer2->ibp_reconnecting = 0;
2446                peer2->ibp_races = 0;
2447                peer2->ibp_accepting++;
2448                kiblnd_peer_addref(peer2);
2449
2450                /**
2451                 * Raced with kiblnd_launch_tx (active connect) to create the
2452                 * peer, so copy the validated parameters now that we know the
2453                 * peer's limits.
2454                 */
2455                peer2->ibp_max_frags = peer->ibp_max_frags;
2456                peer2->ibp_queue_depth = peer->ibp_queue_depth;
2457
2458                write_unlock_irqrestore(g_lock, flags);
2459                kiblnd_peer_decref(peer);
2460                peer = peer2;
2461        } else {
2462                /* Brand new peer */
2463                LASSERT(!peer->ibp_accepting);
2464                LASSERT(!peer->ibp_version &&
2465                        !peer->ibp_incarnation);
2466
2467                peer->ibp_accepting   = 1;
2468                peer->ibp_version     = version;
2469                peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2470
2471                /* I have a ref on ni that prevents it being shutdown */
2472                LASSERT(!net->ibn_shutdown);
2473
2474                kiblnd_peer_addref(peer);
2475                list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2476
2477                write_unlock_irqrestore(g_lock, flags);
2478        }
2479
2480        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT,
2481                                  version);
2482        if (!conn) {
2483                kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2484                kiblnd_peer_decref(peer);
2485                rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2486                goto failed;
2487        }
2488
2489        /*
2490         * conn now "owns" cmid, so I return success from here on to ensure the
2491         * CM callback doesn't destroy cmid.
2492         */
2493        conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2494        conn->ibc_credits          = conn->ibc_queue_depth;
2495        conn->ibc_reserved_credits = conn->ibc_queue_depth;
2496        LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2497                IBLND_OOB_MSGS(version) <= IBLND_RX_MSGS(conn));
2498
2499        ackmsg = &conn->ibc_connvars->cv_msg;
2500        memset(ackmsg, 0, sizeof(*ackmsg));
2501
2502        kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2503                        sizeof(ackmsg->ibm_u.connparams));
2504        ackmsg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2505        ackmsg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2506        ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2507
2508        kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2509
2510        memset(&cp, 0, sizeof(cp));
2511        cp.private_data = ackmsg;
2512        cp.private_data_len = ackmsg->ibm_nob;
2513        cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2514        cp.initiator_depth = 0;
2515        cp.flow_control = 1;
2516        cp.retry_count = *kiblnd_tunables.kib_retry_count;
2517        cp.rnr_retry_count = *kiblnd_tunables.kib_rnr_retry_count;
2518
2519        CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2520
2521        rc = rdma_accept(cmid, &cp);
2522        if (rc) {
2523                CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2524                rej.ibr_version = version;
2525                rej.ibr_why     = IBLND_REJECT_FATAL;
2526
2527                kiblnd_reject(cmid, &rej);
2528                kiblnd_connreq_done(conn, rc);
2529                kiblnd_conn_decref(conn);
2530        }
2531
2532        lnet_ni_decref(ni);
2533        return 0;
2534
2535 failed:
2536        if (ni) {
2537                rej.ibr_cp.ibcp_queue_depth = kiblnd_msg_queue_size(version, ni);
2538                rej.ibr_cp.ibcp_max_frags = kiblnd_rdma_frags(version, ni);
2539                lnet_ni_decref(ni);
2540        }
2541
2542        rej.ibr_version             = version;
2543        kiblnd_reject(cmid, &rej);
2544
2545        return -ECONNREFUSED;
2546}
2547
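    /*
     * Decide whether a rejected active connect should be retried.  If the
     * peer still needs a connection and the reject reason is negotiable
     * (queue depth, max_frags, stale, race, version), adjust the peer's
     * parameters and mark it for reconnection by the connd.
     */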
2548static void
2549kiblnd_check_reconnect(struct kib_conn *conn, int version,
2550                       __u64 incarnation, int why, struct kib_connparams *cp)
2551{
2552        rwlock_t *glock = &kiblnd_data.kib_global_lock;
2553        struct kib_peer *peer = conn->ibc_peer;
2554        char *reason;
2555        int msg_size = IBLND_MSG_SIZE;
2556        int frag_num = -1;
2557        int queue_dep = -1;
2558        bool reconnect;
2559        unsigned long flags;
2560
2561        LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2562        LASSERT(peer->ibp_connecting > 0);     /* 'conn' at least */
2563        LASSERT(!peer->ibp_reconnecting);
2564
2565        if (cp) {
2566                msg_size = cp->ibcp_max_msg_size;
2567                frag_num        = cp->ibcp_max_frags << IBLND_FRAG_SHIFT;
2568                queue_dep = cp->ibcp_queue_depth;
2569        }
2570
2571        write_lock_irqsave(glock, flags);
2572        /**
2573         * retry connection if it's still needed and no other connection
2574         * attempts (active or passive) are in progress
2575         * NB: reconnect is still needed even when ibp_tx_queue is
2576         * empty if ibp_version != version because reconnect may be
2577         * initiated by kiblnd_query()
2578         */
2579        reconnect = (!list_empty(&peer->ibp_tx_queue) ||
2580                     peer->ibp_version != version) &&
2581                    peer->ibp_connecting == 1 &&
2582                    !peer->ibp_accepting;
2583        if (!reconnect) {
2584                reason = "no need";
2585                goto out;
2586        }
2587
2588        switch (why) {
2589        default:
2590                reason = "Unknown";
2591                break;
2592
2593        case IBLND_REJECT_RDMA_FRAGS: {
2594                struct lnet_ioctl_config_lnd_tunables *tunables;
2595
2596                if (!cp) {
2597                        reason = "can't negotiate max frags";
2598                        goto out;
2599                }
2600                tunables = peer->ibp_ni->ni_lnd_tunables;
2601                if (!tunables->lt_tun_u.lt_o2ib.lnd_map_on_demand) {
2602                        reason = "map_on_demand must be enabled";
2603                        goto out;
2604                }
2605                if (conn->ibc_max_frags <= frag_num) {
2606                        reason = "unsupported max frags";
2607                        goto out;
2608                }
2609
2610                peer->ibp_max_frags = frag_num;
2611                reason = "rdma fragments";
2612                break;
2613        }
2614        case IBLND_REJECT_MSG_QUEUE_SIZE:
2615                if (!cp) {
2616                        reason = "can't negotiate queue depth";
2617                        goto out;
2618                }
2619                if (conn->ibc_queue_depth <= queue_dep) {
2620                        reason = "unsupported queue depth";
2621                        goto out;
2622                }
2623
2624                peer->ibp_queue_depth = queue_dep;
2625                reason = "queue depth";
2626                break;
2627
2628        case IBLND_REJECT_CONN_STALE:
2629                reason = "stale";
2630                break;
2631
2632        case IBLND_REJECT_CONN_RACE:
2633                reason = "conn race";
2634                break;
2635
2636        case IBLND_REJECT_CONN_UNCOMPAT:
2637                reason = "version negotiation";
2638                break;
2639        }
2640
2641        conn->ibc_reconnect = 1;
2642        peer->ibp_reconnecting = 1;
2643        peer->ibp_version = version;
2644        if (incarnation)
2645                peer->ibp_incarnation = incarnation;
2646out:
2647        write_unlock_irqrestore(glock, flags);
2648
2649        CNETERR("%s: %s (%s), %x, %x, msg_size: %d, queue_depth: %d/%d, max_frags: %d/%d\n",
2650                libcfs_nid2str(peer->ibp_nid),
2651                reconnect ? "reconnect" : "don't reconnect",
2652                reason, IBLND_MSG_VERSION, version, msg_size,
2653                conn->ibc_queue_depth, queue_dep,
2654                conn->ibc_max_frags, frag_num);
2655        /**
2656         * if conn::ibc_reconnect is TRUE, connd will reconnect to the peer
2657         * while destroying the zombie
2658         */
2659}
2660
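    /*
     * An active connect was rejected.  Decode the CM reject reason (and any
     * o2iblnd-specific kib_rej payload, byte-swapping if needed), then
     * either schedule a reconnect via kiblnd_check_reconnect() or report
     * the failure before completing the conn with -ECONNREFUSED.
     */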
2661static void
2662kiblnd_rejected(struct kib_conn *conn, int reason, void *priv, int priv_nob)
2663{
2664        struct kib_peer *peer = conn->ibc_peer;
2665
2666        LASSERT(!in_interrupt());
2667        LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2668
2669        switch (reason) {
2670        case IB_CM_REJ_STALE_CONN:
2671                kiblnd_check_reconnect(conn, IBLND_MSG_VERSION, 0,
2672                                       IBLND_REJECT_CONN_STALE, NULL);
2673                break;
2674
2675        case IB_CM_REJ_INVALID_SERVICE_ID:
2676                CNETERR("%s rejected: no listener at %d\n",
2677                        libcfs_nid2str(peer->ibp_nid),
2678                        *kiblnd_tunables.kib_service);
2679                break;
2680
2681        case IB_CM_REJ_CONSUMER_DEFINED:
2682                if (priv_nob >= offsetof(struct kib_rej, ibr_padding)) {
2683                        struct kib_rej *rej = priv;
2684                        struct kib_connparams *cp = NULL;
2685                        int flip = 0;
2686                        __u64 incarnation = -1;
2687
2688                        /* NB. the default incarnation is -1 because:
2689                         * a) V1 ignores the dst incarnation in the connreq.
2690                         * b) V2 supplies an incarnation when rejecting me,
2691                         *    which overwrites the -1.
2692                         *
2693                         * If I try to connect to a V1 peer with the V2
2694                         * protocol, it rejects me and then upgrades to V2.
2695                         * Knowing nothing about the upgrade I retry with V1,
2696                         * and the upgraded peer can then tell I'm talking to
2697                         * the old version and rejects me (incarnation is -1).
2698                         */
2699
2700                        if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2701                            rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2702                                __swab32s(&rej->ibr_magic);
2703                                __swab16s(&rej->ibr_version);
2704                                flip = 1;
2705                        }
2706
2707                        if (priv_nob >= sizeof(struct kib_rej) &&
2708                            rej->ibr_version > IBLND_MSG_VERSION_1) {
2709                                /*
2710                                 * priv_nob is always 148 in the current version
2711                                 * of OFED (IB_CM_REJ_PRIVATE_DATA_SIZE), so we
2712                                 * still need to check the version.
2713                                 */
2714                                cp = &rej->ibr_cp;
2715
2716                                if (flip) {
2717                                        __swab64s(&rej->ibr_incarnation);
2718                                        __swab16s(&cp->ibcp_queue_depth);
2719                                        __swab16s(&cp->ibcp_max_frags);
2720                                        __swab32s(&cp->ibcp_max_msg_size);
2721                                }
2722
2723                                incarnation = rej->ibr_incarnation;
2724                        }
2725
2726                        if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2727                            rej->ibr_magic != LNET_PROTO_MAGIC) {
2728                                CERROR("%s rejected: consumer defined fatal error\n",
2729                                       libcfs_nid2str(peer->ibp_nid));
2730                                break;
2731                        }
2732
2733                        if (rej->ibr_version != IBLND_MSG_VERSION &&
2734                            rej->ibr_version != IBLND_MSG_VERSION_1) {
2735                                CERROR("%s rejected: o2iblnd version %x error\n",
2736                                       libcfs_nid2str(peer->ibp_nid),
2737                                       rej->ibr_version);
2738                                break;
2739                        }
2740
2741                        if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2742                            rej->ibr_version == IBLND_MSG_VERSION_1) {
2743                                CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2744                                       libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2745
2746                                if (conn->ibc_version != IBLND_MSG_VERSION_1)
2747                                        rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2748                        }
2749
2750                        switch (rej->ibr_why) {
2751                        case IBLND_REJECT_CONN_RACE:
2752                        case IBLND_REJECT_CONN_STALE:
2753                        case IBLND_REJECT_CONN_UNCOMPAT:
2754                        case IBLND_REJECT_MSG_QUEUE_SIZE:
2755                        case IBLND_REJECT_RDMA_FRAGS:
2756                                kiblnd_check_reconnect(conn, rej->ibr_version,
2757                                                       incarnation,
2758                                                       rej->ibr_why, cp);
2759                                break;
2760
2761                        case IBLND_REJECT_NO_RESOURCES:
2762                                CERROR("%s rejected: o2iblnd no resources\n",
2763                                       libcfs_nid2str(peer->ibp_nid));
2764                                break;
2765
2766                        case IBLND_REJECT_FATAL:
2767                                CERROR("%s rejected: o2iblnd fatal error\n",
2768                                       libcfs_nid2str(peer->ibp_nid));
2769                                break;
2770
2771                        default:
2772                                CERROR("%s rejected: o2iblnd reason %d\n",
2773                                       libcfs_nid2str(peer->ibp_nid),
2774                                       rej->ibr_why);
2775                                break;
2776                        }
2777                        break;
2778                }
2779                /* fall through */
2780        default:
2781                CNETERR("%s rejected: reason %d, size %d\n",
2782                        libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2783                break;
2784        }
2785
2786        kiblnd_connreq_done(conn, -ECONNREFUSED);
2787}
2788
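    /*
     * Validate the CONNACK received by an active connect: unpack it, check
     * version, queue depth, max_frags and destination stamp, then adopt the
     * negotiated parameters and complete the handshake.  Any error is
     * recorded in ibc_comms_error so the established conn is torn down.
     */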
2789static void
2790kiblnd_check_connreply(struct kib_conn *conn, void *priv, int priv_nob)
2791{
2792        struct kib_peer *peer = conn->ibc_peer;
2793        struct lnet_ni *ni = peer->ibp_ni;
2794        struct kib_net *net = ni->ni_data;
2795        struct kib_msg *msg = priv;
2796        int ver = conn->ibc_version;
2797        int rc = kiblnd_unpack_msg(msg, priv_nob);
2798        unsigned long flags;
2799
2800        LASSERT(net);
2801
2802        if (rc) {
2803                CERROR("Can't unpack connack from %s: %d\n",
2804                       libcfs_nid2str(peer->ibp_nid), rc);
2805                goto failed;
2806        }
2807
2808        if (msg->ibm_type != IBLND_MSG_CONNACK) {
2809                CERROR("Unexpected message %d from %s\n",
2810                       msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2811                rc = -EPROTO;
2812                goto failed;
2813        }
2814
2815        if (ver != msg->ibm_version) {
2816                CERROR("%s replied version %x is different from requested version %x\n",
2817                       libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2818                rc = -EPROTO;
2819                goto failed;
2820        }
2821
2822        if (msg->ibm_u.connparams.ibcp_queue_depth >
2823            conn->ibc_queue_depth) {
2824                CERROR("%s has incompatible queue depth %d (<=%d wanted)\n",
2825                       libcfs_nid2str(peer->ibp_nid),
2826                       msg->ibm_u.connparams.ibcp_queue_depth,
2827                       conn->ibc_queue_depth);
2828                rc = -EPROTO;
2829                goto failed;
2830        }
2831
2832        if ((msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT) >
2833            conn->ibc_max_frags) {
2834                CERROR("%s has incompatible max_frags %d (<=%d wanted)\n",
2835                       libcfs_nid2str(peer->ibp_nid),
2836                       msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT,
2837                       conn->ibc_max_frags);
2838                rc = -EPROTO;
2839                goto failed;
2840        }
2841
2842        if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2843                CERROR("%s max message size %d too big (%d max)\n",
2844                       libcfs_nid2str(peer->ibp_nid),
2845                       msg->ibm_u.connparams.ibcp_max_msg_size,
2846                       IBLND_MSG_SIZE);
2847                rc = -EPROTO;
2848                goto failed;
2849        }
2850
2851        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2852        if (msg->ibm_dstnid == ni->ni_nid &&
2853            msg->ibm_dststamp == net->ibn_incarnation)
2854                rc = 0;
2855        else
2856                rc = -ESTALE;
2857        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2858
2859        if (rc) {
2860                CERROR("Bad connection reply from %s, rc = %d, version: %x max_frags: %d\n",
2861                       libcfs_nid2str(peer->ibp_nid), rc,
2862                       msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2863                goto failed;
2864        }
2865
2866        conn->ibc_incarnation = msg->ibm_srcstamp;
2867        conn->ibc_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2868        conn->ibc_reserved_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2869        conn->ibc_queue_depth = msg->ibm_u.connparams.ibcp_queue_depth;
2870        conn->ibc_max_frags = msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2871        LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2872                IBLND_OOB_MSGS(ver) <= IBLND_RX_MSGS(conn));
2873
2874        kiblnd_connreq_done(conn, 0);
2875        return;
2876
2877 failed:
2878        /*
2879         * NB My QP has already established itself, so I handle anything going
2880         * wrong here by setting ibc_comms_error.
2881         * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2882         * immediately tears it down.
2883         */
2884        LASSERT(rc);
2885        conn->ibc_comms_error = rc;
2886        kiblnd_connreq_done(conn, 0);
2887}
2888
2889static int
2890kiblnd_active_connect(struct rdma_cm_id *cmid)
2891{
2892        struct kib_peer *peer = (struct kib_peer *)cmid->context;
2893        struct kib_conn *conn;
2894        struct kib_msg *msg;
2895        struct rdma_conn_param cp;
2896        int version;
2897        __u64 incarnation;
2898        unsigned long flags;
2899        int rc;
2900
2901        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2902
2903        incarnation = peer->ibp_incarnation;
2904        version = !peer->ibp_version ? IBLND_MSG_VERSION :
2905                                       peer->ibp_version;
2906
2907        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2908
2909        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT,
2910                                  version);
2911        if (!conn) {
2912                kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2913                kiblnd_peer_decref(peer); /* lose cmid's ref */
2914                return -ENOMEM;
2915        }
2916
2917        /*
2918         * conn "owns" cmid now, so I return success from here on to ensure the
2919         * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2920         * on peer
2921         */
2922        msg = &conn->ibc_connvars->cv_msg;
2923
2924        memset(msg, 0, sizeof(*msg));
2925        kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2926        msg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2927        msg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2928        msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2929
2930        kiblnd_pack_msg(peer->ibp_ni, msg, version,
2931                        0, peer->ibp_nid, incarnation);
2932
2933        memset(&cp, 0, sizeof(cp));
2934        cp.private_data = msg;
2935        cp.private_data_len    = msg->ibm_nob;
2936        cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2937        cp.initiator_depth     = 0;
2938        cp.flow_control        = 1;
2939        cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2940        cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2941
2942        LASSERT(cmid->context == (void *)conn);
2943        LASSERT(conn->ibc_cmid == cmid);
2944
2945        rc = rdma_connect(cmid, &cp);
2946        if (rc) {
2947                CERROR("Can't connect to %s: %d\n",
2948                       libcfs_nid2str(peer->ibp_nid), rc);
2949                kiblnd_connreq_done(conn, rc);
2950                kiblnd_conn_decref(conn);
2951        }
2952
2953        return 0;
2954}
2955
2956int
2957kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2958{
2959        struct kib_peer *peer;
2960        struct kib_conn *conn;
2961        int rc;
2962
2963        switch (event->event) {
2964        default:
2965                CERROR("Unexpected event: %d, status: %d\n",
2966                       event->event, event->status);
2967                LBUG();
2968
2969        case RDMA_CM_EVENT_CONNECT_REQUEST:
2970                /* destroy cmid on failure */
2971                rc = kiblnd_passive_connect(cmid,
2972                                            (void *)KIBLND_CONN_PARAM(event),
2973                                            KIBLND_CONN_PARAM_LEN(event));
2974                CDEBUG(D_NET, "connreq: %d\n", rc);
2975                return rc;
2976
2977        case RDMA_CM_EVENT_ADDR_ERROR:
2978                peer = (struct kib_peer *)cmid->context;
2979                CNETERR("%s: ADDR ERROR %d\n",
2980                        libcfs_nid2str(peer->ibp_nid), event->status);
2981                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2982                kiblnd_peer_decref(peer);
2983                return -EHOSTUNREACH;      /* rc destroys cmid */
2984
2985        case RDMA_CM_EVENT_ADDR_RESOLVED:
2986                peer = (struct kib_peer *)cmid->context;
2987
2988                CDEBUG(D_NET, "%s Addr resolved: %d\n",
2989                       libcfs_nid2str(peer->ibp_nid), event->status);
2990
2991                if (event->status) {
2992                        CNETERR("Can't resolve address for %s: %d\n",
2993                                libcfs_nid2str(peer->ibp_nid), event->status);
2994                        rc = event->status;
2995                } else {
2996                        rc = rdma_resolve_route(
2997                                cmid, *kiblnd_tunables.kib_timeout * 1000);
2998                        if (!rc)
2999                                return 0;
3000                        /* Can't initiate route resolution */
3001                        CERROR("Can't resolve route for %s: %d\n",
3002                               libcfs_nid2str(peer->ibp_nid), rc);
3003                }
3004                kiblnd_peer_connect_failed(peer, 1, rc);
3005                kiblnd_peer_decref(peer);
3006                return rc;                    /* rc destroys cmid */
3007
3008        case RDMA_CM_EVENT_ROUTE_ERROR:
3009                peer = (struct kib_peer *)cmid->context;
3010                CNETERR("%s: ROUTE ERROR %d\n",
3011                        libcfs_nid2str(peer->ibp_nid), event->status);
3012                kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
3013                kiblnd_peer_decref(peer);
3014                return -EHOSTUNREACH;      /* rc destroys cmid */
3015
3016        case RDMA_CM_EVENT_ROUTE_RESOLVED:
3017                peer = (struct kib_peer *)cmid->context;
3018                CDEBUG(D_NET, "%s Route resolved: %d\n",
3019                       libcfs_nid2str(peer->ibp_nid), event->status);
3020
3021                if (!event->status)
3022                        return kiblnd_active_connect(cmid);
3023
3024                CNETERR("Can't resolve route for %s: %d\n",
3025                        libcfs_nid2str(peer->ibp_nid), event->status);
3026                kiblnd_peer_connect_failed(peer, 1, event->status);
3027                kiblnd_peer_decref(peer);
3028                return event->status;      /* rc destroys cmid */
3029
3030        case RDMA_CM_EVENT_UNREACHABLE:
3031                conn = (struct kib_conn *)cmid->context;
3032                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3033                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3034                CNETERR("%s: UNREACHABLE %d\n",
3035                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3036                kiblnd_connreq_done(conn, -ENETDOWN);
3037                kiblnd_conn_decref(conn);
3038                return 0;
3039
3040        case RDMA_CM_EVENT_CONNECT_ERROR:
3041                conn = (struct kib_conn *)cmid->context;
3042                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3043                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3044                CNETERR("%s: CONNECT ERROR %d\n",
3045                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3046                kiblnd_connreq_done(conn, -ENOTCONN);
3047                kiblnd_conn_decref(conn);
3048                return 0;
3049
3050        case RDMA_CM_EVENT_REJECTED:
3051                conn = (struct kib_conn *)cmid->context;
3052                switch (conn->ibc_state) {
3053                default:
3054                        LBUG();
3055
3056                case IBLND_CONN_PASSIVE_WAIT:
3057                        CERROR("%s: REJECTED %d\n",
3058                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
3059                               event->status);
3060                        kiblnd_connreq_done(conn, -ECONNRESET);
3061                        break;
3062
3063                case IBLND_CONN_ACTIVE_CONNECT:
3064                        kiblnd_rejected(conn, event->status,
3065                                        (void *)KIBLND_CONN_PARAM(event),
3066                                        KIBLND_CONN_PARAM_LEN(event));
3067                        break;
3068                }
3069                kiblnd_conn_decref(conn);
3070                return 0;
3071
3072        case RDMA_CM_EVENT_ESTABLISHED:
3073                conn = (struct kib_conn *)cmid->context;
3074                switch (conn->ibc_state) {
3075                default:
3076                        LBUG();
3077
3078                case IBLND_CONN_PASSIVE_WAIT:
3079                        CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
3080                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
3081                        kiblnd_connreq_done(conn, 0);
3082                        break;
3083
3084                case IBLND_CONN_ACTIVE_CONNECT:
3085                        CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
3086                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
3087                        kiblnd_check_connreply(conn,
3088                                               (void *)KIBLND_CONN_PARAM(event),
3089                                               KIBLND_CONN_PARAM_LEN(event));
3090                        break;
3091                }
3092                /* net keeps its ref on conn! */
3093                return 0;
3094
3095        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
3096                CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
3097                return 0;
3098        case RDMA_CM_EVENT_DISCONNECTED:
3099                conn = (struct kib_conn *)cmid->context;
3100                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
3101                        CERROR("%s DISCONNECTED\n",
3102                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
3103                        kiblnd_connreq_done(conn, -ECONNRESET);
3104                } else {
3105                        kiblnd_close_conn(conn, 0);
3106                }
3107                kiblnd_conn_decref(conn);
3108                cmid->context = NULL;
3109                return 0;
3110
3111        case RDMA_CM_EVENT_DEVICE_REMOVAL:
3112                LCONSOLE_ERROR_MSG(0x131,
3113                                   "Received notification of device removal\n"
3114                                   "Please shutdown LNET to allow this to proceed\n");
3115                /*
3116                 * Can't remove network from underneath LNET for now, so I have
3117                 * to ignore this
3118                 */
3119                return 0;
3120
3121        case RDMA_CM_EVENT_ADDR_CHANGE:
3122                LCONSOLE_INFO("Physical link changed (eg hca/port)\n");
3123                return 0;
3124        }
3125}
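/*
 * Illustrative sketch (not part of the driver): kiblnd_cm_callback() is the
 * event handler installed on every cm_id this LND creates, along the lines
 * of the call below (the rdma_create_id() signature of kernels of this
 * vintage is assumed; the driver's own call site lives in o2iblnd.c):
 *
 *      cmid = rdma_create_id(&init_net, kiblnd_cm_callback, context,
 *                            RDMA_PS_TCP, IB_QPT_RC);
 *      if (IS_ERR(cmid))
 *              return PTR_ERR(cmid);
 *
 * A non-zero return from the handler tells the RDMA CM to destroy the cm_id,
 * which is why the error paths above are annotated "rc destroys cmid".
 */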
3126
3127static int
3128kiblnd_check_txs_locked(struct kib_conn *conn, struct list_head *txs)
3129{
3130        struct kib_tx *tx;
3131        struct list_head *ttmp;
3132
3133        list_for_each(ttmp, txs) {
3134                tx = list_entry(ttmp, struct kib_tx, tx_list);
3135
3136                if (txs != &conn->ibc_active_txs) {
3137                        LASSERT(tx->tx_queued);
3138                } else {
3139                        LASSERT(!tx->tx_queued);
3140                        LASSERT(tx->tx_waiting || tx->tx_sending);
3141                }
3142
3143                if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
3144                        CERROR("Timed out tx: %s, %lu seconds\n",
3145                               kiblnd_queue2str(conn, txs),
3146                               cfs_duration_sec(jiffies - tx->tx_deadline));
3147                        return 1;
3148                }
3149        }
3150
3151        return 0;
3152}
3153
3154static int
3155kiblnd_conn_timed_out_locked(struct kib_conn *conn)
3156{
3157        return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3158                kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3159                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3160                kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3161                kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3162}
3163
3164static void
3165kiblnd_check_conns(int idx)
3166{
3167        LIST_HEAD(closes);
3168        LIST_HEAD(checksends);
3169        struct list_head *peers = &kiblnd_data.kib_peers[idx];
3170        struct list_head *ptmp;
3171        struct kib_peer *peer;
3172        struct kib_conn *conn;
3173        struct kib_conn *temp;
3174        struct kib_conn *tmp;
3175        struct list_head *ctmp;
3176        unsigned long flags;
3177
3178        /*
3179         * NB. We expect to have a look at all the peers and not find any
3180         * RDMAs to time out, so we just use a shared lock while we
3181         * take a look...
3182         */
3183        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3184
3185        list_for_each(ptmp, peers) {
3186                peer = list_entry(ptmp, struct kib_peer, ibp_list);
3187
3188                list_for_each(ctmp, &peer->ibp_conns) {
3189                        int timedout;
3190                        int sendnoop;
3191
3192                        conn = list_entry(ctmp, struct kib_conn, ibc_list);
3193
3194                        LASSERT(conn->ibc_state == IBLND_CONN_ESTABLISHED);
3195
3196                        spin_lock(&conn->ibc_lock);
3197
3198                        sendnoop = kiblnd_need_noop(conn);
3199                        timedout = kiblnd_conn_timed_out_locked(conn);
3200                        if (!sendnoop && !timedout) {
3201                                spin_unlock(&conn->ibc_lock);
3202                                continue;
3203                        }
3204
3205                        if (timedout) {
3206                                CERROR("Timed out RDMA with %s (%lu): c: %u, oc: %u, rc: %u\n",
3207                                       libcfs_nid2str(peer->ibp_nid),
3208                                       cfs_duration_sec(cfs_time_current() -
3209                                                        peer->ibp_last_alive),
3210                                       conn->ibc_credits,
3211                                       conn->ibc_outstanding_credits,
3212                                       conn->ibc_reserved_credits);
3213                                list_add(&conn->ibc_connd_list, &closes);
3214                        } else {
3215                                list_add(&conn->ibc_connd_list, &checksends);
3216                        }
3217                        /* +ref for 'closes' or 'checksends' */
3218                        kiblnd_conn_addref(conn);
3219
3220                        spin_unlock(&conn->ibc_lock);
3221                }
3222        }
3223
3224        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3225
3226        /*
3227         * Handle timeout by closing the whole
3228         * connection. We can only be sure RDMA activity
3229         * has ceased once the QP has been modified.
3230         */
3231        list_for_each_entry_safe(conn, tmp, &closes, ibc_connd_list) {
3232                list_del(&conn->ibc_connd_list);
3233                kiblnd_close_conn(conn, -ETIMEDOUT);
3234                kiblnd_conn_decref(conn);
3235        }
3236
3237        /*
3238         * In case we have enough credits to return via a
3239         * NOOP, but there were no non-blocking tx descs
3240         * free to do it last time...
3241         */
3242        list_for_each_entry_safe(conn, temp, &checksends, ibc_connd_list) {
3243                list_del(&conn->ibc_connd_list);
3244
3245                spin_lock(&conn->ibc_lock);
3246                kiblnd_check_sends_locked(conn);
3247                spin_unlock(&conn->ibc_lock);
3248
3249                kiblnd_conn_decref(conn);
3250        }
3251}
3252
3253static void
3254kiblnd_disconnect_conn(struct kib_conn *conn)
3255{
3256        LASSERT(!in_interrupt());
3257        LASSERT(current == kiblnd_data.kib_connd);
3258        LASSERT(conn->ibc_state == IBLND_CONN_CLOSING);
3259
3260        rdma_disconnect(conn->ibc_cmid);
3261        kiblnd_finalise_conn(conn);
3262
3263        kiblnd_peer_notify(conn->ibc_peer);
3264}
3265
3266/**
3268 * High-water mark for reconnection races with the same peer; reconnection
3269 * attempts should be delayed after more than KIB_RECONN_HIGH_RACE tries.
3269 */
3270#define KIB_RECONN_HIGH_RACE    10
3271/**
3272 * Allow connd to take a break and handle other things after consecutive
3273 * reconnection attempts.
3274 */
3275#define KIB_RECONN_BREAK        100
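/*
 * Worked example (illustrative, not from the source): connd below splices
 * kib_reconn_wait back onto kib_reconn_list at most once per wall-clock
 * second (the kib_reconn_sec check), so a peer that has already raced
 * KIB_RECONN_HIGH_RACE (10) times waits for the next second before another
 * attempt, and connd services at most KIB_RECONN_BREAK (100) reconnections
 * before returning to its other duties.
 */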
3276
3277int
3278kiblnd_connd(void *arg)
3279{
3280        spinlock_t *lock = &kiblnd_data.kib_connd_lock;
3281        wait_queue_entry_t wait;
3282        unsigned long flags;
3283        struct kib_conn *conn;
3284        int timeout;
3285        int i;
3286        int dropped_lock;
3287        int peer_index = 0;
3288        unsigned long deadline = jiffies;
3289
3290        cfs_block_allsigs();
3291
3292        init_waitqueue_entry(&wait, current);
3293        kiblnd_data.kib_connd = current;
3294
3295        spin_lock_irqsave(lock, flags);
3296
3297        while (!kiblnd_data.kib_shutdown) {
3298                int reconn = 0;
3299
3300                dropped_lock = 0;
3301
3302                if (!list_empty(&kiblnd_data.kib_connd_zombies)) {
3303                        struct kib_peer *peer = NULL;
3304
3305                        conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3306                                          struct kib_conn, ibc_list);
3307                        list_del(&conn->ibc_list);
3308                        if (conn->ibc_reconnect) {
3309                                peer = conn->ibc_peer;
3310                                kiblnd_peer_addref(peer);
3311                        }
3312
3313                        spin_unlock_irqrestore(lock, flags);
3314                        dropped_lock = 1;
3315
3316                        kiblnd_destroy_conn(conn, !peer);
3317
3318                        spin_lock_irqsave(lock, flags);
3319                        if (!peer)
3320                                continue;
3321
3322                        conn->ibc_peer = peer;
3323                        if (peer->ibp_reconnected < KIB_RECONN_HIGH_RACE)
3324                                list_add_tail(&conn->ibc_list,
3325                                              &kiblnd_data.kib_reconn_list);
3326                        else
3327                                list_add_tail(&conn->ibc_list,
3328                                              &kiblnd_data.kib_reconn_wait);
3329                }
3330
3331                if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3332                        conn = list_entry(kiblnd_data.kib_connd_conns.next,
3333                                          struct kib_conn, ibc_list);
3334                        list_del(&conn->ibc_list);
3335
3336                        spin_unlock_irqrestore(lock, flags);
3337                        dropped_lock = 1;
3338
3339                        kiblnd_disconnect_conn(conn);
3340                        kiblnd_conn_decref(conn);
3341
3342                        spin_lock_irqsave(lock, flags);
3343                }
3344
3345                while (reconn < KIB_RECONN_BREAK) {
3346                        if (kiblnd_data.kib_reconn_sec !=
3347                            ktime_get_real_seconds()) {
3348                                kiblnd_data.kib_reconn_sec = ktime_get_real_seconds();
3349                                list_splice_init(&kiblnd_data.kib_reconn_wait,
3350                                                 &kiblnd_data.kib_reconn_list);
3351                        }
3352
3353                        if (list_empty(&kiblnd_data.kib_reconn_list))
3354                                break;
3355
3356                        conn = list_entry(kiblnd_data.kib_reconn_list.next,
3357                                          struct kib_conn, ibc_list);
3358                        list_del(&conn->ibc_list);
3359
3360                        spin_unlock_irqrestore(lock, flags);
3361                        dropped_lock = 1;
3362
3363                        reconn += kiblnd_reconnect_peer(conn->ibc_peer);
3364                        kiblnd_peer_decref(conn->ibc_peer);
3365                        LIBCFS_FREE(conn, sizeof(*conn));
3366
3367                        spin_lock_irqsave(lock, flags);
3368                }
3369
3370                /* careful with the jiffy wrap... */
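                /*
                 * (Computing the signed difference, rather than comparing
                 * absolute jiffy values, stays correct across a jiffies wrap
                 * as long as the interval fits in an int.)
                 */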
3371                timeout = (int)(deadline - jiffies);
3372                if (timeout <= 0) {
3373                        const int n = 4;
3374                        const int p = 1;
3375                        int chunk = kiblnd_data.kib_peer_hash_size;
3376
3377                        spin_unlock_irqrestore(lock, flags);
3378                        dropped_lock = 1;
3379
3380                        /*
3381                         * Time to check for RDMA timeouts on a few more
3382                         * peers: I do checks every 'p' seconds on a
3383                         * proportion of the peer table and I need to check
3384                         * every connection 'n' times within a timeout
3385                         * interval, to ensure I detect a timeout on any
3386                         * connection within (n+1)/n times the timeout
3387                         * interval.
3388                         */
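                        /*
                         * Worked example (illustrative figures, not from the
                         * source): with p = 1, n = 4, a timeout of 50 seconds
                         * and a 101-bucket peer hash, chunk = 101 * 4 * 1 / 50
                         * = 8, so eight buckets are scanned each second and
                         * the whole table is covered in roughly 13 seconds,
                         * i.e. every connection is examined about n times per
                         * timeout interval, as intended.
                         */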
3389                        if (*kiblnd_tunables.kib_timeout > n * p)
3390                                chunk = (chunk * n * p) /
3391                                        *kiblnd_tunables.kib_timeout;
3392                        if (!chunk)
3393                                chunk = 1;
3394
3395                        for (i = 0; i < chunk; i++) {
3396                                kiblnd_check_conns(peer_index);
3397                                peer_index = (peer_index + 1) %
3398                                             kiblnd_data.kib_peer_hash_size;
3399                        }
3400
3401                        deadline += msecs_to_jiffies(p * MSEC_PER_SEC);
3402                        spin_lock_irqsave(lock, flags);
3403                }
3404
3405                if (dropped_lock)
3406                        continue;
3407
3408                /* Nothing to do for 'timeout'  */
3409                set_current_state(TASK_INTERRUPTIBLE);
3410                add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3411                spin_unlock_irqrestore(lock, flags);
3412
3413                schedule_timeout(timeout);
3414
3415                remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3416                spin_lock_irqsave(lock, flags);
3417        }
3418
3419        spin_unlock_irqrestore(lock, flags);
3420
3421        kiblnd_thread_fini();
3422        return 0;
3423}
3424
3425void
3426kiblnd_qp_event(struct ib_event *event, void *arg)
3427{
3428        struct kib_conn *conn = arg;
3429
3430        switch (event->event) {
3431        case IB_EVENT_COMM_EST:
3432                CDEBUG(D_NET, "%s established\n",
3433                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
3434                /*
3435                 * We received a packet but the connection isn't established;
3436                 * the handshake packet was probably lost, so it is safe to
3437                 * force the connection into the established state.
3438                 */
3439                rdma_notify(conn->ibc_cmid, IB_EVENT_COMM_EST);
3440                return;
3441
3442        default:
3443                CERROR("%s: Async QP event type %d\n",
3444                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3445                return;
3446        }
3447}
3448
3449static void
3450kiblnd_complete(struct ib_wc *wc)
3451{
3452        switch (kiblnd_wreqid2type(wc->wr_id)) {
3453        default:
3454                LBUG();
3455
3456        case IBLND_WID_MR:
3457                if (wc->status != IB_WC_SUCCESS &&
3458                    wc->status != IB_WC_WR_FLUSH_ERR)
3459                        CNETERR("FastReg failed: %d\n", wc->status);
3460                break;
3461
3462        case IBLND_WID_RDMA:
3463                /*
3464                 * We only get RDMA completion notification if it fails.  All
3465                 * subsequent work items, including the final SEND, will fail
3466                 * too.  However, we can't print out any more info about the
3467                 * failing RDMA because 'tx' might be back on the idle list or
3468                 * even reused already if we didn't manage to post all our work
3469                 * items.
3470                 */
3471                CNETERR("RDMA (tx: %p) failed: %d\n",
3472                        kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3473                return;
3474
3475        case IBLND_WID_TX:
3476                kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3477                return;
3478
3479        case IBLND_WID_RX:
3480                kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3481                                   wc->byte_len);
3482                return;
3483        }
3484}
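/*
 * Illustrative sketch (not part of the driver): the dispatch above relies on
 * the work-request id carrying both the descriptor pointer and a small type
 * tag in its low bits; the pack/unpack helpers in o2iblnd.h work roughly as
 * below (the exact mask name is an assumption here):
 *
 *      wr_id = (__u64)(unsigned long)tx | IBLND_WID_TX;
 *      ...
 *      type = wr_id & IBLND_WID_MASK;
 *      ptr  = (void *)(unsigned long)(wr_id & ~IBLND_WID_MASK);
 *
 * which is what kiblnd_wreqid2type() and kiblnd_wreqid2ptr() undo, and which
 * assumes descriptors are aligned so the tag bits are otherwise zero.
 */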
3485
3486void
3487kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3488{
3489        /*
3490         * NB I'm not allowed to schedule this conn once its refcount has
3491         * reached 0.  Since fundamentally I'm racing with scheduler threads
3492         * consuming my CQ I could be called after all completions have
3493         * occurred.  But in this case, !ibc_nrx && !ibc_nsends_posted
3494         * and this CQ is about to be destroyed so I NOOP.
3495         */
3496        struct kib_conn *conn = arg;
3497        struct kib_sched_info *sched = conn->ibc_sched;
3498        unsigned long flags;
3499
3500        LASSERT(cq == conn->ibc_cq);
3501
3502        spin_lock_irqsave(&sched->ibs_lock, flags);
3503
3504        conn->ibc_ready = 1;
3505
3506        if (!conn->ibc_scheduled &&
3507            (conn->ibc_nrx > 0 ||
3508             conn->ibc_nsends_posted > 0)) {
3509                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3510                conn->ibc_scheduled = 1;
3511                list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3512
3513                if (waitqueue_active(&sched->ibs_waitq))
3514                        wake_up(&sched->ibs_waitq);
3515        }
3516
3517        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3518}
3519
3520void
3521kiblnd_cq_event(struct ib_event *event, void *arg)
3522{
3523        struct kib_conn *conn = arg;
3524
3525        CERROR("%s: async CQ event type %d\n",
3526               libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3527}
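/*
 * Illustrative sketch (not part of the driver): kiblnd_cq_completion() and
 * kiblnd_cq_event() are the handlers passed when the connection's CQ is
 * created, roughly as below (the ib_create_cq() signature of kernels of this
 * vintage is assumed, and IBLND_CQ_ENTRIES() stands in for whatever sizing
 * the driver really uses; the real call lives in o2iblnd.c):
 *
 *      struct ib_cq_init_attr cq_attr = { .cqe = IBLND_CQ_ENTRIES(conn) };
 *
 *      cq = ib_create_cq(cmid->device, kiblnd_cq_completion,
 *                        kiblnd_cq_event, conn, &cq_attr);
 *      if (IS_ERR(cq))
 *              return PTR_ERR(cq);
 *
 * so 'conn' comes back as the 'arg' seen in both handlers above.
 */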
3528
3529int
3530kiblnd_scheduler(void *arg)
3531{
3532        long id = (long)arg;
3533        struct kib_sched_info *sched;
3534        struct kib_conn *conn;
3535        wait_queue_entry_t wait;
3536        unsigned long flags;
3537        struct ib_wc wc;
3538        int did_something;
3539        int busy_loops = 0;
3540        int rc;
3541
3542        cfs_block_allsigs();
3543
3544        init_waitqueue_entry(&wait, current);
3545
3546        sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3547
3548        rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3549        if (rc) {
3550                CWARN("Unable to bind on CPU partition %d, please verify whether all CPUs are healthy and reload modules if necessary, otherwise your system might be at risk of low performance\n",
3551                      sched->ibs_cpt);
3552        }
3553
3554        spin_lock_irqsave(&sched->ibs_lock, flags);
3555
3556        while (!kiblnd_data.kib_shutdown) {
3557                if (busy_loops++ >= IBLND_RESCHED) {
3558                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3559
3560                        cond_resched();
3561                        busy_loops = 0;
3562
3563                        spin_lock_irqsave(&sched->ibs_lock, flags);
3564                }
3565
3566                did_something = 0;
3567
3568                if (!list_empty(&sched->ibs_conns)) {
3569                        conn = list_entry(sched->ibs_conns.next, struct kib_conn,
3570                                          ibc_sched_list);
3571                        /* take over kib_sched_conns' ref on conn... */
3572                        LASSERT(conn->ibc_scheduled);
3573                        list_del(&conn->ibc_sched_list);
3574                        conn->ibc_ready = 0;
3575
3576                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3577
3578                        wc.wr_id = IBLND_WID_INVAL;
3579
3580                        rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3581                        if (!rc) {
3582                                rc = ib_req_notify_cq(conn->ibc_cq,
3583                                                      IB_CQ_NEXT_COMP);
3584                                if (rc < 0) {
3585                                        CWARN("%s: ib_req_notify_cq failed: %d, closing connection\n",
3586                                              libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3587                                        kiblnd_close_conn(conn, -EIO);
3588                                        kiblnd_conn_decref(conn);
3589                                        spin_lock_irqsave(&sched->ibs_lock,
3590                                                          flags);
3591                                        continue;
3592                                }
3593
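                                /*
                                 * Poll once more after arming the CQ: a
                                 * completion that raced in between the first
                                 * poll and ib_req_notify_cq() would otherwise
                                 * go unnoticed until the next CQ event fires.
                                 */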
3594                                rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3595                        }
3596
3597                        if (unlikely(rc > 0 && wc.wr_id == IBLND_WID_INVAL)) {
3598                                LCONSOLE_ERROR("ib_poll_cq (rc: %d) returned invalid wr_id, opcode %d, status: %d, vendor_err: %d, conn: %s status: %d\nplease upgrade firmware and OFED or contact vendor.\n",
3599                                               rc, wc.opcode, wc.status,
3600                                               wc.vendor_err,
3601                                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
3602                                               conn->ibc_state);
3603                                rc = -EINVAL;
3604                        }
3605
3606                        if (rc < 0) {
3607                                CWARN("%s: ib_poll_cq failed: %d, closing connection\n",
3608                                      libcfs_nid2str(conn->ibc_peer->ibp_nid),
3609                                      rc);
3610                                kiblnd_close_conn(conn, -EIO);
3611                                kiblnd_conn_decref(conn);
3612                                spin_lock_irqsave(&sched->ibs_lock, flags);
3613                                continue;
3614                        }
3615
3616                        spin_lock_irqsave(&sched->ibs_lock, flags);
3617
3618                        if (rc || conn->ibc_ready) {
3619                                /*
3620                                 * There may be another completion waiting; get
3621                                 * another scheduler to check while I handle
3622                                 * this one...
3623                                 */
3624                                /* +1 ref for sched_conns */
3625                                kiblnd_conn_addref(conn);
3626                                list_add_tail(&conn->ibc_sched_list,
3627                                              &sched->ibs_conns);
3628                                if (waitqueue_active(&sched->ibs_waitq))
3629                                        wake_up(&sched->ibs_waitq);
3630                        } else {
3631                                conn->ibc_scheduled = 0;
3632                        }
3633
3634                        if (rc) {
3635                                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3636                                kiblnd_complete(&wc);
3637
3638                                spin_lock_irqsave(&sched->ibs_lock, flags);
3639                        }
3640
3641                        kiblnd_conn_decref(conn); /* ...drop my ref from above */
3642                        did_something = 1;
3643                }
3644
3645                if (did_something)
3646                        continue;
3647
3648                set_current_state(TASK_INTERRUPTIBLE);
3649                add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3650                spin_unlock_irqrestore(&sched->ibs_lock, flags);
3651
3652                schedule();
3653                busy_loops = 0;
3654
3655                remove_wait_queue(&sched->ibs_waitq, &wait);
3656                spin_lock_irqsave(&sched->ibs_lock, flags);
3657        }
3658
3659        spin_unlock_irqrestore(&sched->ibs_lock, flags);
3660
3661        kiblnd_thread_fini();
3662        return 0;
3663}
3664
3665int
3666kiblnd_failover_thread(void *arg)
3667{
3668        rwlock_t *glock = &kiblnd_data.kib_global_lock;
3669        struct kib_dev *dev;
3670        wait_queue_entry_t wait;
3671        unsigned long flags;
3672        int rc;
3673
3674        LASSERT(*kiblnd_tunables.kib_dev_failover);
3675
3676        cfs_block_allsigs();
3677
3678        init_waitqueue_entry(&wait, current);
3679        write_lock_irqsave(glock, flags);
3680
3681        while (!kiblnd_data.kib_shutdown) {
3682                int do_failover = 0;
3683                int long_sleep;
3684
3685                list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3686                                    ibd_fail_list) {
3687                        if (time_before(cfs_time_current(),
3688                                        dev->ibd_next_failover))
3689                                continue;
3690                        do_failover = 1;
3691                        break;
3692                }
3693
3694                if (do_failover) {
3695                        list_del_init(&dev->ibd_fail_list);
3696                        dev->ibd_failover = 1;
3697                        write_unlock_irqrestore(glock, flags);
3698
3699                        rc = kiblnd_dev_failover(dev);
3700
3701                        write_lock_irqsave(glock, flags);
3702
3703                        LASSERT(dev->ibd_failover);
3704                        dev->ibd_failover = 0;
3705                        if (rc >= 0) { /* Device is OK or failover succeed */
3706                                dev->ibd_next_failover = cfs_time_shift(3);
3707                                continue;
3708                        }
3709
3710                        /* failed to failover, retry later */
3711                        dev->ibd_next_failover =
3712                                cfs_time_shift(min(dev->ibd_failed_failover, 10));
3713                        if (kiblnd_dev_can_failover(dev)) {
3714                                list_add_tail(&dev->ibd_fail_list,
3715                                              &kiblnd_data.kib_failed_devs);
3716                        }
3717
3718                        continue;
3719                }
3720
3721                /* long sleep if no more pending failover */
3722                long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3723
3724                set_current_state(TASK_INTERRUPTIBLE);
3725                add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3726                write_unlock_irqrestore(glock, flags);
3727
3728                rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3729                                                   cfs_time_seconds(1));
3730                remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3731                write_lock_irqsave(glock, flags);
3732
3733                if (!long_sleep || rc)
3734                        continue;
3735
3736                /*
3737                 * After a long sleep, routinely check all active devices. We
3738                 * need a check like this because if there is no active connection
3739                 * on the dev and no SEND from the local node, we may listen on
3740                 * the wrong HCA forever after a bonding failover.
3741                 */
3742                list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3743                        if (kiblnd_dev_can_failover(dev)) {
3744                                list_add_tail(&dev->ibd_fail_list,
3745                                              &kiblnd_data.kib_failed_devs);
3746                        }
3747                }
3748        }
3749
3750        write_unlock_irqrestore(glock, flags);
3751
3752        kiblnd_thread_fini();
3753        return 0;
3754}
3755