linux/drivers/staging/lustre/lnet/klnds/o2iblnd/o2iblnd.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 *
  32 * lnet/klnds/o2iblnd/o2iblnd.c
  33 *
  34 * Author: Eric Barton <eric@bartonsoftware.com>
  35 */
  36
  37#include <asm/div64.h>
  38#include <asm/page.h>
  39#include "o2iblnd.h"
  40
  41static struct lnet_lnd the_o2iblnd;
  42
  43struct kib_data kiblnd_data;
  44
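     /*
      * Simple rotate-and-add checksum over 'nob' bytes: the running sum is
      * rotated left one bit before each byte is added.  A result of 0 is
      * mapped to 1, because a zero ibm_cksum on the wire means "no checksum".
      */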
  45static __u32 kiblnd_cksum(void *ptr, int nob)
  46{
  47        char *c = ptr;
  48        __u32 sum = 0;
  49
  50        while (nob-- > 0)
  51                sum = ((sum << 1) | (sum >> 31)) + *c++;
  52
  53        /* ensure I don't return 0 (== no checksum) */
  54        return !sum ? 1 : sum;
  55}
  56
  57static char *kiblnd_msgtype2str(int type)
  58{
  59        switch (type) {
  60        case IBLND_MSG_CONNREQ:
  61                return "CONNREQ";
  62
  63        case IBLND_MSG_CONNACK:
  64                return "CONNACK";
  65
  66        case IBLND_MSG_NOOP:
  67                return "NOOP";
  68
  69        case IBLND_MSG_IMMEDIATE:
  70                return "IMMEDIATE";
  71
  72        case IBLND_MSG_PUT_REQ:
  73                return "PUT_REQ";
  74
  75        case IBLND_MSG_PUT_NAK:
  76                return "PUT_NAK";
  77
  78        case IBLND_MSG_PUT_ACK:
  79                return "PUT_ACK";
  80
  81        case IBLND_MSG_PUT_DONE:
  82                return "PUT_DONE";
  83
  84        case IBLND_MSG_GET_REQ:
  85                return "GET_REQ";
  86
  87        case IBLND_MSG_GET_DONE:
  88                return "GET_DONE";
  89
  90        default:
  91                return "???";
  92        }
  93}
  94
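     /*
      * Minimum valid on-wire size for each message type: the common header
      * plus the type-specific part of ibm_u.  IMMEDIATE counts only the bytes
      * up to the start of its payload.  Unknown types return -1.
      */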
  95static int kiblnd_msgtype2size(int type)
  96{
  97        const int hdr_size = offsetof(struct kib_msg, ibm_u);
  98
  99        switch (type) {
 100        case IBLND_MSG_CONNREQ:
 101        case IBLND_MSG_CONNACK:
 102                return hdr_size + sizeof(struct kib_connparams);
 103
 104        case IBLND_MSG_NOOP:
 105                return hdr_size;
 106
 107        case IBLND_MSG_IMMEDIATE:
 108                return offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[0]);
 109
 110        case IBLND_MSG_PUT_REQ:
 111                return hdr_size + sizeof(struct kib_putreq_msg);
 112
 113        case IBLND_MSG_PUT_ACK:
 114                return hdr_size + sizeof(struct kib_putack_msg);
 115
 116        case IBLND_MSG_GET_REQ:
 117                return hdr_size + sizeof(struct kib_get_msg);
 118
 119        case IBLND_MSG_PUT_NAK:
 120        case IBLND_MSG_PUT_DONE:
 121        case IBLND_MSG_GET_DONE:
 122                return hdr_size + sizeof(struct kib_completion_msg);
 123        default:
 124                return -1;
 125        }
 126}
 127
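     /*
      * Validate (and, for a peer of opposite endianness, byte-swap) the RDMA
      * descriptor carried by GET_REQ and PUT_ACK messages.  Returns non-zero
      * if the fragment list does not fit inside the received message or if
      * the total size it describes is not in (0, LNET_MAX_PAYLOAD].
      */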
 128static int kiblnd_unpack_rd(struct kib_msg *msg, int flip)
 129{
 130        struct kib_rdma_desc *rd;
 131        int msg_size;
 132        int nob;
 133        int n;
 134        int i;
 135
 136        LASSERT(msg->ibm_type == IBLND_MSG_GET_REQ ||
 137                msg->ibm_type == IBLND_MSG_PUT_ACK);
 138
 139        rd = msg->ibm_type == IBLND_MSG_GET_REQ ?
 140                              &msg->ibm_u.get.ibgm_rd :
 141                              &msg->ibm_u.putack.ibpam_rd;
 142
 143        if (flip) {
 144                __swab32s(&rd->rd_key);
 145                __swab32s(&rd->rd_nfrags);
 146        }
 147
 148        n = rd->rd_nfrags;
 149
 150        nob = offsetof(struct kib_msg, ibm_u) +
 151              kiblnd_rd_msg_size(rd, msg->ibm_type, n);
 152
 153        if (msg->ibm_nob < nob) {
 154                CERROR("Short %s: %d(%d)\n",
 155                       kiblnd_msgtype2str(msg->ibm_type), msg->ibm_nob, nob);
 156                return 1;
 157        }
 158
 159        msg_size = kiblnd_rd_size(rd);
 160        if (msg_size <= 0 || msg_size > LNET_MAX_PAYLOAD) {
 161                CERROR("Bad msg_size: %d, should be 0 < n <= %d\n",
 162                       msg_size, LNET_MAX_PAYLOAD);
 163                return 1;
 164        }
 165
 166        if (!flip)
 167                return 0;
 168
 169        for (i = 0; i < n; i++) {
 170                __swab32s(&rd->rd_frags[i].rf_nob);
 171                __swab64s(&rd->rd_frags[i].rf_addr);
 172        }
 173
 174        return 0;
 175}
 176
 177void kiblnd_pack_msg(struct lnet_ni *ni, struct kib_msg *msg, int version,
 178                     int credits, lnet_nid_t dstnid, __u64 dststamp)
 179{
 180        struct kib_net *net = ni->ni_data;
 181
 182        /*
 183         * CAVEAT EMPTOR! all message fields not set here should have been
 184         * initialised previously.
 185         */
 186        msg->ibm_magic    = IBLND_MSG_MAGIC;
 187        msg->ibm_version  = version;
 188        /*   ibm_type */
 189        msg->ibm_credits  = credits;
 190        /*   ibm_nob */
 191        msg->ibm_cksum    = 0;
 192        msg->ibm_srcnid   = ni->ni_nid;
 193        msg->ibm_srcstamp = net->ibn_incarnation;
 194        msg->ibm_dstnid   = dstnid;
 195        msg->ibm_dststamp = dststamp;
 196
 197        if (*kiblnd_tunables.kib_cksum) {
 198                /* NB ibm_cksum zero while computing cksum */
 199                msg->ibm_cksum = kiblnd_cksum(msg, msg->ibm_nob);
 200        }
 201}
 202
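     /*
      * Validate an incoming message in place: check magic, version, length
      * and (if present) checksum, byte-swap the header and type-specific
      * fields when the sender's endianness differs, and return -EPROTO on
      * any inconsistency.  The magic itself is left unswapped as a clue to
      * the peer's endianness.
      */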
 203int kiblnd_unpack_msg(struct kib_msg *msg, int nob)
 204{
 205        const int hdr_size = offsetof(struct kib_msg, ibm_u);
 206        __u32 msg_cksum;
 207        __u16 version;
 208        int msg_nob;
 209        int flip;
 210
 211        /* 6 bytes are enough to have received magic + version */
 212        if (nob < 6) {
 213                CERROR("Short message: %d\n", nob);
 214                return -EPROTO;
 215        }
 216
 217        if (msg->ibm_magic == IBLND_MSG_MAGIC) {
 218                flip = 0;
 219        } else if (msg->ibm_magic == __swab32(IBLND_MSG_MAGIC)) {
 220                flip = 1;
 221        } else {
 222                CERROR("Bad magic: %08x\n", msg->ibm_magic);
 223                return -EPROTO;
 224        }
 225
 226        version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
 227        if (version != IBLND_MSG_VERSION &&
 228            version != IBLND_MSG_VERSION_1) {
 229                CERROR("Bad version: %x\n", version);
 230                return -EPROTO;
 231        }
 232
 233        if (nob < hdr_size) {
 234                CERROR("Short message: %d\n", nob);
 235                return -EPROTO;
 236        }
 237
 238        msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
 239        if (msg_nob > nob) {
 240                CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
 241                return -EPROTO;
 242        }
 243
 244        /*
 245         * checksum must be computed with ibm_cksum zero and BEFORE anything
 246         * gets flipped
 247         */
 248        msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
 249        msg->ibm_cksum = 0;
 250        if (msg_cksum &&
 251            msg_cksum != kiblnd_cksum(msg, msg_nob)) {
 252                CERROR("Bad checksum\n");
 253                return -EPROTO;
 254        }
 255
 256        msg->ibm_cksum = msg_cksum;
 257
 258        if (flip) {
 259                /* leave magic unflipped as a clue to peer endianness */
 260                msg->ibm_version = version;
 261                BUILD_BUG_ON(sizeof(msg->ibm_type) != 1);
 262                BUILD_BUG_ON(sizeof(msg->ibm_credits) != 1);
 263                msg->ibm_nob     = msg_nob;
 264                __swab64s(&msg->ibm_srcnid);
 265                __swab64s(&msg->ibm_srcstamp);
 266                __swab64s(&msg->ibm_dstnid);
 267                __swab64s(&msg->ibm_dststamp);
 268        }
 269
 270        if (msg->ibm_srcnid == LNET_NID_ANY) {
 271                CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
 272                return -EPROTO;
 273        }
 274
 275        if (msg_nob < kiblnd_msgtype2size(msg->ibm_type)) {
 276                CERROR("Short %s: %d(%d)\n", kiblnd_msgtype2str(msg->ibm_type),
 277                       msg_nob, kiblnd_msgtype2size(msg->ibm_type));
 278                return -EPROTO;
 279        }
 280
 281        switch (msg->ibm_type) {
 282        default:
 283                CERROR("Unknown message type %x\n", msg->ibm_type);
 284                return -EPROTO;
 285
 286        case IBLND_MSG_NOOP:
 287        case IBLND_MSG_IMMEDIATE:
 288        case IBLND_MSG_PUT_REQ:
 289                break;
 290
 291        case IBLND_MSG_PUT_ACK:
 292        case IBLND_MSG_GET_REQ:
 293                if (kiblnd_unpack_rd(msg, flip))
 294                        return -EPROTO;
 295                break;
 296
 297        case IBLND_MSG_PUT_NAK:
 298        case IBLND_MSG_PUT_DONE:
 299        case IBLND_MSG_GET_DONE:
 300                if (flip)
 301                        __swab32s(&msg->ibm_u.completion.ibcm_status);
 302                break;
 303
 304        case IBLND_MSG_CONNREQ:
 305        case IBLND_MSG_CONNACK:
 306                if (flip) {
 307                        __swab16s(&msg->ibm_u.connparams.ibcp_queue_depth);
 308                        __swab16s(&msg->ibm_u.connparams.ibcp_max_frags);
 309                        __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
 310                }
 311                break;
 312        }
 313        return 0;
 314}
 315
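     /*
      * Allocate and initialise a peer on the CPT that its NID hashes to.
      * The new peer carries one reference for the caller and is counted in
      * net->ibn_npeers, but it is not yet linked into the peer table.
      */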
 316int kiblnd_create_peer(struct lnet_ni *ni, struct kib_peer **peerp,
 317                       lnet_nid_t nid)
 318{
 319        struct kib_peer *peer;
 320        struct kib_net *net = ni->ni_data;
 321        int cpt = lnet_cpt_of_nid(nid);
 322        unsigned long flags;
 323
 324        LASSERT(net);
 325        LASSERT(nid != LNET_NID_ANY);
 326
 327        LIBCFS_CPT_ALLOC(peer, lnet_cpt_table(), cpt, sizeof(*peer));
 328        if (!peer) {
 329                CERROR("Cannot allocate peer\n");
 330                return -ENOMEM;
 331        }
 332
 333        peer->ibp_ni = ni;
 334        peer->ibp_nid = nid;
 335        peer->ibp_error = 0;
 336        peer->ibp_last_alive = 0;
 337        peer->ibp_max_frags = kiblnd_cfg_rdma_frags(peer->ibp_ni);
 338        peer->ibp_queue_depth = ni->ni_peertxcredits;
 339        atomic_set(&peer->ibp_refcount, 1);  /* 1 ref for caller */
 340
 341        INIT_LIST_HEAD(&peer->ibp_list);     /* not in the peer table yet */
 342        INIT_LIST_HEAD(&peer->ibp_conns);
 343        INIT_LIST_HEAD(&peer->ibp_tx_queue);
 344
 345        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
 346
  347        /* always called with a ref on ni, which prevents ni from being shut down */
 348        LASSERT(!net->ibn_shutdown);
 349
 350        /* npeers only grows with the global lock held */
 351        atomic_inc(&net->ibn_npeers);
 352
 353        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 354
 355        *peerp = peer;
 356        return 0;
 357}
 358
 359void kiblnd_destroy_peer(struct kib_peer *peer)
 360{
 361        struct kib_net *net = peer->ibp_ni->ni_data;
 362
 363        LASSERT(net);
 364        LASSERT(!atomic_read(&peer->ibp_refcount));
 365        LASSERT(!kiblnd_peer_active(peer));
 366        LASSERT(kiblnd_peer_idle(peer));
 367        LASSERT(list_empty(&peer->ibp_tx_queue));
 368
 369        LIBCFS_FREE(peer, sizeof(*peer));
 370
 371        /*
 372         * NB a peer's connections keep a reference on their peer until
 373         * they are destroyed, so we can be assured that _all_ state to do
 374         * with this peer has been cleaned up when its refcount drops to
 375         * zero.
 376         */
 377        atomic_dec(&net->ibn_npeers);
 378}
 379
 380struct kib_peer *kiblnd_find_peer_locked(lnet_nid_t nid)
 381{
 382        /*
 383         * the caller is responsible for accounting the additional reference
 384         * that this creates
 385         */
 386        struct list_head *peer_list = kiblnd_nid2peerlist(nid);
 387        struct list_head *tmp;
 388        struct kib_peer *peer;
 389
 390        list_for_each(tmp, peer_list) {
 391                peer = list_entry(tmp, struct kib_peer, ibp_list);
 392                LASSERT(!kiblnd_peer_idle(peer));
 393
 394                if (peer->ibp_nid != nid)
 395                        continue;
 396
 397                CDEBUG(D_NET, "got peer [%p] -> %s (%d) version: %x\n",
 398                       peer, libcfs_nid2str(nid),
 399                       atomic_read(&peer->ibp_refcount),
 400                       peer->ibp_version);
 401                return peer;
 402        }
 403        return NULL;
 404}
 405
 406void kiblnd_unlink_peer_locked(struct kib_peer *peer)
 407{
 408        LASSERT(list_empty(&peer->ibp_conns));
 409
 410        LASSERT(kiblnd_peer_active(peer));
 411        list_del_init(&peer->ibp_list);
 412        /* lose peerlist's ref */
 413        kiblnd_peer_decref(peer);
 414}
 415
 416static int kiblnd_get_peer_info(struct lnet_ni *ni, int index,
 417                                lnet_nid_t *nidp, int *count)
 418{
 419        struct kib_peer *peer;
 420        struct list_head *ptmp;
 421        int i;
 422        unsigned long flags;
 423
 424        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
 425
 426        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
 427                list_for_each(ptmp, &kiblnd_data.kib_peers[i]) {
 428                        peer = list_entry(ptmp, struct kib_peer, ibp_list);
 429                        LASSERT(!kiblnd_peer_idle(peer));
 430
 431                        if (peer->ibp_ni != ni)
 432                                continue;
 433
 434                        if (index-- > 0)
 435                                continue;
 436
 437                        *nidp = peer->ibp_nid;
 438                        *count = atomic_read(&peer->ibp_refcount);
 439
 440                        read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
 441                                               flags);
 442                        return 0;
 443                }
 444        }
 445
 446        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 447        return -ENOENT;
 448}
 449
 450static void kiblnd_del_peer_locked(struct kib_peer *peer)
 451{
 452        struct list_head *ctmp;
 453        struct list_head *cnxt;
 454        struct kib_conn *conn;
 455
 456        if (list_empty(&peer->ibp_conns)) {
 457                kiblnd_unlink_peer_locked(peer);
 458        } else {
 459                list_for_each_safe(ctmp, cnxt, &peer->ibp_conns) {
 460                        conn = list_entry(ctmp, struct kib_conn, ibc_list);
 461
 462                        kiblnd_close_conn_locked(conn, 0);
 463                }
 464                /* NB closing peer's last conn unlinked it. */
 465        }
 466        /*
 467         * NB peer now unlinked; might even be freed if the peer table had the
 468         * last ref on it.
 469         */
 470}
 471
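     /*
      * Delete the peer(s) matching 'nid' (or every peer of this NI when nid
      * is LNET_NID_ANY): close their connections and complete any queued
      * transmits with -EIO.  Returns -ENOENT if nothing matched.
      */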
 472static int kiblnd_del_peer(struct lnet_ni *ni, lnet_nid_t nid)
 473{
 474        LIST_HEAD(zombies);
 475        struct list_head *ptmp;
 476        struct list_head *pnxt;
 477        struct kib_peer *peer;
 478        int lo;
 479        int hi;
 480        int i;
 481        unsigned long flags;
 482        int rc = -ENOENT;
 483
 484        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
 485
 486        if (nid != LNET_NID_ANY) {
 487                lo = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
 488                hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
 489        } else {
 490                lo = 0;
 491                hi = kiblnd_data.kib_peer_hash_size - 1;
 492        }
 493
 494        for (i = lo; i <= hi; i++) {
 495                list_for_each_safe(ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
 496                        peer = list_entry(ptmp, struct kib_peer, ibp_list);
 497                        LASSERT(!kiblnd_peer_idle(peer));
 498
 499                        if (peer->ibp_ni != ni)
 500                                continue;
 501
 502                        if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
 503                                continue;
 504
 505                        if (!list_empty(&peer->ibp_tx_queue)) {
 506                                LASSERT(list_empty(&peer->ibp_conns));
 507
 508                                list_splice_init(&peer->ibp_tx_queue,
 509                                                 &zombies);
 510                        }
 511
 512                        kiblnd_del_peer_locked(peer);
 513                        rc = 0;  /* matched something */
 514                }
 515        }
 516
 517        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 518
 519        kiblnd_txlist_done(ni, &zombies, -EIO);
 520
 521        return rc;
 522}
 523
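     /*
      * Return the index-th connection of this NI, walking the peer hash in
      * order, with a reference held on it; the caller must drop that ref
      * with kiblnd_conn_decref().  Returns NULL if the index is out of range.
      */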
 524static struct kib_conn *kiblnd_get_conn_by_idx(struct lnet_ni *ni, int index)
 525{
 526        struct kib_peer *peer;
 527        struct list_head *ptmp;
 528        struct kib_conn *conn;
 529        struct list_head *ctmp;
 530        int i;
 531        unsigned long flags;
 532
 533        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
 534
 535        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
 536                list_for_each(ptmp, &kiblnd_data.kib_peers[i]) {
 537                        peer = list_entry(ptmp, struct kib_peer, ibp_list);
 538                        LASSERT(!kiblnd_peer_idle(peer));
 539
 540                        if (peer->ibp_ni != ni)
 541                                continue;
 542
 543                        list_for_each(ctmp, &peer->ibp_conns) {
 544                                if (index-- > 0)
 545                                        continue;
 546
 547                                conn = list_entry(ctmp, struct kib_conn,
 548                                                  ibc_list);
 549                                kiblnd_conn_addref(conn);
 550                                read_unlock_irqrestore(
 551                                        &kiblnd_data.kib_global_lock,
 552                                        flags);
 553                                return conn;
 554                        }
 555                }
 556        }
 557
 558        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 559        return NULL;
 560}
 561
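     /*
      * Map an MTU in bytes (the kib_ib_mtu tunable) to the IB_MTU_* enum,
      * e.g. kiblnd_translate_mtu(2048) == IB_MTU_2048.  0 means "leave the
      * path MTU alone" and any other value is rejected with -1.
      */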
 562int kiblnd_translate_mtu(int value)
 563{
 564        switch (value) {
 565        default:
 566                return -1;
 567        case 0:
 568                return 0;
 569        case 256:
 570                return IB_MTU_256;
 571        case 512:
 572                return IB_MTU_512;
 573        case 1024:
 574                return IB_MTU_1024;
 575        case 2048:
 576                return IB_MTU_2048;
 577        case 4096:
 578                return IB_MTU_4096;
 579        }
 580}
 581
 582static void kiblnd_setup_mtu_locked(struct rdma_cm_id *cmid)
 583{
 584        int mtu;
 585
 586        /* XXX There is no path record for iWARP, set by netdev->change_mtu? */
 587        if (!cmid->route.path_rec)
 588                return;
 589
 590        mtu = kiblnd_translate_mtu(*kiblnd_tunables.kib_ib_mtu);
 591        LASSERT(mtu >= 0);
 592        if (mtu)
 593                cmid->route.path_rec->mtu = mtu;
 594}
 595
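     /*
      * Spread connections across the device's completion vectors by hashing
      * the peer's NID onto the CPUs of the connection's CPT.  Falls back to
      * vector 0 when the device has a single vector or no CPU mask is
      * available.
      */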
 596static int kiblnd_get_completion_vector(struct kib_conn *conn, int cpt)
 597{
 598        cpumask_t *mask;
 599        int vectors;
 600        int off;
 601        int i;
 602        lnet_nid_t nid = conn->ibc_peer->ibp_nid;
 603
 604        vectors = conn->ibc_cmid->device->num_comp_vectors;
 605        if (vectors <= 1)
 606                return 0;
 607
 608        mask = cfs_cpt_cpumask(lnet_cpt_table(), cpt);
 609        if (!mask)
 610                return 0;
 611
 612        /* hash NID to CPU id in this partition... */
 613        off = do_div(nid, cpumask_weight(mask));
 614        for_each_cpu(i, mask) {
 615                if (!off--)
 616                        return i % vectors;
 617        }
 618
 619        LBUG();
 620        return 1;
 621}
 622
 623struct kib_conn *kiblnd_create_conn(struct kib_peer *peer, struct rdma_cm_id *cmid,
 624                                    int state, int version)
 625{
 626        /*
 627         * CAVEAT EMPTOR:
 628         * If the new conn is created successfully it takes over the caller's
 629         * ref on 'peer'.  It also "owns" 'cmid' and destroys it when it itself
 630         * is destroyed.  On failure, the caller's ref on 'peer' remains and
 631         * she must dispose of 'cmid'.  (Actually I'd block forever if I tried
 632         * to destroy 'cmid' here since I'm called from the CM which still has
 633         * its ref on 'cmid').
 634         */
 635        rwlock_t *glock = &kiblnd_data.kib_global_lock;
 636        struct kib_net *net = peer->ibp_ni->ni_data;
 637        struct kib_dev *dev;
 638        struct ib_qp_init_attr *init_qp_attr;
 639        struct kib_sched_info *sched;
 640        struct ib_cq_init_attr cq_attr = {};
 641        struct kib_conn *conn;
 642        struct ib_cq *cq;
 643        unsigned long flags;
 644        int cpt;
 645        int rc;
 646        int i;
 647
 648        LASSERT(net);
 649        LASSERT(!in_interrupt());
 650
 651        dev = net->ibn_dev;
 652
 653        cpt = lnet_cpt_of_nid(peer->ibp_nid);
 654        sched = kiblnd_data.kib_scheds[cpt];
 655
 656        LASSERT(sched->ibs_nthreads > 0);
 657
 658        LIBCFS_CPT_ALLOC(init_qp_attr, lnet_cpt_table(), cpt,
 659                         sizeof(*init_qp_attr));
 660        if (!init_qp_attr) {
 661                CERROR("Can't allocate qp_attr for %s\n",
 662                       libcfs_nid2str(peer->ibp_nid));
 663                goto failed_0;
 664        }
 665
 666        LIBCFS_CPT_ALLOC(conn, lnet_cpt_table(), cpt, sizeof(*conn));
 667        if (!conn) {
 668                CERROR("Can't allocate connection for %s\n",
 669                       libcfs_nid2str(peer->ibp_nid));
 670                goto failed_1;
 671        }
 672
 673        conn->ibc_state = IBLND_CONN_INIT;
 674        conn->ibc_version = version;
 675        conn->ibc_peer = peer;            /* I take the caller's ref */
 676        cmid->context = conn;              /* for future CM callbacks */
 677        conn->ibc_cmid = cmid;
 678        conn->ibc_max_frags = peer->ibp_max_frags;
 679        conn->ibc_queue_depth = peer->ibp_queue_depth;
 680
 681        INIT_LIST_HEAD(&conn->ibc_early_rxs);
 682        INIT_LIST_HEAD(&conn->ibc_tx_noops);
 683        INIT_LIST_HEAD(&conn->ibc_tx_queue);
 684        INIT_LIST_HEAD(&conn->ibc_tx_queue_rsrvd);
 685        INIT_LIST_HEAD(&conn->ibc_tx_queue_nocred);
 686        INIT_LIST_HEAD(&conn->ibc_active_txs);
 687        spin_lock_init(&conn->ibc_lock);
 688
 689        LIBCFS_CPT_ALLOC(conn->ibc_connvars, lnet_cpt_table(), cpt,
 690                         sizeof(*conn->ibc_connvars));
 691        if (!conn->ibc_connvars) {
 692                CERROR("Can't allocate in-progress connection state\n");
 693                goto failed_2;
 694        }
 695
 696        write_lock_irqsave(glock, flags);
 697        if (dev->ibd_failover) {
 698                write_unlock_irqrestore(glock, flags);
 699                CERROR("%s: failover in progress\n", dev->ibd_ifname);
 700                goto failed_2;
 701        }
 702
 703        if (dev->ibd_hdev->ibh_ibdev != cmid->device) {
  704                /* wake up the failover thread and tear down the connection */
 705                if (kiblnd_dev_can_failover(dev)) {
 706                        list_add_tail(&dev->ibd_fail_list,
 707                                      &kiblnd_data.kib_failed_devs);
 708                        wake_up(&kiblnd_data.kib_failover_waitq);
 709                }
 710
 711                write_unlock_irqrestore(glock, flags);
 712                CERROR("cmid HCA(%s), kib_dev(%s) need failover\n",
 713                       cmid->device->name, dev->ibd_ifname);
 714                goto failed_2;
 715        }
 716
 717        kiblnd_hdev_addref_locked(dev->ibd_hdev);
 718        conn->ibc_hdev = dev->ibd_hdev;
 719
 720        kiblnd_setup_mtu_locked(cmid);
 721
 722        write_unlock_irqrestore(glock, flags);
 723
 724        LIBCFS_CPT_ALLOC(conn->ibc_rxs, lnet_cpt_table(), cpt,
 725                         IBLND_RX_MSGS(conn) * sizeof(struct kib_rx));
 726        if (!conn->ibc_rxs) {
 727                CERROR("Cannot allocate RX buffers\n");
 728                goto failed_2;
 729        }
 730
 731        rc = kiblnd_alloc_pages(&conn->ibc_rx_pages, cpt,
 732                                IBLND_RX_MSG_PAGES(conn));
 733        if (rc)
 734                goto failed_2;
 735
 736        kiblnd_map_rx_descs(conn);
 737
 738        cq_attr.cqe = IBLND_CQ_ENTRIES(conn);
 739        cq_attr.comp_vector = kiblnd_get_completion_vector(conn, cpt);
 740        cq = ib_create_cq(cmid->device,
 741                          kiblnd_cq_completion, kiblnd_cq_event, conn,
 742                          &cq_attr);
 743        if (IS_ERR(cq)) {
 744                CERROR("Failed to create CQ with %d CQEs: %ld\n",
 745                       IBLND_CQ_ENTRIES(conn), PTR_ERR(cq));
 746                goto failed_2;
 747        }
 748
 749        conn->ibc_cq = cq;
 750
 751        rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
 752        if (rc) {
 753                CERROR("Can't request completion notification: %d\n", rc);
 754                goto failed_2;
 755        }
 756
 757        init_qp_attr->event_handler = kiblnd_qp_event;
 758        init_qp_attr->qp_context = conn;
 759        init_qp_attr->cap.max_send_wr = IBLND_SEND_WRS(conn);
 760        init_qp_attr->cap.max_recv_wr = IBLND_RECV_WRS(conn);
 761        init_qp_attr->cap.max_send_sge = 1;
 762        init_qp_attr->cap.max_recv_sge = 1;
 763        init_qp_attr->sq_sig_type = IB_SIGNAL_REQ_WR;
 764        init_qp_attr->qp_type = IB_QPT_RC;
 765        init_qp_attr->send_cq = cq;
 766        init_qp_attr->recv_cq = cq;
 767
 768        conn->ibc_sched = sched;
 769
 770        rc = rdma_create_qp(cmid, conn->ibc_hdev->ibh_pd, init_qp_attr);
 771        if (rc) {
 772                CERROR("Can't create QP: %d, send_wr: %d, recv_wr: %d\n",
 773                       rc, init_qp_attr->cap.max_send_wr,
 774                       init_qp_attr->cap.max_recv_wr);
 775                goto failed_2;
 776        }
 777
 778        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
 779
 780        /* 1 ref for caller and each rxmsg */
 781        atomic_set(&conn->ibc_refcount, 1 + IBLND_RX_MSGS(conn));
 782        conn->ibc_nrx = IBLND_RX_MSGS(conn);
 783
 784        /* post receives */
 785        for (i = 0; i < IBLND_RX_MSGS(conn); i++) {
 786                rc = kiblnd_post_rx(&conn->ibc_rxs[i],
 787                                    IBLND_POSTRX_NO_CREDIT);
 788                if (rc) {
 789                        CERROR("Can't post rxmsg: %d\n", rc);
 790
 791                        /* Make posted receives complete */
 792                        kiblnd_abort_receives(conn);
 793
 794                        /*
  795                         * correct the number of posted buffers
  796                         * NB locking is needed now that we race with completions
 797                         */
 798                        spin_lock_irqsave(&sched->ibs_lock, flags);
 799                        conn->ibc_nrx -= IBLND_RX_MSGS(conn) - i;
 800                        spin_unlock_irqrestore(&sched->ibs_lock, flags);
 801
 802                        /*
  803                         * cmid will be destroyed by the CM (OFED) after the CM
  804                         * callback returns, so it must not be referenced again
  805                         * (e.g. by kiblnd_connd()->kiblnd_destroy_conn())
 806                         */
 807                        rdma_destroy_qp(conn->ibc_cmid);
 808                        conn->ibc_cmid = NULL;
 809
 810                        /* Drop my own and unused rxbuffer refcounts */
 811                        while (i++ <= IBLND_RX_MSGS(conn))
 812                                kiblnd_conn_decref(conn);
 813
 814                        return NULL;
 815                }
 816        }
 817
 818        /* Init successful! */
 819        LASSERT(state == IBLND_CONN_ACTIVE_CONNECT ||
 820                state == IBLND_CONN_PASSIVE_WAIT);
 821        conn->ibc_state = state;
 822
 823        /* 1 more conn */
 824        atomic_inc(&net->ibn_nconns);
 825        return conn;
 826
 827 failed_2:
 828        kiblnd_destroy_conn(conn, true);
 829 failed_1:
 830        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
 831 failed_0:
 832        return NULL;
 833}
 834
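     /*
      * Final teardown once the last reference has gone: destroy the QP and
      * CQ, unmap and free the RX buffers, and drop the hdev reference.  If
      * the conn ever got past IBLND_CONN_INIT, also release the peer ref,
      * destroy the cmid and decrement ibn_nconns.
      */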
 835void kiblnd_destroy_conn(struct kib_conn *conn, bool free_conn)
 836{
 837        struct rdma_cm_id *cmid = conn->ibc_cmid;
 838        struct kib_peer *peer = conn->ibc_peer;
 839        int rc;
 840
 841        LASSERT(!in_interrupt());
 842        LASSERT(!atomic_read(&conn->ibc_refcount));
 843        LASSERT(list_empty(&conn->ibc_early_rxs));
 844        LASSERT(list_empty(&conn->ibc_tx_noops));
 845        LASSERT(list_empty(&conn->ibc_tx_queue));
 846        LASSERT(list_empty(&conn->ibc_tx_queue_rsrvd));
 847        LASSERT(list_empty(&conn->ibc_tx_queue_nocred));
 848        LASSERT(list_empty(&conn->ibc_active_txs));
 849        LASSERT(!conn->ibc_noops_posted);
 850        LASSERT(!conn->ibc_nsends_posted);
 851
 852        switch (conn->ibc_state) {
 853        default:
 854                /* conn must be completely disengaged from the network */
 855                LBUG();
 856
 857        case IBLND_CONN_DISCONNECTED:
 858                /* connvars should have been freed already */
 859                LASSERT(!conn->ibc_connvars);
 860                break;
 861
 862        case IBLND_CONN_INIT:
 863                break;
 864        }
 865
 866        /* conn->ibc_cmid might be destroyed by CM already */
 867        if (cmid && cmid->qp)
 868                rdma_destroy_qp(cmid);
 869
 870        if (conn->ibc_cq) {
 871                rc = ib_destroy_cq(conn->ibc_cq);
 872                if (rc)
 873                        CWARN("Error destroying CQ: %d\n", rc);
 874        }
 875
 876        if (conn->ibc_rx_pages)
 877                kiblnd_unmap_rx_descs(conn);
 878
 879        if (conn->ibc_rxs) {
 880                LIBCFS_FREE(conn->ibc_rxs,
 881                            IBLND_RX_MSGS(conn) * sizeof(struct kib_rx));
 882        }
 883
 884        if (conn->ibc_connvars)
 885                LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
 886
 887        if (conn->ibc_hdev)
 888                kiblnd_hdev_decref(conn->ibc_hdev);
 889
 890        /* See CAVEAT EMPTOR above in kiblnd_create_conn */
 891        if (conn->ibc_state != IBLND_CONN_INIT) {
 892                struct kib_net *net = peer->ibp_ni->ni_data;
 893
 894                kiblnd_peer_decref(peer);
 895                rdma_destroy_id(cmid);
 896                atomic_dec(&net->ibn_nconns);
 897        }
 898
  899        if (free_conn)
                     LIBCFS_FREE(conn, sizeof(*conn));
 900}
 901
 902int kiblnd_close_peer_conns_locked(struct kib_peer *peer, int why)
 903{
 904        struct kib_conn *conn;
 905        struct list_head *ctmp;
 906        struct list_head *cnxt;
 907        int count = 0;
 908
 909        list_for_each_safe(ctmp, cnxt, &peer->ibp_conns) {
 910                conn = list_entry(ctmp, struct kib_conn, ibc_list);
 911
 912                CDEBUG(D_NET, "Closing conn -> %s, version: %x, reason: %d\n",
 913                       libcfs_nid2str(peer->ibp_nid),
 914                       conn->ibc_version, why);
 915
 916                kiblnd_close_conn_locked(conn, why);
 917                count++;
 918        }
 919
 920        return count;
 921}
 922
 923int kiblnd_close_stale_conns_locked(struct kib_peer *peer,
 924                                    int version, __u64 incarnation)
 925{
 926        struct kib_conn *conn;
 927        struct list_head *ctmp;
 928        struct list_head *cnxt;
 929        int count = 0;
 930
 931        list_for_each_safe(ctmp, cnxt, &peer->ibp_conns) {
 932                conn = list_entry(ctmp, struct kib_conn, ibc_list);
 933
 934                if (conn->ibc_version     == version &&
 935                    conn->ibc_incarnation == incarnation)
 936                        continue;
 937
 938                CDEBUG(D_NET,
 939                       "Closing stale conn -> %s version: %x, incarnation:%#llx(%x, %#llx)\n",
 940                       libcfs_nid2str(peer->ibp_nid),
 941                       conn->ibc_version, conn->ibc_incarnation,
 942                       version, incarnation);
 943
 944                kiblnd_close_conn_locked(conn, -ESTALE);
 945                count++;
 946        }
 947
 948        return count;
 949}
 950
 951static int kiblnd_close_matching_conns(struct lnet_ni *ni, lnet_nid_t nid)
 952{
 953        struct kib_peer *peer;
 954        struct list_head *ptmp;
 955        struct list_head *pnxt;
 956        int lo;
 957        int hi;
 958        int i;
 959        unsigned long flags;
 960        int count = 0;
 961
 962        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
 963
 964        if (nid != LNET_NID_ANY) {
 965                lo = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
 966                hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
 967        } else {
 968                lo = 0;
 969                hi = kiblnd_data.kib_peer_hash_size - 1;
 970        }
 971
 972        for (i = lo; i <= hi; i++) {
 973                list_for_each_safe(ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
 974                        peer = list_entry(ptmp, struct kib_peer, ibp_list);
 975                        LASSERT(!kiblnd_peer_idle(peer));
 976
 977                        if (peer->ibp_ni != ni)
 978                                continue;
 979
 980                        if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
 981                                continue;
 982
 983                        count += kiblnd_close_peer_conns_locked(peer, 0);
 984                }
 985        }
 986
 987        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 988
 989        /* wildcards always succeed */
 990        if (nid == LNET_NID_ANY)
 991                return 0;
 992
 993        return !count ? -ENOENT : 0;
 994}
 995
 996static int kiblnd_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
 997{
 998        struct libcfs_ioctl_data *data = arg;
 999        int rc = -EINVAL;
1000
1001        switch (cmd) {
1002        case IOC_LIBCFS_GET_PEER: {
1003                lnet_nid_t nid = 0;
1004                int count = 0;
1005
1006                rc = kiblnd_get_peer_info(ni, data->ioc_count,
1007                                          &nid, &count);
1008                data->ioc_nid   = nid;
1009                data->ioc_count = count;
1010                break;
1011        }
1012
1013        case IOC_LIBCFS_DEL_PEER: {
1014                rc = kiblnd_del_peer(ni, data->ioc_nid);
1015                break;
1016        }
1017        case IOC_LIBCFS_GET_CONN: {
1018                struct kib_conn *conn;
1019
1020                rc = 0;
1021                conn = kiblnd_get_conn_by_idx(ni, data->ioc_count);
1022                if (!conn) {
1023                        rc = -ENOENT;
1024                        break;
1025                }
1026
1027                LASSERT(conn->ibc_cmid);
1028                data->ioc_nid = conn->ibc_peer->ibp_nid;
1029                if (!conn->ibc_cmid->route.path_rec)
1030                        data->ioc_u32[0] = 0; /* iWarp has no path MTU */
1031                else
1032                        data->ioc_u32[0] =
1033                        ib_mtu_enum_to_int(conn->ibc_cmid->route.path_rec->mtu);
1034                kiblnd_conn_decref(conn);
1035                break;
1036        }
1037        case IOC_LIBCFS_CLOSE_CONNECTION: {
1038                rc = kiblnd_close_matching_conns(ni, data->ioc_nid);
1039                break;
1040        }
1041
1042        default:
1043                break;
1044        }
1045
1046        return rc;
1047}
1048
1049static void kiblnd_query(struct lnet_ni *ni, lnet_nid_t nid,
1050                         unsigned long *when)
1051{
1052        unsigned long last_alive = 0;
1053        unsigned long now = cfs_time_current();
1054        rwlock_t *glock = &kiblnd_data.kib_global_lock;
1055        struct kib_peer *peer;
1056        unsigned long flags;
1057
1058        read_lock_irqsave(glock, flags);
1059
1060        peer = kiblnd_find_peer_locked(nid);
1061        if (peer)
1062                last_alive = peer->ibp_last_alive;
1063
1064        read_unlock_irqrestore(glock, flags);
1065
1066        if (last_alive)
1067                *when = last_alive;
1068
1069        /*
1070         * peer is not persistent in hash, trigger peer creation
1071         * and connection establishment with a NULL tx
1072         */
1073        if (!peer)
1074                kiblnd_launch_tx(ni, NULL, nid);
1075
1076        CDEBUG(D_NET, "Peer %s %p, alive %ld secs ago\n",
1077               libcfs_nid2str(nid), peer,
1078               last_alive ? cfs_duration_sec(now - last_alive) : -1);
1079}
1080
1081static void kiblnd_free_pages(struct kib_pages *p)
1082{
1083        int npages = p->ibp_npages;
1084        int i;
1085
1086        for (i = 0; i < npages; i++) {
1087                if (p->ibp_pages[i])
1088                        __free_page(p->ibp_pages[i]);
1089        }
1090
1091        LIBCFS_FREE(p, offsetof(struct kib_pages, ibp_pages[npages]));
1092}
1093
1094int kiblnd_alloc_pages(struct kib_pages **pp, int cpt, int npages)
1095{
1096        struct kib_pages *p;
1097        int i;
1098
1099        LIBCFS_CPT_ALLOC(p, lnet_cpt_table(), cpt,
1100                         offsetof(struct kib_pages, ibp_pages[npages]));
1101        if (!p) {
1102                CERROR("Can't allocate descriptor for %d pages\n", npages);
1103                return -ENOMEM;
1104        }
1105
1106        memset(p, 0, offsetof(struct kib_pages, ibp_pages[npages]));
1107        p->ibp_npages = npages;
1108
1109        for (i = 0; i < npages; i++) {
1110                p->ibp_pages[i] = alloc_pages_node(
1111                                    cfs_cpt_spread_node(lnet_cpt_table(), cpt),
1112                                    GFP_NOFS, 0);
1113                if (!p->ibp_pages[i]) {
1114                        CERROR("Can't allocate page %d of %d\n", i, npages);
1115                        kiblnd_free_pages(p);
1116                        return -ENOMEM;
1117                }
1118        }
1119
1120        *pp = p;
1121        return 0;
1122}
1123
1124void kiblnd_unmap_rx_descs(struct kib_conn *conn)
1125{
1126        struct kib_rx *rx;
1127        int i;
1128
1129        LASSERT(conn->ibc_rxs);
1130        LASSERT(conn->ibc_hdev);
1131
1132        for (i = 0; i < IBLND_RX_MSGS(conn); i++) {
1133                rx = &conn->ibc_rxs[i];
1134
1135                LASSERT(rx->rx_nob >= 0); /* not posted */
1136
1137                kiblnd_dma_unmap_single(conn->ibc_hdev->ibh_ibdev,
1138                                        KIBLND_UNMAP_ADDR(rx, rx_msgunmap,
1139                                                          rx->rx_msgaddr),
1140                                        IBLND_MSG_SIZE, DMA_FROM_DEVICE);
1141        }
1142
1143        kiblnd_free_pages(conn->ibc_rx_pages);
1144
1145        conn->ibc_rx_pages = NULL;
1146}
1147
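     /*
      * Carve IBLND_MSG_SIZE slots out of the preallocated RX pages and
      * DMA-map each one for the device.  Messages never straddle a page
      * boundary: PAGE_SIZE is a multiple of IBLND_MSG_SIZE (see the
      * BUILD_BUG_ONs in kiblnd_map_tx_pool()).
      */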
1148void kiblnd_map_rx_descs(struct kib_conn *conn)
1149{
1150        struct kib_rx *rx;
1151        struct page *pg;
1152        int pg_off;
1153        int ipg;
1154        int i;
1155
1156        for (pg_off = ipg = i = 0; i < IBLND_RX_MSGS(conn); i++) {
1157                pg = conn->ibc_rx_pages->ibp_pages[ipg];
1158                rx = &conn->ibc_rxs[i];
1159
1160                rx->rx_conn = conn;
1161                rx->rx_msg = (struct kib_msg *)(((char *)page_address(pg)) + pg_off);
1162
1163                rx->rx_msgaddr = kiblnd_dma_map_single(conn->ibc_hdev->ibh_ibdev,
1164                                                       rx->rx_msg,
1165                                                       IBLND_MSG_SIZE,
1166                                                       DMA_FROM_DEVICE);
1167                LASSERT(!kiblnd_dma_mapping_error(conn->ibc_hdev->ibh_ibdev,
1168                                                  rx->rx_msgaddr));
1169                KIBLND_UNMAP_ADDR_SET(rx, rx_msgunmap, rx->rx_msgaddr);
1170
1171                CDEBUG(D_NET, "rx %d: %p %#llx(%#llx)\n",
1172                       i, rx->rx_msg, rx->rx_msgaddr,
1173                       (__u64)(page_to_phys(pg) + pg_off));
1174
1175                pg_off += IBLND_MSG_SIZE;
1176                LASSERT(pg_off <= PAGE_SIZE);
1177
1178                if (pg_off == PAGE_SIZE) {
1179                        pg_off = 0;
1180                        ipg++;
1181                        LASSERT(ipg <= IBLND_RX_MSG_PAGES(conn));
1182                }
1183        }
1184}
1185
1186static void kiblnd_unmap_tx_pool(struct kib_tx_pool *tpo)
1187{
1188        struct kib_hca_dev *hdev = tpo->tpo_hdev;
1189        struct kib_tx *tx;
1190        int i;
1191
1192        LASSERT(!tpo->tpo_pool.po_allocated);
1193
1194        if (!hdev)
1195                return;
1196
1197        for (i = 0; i < tpo->tpo_pool.po_size; i++) {
1198                tx = &tpo->tpo_tx_descs[i];
1199                kiblnd_dma_unmap_single(hdev->ibh_ibdev,
1200                                        KIBLND_UNMAP_ADDR(tx, tx_msgunmap,
1201                                                          tx->tx_msgaddr),
1202                                        IBLND_MSG_SIZE, DMA_TO_DEVICE);
1203        }
1204
1205        kiblnd_hdev_decref(hdev);
1206        tpo->tpo_hdev = NULL;
1207}
1208
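     /*
      * Return the device's current HCA descriptor with a reference held,
      * sleeping in 10 ms steps while a failover is in progress so that an
      * hdev about to be replaced is never handed out.
      */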
1209static struct kib_hca_dev *kiblnd_current_hdev(struct kib_dev *dev)
1210{
1211        struct kib_hca_dev *hdev;
1212        unsigned long flags;
1213        int i = 0;
1214
1215        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1216        while (dev->ibd_failover) {
1217                read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1218                if (!(i++ % 50))
1219                        CDEBUG(D_NET, "%s: Wait for failover\n",
1220                               dev->ibd_ifname);
1221                set_current_state(TASK_INTERRUPTIBLE);
1222                schedule_timeout(cfs_time_seconds(1) / 100);
1223
1224                read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1225        }
1226
1227        kiblnd_hdev_addref_locked(dev->ibd_hdev);
1228        hdev = dev->ibd_hdev;
1229
1230        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1231
1232        return hdev;
1233}
1234
1235static void kiblnd_map_tx_pool(struct kib_tx_pool *tpo)
1236{
1237        struct kib_pages *txpgs = tpo->tpo_tx_pages;
1238        struct kib_pool *pool = &tpo->tpo_pool;
1239        struct kib_net *net = pool->po_owner->ps_net;
1240        struct kib_dev *dev;
1241        struct page *page;
1242        struct kib_tx *tx;
1243        int page_offset;
1244        int ipage;
1245        int i;
1246
1247        LASSERT(net);
1248
1249        dev = net->ibn_dev;
1250
1251        /* pre-mapped messages are not bigger than 1 page */
1252        BUILD_BUG_ON(IBLND_MSG_SIZE > PAGE_SIZE);
1253
1254        /* No fancy arithmetic when we do the buffer calculations */
1255        BUILD_BUG_ON(PAGE_SIZE % IBLND_MSG_SIZE);
1256
1257        tpo->tpo_hdev = kiblnd_current_hdev(dev);
1258
1259        for (ipage = page_offset = i = 0; i < pool->po_size; i++) {
1260                page = txpgs->ibp_pages[ipage];
1261                tx = &tpo->tpo_tx_descs[i];
1262
1263                tx->tx_msg = (struct kib_msg *)(((char *)page_address(page)) +
1264                                           page_offset);
1265
1266                tx->tx_msgaddr = kiblnd_dma_map_single(
1267                        tpo->tpo_hdev->ibh_ibdev, tx->tx_msg,
1268                        IBLND_MSG_SIZE, DMA_TO_DEVICE);
1269                LASSERT(!kiblnd_dma_mapping_error(tpo->tpo_hdev->ibh_ibdev,
1270                                                  tx->tx_msgaddr));
1271                KIBLND_UNMAP_ADDR_SET(tx, tx_msgunmap, tx->tx_msgaddr);
1272
1273                list_add(&tx->tx_list, &pool->po_free_list);
1274
1275                page_offset += IBLND_MSG_SIZE;
1276                LASSERT(page_offset <= PAGE_SIZE);
1277
1278                if (page_offset == PAGE_SIZE) {
1279                        page_offset = 0;
1280                        ipage++;
1281                        LASSERT(ipage <= txpgs->ibp_npages);
1282                }
1283        }
1284}
1285
1286static void kiblnd_destroy_fmr_pool(struct kib_fmr_pool *fpo)
1287{
1288        LASSERT(!fpo->fpo_map_count);
1289
1290        if (fpo->fpo_is_fmr) {
1291                if (fpo->fmr.fpo_fmr_pool)
1292                        ib_destroy_fmr_pool(fpo->fmr.fpo_fmr_pool);
1293        } else {
1294                struct kib_fast_reg_descriptor *frd, *tmp;
1295                int i = 0;
1296
1297                list_for_each_entry_safe(frd, tmp, &fpo->fast_reg.fpo_pool_list,
1298                                         frd_list) {
1299                        list_del(&frd->frd_list);
1300                        ib_dereg_mr(frd->frd_mr);
1301                        LIBCFS_FREE(frd, sizeof(*frd));
1302                        i++;
1303                }
1304                if (i < fpo->fast_reg.fpo_pool_size)
1305                        CERROR("FastReg pool still has %d regions registered\n",
1306                               fpo->fast_reg.fpo_pool_size - i);
1307        }
1308
1309        if (fpo->fpo_hdev)
1310                kiblnd_hdev_decref(fpo->fpo_hdev);
1311
1312        LIBCFS_FREE(fpo, sizeof(*fpo));
1313}
1314
1315static void kiblnd_destroy_fmr_pool_list(struct list_head *head)
1316{
1317        struct kib_fmr_pool *fpo, *tmp;
1318
1319        list_for_each_entry_safe(fpo, tmp, head, fpo_list) {
1320                list_del(&fpo->fpo_list);
1321                kiblnd_destroy_fmr_pool(fpo);
1322        }
1323}
1324
1325static int
1326kiblnd_fmr_pool_size(struct lnet_ioctl_config_o2iblnd_tunables *tunables,
1327                     int ncpts)
1328{
1329        int size = tunables->lnd_fmr_pool_size / ncpts;
1330
1331        return max(IBLND_FMR_POOL, size);
1332}
1333
1334static int
1335kiblnd_fmr_flush_trigger(struct lnet_ioctl_config_o2iblnd_tunables *tunables,
1336                         int ncpts)
1337{
1338        int size = tunables->lnd_fmr_flush_trigger / ncpts;
1339
1340        return max(IBLND_FMR_POOL_FLUSH, size);
1341}
1342
1343static int kiblnd_alloc_fmr_pool(struct kib_fmr_poolset *fps, struct kib_fmr_pool *fpo)
1344{
1345        struct ib_fmr_pool_param param = {
1346                .max_pages_per_fmr = LNET_MAX_PAYLOAD / PAGE_SIZE,
1347                .page_shift        = PAGE_SHIFT,
1348                .access            = (IB_ACCESS_LOCAL_WRITE |
1349                                      IB_ACCESS_REMOTE_WRITE),
1350                .pool_size         = fps->fps_pool_size,
1351                .dirty_watermark   = fps->fps_flush_trigger,
1352                .flush_function    = NULL,
1353                .flush_arg         = NULL,
1354                .cache             = !!fps->fps_cache };
1355        int rc = 0;
1356
1357        fpo->fmr.fpo_fmr_pool = ib_create_fmr_pool(fpo->fpo_hdev->ibh_pd,
1358                                                   &param);
1359        if (IS_ERR(fpo->fmr.fpo_fmr_pool)) {
1360                rc = PTR_ERR(fpo->fmr.fpo_fmr_pool);
1361                if (rc != -ENOSYS)
1362                        CERROR("Failed to create FMR pool: %d\n", rc);
1363                else
1364                        CERROR("FMRs are not supported\n");
1365        }
1366
1367        return rc;
1368}
1369
1370static int kiblnd_alloc_freg_pool(struct kib_fmr_poolset *fps, struct kib_fmr_pool *fpo)
1371{
1372        struct kib_fast_reg_descriptor *frd, *tmp;
1373        int i, rc;
1374
1375        INIT_LIST_HEAD(&fpo->fast_reg.fpo_pool_list);
1376        fpo->fast_reg.fpo_pool_size = 0;
1377        for (i = 0; i < fps->fps_pool_size; i++) {
1378                LIBCFS_CPT_ALLOC(frd, lnet_cpt_table(), fps->fps_cpt,
1379                                 sizeof(*frd));
1380                if (!frd) {
1381                        CERROR("Failed to allocate a new fast_reg descriptor\n");
1382                        rc = -ENOMEM;
1383                        goto out;
1384                }
1385
1386                frd->frd_mr = ib_alloc_mr(fpo->fpo_hdev->ibh_pd,
1387                                          IB_MR_TYPE_MEM_REG,
1388                                          LNET_MAX_PAYLOAD / PAGE_SIZE);
1389                if (IS_ERR(frd->frd_mr)) {
1390                        rc = PTR_ERR(frd->frd_mr);
1391                        CERROR("Failed to allocate ib_alloc_mr: %d\n", rc);
1392                        frd->frd_mr = NULL;
1393                        goto out_middle;
1394                }
1395
1396                frd->frd_valid = true;
1397
1398                list_add_tail(&frd->frd_list, &fpo->fast_reg.fpo_pool_list);
1399                fpo->fast_reg.fpo_pool_size++;
1400        }
1401
1402        return 0;
1403
1404out_middle:
1405        if (frd->frd_mr)
1406                ib_dereg_mr(frd->frd_mr);
1407        LIBCFS_FREE(frd, sizeof(*frd));
1408
1409out:
1410        list_for_each_entry_safe(frd, tmp, &fpo->fast_reg.fpo_pool_list,
1411                                 frd_list) {
1412                list_del(&frd->frd_list);
1413                ib_dereg_mr(frd->frd_mr);
1414                LIBCFS_FREE(frd, sizeof(*frd));
1415        }
1416
1417        return rc;
1418}
1419
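     /*
      * Create one mapping pool on this CPT: use FMR when the device provides
      * the FMR ops, otherwise fall back to FastReg if the device advertises
      * IB_DEVICE_MEM_MGT_EXTENSIONS; fail with -ENOSYS when neither is
      * supported.
      */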
1420static int kiblnd_create_fmr_pool(struct kib_fmr_poolset *fps,
1421                                  struct kib_fmr_pool **pp_fpo)
1422{
1423        struct kib_dev *dev = fps->fps_net->ibn_dev;
1424        struct ib_device_attr *dev_attr;
1425        struct kib_fmr_pool *fpo;
1426        int rc;
1427
1428        LIBCFS_CPT_ALLOC(fpo, lnet_cpt_table(), fps->fps_cpt, sizeof(*fpo));
1429        if (!fpo)
1430                return -ENOMEM;
1431
1432        fpo->fpo_hdev = kiblnd_current_hdev(dev);
1433        dev_attr = &fpo->fpo_hdev->ibh_ibdev->attrs;
1434
1435        /* Check for FMR or FastReg support */
1436        fpo->fpo_is_fmr = 0;
1437        if (fpo->fpo_hdev->ibh_ibdev->alloc_fmr &&
1438            fpo->fpo_hdev->ibh_ibdev->dealloc_fmr &&
1439            fpo->fpo_hdev->ibh_ibdev->map_phys_fmr &&
1440            fpo->fpo_hdev->ibh_ibdev->unmap_fmr) {
1441                LCONSOLE_INFO("Using FMR for registration\n");
1442                fpo->fpo_is_fmr = 1;
1443        } else if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
1444                LCONSOLE_INFO("Using FastReg for registration\n");
1445        } else {
1446                rc = -ENOSYS;
1447                LCONSOLE_ERROR_MSG(rc, "IB device does not support FMRs nor FastRegs, can't register memory\n");
1448                goto out_fpo;
1449        }
1450
1451        if (fpo->fpo_is_fmr)
1452                rc = kiblnd_alloc_fmr_pool(fps, fpo);
1453        else
1454                rc = kiblnd_alloc_freg_pool(fps, fpo);
1455        if (rc)
1456                goto out_fpo;
1457
1458        fpo->fpo_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1459        fpo->fpo_owner = fps;
1460        *pp_fpo = fpo;
1461
1462        return 0;
1463
1464out_fpo:
1465        kiblnd_hdev_decref(fpo->fpo_hdev);
1466        LIBCFS_FREE(fpo, sizeof(*fpo));
1467        return rc;
1468}
1469
1470static void kiblnd_fail_fmr_poolset(struct kib_fmr_poolset *fps,
1471                                    struct list_head *zombies)
1472{
1473        if (!fps->fps_net) /* initialized? */
1474                return;
1475
1476        spin_lock(&fps->fps_lock);
1477
1478        while (!list_empty(&fps->fps_pool_list)) {
1479                struct kib_fmr_pool *fpo = list_entry(fps->fps_pool_list.next,
1480                                                 struct kib_fmr_pool, fpo_list);
1481                fpo->fpo_failed = 1;
1482                list_del(&fpo->fpo_list);
1483                if (!fpo->fpo_map_count)
1484                        list_add(&fpo->fpo_list, zombies);
1485                else
1486                        list_add(&fpo->fpo_list, &fps->fps_failed_pool_list);
1487        }
1488
1489        spin_unlock(&fps->fps_lock);
1490}
1491
1492static void kiblnd_fini_fmr_poolset(struct kib_fmr_poolset *fps)
1493{
1494        if (fps->fps_net) { /* initialized? */
1495                kiblnd_destroy_fmr_pool_list(&fps->fps_failed_pool_list);
1496                kiblnd_destroy_fmr_pool_list(&fps->fps_pool_list);
1497        }
1498}
1499
1500static int
1501kiblnd_init_fmr_poolset(struct kib_fmr_poolset *fps, int cpt, int ncpts,
1502                        struct kib_net *net,
1503                        struct lnet_ioctl_config_o2iblnd_tunables *tunables)
1504{
1505        struct kib_fmr_pool *fpo;
1506        int rc;
1507
1508        memset(fps, 0, sizeof(*fps));
1509
1510        fps->fps_net = net;
1511        fps->fps_cpt = cpt;
1512
1513        fps->fps_pool_size = kiblnd_fmr_pool_size(tunables, ncpts);
1514        fps->fps_flush_trigger = kiblnd_fmr_flush_trigger(tunables, ncpts);
1515        fps->fps_cache = tunables->lnd_fmr_cache;
1516
1517        spin_lock_init(&fps->fps_lock);
1518        INIT_LIST_HEAD(&fps->fps_pool_list);
1519        INIT_LIST_HEAD(&fps->fps_failed_pool_list);
1520
1521        rc = kiblnd_create_fmr_pool(fps, &fpo);
1522        if (!rc)
1523                list_add_tail(&fpo->fpo_list, &fps->fps_pool_list);
1524
1525        return rc;
1526}
1527
1528static int kiblnd_fmr_pool_is_idle(struct kib_fmr_pool *fpo, unsigned long now)
1529{
1530        if (fpo->fpo_map_count) /* still in use */
1531                return 0;
1532        if (fpo->fpo_failed)
1533                return 1;
1534        return cfs_time_aftereq(now, fpo->fpo_deadline);
1535}
1536
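     /*
      * Flatten the RDMA descriptor into tx->tx_pages: one device-page-aligned
      * address per page covered by each fragment.  The returned page count is
      * what ib_fmr_pool_map_phys() consumes in kiblnd_fmr_pool_map().
      */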
1537static int
1538kiblnd_map_tx_pages(struct kib_tx *tx, struct kib_rdma_desc *rd)
1539{
1540        __u64 *pages = tx->tx_pages;
1541        struct kib_hca_dev *hdev;
1542        int npages;
1543        int size;
1544        int i;
1545
1546        hdev = tx->tx_pool->tpo_hdev;
1547
1548        for (i = 0, npages = 0; i < rd->rd_nfrags; i++) {
1549                for (size = 0; size <  rd->rd_frags[i].rf_nob;
1550                     size += hdev->ibh_page_size) {
1551                        pages[npages++] = (rd->rd_frags[i].rf_addr &
1552                                           hdev->ibh_page_mask) + size;
1553                }
1554        }
1555
1556        return npages;
1557}
1558
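     /*
      * Release a mapping back to its pool: unmap the FMR (flushing the whole
      * FMR pool if the transfer failed) or park the FastReg descriptor back
      * on the pool's free list, then destroy any pools, other than the
      * persistent first one, that are no longer in use and have expired or
      * failed.
      */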
1559void kiblnd_fmr_pool_unmap(struct kib_fmr *fmr, int status)
1560{
1561        LIST_HEAD(zombies);
1562        struct kib_fmr_pool *fpo = fmr->fmr_pool;
1563        struct kib_fmr_poolset *fps;
1564        unsigned long now = cfs_time_current();
1565        struct kib_fmr_pool *tmp;
1566        int rc;
1567
1568        if (!fpo)
1569                return;
1570
1571        fps = fpo->fpo_owner;
1572        if (fpo->fpo_is_fmr) {
1573                if (fmr->fmr_pfmr) {
1574                        rc = ib_fmr_pool_unmap(fmr->fmr_pfmr);
1575                        LASSERT(!rc);
1576                        fmr->fmr_pfmr = NULL;
1577                }
1578
1579                if (status) {
1580                        rc = ib_flush_fmr_pool(fpo->fmr.fpo_fmr_pool);
1581                        LASSERT(!rc);
1582                }
1583        } else {
1584                struct kib_fast_reg_descriptor *frd = fmr->fmr_frd;
1585
1586                if (frd) {
1587                        frd->frd_valid = false;
1588                        spin_lock(&fps->fps_lock);
1589                        list_add_tail(&frd->frd_list, &fpo->fast_reg.fpo_pool_list);
1590                        spin_unlock(&fps->fps_lock);
1591                        fmr->fmr_frd = NULL;
1592                }
1593        }
1594        fmr->fmr_pool = NULL;
1595
1596        spin_lock(&fps->fps_lock);
1597        fpo->fpo_map_count--;  /* decref the pool */
1598
1599        list_for_each_entry_safe(fpo, tmp, &fps->fps_pool_list, fpo_list) {
1600                /* the first pool is persistent */
1601                if (fps->fps_pool_list.next == &fpo->fpo_list)
1602                        continue;
1603
1604                if (kiblnd_fmr_pool_is_idle(fpo, now)) {
1605                        list_move(&fpo->fpo_list, &zombies);
1606                        fps->fps_version++;
1607                }
1608        }
1609        spin_unlock(&fps->fps_lock);
1610
1611        if (!list_empty(&zombies))
1612                kiblnd_destroy_fmr_pool_list(&zombies);
1613}
1614
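    /*
     * Map the RDMA fragments of @tx through one of the pool set's pools and
     * fill in @fmr with the resulting key.  Each pool on fps_pool_list is
     * tried in turn: an FMR pool goes through ib_fmr_pool_map_phys() on the
     * page array built by kiblnd_map_tx_pages(); a FastReg pool takes a
     * descriptor from its free list and prepares an IB_WR_REG_MR work request
     * (plus an IB_WR_LOCAL_INV if the MR key is stale).  On -EAGAIN the scan
     * restarts if the pool list has changed; otherwise a new pool may be
     * created, subject to the fps_next_retry backoff.
     *
     * Illustrative caller sketch (error handling trimmed; not taken verbatim
     * from this driver):
     *
     *	struct kib_fmr fmr;
     *
     *	rc = kiblnd_fmr_pool_map(fps, tx, tx->tx_rd, nob, iov, &fmr);
     *	if (!rc) {
     *		... post the RDMA using fmr.fmr_key ...
     *		kiblnd_fmr_pool_unmap(&fmr, status);
     *	}
     */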
1615int kiblnd_fmr_pool_map(struct kib_fmr_poolset *fps, struct kib_tx *tx,
1616                        struct kib_rdma_desc *rd, __u32 nob, __u64 iov,
1617                        struct kib_fmr *fmr)
1618{
1619        __u64 *pages = tx->tx_pages;
1620        bool is_rx = (rd != tx->tx_rd);
1621        bool tx_pages_mapped = false;
1622        struct kib_fmr_pool *fpo;
1623        int npages = 0;
1624        __u64 version;
1625        int rc;
1626
1627 again:
1628        spin_lock(&fps->fps_lock);
1629        version = fps->fps_version;
1630        list_for_each_entry(fpo, &fps->fps_pool_list, fpo_list) {
1631                fpo->fpo_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1632                fpo->fpo_map_count++;
1633
1634                if (fpo->fpo_is_fmr) {
1635                        struct ib_pool_fmr *pfmr;
1636
1637                        spin_unlock(&fps->fps_lock);
1638
1639                        if (!tx_pages_mapped) {
1640                                npages = kiblnd_map_tx_pages(tx, rd);
1641                                tx_pages_mapped = true;
1642                        }
1643
1644                        pfmr = ib_fmr_pool_map_phys(fpo->fmr.fpo_fmr_pool,
1645                                                    pages, npages, iov);
1646                        if (likely(!IS_ERR(pfmr))) {
1647                                fmr->fmr_key = is_rx ? pfmr->fmr->rkey :
1648                                                       pfmr->fmr->lkey;
1649                                fmr->fmr_frd = NULL;
1650                                fmr->fmr_pfmr = pfmr;
1651                                fmr->fmr_pool = fpo;
1652                                return 0;
1653                        }
1654                        rc = PTR_ERR(pfmr);
1655                } else {
1656                        if (!list_empty(&fpo->fast_reg.fpo_pool_list)) {
1657                                struct kib_fast_reg_descriptor *frd;
1658                                struct ib_reg_wr *wr;
1659                                struct ib_mr *mr;
1660                                int n;
1661
1662                                frd = list_first_entry(&fpo->fast_reg.fpo_pool_list,
1663                                                       struct kib_fast_reg_descriptor,
1664                                                       frd_list);
1665                                list_del(&frd->frd_list);
1666                                spin_unlock(&fps->fps_lock);
1667
1668                                mr = frd->frd_mr;
1669
1670                                if (!frd->frd_valid) {
1671                                        __u32 key = is_rx ? mr->rkey : mr->lkey;
1672                                        struct ib_send_wr *inv_wr;
1673
1674                                        inv_wr = &frd->frd_inv_wr;
1675                                        memset(inv_wr, 0, sizeof(*inv_wr));
1676                                        inv_wr->opcode = IB_WR_LOCAL_INV;
1677                                        inv_wr->wr_id = IBLND_WID_MR;
1678                                        inv_wr->ex.invalidate_rkey = key;
1679
1680                                        /* Bump the key */
1681                                        key = ib_inc_rkey(key);
1682                                        ib_update_fast_reg_key(mr, key);
1683                                }
1684
1685                                n = ib_map_mr_sg(mr, tx->tx_frags,
1686                                                 tx->tx_nfrags, NULL, PAGE_SIZE);
1687                                if (unlikely(n != tx->tx_nfrags)) {
1688                                        CERROR("Failed to map mr %d/%d elements\n",
1689                                               n, tx->tx_nfrags);
1690                                        return n < 0 ? n : -EINVAL;
1691                                }
1692
1693                                mr->iova = iov;
1694
1695                                /* Prepare FastReg WR */
1696                                wr = &frd->frd_fastreg_wr;
1697                                memset(wr, 0, sizeof(*wr));
1698                                wr->wr.opcode = IB_WR_REG_MR;
1699                                wr->wr.wr_id = IBLND_WID_MR;
1700                                wr->wr.num_sge = 0;
1701                                wr->wr.send_flags = 0;
1702                                wr->mr = mr;
1703                                wr->key = is_rx ? mr->rkey : mr->lkey;
1704                                wr->access = (IB_ACCESS_LOCAL_WRITE |
1705                                              IB_ACCESS_REMOTE_WRITE);
1706
1707                                fmr->fmr_key = is_rx ? mr->rkey : mr->lkey;
1708                                fmr->fmr_frd = frd;
1709                                fmr->fmr_pfmr = NULL;
1710                                fmr->fmr_pool = fpo;
1711                                return 0;
1712                        }
1713                        spin_unlock(&fps->fps_lock);
1714                        rc = -EBUSY;
1715                }
1716
1717                spin_lock(&fps->fps_lock);
1718                fpo->fpo_map_count--;
1719                if (rc != -EAGAIN) {
1720                        spin_unlock(&fps->fps_lock);
1721                        return rc;
1722                }
1723
1724                /* -EAGAIN: restart the scan only if the pool list has changed */
1725                if (version != fps->fps_version) {
1726                        spin_unlock(&fps->fps_lock);
1727                        goto again;
1728                }
1729        }
1730
1731        if (fps->fps_increasing) {
1732                spin_unlock(&fps->fps_lock);
1733                CDEBUG(D_NET, "Another thread is allocating a new FMR pool, waiting for it to complete\n");
1734                schedule();
1735                goto again;
1736        }
1737
1738        if (time_before(cfs_time_current(), fps->fps_next_retry)) {
1739                /* someone failed recently */
1740                spin_unlock(&fps->fps_lock);
1741                return -EAGAIN;
1742        }
1743
1744        fps->fps_increasing = 1;
1745        spin_unlock(&fps->fps_lock);
1746
1747        CDEBUG(D_NET, "Allocate new FMR pool\n");
1748        rc = kiblnd_create_fmr_pool(fps, &fpo);
1749        spin_lock(&fps->fps_lock);
1750        fps->fps_increasing = 0;
1751        if (!rc) {
1752                fps->fps_version++;
1753                list_add_tail(&fpo->fpo_list, &fps->fps_pool_list);
1754        } else {
1755                fps->fps_next_retry = cfs_time_shift(IBLND_POOL_RETRY);
1756        }
1757        spin_unlock(&fps->fps_lock);
1758
1759        goto again;
1760}
1761
1762static void kiblnd_fini_pool(struct kib_pool *pool)
1763{
1764        LASSERT(list_empty(&pool->po_free_list));
1765        LASSERT(!pool->po_allocated);
1766
1767        CDEBUG(D_NET, "Finalize %s pool\n", pool->po_owner->ps_name);
1768}
1769
1770static void kiblnd_init_pool(struct kib_poolset *ps, struct kib_pool *pool, int size)
1771{
1772        CDEBUG(D_NET, "Initialize %s pool\n", ps->ps_name);
1773
1774        memset(pool, 0, sizeof(*pool));
1775        INIT_LIST_HEAD(&pool->po_free_list);
1776        pool->po_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1777        pool->po_owner    = ps;
1778        pool->po_size     = size;
1779}
1780
1781static void kiblnd_destroy_pool_list(struct list_head *head)
1782{
1783        struct kib_pool *pool;
1784
1785        while (!list_empty(head)) {
1786                pool = list_entry(head->next, struct kib_pool, po_list);
1787                list_del(&pool->po_list);
1788
1789                LASSERT(pool->po_owner);
1790                pool->po_owner->ps_pool_destroy(pool);
1791        }
1792}
1793
1794static void kiblnd_fail_poolset(struct kib_poolset *ps, struct list_head *zombies)
1795{
1796        if (!ps->ps_net) /* initialized? */
1797                return;
1798
1799        spin_lock(&ps->ps_lock);
1800        while (!list_empty(&ps->ps_pool_list)) {
1801                struct kib_pool *po = list_entry(ps->ps_pool_list.next,
1802                                            struct kib_pool, po_list);
1803                po->po_failed = 1;
1804                list_del(&po->po_list);
1805                if (!po->po_allocated)
1806                        list_add(&po->po_list, zombies);
1807                else
1808                        list_add(&po->po_list, &ps->ps_failed_pool_list);
1809        }
1810        spin_unlock(&ps->ps_lock);
1811}
1812
1813static void kiblnd_fini_poolset(struct kib_poolset *ps)
1814{
1815        if (ps->ps_net) { /* initialized? */
1816                kiblnd_destroy_pool_list(&ps->ps_failed_pool_list);
1817                kiblnd_destroy_pool_list(&ps->ps_pool_list);
1818        }
1819}
1820
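    /*
     * A struct kib_poolset is a generic, growable set of pools; per-type
     * behaviour is supplied through the po_create/po_destroy and
     * nd_init/nd_fini callbacks.  For example, the TX pool set is wired up
     * in kiblnd_net_init_pools() roughly as:
     *
     *	rc = kiblnd_init_poolset(&net->ibn_tx_ps[cpt]->tps_poolset, cpt, net,
     *				 "TX", kiblnd_tx_pool_size(ncpts),
     *				 kiblnd_create_tx_pool, kiblnd_destroy_tx_pool,
     *				 kiblnd_tx_init, NULL);
     */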
1821static int kiblnd_init_poolset(struct kib_poolset *ps, int cpt,
1822                               struct kib_net *net, char *name, int size,
1823                               kib_ps_pool_create_t po_create,
1824                               kib_ps_pool_destroy_t po_destroy,
1825                               kib_ps_node_init_t nd_init,
1826                               kib_ps_node_fini_t nd_fini)
1827{
1828        struct kib_pool *pool;
1829        int rc;
1830
1831        memset(ps, 0, sizeof(*ps));
1832
1833        ps->ps_cpt          = cpt;
1834        ps->ps_net          = net;
1835        ps->ps_pool_create  = po_create;
1836        ps->ps_pool_destroy = po_destroy;
1837        ps->ps_node_init    = nd_init;
1838        ps->ps_node_fini    = nd_fini;
1839        ps->ps_pool_size    = size;
1840        if (strlcpy(ps->ps_name, name, sizeof(ps->ps_name))
1841            >= sizeof(ps->ps_name))
1842                return -E2BIG;
1843        spin_lock_init(&ps->ps_lock);
1844        INIT_LIST_HEAD(&ps->ps_pool_list);
1845        INIT_LIST_HEAD(&ps->ps_failed_pool_list);
1846
1847        rc = ps->ps_pool_create(ps, size, &pool);
1848        if (!rc)
1849                list_add(&pool->po_list, &ps->ps_pool_list);
1850        else
1851                CERROR("Failed to create the first pool for %s\n", ps->ps_name);
1852
1853        return rc;
1854}
1855
1856static int kiblnd_pool_is_idle(struct kib_pool *pool, unsigned long now)
1857{
1858        if (pool->po_allocated) /* still in use */
1859                return 0;
1860        if (pool->po_failed)
1861                return 1;
1862        return cfs_time_aftereq(now, pool->po_deadline);
1863}
1864
1865void kiblnd_pool_free_node(struct kib_pool *pool, struct list_head *node)
1866{
1867        LIST_HEAD(zombies);
1868        struct kib_poolset *ps = pool->po_owner;
1869        struct kib_pool *tmp;
1870        unsigned long now = cfs_time_current();
1871
1872        spin_lock(&ps->ps_lock);
1873
1874        if (ps->ps_node_fini)
1875                ps->ps_node_fini(pool, node);
1876
1877        LASSERT(pool->po_allocated > 0);
1878        list_add(node, &pool->po_free_list);
1879        pool->po_allocated--;
1880
1881        list_for_each_entry_safe(pool, tmp, &ps->ps_pool_list, po_list) {
1882                /* the first pool is persistent */
1883                if (ps->ps_pool_list.next == &pool->po_list)
1884                        continue;
1885
1886                if (kiblnd_pool_is_idle(pool, now))
1887                        list_move(&pool->po_list, &zombies);
1888        }
1889        spin_unlock(&ps->ps_lock);
1890
1891        if (!list_empty(&zombies))
1892                kiblnd_destroy_pool_list(&zombies);
1893}
1894
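    /*
     * Take a free node from the first pool that has one.  If every pool is
     * empty and another thread is already growing the pool set, sleep with an
     * exponential backoff (the wait interval doubles, up to one second) and
     * retry; otherwise try to create a new pool, honouring the ps_next_retry
     * backoff after a recent failure.  Returns NULL only when a new pool
     * cannot be allocated.
     */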
1895struct list_head *kiblnd_pool_alloc_node(struct kib_poolset *ps)
1896{
1897        struct list_head *node;
1898        struct kib_pool *pool;
1899        unsigned int interval = 1;
1900        unsigned long time_before;
1901        unsigned int trips = 0;
1902        int rc;
1903
1904 again:
1905        spin_lock(&ps->ps_lock);
1906        list_for_each_entry(pool, &ps->ps_pool_list, po_list) {
1907                if (list_empty(&pool->po_free_list))
1908                        continue;
1909
1910                pool->po_allocated++;
1911                pool->po_deadline = cfs_time_shift(IBLND_POOL_DEADLINE);
1912                node = pool->po_free_list.next;
1913                list_del(node);
1914
1915                if (ps->ps_node_init) {
1916                        /* still hold the lock */
1917                        ps->ps_node_init(pool, node);
1918                }
1919                spin_unlock(&ps->ps_lock);
1920                return node;
1921        }
1922
1923        /* no pool has a free node; wait, back off, or grow the pool set */
1924        if (ps->ps_increasing) {
1925                /* another thread is allocating a new pool */
1926                spin_unlock(&ps->ps_lock);
1927                trips++;
1928                CDEBUG(D_NET, "Another thread is allocating a new %s pool, waiting %d jiffies for it to complete. trips = %d\n",
1929                       ps->ps_name, interval, trips);
1930
1931                set_current_state(TASK_INTERRUPTIBLE);
1932                schedule_timeout(interval);
1933                if (interval < cfs_time_seconds(1))
1934                        interval *= 2;
1935
1936                goto again;
1937        }
1938
1939        if (time_before(cfs_time_current(), ps->ps_next_retry)) {
1940                /* someone failed recently */
1941                spin_unlock(&ps->ps_lock);
1942                return NULL;
1943        }
1944
1945        ps->ps_increasing = 1;
1946        spin_unlock(&ps->ps_lock);
1947
1948        CDEBUG(D_NET, "%s pool exhausted, allocate new pool\n", ps->ps_name);
1949        time_before = cfs_time_current();
1950        rc = ps->ps_pool_create(ps, ps->ps_pool_size, &pool);
1951        CDEBUG(D_NET, "ps_pool_create took %lu jiffies to complete\n",
1952               cfs_time_current() - time_before);
1953
1954        spin_lock(&ps->ps_lock);
1955        ps->ps_increasing = 0;
1956        if (!rc) {
1957                list_add_tail(&pool->po_list, &ps->ps_pool_list);
1958        } else {
1959                ps->ps_next_retry = cfs_time_shift(IBLND_POOL_RETRY);
1960                CERROR("Can't allocate new %s pool: out of memory\n",
1961                       ps->ps_name);
1962        }
1963        spin_unlock(&ps->ps_lock);
1964
1965        goto again;
1966}
1967
1968static void kiblnd_destroy_tx_pool(struct kib_pool *pool)
1969{
1970        struct kib_tx_pool *tpo = container_of(pool, struct kib_tx_pool, tpo_pool);
1971        int i;
1972
1973        LASSERT(!pool->po_allocated);
1974
1975        if (tpo->tpo_tx_pages) {
1976                kiblnd_unmap_tx_pool(tpo);
1977                kiblnd_free_pages(tpo->tpo_tx_pages);
1978        }
1979
1980        if (!tpo->tpo_tx_descs)
1981                goto out;
1982
1983        for (i = 0; i < pool->po_size; i++) {
1984                struct kib_tx *tx = &tpo->tpo_tx_descs[i];
1985
1986                list_del(&tx->tx_list);
1987                if (tx->tx_pages)
1988                        LIBCFS_FREE(tx->tx_pages,
1989                                    LNET_MAX_IOV *
1990                                    sizeof(*tx->tx_pages));
1991                if (tx->tx_frags)
1992                        LIBCFS_FREE(tx->tx_frags,
1993                                    (1 + IBLND_MAX_RDMA_FRAGS) *
1994                                     sizeof(*tx->tx_frags));
1995                if (tx->tx_wrq)
1996                        LIBCFS_FREE(tx->tx_wrq,
1997                                    (1 + IBLND_MAX_RDMA_FRAGS) *
1998                                    sizeof(*tx->tx_wrq));
1999                if (tx->tx_sge)
2000                        LIBCFS_FREE(tx->tx_sge,
2001                                    (1 + IBLND_MAX_RDMA_FRAGS) *
2002                                    sizeof(*tx->tx_sge));
2003                if (tx->tx_rd)
2004                        LIBCFS_FREE(tx->tx_rd,
2005                                    offsetof(struct kib_rdma_desc,
2006                                             rd_frags[IBLND_MAX_RDMA_FRAGS]));
2007        }
2008
2009        LIBCFS_FREE(tpo->tpo_tx_descs,
2010                    pool->po_size * sizeof(struct kib_tx));
2011out:
2012        kiblnd_fini_pool(pool);
2013        LIBCFS_FREE(tpo, sizeof(*tpo));
2014}
2015
2016static int kiblnd_tx_pool_size(int ncpts)
2017{
2018        int ntx = *kiblnd_tunables.kib_ntx / ncpts;
2019
2020        return max(IBLND_TX_POOL, ntx);
2021}
2022
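    /*
     * Create one TX pool of @size descriptors on CPT ps->ps_cpt.  The message
     * buffers for the whole pool are pre-allocated:
     *
     *	npg = DIV_ROUND_UP(size * IBLND_MSG_SIZE, PAGE_SIZE);
     *
     * Worked example (assuming IBLND_MSG_SIZE is 4 KiB, its usual value, and
     * 4 KiB pages): a pool of 256 descriptors pre-allocates 256 pages of
     * message buffers, plus per-descriptor fragment, WR and SGE arrays of
     * IBLND_MAX_RDMA_FRAGS + 1 entries and an RDMA descriptor.
     */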
2023static int kiblnd_create_tx_pool(struct kib_poolset *ps, int size,
2024                                 struct kib_pool **pp_po)
2025{
2026        int i;
2027        int npg;
2028        struct kib_pool *pool;
2029        struct kib_tx_pool *tpo;
2030
2031        LIBCFS_CPT_ALLOC(tpo, lnet_cpt_table(), ps->ps_cpt, sizeof(*tpo));
2032        if (!tpo) {
2033                CERROR("Failed to allocate TX pool\n");
2034                return -ENOMEM;
2035        }
2036
2037        pool = &tpo->tpo_pool;
2038        kiblnd_init_pool(ps, pool, size);
2039        tpo->tpo_tx_descs = NULL;
2040        tpo->tpo_tx_pages = NULL;
2041
2042        npg = DIV_ROUND_UP(size * IBLND_MSG_SIZE, PAGE_SIZE);
2043        if (kiblnd_alloc_pages(&tpo->tpo_tx_pages, ps->ps_cpt, npg)) {
2044                CERROR("Can't allocate tx pages: %d\n", npg);
2045                LIBCFS_FREE(tpo, sizeof(*tpo));
2046                return -ENOMEM;
2047        }
2048
2049        LIBCFS_CPT_ALLOC(tpo->tpo_tx_descs, lnet_cpt_table(), ps->ps_cpt,
2050                         size * sizeof(struct kib_tx));
2051        if (!tpo->tpo_tx_descs) {
2052                CERROR("Can't allocate %d tx descriptors\n", size);
2053                ps->ps_pool_destroy(pool);
2054                return -ENOMEM;
2055        }
2056
2057        memset(tpo->tpo_tx_descs, 0, size * sizeof(struct kib_tx));
2058
2059        for (i = 0; i < size; i++) {
2060                struct kib_tx *tx = &tpo->tpo_tx_descs[i];
2061
2062                tx->tx_pool = tpo;
2063                if (ps->ps_net->ibn_fmr_ps) {
2064                        LIBCFS_CPT_ALLOC(tx->tx_pages,
2065                                         lnet_cpt_table(), ps->ps_cpt,
2066                                         LNET_MAX_IOV * sizeof(*tx->tx_pages));
2067                        if (!tx->tx_pages)
2068                                break;
2069                }
2070
2071                LIBCFS_CPT_ALLOC(tx->tx_frags, lnet_cpt_table(), ps->ps_cpt,
2072                                 (1 + IBLND_MAX_RDMA_FRAGS) *
2073                                 sizeof(*tx->tx_frags));
2074                if (!tx->tx_frags)
2075                        break;
2076
2077                sg_init_table(tx->tx_frags, IBLND_MAX_RDMA_FRAGS + 1);
2078
2079                LIBCFS_CPT_ALLOC(tx->tx_wrq, lnet_cpt_table(), ps->ps_cpt,
2080                                 (1 + IBLND_MAX_RDMA_FRAGS) *
2081                                 sizeof(*tx->tx_wrq));
2082                if (!tx->tx_wrq)
2083                        break;
2084
2085                LIBCFS_CPT_ALLOC(tx->tx_sge, lnet_cpt_table(), ps->ps_cpt,
2086                                 (1 + IBLND_MAX_RDMA_FRAGS) *
2087                                 sizeof(*tx->tx_sge));
2088                if (!tx->tx_sge)
2089                        break;
2090
2091                LIBCFS_CPT_ALLOC(tx->tx_rd, lnet_cpt_table(), ps->ps_cpt,
2092                                 offsetof(struct kib_rdma_desc,
2093                                          rd_frags[IBLND_MAX_RDMA_FRAGS]));
2094                if (!tx->tx_rd)
2095                        break;
2096        }
2097
2098        if (i == size) {
2099                kiblnd_map_tx_pool(tpo);
2100                *pp_po = pool;
2101                return 0;
2102        }
2103
2104        ps->ps_pool_destroy(pool);
2105        return -ENOMEM;
2106}
2107
2108static void kiblnd_tx_init(struct kib_pool *pool, struct list_head *node)
2109{
2110        struct kib_tx_poolset *tps = container_of(pool->po_owner,
2111                                                  struct kib_tx_poolset,
2112                                                  tps_poolset);
2113        struct kib_tx *tx = list_entry(node, struct kib_tx, tx_list);
2114
2115        tx->tx_cookie = tps->tps_next_tx_cookie++;
2116}
2117
2118static void kiblnd_net_fini_pools(struct kib_net *net)
2119{
2120        int i;
2121
2122        cfs_cpt_for_each(i, lnet_cpt_table()) {
2123                struct kib_tx_poolset *tps;
2124                struct kib_fmr_poolset *fps;
2125
2126                if (net->ibn_tx_ps) {
2127                        tps = net->ibn_tx_ps[i];
2128                        kiblnd_fini_poolset(&tps->tps_poolset);
2129                }
2130
2131                if (net->ibn_fmr_ps) {
2132                        fps = net->ibn_fmr_ps[i];
2133                        kiblnd_fini_fmr_poolset(fps);
2134                }
2135        }
2136
2137        if (net->ibn_tx_ps) {
2138                cfs_percpt_free(net->ibn_tx_ps);
2139                net->ibn_tx_ps = NULL;
2140        }
2141
2142        if (net->ibn_fmr_ps) {
2143                cfs_percpt_free(net->ibn_fmr_ps);
2144                net->ibn_fmr_ps = NULL;
2145        }
2146}
2147
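    /*
     * Set up the per-CPT FMR and TX pool sets for this net.  The FMR pool
     * sets are created first (see LU-2268), then the TX pool sets.  When
     * @cpts is NULL the CPT numbers are simply 0..ncpts-1, otherwise cpts[i]
     * names the CPT to use for slot i:
     *
     *	cpt = !cpts ? i : cpts[i];
     */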
2148static int kiblnd_net_init_pools(struct kib_net *net, struct lnet_ni *ni,
2149                                 __u32 *cpts, int ncpts)
2150{
2151        struct lnet_ioctl_config_o2iblnd_tunables *tunables;
2152        int cpt;
2153        int rc;
2154        int i;
2155
2156        tunables = &ni->ni_lnd_tunables->lt_tun_u.lt_o2ib;
2157
2158        if (tunables->lnd_fmr_pool_size < *kiblnd_tunables.kib_ntx / 4) {
2159                CERROR("FMR pool size (%d) must be at least ntx / 4 (%d)\n",
2160                       tunables->lnd_fmr_pool_size,
2161                       *kiblnd_tunables.kib_ntx / 4);
2162                rc = -EINVAL;
2163                goto failed;
2164        }
2165
2166        /*
2167         * The TX pool must be created after the FMR pool; see LU-2268
2168         * for details.
2169         */
2170        LASSERT(!net->ibn_tx_ps);
2171
2172        /*
2173         * premapping can fail if ibd_nmr > 1, so we always create an
2174         * FMR pool and map on demand if premapping failed
2175         *
2176         * cfs_percpt_alloc creates an array of struct kib_fmr_poolset.
2177         * The number of struct kib_fmr_poolsets created is equal to the
2178         * number of CPTs that exist, i.e. net->ibn_fmr_ps[cpt].
2179         */
2180        net->ibn_fmr_ps = cfs_percpt_alloc(lnet_cpt_table(),
2181                                           sizeof(struct kib_fmr_poolset));
2182        if (!net->ibn_fmr_ps) {
2183                CERROR("Failed to allocate FMR pool array\n");
2184                rc = -ENOMEM;
2185                goto failed;
2186        }
2187
2188        for (i = 0; i < ncpts; i++) {
2189                cpt = !cpts ? i : cpts[i];
2190                rc = kiblnd_init_fmr_poolset(net->ibn_fmr_ps[cpt], cpt, ncpts,
2191                                             net, tunables);
2192                if (rc) {
2193                        CERROR("Can't initialize FMR pool for CPT %d: %d\n",
2194                               cpt, rc);
2195                        goto failed;
2196                }
2197        }
2198
2199        if (i > 0)
2200                LASSERT(i == ncpts);
2201
2202        /*
2203         * cfs_percpt_alloc creates an array of struct kib_tx_poolset.
2204         * The number of struct kib_tx_poolsets created is equal to the
2205         * number of CPTs that exist, i.e. net->ibn_tx_ps[cpt].
2206         */
2207        net->ibn_tx_ps = cfs_percpt_alloc(lnet_cpt_table(),
2208                                          sizeof(struct kib_tx_poolset));
2209        if (!net->ibn_tx_ps) {
2210                CERROR("Failed to allocate tx pool array\n");
2211                rc = -ENOMEM;
2212                goto failed;
2213        }
2214
2215        for (i = 0; i < ncpts; i++) {
2216                cpt = !cpts ? i : cpts[i];
2217                rc = kiblnd_init_poolset(&net->ibn_tx_ps[cpt]->tps_poolset,
2218                                         cpt, net, "TX",
2219                                         kiblnd_tx_pool_size(ncpts),
2220                                         kiblnd_create_tx_pool,
2221                                         kiblnd_destroy_tx_pool,
2222                                         kiblnd_tx_init, NULL);
2223                if (rc) {
2224                        CERROR("Can't initialize TX pool for CPT %d: %d\n",
2225                               cpt, rc);
2226                        goto failed;
2227                }
2228        }
2229
2230        return 0;
2231 failed:
2232        kiblnd_net_fini_pools(net);
2233        LASSERT(rc);
2234        return rc;
2235}
2236
2237static int kiblnd_hdev_get_attr(struct kib_hca_dev *hdev)
2238{
2239        /*
2240         * It's safe to assume an HCA can handle a page size
2241         * matching that of the native system.
2242         */
2243        hdev->ibh_page_shift = PAGE_SHIFT;
2244        hdev->ibh_page_size  = 1 << PAGE_SHIFT;
2245        hdev->ibh_page_mask  = ~((__u64)hdev->ibh_page_size - 1);
2246
2247        hdev->ibh_mr_size = hdev->ibh_ibdev->attrs.max_mr_size;
2248        if (hdev->ibh_mr_size == ~0ULL) {
2249                hdev->ibh_mr_shift = 64;
2250                return 0;
2251        }
2252
2253        CERROR("Invalid mr size: %#llx\n", hdev->ibh_mr_size);
2254        return -EINVAL;
2255}
2256
2257void kiblnd_hdev_destroy(struct kib_hca_dev *hdev)
2258{
2259        if (hdev->ibh_pd)
2260                ib_dealloc_pd(hdev->ibh_pd);
2261
2262        if (hdev->ibh_cmid)
2263                rdma_destroy_id(hdev->ibh_cmid);
2264
2265        LIBCFS_FREE(hdev, sizeof(*hdev));
2266}
2267
2268/* DUMMY */
2269static int kiblnd_dummy_callback(struct rdma_cm_id *cmid,
2270                                 struct rdma_cm_event *event)
2271{
2272        return 0;
2273}
2274
2275static int kiblnd_dev_need_failover(struct kib_dev *dev)
2276{
2277        struct rdma_cm_id *cmid;
2278        struct sockaddr_in srcaddr;
2279        struct sockaddr_in dstaddr;
2280        int rc;
2281
2282        if (!dev->ibd_hdev || /* initializing */
2283            !dev->ibd_hdev->ibh_cmid || /* listener is dead */
2284            *kiblnd_tunables.kib_dev_failover > 1) /* debugging */
2285                return 1;
2286
2287        /*
2288         * XXX: it's ugly, but there is no better way to detect
2289         * ib-bonding HCA failover because:
2290         *
2291         * a. there is no reliable CM event for HCA failover...
2292         * b. there is no OFED API to get the ib_device for a net_device...
2293         *
2294         * We have only two choices at this point:
2295         *
2296         * a. rdma_bind_addr(), which will conflict with the listener cmid
2297         * b. rdma_resolve_addr() to the zero address
2298         */
2299        cmid = kiblnd_rdma_create_id(kiblnd_dummy_callback, dev, RDMA_PS_TCP,
2300                                     IB_QPT_RC);
2301        if (IS_ERR(cmid)) {
2302                rc = PTR_ERR(cmid);
2303                CERROR("Failed to create cmid for failover: %d\n", rc);
2304                return rc;
2305        }
2306
2307        memset(&srcaddr, 0, sizeof(srcaddr));
2308        srcaddr.sin_family = AF_INET;
2309        srcaddr.sin_addr.s_addr = (__force u32)htonl(dev->ibd_ifip);
2310
2311        memset(&dstaddr, 0, sizeof(dstaddr));
2312        dstaddr.sin_family = AF_INET;
2313        rc = rdma_resolve_addr(cmid, (struct sockaddr *)&srcaddr,
2314                               (struct sockaddr *)&dstaddr, 1);
2315        if (rc || !cmid->device) {
2316                CERROR("Failed to bind %s:%pI4h to device(%p): %d\n",
2317                       dev->ibd_ifname, &dev->ibd_ifip,
2318                       cmid->device, rc);
2319                rdma_destroy_id(cmid);
2320                return rc;
2321        }
2322
2323        rc = dev->ibd_hdev->ibh_ibdev != cmid->device; /* true for failover */
2324        rdma_destroy_id(cmid);
2325
2326        return rc;
2327}
2328
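    /*
     * Fail @dev over to the IB device that currently owns its IPoIB address:
     * destroy the old listener cmid, create and bind a new one, allocate a
     * fresh protection domain, swap the new kib_hca_dev into dev->ibd_hdev,
     * and mark every pool of every net on this device as failed so it is
     * rebuilt against the new HCA.  On failure dev->ibd_failed_failover is
     * incremented; on success it is reset to zero.
     */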
2329int kiblnd_dev_failover(struct kib_dev *dev)
2330{
2331        LIST_HEAD(zombie_tpo);
2332        LIST_HEAD(zombie_ppo);
2333        LIST_HEAD(zombie_fpo);
2334        struct rdma_cm_id *cmid  = NULL;
2335        struct kib_hca_dev *hdev  = NULL;
2336        struct ib_pd *pd;
2337        struct kib_net *net;
2338        struct sockaddr_in addr;
2339        unsigned long flags;
2340        int rc = 0;
2341        int i;
2342
2343        LASSERT(*kiblnd_tunables.kib_dev_failover > 1 ||
2344                dev->ibd_can_failover || !dev->ibd_hdev);
2345
2346        rc = kiblnd_dev_need_failover(dev);
2347        if (rc <= 0)
2348                goto out;
2349
2350        if (dev->ibd_hdev &&
2351            dev->ibd_hdev->ibh_cmid) {
2352                /*
2353                 * XXX it's not good to close the old listener here,
2354                 * because we might fail to create a new one.
2355                 * But we have to close it now, otherwise rdma_bind_addr()
2356                 * will return EADDRINUSE.
2357                 */
2358                write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2359
2360                cmid = dev->ibd_hdev->ibh_cmid;
2361                /*
2362                 * make the next call to kiblnd_dev_need_failover()
2363                 * return 1
2364                 */
2365                dev->ibd_hdev->ibh_cmid  = NULL;
2366                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2367
2368                rdma_destroy_id(cmid);
2369        }
2370
2371        cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, dev, RDMA_PS_TCP,
2372                                     IB_QPT_RC);
2373        if (IS_ERR(cmid)) {
2374                rc = PTR_ERR(cmid);
2375                CERROR("Failed to create cmid for failover: %d\n", rc);
2376                goto out;
2377        }
2378
2379        memset(&addr, 0, sizeof(addr));
2380        addr.sin_family      = AF_INET;
2381        addr.sin_addr.s_addr = (__force u32)htonl(dev->ibd_ifip);
2382        addr.sin_port   = htons(*kiblnd_tunables.kib_service);
2383
2384        /* Bind to failover device or port */
2385        rc = rdma_bind_addr(cmid, (struct sockaddr *)&addr);
2386        if (rc || !cmid->device) {
2387                CERROR("Failed to bind %s:%pI4h to device(%p): %d\n",
2388                       dev->ibd_ifname, &dev->ibd_ifip,
2389                       cmid->device, rc);
2390                rdma_destroy_id(cmid);
2391                goto out;
2392        }
2393
2394        LIBCFS_ALLOC(hdev, sizeof(*hdev));
2395        if (!hdev) {
2396                CERROR("Failed to allocate kib_hca_dev\n");
2397                rdma_destroy_id(cmid);
2398                rc = -ENOMEM;
2399                goto out;
2400        }
2401
2402        atomic_set(&hdev->ibh_ref, 1);
2403        hdev->ibh_dev   = dev;
2404        hdev->ibh_cmid  = cmid;
2405        hdev->ibh_ibdev = cmid->device;
2406
2407        pd = ib_alloc_pd(cmid->device, 0);
2408        if (IS_ERR(pd)) {
2409                rc = PTR_ERR(pd);
2410                CERROR("Can't allocate PD: %d\n", rc);
2411                goto out;
2412        }
2413
2414        hdev->ibh_pd = pd;
2415
2416        rc = rdma_listen(cmid, 0);
2417        if (rc) {
2418                CERROR("Can't start new listener: %d\n", rc);
2419                goto out;
2420        }
2421
2422        rc = kiblnd_hdev_get_attr(hdev);
2423        if (rc) {
2424                CERROR("Can't get device attributes: %d\n", rc);
2425                goto out;
2426        }
2427
2428        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2429
2430        swap(dev->ibd_hdev, hdev); /* take over the refcount */
2431
2432        list_for_each_entry(net, &dev->ibd_nets, ibn_list) {
2433                cfs_cpt_for_each(i, lnet_cpt_table()) {
2434                        kiblnd_fail_poolset(&net->ibn_tx_ps[i]->tps_poolset,
2435                                            &zombie_tpo);
2436
2437                        if (net->ibn_fmr_ps)
2438                                kiblnd_fail_fmr_poolset(net->ibn_fmr_ps[i],
2439                                                        &zombie_fpo);
2440                }
2441        }
2442
2443        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2444 out:
2445        if (!list_empty(&zombie_tpo))
2446                kiblnd_destroy_pool_list(&zombie_tpo);
2447        if (!list_empty(&zombie_ppo))
2448                kiblnd_destroy_pool_list(&zombie_ppo);
2449        if (!list_empty(&zombie_fpo))
2450                kiblnd_destroy_fmr_pool_list(&zombie_fpo);
2451        if (hdev)
2452                kiblnd_hdev_decref(hdev);
2453
2454        if (rc)
2455                dev->ibd_failed_failover++;
2456        else
2457                dev->ibd_failed_failover = 0;
2458
2459        return rc;
2460}
2461
2462void kiblnd_destroy_dev(struct kib_dev *dev)
2463{
2464        LASSERT(!dev->ibd_nnets);
2465        LASSERT(list_empty(&dev->ibd_nets));
2466
2467        list_del(&dev->ibd_fail_list);
2468        list_del(&dev->ibd_list);
2469
2470        if (dev->ibd_hdev)
2471                kiblnd_hdev_decref(dev->ibd_hdev);
2472
2473        LIBCFS_FREE(dev, sizeof(*dev));
2474}
2475
2476static struct kib_dev *kiblnd_create_dev(char *ifname)
2477{
2478        struct net_device *netdev;
2479        struct kib_dev *dev;
2480        __u32 netmask;
2481        __u32 ip;
2482        int up;
2483        int rc;
2484
2485        rc = lnet_ipif_query(ifname, &up, &ip, &netmask);
2486        if (rc) {
2487                CERROR("Can't query IPoIB interface %s: %d\n",
2488                       ifname, rc);
2489                return NULL;
2490        }
2491
2492        if (!up) {
2493                CERROR("Can't query IPoIB interface %s: it's down\n", ifname);
2494                return NULL;
2495        }
2496
2497        LIBCFS_ALLOC(dev, sizeof(*dev));
2498        if (!dev)
2499                return NULL;
2500
2501        netdev = dev_get_by_name(&init_net, ifname);
2502        if (!netdev) {
2503                dev->ibd_can_failover = 0;
2504        } else {
2505                dev->ibd_can_failover = !!(netdev->flags & IFF_MASTER);
2506                dev_put(netdev);
2507        }
2508
2509        INIT_LIST_HEAD(&dev->ibd_nets);
2510        INIT_LIST_HEAD(&dev->ibd_list); /* not yet in kib_devs */
2511        INIT_LIST_HEAD(&dev->ibd_fail_list);
2512        dev->ibd_ifip = ip;
2513        strcpy(&dev->ibd_ifname[0], ifname);
2514
2515        /* initialize the device */
2516        rc = kiblnd_dev_failover(dev);
2517        if (rc) {
2518                CERROR("Can't initialize device: %d\n", rc);
2519                LIBCFS_FREE(dev, sizeof(*dev));
2520                return NULL;
2521        }
2522
2523        list_add_tail(&dev->ibd_list, &kiblnd_data.kib_devs);
2524        return dev;
2525}
2526
2527static void kiblnd_base_shutdown(void)
2528{
2529        struct kib_sched_info *sched;
2530        int i;
2531
2532        LASSERT(list_empty(&kiblnd_data.kib_devs));
2533
2534        switch (kiblnd_data.kib_init) {
2535        default:
2536                LBUG();
2537
2538        case IBLND_INIT_ALL:
2539        case IBLND_INIT_DATA:
2540                LASSERT(kiblnd_data.kib_peers);
2541                for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++)
2542                        LASSERT(list_empty(&kiblnd_data.kib_peers[i]));
2543                LASSERT(list_empty(&kiblnd_data.kib_connd_zombies));
2544                LASSERT(list_empty(&kiblnd_data.kib_connd_conns));
2545                LASSERT(list_empty(&kiblnd_data.kib_reconn_list));
2546                LASSERT(list_empty(&kiblnd_data.kib_reconn_wait));
2547
2548                /* flag threads to terminate; wake and wait for them to die */
2549                kiblnd_data.kib_shutdown = 1;
2550
2551                /*
2552                 * NB: we really want to stop scheduler threads net by net
2553                 * instead of for the whole module; this should be improved
2554                 * with dynamic LNet configuration
2555                 */
2556                cfs_percpt_for_each(sched, i, kiblnd_data.kib_scheds)
2557                        wake_up_all(&sched->ibs_waitq);
2558
2559                wake_up_all(&kiblnd_data.kib_connd_waitq);
2560                wake_up_all(&kiblnd_data.kib_failover_waitq);
2561
2562                i = 2;
2563                while (atomic_read(&kiblnd_data.kib_nthreads)) {
2564                        i++;
2565                        /* power of 2 ? */
2566                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
2567                               "Waiting for %d threads to terminate\n",
2568                               atomic_read(&kiblnd_data.kib_nthreads));
2569                        set_current_state(TASK_UNINTERRUPTIBLE);
2570                        schedule_timeout(cfs_time_seconds(1));
2571                }
2572
2573                /* fall through */
2574
2575        case IBLND_INIT_NOTHING:
2576                break;
2577        }
2578
2579        if (kiblnd_data.kib_peers) {
2580                LIBCFS_FREE(kiblnd_data.kib_peers,
2581                            sizeof(struct list_head) *
2582                            kiblnd_data.kib_peer_hash_size);
2583        }
2584
2585        if (kiblnd_data.kib_scheds)
2586                cfs_percpt_free(kiblnd_data.kib_scheds);
2587
2588        kiblnd_data.kib_init = IBLND_INIT_NOTHING;
2589        module_put(THIS_MODULE);
2590}
2591
2592static void kiblnd_shutdown(struct lnet_ni *ni)
2593{
2594        struct kib_net *net = ni->ni_data;
2595        rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
2596        int i;
2597        unsigned long flags;
2598
2599        LASSERT(kiblnd_data.kib_init == IBLND_INIT_ALL);
2600
2601        if (!net)
2602                goto out;
2603
2604        write_lock_irqsave(g_lock, flags);
2605        net->ibn_shutdown = 1;
2606        write_unlock_irqrestore(g_lock, flags);
2607
2608        switch (net->ibn_init) {
2609        default:
2610                LBUG();
2611
2612        case IBLND_INIT_ALL:
2613                /* nuke all existing peers within this net */
2614                kiblnd_del_peer(ni, LNET_NID_ANY);
2615
2616                /* Wait for all peer state to clean up */
2617                i = 2;
2618                while (atomic_read(&net->ibn_npeers)) {
2619                        i++;
2620                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
2621                               "%s: waiting for %d peers to disconnect\n",
2622                               libcfs_nid2str(ni->ni_nid),
2623                               atomic_read(&net->ibn_npeers));
2624                        set_current_state(TASK_UNINTERRUPTIBLE);
2625                        schedule_timeout(cfs_time_seconds(1));
2626                }
2627
2628                kiblnd_net_fini_pools(net);
2629
2630                write_lock_irqsave(g_lock, flags);
2631                LASSERT(net->ibn_dev->ibd_nnets > 0);
2632                net->ibn_dev->ibd_nnets--;
2633                list_del(&net->ibn_list);
2634                write_unlock_irqrestore(g_lock, flags);
2635
2636                /* fall through */
2637
2638        case IBLND_INIT_NOTHING:
2639                LASSERT(!atomic_read(&net->ibn_nconns));
2640
2641                if (net->ibn_dev && !net->ibn_dev->ibd_nnets)
2642                        kiblnd_destroy_dev(net->ibn_dev);
2643
2644                break;
2645        }
2646
2647        net->ibn_init = IBLND_INIT_NOTHING;
2648        ni->ni_data = NULL;
2649
2650        LIBCFS_FREE(net, sizeof(*net));
2651
2652out:
2653        if (list_empty(&kiblnd_data.kib_devs))
2654                kiblnd_base_shutdown();
2655}
2656
2657static int kiblnd_base_startup(void)
2658{
2659        struct kib_sched_info *sched;
2660        int rc;
2661        int i;
2662
2663        LASSERT(kiblnd_data.kib_init == IBLND_INIT_NOTHING);
2664
2665        try_module_get(THIS_MODULE);
2666        /* zero pointers, flags etc */
2667        memset(&kiblnd_data, 0, sizeof(kiblnd_data));
2668
2669        rwlock_init(&kiblnd_data.kib_global_lock);
2670
2671        INIT_LIST_HEAD(&kiblnd_data.kib_devs);
2672        INIT_LIST_HEAD(&kiblnd_data.kib_failed_devs);
2673
2674        kiblnd_data.kib_peer_hash_size = IBLND_PEER_HASH_SIZE;
2675        LIBCFS_ALLOC(kiblnd_data.kib_peers,
2676                     sizeof(struct list_head) * kiblnd_data.kib_peer_hash_size);
2677        if (!kiblnd_data.kib_peers)
2678                goto failed;
2679        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++)
2680                INIT_LIST_HEAD(&kiblnd_data.kib_peers[i]);
2681
2682        spin_lock_init(&kiblnd_data.kib_connd_lock);
2683        INIT_LIST_HEAD(&kiblnd_data.kib_connd_conns);
2684        INIT_LIST_HEAD(&kiblnd_data.kib_connd_zombies);
2685        INIT_LIST_HEAD(&kiblnd_data.kib_reconn_list);
2686        INIT_LIST_HEAD(&kiblnd_data.kib_reconn_wait);
2687
2688        init_waitqueue_head(&kiblnd_data.kib_connd_waitq);
2689        init_waitqueue_head(&kiblnd_data.kib_failover_waitq);
2690
2691        kiblnd_data.kib_scheds = cfs_percpt_alloc(lnet_cpt_table(),
2692                                                  sizeof(*sched));
2693        if (!kiblnd_data.kib_scheds)
2694                goto failed;
2695
2696        cfs_percpt_for_each(sched, i, kiblnd_data.kib_scheds) {
2697                int nthrs;
2698
2699                spin_lock_init(&sched->ibs_lock);
2700                INIT_LIST_HEAD(&sched->ibs_conns);
2701                init_waitqueue_head(&sched->ibs_waitq);
2702
2703                nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2704                if (*kiblnd_tunables.kib_nscheds > 0) {
2705                        nthrs = min(nthrs, *kiblnd_tunables.kib_nscheds);
2706                } else {
2707                        /*
2708                         * use at most half of the CPUs; the other half is
2709                         * reserved for upper-layer modules
2710                         */
2711                        nthrs = min(max(IBLND_N_SCHED, nthrs >> 1), nthrs);
2712                }
2713
2714                sched->ibs_nthreads_max = nthrs;
2715                sched->ibs_cpt = i;
2716        }
2717
2718        kiblnd_data.kib_error_qpa.qp_state = IB_QPS_ERR;
2719
2720        /* lists/ptrs/locks initialised */
2721        kiblnd_data.kib_init = IBLND_INIT_DATA;
2722        /*****************************************************/
2723
2724        rc = kiblnd_thread_start(kiblnd_connd, NULL, "kiblnd_connd");
2725        if (rc) {
2726                CERROR("Can't spawn o2iblnd connd: %d\n", rc);
2727                goto failed;
2728        }
2729
2730        if (*kiblnd_tunables.kib_dev_failover)
2731                rc = kiblnd_thread_start(kiblnd_failover_thread, NULL,
2732                                         "kiblnd_failover");
2733
2734        if (rc) {
2735                CERROR("Can't spawn o2iblnd failover thread: %d\n", rc);
2736                goto failed;
2737        }
2738
2739        /* flag everything initialised */
2740        kiblnd_data.kib_init = IBLND_INIT_ALL;
2741        /*****************************************************/
2742
2743        return 0;
2744
2745 failed:
2746        kiblnd_base_shutdown();
2747        return -ENETDOWN;
2748}
2749
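    /*
     * Work out how many scheduler threads to start for this CPT and spawn
     * them.  Worked example (assuming the defaults IBLND_N_SCHED == 2 and
     * IBLND_N_SCHED_HIGH == 4 from o2iblnd.h): on a CPT with 16 CPUs and
     * kib_nscheds == 0, nthrs = min(max(2, 16 >> 1), 16) = 8, then
     * min(4, 8) = 4 threads are started.  Once threads exist, at most one
     * extra thread is added when a new interface shows up.
     */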
2750static int kiblnd_start_schedulers(struct kib_sched_info *sched)
2751{
2752        int rc = 0;
2753        int nthrs;
2754        int i;
2755
2756        if (!sched->ibs_nthreads) {
2757                if (*kiblnd_tunables.kib_nscheds > 0) {
2758                        nthrs = sched->ibs_nthreads_max;
2759                } else {
2760                        nthrs = cfs_cpt_weight(lnet_cpt_table(),
2761                                               sched->ibs_cpt);
2762                        nthrs = min(max(IBLND_N_SCHED, nthrs >> 1), nthrs);
2763                        nthrs = min(IBLND_N_SCHED_HIGH, nthrs);
2764                }
2765        } else {
2766                LASSERT(sched->ibs_nthreads <= sched->ibs_nthreads_max);
2767                /* add one thread if there is a new interface */
2768                nthrs = sched->ibs_nthreads < sched->ibs_nthreads_max;
2769        }
2770
2771        for (i = 0; i < nthrs; i++) {
2772                long id;
2773                char name[20];
2774
2775                id = KIB_THREAD_ID(sched->ibs_cpt, sched->ibs_nthreads + i);
2776                snprintf(name, sizeof(name), "kiblnd_sd_%02ld_%02ld",
2777                         KIB_THREAD_CPT(id), KIB_THREAD_TID(id));
2778                rc = kiblnd_thread_start(kiblnd_scheduler, (void *)id, name);
2779                if (!rc)
2780                        continue;
2781
2782                CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2783                       sched->ibs_cpt, sched->ibs_nthreads + i, rc);
2784                break;
2785        }
2786
2787        sched->ibs_nthreads += i;
2788        return rc;
2789}
2790
2791static int kiblnd_dev_start_threads(struct kib_dev *dev, int newdev, __u32 *cpts,
2792                                    int ncpts)
2793{
2794        int cpt;
2795        int rc;
2796        int i;
2797
2798        for (i = 0; i < ncpts; i++) {
2799                struct kib_sched_info *sched;
2800
2801                cpt = !cpts ? i : cpts[i];
2802                sched = kiblnd_data.kib_scheds[cpt];
2803
2804                if (!newdev && sched->ibs_nthreads > 0)
2805                        continue;
2806
2807                rc = kiblnd_start_schedulers(kiblnd_data.kib_scheds[cpt]);
2808                if (rc) {
2809                        CERROR("Failed to start scheduler threads for %s\n",
2810                               dev->ibd_ifname);
2811                        return rc;
2812                }
2813        }
2814        return 0;
2815}
2816
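    /*
     * Look up an existing kib_dev by interface name.  An exact match wins;
     * failing that, names are compared with any ":alias" suffix stripped, so
     * e.g. a lookup for "ib0:1" may return the device created for "ib0" as
     * an alias (the example interface names are illustrative).
     */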
2817static struct kib_dev *kiblnd_dev_search(char *ifname)
2818{
2819        struct kib_dev *alias = NULL;
2820        struct kib_dev *dev;
2821        char *colon;
2822        char *colon2;
2823
2824        colon = strchr(ifname, ':');
2825        list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
2826                if (!strcmp(&dev->ibd_ifname[0], ifname))
2827                        return dev;
2828
2829                if (alias)
2830                        continue;
2831
2832                colon2 = strchr(dev->ibd_ifname, ':');
2833                if (colon)
2834                        *colon = 0;
2835                if (colon2)
2836                        *colon2 = 0;
2837
2838                if (!strcmp(&dev->ibd_ifname[0], ifname))
2839                        alias = dev;
2840
2841                if (colon)
2842                        *colon = ':';
2843                if (colon2)
2844                        *colon2 = ':';
2845        }
2846        return alias;
2847}
2848
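    /*
     * Bring up one o2iblnd network interface (NI).  The IPoIB interface is
     * taken from the LNet 'networks=' configuration when one is given there,
     * e.g. (illustrative module option, not defined in this file):
     *
     *	options lnet networks="o2ib0(ib0)"
     *
     * otherwise kib_default_ipif is used.  The NID is derived from the
     * interface's IP address, scheduler threads are started for the NI's
     * CPTs, and the per-CPT pools are initialized.
     */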
2849static int kiblnd_startup(struct lnet_ni *ni)
2850{
2851        char *ifname;
2852        struct kib_dev *ibdev = NULL;
2853        struct kib_net *net;
2854        struct timespec64 tv;
2855        unsigned long flags;
2856        int rc;
2857        int newdev;
2858
2859        LASSERT(ni->ni_lnd == &the_o2iblnd);
2860
2861        if (kiblnd_data.kib_init == IBLND_INIT_NOTHING) {
2862                rc = kiblnd_base_startup();
2863                if (rc)
2864                        return rc;
2865        }
2866
2867        LIBCFS_ALLOC(net, sizeof(*net));
2868        ni->ni_data = net;
2869        if (!net)
2870                goto net_failed;
2871
2872        ktime_get_real_ts64(&tv);
2873        net->ibn_incarnation = tv.tv_sec * USEC_PER_SEC +
2874                               tv.tv_nsec / NSEC_PER_USEC;
2875
2876        rc = kiblnd_tunables_setup(ni);
2877        if (rc)
2878                goto net_failed;
2879
2880        if (ni->ni_interfaces[0]) {
2881                /* Use the IPoIB interface specified in 'networks=' */
2882
2883                BUILD_BUG_ON(LNET_MAX_INTERFACES <= 1);
2884                if (ni->ni_interfaces[1]) {
2885                        CERROR("Multiple interfaces not supported\n");
2886                        goto failed;
2887                }
2888
2889                ifname = ni->ni_interfaces[0];
2890        } else {
2891                ifname = *kiblnd_tunables.kib_default_ipif;
2892        }
2893
2894        if (strlen(ifname) >= sizeof(ibdev->ibd_ifname)) {
2895                CERROR("IPoIB interface name too long: %s\n", ifname);
2896                goto failed;
2897        }
2898
2899        ibdev = kiblnd_dev_search(ifname);
2900
2901        newdev = !ibdev;
2902        /* create a kib_dev even for an alias */
2903        if (!ibdev || strcmp(&ibdev->ibd_ifname[0], ifname))
2904                ibdev = kiblnd_create_dev(ifname);
2905
2906        if (!ibdev)
2907                goto failed;
2908
2909        net->ibn_dev = ibdev;
2910        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ibdev->ibd_ifip);
2911
2912        rc = kiblnd_dev_start_threads(ibdev, newdev,
2913                                      ni->ni_cpts, ni->ni_ncpts);
2914        if (rc)
2915                goto failed;
2916
2917        rc = kiblnd_net_init_pools(net, ni, ni->ni_cpts, ni->ni_ncpts);
2918        if (rc) {
2919                CERROR("Failed to initialize NI pools: %d\n", rc);
2920                goto failed;
2921        }
2922
2923        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2924        ibdev->ibd_nnets++;
2925        list_add_tail(&net->ibn_list, &ibdev->ibd_nets);
2926        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2927
2928        net->ibn_init = IBLND_INIT_ALL;
2929
2930        return 0;
2931
2932failed:
2933        if (!net->ibn_dev && ibdev)
2934                kiblnd_destroy_dev(ibdev);
2935
2936net_failed:
2937        kiblnd_shutdown(ni);
2938
2939        CDEBUG(D_NET, "kiblnd_startup failed\n");
2940        return -ENETDOWN;
2941}
2942
2943static struct lnet_lnd the_o2iblnd = {
2944        .lnd_type       = O2IBLND,
2945        .lnd_startup    = kiblnd_startup,
2946        .lnd_shutdown   = kiblnd_shutdown,
2947        .lnd_ctl        = kiblnd_ctl,
2948        .lnd_query      = kiblnd_query,
2949        .lnd_send       = kiblnd_send,
2950        .lnd_recv       = kiblnd_recv,
2951};
2952
2953static void __exit ko2iblnd_exit(void)
2954{
2955        lnet_unregister_lnd(&the_o2iblnd);
2956}
2957
2958static int __init ko2iblnd_init(void)
2959{
2960        BUILD_BUG_ON(sizeof(struct kib_msg) > IBLND_MSG_SIZE);
2961        BUILD_BUG_ON(offsetof(struct kib_msg,
2962                          ibm_u.get.ibgm_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
2963                          > IBLND_MSG_SIZE);
2964        BUILD_BUG_ON(offsetof(struct kib_msg,
2965                          ibm_u.putack.ibpam_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
2966                          > IBLND_MSG_SIZE);
2967
2968        kiblnd_tunables_init();
2969
2970        lnet_register_lnd(&the_o2iblnd);
2971
2972        return 0;
2973}
2974
2975MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2976MODULE_DESCRIPTION("OpenIB gen2 LNet Network Driver");
2977MODULE_VERSION("2.7.0");
2978MODULE_LICENSE("GPL");
2979
2980module_init(ko2iblnd_init);
2981module_exit(ko2iblnd_exit);
2982