linux/net/9p/trans_rdma.c
/*
 * linux/net/9p/trans_rdma.c
 *
 * RDMA transport layer based on the trans_fd.c implementation.
 *
 *  Copyright (C) 2008 by Tom Tucker <tom@opengridcomputing.com>
 *  Copyright (C) 2006 by Russ Cox <rsc@swtch.com>
 *  Copyright (C) 2004-2005 by Latchesar Ionkov <lucho@ionkov.net>
 *  Copyright (C) 2004-2008 by Eric Van Hensbergen <ericvh@gmail.com>
 *  Copyright (C) 1997-2002 by Ron Minnich <rminnich@sarnoff.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2
 *  as published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to:
 *  Free Software Foundation
 *  51 Franklin Street, Fifth Floor
 *  Boston, MA  02111-1301  USA
 *
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/in.h>
#include <linux/module.h>
#include <linux/net.h>
#include <linux/ipv6.h>
#include <linux/kthread.h>
#include <linux/errno.h>
#include <linux/kernel.h>
#include <linux/un.h>
#include <linux/uaccess.h>
#include <linux/inet.h>
#include <linux/idr.h>
#include <linux/file.h>
#include <linux/parser.h>
#include <linux/semaphore.h>
#include <linux/slab.h>
#include <net/9p/9p.h>
#include <net/9p/client.h>
#include <net/9p/transport.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#define P9_PORT                 5640
#define P9_RDMA_SQ_DEPTH        32
#define P9_RDMA_RQ_DEPTH        32
#define P9_RDMA_SEND_SGE        4
#define P9_RDMA_RECV_SGE        4
#define P9_RDMA_IRD             0
#define P9_RDMA_ORD             0
#define P9_RDMA_TIMEOUT         30000           /* 30 seconds */
#define P9_RDMA_MAXSIZE         (1024*1024)     /* 1MB */

/**
 * struct p9_trans_rdma - RDMA transport instance
 *
 * @state: tracks the transport state machine for connection setup and tear down
 * @cm_id: The RDMA CM ID
 * @pd: Protection Domain pointer
 * @qp: Queue Pair pointer
 * @cq: Completion Queue pointer
 * @timeout: Number of msecs to wait for connection management events
 * @sq_depth: The depth of the Send Queue
 * @sq_sem: Semaphore for the SQ
 * @rq_depth: The depth of the Receive Queue
 * @rq_sem: Semaphore for the RQ
 * @excess_rc: Amount of posted Receive Contexts without a pending request.
 *             See rdma_request()
 * @addr: The remote peer's address
 * @req_lock: Protects the active request list
 * @cm_done: Completion event for connection management tracking
 */
struct p9_trans_rdma {
        enum {
                P9_RDMA_INIT,
                P9_RDMA_ADDR_RESOLVED,
                P9_RDMA_ROUTE_RESOLVED,
                P9_RDMA_CONNECTED,
                P9_RDMA_FLUSHING,
                P9_RDMA_CLOSING,
                P9_RDMA_CLOSED,
        } state;
        struct rdma_cm_id *cm_id;
        struct ib_pd *pd;
        struct ib_qp *qp;
        struct ib_cq *cq;
        long timeout;
        int sq_depth;
        struct semaphore sq_sem;
        int rq_depth;
        struct semaphore rq_sem;
        atomic_t excess_rc;
        struct sockaddr_in addr;
        spinlock_t req_lock;

        struct completion cm_done;
};

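/*
 * Connection life cycle, informally (derived from the enum above and from
 * p9_cm_event_handler() below; error paths are summarized, not exhaustive):
 *
 *   P9_RDMA_INIT
 *      --RDMA_CM_EVENT_ADDR_RESOLVED-->   P9_RDMA_ADDR_RESOLVED
 *      --RDMA_CM_EVENT_ROUTE_RESOLVED-->  P9_RDMA_ROUTE_RESOLVED
 *      --RDMA_CM_EVENT_ESTABLISHED-->     P9_RDMA_CONNECTED
 *
 * A failed work completion moves the transport to P9_RDMA_FLUSHING, a
 * request error to P9_RDMA_CLOSING, and RDMA_CM_EVENT_DISCONNECTED leaves
 * it in P9_RDMA_CLOSED.
 */
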
/**
 * struct p9_rdma_context - Keeps track of in-process WR
 *
 * @cqe: completion queue entry
 * @busa: Bus address to unmap when the WR completes
 * @req: Keeps track of requests (send)
 * @rc: Keeps track of replies (receive)
 */
struct p9_rdma_req;
struct p9_rdma_context {
        struct ib_cqe cqe;
        dma_addr_t busa;
        union {
                struct p9_req_t *req;
                struct p9_fcall *rc;
        };
};

/**
 * struct p9_rdma_opts - Collection of mount options
 * @port: port of connection
 * @sq_depth: The requested depth of the SQ. This really doesn't need
 * to be any deeper than the number of threads used in the client
 * @rq_depth: The depth of the RQ. Should be greater than or equal to SQ depth
 * @timeout: Time to wait in msecs for CM events
 * @privport: Whether a reserved (privileged) source port should be used
 */
struct p9_rdma_opts {
        short port;
        int sq_depth;
        int rq_depth;
        long timeout;
        int privport;
};

/*
 * Option Parsing (code inspired by NFS code)
 */
enum {
        /* Options that take integer arguments */
        Opt_port, Opt_rq_depth, Opt_sq_depth, Opt_timeout,
        /* Options that take no argument */
        Opt_privport,
        Opt_err,
};

static match_table_t tokens = {
        {Opt_port, "port=%u"},
        {Opt_sq_depth, "sq=%u"},
        {Opt_rq_depth, "rq=%u"},
        {Opt_timeout, "timeout=%u"},
        {Opt_privport, "privport"},
        {Opt_err, NULL},
};

/**
 * parse_opts - parse mount options into rdma options structure
 * @params: options string passed from mount
 * @opts: rdma transport-specific structure to parse options into
 *
 * Returns 0 upon success, -ERRNO upon failure
 */
static int parse_opts(char *params, struct p9_rdma_opts *opts)
{
        char *p;
        substring_t args[MAX_OPT_ARGS];
        int option;
        char *options, *tmp_options;

        opts->port = P9_PORT;
        opts->sq_depth = P9_RDMA_SQ_DEPTH;
        opts->rq_depth = P9_RDMA_RQ_DEPTH;
        opts->timeout = P9_RDMA_TIMEOUT;
        opts->privport = 0;

        if (!params)
                return 0;

        tmp_options = kstrdup(params, GFP_KERNEL);
        if (!tmp_options) {
                p9_debug(P9_DEBUG_ERROR,
                         "failed to allocate copy of option string\n");
                return -ENOMEM;
        }
        options = tmp_options;

        while ((p = strsep(&options, ",")) != NULL) {
                int token;
                int r;
                if (!*p)
                        continue;
                token = match_token(p, tokens, args);
                if ((token != Opt_err) && (token != Opt_privport)) {
                        r = match_int(&args[0], &option);
                        if (r < 0) {
                                p9_debug(P9_DEBUG_ERROR,
                                         "integer field, but no integer?\n");
                                continue;
                        }
                }
                switch (token) {
                case Opt_port:
                        opts->port = option;
                        break;
                case Opt_sq_depth:
                        opts->sq_depth = option;
                        break;
                case Opt_rq_depth:
                        opts->rq_depth = option;
                        break;
                case Opt_timeout:
                        opts->timeout = option;
                        break;
                case Opt_privport:
                        opts->privport = 1;
                        break;
                default:
                        continue;
                }
        }
        /* RQ must be at least as large as the SQ */
        opts->rq_depth = max(opts->rq_depth, opts->sq_depth);
        kfree(tmp_options);
        return 0;
}

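/*
 * Worked example (hypothetical option string, using the defaults above):
 * parse_opts("port=5640,sq=16,rq=8,privport", &opts) yields port = 5640,
 * sq_depth = 16 and privport = 1, and rq_depth is then raised from 8 to 16
 * by the max() clamp above, since the RQ must be at least as deep as the SQ.
 */
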
static int
p9_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
        struct p9_client *c = id->context;
        struct p9_trans_rdma *rdma = c->trans;
        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
                BUG_ON(rdma->state != P9_RDMA_INIT);
                rdma->state = P9_RDMA_ADDR_RESOLVED;
                break;

        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                BUG_ON(rdma->state != P9_RDMA_ADDR_RESOLVED);
                rdma->state = P9_RDMA_ROUTE_RESOLVED;
                break;

        case RDMA_CM_EVENT_ESTABLISHED:
                BUG_ON(rdma->state != P9_RDMA_ROUTE_RESOLVED);
                rdma->state = P9_RDMA_CONNECTED;
                break;

        case RDMA_CM_EVENT_DISCONNECTED:
                if (rdma)
                        rdma->state = P9_RDMA_CLOSED;
                if (c)
                        c->status = Disconnected;
                break;

        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
                break;

        case RDMA_CM_EVENT_ADDR_CHANGE:
        case RDMA_CM_EVENT_ROUTE_ERROR:
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
        case RDMA_CM_EVENT_MULTICAST_JOIN:
        case RDMA_CM_EVENT_MULTICAST_ERROR:
        case RDMA_CM_EVENT_REJECTED:
        case RDMA_CM_EVENT_CONNECT_REQUEST:
        case RDMA_CM_EVENT_CONNECT_RESPONSE:
        case RDMA_CM_EVENT_CONNECT_ERROR:
        case RDMA_CM_EVENT_ADDR_ERROR:
        case RDMA_CM_EVENT_UNREACHABLE:
                c->status = Disconnected;
                rdma_disconnect(rdma->cm_id);
                break;
        default:
                BUG();
        }
        complete(&rdma->cm_done);
        return 0;
}

static void
recv_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct p9_client *client = cq->cq_context;
        struct p9_trans_rdma *rdma = client->trans;
        struct p9_rdma_context *c =
                container_of(wc->wr_cqe, struct p9_rdma_context, cqe);
        struct p9_req_t *req;
        int err = 0;
        int16_t tag;

        req = NULL;
        ib_dma_unmap_single(rdma->cm_id->device, c->busa, client->msize,
                                                         DMA_FROM_DEVICE);

        if (wc->status != IB_WC_SUCCESS)
                goto err_out;

        err = p9_parse_header(c->rc, NULL, NULL, &tag, 1);
        if (err)
                goto err_out;

        req = p9_tag_lookup(client, tag);
        if (!req)
                goto err_out;

        /* Check that we have not yet received a reply for this request.
         */
        if (unlikely(req->rc)) {
                pr_err("Duplicate reply for request %d\n", tag);
                goto err_out;
        }

        req->rc = c->rc;
        p9_client_cb(client, req, REQ_STATUS_RCVD);

 out:
        up(&rdma->rq_sem);
        kfree(c);
        return;

 err_out:
        p9_debug(P9_DEBUG_ERROR, "req %p err %d status %d\n",
                        req, err, wc->status);
        rdma->state = P9_RDMA_FLUSHING;
        client->status = Disconnected;
        goto out;
}

static void
send_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct p9_client *client = cq->cq_context;
        struct p9_trans_rdma *rdma = client->trans;
        struct p9_rdma_context *c =
                container_of(wc->wr_cqe, struct p9_rdma_context, cqe);

        ib_dma_unmap_single(rdma->cm_id->device,
                            c->busa, c->req->tc->size,
                            DMA_TO_DEVICE);
        up(&rdma->sq_sem);
        kfree(c);
}

static void qp_event_handler(struct ib_event *event, void *context)
{
        p9_debug(P9_DEBUG_ERROR, "QP event %d context %p\n",
                 event->event, context);
}

static void rdma_destroy_trans(struct p9_trans_rdma *rdma)
{
        if (!rdma)
                return;

        if (rdma->qp && !IS_ERR(rdma->qp))
                ib_destroy_qp(rdma->qp);

        if (rdma->pd && !IS_ERR(rdma->pd))
                ib_dealloc_pd(rdma->pd);

        if (rdma->cq && !IS_ERR(rdma->cq))
                ib_free_cq(rdma->cq);

        if (rdma->cm_id && !IS_ERR(rdma->cm_id))
                rdma_destroy_id(rdma->cm_id);

        kfree(rdma);
}

static int
post_recv(struct p9_client *client, struct p9_rdma_context *c)
{
        struct p9_trans_rdma *rdma = client->trans;
        struct ib_recv_wr wr, *bad_wr;
        struct ib_sge sge;

        c->busa = ib_dma_map_single(rdma->cm_id->device,
                                    c->rc->sdata, client->msize,
                                    DMA_FROM_DEVICE);
        if (ib_dma_mapping_error(rdma->cm_id->device, c->busa))
                goto error;

        c->cqe.done = recv_done;

        sge.addr = c->busa;
        sge.length = client->msize;
        sge.lkey = rdma->pd->local_dma_lkey;

        wr.next = NULL;
        wr.wr_cqe = &c->cqe;
        wr.sg_list = &sge;
        wr.num_sge = 1;
        return ib_post_recv(rdma->qp, &wr, &bad_wr);

 error:
        p9_debug(P9_DEBUG_ERROR, "EIO\n");
        return -EIO;
}

static int rdma_request(struct p9_client *client, struct p9_req_t *req)
{
        struct p9_trans_rdma *rdma = client->trans;
        struct ib_send_wr wr, *bad_wr;
        struct ib_sge sge;
        int err = 0;
        unsigned long flags;
        struct p9_rdma_context *c = NULL;
        struct p9_rdma_context *rpl_context = NULL;

        /* When an error occurs between posting the recv and the send,
         * there will be a receive context posted without a pending request.
         * Since there is no way to "un-post" it, we remember it and skip
         * post_recv() for the next request.
         * So here, see if we are this `next request' and need to absorb an
         * excess rc. If yes, then drop and free our own, and do not
         * post_recv().
         */
        if (unlikely(atomic_read(&rdma->excess_rc) > 0)) {
                if ((atomic_sub_return(1, &rdma->excess_rc) >= 0)) {
                        /* Got one! */
                        kfree(req->rc);
                        req->rc = NULL;
                        goto dont_need_post_recv;
                } else {
                        /* We raced and lost. */
                        atomic_inc(&rdma->excess_rc);
                }
        }

        /* Allocate an fcall for the reply */
        rpl_context = kmalloc(sizeof *rpl_context, GFP_NOFS);
        if (!rpl_context) {
                err = -ENOMEM;
                goto recv_error;
        }
        rpl_context->rc = req->rc;

        /*
         * Post a receive buffer for this request. We need to ensure
         * there is a reply buffer available for every outstanding
         * request. A flushed request can result in no reply for an
         * outstanding request, so we must keep a count to avoid
         * overflowing the RQ.
         */
        if (down_interruptible(&rdma->rq_sem)) {
                err = -EINTR;
                goto recv_error;
        }

        err = post_recv(client, rpl_context);
        if (err) {
                p9_debug(P9_DEBUG_FCALL, "POST RECV failed\n");
                goto recv_error;
        }
        /* remove posted receive buffer from request structure */
        req->rc = NULL;

dont_need_post_recv:
        /* Post the request */
        c = kmalloc(sizeof *c, GFP_NOFS);
        if (!c) {
                err = -ENOMEM;
                goto send_error;
        }
        c->req = req;

        c->busa = ib_dma_map_single(rdma->cm_id->device,
                                    c->req->tc->sdata, c->req->tc->size,
                                    DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdma->cm_id->device, c->busa)) {
                err = -EIO;
                goto send_error;
        }

        c->cqe.done = send_done;

        sge.addr = c->busa;
        sge.length = c->req->tc->size;
        sge.lkey = rdma->pd->local_dma_lkey;

        wr.next = NULL;
        wr.wr_cqe = &c->cqe;
        wr.opcode = IB_WR_SEND;
        wr.send_flags = IB_SEND_SIGNALED;
        wr.sg_list = &sge;
        wr.num_sge = 1;

        if (down_interruptible(&rdma->sq_sem)) {
                err = -EINTR;
                goto send_error;
        }

        /* Mark request as `sent' *before* we actually send it,
         * because doing it after could erase the REQ_STATUS_RCVD
         * status in case of a very fast reply.
         */
        req->status = REQ_STATUS_SENT;
        err = ib_post_send(rdma->qp, &wr, &bad_wr);
        if (err)
                goto send_error;

        /* Success */
        return 0;

 /* Handle errors that happened during or while preparing the send: */
 send_error:
        req->status = REQ_STATUS_ERROR;
        kfree(c);
        p9_debug(P9_DEBUG_ERROR, "Error %d in rdma_request()\n", err);

        /* Ach.
         *  We did post_recv(), but not the send. We have one post_recv()
         *  in excess.
         */
        atomic_inc(&rdma->excess_rc);
        return err;

 /* Handle errors that happened during or while preparing post_recv(): */
 recv_error:
        kfree(rpl_context);
        spin_lock_irqsave(&rdma->req_lock, flags);
        if (rdma->state < P9_RDMA_CLOSING) {
                rdma->state = P9_RDMA_CLOSING;
                spin_unlock_irqrestore(&rdma->req_lock, flags);
                rdma_disconnect(rdma->cm_id);
        } else
                spin_unlock_irqrestore(&rdma->req_lock, flags);
        return err;
}

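/*
 * excess_rc accounting, by example (a sketch of the scenario the comments
 * in rdma_request() above describe):
 *
 *  1. Request A posts its receive buffer but fails before ib_post_send();
 *     excess_rc is incremented: one posted buffer now has no request.
 *  2. Request B sees excess_rc > 0, decrements it, frees its own reply
 *     buffer and skips post_recv(); the orphaned buffer from step 1 will
 *     carry B's reply instead.
 *
 * The same mechanism absorbs the buffer left over when a flushed request
 * never gets a reply (see rdma_cancelled() below).
 */
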
static void rdma_close(struct p9_client *client)
{
        struct p9_trans_rdma *rdma;

        if (!client)
                return;

        rdma = client->trans;
        if (!rdma)
                return;

        client->status = Disconnected;
        rdma_disconnect(rdma->cm_id);
        rdma_destroy_trans(rdma);
}

/**
 * alloc_rdma - Allocate and initialize the rdma transport structure
 * @opts: Mount options structure
 */
static struct p9_trans_rdma *alloc_rdma(struct p9_rdma_opts *opts)
{
        struct p9_trans_rdma *rdma;

        rdma = kzalloc(sizeof(struct p9_trans_rdma), GFP_KERNEL);
        if (!rdma)
                return NULL;

        rdma->sq_depth = opts->sq_depth;
        rdma->rq_depth = opts->rq_depth;
        rdma->timeout = opts->timeout;
        spin_lock_init(&rdma->req_lock);
        init_completion(&rdma->cm_done);
        sema_init(&rdma->sq_sem, rdma->sq_depth);
        sema_init(&rdma->rq_sem, rdma->rq_depth);
        atomic_set(&rdma->excess_rc, 0);

        return rdma;
}

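/*
 * Note on flow control: sq_sem and rq_sem are initialized to the queue
 * depths chosen above, so with the default depth of 32 at most 32 sends
 * (or posted receives) can be outstanding at once.  rdma_request() takes
 * the relevant semaphore with down_interruptible() before posting a work
 * request, and the completion handlers send_done()/recv_done() release it
 * with up(), which keeps the SQ and RQ from overflowing.
 */
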
static int rdma_cancel(struct p9_client *client, struct p9_req_t *req)
{
        /* Nothing to do here.
         * We will take care of it (if we have to) in rdma_cancelled()
         */
        return 1;
}

/* A request has been fully flushed without a reply.
 * That means we have posted one buffer in excess.
 */
static int rdma_cancelled(struct p9_client *client, struct p9_req_t *req)
{
        struct p9_trans_rdma *rdma = client->trans;
        atomic_inc(&rdma->excess_rc);
        return 0;
}

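/* Bind the connection's CM ID to a reserved ("privileged") source port:
 * scan downwards from P9_DEF_MAX_RESVPORT and take the first port that is
 * not already in use; any error other than -EADDRINUSE aborts the scan and
 * is returned to the caller.
 */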
static int p9_rdma_bind_privport(struct p9_trans_rdma *rdma)
{
        struct sockaddr_in cl = {
                .sin_family = AF_INET,
                .sin_addr.s_addr = htonl(INADDR_ANY),
        };
        int port, err = -EINVAL;

        for (port = P9_DEF_MAX_RESVPORT; port >= P9_DEF_MIN_RESVPORT; port--) {
                cl.sin_port = htons((ushort)port);
                err = rdma_bind_addr(rdma->cm_id, (struct sockaddr *)&cl);
                if (err != -EADDRINUSE)
                        break;
        }
        return err;
}

/**
 * rdma_create_trans - Transport method for creating a transport instance
 * @client: client instance
 * @addr: IP address string
 * @args: Mount options string
 */
static int
rdma_create_trans(struct p9_client *client, const char *addr, char *args)
{
        int err;
        struct p9_rdma_opts opts;
        struct p9_trans_rdma *rdma;
        struct rdma_conn_param conn_param;
        struct ib_qp_init_attr qp_attr;

        /* Parse the transport specific mount options */
        err = parse_opts(args, &opts);
        if (err < 0)
                return err;

        /* Create and initialize the RDMA transport structure */
        rdma = alloc_rdma(&opts);
        if (!rdma)
                return -ENOMEM;

        /* Create the RDMA CM ID */
        rdma->cm_id = rdma_create_id(&init_net, p9_cm_event_handler, client,
                                     RDMA_PS_TCP, IB_QPT_RC);
        if (IS_ERR(rdma->cm_id))
                goto error;

        /* Associate the client with the transport */
        client->trans = rdma;

        /* Bind to a privileged port if we need to */
        if (opts.privport) {
                err = p9_rdma_bind_privport(rdma);
                if (err < 0) {
                        pr_err("%s (%d): problem binding to privport: %d\n",
                               __func__, task_pid_nr(current), -err);
                        goto error;
                }
        }

        /* Resolve the server's address */
        rdma->addr.sin_family = AF_INET;
        rdma->addr.sin_addr.s_addr = in_aton(addr);
        rdma->addr.sin_port = htons(opts.port);
        err = rdma_resolve_addr(rdma->cm_id, NULL,
                                (struct sockaddr *)&rdma->addr,
                                rdma->timeout);
        if (err)
                goto error;
        err = wait_for_completion_interruptible(&rdma->cm_done);
        if (err || (rdma->state != P9_RDMA_ADDR_RESOLVED))
                goto error;

        /* Resolve the route to the server */
        err = rdma_resolve_route(rdma->cm_id, rdma->timeout);
        if (err)
                goto error;
        err = wait_for_completion_interruptible(&rdma->cm_done);
        if (err || (rdma->state != P9_RDMA_ROUTE_RESOLVED))
                goto error;

        /* Create the Completion Queue */
        rdma->cq = ib_alloc_cq(rdma->cm_id->device, client,
                        opts.sq_depth + opts.rq_depth + 1,
                        0, IB_POLL_SOFTIRQ);
        if (IS_ERR(rdma->cq))
                goto error;

        /* Create the Protection Domain */
        rdma->pd = ib_alloc_pd(rdma->cm_id->device);
        if (IS_ERR(rdma->pd))
                goto error;

        /* Create the Queue Pair */
        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.event_handler = qp_event_handler;
        qp_attr.qp_context = client;
        qp_attr.cap.max_send_wr = opts.sq_depth;
        qp_attr.cap.max_recv_wr = opts.rq_depth;
        qp_attr.cap.max_send_sge = P9_RDMA_SEND_SGE;
        qp_attr.cap.max_recv_sge = P9_RDMA_RECV_SGE;
        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        qp_attr.qp_type = IB_QPT_RC;
        qp_attr.send_cq = rdma->cq;
        qp_attr.recv_cq = rdma->cq;
        err = rdma_create_qp(rdma->cm_id, rdma->pd, &qp_attr);
        if (err)
                goto error;
        rdma->qp = rdma->cm_id->qp;

        /* Request a connection */
        memset(&conn_param, 0, sizeof(conn_param));
        conn_param.private_data = NULL;
        conn_param.private_data_len = 0;
        conn_param.responder_resources = P9_RDMA_IRD;
        conn_param.initiator_depth = P9_RDMA_ORD;
        err = rdma_connect(rdma->cm_id, &conn_param);
        if (err)
                goto error;
        err = wait_for_completion_interruptible(&rdma->cm_done);
        if (err || (rdma->state != P9_RDMA_CONNECTED))
                goto error;

        client->status = Connected;

        return 0;

error:
        rdma_destroy_trans(rdma);
        return -ENOTCONN;
}

static struct p9_trans_module p9_rdma_trans = {
        .name = "rdma",
        .maxsize = P9_RDMA_MAXSIZE,
        .def = 0,
        .owner = THIS_MODULE,
        .create = rdma_create_trans,
        .close = rdma_close,
        .request = rdma_request,
        .cancel = rdma_cancel,
        .cancelled = rdma_cancelled,
};

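/*
 * Example usage (assumes a 9P server reachable over RDMA at 10.0.0.1 and
 * listening on the default port; adjust the address and options to taste):
 *
 *   mount -t 9p -o trans=rdma,port=5640 10.0.0.1 /mnt/9p
 *
 * The sq=, rq=, timeout= and privport options parsed above may be appended
 * to the same -o list.
 */
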
/**
 * p9_trans_rdma_init - Register the 9P RDMA transport driver
 */
static int __init p9_trans_rdma_init(void)
{
        v9fs_register_trans(&p9_rdma_trans);
        return 0;
}

static void __exit p9_trans_rdma_exit(void)
{
        v9fs_unregister_trans(&p9_rdma_trans);
}

module_init(p9_trans_rdma_init);
module_exit(p9_trans_rdma_exit);

MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
MODULE_DESCRIPTION("RDMA Transport for 9P");
MODULE_LICENSE("Dual BSD/GPL");