linux/net/rds/ib_cm.c
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/in.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/ratelimit.h>

#include "rds.h"
#include "ib.h"

/*
 * Set the selected protocol version
 */
static void rds_ib_set_protocol(struct rds_connection *conn, unsigned int version)
{
        conn->c_version = version;
}

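/*
 * The version word packs the major and minor numbers together, e.g.
 * (assuming the usual layout in rds.h, major in the high byte and
 * minor in the low byte):
 *
 *      RDS_PROTOCOL(3, 1)         == 0x0301
 *      RDS_PROTOCOL_MAJOR(0x0301) == 3
 *      RDS_PROTOCOL_MINOR(0x0301) == 1
 */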
/*
 * Set up flow control
 */
static void rds_ib_set_flow_control(struct rds_connection *conn, u32 credits)
{
        struct rds_ib_connection *ic = conn->c_transport_data;

        if (rds_ib_sysctl_flow_control && credits != 0) {
                /* We're doing flow control */
                ic->i_flowctl = 1;
                rds_ib_send_add_credits(conn, credits);
        } else {
                ic->i_flowctl = 0;
        }
}

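/*
 * ic->i_credits packs two counters into a single atomic_t: send
 * credits in the low half, newly posted receive buffers in the high
 * half.  A sketch of the split, assuming the layout used by the
 * IB_*_CREDITS accessor macros in ib.h:
 *
 *      IB_SET_POST_CREDITS(n)  -> (n) << 16
 *      IB_GET_POST_CREDITS(v)  -> (v) >> 16
 *      IB_GET_SEND_CREDITS(v)  -> (v) & 0xffff
 */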
/*
 * Tune RNR behavior. Without flow control, we use a rather
 * low timeout, but not the absolute minimum - this should
 * be tunable.
 *
 * We already set the RNR retry count to 7 (which is the
 * smallest infinite number :-) above.
 * If flow control is off, we want to change this back to 0
 * so that we learn quickly when our credit accounting is
 * buggy.
 *
 * Caller passes in a qp_attr pointer - don't waste stack space
 * by allocating this twice.
 */
static void
rds_ib_tune_rnr(struct rds_ib_connection *ic, struct ib_qp_attr *attr)
{
        int ret;

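        /*
         * IB_RNR_TIMER_000_32 asks for an RNR NAK timer of roughly
         * 0.32 ms; the enum names in ib_verbs.h encode the delay in
         * milliseconds.
         */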
        attr->min_rnr_timer = IB_RNR_TIMER_000_32;
        ret = ib_modify_qp(ic->i_cm_id->qp, attr, IB_QP_MIN_RNR_TIMER);
        if (ret)
                printk(KERN_NOTICE "ib_modify_qp(IB_QP_MIN_RNR_TIMER): err=%d\n", -ret);
}

/*
 * Connection established.
 * We get here for both outgoing and incoming connections.
 */
void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_event *event)
{
        const struct rds_ib_connect_private *dp = NULL;
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct ib_qp_attr qp_attr;
        int err;

        if (event->param.conn.private_data_len >= sizeof(*dp)) {
                dp = event->param.conn.private_data;

                /* make sure it isn't empty data */
                if (dp->dp_protocol_major) {
                        rds_ib_set_protocol(conn,
                                RDS_PROTOCOL(dp->dp_protocol_major,
                                dp->dp_protocol_minor));
                        rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
                }
        }

        if (conn->c_version < RDS_PROTOCOL(3, 1)) {
                printk(KERN_NOTICE "RDS/IB: Connection to %pI4 version %u.%u failed,"
                       " no longer supported\n",
                       &conn->c_faddr,
                       RDS_PROTOCOL_MAJOR(conn->c_version),
                       RDS_PROTOCOL_MINOR(conn->c_version));
                rds_conn_destroy(conn);
                return;
        } else {
                printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
                       &conn->c_faddr,
                       RDS_PROTOCOL_MAJOR(conn->c_version),
                       RDS_PROTOCOL_MINOR(conn->c_version),
                       ic->i_flowctl ? ", flow control" : "");
        }

        /*
         * Init rings and fill recv. This needs to wait until protocol
         * negotiation is complete, since the ring layout is different
         * from 3.0 to 3.1.
         */
        rds_ib_send_init_ring(ic);
        rds_ib_recv_init_ring(ic);
        /* Post receive buffers - as a side effect, this will update
         * the posted credit count. */
        rds_ib_recv_refill(conn, 1, GFP_KERNEL);

        /* Tune RNR behavior */
        rds_ib_tune_rnr(ic, &qp_attr);

        qp_attr.qp_state = IB_QPS_RTS;
        err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
        if (err)
                printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err);

        /* update ib_device with this local ipaddr */
        err = rds_ib_update_ipaddr(ic->rds_ibdev, conn->c_laddr);
        if (err)
                printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n",
                        err);

        /* If the peer gave us the last packet it saw, process this as if
         * we had received a regular ACK. */
        if (dp) {
                /* The dp structure is not guaranteed to start on an
                 * 8-byte boundary.  Since dp_ack_seq is 64 bits wide,
                 * the compiler may use extended (8-byte) load
                 * instructions for it, so read it through
                 * get_unaligned() to avoid unaligned access faults.
                 */
                __be64 dp_ack_seq = get_unaligned(&dp->dp_ack_seq);

                if (dp_ack_seq)
                        rds_send_drop_acked(conn, be64_to_cpu(dp_ack_seq),
                                            NULL);
        }

        rds_connect_complete(conn);
}

static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
                        struct rdma_conn_param *conn_param,
                        struct rds_ib_connect_private *dp,
                        u32 protocol_version,
                        u32 max_responder_resources,
                        u32 max_initiator_depth)
{
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_ib_device *rds_ibdev = ic->rds_ibdev;

        memset(conn_param, 0, sizeof(struct rdma_conn_param));

        conn_param->responder_resources =
                min_t(u32, rds_ibdev->max_responder_resources, max_responder_resources);
        conn_param->initiator_depth =
                min_t(u32, rds_ibdev->max_initiator_depth, max_initiator_depth);
        conn_param->retry_count = min_t(unsigned int, rds_ib_retry_count, 7);
        conn_param->rnr_retry_count = 7;

        if (dp) {
                memset(dp, 0, sizeof(*dp));
                dp->dp_saddr = conn->c_laddr;
                dp->dp_daddr = conn->c_faddr;
                dp->dp_protocol_major = RDS_PROTOCOL_MAJOR(protocol_version);
                dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version);
                dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
                dp->dp_ack_seq = cpu_to_be64(rds_ib_piggyb_ack(ic));

                /* Advertise flow control */
                if (ic->i_flowctl) {
                        unsigned int credits;

                        credits = IB_GET_POST_CREDITS(atomic_read(&ic->i_credits));
                        dp->dp_credit = cpu_to_be32(credits);
                        atomic_sub(IB_SET_POST_CREDITS(credits), &ic->i_credits);
                }

                conn_param->private_data = dp;
                conn_param->private_data_len = sizeof(*dp);
        }
}
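
/*
 * For illustration: with RDS_PROTOCOL_VERSION == RDS_PROTOCOL(3, 1) and
 * RDS_IB_SUPPORTED_PROTOCOLS covering minors 0 and 1 (0x0003), the
 * private data built above would look roughly like:
 *
 *      dp_protocol_major      = 3
 *      dp_protocol_minor      = 1
 *      dp_protocol_minor_mask = cpu_to_be16(0x0003)
 *      dp_ack_seq             = last sequence seen, piggybacked as an ACK
 *      dp_credit              = posted recv buffers advertised as credits
 */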

static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
{
        rdsdebug("event %u (%s) data %p\n",
                 event->event, ib_event_msg(event->event), data);
}

/* Plucking the oldest entry from the ring can be done concurrently with
 * the thread refilling the ring.  Each ring operation is protected by
 * spinlocks and the transient state of refilling doesn't change the
 * recording of which entry is oldest.
 *
 * This relies on IB only calling one cq comp_handler for each cq so that
 * there will only be one caller of rds_recv_incoming() per RDS connection.
 */
static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
{
        struct rds_connection *conn = context;
        struct rds_ib_connection *ic = conn->c_transport_data;

        rdsdebug("conn %p cq %p\n", conn, cq);

        rds_ib_stats_inc(s_ib_evt_handler_call);

        tasklet_schedule(&ic->i_recv_tasklet);
}

static void poll_scq(struct rds_ib_connection *ic, struct ib_cq *cq,
                     struct ib_wc *wcs)
{
        int nr, i;
        struct ib_wc *wc;

        while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
                for (i = 0; i < nr; i++) {
                        wc = wcs + i;
                        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                 (unsigned long long)wc->wr_id, wc->status,
                                 wc->byte_len, be32_to_cpu(wc->ex.imm_data));

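                        /*
                         * Send-ring completions carry the ring index as
                         * wr_id (or RDS_IB_ACK_WR_ID, ~0, for the
                         * dedicated ACK WR); FRMR work requests use a
                         * pointer-derived wr_id, which in practice is
                         * always much larger than the ring size.
                         */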
                        if (wc->wr_id <= ic->i_send_ring.w_nr ||
                            wc->wr_id == RDS_IB_ACK_WR_ID)
                                rds_ib_send_cqe_handler(ic, wc);
                        else
                                rds_ib_mr_cqe_handler(ic, wc);
                }
        }
}

static void rds_ib_tasklet_fn_send(unsigned long data)
{
        struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
        struct rds_connection *conn = ic->conn;

        rds_ib_stats_inc(s_ib_tasklet_call);

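        /*
         * Drain the CQ, re-arm it, then drain once more: completions
         * that arrive between the first drain and the re-arm would
         * otherwise sit unnoticed until the next event.
         */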
        poll_scq(ic, ic->i_send_cq, ic->i_send_wc);
        ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
        poll_scq(ic, ic->i_send_cq, ic->i_send_wc);

        if (rds_conn_up(conn) &&
            (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
            test_bit(0, &conn->c_map_queued)))
                rds_send_xmit(ic->conn);
}

static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
                     struct ib_wc *wcs,
                     struct rds_ib_ack_state *ack_state)
{
        int nr, i;
        struct ib_wc *wc;

        while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
                for (i = 0; i < nr; i++) {
                        wc = wcs + i;
                        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                 (unsigned long long)wc->wr_id, wc->status,
                                 wc->byte_len, be32_to_cpu(wc->ex.imm_data));

                        rds_ib_recv_cqe_handler(ic, wc, ack_state);
                }
        }
}

static void rds_ib_tasklet_fn_recv(unsigned long data)
{
        struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
        struct rds_connection *conn = ic->conn;
        struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
        struct rds_ib_ack_state state;

        if (!rds_ibdev)
                rds_conn_drop(conn);

        rds_ib_stats_inc(s_ib_tasklet_call);

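        /*
         * Same drain/re-arm/drain pattern as the send side.  The
         * per-wc handler only records ACK state in 'state', so the
         * ACK bookkeeping below runs once per batch instead of once
         * per completion.
         */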
        memset(&state, 0, sizeof(state));
        poll_rcq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
        ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
        poll_rcq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);

        if (state.ack_next_valid)
                rds_ib_set_ack(ic, state.ack_next, state.ack_required);
        if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
                rds_send_drop_acked(conn, state.ack_recv, NULL);
                ic->i_ack_recv = state.ack_recv;
        }

        if (rds_conn_up(conn))
                rds_ib_attempt_ack(ic);
}

static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
{
        struct rds_connection *conn = data;
        struct rds_ib_connection *ic = conn->c_transport_data;

        rdsdebug("conn %p ic %p event %u (%s)\n", conn, ic, event->event,
                 ib_event_msg(event->event));

        switch (event->event) {
        case IB_EVENT_COMM_EST:
                rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
                break;
        default:
                rdsdebug("Fatal QP Event %u (%s) "
                        "- connection %pI4->%pI4, reconnecting\n",
                        event->event, ib_event_msg(event->event),
                        &conn->c_laddr, &conn->c_faddr);
                rds_conn_drop(conn);
                break;
        }
}

static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
{
        struct rds_connection *conn = context;
        struct rds_ib_connection *ic = conn->c_transport_data;

        rdsdebug("conn %p cq %p\n", conn, cq);

        rds_ib_stats_inc(s_ib_evt_handler_call);

        tasklet_schedule(&ic->i_send_tasklet);
}

/*
 * This needs to be very careful to not leave IS_ERR pointers around for
 * cleanup to trip over.
 */
static int rds_ib_setup_qp(struct rds_connection *conn)
{
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct ib_device *dev = ic->i_cm_id->device;
        struct ib_qp_init_attr attr;
        struct ib_cq_init_attr cq_attr = {};
        struct rds_ib_device *rds_ibdev;
        int ret, fr_queue_space;

        /*
         * It's normal to see a null device if an incoming connection races
         * with device removal, so we don't print a warning.
         */
        rds_ibdev = rds_ib_get_client_data(dev);
        if (!rds_ibdev)
                return -EOPNOTSUPP;

        /* fr_queue_space is currently set to 512 to reserve extra entries
         * on the completion queue and send queue.  This extra space is
         * used for FRMR registration and invalidation work requests.
         */
        fr_queue_space = (rds_ibdev->use_fastreg ? RDS_IB_DEFAULT_FR_WR : 0);

        /* add the conn now so that connection establishment has the dev */
        rds_ib_add_conn(rds_ibdev, conn);

        if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
                rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
        if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1)
                rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1);

        /* Protection domain and memory range */
        ic->i_pd = rds_ibdev->pd;

        cq_attr.cqe = ic->i_send_ring.w_nr + fr_queue_space + 1;

        ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
                                     rds_ib_cq_event_handler, conn,
                                     &cq_attr);
        if (IS_ERR(ic->i_send_cq)) {
                ret = PTR_ERR(ic->i_send_cq);
                ic->i_send_cq = NULL;
                rdsdebug("ib_create_cq send failed: %d\n", ret);
                goto out;
        }

        cq_attr.cqe = ic->i_recv_ring.w_nr;
        ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
                                     rds_ib_cq_event_handler, conn,
                                     &cq_attr);
        if (IS_ERR(ic->i_recv_cq)) {
                ret = PTR_ERR(ic->i_recv_cq);
                ic->i_recv_cq = NULL;
                rdsdebug("ib_create_cq recv failed: %d\n", ret);
                goto out;
        }

        ret = ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
        if (ret) {
                rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
                goto out;
        }

        ret = ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
        if (ret) {
                rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
                goto out;
        }

        /* XXX negotiate max send/recv with remote? */
        memset(&attr, 0, sizeof(attr));
        attr.event_handler = rds_ib_qp_event_handler;
        attr.qp_context = conn;
        /* + 1 to allow for the single ack message */
        attr.cap.max_send_wr = ic->i_send_ring.w_nr + fr_queue_space + 1;
        attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
        attr.cap.max_send_sge = rds_ibdev->max_sge;
        attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
        attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        attr.qp_type = IB_QPT_RC;
        attr.send_cq = ic->i_send_cq;
        attr.recv_cq = ic->i_recv_cq;
        atomic_set(&ic->i_fastreg_wrs, RDS_IB_DEFAULT_FR_WR);

        /*
         * XXX this can fail if max_*_wr is too large?  Are we supposed
         * to back off until we get a value that the hardware can support?
         */
        ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
        if (ret) {
                rdsdebug("rdma_create_qp failed: %d\n", ret);
                goto out;
        }

        ic->i_send_hdrs = ib_dma_alloc_coherent(dev,
                                           ic->i_send_ring.w_nr *
                                                sizeof(struct rds_header),
                                           &ic->i_send_hdrs_dma, GFP_KERNEL);
        if (!ic->i_send_hdrs) {
                ret = -ENOMEM;
                rdsdebug("ib_dma_alloc_coherent send failed\n");
                goto out;
        }

        ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
                                           ic->i_recv_ring.w_nr *
                                                sizeof(struct rds_header),
                                           &ic->i_recv_hdrs_dma, GFP_KERNEL);
        if (!ic->i_recv_hdrs) {
                ret = -ENOMEM;
                rdsdebug("ib_dma_alloc_coherent recv failed\n");
                goto out;
        }

        ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
                                       &ic->i_ack_dma, GFP_KERNEL);
        if (!ic->i_ack) {
                ret = -ENOMEM;
                rdsdebug("ib_dma_alloc_coherent ack failed\n");
                goto out;
        }

        ic->i_sends = vzalloc_node(ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work),
                                   ibdev_to_node(dev));
        if (!ic->i_sends) {
                ret = -ENOMEM;
                rdsdebug("send allocation failed\n");
                goto out;
        }

        ic->i_recvs = vzalloc_node(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work),
                                   ibdev_to_node(dev));
        if (!ic->i_recvs) {
                ret = -ENOMEM;
                rdsdebug("recv allocation failed\n");
                goto out;
        }

        rds_ib_recv_init_ack(ic);

        rdsdebug("conn %p pd %p cq %p %p\n", conn, ic->i_pd,
                 ic->i_send_cq, ic->i_recv_cq);

out:
        rds_ib_dev_put(rds_ibdev);
        return ret;
}

static u32 rds_ib_protocol_compatible(struct rdma_cm_event *event)
{
        const struct rds_ib_connect_private *dp = event->param.conn.private_data;
        u16 common;
        u32 version = 0;

        /*
         * rdma_cm private data is odd - when there is any private data in the
         * request, we will be given a pretty large buffer without telling us the
         * original size. The only way to tell the difference is by looking at
         * the contents, which are initialized to zero.
         * If the protocol version fields aren't set, this is a connection attempt
         * from an older version. This could be 3.0 or 2.0 - we can't tell.
         * We really should have changed this for OFED 1.3 :-(
         */

        /* Be paranoid. RDS always has privdata */
        if (!event->param.conn.private_data_len) {
                printk(KERN_NOTICE "RDS incoming connection has no private data, "
                        "rejecting\n");
                return 0;
        }

        /* Even if len is crap *now* I still want to check it. -ASG */
        if (event->param.conn.private_data_len < sizeof(*dp) ||
            dp->dp_protocol_major == 0)
                return RDS_PROTOCOL_3_0;

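        /*
         * Pick the highest minor version both sides support.  E.g. if
         * both ends advertise minors 0 and 1, common is 0x0003, the
         * loop shifts it once, and version becomes RDS_PROTOCOL_3_0 + 1,
         * i.e. RDS_PROTOCOL(3, 1) (the minor number lives in the low
         * bits of the version word).
         */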
        common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IB_SUPPORTED_PROTOCOLS;
        if (dp->dp_protocol_major == 3 && common) {
                version = RDS_PROTOCOL_3_0;
                while ((common >>= 1) != 0)
                        version++;
        } else
                printk_ratelimited(KERN_NOTICE "RDS: Connection from %pI4 using incompatible protocol version %u.%u\n",
                                &dp->dp_saddr,
                                dp->dp_protocol_major,
                                dp->dp_protocol_minor);
        return version;
}

int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
                                    struct rdma_cm_event *event)
{
        __be64 lguid = cm_id->route.path_rec->sgid.global.interface_id;
        __be64 fguid = cm_id->route.path_rec->dgid.global.interface_id;
        const struct rds_ib_connect_private *dp = event->param.conn.private_data;
        struct rds_ib_connect_private dp_rep;
        struct rds_connection *conn = NULL;
        struct rds_ib_connection *ic = NULL;
        struct rdma_conn_param conn_param;
        u32 version;
        int err = 1, destroy = 1;

        /* Check whether the remote protocol version matches ours. */
        version = rds_ib_protocol_compatible(event);
        if (!version)
                goto out;

        rdsdebug("saddr %pI4 daddr %pI4 RDSv%u.%u lguid 0x%llx fguid "
                 "0x%llx\n", &dp->dp_saddr, &dp->dp_daddr,
                 RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version),
                 (unsigned long long)be64_to_cpu(lguid),
                 (unsigned long long)be64_to_cpu(fguid));

        /* RDS/IB is not currently netns aware, thus init_net */
        conn = rds_conn_create(&init_net, dp->dp_daddr, dp->dp_saddr,
                               &rds_ib_transport, GFP_KERNEL);
        if (IS_ERR(conn)) {
                rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
                conn = NULL;
                goto out;
        }

        /*
         * The connection request may occur while the
         * previous connection exists, e.g. in case of failover.
         * But as connections may be initiated simultaneously
         * by both hosts, we have a random backoff mechanism -
         * see the comment above rds_queue_reconnect()
         */
        mutex_lock(&conn->c_cm_lock);
        if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
                if (rds_conn_state(conn) == RDS_CONN_UP) {
                        rdsdebug("incoming connect while connecting\n");
                        rds_conn_drop(conn);
                        rds_ib_stats_inc(s_ib_listen_closed_stale);
                } else if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
                        /* Wait and see - our connect may still be succeeding */
                        rds_ib_stats_inc(s_ib_connect_raced);
                }
                goto out;
        }

        ic = conn->c_transport_data;

        rds_ib_set_protocol(conn, version);
        rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));

        /* If the peer gave us the last packet it saw, process this as if
         * we had received a regular ACK. */
        if (dp->dp_ack_seq)
                rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);

        BUG_ON(cm_id->context);
        BUG_ON(ic->i_cm_id);

        ic->i_cm_id = cm_id;
        cm_id->context = conn;

        /* We got halfway through setting up the ib_connection, if we
         * fail now, we have to take the long route out of this mess. */
        destroy = 0;

        err = rds_ib_setup_qp(conn);
        if (err) {
                rds_ib_conn_error(conn, "rds_ib_setup_qp failed (%d)\n", err);
                goto out;
        }

        rds_ib_cm_fill_conn_param(conn, &conn_param, &dp_rep, version,
                event->param.conn.responder_resources,
                event->param.conn.initiator_depth);

        /* rdma_accept() calls rdma_reject() internally if it fails */
        err = rdma_accept(cm_id, &conn_param);
        if (err)
                rds_ib_conn_error(conn, "rdma_accept failed (%d)\n", err);

out:
        if (conn)
                mutex_unlock(&conn->c_cm_lock);
        if (err)
                rdma_reject(cm_id, NULL, 0);
        return destroy;
}

int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
{
        struct rds_connection *conn = cm_id->context;
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct rdma_conn_param conn_param;
        struct rds_ib_connect_private dp;
        int ret;

        /* If the peer doesn't do protocol negotiation, we must
         * default to RDSv3.0 */
        rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0);
        ic->i_flowctl = rds_ib_sysctl_flow_control;     /* advertise flow control */

        ret = rds_ib_setup_qp(conn);
        if (ret) {
                rds_ib_conn_error(conn, "rds_ib_setup_qp failed (%d)\n", ret);
                goto out;
        }

        rds_ib_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION,
                UINT_MAX, UINT_MAX);
        ret = rdma_connect(cm_id, &conn_param);
        if (ret)
                rds_ib_conn_error(conn, "rdma_connect failed (%d)\n", ret);

out:
        /* Beware - returning non-zero tells the rdma_cm to destroy
         * the cm_id. We should certainly not do it as long as we still
         * "own" the cm_id. */
        if (ret) {
                if (ic->i_cm_id == cm_id)
                        ret = 0;
        }
        return ret;
}

int rds_ib_conn_connect(struct rds_connection *conn)
{
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct sockaddr_in src, dest;
        int ret;

        /* XXX I wonder what effect the port space has */
        /* delegate cm event handler to rdma_transport */
        ic->i_cm_id = rdma_create_id(&init_net, rds_rdma_cm_event_handler, conn,
                                     RDMA_PS_TCP, IB_QPT_RC);
        if (IS_ERR(ic->i_cm_id)) {
                ret = PTR_ERR(ic->i_cm_id);
                ic->i_cm_id = NULL;
                rdsdebug("rdma_create_id() failed: %d\n", ret);
                goto out;
        }

        rdsdebug("created cm id %p for conn %p\n", ic->i_cm_id, conn);

        src.sin_family = AF_INET;
        src.sin_addr.s_addr = (__force u32)conn->c_laddr;
        src.sin_port = (__force u16)htons(0);

        dest.sin_family = AF_INET;
        dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
        dest.sin_port = (__force u16)htons(RDS_PORT);

        ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
                                (struct sockaddr *)&dest,
                                RDS_RDMA_RESOLVE_TIMEOUT_MS);
        if (ret) {
                rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id,
                         ret);
                rdma_destroy_id(ic->i_cm_id);
                ic->i_cm_id = NULL;
        }

out:
        return ret;
}

/*
 * This is so careful about only cleaning up resources that were built up
 * so that it can be called at any point during startup.  In fact it
 * can be called multiple times for a given connection.
 */
void rds_ib_conn_shutdown(struct rds_connection *conn)
{
        struct rds_ib_connection *ic = conn->c_transport_data;
        int err = 0;

        rdsdebug("cm %p pd %p cq %p %p qp %p\n", ic->i_cm_id,
                 ic->i_pd, ic->i_send_cq, ic->i_recv_cq,
                 ic->i_cm_id ? ic->i_cm_id->qp : NULL);

        if (ic->i_cm_id) {
                struct ib_device *dev = ic->i_cm_id->device;

                rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
                err = rdma_disconnect(ic->i_cm_id);
                if (err) {
                        /* Actually this may happen quite frequently, when
                         * an outgoing connect raced with an incoming connect.
                         */
                        rdsdebug("failed to disconnect, cm: %p err %d\n",
                                ic->i_cm_id, err);
                }

                /*
                 * We want to wait for tx and rx completion to finish
                 * before we tear down the connection, but we have to be
                 * careful not to get stuck waiting on a send ring that
                 * only has unsignaled sends in it.  We've shutdown new
                 * sends before getting here so by waiting for signaled
                 * sends to complete we're ensured that there will be no
                 * more tx processing.
                 */
                wait_event(rds_ib_ring_empty_wait,
                           rds_ib_ring_empty(&ic->i_recv_ring) &&
                           (atomic_read(&ic->i_signaled_sends) == 0) &&
                           (atomic_read(&ic->i_fastreg_wrs) == RDS_IB_DEFAULT_FR_WR));
                tasklet_kill(&ic->i_send_tasklet);
                tasklet_kill(&ic->i_recv_tasklet);

                /* first destroy the ib state that generates callbacks */
                if (ic->i_cm_id->qp)
                        rdma_destroy_qp(ic->i_cm_id);
                if (ic->i_send_cq)
                        ib_destroy_cq(ic->i_send_cq);
                if (ic->i_recv_cq)
                        ib_destroy_cq(ic->i_recv_cq);

                /* then free the resources that ib callbacks use */
                if (ic->i_send_hdrs)
                        ib_dma_free_coherent(dev,
                                           ic->i_send_ring.w_nr *
                                                sizeof(struct rds_header),
                                           ic->i_send_hdrs,
                                           ic->i_send_hdrs_dma);

                if (ic->i_recv_hdrs)
                        ib_dma_free_coherent(dev,
                                           ic->i_recv_ring.w_nr *
                                                sizeof(struct rds_header),
                                           ic->i_recv_hdrs,
                                           ic->i_recv_hdrs_dma);

                if (ic->i_ack)
                        ib_dma_free_coherent(dev, sizeof(struct rds_header),
                                             ic->i_ack, ic->i_ack_dma);

                if (ic->i_sends)
                        rds_ib_send_clear_ring(ic);
                if (ic->i_recvs)
                        rds_ib_recv_clear_ring(ic);

                rdma_destroy_id(ic->i_cm_id);

                /*
                 * Move connection back to the nodev list.
                 */
                if (ic->rds_ibdev)
                        rds_ib_remove_conn(ic->rds_ibdev, conn);

                ic->i_cm_id = NULL;
                ic->i_pd = NULL;
                ic->i_send_cq = NULL;
                ic->i_recv_cq = NULL;
                ic->i_send_hdrs = NULL;
                ic->i_recv_hdrs = NULL;
                ic->i_ack = NULL;
        }
        BUG_ON(ic->rds_ibdev);

        /* Clear pending transmit */
        if (ic->i_data_op) {
                struct rds_message *rm;

                rm = container_of(ic->i_data_op, struct rds_message, data);
                rds_message_put(rm);
                ic->i_data_op = NULL;
        }

        /* Clear the ACK state */
        clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
#ifdef KERNEL_HAS_ATOMIC64
        atomic64_set(&ic->i_ack_next, 0);
#else
        ic->i_ack_next = 0;
#endif
        ic->i_ack_recv = 0;

        /* Clear flow control state */
        ic->i_flowctl = 0;
        atomic_set(&ic->i_credits, 0);

        rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr);
        rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr);

        if (ic->i_ibinc) {
                rds_inc_put(&ic->i_ibinc->ii_inc);
                ic->i_ibinc = NULL;
        }

        vfree(ic->i_sends);
        ic->i_sends = NULL;
        vfree(ic->i_recvs);
        ic->i_recvs = NULL;
}

int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
{
        struct rds_ib_connection *ic;
        unsigned long flags;
        int ret;

        /* XXX too lazy? */
        ic = kzalloc(sizeof(struct rds_ib_connection), gfp);
        if (!ic)
                return -ENOMEM;

        ret = rds_ib_recv_alloc_caches(ic);
        if (ret) {
                kfree(ic);
                return ret;
        }

        INIT_LIST_HEAD(&ic->ib_node);
        tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
                     (unsigned long)ic);
        tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
                     (unsigned long)ic);
        mutex_init(&ic->i_recv_mutex);
#ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);
#endif
        atomic_set(&ic->i_signaled_sends, 0);

        /*
         * rds_ib_conn_shutdown() waits for these to be emptied so they
         * must be initialized before it can be called.
         */
        rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr);
        rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr);

        ic->conn = conn;
        conn->c_transport_data = ic;

        spin_lock_irqsave(&ib_nodev_conns_lock, flags);
        list_add_tail(&ic->ib_node, &ib_nodev_conns);
        spin_unlock_irqrestore(&ib_nodev_conns_lock, flags);

        rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data);
        return 0;
}

/*
 * Free a connection. Connection must be shut down and not set for reconnect.
 */
void rds_ib_conn_free(void *arg)
{
        struct rds_ib_connection *ic = arg;
        spinlock_t      *lock_ptr;

        rdsdebug("ic %p\n", ic);

        /*
         * Conn is either on a dev's list or on the nodev list.
         * A race with shutdown() or connect() would cause problems
         * (since rds_ibdev would change) but that should never happen.
         */
        lock_ptr = ic->rds_ibdev ? &ic->rds_ibdev->spinlock : &ib_nodev_conns_lock;

        spin_lock_irq(lock_ptr);
        list_del(&ic->ib_node);
        spin_unlock_irq(lock_ptr);

        rds_ib_recv_free_caches(ic);

        kfree(ic);
}

/*
 * An error occurred on the connection
 */
void
__rds_ib_conn_error(struct rds_connection *conn, const char *fmt, ...)
{
        va_list ap;

        rds_conn_drop(conn);

        va_start(ap, fmt);
        vprintk(fmt, ap);
        va_end(ap);
}