linux/net/sunrpc/xprtrdma/svc_rdma_transport.c
   1/*
   2 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
   3 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
   4 *
   5 * This software is available to you under a choice of one of two
   6 * licenses.  You may choose to be licensed under the terms of the GNU
   7 * General Public License (GPL) Version 2, available from the file
   8 * COPYING in the main directory of this source tree, or the BSD-type
   9 * license below:
  10 *
  11 * Redistribution and use in source and binary forms, with or without
  12 * modification, are permitted provided that the following conditions
  13 * are met:
  14 *
  15 *      Redistributions of source code must retain the above copyright
  16 *      notice, this list of conditions and the following disclaimer.
  17 *
  18 *      Redistributions in binary form must reproduce the above
  19 *      copyright notice, this list of conditions and the following
  20 *      disclaimer in the documentation and/or other materials provided
  21 *      with the distribution.
  22 *
  23 *      Neither the name of the Network Appliance, Inc. nor the names of
  24 *      its contributors may be used to endorse or promote products
  25 *      derived from this software without specific prior written
  26 *      permission.
  27 *
  28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  39 *
  40 * Author: Tom Tucker <tom@opengridcomputing.com>
  41 */
  42
  43#include <linux/sunrpc/svc_xprt.h>
  44#include <linux/sunrpc/addr.h>
  45#include <linux/sunrpc/debug.h>
  46#include <linux/sunrpc/rpc_rdma.h>
  47#include <linux/interrupt.h>
  48#include <linux/sched.h>
  49#include <linux/slab.h>
  50#include <linux/spinlock.h>
  51#include <linux/workqueue.h>
  52#include <rdma/ib_verbs.h>
  53#include <rdma/rdma_cm.h>
  54#include <rdma/rw.h>
  55#include <linux/sunrpc/svc_rdma.h>
  56#include <linux/export.h>
  57#include "xprt_rdma.h"
  58
  59#define RPCDBG_FACILITY RPCDBG_SVCXPRT
  60
  61static int svc_rdma_post_recv(struct svcxprt_rdma *xprt);
  62static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
  63static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
  64                                        struct net *net,
  65                                        struct sockaddr *sa, int salen,
  66                                        int flags);
  67static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
  68static void svc_rdma_release_rqst(struct svc_rqst *);
  69static void svc_rdma_detach(struct svc_xprt *xprt);
  70static void svc_rdma_free(struct svc_xprt *xprt);
  71static int svc_rdma_has_wspace(struct svc_xprt *xprt);
  72static void svc_rdma_secure_port(struct svc_rqst *);
  73static void svc_rdma_kill_temp_xprt(struct svc_xprt *);
  74
  75static const struct svc_xprt_ops svc_rdma_ops = {
  76        .xpo_create = svc_rdma_create,
  77        .xpo_recvfrom = svc_rdma_recvfrom,
  78        .xpo_sendto = svc_rdma_sendto,
  79        .xpo_release_rqst = svc_rdma_release_rqst,
  80        .xpo_detach = svc_rdma_detach,
  81        .xpo_free = svc_rdma_free,
  82        .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
  83        .xpo_has_wspace = svc_rdma_has_wspace,
  84        .xpo_accept = svc_rdma_accept,
  85        .xpo_secure_port = svc_rdma_secure_port,
  86        .xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
  87};
  88
  89struct svc_xprt_class svc_rdma_class = {
  90        .xcl_name = "rdma",
  91        .xcl_owner = THIS_MODULE,
  92        .xcl_ops = &svc_rdma_ops,
  93        .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
  94        .xcl_ident = XPRT_TRANSPORT_RDMA,
  95};
  96
  97#if defined(CONFIG_SUNRPC_BACKCHANNEL)
  98static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
  99                                           struct sockaddr *, int, int);
 100static void svc_rdma_bc_detach(struct svc_xprt *);
 101static void svc_rdma_bc_free(struct svc_xprt *);
 102
 103static const struct svc_xprt_ops svc_rdma_bc_ops = {
 104        .xpo_create = svc_rdma_bc_create,
 105        .xpo_detach = svc_rdma_bc_detach,
 106        .xpo_free = svc_rdma_bc_free,
 107        .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
 108        .xpo_secure_port = svc_rdma_secure_port,
 109};
 110
 111struct svc_xprt_class svc_rdma_bc_class = {
 112        .xcl_name = "rdma-bc",
 113        .xcl_owner = THIS_MODULE,
 114        .xcl_ops = &svc_rdma_bc_ops,
 115        .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
 116};
 117
 118static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
 119                                           struct net *net,
 120                                           struct sockaddr *sa, int salen,
 121                                           int flags)
 122{
 123        struct svcxprt_rdma *cma_xprt;
 124        struct svc_xprt *xprt;
 125
 126        cma_xprt = rdma_create_xprt(serv, 0);
 127        if (!cma_xprt)
 128                return ERR_PTR(-ENOMEM);
 129        xprt = &cma_xprt->sc_xprt;
 130
 131        svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
 132        set_bit(XPT_CONG_CTRL, &xprt->xpt_flags);
 133        serv->sv_bc_xprt = xprt;
 134
 135        dprintk("svcrdma: %s(%p)\n", __func__, xprt);
 136        return xprt;
 137}
 138
 139static void svc_rdma_bc_detach(struct svc_xprt *xprt)
 140{
 141        dprintk("svcrdma: %s(%p)\n", __func__, xprt);
 142}
 143
 144static void svc_rdma_bc_free(struct svc_xprt *xprt)
 145{
 146        struct svcxprt_rdma *rdma =
 147                container_of(xprt, struct svcxprt_rdma, sc_xprt);
 148
 149        dprintk("svcrdma: %s(%p)\n", __func__, xprt);
 150        if (xprt)
 151                kfree(rdma);
 152}
 153#endif  /* CONFIG_SUNRPC_BACKCHANNEL */
 154
 155static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
 156                                           gfp_t flags)
 157{
 158        struct svc_rdma_op_ctxt *ctxt;
 159
 160        ctxt = kmalloc(sizeof(*ctxt), flags);
 161        if (ctxt) {
 162                ctxt->xprt = xprt;
 163                INIT_LIST_HEAD(&ctxt->list);
 164        }
 165        return ctxt;
 166}
 167
 168static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
 169{
 170        unsigned int i;
 171
 172        /* Each RPC/RDMA credit can consume one Receive and
 173         * one Send WQE at the same time.
 174         */
 175        i = xprt->sc_sq_depth + xprt->sc_rq_depth;
 176
 177        while (i--) {
 178                struct svc_rdma_op_ctxt *ctxt;
 179
 180                ctxt = alloc_ctxt(xprt, GFP_KERNEL);
 181                if (!ctxt) {
 182                        dprintk("svcrdma: No memory for RDMA ctxt\n");
 183                        return false;
 184                }
 185                list_add(&ctxt->list, &xprt->sc_ctxts);
 186        }
 187        return true;
 188}
 189
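     /* Hand out a pre-allocated svc_rdma_op_ctxt from the transport's
      * free list. If the list is unexpectedly empty, fall back to an
      * on-demand GFP_NOIO allocation rather than failing outright.
      */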
 190struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 191{
 192        struct svc_rdma_op_ctxt *ctxt = NULL;
 193
 194        spin_lock(&xprt->sc_ctxt_lock);
 195        xprt->sc_ctxt_used++;
 196        if (list_empty(&xprt->sc_ctxts))
 197                goto out_empty;
 198
 199        ctxt = list_first_entry(&xprt->sc_ctxts,
 200                                struct svc_rdma_op_ctxt, list);
 201        list_del(&ctxt->list);
 202        spin_unlock(&xprt->sc_ctxt_lock);
 203
 204out:
 205        ctxt->count = 0;
 206        ctxt->mapped_sges = 0;
 207        return ctxt;
 208
 209out_empty:
 210        /* Either pre-allocation missed the mark, or send
 211         * queue accounting is broken.
 212         */
 213        spin_unlock(&xprt->sc_ctxt_lock);
 214
 215        ctxt = alloc_ctxt(xprt, GFP_NOIO);
 216        if (ctxt)
 217                goto out;
 218
 219        spin_lock(&xprt->sc_ctxt_lock);
 220        xprt->sc_ctxt_used--;
 221        spin_unlock(&xprt->sc_ctxt_lock);
 222        WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
 223        return NULL;
 224}
 225
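     /* Undo the DMA mappings recorded in ctxt->mapped_sges before the
      * context or its pages are reused.
      */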
 226void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
 227{
 228        struct svcxprt_rdma *xprt = ctxt->xprt;
 229        struct ib_device *device = xprt->sc_cm_id->device;
 230        unsigned int i;
 231
 232        for (i = 0; i < ctxt->mapped_sges; i++)
 233                ib_dma_unmap_page(device,
 234                                  ctxt->sge[i].addr,
 235                                  ctxt->sge[i].length,
 236                                  ctxt->direction);
 237        ctxt->mapped_sges = 0;
 238}
 239
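     /* Return a context to the transport's free list, optionally
      * releasing the pages it still holds.
      */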
 240void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
 241{
 242        struct svcxprt_rdma *xprt = ctxt->xprt;
 243        int i;
 244
 245        if (free_pages)
 246                for (i = 0; i < ctxt->count; i++)
 247                        put_page(ctxt->pages[i]);
 248
 249        spin_lock(&xprt->sc_ctxt_lock);
 250        xprt->sc_ctxt_used--;
 251        list_add(&ctxt->list, &xprt->sc_ctxts);
 252        spin_unlock(&xprt->sc_ctxt_lock);
 253}
 254
 255static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
 256{
 257        while (!list_empty(&xprt->sc_ctxts)) {
 258                struct svc_rdma_op_ctxt *ctxt;
 259
 260                ctxt = list_first_entry(&xprt->sc_ctxts,
 261                                        struct svc_rdma_op_ctxt, list);
 262                list_del(&ctxt->list);
 263                kfree(ctxt);
 264        }
 265}
 266
 267/* QP event handler */
 268static void qp_event_handler(struct ib_event *event, void *context)
 269{
 270        struct svc_xprt *xprt = context;
 271
 272        switch (event->event) {
 273        /* These are considered benign events */
 274        case IB_EVENT_PATH_MIG:
 275        case IB_EVENT_COMM_EST:
 276        case IB_EVENT_SQ_DRAINED:
 277        case IB_EVENT_QP_LAST_WQE_REACHED:
 278                dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
 279                        ib_event_msg(event->event), event->event,
 280                        event->element.qp);
 281                break;
 282        /* These are considered fatal events */
 283        case IB_EVENT_PATH_MIG_ERR:
 284        case IB_EVENT_QP_FATAL:
 285        case IB_EVENT_QP_REQ_ERR:
 286        case IB_EVENT_QP_ACCESS_ERR:
 287        case IB_EVENT_DEVICE_FATAL:
 288        default:
 289                dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
 290                        "closing transport\n",
 291                        ib_event_msg(event->event), event->event,
 292                        event->element.qp);
 293                set_bit(XPT_CLOSE, &xprt->xpt_flags);
 294                svc_xprt_enqueue(xprt);
 295                break;
 296        }
 297}
 298
 299/**
 300 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 301 * @cq:        completion queue
 302 * @wc:        completed WR
 303 *
 304 */
 305static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 306{
 307        struct svcxprt_rdma *xprt = cq->cq_context;
 308        struct ib_cqe *cqe = wc->wr_cqe;
 309        struct svc_rdma_op_ctxt *ctxt;
 310
 311        /* WARNING: Only wc->wr_cqe and wc->status are reliable */
 312        ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
 313        svc_rdma_unmap_dma(ctxt);
 314
 315        if (wc->status != IB_WC_SUCCESS)
 316                goto flushed;
 317
 318        /* All wc fields are now known to be valid */
 319        ctxt->byte_len = wc->byte_len;
 320        spin_lock(&xprt->sc_rq_dto_lock);
 321        list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q);
 322        spin_unlock(&xprt->sc_rq_dto_lock);
 323
 324        svc_rdma_post_recv(xprt);
 325
 326        set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
 327        if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
 328                goto out;
 329        goto out_enqueue;
 330
 331flushed:
 332        if (wc->status != IB_WC_WR_FLUSH_ERR)
 333                pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
 334                       ib_wc_status_msg(wc->status),
 335                       wc->status, wc->vendor_err);
 336        set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 337        svc_rdma_put_context(ctxt, 1);
 338
 339out_enqueue:
 340        svc_xprt_enqueue(&xprt->sc_xprt);
 341out:
 342        svc_xprt_put(&xprt->sc_xprt);
 343}
 344
 345/**
 346 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 347 * @cq:        completion queue
 348 * @wc:        completed WR
 349 *
 350 */
 351void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 352{
 353        struct svcxprt_rdma *xprt = cq->cq_context;
 354        struct ib_cqe *cqe = wc->wr_cqe;
 355        struct svc_rdma_op_ctxt *ctxt;
 356
 357        atomic_inc(&xprt->sc_sq_avail);
 358        wake_up(&xprt->sc_send_wait);
 359
 360        ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
 361        svc_rdma_unmap_dma(ctxt);
 362        svc_rdma_put_context(ctxt, 1);
 363
 364        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 365                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 366                svc_xprt_enqueue(&xprt->sc_xprt);
 367                if (wc->status != IB_WC_WR_FLUSH_ERR)
 368                        pr_err("svcrdma: Send: %s (%u/0x%x)\n",
 369                               ib_wc_status_msg(wc->status),
 370                               wc->status, wc->vendor_err);
 371        }
 372
 373        svc_xprt_put(&xprt->sc_xprt);
 374}
 375
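     /* Allocate and initialize an svcxprt_rdma: set up the generic
      * svc_xprt, the per-transport lists, locks, and send wait queue,
      * and mark the transport as congestion-controlled.
      */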
 376static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 377                                             int listener)
 378{
 379        struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
 380
 381        if (!cma_xprt)
 382                return NULL;
 383        svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
 384        INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
 385        INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
 386        INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 387        INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
 388        INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 389        init_waitqueue_head(&cma_xprt->sc_send_wait);
 390
 391        spin_lock_init(&cma_xprt->sc_lock);
 392        spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 393        spin_lock_init(&cma_xprt->sc_ctxt_lock);
 394        spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 395
 396        /*
  397         * Note that this implies that the underlying transport supports
  398         * some form of congestion control (see RFC 7530 section 3.1
 399         * paragraph 2). For now, we assume that all supported RDMA
 400         * transports are suitable here.
 401         */
 402        set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);
 403
 404        if (listener) {
 405                strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener");
 406                set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
 407        }
 408
 409        return cma_xprt;
 410}
 411
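     /* Build a single Receive WR out of page-sized SGEs large enough to
      * cover sc_max_req_size, then post it. A transport reference is
      * taken for the posted WR; svc_rdma_wc_receive drops it when the
      * Receive completes or is flushed.
      */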
 412static int
 413svc_rdma_post_recv(struct svcxprt_rdma *xprt)
 414{
 415        struct ib_recv_wr recv_wr, *bad_recv_wr;
 416        struct svc_rdma_op_ctxt *ctxt;
 417        struct page *page;
 418        dma_addr_t pa;
 419        int sge_no;
 420        int buflen;
 421        int ret;
 422
 423        ctxt = svc_rdma_get_context(xprt);
 424        buflen = 0;
 425        ctxt->direction = DMA_FROM_DEVICE;
 426        ctxt->cqe.done = svc_rdma_wc_receive;
 427        for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
 428                if (sge_no >= xprt->sc_max_sge) {
 429                        pr_err("svcrdma: Too many sges (%d)\n", sge_no);
 430                        goto err_put_ctxt;
 431                }
 432                page = alloc_page(GFP_KERNEL);
 433                if (!page)
 434                        goto err_put_ctxt;
 435                ctxt->pages[sge_no] = page;
 436                pa = ib_dma_map_page(xprt->sc_cm_id->device,
 437                                     page, 0, PAGE_SIZE,
 438                                     DMA_FROM_DEVICE);
 439                if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
 440                        goto err_put_ctxt;
 441                svc_rdma_count_mappings(xprt, ctxt);
 442                ctxt->sge[sge_no].addr = pa;
 443                ctxt->sge[sge_no].length = PAGE_SIZE;
 444                ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
 445                ctxt->count = sge_no + 1;
 446                buflen += PAGE_SIZE;
 447        }
 448        recv_wr.next = NULL;
 449        recv_wr.sg_list = &ctxt->sge[0];
 450        recv_wr.num_sge = ctxt->count;
 451        recv_wr.wr_cqe = &ctxt->cqe;
 452
 453        svc_xprt_get(&xprt->sc_xprt);
 454        ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
 455        if (ret) {
 456                svc_rdma_unmap_dma(ctxt);
 457                svc_rdma_put_context(ctxt, 1);
 458                svc_xprt_put(&xprt->sc_xprt);
 459        }
 460        return ret;
 461
 462 err_put_ctxt:
 463        svc_rdma_unmap_dma(ctxt);
 464        svc_rdma_put_context(ctxt, 1);
 465        return -ENOMEM;
 466}
 467
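     /* Decode the RPC-over-RDMA private data sent with the client's
      * connection request. The only field the server acts on is the
      * flag indicating whether the client can accept Send With
      * Invalidate.
      */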
 468static void
 469svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
 470                               struct rdma_conn_param *param)
 471{
 472        const struct rpcrdma_connect_private *pmsg = param->private_data;
 473
 474        if (pmsg &&
 475            pmsg->cp_magic == rpcrdma_cmp_magic &&
 476            pmsg->cp_version == RPCRDMA_CMP_VERSION) {
 477                newxprt->sc_snd_w_inv = pmsg->cp_flags &
 478                                        RPCRDMA_CMP_F_SND_W_INV_OK;
 479
 480                dprintk("svcrdma: client send_size %u, recv_size %u "
 481                        "remote inv %ssupported\n",
 482                        rpcrdma_decode_buffer_size(pmsg->cp_send_size),
 483                        rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
 484                        newxprt->sc_snd_w_inv ? "" : "un");
 485        }
 486}
 487
 488/*
 489 * This function handles the CONNECT_REQUEST event on a listening
 490 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 491 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 492 * structure for the listening endpoint.
 493 *
 494 * This function creates a new xprt for the new connection and enqueues it on
  495 * the accept queue for the listening xprt. When the listen thread is kicked, it
  496 * will call the recvfrom method on the listening xprt, which will accept the new
 497 * connection.
 498 */
 499static void handle_connect_req(struct rdma_cm_id *new_cma_id,
 500                               struct rdma_conn_param *param)
 501{
 502        struct svcxprt_rdma *listen_xprt = new_cma_id->context;
 503        struct svcxprt_rdma *newxprt;
 504        struct sockaddr *sa;
 505
 506        /* Create a new transport */
 507        newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
 508        if (!newxprt) {
 509                dprintk("svcrdma: failed to create new transport\n");
 510                return;
 511        }
 512        newxprt->sc_cm_id = new_cma_id;
 513        new_cma_id->context = newxprt;
 514        dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
 515                newxprt, newxprt->sc_cm_id, listen_xprt);
 516        svc_rdma_parse_connect_private(newxprt, param);
 517
 518        /* Save client advertised inbound read limit for use later in accept. */
 519        newxprt->sc_ord = param->initiator_depth;
 520
 521        /* Set the local and remote addresses in the transport */
 522        sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
 523        svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
 524        sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
 525        svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
 526
 527        /*
 528         * Enqueue the new transport on the accept queue of the listening
 529         * transport
 530         */
 531        spin_lock_bh(&listen_xprt->sc_lock);
 532        list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
 533        spin_unlock_bh(&listen_xprt->sc_lock);
 534
 535        set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
 536        svc_xprt_enqueue(&listen_xprt->sc_xprt);
 537}
 538
 539/*
  540 * Handles events generated on the listening endpoint. These events will
  541 * either be incoming connect requests or adapter removal events.
 542 */
 543static int rdma_listen_handler(struct rdma_cm_id *cma_id,
 544                               struct rdma_cm_event *event)
 545{
 546        struct svcxprt_rdma *xprt = cma_id->context;
 547        int ret = 0;
 548
 549        switch (event->event) {
 550        case RDMA_CM_EVENT_CONNECT_REQUEST:
 551                dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
 552                        "event = %s (%d)\n", cma_id, cma_id->context,
 553                        rdma_event_msg(event->event), event->event);
 554                handle_connect_req(cma_id, &event->param.conn);
 555                break;
 556
 557        case RDMA_CM_EVENT_ESTABLISHED:
 558                /* Accept complete */
 559                dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
 560                        "cm_id=%p\n", xprt, cma_id);
 561                break;
 562
 563        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 564                dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
 565                        xprt, cma_id);
 566                if (xprt) {
 567                        set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 568                        svc_xprt_enqueue(&xprt->sc_xprt);
 569                }
 570                break;
 571
 572        default:
 573                dprintk("svcrdma: Unexpected event on listening endpoint %p, "
 574                        "event = %s (%d)\n", cma_id,
 575                        rdma_event_msg(event->event), event->event);
 576                break;
 577        }
 578
 579        return ret;
 580}
 581
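     /* CM event handler for connected (non-listening) endpoints. It
      * clears the connection-pending flag once the connection is
      * established, and schedules the transport for closing on
      * disconnect or device removal.
      */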
 582static int rdma_cma_handler(struct rdma_cm_id *cma_id,
 583                            struct rdma_cm_event *event)
 584{
 585        struct svc_xprt *xprt = cma_id->context;
 586        struct svcxprt_rdma *rdma =
 587                container_of(xprt, struct svcxprt_rdma, sc_xprt);
 588        switch (event->event) {
 589        case RDMA_CM_EVENT_ESTABLISHED:
 590                /* Accept complete */
 591                svc_xprt_get(xprt);
 592                dprintk("svcrdma: Connection completed on DTO xprt=%p, "
 593                        "cm_id=%p\n", xprt, cma_id);
 594                clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
 595                svc_xprt_enqueue(xprt);
 596                break;
 597        case RDMA_CM_EVENT_DISCONNECTED:
 598                dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
 599                        xprt, cma_id);
 600                if (xprt) {
 601                        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 602                        svc_xprt_enqueue(xprt);
 603                        svc_xprt_put(xprt);
 604                }
 605                break;
 606        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 607                dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
 608                        "event = %s (%d)\n", cma_id, xprt,
 609                        rdma_event_msg(event->event), event->event);
 610                if (xprt) {
 611                        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 612                        svc_xprt_enqueue(xprt);
 613                        svc_xprt_put(xprt);
 614                }
 615                break;
 616        default:
 617                dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
 618                        "event = %s (%d)\n", cma_id,
 619                        rdma_event_msg(event->event), event->event);
 620                break;
 621        }
 622        return 0;
 623}
 624
 625/*
 626 * Create a listening RDMA service endpoint.
 627 */
 628static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 629                                        struct net *net,
 630                                        struct sockaddr *sa, int salen,
 631                                        int flags)
 632{
 633        struct rdma_cm_id *listen_id;
 634        struct svcxprt_rdma *cma_xprt;
 635        int ret;
 636
 637        dprintk("svcrdma: Creating RDMA socket\n");
 638        if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
 639                dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
 640                return ERR_PTR(-EAFNOSUPPORT);
 641        }
 642        cma_xprt = rdma_create_xprt(serv, 1);
 643        if (!cma_xprt)
 644                return ERR_PTR(-ENOMEM);
 645
 646        listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
 647                                   RDMA_PS_TCP, IB_QPT_RC);
 648        if (IS_ERR(listen_id)) {
 649                ret = PTR_ERR(listen_id);
 650                dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
 651                goto err0;
 652        }
 653
 654        /* Allow both IPv4 and IPv6 sockets to bind a single port
 655         * at the same time.
 656         */
 657#if IS_ENABLED(CONFIG_IPV6)
 658        ret = rdma_set_afonly(listen_id, 1);
 659        if (ret) {
 660                dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
 661                goto err1;
 662        }
 663#endif
 664        ret = rdma_bind_addr(listen_id, sa);
 665        if (ret) {
 666                dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
 667                goto err1;
 668        }
 669        cma_xprt->sc_cm_id = listen_id;
 670
 671        ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
 672        if (ret) {
 673                dprintk("svcrdma: rdma_listen failed = %d\n", ret);
 674                goto err1;
 675        }
 676
 677        /*
 678         * We need to use the address from the cm_id in case the
 679         * caller specified 0 for the port number.
 680         */
 681        sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
 682        svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
 683
 684        return &cma_xprt->sc_xprt;
 685
 686 err1:
 687        rdma_destroy_id(listen_id);
 688 err0:
 689        kfree(cma_xprt);
 690        return ERR_PTR(ret);
 691}
 692
 693/*
 694 * This is the xpo_recvfrom function for listening endpoints. Its
 695 * purpose is to accept incoming connections. The CMA callback handler
 696 * has already created a new transport and attached it to the new CMA
 697 * ID.
 698 *
 699 * There is a queue of pending connections hung on the listening
 700 * transport. This queue contains the new svc_xprt structure. This
 701 * function takes svc_xprt structures off the accept_q and completes
 702 * the connection.
 703 */
 704static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 705{
 706        struct svcxprt_rdma *listen_rdma;
 707        struct svcxprt_rdma *newxprt = NULL;
 708        struct rdma_conn_param conn_param;
 709        struct rpcrdma_connect_private pmsg;
 710        struct ib_qp_init_attr qp_attr;
 711        struct ib_device *dev;
 712        struct sockaddr *sap;
 713        unsigned int i, ctxts;
 714        int ret = 0;
 715
 716        listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
 717        clear_bit(XPT_CONN, &xprt->xpt_flags);
 718        /* Get the next entry off the accept list */
 719        spin_lock_bh(&listen_rdma->sc_lock);
 720        if (!list_empty(&listen_rdma->sc_accept_q)) {
 721                newxprt = list_entry(listen_rdma->sc_accept_q.next,
 722                                     struct svcxprt_rdma, sc_accept_q);
 723                list_del_init(&newxprt->sc_accept_q);
 724        }
 725        if (!list_empty(&listen_rdma->sc_accept_q))
 726                set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
 727        spin_unlock_bh(&listen_rdma->sc_lock);
 728        if (!newxprt)
 729                return NULL;
 730
 731        dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
 732                newxprt, newxprt->sc_cm_id);
 733
 734        dev = newxprt->sc_cm_id->device;
 735        newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
 736
 737        /* Qualify the transport resource defaults with the
 738         * capabilities of this particular device */
 739        newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
 740                                  (size_t)RPCSVC_MAXPAGES);
 741        newxprt->sc_max_req_size = svcrdma_max_req_size;
 742        newxprt->sc_max_requests = svcrdma_max_requests;
 743        newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
 744        newxprt->sc_rq_depth = newxprt->sc_max_requests +
 745                               newxprt->sc_max_bc_requests;
 746        if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr) {
 747                pr_warn("svcrdma: reducing receive depth to %d\n",
 748                        dev->attrs.max_qp_wr);
 749                newxprt->sc_rq_depth = dev->attrs.max_qp_wr;
 750                newxprt->sc_max_requests = newxprt->sc_rq_depth - 2;
 751                newxprt->sc_max_bc_requests = 2;
 752        }
 753        newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);
 754        ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES);
 755        ctxts *= newxprt->sc_max_requests;
 756        newxprt->sc_sq_depth = newxprt->sc_rq_depth + ctxts;
 757        if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) {
 758                pr_warn("svcrdma: reducing send depth to %d\n",
 759                        dev->attrs.max_qp_wr);
 760                newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
 761        }
 762        atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 763
 764        if (!svc_rdma_prealloc_ctxts(newxprt))
 765                goto errout;
 766
 767        newxprt->sc_pd = ib_alloc_pd(dev, 0);
 768        if (IS_ERR(newxprt->sc_pd)) {
 769                dprintk("svcrdma: error creating PD for connect request\n");
 770                goto errout;
 771        }
 772        newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
 773                                        0, IB_POLL_WORKQUEUE);
 774        if (IS_ERR(newxprt->sc_sq_cq)) {
 775                dprintk("svcrdma: error creating SQ CQ for connect request\n");
 776                goto errout;
 777        }
 778        newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
 779                                        0, IB_POLL_WORKQUEUE);
 780        if (IS_ERR(newxprt->sc_rq_cq)) {
 781                dprintk("svcrdma: error creating RQ CQ for connect request\n");
 782                goto errout;
 783        }
 784
 785        memset(&qp_attr, 0, sizeof qp_attr);
 786        qp_attr.event_handler = qp_event_handler;
 787        qp_attr.qp_context = &newxprt->sc_xprt;
 788        qp_attr.port_num = newxprt->sc_port_num;
 789        qp_attr.cap.max_rdma_ctxs = ctxts;
 790        qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
 791        qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
 792        qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
 793        qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
 794        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 795        qp_attr.qp_type = IB_QPT_RC;
 796        qp_attr.send_cq = newxprt->sc_sq_cq;
 797        qp_attr.recv_cq = newxprt->sc_rq_cq;
 798        dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n",
 799                newxprt->sc_cm_id, newxprt->sc_pd);
 800        dprintk("    cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
 801                qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
 802        dprintk("    cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
 803                qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);
 804
 805        ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
 806        if (ret) {
 807                dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
 808                goto errout;
 809        }
 810        newxprt->sc_qp = newxprt->sc_cm_id->qp;
 811
 812        if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 813                newxprt->sc_snd_w_inv = false;
 814        if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
 815            !rdma_ib_or_roce(dev, newxprt->sc_port_num))
 816                goto errout;
 817
 818        /* Post receive buffers */
 819        for (i = 0; i < newxprt->sc_max_requests; i++) {
 820                ret = svc_rdma_post_recv(newxprt);
 821                if (ret) {
 822                        dprintk("svcrdma: failure posting receive buffers\n");
 823                        goto errout;
 824                }
 825        }
 826
 827        /* Swap out the handler */
 828        newxprt->sc_cm_id->event_handler = rdma_cma_handler;
 829
 830        /* Construct RDMA-CM private message */
 831        pmsg.cp_magic = rpcrdma_cmp_magic;
 832        pmsg.cp_version = RPCRDMA_CMP_VERSION;
 833        pmsg.cp_flags = 0;
 834        pmsg.cp_send_size = pmsg.cp_recv_size =
 835                rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);
 836
 837        /* Accept Connection */
 838        set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
 839        memset(&conn_param, 0, sizeof conn_param);
 840        conn_param.responder_resources = 0;
 841        conn_param.initiator_depth = min_t(int, newxprt->sc_ord,
 842                                           dev->attrs.max_qp_init_rd_atom);
 843        if (!conn_param.initiator_depth) {
 844                dprintk("svcrdma: invalid ORD setting\n");
 845                ret = -EINVAL;
 846                goto errout;
 847        }
 848        conn_param.private_data = &pmsg;
 849        conn_param.private_data_len = sizeof(pmsg);
 850        ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
 851        if (ret)
 852                goto errout;
 853
 854        dprintk("svcrdma: new connection %p accepted:\n", newxprt);
 855        sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
 856        dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
 857        sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
 858        dprintk("    remote address  : %pIS:%u\n", sap, rpc_get_port(sap));
 859        dprintk("    max_sge         : %d\n", newxprt->sc_max_sge);
 860        dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
 861        dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
 862        dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
 863        dprintk("    ord             : %d\n", conn_param.initiator_depth);
 864
 865        return &newxprt->sc_xprt;
 866
 867 errout:
 868        dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
 869        /* Take a reference in case the DTO handler runs */
 870        svc_xprt_get(&newxprt->sc_xprt);
 871        if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
 872                ib_destroy_qp(newxprt->sc_qp);
 873        rdma_destroy_id(newxprt->sc_cm_id);
 874        /* This call to put will destroy the transport */
 875        svc_xprt_put(&newxprt->sc_xprt);
 876        return NULL;
 877}
 878
 879static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
 880{
 881}
 882
 883/*
 884 * When connected, an svc_xprt has at least two references:
 885 *
 886 * - A reference held by the cm_id between the ESTABLISHED and
 887 *   DISCONNECTED events. If the remote peer disconnected first, this
 888 *   reference could be gone.
 889 *
 890 * - A reference held by the svc_recv code that called this function
 891 *   as part of close processing.
 892 *
  893 * At a minimum, one reference should still be held.
 894 */
 895static void svc_rdma_detach(struct svc_xprt *xprt)
 896{
 897        struct svcxprt_rdma *rdma =
 898                container_of(xprt, struct svcxprt_rdma, sc_xprt);
 899        dprintk("svc: svc_rdma_detach(%p)\n", xprt);
 900
  901        /* Disconnect and flush posted WQEs */
 902        rdma_disconnect(rdma->sc_cm_id);
 903}
 904
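     /* Deferred transport destruction, run from svc_rdma_wq so that it
      * can sleep: drain the QP, release any queued receive contexts,
      * free the pre-allocated context pools, then tear down the QP,
      * CQs, PD, and CM ID.
      */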
 905static void __svc_rdma_free(struct work_struct *work)
 906{
 907        struct svcxprt_rdma *rdma =
 908                container_of(work, struct svcxprt_rdma, sc_work);
 909        struct svc_xprt *xprt = &rdma->sc_xprt;
 910
 911        dprintk("svcrdma: %s(%p)\n", __func__, rdma);
 912
 913        if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
 914                ib_drain_qp(rdma->sc_qp);
 915
 916        /* We should only be called from kref_put */
 917        if (kref_read(&xprt->xpt_ref) != 0)
 918                pr_err("svcrdma: sc_xprt still in use? (%d)\n",
 919                       kref_read(&xprt->xpt_ref));
 920
 921        while (!list_empty(&rdma->sc_read_complete_q)) {
 922                struct svc_rdma_op_ctxt *ctxt;
 923                ctxt = list_first_entry(&rdma->sc_read_complete_q,
 924                                        struct svc_rdma_op_ctxt, list);
 925                list_del(&ctxt->list);
 926                svc_rdma_put_context(ctxt, 1);
 927        }
 928        while (!list_empty(&rdma->sc_rq_dto_q)) {
 929                struct svc_rdma_op_ctxt *ctxt;
 930                ctxt = list_first_entry(&rdma->sc_rq_dto_q,
 931                                        struct svc_rdma_op_ctxt, list);
 932                list_del(&ctxt->list);
 933                svc_rdma_put_context(ctxt, 1);
 934        }
 935
 936        /* Warn if we leaked a resource or under-referenced */
 937        if (rdma->sc_ctxt_used != 0)
 938                pr_err("svcrdma: ctxt still in use? (%d)\n",
 939                       rdma->sc_ctxt_used);
 940
 941        /* Final put of backchannel client transport */
 942        if (xprt->xpt_bc_xprt) {
 943                xprt_put(xprt->xpt_bc_xprt);
 944                xprt->xpt_bc_xprt = NULL;
 945        }
 946
 947        svc_rdma_destroy_rw_ctxts(rdma);
 948        svc_rdma_destroy_ctxts(rdma);
 949
 950        /* Destroy the QP if present (not a listener) */
 951        if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
 952                ib_destroy_qp(rdma->sc_qp);
 953
 954        if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
 955                ib_free_cq(rdma->sc_sq_cq);
 956
 957        if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
 958                ib_free_cq(rdma->sc_rq_cq);
 959
 960        if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
 961                ib_dealloc_pd(rdma->sc_pd);
 962
 963        /* Destroy the CM ID */
 964        rdma_destroy_id(rdma->sc_cm_id);
 965
 966        kfree(rdma);
 967}
 968
 969static void svc_rdma_free(struct svc_xprt *xprt)
 970{
 971        struct svcxprt_rdma *rdma =
 972                container_of(xprt, struct svcxprt_rdma, sc_xprt);
 973        INIT_WORK(&rdma->sc_work, __svc_rdma_free);
 974        queue_work(svc_rdma_wq, &rdma->sc_work);
 975}
 976
 977static int svc_rdma_has_wspace(struct svc_xprt *xprt)
 978{
 979        struct svcxprt_rdma *rdma =
 980                container_of(xprt, struct svcxprt_rdma, sc_xprt);
 981
 982        /*
 983         * If there are already waiters on the SQ,
 984         * return false.
 985         */
 986        if (waitqueue_active(&rdma->sc_send_wait))
 987                return 0;
 988
 989        /* Otherwise return true. */
 990        return 1;
 991}
 992
 993static void svc_rdma_secure_port(struct svc_rqst *rqstp)
 994{
 995        set_bit(RQ_SECURE, &rqstp->rq_flags);
 996}
 997
 998static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
 999{
1000}
1001
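     /* Post a chain of Send WRs on the transport's send queue. Available
      * SQ space is tracked in sc_sq_avail; if the chain does not fit,
      * the caller sleeps on sc_send_wait until Send completions free up
      * enough entries. A transport reference is taken for each WR posted
      * and released from the Send completion handler.
      */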
1002int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1003{
1004        struct ib_send_wr *bad_wr, *n_wr;
1005        int wr_count;
1006        int i;
1007        int ret;
1008
1009        if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1010                return -ENOTCONN;
1011
1012        wr_count = 1;
1013        for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
1014                wr_count++;
1015
1016        /* If the SQ is full, wait until an SQ entry is available */
1017        while (1) {
1018                if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
1019                        atomic_inc(&rdma_stat_sq_starve);
1020
1021                        /* Wait until SQ WR available if SQ still full */
1022                        atomic_add(wr_count, &xprt->sc_sq_avail);
1023                        wait_event(xprt->sc_send_wait,
1024                                   atomic_read(&xprt->sc_sq_avail) > wr_count);
1025                        if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1026                                return -ENOTCONN;
1027                        continue;
1028                }
1029                /* Take a transport ref for each WR posted */
1030                for (i = 0; i < wr_count; i++)
1031                        svc_xprt_get(&xprt->sc_xprt);
1032
1033                /* Bump used SQ WR count and post */
1034                ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
1035                if (ret) {
1036                        set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 1037                        for (i = 0; i < wr_count; i++)
1038                                svc_xprt_put(&xprt->sc_xprt);
1039                        dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
1040                        dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
1041                                atomic_read(&xprt->sc_sq_avail),
1042                                xprt->sc_sq_depth);
1043                        wake_up(&xprt->sc_send_wait);
1044                }
1045                break;
1046        }
1047        return ret;
1048}
1049