linux/net/9p/trans_rdma.c
   1/*
   2 * linux/net/9p/trans_rdma.c
   3 *
   4 * RDMA transport layer based on the trans_fd.c implementation.
   5 *
   6 *  Copyright (C) 2008 by Tom Tucker <tom@opengridcomputing.com>
   7 *  Copyright (C) 2006 by Russ Cox <rsc@swtch.com>
   8 *  Copyright (C) 2004-2005 by Latchesar Ionkov <lucho@ionkov.net>
   9 *  Copyright (C) 2004-2008 by Eric Van Hensbergen <ericvh@gmail.com>
  10 *  Copyright (C) 1997-2002 by Ron Minnich <rminnich@sarnoff.com>
  11 *
  12 *  This program is free software; you can redistribute it and/or modify
  13 *  it under the terms of the GNU General Public License version 2
  14 *  as published by the Free Software Foundation.
  15 *
  16 *  This program is distributed in the hope that it will be useful,
  17 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  18 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  19 *  GNU General Public License for more details.
  20 *
  21 *  You should have received a copy of the GNU General Public License
  22 *  along with this program; if not, write to:
  23 *  Free Software Foundation
  24 *  51 Franklin Street, Fifth Floor
  25 *  Boston, MA  02111-1301  USA
  26 *
  27 */
  28
  29#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  30
  31#include <linux/in.h>
  32#include <linux/module.h>
  33#include <linux/net.h>
  34#include <linux/ipv6.h>
  35#include <linux/kthread.h>
  36#include <linux/errno.h>
  37#include <linux/kernel.h>
  38#include <linux/un.h>
  39#include <linux/uaccess.h>
  40#include <linux/inet.h>
  41#include <linux/idr.h>
  42#include <linux/file.h>
  43#include <linux/parser.h>
  44#include <linux/semaphore.h>
  45#include <linux/slab.h>
  46#include <net/9p/9p.h>
  47#include <net/9p/client.h>
  48#include <net/9p/transport.h>
  49#include <rdma/ib_verbs.h>
  50#include <rdma/rdma_cm.h>
  51
  52#define P9_PORT                 5640
  53#define P9_RDMA_SQ_DEPTH        32
  54#define P9_RDMA_RQ_DEPTH        32
  55#define P9_RDMA_SEND_SGE        4
  56#define P9_RDMA_RECV_SGE        4
  57#define P9_RDMA_IRD             0
  58#define P9_RDMA_ORD             0
  59#define P9_RDMA_TIMEOUT         30000           /* 30 seconds */
  60#define P9_RDMA_MAXSIZE         (1024*1024)     /* 1MB */
  61
  62/**
  63 * struct p9_trans_rdma - RDMA transport instance
  64 *
  65 * @state: tracks the transport state machine for connection setup and tear down
  66 * @cm_id: The RDMA CM ID
  67 * @pd: Protection Domain pointer
  68 * @qp: Queue Pair pointer
  69 * @cq: Completion Queue pointer
  70 * @dma_mr: DMA Memory Region pointer
  71 * @lkey: The local access only memory region key
  72 * @timeout: Number of msecs to wait for connection management events
  73 * @sq_depth: The depth of the Send Queue
  74 * @sq_sem: Semaphore for the SQ
  75 * @rq_depth: The depth of the Receive Queue.
  76 * @rq_sem: Semaphore for the RQ
  77 * @excess_rc: Number of posted receive contexts without a pending request.
  78 *             See rdma_request()
  79 * @addr: The remote peer's address
  80 * @req_lock: Protects the active request list
  81 * @cm_done: Completion event for connection management tracking
  82 */
  83struct p9_trans_rdma {
  84        enum {
  85                P9_RDMA_INIT,
  86                P9_RDMA_ADDR_RESOLVED,
  87                P9_RDMA_ROUTE_RESOLVED,
  88                P9_RDMA_CONNECTED,
  89                P9_RDMA_FLUSHING,
  90                P9_RDMA_CLOSING,
  91                P9_RDMA_CLOSED,
  92        } state;
  93        struct rdma_cm_id *cm_id;
  94        struct ib_pd *pd;
  95        struct ib_qp *qp;
  96        struct ib_cq *cq;
  97        struct ib_mr *dma_mr;
  98        u32 lkey;
  99        long timeout;
 100        int sq_depth;
 101        struct semaphore sq_sem;
 102        int rq_depth;
 103        struct semaphore rq_sem;
 104        atomic_t excess_rc;
 105        struct sockaddr_in addr;
 106        spinlock_t req_lock;
 107
 108        struct completion cm_done;
 109};
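    /*
     * The connection state machine normally advances P9_RDMA_INIT ->
     * P9_RDMA_ADDR_RESOLVED -> P9_RDMA_ROUTE_RESOLVED -> P9_RDMA_CONNECTED
     * as the corresponding CM events arrive in p9_cm_event_handler().
     * Receive errors move it to P9_RDMA_FLUSHING, request errors to
     * P9_RDMA_CLOSING, and a disconnect to P9_RDMA_CLOSED.
     */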
 110
 111/**
 112 * struct p9_rdma_context - Keeps track of an in-process WR
 113 *
 114 * @wc_op: The original WR op for when the CQE completes in error.
 115 * @busa: Bus address to unmap when the WR completes
 116 * @req: Keeps track of requests (send)
 117 * @rc: Keeps track of replies (receive)
 118 */
 119struct p9_rdma_req;
 120struct p9_rdma_context {
 121        enum ib_wc_opcode wc_op;
 122        dma_addr_t busa;
 123        union {
 124                struct p9_req_t *req;
 125                struct p9_fcall *rc;
 126        };
 127};
 128
 129/**
 130 * struct p9_rdma_opts - Collection of mount options
 131 * @port: port of connection
 132 * @sq_depth: The requested depth of the SQ. This really doesn't need
 133 * to be any deeper than the number of threads used in the client
 134 * @rq_depth: The depth of the RQ. Should be greater than or equal to SQ depth
 135 * @timeout: Time to wait in msecs for CM events
 136 */
 137struct p9_rdma_opts {
 138        short port;
 139        int sq_depth;
 140        int rq_depth;
 141        long timeout;
 142};
 143
 144/*
 145 * Option Parsing (code inspired by NFS code)
 146 */
 147enum {
 148        /* Options that take integer arguments */
 149        Opt_port, Opt_rq_depth, Opt_sq_depth, Opt_timeout, Opt_err,
 150};
 151
 152static match_table_t tokens = {
 153        {Opt_port, "port=%u"},
 154        {Opt_sq_depth, "sq=%u"},
 155        {Opt_rq_depth, "rq=%u"},
 156        {Opt_timeout, "timeout=%u"},
 157        {Opt_err, NULL},
 158};
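    /*
     * For illustration, an option string such as
     * "port=5640,sq=32,rq=32,timeout=30000" (these values happen to be
     * the defaults defined above) is matched against the tokens above by
     * parse_opts(); unrecognized options are simply skipped.
     */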
 159
 160/**
 161 * parse_opts - parse mount options into rdma options structure
 162 * @params: options string passed from mount
 163 * @opts: rdma transport-specific structure to parse options into
 164 *
 165 * Returns 0 upon success, -ERRNO upon failure
 166 */
 167static int parse_opts(char *params, struct p9_rdma_opts *opts)
 168{
 169        char *p;
 170        substring_t args[MAX_OPT_ARGS];
 171        int option;
 172        char *options, *tmp_options;
 173
 174        opts->port = P9_PORT;
 175        opts->sq_depth = P9_RDMA_SQ_DEPTH;
 176        opts->rq_depth = P9_RDMA_RQ_DEPTH;
 177        opts->timeout = P9_RDMA_TIMEOUT;
 178
 179        if (!params)
 180                return 0;
 181
 182        tmp_options = kstrdup(params, GFP_KERNEL);
 183        if (!tmp_options) {
 184                p9_debug(P9_DEBUG_ERROR,
 185                         "failed to allocate copy of option string\n");
 186                return -ENOMEM;
 187        }
 188        options = tmp_options;
 189
 190        while ((p = strsep(&options, ",")) != NULL) {
 191                int token;
 192                int r;
 193                if (!*p)
 194                        continue;
 195                token = match_token(p, tokens, args);
 196                if (token == Opt_err)
 197                        continue;
 198                r = match_int(&args[0], &option);
 199                if (r < 0) {
 200                        p9_debug(P9_DEBUG_ERROR,
 201                                 "integer field, but no integer?\n");
 202                        continue;
 203                }
 204                switch (token) {
 205                case Opt_port:
 206                        opts->port = option;
 207                        break;
 208                case Opt_sq_depth:
 209                        opts->sq_depth = option;
 210                        break;
 211                case Opt_rq_depth:
 212                        opts->rq_depth = option;
 213                        break;
 214                case Opt_timeout:
 215                        opts->timeout = option;
 216                        break;
 217                default:
 218                        continue;
 219                }
 220        }
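            /*
             * Example: "sq=64" with no "rq=" option leaves rq_depth at the
             * default of 32; the clamp below then raises it to 64 so that a
             * reply buffer can be posted for every outstanding send.
             */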
 221        /* RQ must be at least as large as the SQ */
 222        opts->rq_depth = max(opts->rq_depth, opts->sq_depth);
 223        kfree(tmp_options);
 224        return 0;
 225}
 226
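    /*
     * Connection manager event callback: advances rdma->state as address
     * and route resolution and connection establishment complete, marks
     * the client Disconnected on errors or disconnects, and wakes the
     * waiter in rdma_create_trans() through rdma->cm_done.
     */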
 227static int
 228p9_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 229{
 230        struct p9_client *c = id->context;
 231        struct p9_trans_rdma *rdma = c->trans;
 232        switch (event->event) {
 233        case RDMA_CM_EVENT_ADDR_RESOLVED:
 234                BUG_ON(rdma->state != P9_RDMA_INIT);
 235                rdma->state = P9_RDMA_ADDR_RESOLVED;
 236                break;
 237
 238        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 239                BUG_ON(rdma->state != P9_RDMA_ADDR_RESOLVED);
 240                rdma->state = P9_RDMA_ROUTE_RESOLVED;
 241                break;
 242
 243        case RDMA_CM_EVENT_ESTABLISHED:
 244                BUG_ON(rdma->state != P9_RDMA_ROUTE_RESOLVED);
 245                rdma->state = P9_RDMA_CONNECTED;
 246                break;
 247
 248        case RDMA_CM_EVENT_DISCONNECTED:
 249                if (rdma)
 250                        rdma->state = P9_RDMA_CLOSED;
 251                if (c)
 252                        c->status = Disconnected;
 253                break;
 254
 255        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
 256                break;
 257
 258        case RDMA_CM_EVENT_ADDR_CHANGE:
 259        case RDMA_CM_EVENT_ROUTE_ERROR:
 260        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 261        case RDMA_CM_EVENT_MULTICAST_JOIN:
 262        case RDMA_CM_EVENT_MULTICAST_ERROR:
 263        case RDMA_CM_EVENT_REJECTED:
 264        case RDMA_CM_EVENT_CONNECT_REQUEST:
 265        case RDMA_CM_EVENT_CONNECT_RESPONSE:
 266        case RDMA_CM_EVENT_CONNECT_ERROR:
 267        case RDMA_CM_EVENT_ADDR_ERROR:
 268        case RDMA_CM_EVENT_UNREACHABLE:
 269                c->status = Disconnected;
 270                rdma_disconnect(rdma->cm_id);
 271                break;
 272        default:
 273                BUG();
 274        }
 275        complete(&rdma->cm_done);
 276        return 0;
 277}
 278
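    /*
     * Completion handler for a received reply: unmaps the reply buffer,
     * parses the 9P header to recover the tag, matches it to the
     * outstanding request and completes it via p9_client_cb().  Any
     * failure marks the transport P9_RDMA_FLUSHING and the client
     * Disconnected.
     */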
 279static void
 280handle_recv(struct p9_client *client, struct p9_trans_rdma *rdma,
 281            struct p9_rdma_context *c, enum ib_wc_status status, u32 byte_len)
 282{
 283        struct p9_req_t *req;
 284        int err = 0;
 285        int16_t tag;
 286
 287        req = NULL;
 288        ib_dma_unmap_single(rdma->cm_id->device, c->busa, client->msize,
 289                                                         DMA_FROM_DEVICE);
 290
 291        if (status != IB_WC_SUCCESS)
 292                goto err_out;
 293
 294        err = p9_parse_header(c->rc, NULL, NULL, &tag, 1);
 295        if (err)
 296                goto err_out;
 297
 298        req = p9_tag_lookup(client, tag);
 299        if (!req)
 300                goto err_out;
 301
 302        /* Check that we have not yet received a reply for this request.
 303         */
 304        if (unlikely(req->rc)) {
 305                pr_err("Duplicate reply for request %d\n", tag);
 306                goto err_out;
 307        }
 308
 309        req->rc = c->rc;
 310        p9_client_cb(client, req, REQ_STATUS_RCVD);
 311
 312        return;
 313
 314 err_out:
 315        p9_debug(P9_DEBUG_ERROR, "req %p err %d status %d\n", req, err, status);
 316        rdma->state = P9_RDMA_FLUSHING;
 317        client->status = Disconnected;
 318}
 319
 320static void
 321handle_send(struct p9_client *client, struct p9_trans_rdma *rdma,
 322            struct p9_rdma_context *c, enum ib_wc_status status, u32 byte_len)
 323{
 324        ib_dma_unmap_single(rdma->cm_id->device,
 325                            c->busa, c->req->tc->size,
 326                            DMA_TO_DEVICE);
 327}
 328
 329static void qp_event_handler(struct ib_event *event, void *context)
 330{
 331        p9_debug(P9_DEBUG_ERROR, "QP event %d context %p\n",
 332                 event->event, context);
 333}
 334
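    /*
     * Completion queue callback: re-arms CQ notification, then polls and
     * dispatches completions.  Receive completions go to handle_recv()
     * and release a slot on rq_sem; send completions go to handle_send()
     * and release a slot on sq_sem.  The per-WR context is then freed.
     */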
 335static void cq_comp_handler(struct ib_cq *cq, void *cq_context)
 336{
 337        struct p9_client *client = cq_context;
 338        struct p9_trans_rdma *rdma = client->trans;
 339        int ret;
 340        struct ib_wc wc;
 341
 342        ib_req_notify_cq(rdma->cq, IB_CQ_NEXT_COMP);
 343        while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
 344                struct p9_rdma_context *c = (void *) (unsigned long) wc.wr_id;
 345
 346                switch (c->wc_op) {
 347                case IB_WC_RECV:
 348                        handle_recv(client, rdma, c, wc.status, wc.byte_len);
 349                        up(&rdma->rq_sem);
 350                        break;
 351
 352                case IB_WC_SEND:
 353                        handle_send(client, rdma, c, wc.status, wc.byte_len);
 354                        up(&rdma->sq_sem);
 355                        break;
 356
 357                default:
 358                        pr_err("unexpected completion type, c->wc_op=%d, wc.opcode=%d, status=%d\n",
 359                               c->wc_op, wc.opcode, wc.status);
 360                        break;
 361                }
 362                kfree(c);
 363        }
 364}
 365
 366static void cq_event_handler(struct ib_event *e, void *v)
 367{
 368        p9_debug(P9_DEBUG_ERROR, "CQ event %d context %p\n", e->event, v);
 369}
 370
 371static void rdma_destroy_trans(struct p9_trans_rdma *rdma)
 372{
 373        if (!rdma)
 374                return;
 375
 376        if (rdma->dma_mr && !IS_ERR(rdma->dma_mr))
 377                ib_dereg_mr(rdma->dma_mr);
 378
 379        if (rdma->qp && !IS_ERR(rdma->qp))
 380                ib_destroy_qp(rdma->qp);
 381
 382        if (rdma->pd && !IS_ERR(rdma->pd))
 383                ib_dealloc_pd(rdma->pd);
 384
 385        if (rdma->cq && !IS_ERR(rdma->cq))
 386                ib_destroy_cq(rdma->cq);
 387
 388        if (rdma->cm_id && !IS_ERR(rdma->cm_id))
 389                rdma_destroy_id(rdma->cm_id);
 390
 391        kfree(rdma);
 392}
 393
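    /*
     * DMA-map the reply buffer (client->msize bytes) and post it as a
     * single-SGE receive work request so an incoming reply can land in it.
     */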
 394static int
 395post_recv(struct p9_client *client, struct p9_rdma_context *c)
 396{
 397        struct p9_trans_rdma *rdma = client->trans;
 398        struct ib_recv_wr wr, *bad_wr;
 399        struct ib_sge sge;
 400
 401        c->busa = ib_dma_map_single(rdma->cm_id->device,
 402                                    c->rc->sdata, client->msize,
 403                                    DMA_FROM_DEVICE);
 404        if (ib_dma_mapping_error(rdma->cm_id->device, c->busa))
 405                goto error;
 406
 407        sge.addr = c->busa;
 408        sge.length = client->msize;
 409        sge.lkey = rdma->lkey;
 410
 411        wr.next = NULL;
 412        c->wc_op = IB_WC_RECV;
 413        wr.wr_id = (unsigned long) c;
 414        wr.sg_list = &sge;
 415        wr.num_sge = 1;
 416        return ib_post_recv(rdma->qp, &wr, &bad_wr);
 417
 418 error:
 419        p9_debug(P9_DEBUG_ERROR, "EIO\n");
 420        return -EIO;
 421}
 422
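    /*
     * Transport method for issuing a request: normally posts a receive
     * buffer for the reply first (throttled by rq_sem), then DMA-maps the
     * request and posts it as a signaled SEND (throttled by sq_sem).  The
     * excess_rc counter compensates for receive buffers that were posted
     * without a matching send.
     */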
 423static int rdma_request(struct p9_client *client, struct p9_req_t *req)
 424{
 425        struct p9_trans_rdma *rdma = client->trans;
 426        struct ib_send_wr wr, *bad_wr;
 427        struct ib_sge sge;
 428        int err = 0;
 429        unsigned long flags;
 430        struct p9_rdma_context *c = NULL;
 431        struct p9_rdma_context *rpl_context = NULL;
 432
 433        /* When an error occurs between posting the recv and the send,
 434         * there will be a receive context posted without a pending request.
 435         * Since there is no way to "un-post" it, we remember it and skip
 436         * post_recv() for the next request.
 437         * So here,
 438         * see if we are this `next request' and need to absorb an excess rc.
 439         * If yes, then drop and free our own, and do not call post_recv().
 440         */
 441        if (unlikely(atomic_read(&rdma->excess_rc) > 0)) {
 442                if (atomic_sub_return(1, &rdma->excess_rc) >= 0) {
 443                        /* Got one ! */
 444                        kfree(req->rc);
 445                        req->rc = NULL;
 446                        goto dont_need_post_recv;
 447                } else {
 448                        /* We raced and lost. */
 449                        atomic_inc(&rdma->excess_rc);
 450                }
 451        }
 452
 453        /* Allocate an fcall for the reply */
 454        rpl_context = kmalloc(sizeof *rpl_context, GFP_NOFS);
 455        if (!rpl_context) {
 456                err = -ENOMEM;
 457                goto recv_error;
 458        }
 459        rpl_context->rc = req->rc;
 460
 461        /*
 462         * Post a receive buffer for this request. We need to ensure
 463         * there is a reply buffer available for every outstanding
 464         * request. A flushed request can result in no reply for an
 465         * outstanding request, so we must keep a count to avoid
 466         * overflowing the RQ.
 467         */
 468        if (down_interruptible(&rdma->rq_sem)) {
 469                err = -EINTR;
 470                goto recv_error;
 471        }
 472
 473        err = post_recv(client, rpl_context);
 474        if (err) {
 475                p9_debug(P9_DEBUG_FCALL, "POST RECV failed\n");
 476                goto recv_error;
 477        }
 478        /* remove posted receive buffer from request structure */
 479        req->rc = NULL;
 480
 481dont_need_post_recv:
 482        /* Post the request */
 483        c = kmalloc(sizeof *c, GFP_NOFS);
 484        if (!c) {
 485                err = -ENOMEM;
 486                goto send_error;
 487        }
 488        c->req = req;
 489
 490        c->busa = ib_dma_map_single(rdma->cm_id->device,
 491                                    c->req->tc->sdata, c->req->tc->size,
 492                                    DMA_TO_DEVICE);
 493        if (ib_dma_mapping_error(rdma->cm_id->device, c->busa)) {
 494                err = -EIO;
 495                goto send_error;
 496        }
 497
 498        sge.addr = c->busa;
 499        sge.length = c->req->tc->size;
 500        sge.lkey = rdma->lkey;
 501
 502        wr.next = NULL;
 503        c->wc_op = IB_WC_SEND;
 504        wr.wr_id = (unsigned long) c;
 505        wr.opcode = IB_WR_SEND;
 506        wr.send_flags = IB_SEND_SIGNALED;
 507        wr.sg_list = &sge;
 508        wr.num_sge = 1;
 509
 510        if (down_interruptible(&rdma->sq_sem)) {
 511                err = -EINTR;
 512                goto send_error;
 513        }
 514
 515        /* Mark request as `sent' *before* we actually send it,
 516         * because doing it after could erase the REQ_STATUS_RCVD
 517         * status in case of a very fast reply.
 518         */
 519        req->status = REQ_STATUS_SENT;
 520        err = ib_post_send(rdma->qp, &wr, &bad_wr);
 521        if (err)
 522                goto send_error;
 523
 524        /* Success */
 525        return 0;
 526
 527 /* Handle errors that happened during or while preparing the send: */
 528 send_error:
 529        req->status = REQ_STATUS_ERROR;
 530        kfree(c);
 531        p9_debug(P9_DEBUG_ERROR, "Error %d in rdma_request()\n", err);
 532
 533        /* Ach.
 534         *  We did post_recv(), but not the send. We have one post_recv() in excess.
 535         */
 536        atomic_inc(&rdma->excess_rc);
 537        return err;
 538
 539 /* Handle errors that happened during or while preparing post_recv(): */
 540 recv_error:
 541        kfree(rpl_context);
 542        spin_lock_irqsave(&rdma->req_lock, flags);
 543        if (rdma->state < P9_RDMA_CLOSING) {
 544                rdma->state = P9_RDMA_CLOSING;
 545                spin_unlock_irqrestore(&rdma->req_lock, flags);
 546                rdma_disconnect(rdma->cm_id);
 547        } else
 548                spin_unlock_irqrestore(&rdma->req_lock, flags);
 549        return err;
 550}
 551
 552static void rdma_close(struct p9_client *client)
 553{
 554        struct p9_trans_rdma *rdma;
 555
 556        if (!client)
 557                return;
 558
 559        rdma = client->trans;
 560        if (!rdma)
 561                return;
 562
 563        client->status = Disconnected;
 564        rdma_disconnect(rdma->cm_id);
 565        rdma_destroy_trans(rdma);
 566}
 567
 568/**
 569 * alloc_rdma - Allocate and initialize the rdma transport structure
 570 * @opts: Mount options structure
 571 */
 572static struct p9_trans_rdma *alloc_rdma(struct p9_rdma_opts *opts)
 573{
 574        struct p9_trans_rdma *rdma;
 575
 576        rdma = kzalloc(sizeof(struct p9_trans_rdma), GFP_KERNEL);
 577        if (!rdma)
 578                return NULL;
 579
 580        rdma->sq_depth = opts->sq_depth;
 581        rdma->rq_depth = opts->rq_depth;
 582        rdma->timeout = opts->timeout;
 583        spin_lock_init(&rdma->req_lock);
 584        init_completion(&rdma->cm_done);
 585        sema_init(&rdma->sq_sem, rdma->sq_depth);
 586        sema_init(&rdma->rq_sem, rdma->rq_depth);
 587        atomic_set(&rdma->excess_rc, 0);
 588
 589        return rdma;
 590}
 591
 592static int rdma_cancel(struct p9_client *client, struct p9_req_t *req)
 593{
 594        /* Nothing to do here.
 595         * We will take care of it (if we have to) in rdma_cancelled()
 596         */
 597        return 1;
 598}
 599
 600/* A request has been fully flushed without a reply.
 601 * That means we have posted one buffer in excess.
 602 */
 603static int rdma_cancelled(struct p9_client *client, struct p9_req_t *req)
 604{
 605        struct p9_trans_rdma *rdma = client->trans;
 606        atomic_inc(&rdma->excess_rc);
 607        return 0;
 608}
 609
 610/**
 611 * rdma_create_trans - Transport method for creating a transport instance
 612 * @client: client instance
 613 * @addr: IP address string
 614 * @args: Mount options string
 615 */
 616static int
 617rdma_create_trans(struct p9_client *client, const char *addr, char *args)
 618{
 619        int err;
 620        struct p9_rdma_opts opts;
 621        struct p9_trans_rdma *rdma;
 622        struct rdma_conn_param conn_param;
 623        struct ib_qp_init_attr qp_attr;
 624        struct ib_device_attr devattr;
 625
 626        /* Parse the transport specific mount options */
 627        err = parse_opts(args, &opts);
 628        if (err < 0)
 629                return err;
 630
 631        /* Create and initialize the RDMA transport structure */
 632        rdma = alloc_rdma(&opts);
 633        if (!rdma)
 634                return -ENOMEM;
 635
 636        /* Create the RDMA CM ID */
 637        rdma->cm_id = rdma_create_id(p9_cm_event_handler, client, RDMA_PS_TCP,
 638                                     IB_QPT_RC);
 639        if (IS_ERR(rdma->cm_id))
 640                goto error;
 641
 642        /* Associate the client with the transport */
 643        client->trans = rdma;
 644
 645        /* Resolve the server's address */
 646        rdma->addr.sin_family = AF_INET;
 647        rdma->addr.sin_addr.s_addr = in_aton(addr);
 648        rdma->addr.sin_port = htons(opts.port);
 649        err = rdma_resolve_addr(rdma->cm_id, NULL,
 650                                (struct sockaddr *)&rdma->addr,
 651                                rdma->timeout);
 652        if (err)
 653                goto error;
 654        err = wait_for_completion_interruptible(&rdma->cm_done);
 655        if (err || (rdma->state != P9_RDMA_ADDR_RESOLVED))
 656                goto error;
 657
 658        /* Resolve the route to the server */
 659        err = rdma_resolve_route(rdma->cm_id, rdma->timeout);
 660        if (err)
 661                goto error;
 662        err = wait_for_completion_interruptible(&rdma->cm_done);
 663        if (err || (rdma->state != P9_RDMA_ROUTE_RESOLVED))
 664                goto error;
 665
 666        /* Query the device attributes */
 667        err = ib_query_device(rdma->cm_id->device, &devattr);
 668        if (err)
 669                goto error;
 670
 671        /* Create the Completion Queue */
 672        rdma->cq = ib_create_cq(rdma->cm_id->device, cq_comp_handler,
 673                                cq_event_handler, client,
 674                                opts.sq_depth + opts.rq_depth + 1, 0);
 675        if (IS_ERR(rdma->cq))
 676                goto error;
 677        ib_req_notify_cq(rdma->cq, IB_CQ_NEXT_COMP);
 678
 679        /* Create the Protection Domain */
 680        rdma->pd = ib_alloc_pd(rdma->cm_id->device);
 681        if (IS_ERR(rdma->pd))
 682                goto error;
 683
 684        /* Cache the DMA lkey in the transport */
 685        rdma->dma_mr = NULL;
 686        if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)
 687                rdma->lkey = rdma->cm_id->device->local_dma_lkey;
 688        else {
 689                rdma->dma_mr = ib_get_dma_mr(rdma->pd, IB_ACCESS_LOCAL_WRITE);
 690                if (IS_ERR(rdma->dma_mr))
 691                        goto error;
 692                rdma->lkey = rdma->dma_mr->lkey;
 693        }
 694
 695        /* Create the Queue Pair */
 696        memset(&qp_attr, 0, sizeof qp_attr);
 697        qp_attr.event_handler = qp_event_handler;
 698        qp_attr.qp_context = client;
 699        qp_attr.cap.max_send_wr = opts.sq_depth;
 700        qp_attr.cap.max_recv_wr = opts.rq_depth;
 701        qp_attr.cap.max_send_sge = P9_RDMA_SEND_SGE;
 702        qp_attr.cap.max_recv_sge = P9_RDMA_RECV_SGE;
 703        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 704        qp_attr.qp_type = IB_QPT_RC;
 705        qp_attr.send_cq = rdma->cq;
 706        qp_attr.recv_cq = rdma->cq;
 707        err = rdma_create_qp(rdma->cm_id, rdma->pd, &qp_attr);
 708        if (err)
 709                goto error;
 710        rdma->qp = rdma->cm_id->qp;
 711
 712        /* Request a connection */
 713        memset(&conn_param, 0, sizeof(conn_param));
 714        conn_param.private_data = NULL;
 715        conn_param.private_data_len = 0;
 716        conn_param.responder_resources = P9_RDMA_IRD;
 717        conn_param.initiator_depth = P9_RDMA_ORD;
 718        err = rdma_connect(rdma->cm_id, &conn_param);
 719        if (err)
 720                goto error;
 721        err = wait_for_completion_interruptible(&rdma->cm_done);
 722        if (err || (rdma->state != P9_RDMA_CONNECTED))
 723                goto error;
 724
 725        client->status = Connected;
 726
 727        return 0;
 728
 729error:
 730        rdma_destroy_trans(rdma);
 731        return -ENOTCONN;
 732}
 733
 734static struct p9_trans_module p9_rdma_trans = {
 735        .name = "rdma",
 736        .maxsize = P9_RDMA_MAXSIZE,
 737        .def = 0,
 738        .owner = THIS_MODULE,
 739        .create = rdma_create_trans,
 740        .close = rdma_close,
 741        .request = rdma_request,
 742        .cancel = rdma_cancel,
 743        .cancelled = rdma_cancelled,
 744};
 745
 746/**
 747 * p9_trans_rdma_init - Register the 9P RDMA transport driver
 748 */
 749static int __init p9_trans_rdma_init(void)
 750{
 751        v9fs_register_trans(&p9_rdma_trans);
 752        return 0;
 753}
 754
 755static void __exit p9_trans_rdma_exit(void)
 756{
 757        v9fs_unregister_trans(&p9_rdma_trans);
 758}
 759
 760module_init(p9_trans_rdma_init);
 761module_exit(p9_trans_rdma_exit);
 762
 763MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
 764MODULE_DESCRIPTION("RDMA Transport for 9P");
 765MODULE_LICENSE("Dual BSD/GPL");
 766