linux/net/9p/trans_rdma.c
/*
 * net/9p/trans_rdma.c
 *
 * RDMA transport layer based on the trans_fd.c implementation.
 *
 *  Copyright (C) 2008 by Tom Tucker <tom@opengridcomputing.com>
 *  Copyright (C) 2006 by Russ Cox <rsc@swtch.com>
 *  Copyright (C) 2004-2005 by Latchesar Ionkov <lucho@ionkov.net>
 *  Copyright (C) 2004-2008 by Eric Van Hensbergen <ericvh@gmail.com>
 *  Copyright (C) 1997-2002 by Ron Minnich <rminnich@sarnoff.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2
 *  as published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to:
 *  Free Software Foundation
 *  51 Franklin Street, Fifth Floor
 *  Boston, MA  02111-1301  USA
 *
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/in.h>
#include <linux/module.h>
#include <linux/net.h>
#include <linux/ipv6.h>
#include <linux/kthread.h>
#include <linux/errno.h>
#include <linux/kernel.h>
#include <linux/un.h>
#include <linux/uaccess.h>
#include <linux/inet.h>
#include <linux/idr.h>
#include <linux/file.h>
#include <linux/parser.h>
#include <linux/semaphore.h>
#include <linux/slab.h>
#include <net/9p/9p.h>
#include <net/9p/client.h>
#include <net/9p/transport.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#define P9_PORT                 5640
#define P9_RDMA_SQ_DEPTH        32
#define P9_RDMA_RQ_DEPTH        32
#define P9_RDMA_SEND_SGE        4
#define P9_RDMA_RECV_SGE        4
#define P9_RDMA_IRD             0
#define P9_RDMA_ORD             0
#define P9_RDMA_TIMEOUT         30000           /* 30 seconds */
#define P9_RDMA_MAXSIZE         (1024*1024)     /* 1MB */

/**
 * struct p9_trans_rdma - RDMA transport instance
 *
 * @state: tracks the transport state machine for connection setup and tear down
 * @cm_id: The RDMA CM ID
 * @pd: Protection Domain pointer
 * @qp: Queue Pair pointer
 * @cq: Completion Queue pointer
 * @dma_mr: DMA Memory Region pointer
 * @lkey: The local access only memory region key
 * @timeout: Number of msecs to wait for connection management events
 * @sq_depth: The depth of the Send Queue
 * @sq_sem: Semaphore for the SQ
 * @rq_depth: The depth of the Receive Queue.
 * @rq_sem: Semaphore for the RQ
 * @excess_rc: Amount of posted Receive Contexts without a pending request.
 *              See rdma_request()
 * @addr: The remote peer's address
 * @req_lock: Protects the active request list
 * @cm_done: Completion event for connection management tracking
 */
struct p9_trans_rdma {
        enum {
                P9_RDMA_INIT,
                P9_RDMA_ADDR_RESOLVED,
                P9_RDMA_ROUTE_RESOLVED,
                P9_RDMA_CONNECTED,
                P9_RDMA_FLUSHING,
                P9_RDMA_CLOSING,
                P9_RDMA_CLOSED,
        } state;
        struct rdma_cm_id *cm_id;
        struct ib_pd *pd;
        struct ib_qp *qp;
        struct ib_cq *cq;
        struct ib_mr *dma_mr;
        u32 lkey;
        long timeout;
        int sq_depth;
        struct semaphore sq_sem;
        int rq_depth;
        struct semaphore rq_sem;
        atomic_t excess_rc;
        struct sockaddr_in addr;
        spinlock_t req_lock;

        struct completion cm_done;
};

/**
 * p9_rdma_context - Keeps track of in-process WR
 *
 * @wc_op: The original WR op for when the CQE completes in error.
 * @busa: Bus address to unmap when the WR completes
 * @req: Keeps track of requests (send)
 * @rc: Keeps track of replies (receive)
 */
struct p9_rdma_req;
struct p9_rdma_context {
        enum ib_wc_opcode wc_op;
        dma_addr_t busa;
        union {
                struct p9_req_t *req;
                struct p9_fcall *rc;
        };
};

/**
 * p9_rdma_opts - Collection of mount options
 * @port: port of connection
 * @sq_depth: The requested depth of the SQ. This really doesn't need
 * to be any deeper than the number of threads used in the client
 * @rq_depth: The depth of the RQ. Should be greater than or equal to SQ depth
 * @timeout: Time to wait in msecs for CM events
 */
struct p9_rdma_opts {
        short port;
        int sq_depth;
        int rq_depth;
        long timeout;
};

/*
 * Option Parsing (code inspired by NFS code)
 */
enum {
        /* Options that take integer arguments */
        Opt_port, Opt_rq_depth, Opt_sq_depth, Opt_timeout, Opt_err,
};

static match_table_t tokens = {
        {Opt_port, "port=%u"},
        {Opt_sq_depth, "sq=%u"},
        {Opt_rq_depth, "rq=%u"},
        {Opt_timeout, "timeout=%u"},
        {Opt_err, NULL},
};
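
/*
 * For example, a client might mount with something like:
 *
 *   mount -t 9p -o trans=rdma,port=5640,sq=32,rq=32 <server-ip> /mnt/9p
 *
 * (Illustrative values; any option omitted falls back to the defaults above.)
 */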

/**
 * parse_opts - parse mount options into rdma options structure
 * @params: options string passed from mount
 * @opts: rdma transport-specific structure to parse options into
 *
 * Returns 0 upon success, -ERRNO upon failure
 */
static int parse_opts(char *params, struct p9_rdma_opts *opts)
{
        char *p;
        substring_t args[MAX_OPT_ARGS];
        int option;
        char *options, *tmp_options;

        opts->port = P9_PORT;
        opts->sq_depth = P9_RDMA_SQ_DEPTH;
        opts->rq_depth = P9_RDMA_RQ_DEPTH;
        opts->timeout = P9_RDMA_TIMEOUT;

        if (!params)
                return 0;

        tmp_options = kstrdup(params, GFP_KERNEL);
        if (!tmp_options) {
                p9_debug(P9_DEBUG_ERROR,
                         "failed to allocate copy of option string\n");
                return -ENOMEM;
        }
        options = tmp_options;

        while ((p = strsep(&options, ",")) != NULL) {
                int token;
                int r;
                if (!*p)
                        continue;
                token = match_token(p, tokens, args);
                if (token == Opt_err)
                        continue;
                r = match_int(&args[0], &option);
                if (r < 0) {
                        p9_debug(P9_DEBUG_ERROR,
                                 "integer field, but no integer?\n");
                        continue;
                }
                switch (token) {
                case Opt_port:
                        opts->port = option;
                        break;
                case Opt_sq_depth:
                        opts->sq_depth = option;
                        break;
                case Opt_rq_depth:
                        opts->rq_depth = option;
                        break;
                case Opt_timeout:
                        opts->timeout = option;
                        break;
                default:
                        continue;
                }
        }
        /* RQ must be at least as large as the SQ */
        opts->rq_depth = max(opts->rq_depth, opts->sq_depth);
        kfree(tmp_options);
        return 0;
}

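/*
 * RDMA CM event callback: advance the transport state machine as address
 * resolution, route resolution and connection establishment complete, and
 * wake the thread waiting in rdma_create_trans() through cm_done.
 */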
static int
p9_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
        struct p9_client *c = id->context;
        struct p9_trans_rdma *rdma = c->trans;

        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
                BUG_ON(rdma->state != P9_RDMA_INIT);
                rdma->state = P9_RDMA_ADDR_RESOLVED;
                break;

        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                BUG_ON(rdma->state != P9_RDMA_ADDR_RESOLVED);
                rdma->state = P9_RDMA_ROUTE_RESOLVED;
                break;

        case RDMA_CM_EVENT_ESTABLISHED:
                BUG_ON(rdma->state != P9_RDMA_ROUTE_RESOLVED);
                rdma->state = P9_RDMA_CONNECTED;
                break;

        case RDMA_CM_EVENT_DISCONNECTED:
                if (rdma)
                        rdma->state = P9_RDMA_CLOSED;
                if (c)
                        c->status = Disconnected;
                break;

        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
                break;

        case RDMA_CM_EVENT_ADDR_CHANGE:
        case RDMA_CM_EVENT_ROUTE_ERROR:
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
        case RDMA_CM_EVENT_MULTICAST_JOIN:
        case RDMA_CM_EVENT_MULTICAST_ERROR:
        case RDMA_CM_EVENT_REJECTED:
        case RDMA_CM_EVENT_CONNECT_REQUEST:
        case RDMA_CM_EVENT_CONNECT_RESPONSE:
        case RDMA_CM_EVENT_CONNECT_ERROR:
        case RDMA_CM_EVENT_ADDR_ERROR:
        case RDMA_CM_EVENT_UNREACHABLE:
                c->status = Disconnected;
                rdma_disconnect(rdma->cm_id);
                break;
        default:
                BUG();
        }
        complete(&rdma->cm_done);
        return 0;
}

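/*
 * Receive completion: unmap the reply buffer, match the 9P tag to its
 * outstanding request and hand the reply to the client core. Any error
 * here marks the transport as flushing and the client as disconnected.
 */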
static void
handle_recv(struct p9_client *client, struct p9_trans_rdma *rdma,
            struct p9_rdma_context *c, enum ib_wc_status status, u32 byte_len)
{
        struct p9_req_t *req;
        int err = 0;
        int16_t tag;

        req = NULL;
        ib_dma_unmap_single(rdma->cm_id->device, c->busa, client->msize,
                                                         DMA_FROM_DEVICE);

        if (status != IB_WC_SUCCESS)
                goto err_out;

        err = p9_parse_header(c->rc, NULL, NULL, &tag, 1);
        if (err)
                goto err_out;

        req = p9_tag_lookup(client, tag);
        if (!req)
                goto err_out;

        /* Check that we have not yet received a reply for this request.
         */
        if (unlikely(req->rc)) {
                pr_err("Duplicate reply for request %d\n", tag);
                goto err_out;
        }

        req->rc = c->rc;
        req->status = REQ_STATUS_RCVD;
        p9_client_cb(client, req);

        return;

 err_out:
        p9_debug(P9_DEBUG_ERROR, "req %p err %d status %d\n", req, err, status);
        rdma->state = P9_RDMA_FLUSHING;
        client->status = Disconnected;
}

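/*
 * Send completion: all that is left to do is unmap the request buffer
 * that was posted by rdma_request().
 */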
static void
handle_send(struct p9_client *client, struct p9_trans_rdma *rdma,
            struct p9_rdma_context *c, enum ib_wc_status status, u32 byte_len)
{
        ib_dma_unmap_single(rdma->cm_id->device,
                            c->busa, c->req->tc->size,
                            DMA_TO_DEVICE);
}

static void qp_event_handler(struct ib_event *event, void *context)
{
        p9_debug(P9_DEBUG_ERROR, "QP event %d context %p\n",
                 event->event, context);
}

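/*
 * Completion queue callback: re-arm the CQ notification, then drain all
 * available completions, dispatching each one to handle_recv() or
 * handle_send() and returning the queue slot by releasing rq_sem or sq_sem.
 */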
static void cq_comp_handler(struct ib_cq *cq, void *cq_context)
{
        struct p9_client *client = cq_context;
        struct p9_trans_rdma *rdma = client->trans;
        int ret;
        struct ib_wc wc;

        ib_req_notify_cq(rdma->cq, IB_CQ_NEXT_COMP);
        while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
                struct p9_rdma_context *c = (void *) (unsigned long) wc.wr_id;

                switch (c->wc_op) {
                case IB_WC_RECV:
                        handle_recv(client, rdma, c, wc.status, wc.byte_len);
                        up(&rdma->rq_sem);
                        break;

                case IB_WC_SEND:
                        handle_send(client, rdma, c, wc.status, wc.byte_len);
                        up(&rdma->sq_sem);
                        break;

                default:
                        pr_err("unexpected completion type, c->wc_op=%d, wc.opcode=%d, status=%d\n",
                               c->wc_op, wc.opcode, wc.status);
                        break;
                }
                kfree(c);
        }
}

static void cq_event_handler(struct ib_event *e, void *v)
{
        p9_debug(P9_DEBUG_ERROR, "CQ event %d context %p\n", e->event, v);
}

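/*
 * Release all RDMA resources held by a transport instance, in the reverse
 * order of their creation, and free the structure itself. Safe to call on
 * a partially constructed transport.
 */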
static void rdma_destroy_trans(struct p9_trans_rdma *rdma)
{
        if (!rdma)
                return;

        if (rdma->dma_mr && !IS_ERR(rdma->dma_mr))
                ib_dereg_mr(rdma->dma_mr);

        if (rdma->qp && !IS_ERR(rdma->qp))
                ib_destroy_qp(rdma->qp);

        if (rdma->pd && !IS_ERR(rdma->pd))
                ib_dealloc_pd(rdma->pd);

        if (rdma->cq && !IS_ERR(rdma->cq))
                ib_destroy_cq(rdma->cq);

        if (rdma->cm_id && !IS_ERR(rdma->cm_id))
                rdma_destroy_id(rdma->cm_id);

        kfree(rdma);
}

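/*
 * DMA-map the reply buffer held in the context and post it to the receive
 * queue so a reply can land in it. The caller has already reserved a slot
 * on rq_sem.
 */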
static int
post_recv(struct p9_client *client, struct p9_rdma_context *c)
{
        struct p9_trans_rdma *rdma = client->trans;
        struct ib_recv_wr wr, *bad_wr;
        struct ib_sge sge;

        c->busa = ib_dma_map_single(rdma->cm_id->device,
                                    c->rc->sdata, client->msize,
                                    DMA_FROM_DEVICE);
        if (ib_dma_mapping_error(rdma->cm_id->device, c->busa))
                goto error;

        sge.addr = c->busa;
        sge.length = client->msize;
        sge.lkey = rdma->lkey;

        wr.next = NULL;
        c->wc_op = IB_WC_RECV;
        wr.wr_id = (unsigned long) c;
        wr.sg_list = &sge;
        wr.num_sge = 1;
        return ib_post_recv(rdma->qp, &wr, &bad_wr);

 error:
        p9_debug(P9_DEBUG_ERROR, "EIO\n");
        return -EIO;
}

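/*
 * Transport ->request hook: post a receive buffer for the expected reply
 * (unless an excess receive context left over from an earlier error can be
 * absorbed instead), then DMA-map the request and post it to the send
 * queue. sq_sem and rq_sem throttle the posts against the configured
 * queue depths.
 */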
static int rdma_request(struct p9_client *client, struct p9_req_t *req)
{
        struct p9_trans_rdma *rdma = client->trans;
        struct ib_send_wr wr, *bad_wr;
        struct ib_sge sge;
        int err = 0;
        unsigned long flags;
        struct p9_rdma_context *c = NULL;
        struct p9_rdma_context *rpl_context = NULL;

        /* When an error occurs between posting the recv and the send,
         * there will be a receive context posted without a pending request.
         * Since there is no way to "un-post" it, we remember it and skip
         * post_recv() for the next request.
         * So here,
         * see if we are this `next request' and need to absorb an excess rc.
         * If yes, then drop and free our own, and do not post_recv().
         */
        if (unlikely(atomic_read(&rdma->excess_rc) > 0)) {
                if ((atomic_sub_return(1, &rdma->excess_rc) >= 0)) {
                        /* Got one ! */
                        kfree(req->rc);
                        req->rc = NULL;
                        goto dont_need_post_recv;
                } else {
                        /* We raced and lost. */
                        atomic_inc(&rdma->excess_rc);
                }
        }

        /* Allocate an fcall for the reply */
        rpl_context = kmalloc(sizeof *rpl_context, GFP_NOFS);
        if (!rpl_context) {
                err = -ENOMEM;
                goto recv_error;
        }
        rpl_context->rc = req->rc;

        /*
         * Post a receive buffer for this request. We need to ensure
         * there is a reply buffer available for every outstanding
         * request. A flushed request can result in no reply for an
         * outstanding request, so we must keep a count to avoid
         * overflowing the RQ.
         */
        if (down_interruptible(&rdma->rq_sem)) {
                err = -EINTR;
                goto recv_error;
        }

        err = post_recv(client, rpl_context);
        if (err) {
                p9_debug(P9_DEBUG_FCALL, "POST RECV failed\n");
                goto recv_error;
        }
        /* remove posted receive buffer from request structure */
        req->rc = NULL;

dont_need_post_recv:
        /* Post the request */
        c = kmalloc(sizeof *c, GFP_NOFS);
        if (!c) {
                err = -ENOMEM;
                goto send_error;
        }
        c->req = req;

        c->busa = ib_dma_map_single(rdma->cm_id->device,
                                    c->req->tc->sdata, c->req->tc->size,
                                    DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdma->cm_id->device, c->busa)) {
                err = -EIO;
                goto send_error;
        }

        sge.addr = c->busa;
        sge.length = c->req->tc->size;
        sge.lkey = rdma->lkey;

        wr.next = NULL;
        c->wc_op = IB_WC_SEND;
        wr.wr_id = (unsigned long) c;
        wr.opcode = IB_WR_SEND;
        wr.send_flags = IB_SEND_SIGNALED;
        wr.sg_list = &sge;
        wr.num_sge = 1;

        if (down_interruptible(&rdma->sq_sem)) {
                err = -EINTR;
                goto send_error;
        }

        err = ib_post_send(rdma->qp, &wr, &bad_wr);
        if (err)
                goto send_error;

        /* Success */
        return 0;

 /* Handle errors that happened during or while preparing the send: */
 send_error:
        kfree(c);
        p9_debug(P9_DEBUG_ERROR, "Error %d in rdma_request()\n", err);

        /* Ach.
         *  We did post_recv(), but not the send. We have one post_recv() in excess.
         */
        atomic_inc(&rdma->excess_rc);
        return err;

 /* Handle errors that happened during or while preparing post_recv(): */
 recv_error:
        kfree(rpl_context);
        spin_lock_irqsave(&rdma->req_lock, flags);
        if (rdma->state < P9_RDMA_CLOSING) {
                rdma->state = P9_RDMA_CLOSING;
                spin_unlock_irqrestore(&rdma->req_lock, flags);
                rdma_disconnect(rdma->cm_id);
        } else
                spin_unlock_irqrestore(&rdma->req_lock, flags);
        return err;
}

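/*
 * Transport ->close hook: mark the client disconnected, tear down the RDMA
 * connection and release every resource associated with the transport.
 */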
static void rdma_close(struct p9_client *client)
{
        struct p9_trans_rdma *rdma;

        if (!client)
                return;

        rdma = client->trans;
        if (!rdma)
                return;

        client->status = Disconnected;
        rdma_disconnect(rdma->cm_id);
        rdma_destroy_trans(rdma);
}

/**
 * alloc_rdma - Allocate and initialize the rdma transport structure
 * @opts: Mount options structure
 */
static struct p9_trans_rdma *alloc_rdma(struct p9_rdma_opts *opts)
{
        struct p9_trans_rdma *rdma;

        rdma = kzalloc(sizeof(struct p9_trans_rdma), GFP_KERNEL);
        if (!rdma)
                return NULL;

        rdma->sq_depth = opts->sq_depth;
        rdma->rq_depth = opts->rq_depth;
        rdma->timeout = opts->timeout;
        spin_lock_init(&rdma->req_lock);
        init_completion(&rdma->cm_done);
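        /*
         * The semaphores start out counting the full queue depths: each post
         * to the SQ/RQ takes a slot with down_interruptible() and each
         * completion returns it with up() in cq_comp_handler().
         */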
        sema_init(&rdma->sq_sem, rdma->sq_depth);
        sema_init(&rdma->rq_sem, rdma->rq_depth);
        atomic_set(&rdma->excess_rc, 0);

        return rdma;
}

/* it's not clear to me we can do anything after send has been posted */
static int rdma_cancel(struct p9_client *client, struct p9_req_t *req)
{
        return 1;
}

/* A request has been fully flushed without a reply.
 * That means we have posted one buffer in excess.
 */
static int rdma_cancelled(struct p9_client *client, struct p9_req_t *req)
{
        struct p9_trans_rdma *rdma = client->trans;

        atomic_inc(&rdma->excess_rc);
        return 0;
}

/**
 * rdma_create_trans - Transport method for creating a transport instance
 * @client: client instance
 * @addr: IP address string
 * @args: Mount options string
 */
static int
rdma_create_trans(struct p9_client *client, const char *addr, char *args)
{
        int err;
        struct p9_rdma_opts opts;
        struct p9_trans_rdma *rdma;
        struct rdma_conn_param conn_param;
        struct ib_qp_init_attr qp_attr;
        struct ib_device_attr devattr;

        /* Parse the transport specific mount options */
        err = parse_opts(args, &opts);
        if (err < 0)
                return err;

        /* Create and initialize the RDMA transport structure */
        rdma = alloc_rdma(&opts);
        if (!rdma)
                return -ENOMEM;

        /* Create the RDMA CM ID */
        rdma->cm_id = rdma_create_id(p9_cm_event_handler, client, RDMA_PS_TCP,
                                     IB_QPT_RC);
        if (IS_ERR(rdma->cm_id))
                goto error;

        /* Associate the client with the transport */
        client->trans = rdma;

        /* Resolve the server's address */
        rdma->addr.sin_family = AF_INET;
        rdma->addr.sin_addr.s_addr = in_aton(addr);
        rdma->addr.sin_port = htons(opts.port);
        err = rdma_resolve_addr(rdma->cm_id, NULL,
                                (struct sockaddr *)&rdma->addr,
                                rdma->timeout);
        if (err)
                goto error;
        err = wait_for_completion_interruptible(&rdma->cm_done);
        if (err || (rdma->state != P9_RDMA_ADDR_RESOLVED))
                goto error;

        /* Resolve the route to the server */
        err = rdma_resolve_route(rdma->cm_id, rdma->timeout);
        if (err)
                goto error;
        err = wait_for_completion_interruptible(&rdma->cm_done);
        if (err || (rdma->state != P9_RDMA_ROUTE_RESOLVED))
                goto error;

        /* Query the device attributes */
        err = ib_query_device(rdma->cm_id->device, &devattr);
        if (err)
                goto error;

        /* Create the Completion Queue */
        rdma->cq = ib_create_cq(rdma->cm_id->device, cq_comp_handler,
                                cq_event_handler, client,
                                opts.sq_depth + opts.rq_depth + 1, 0);
        if (IS_ERR(rdma->cq))
                goto error;
        ib_req_notify_cq(rdma->cq, IB_CQ_NEXT_COMP);

        /* Create the Protection Domain */
        rdma->pd = ib_alloc_pd(rdma->cm_id->device);
        if (IS_ERR(rdma->pd))
                goto error;

        /* Cache the DMA lkey in the transport */
        rdma->dma_mr = NULL;
        if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)
                rdma->lkey = rdma->cm_id->device->local_dma_lkey;
        else {
                rdma->dma_mr = ib_get_dma_mr(rdma->pd, IB_ACCESS_LOCAL_WRITE);
                if (IS_ERR(rdma->dma_mr))
                        goto error;
                rdma->lkey = rdma->dma_mr->lkey;
        }

        /* Create the Queue Pair */
        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.event_handler = qp_event_handler;
        qp_attr.qp_context = client;
        qp_attr.cap.max_send_wr = opts.sq_depth;
        qp_attr.cap.max_recv_wr = opts.rq_depth;
        qp_attr.cap.max_send_sge = P9_RDMA_SEND_SGE;
        qp_attr.cap.max_recv_sge = P9_RDMA_RECV_SGE;
        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        qp_attr.qp_type = IB_QPT_RC;
        qp_attr.send_cq = rdma->cq;
        qp_attr.recv_cq = rdma->cq;
        err = rdma_create_qp(rdma->cm_id, rdma->pd, &qp_attr);
        if (err)
                goto error;
        rdma->qp = rdma->cm_id->qp;

        /* Request a connection */
        memset(&conn_param, 0, sizeof(conn_param));
        conn_param.private_data = NULL;
        conn_param.private_data_len = 0;
        conn_param.responder_resources = P9_RDMA_IRD;
        conn_param.initiator_depth = P9_RDMA_ORD;
        err = rdma_connect(rdma->cm_id, &conn_param);
        if (err)
                goto error;
        err = wait_for_completion_interruptible(&rdma->cm_done);
        if (err || (rdma->state != P9_RDMA_CONNECTED))
                goto error;

        client->status = Connected;

        return 0;

error:
        rdma_destroy_trans(rdma);
        return -ENOTCONN;
}

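/*
 * Transport operations exported to the 9P client core; registered with
 * v9fs_register_trans() at module load time.
 */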
static struct p9_trans_module p9_rdma_trans = {
        .name = "rdma",
        .maxsize = P9_RDMA_MAXSIZE,
        .def = 0,
        .owner = THIS_MODULE,
        .create = rdma_create_trans,
        .close = rdma_close,
        .request = rdma_request,
        .cancel = rdma_cancel,
        .cancelled = rdma_cancelled,
};

/**
 * p9_trans_rdma_init - Register the 9P RDMA transport driver
 */
static int __init p9_trans_rdma_init(void)
{
        v9fs_register_trans(&p9_rdma_trans);
        return 0;
}

static void __exit p9_trans_rdma_exit(void)
{
        v9fs_unregister_trans(&p9_rdma_trans);
}

module_init(p9_trans_rdma_init);
module_exit(p9_trans_rdma_exit);

MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
MODULE_DESCRIPTION("RDMA Transport for 9P");
MODULE_LICENSE("Dual BSD/GPL");