linux/net/rds/rdma.c
/*
 * Copyright (c) 2007 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/rbtree.h>
#include <linux/dma-mapping.h> /* for DMA_*_DEVICE */

#include "rds.h"

/*
 * XXX
 *  - build with sparse
 *  - should we limit the size of a mr region?  let transport return failure?
 *  - should we detect duplicate keys on a socket?  hmm.
 *  - an rdma is an mlock, apply rlimit?
 */

/*
 * get the number of pages by looking at the page indices that the start and
 * end addresses fall in.
 *
 * Returns 0 if the vec is invalid.  It is invalid if the number of bytes
 * causes the address to wrap or overflows an unsigned int.  This comes
 * from being stored in the 'length' member of 'struct scatterlist'.
 */
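/*
 * Example: with 4K pages, addr = 0x1003 and bytes = 0x2000 touch the three
 * pages at 0x1000, 0x2000 and 0x3000, and the formula below yields
 * ((0x3003 + 0xfff) >> 12) - (0x1003 >> 12) = 4 - 1 = 3.
 */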
static unsigned int rds_pages_in_vec(struct rds_iovec *vec)
{
        if ((vec->addr + vec->bytes <= vec->addr) ||
            (vec->bytes > (u64)UINT_MAX))
                return 0;

        return ((vec->addr + vec->bytes + PAGE_SIZE - 1) >> PAGE_SHIFT) -
                (vec->addr >> PAGE_SHIFT);
}

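/*
 * Look up the MR with the given R_Key in the socket's rbtree of mapped
 * regions.  If 'insert' is non-NULL and no MR with that key exists yet,
 * link it into the tree and take a reference on it.  Returns the existing
 * MR on a key match, NULL otherwise.  Callers serialize on rs->rs_rdma_lock.
 */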
static struct rds_mr *rds_mr_tree_walk(struct rb_root *root, u64 key,
                                       struct rds_mr *insert)
{
        struct rb_node **p = &root->rb_node;
        struct rb_node *parent = NULL;
        struct rds_mr *mr;

        while (*p) {
                parent = *p;
                mr = rb_entry(parent, struct rds_mr, r_rb_node);

                if (key < mr->r_key)
                        p = &(*p)->rb_left;
                else if (key > mr->r_key)
                        p = &(*p)->rb_right;
                else
                        return mr;
        }

        if (insert) {
                rb_link_node(&insert->r_rb_node, parent, p);
                rb_insert_color(&insert->r_rb_node, root);
                atomic_inc(&insert->r_refcount);
        }
        return NULL;
}

/*
 * Destroy the transport-specific part of an MR.
 */
static void rds_destroy_mr(struct rds_mr *mr)
{
        struct rds_sock *rs = mr->r_sock;
        void *trans_private = NULL;
        unsigned long flags;

        rdsdebug("RDS: destroy mr key is %x refcnt %u\n",
                        mr->r_key, atomic_read(&mr->r_refcount));

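        /* Both rds_free_mr() and the final rds_mr_put() can end up here;
         * the DEAD bit makes sure the teardown below only runs once. */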
        if (test_and_set_bit(RDS_MR_DEAD, &mr->r_state))
                return;

        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        if (!RB_EMPTY_NODE(&mr->r_rb_node))
                rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
        trans_private = mr->r_trans_private;
        mr->r_trans_private = NULL;
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

        if (trans_private)
                mr->r_trans->free_mr(trans_private, mr->r_invalidate);
}

void __rds_put_mr_final(struct rds_mr *mr)
{
        rds_destroy_mr(mr);
        kfree(mr);
}

/*
 * By the time this is called we can't have any more ioctls called on
 * the socket so we don't need to worry about racing with others.
 */
void rds_rdma_drop_keys(struct rds_sock *rs)
{
        struct rds_mr *mr;
        struct rb_node *node;
        unsigned long flags;

        /* Release any MRs associated with this socket */
        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        while ((node = rb_first(&rs->rs_rdma_keys))) {
                mr = container_of(node, struct rds_mr, r_rb_node);
                if (mr->r_trans == rs->rs_transport)
                        mr->r_invalidate = 0;
                rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
                RB_CLEAR_NODE(&mr->r_rb_node);
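                /* rds_destroy_mr() takes rs_rdma_lock itself, so drop it
                 * across the call and pick the tree up again afterwards. */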
                spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
                rds_destroy_mr(mr);
                rds_mr_put(mr);
                spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        }
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

        if (rs->rs_transport && rs->rs_transport->flush_mrs)
                rs->rs_transport->flush_mrs();
}

/*
 * Helper function to pin user pages.
 */
static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages,
                        struct page **pages, int write)
{
        int ret;

        ret = get_user_pages_fast(user_addr, nr_pages, write, pages);

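        /* get_user_pages_fast() may pin fewer pages than asked for.  Treat
         * a partial pin as failure and drop whatever was pinned. */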
        if (ret >= 0 && ret < nr_pages) {
                while (ret--)
                        put_page(pages[ret]);
                ret = -EFAULT;
        }

        return ret;
}

static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
                                u64 *cookie_ret, struct rds_mr **mr_ret)
{
        struct rds_mr *mr = NULL, *found;
        unsigned int nr_pages;
        struct page **pages = NULL;
        struct scatterlist *sg;
        void *trans_private;
        unsigned long flags;
        rds_rdma_cookie_t cookie;
        unsigned int nents;
        long i;
        int ret;

        if (rs->rs_bound_addr == 0) {
                ret = -ENOTCONN; /* XXX not a great errno */
                goto out;
        }

        if (!rs->rs_transport->get_mr) {
                ret = -EOPNOTSUPP;
                goto out;
        }

        nr_pages = rds_pages_in_vec(&args->vec);
        if (nr_pages == 0) {
                ret = -EINVAL;
                goto out;
        }

        rdsdebug("RDS: get_mr addr %llx len %llu nr_pages %u\n",
                args->vec.addr, args->vec.bytes, nr_pages);

        /* XXX clamp nr_pages to limit the size of this alloc? */
        pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
        if (!pages) {
                ret = -ENOMEM;
                goto out;
        }

        mr = kzalloc(sizeof(struct rds_mr), GFP_KERNEL);
        if (!mr) {
                ret = -ENOMEM;
                goto out;
        }

        atomic_set(&mr->r_refcount, 1);
        RB_CLEAR_NODE(&mr->r_rb_node);
        mr->r_trans = rs->rs_transport;
        mr->r_sock = rs;

        if (args->flags & RDS_RDMA_USE_ONCE)
                mr->r_use_once = 1;
        if (args->flags & RDS_RDMA_INVALIDATE)
                mr->r_invalidate = 1;
        if (args->flags & RDS_RDMA_READWRITE)
                mr->r_write = 1;

        /*
         * Pin the pages that make up the user buffer and transfer the page
         * pointers to the mr's sg array.  We check to see if we've mapped
         * the whole region after transferring the partial page references
         * to the sg array so that we can have one page ref cleanup path.
         *
         * For now we have no flag that tells us whether the mapping is
         * r/o or r/w. We need to assume r/w, or we'll do a lot of RDMA to
         * the zero page.
         */
        ret = rds_pin_pages(args->vec.addr, nr_pages, pages, 1);
        if (ret < 0)
                goto out;

        nents = ret;
        sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
        if (!sg) {
                ret = -ENOMEM;
                goto out;
        }
        WARN_ON(!nents);
        sg_init_table(sg, nents);

        /* Stick all pages into the scatterlist */
        for (i = 0 ; i < nents; i++)
                sg_set_page(&sg[i], pages[i], PAGE_SIZE, 0);
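        /* Each entry covers a whole page; the byte offset into the first
         * page travels in the cookie built further down, not in the sg. */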

        rdsdebug("RDS: trans_private nents is %u\n", nents);

        /* Obtain a transport specific MR. If this succeeds, the
         * s/g list is now owned by the MR.
         * Note that dma_map() implies that pending writes are
         * flushed to RAM, so no dma_sync is needed here. */
        trans_private = rs->rs_transport->get_mr(sg, nents, rs,
                                                 &mr->r_key);

        if (IS_ERR(trans_private)) {
                for (i = 0 ; i < nents; i++)
                        put_page(sg_page(&sg[i]));
                kfree(sg);
                ret = PTR_ERR(trans_private);
                goto out;
        }

        mr->r_trans_private = trans_private;

        rdsdebug("RDS: get_mr put_user key is %x cookie_addr %p\n",
               mr->r_key, (void *)(unsigned long) args->cookie_addr);

        /* The user may pass us an unaligned address, but we can only
         * map page aligned regions. So we keep the offset, and build
         * a 64bit cookie containing <R_Key, offset> and pass that
         * around. */
        cookie = rds_rdma_make_cookie(mr->r_key, args->vec.addr & ~PAGE_MASK);
        if (cookie_ret)
                *cookie_ret = cookie;

        if (args->cookie_addr && put_user(cookie, (u64 __user *)(unsigned long) args->cookie_addr)) {
                ret = -EFAULT;
                goto out;
        }

        /* Inserting the new MR into the rbtree bumps its
         * reference count. */
        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        found = rds_mr_tree_walk(&rs->rs_rdma_keys, mr->r_key, mr);
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

        BUG_ON(found && found != mr);

        rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
        if (mr_ret) {
                atomic_inc(&mr->r_refcount);
                *mr_ret = mr;
        }

        ret = 0;
out:
        kfree(pages);
        if (mr)
                rds_mr_put(mr);
        return ret;
}

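/*
 * rds_get_mr() below implements the RDS_GET_MR socket option.  Purely as an
 * illustration (assuming the SOL_RDS, RDS_GET_MR and flag definitions from
 * <linux/rds.h>), a userspace caller might register a buffer like this,
 * error handling omitted:
 *
 *      struct rds_get_mr_args marg = { 0 };
 *      rds_rdma_cookie_t cookie = 0;
 *
 *      marg.vec.addr = (uint64_t)(unsigned long)buf;
 *      marg.vec.bytes = buf_len;
 *      marg.cookie_addr = (uint64_t)(unsigned long)&cookie;
 *      marg.flags = RDS_RDMA_USE_ONCE;
 *      setsockopt(fd, SOL_RDS, RDS_GET_MR, &marg, sizeof(marg));
 *
 * The cookie written back through cookie_addr identifies this region in
 * later RDMA requests (see rds_cmsg_rdma_args() below).
 */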
int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
{
        struct rds_get_mr_args args;

        if (optlen != sizeof(struct rds_get_mr_args))
                return -EINVAL;

        if (copy_from_user(&args, (struct rds_get_mr_args __user *)optval,
                           sizeof(struct rds_get_mr_args)))
                return -EFAULT;

        return __rds_rdma_map(rs, &args, NULL, NULL);
}

int rds_get_mr_for_dest(struct rds_sock *rs, char __user *optval, int optlen)
{
        struct rds_get_mr_for_dest_args args;
        struct rds_get_mr_args new_args;

        if (optlen != sizeof(struct rds_get_mr_for_dest_args))
                return -EINVAL;

        if (copy_from_user(&args, (struct rds_get_mr_for_dest_args __user *)optval,
                           sizeof(struct rds_get_mr_for_dest_args)))
                return -EFAULT;

        /*
         * Initially, just behave like get_mr().
         * TODO: Implement get_mr as wrapper around this
         *       and deprecate it.
         */
        new_args.vec = args.vec;
        new_args.cookie_addr = args.cookie_addr;
        new_args.flags = args.flags;

        return __rds_rdma_map(rs, &new_args, NULL, NULL);
}

/*
 * Free the MR indicated by the given R_Key
 */
int rds_free_mr(struct rds_sock *rs, char __user *optval, int optlen)
{
        struct rds_free_mr_args args;
        struct rds_mr *mr;
        unsigned long flags;

        if (optlen != sizeof(struct rds_free_mr_args))
                return -EINVAL;

        if (copy_from_user(&args, (struct rds_free_mr_args __user *)optval,
                           sizeof(struct rds_free_mr_args)))
                return -EFAULT;

        /* Special case - a null cookie means flush all unused MRs */
        if (args.cookie == 0) {
                if (!rs->rs_transport || !rs->rs_transport->flush_mrs)
                        return -EINVAL;
                rs->rs_transport->flush_mrs();
                return 0;
        }

        /* Look up the MR given its R_key and remove it from the rbtree
         * so nobody else finds it.
         * This should also prevent races with rds_rdma_unuse.
         */
        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        mr = rds_mr_tree_walk(&rs->rs_rdma_keys, rds_rdma_cookie_key(args.cookie), NULL);
        if (mr) {
                rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
                RB_CLEAR_NODE(&mr->r_rb_node);
                if (args.flags & RDS_RDMA_INVALIDATE)
                        mr->r_invalidate = 1;
        }
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

        if (!mr)
                return -EINVAL;

        /*
         * Call rds_destroy_mr() ourselves so that we're sure it's done by the
         * time we return.  If we left it to rds_mr_put(), it might not happen
         * until someone else drops their reference.
         */
        rds_destroy_mr(mr);
        rds_mr_put(mr);
        return 0;
}

/*
 * This is called when we receive an extension header that
 * tells us this MR was used. It allows us to implement
 * use_once semantics
 */
void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force)
{
        struct rds_mr *mr;
        unsigned long flags;
        int zot_me = 0;

        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
        if (!mr) {
                printk(KERN_ERR "rds: trying to unuse MR with unknown r_key %u!\n", r_key);
                spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
                return;
        }

        if (mr->r_use_once || force) {
                rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
                RB_CLEAR_NODE(&mr->r_rb_node);
                zot_me = 1;
        }
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

        /* May have to issue a dma_sync on this memory region.
         * Note we could avoid this if the operation was a RDMA READ,
         * but at this point we can't tell. */
        if (mr->r_trans->sync_mr)
                mr->r_trans->sync_mr(mr->r_trans_private, DMA_FROM_DEVICE);

        /* If the MR was marked as invalidate, this will
         * trigger an async flush. */
        if (zot_me)
                rds_destroy_mr(mr);
        rds_mr_put(mr);
}

void rds_rdma_free_op(struct rm_rdma_op *ro)
{
        unsigned int i;

        for (i = 0; i < ro->op_nents; i++) {
                struct page *page = sg_page(&ro->op_sg[i]);

                /* Mark the page dirty if it may have been modified, which
                 * is the case for an RDMA READ, which copies from remote
                 * to local memory */
                if (!ro->op_write) {
                        BUG_ON(irqs_disabled());
                        set_page_dirty(page);
                }
                put_page(page);
        }

        kfree(ro->op_notifier);
        ro->op_notifier = NULL;
        ro->op_active = 0;
}

void rds_atomic_free_op(struct rm_atomic_op *ao)
{
        struct page *page = sg_page(ao->op_sg);

        /* The atomic result is always written back into the local buffer,
         * so this page has been modified; mark it dirty unconditionally. */
        set_page_dirty(page);
        put_page(page);

        kfree(ao->op_notifier);
        ao->op_notifier = NULL;
        ao->op_active = 0;
}


/*
 * Count the number of pages needed to describe an incoming iovec array.
 */
static int rds_rdma_pages(struct rds_iovec iov[], int nr_iovecs)
{
        int tot_pages = 0;
        unsigned int nr_pages;
        unsigned int i;

        /* figure out the number of pages in the vector */
        for (i = 0; i < nr_iovecs; i++) {
                nr_pages = rds_pages_in_vec(&iov[i]);
                if (nr_pages == 0)
                        return -EINVAL;

                tot_pages += nr_pages;

                /*
                 * nr_pages for one entry is limited to (UINT_MAX>>PAGE_SHIFT)+1,
                 * so tot_pages cannot overflow without first going negative.
                 */
                if (tot_pages < 0)
                        return -EINVAL;
        }

        return tot_pages;
}

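/*
 * Size, in bytes, of the scatterlist needed for the iovec array described by
 * an RDS_CMSG_RDMA_ARGS control message.  The iovecs are copied in and
 * validated here and again in rds_cmsg_rdma_args(), which does the real work;
 * the caller presumably uses the result to reserve room in the message.
 */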
int rds_rdma_extra_size(struct rds_rdma_args *args)
{
        struct rds_iovec vec;
        struct rds_iovec __user *local_vec;
        int tot_pages = 0;
        unsigned int nr_pages;
        unsigned int i;

        local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;

        /* figure out the number of pages in the vector */
        for (i = 0; i < args->nr_local; i++) {
                if (copy_from_user(&vec, &local_vec[i],
                                   sizeof(struct rds_iovec)))
                        return -EFAULT;

                nr_pages = rds_pages_in_vec(&vec);
                if (nr_pages == 0)
                        return -EINVAL;

                tot_pages += nr_pages;

                /*
                 * nr_pages for one entry is limited to (UINT_MAX>>PAGE_SHIFT)+1,
                 * so tot_pages cannot overflow without first going negative.
                 */
                if (tot_pages < 0)
                        return -EINVAL;
        }

        return tot_pages * sizeof(struct scatterlist);
}

/*
 * The application asks for an RDMA transfer.
 * Extract all arguments and set up the rdma_op.
 */
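/*
 * Purely as an illustration (assuming the SOL_RDS, RDS_CMSG_RDMA_ARGS and
 * flag definitions from <linux/rds.h>), a sender might describe such a
 * transfer roughly like this, error handling omitted:
 *
 *      struct rds_rdma_args rargs = { 0 };
 *      struct rds_iovec liov = { .addr = (uint64_t)(unsigned long)buf,
 *                                .bytes = buf_len };
 *
 *      rargs.cookie = peer_cookie;     // obtained from the peer
 *      rargs.remote_vec.addr = 0;      // offset into the peer's MR
 *      rargs.remote_vec.bytes = buf_len;
 *      rargs.local_vec_addr = (uint64_t)(unsigned long)&liov;
 *      rargs.nr_local = 1;
 *      rargs.flags = RDS_RDMA_READWRITE | RDS_RDMA_NOTIFY_ME;
 *      rargs.user_token = my_token;
 *
 * and attach &rargs to sendmsg() as a control message with
 * cmsg_level == SOL_RDS and cmsg_type == RDS_CMSG_RDMA_ARGS.
 */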
int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
                          struct cmsghdr *cmsg)
{
        struct rds_rdma_args *args;
        struct rm_rdma_op *op = &rm->rdma;
        int nr_pages;
        unsigned int nr_bytes;
        struct page **pages = NULL;
        struct rds_iovec iovstack[UIO_FASTIOV], *iovs = iovstack;
        int iov_size;
        unsigned int i, j;
        int ret = 0;

        if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args))
            || rm->rdma.op_active)
                return -EINVAL;

        args = CMSG_DATA(cmsg);

        if (rs->rs_bound_addr == 0) {
                ret = -ENOTCONN; /* XXX not a great errno */
                goto out;
        }

        if (args->nr_local > UIO_MAXIOV) {
                ret = -EMSGSIZE;
                goto out;
        }

        /* Reject an empty iovec array outright; it would leave us with no
         * pages to map and trip the WARN_ON(!nr_pages) below. */
        if (args->nr_local == 0) {
                ret = -EINVAL;
                goto out;
        }

        /* Check whether to allocate the iovec area */
        iov_size = args->nr_local * sizeof(struct rds_iovec);
        if (args->nr_local > UIO_FASTIOV) {
                iovs = sock_kmalloc(rds_rs_to_sk(rs), iov_size, GFP_KERNEL);
                if (!iovs) {
                        ret = -ENOMEM;
                        goto out;
                }
        }

        if (copy_from_user(iovs, (struct rds_iovec __user *)(unsigned long) args->local_vec_addr, iov_size)) {
                ret = -EFAULT;
                goto out;
        }

        nr_pages = rds_rdma_pages(iovs, args->nr_local);
        if (nr_pages < 0) {
                ret = -EINVAL;
                goto out;
        }

        pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
        if (!pages) {
                ret = -ENOMEM;
                goto out;
        }

        op->op_write = !!(args->flags & RDS_RDMA_READWRITE);
        op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
        op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
        op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
        op->op_active = 1;
        op->op_recverr = rs->rs_recverr;
        WARN_ON(!nr_pages);
        op->op_sg = rds_message_alloc_sgs(rm, nr_pages);
        if (!op->op_sg) {
                ret = -ENOMEM;
                goto out;
        }

        if (op->op_notify || op->op_recverr) {
                /* We allocate an uninitialized notifier here, because
                 * we don't want to do that in the completion handler. We
                 * would have to use GFP_ATOMIC there, and don't want to deal
                 * with failed allocations.
                 */
                op->op_notifier = kmalloc(sizeof(struct rds_notifier), GFP_KERNEL);
                if (!op->op_notifier) {
                        ret = -ENOMEM;
                        goto out;
                }
                op->op_notifier->n_user_token = args->user_token;
                op->op_notifier->n_status = RDS_RDMA_SUCCESS;
        }

        /* The cookie contains the R_Key of the remote memory region, and
         * optionally an offset into it. This is how we implement RDMA into
         * unaligned memory.
         * When setting up the RDMA, we need to add that offset to the
         * destination address (which is really an offset into the MR)
         * FIXME: We may want to move this into ib_rdma.c
         */
        op->op_rkey = rds_rdma_cookie_key(args->cookie);
        op->op_remote_addr = args->remote_vec.addr + rds_rdma_cookie_offset(args->cookie);

        nr_bytes = 0;

        rdsdebug("RDS: rdma prepare nr_local %llu rva %llx rkey %x\n",
               (unsigned long long)args->nr_local,
               (unsigned long long)args->remote_vec.addr,
               op->op_rkey);

        for (i = 0; i < args->nr_local; i++) {
                struct rds_iovec *iov = &iovs[i];
                /* no need to re-check; rds_rdma_pages() already verified nr is nonzero */
                unsigned int nr = rds_pages_in_vec(iov);

                rs->rs_user_addr = iov->addr;
                rs->rs_user_bytes = iov->bytes;

                /* If it's a WRITE operation, we want to pin the pages for reading.
                 * If it's a READ operation, we need to pin the pages for writing.
                 */
                ret = rds_pin_pages(iov->addr, nr, pages, !op->op_write);
                if (ret < 0)
                        goto out;

                rdsdebug("RDS: nr_bytes %u nr %u iov->bytes %llu iov->addr %llx\n",
                         nr_bytes, nr, iov->bytes, iov->addr);

                nr_bytes += iov->bytes;

                for (j = 0; j < nr; j++) {
                        unsigned int offset = iov->addr & ~PAGE_MASK;
                        struct scatterlist *sg;

                        sg = &op->op_sg[op->op_nents + j];
                        sg_set_page(sg, pages[j],
                                        min_t(unsigned int, iov->bytes, PAGE_SIZE - offset),
                                        offset);

                        rdsdebug("RDS: sg->offset %x sg->len %x iov->addr %llx iov->bytes %llu\n",
                               sg->offset, sg->length, iov->addr, iov->bytes);

                        iov->addr += sg->length;
                        iov->bytes -= sg->length;
                }

                op->op_nents += nr;
        }

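        /* op_nents now covers every page we pinned; nr_bytes is the total
         * local length and has to fit inside the remote region checked next. */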
        if (nr_bytes > args->remote_vec.bytes) {
                rdsdebug("RDS nr_bytes %u remote_bytes %u do not match\n",
                                nr_bytes,
                                (unsigned int) args->remote_vec.bytes);
                ret = -EINVAL;
                goto out;
        }
        op->op_bytes = nr_bytes;

out:
        if (iovs != iovstack)
                sock_kfree_s(rds_rs_to_sk(rs), iovs, iov_size);
        kfree(pages);
        if (ret)
                rds_rdma_free_op(op);
        else
                rds_stats_inc(s_send_rdma);

        return ret;
}

/*
 * The application wants us to pass an RDMA destination (aka MR)
 * to the remote
 */
int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
                          struct cmsghdr *cmsg)
{
        unsigned long flags;
        struct rds_mr *mr;
        u32 r_key;
        int err = 0;

        if (cmsg->cmsg_len < CMSG_LEN(sizeof(rds_rdma_cookie_t)) ||
            rm->m_rdma_cookie != 0)
                return -EINVAL;

        memcpy(&rm->m_rdma_cookie, CMSG_DATA(cmsg), sizeof(rm->m_rdma_cookie));

        /* We are reusing a previously mapped MR here. Most likely, the
         * application has written to the buffer, so we need to explicitly
         * flush those writes to RAM. Otherwise the HCA may not see them
         * when doing a DMA from that buffer.
         */
        r_key = rds_rdma_cookie_key(rm->m_rdma_cookie);

        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
        if (!mr)
                err = -EINVAL;  /* invalid r_key */
        else
                atomic_inc(&mr->r_refcount);
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

        if (mr) {
                mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
                rm->rdma.op_rdma_mr = mr;
        }
        return err;
}

/*
 * The application passes us an address range it wants to enable RDMA
 * to/from. We map the area, and save the <R_Key,offset> pair
 * in rm->m_rdma_cookie. This causes it to be sent along to the peer
 * in an extension header.
 */
int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
                          struct cmsghdr *cmsg)
{
        if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_get_mr_args)) ||
            rm->m_rdma_cookie != 0)
                return -EINVAL;

        return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr);
}

/*
 * Fill in rds_message for an atomic request.
 */
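/*
 * Purely as an illustration (assuming the RDS_CMSG_ATOMIC_CSWP definitions
 * from <linux/rds.h>), a compare-and-swap on an 8-byte remote word might be
 * requested with a control message built from:
 *
 *      struct rds_atomic_args aargs = { 0 };
 *
 *      aargs.cookie = peer_cookie;
 *      aargs.remote_addr = 0;          // offset into the peer's MR
 *      aargs.local_addr = (uint64_t)(unsigned long)&old_val;
 *      aargs.cswp.compare = expected;
 *      aargs.cswp.swap = replacement;
 *      aargs.flags = RDS_RDMA_NOTIFY_ME;
 *      aargs.user_token = my_token;
 *
 * local_addr must be 8-byte aligned; the prior value of the remote word is
 * DMAed back into it when the operation completes.
 */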
int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
                    struct cmsghdr *cmsg)
{
        struct page *page = NULL;
        struct rds_atomic_args *args;
        int ret = 0;

        if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_atomic_args))
         || rm->atomic.op_active)
                return -EINVAL;

        args = CMSG_DATA(cmsg);

        /* Nonmasked & masked cmsg ops converted to masked hw ops */
        switch (cmsg->cmsg_type) {
        case RDS_CMSG_ATOMIC_FADD:
                rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
                rm->atomic.op_m_fadd.add = args->fadd.add;
                rm->atomic.op_m_fadd.nocarry_mask = 0;
                break;
        case RDS_CMSG_MASKED_ATOMIC_FADD:
                rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
                rm->atomic.op_m_fadd.add = args->m_fadd.add;
                rm->atomic.op_m_fadd.nocarry_mask = args->m_fadd.nocarry_mask;
                break;
        case RDS_CMSG_ATOMIC_CSWP:
                rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
                rm->atomic.op_m_cswp.compare = args->cswp.compare;
                rm->atomic.op_m_cswp.swap = args->cswp.swap;
                rm->atomic.op_m_cswp.compare_mask = ~0;
                rm->atomic.op_m_cswp.swap_mask = ~0;
                break;
        case RDS_CMSG_MASKED_ATOMIC_CSWP:
                rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
                rm->atomic.op_m_cswp.compare = args->m_cswp.compare;
                rm->atomic.op_m_cswp.swap = args->m_cswp.swap;
                rm->atomic.op_m_cswp.compare_mask = args->m_cswp.compare_mask;
                rm->atomic.op_m_cswp.swap_mask = args->m_cswp.swap_mask;
                break;
        default:
                BUG(); /* should never happen */
        }

        rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
        rm->atomic.op_silent = !!(args->flags & RDS_RDMA_SILENT);
        rm->atomic.op_active = 1;
        rm->atomic.op_recverr = rs->rs_recverr;
        rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
        if (!rm->atomic.op_sg) {
                ret = -ENOMEM;
                goto err;
        }

        /* verify 8 byte-aligned */
        if (args->local_addr & 0x7) {
                ret = -EFAULT;
                goto err;
        }

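        /* Pin for write: the transport DMAs the prior value of the remote
         * word back into this 8-byte buffer when the atomic completes. */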
        ret = rds_pin_pages(args->local_addr, 1, &page, 1);
        if (ret != 1)
                goto err;
        ret = 0;

        sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr));

        if (rm->atomic.op_notify || rm->atomic.op_recverr) {
                /* We allocate an uninitialized notifier here, because
                 * we don't want to do that in the completion handler. We
                 * would have to use GFP_ATOMIC there, and don't want to deal
                 * with failed allocations.
                 */
                rm->atomic.op_notifier = kmalloc(sizeof(*rm->atomic.op_notifier), GFP_KERNEL);
                if (!rm->atomic.op_notifier) {
                        ret = -ENOMEM;
                        goto err;
                }

                rm->atomic.op_notifier->n_user_token = args->user_token;
                rm->atomic.op_notifier->n_status = RDS_RDMA_SUCCESS;
        }

        rm->atomic.op_rkey = rds_rdma_cookie_key(args->cookie);
        rm->atomic.op_remote_addr = args->remote_addr + rds_rdma_cookie_offset(args->cookie);

        return ret;
err:
        if (page)
                put_page(page);
        kfree(rm->atomic.op_notifier);

        return ret;
}