linux/net/rds/ib_rdma.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/slab.h>
  35#include <linux/rculist.h>
  36#include <linux/llist.h>
  37
  38#include "rds_single_path.h"
  39#include "ib_mr.h"
  40
  41struct workqueue_struct *rds_ib_mr_wq;
  42
  43static DEFINE_PER_CPU(unsigned long, clean_list_grace);
  44#define CLEAN_LIST_BUSY_BIT 0
  45
  46static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
  47{
  48        struct rds_ib_device *rds_ibdev;
  49        struct rds_ib_ipaddr *i_ipaddr;
  50
  51        rcu_read_lock();
  52        list_for_each_entry_rcu(rds_ibdev, &rds_ib_devices, list) {
  53                list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
  54                        if (i_ipaddr->ipaddr == ipaddr) {
  55                                refcount_inc(&rds_ibdev->refcount);
  56                                rcu_read_unlock();
  57                                return rds_ibdev;
  58                        }
  59                }
  60        }
  61        rcu_read_unlock();
  62
  63        return NULL;
  64}
  65
  66static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
  67{
  68        struct rds_ib_ipaddr *i_ipaddr;
  69
  70        i_ipaddr = kmalloc(sizeof *i_ipaddr, GFP_KERNEL);
  71        if (!i_ipaddr)
  72                return -ENOMEM;
  73
  74        i_ipaddr->ipaddr = ipaddr;
  75
  76        spin_lock_irq(&rds_ibdev->spinlock);
  77        list_add_tail_rcu(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
  78        spin_unlock_irq(&rds_ibdev->spinlock);
  79
  80        return 0;
  81}
  82
  83static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
  84{
  85        struct rds_ib_ipaddr *i_ipaddr;
  86        struct rds_ib_ipaddr *to_free = NULL;
  87
  88
  89        spin_lock_irq(&rds_ibdev->spinlock);
  90        list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
  91                if (i_ipaddr->ipaddr == ipaddr) {
  92                        list_del_rcu(&i_ipaddr->list);
  93                        to_free = i_ipaddr;
  94                        break;
  95                }
  96        }
  97        spin_unlock_irq(&rds_ibdev->spinlock);
  98
  99        if (to_free)
 100                kfree_rcu(to_free, rcu);
 101}
 102
 103int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 104{
 105        struct rds_ib_device *rds_ibdev_old;
 106
 107        rds_ibdev_old = rds_ib_get_device(ipaddr);
 108        if (!rds_ibdev_old)
 109                return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
 110
 111        if (rds_ibdev_old != rds_ibdev) {
 112                rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
 113                rds_ib_dev_put(rds_ibdev_old);
 114                return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
 115        }
 116        rds_ib_dev_put(rds_ibdev_old);
 117
 118        return 0;
 119}
 120
 121void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
 122{
 123        struct rds_ib_connection *ic = conn->c_transport_data;
 124
 125        /* conn was previously on the nodev_conns_list */
 126        spin_lock_irq(&ib_nodev_conns_lock);
 127        BUG_ON(list_empty(&ib_nodev_conns));
 128        BUG_ON(list_empty(&ic->ib_node));
 129        list_del(&ic->ib_node);
 130
 131        spin_lock(&rds_ibdev->spinlock);
 132        list_add_tail(&ic->ib_node, &rds_ibdev->conn_list);
 133        spin_unlock(&rds_ibdev->spinlock);
 134        spin_unlock_irq(&ib_nodev_conns_lock);
 135
 136        ic->rds_ibdev = rds_ibdev;
 137        refcount_inc(&rds_ibdev->refcount);
 138}
 139
 140void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
 141{
 142        struct rds_ib_connection *ic = conn->c_transport_data;
 143
 144        /* place conn on nodev_conns_list */
 145        spin_lock(&ib_nodev_conns_lock);
 146
 147        spin_lock_irq(&rds_ibdev->spinlock);
 148        BUG_ON(list_empty(&ic->ib_node));
 149        list_del(&ic->ib_node);
 150        spin_unlock_irq(&rds_ibdev->spinlock);
 151
 152        list_add_tail(&ic->ib_node, &ib_nodev_conns);
 153
 154        spin_unlock(&ib_nodev_conns_lock);
 155
 156        ic->rds_ibdev = NULL;
 157        rds_ib_dev_put(rds_ibdev);
 158}
 159
 160void rds_ib_destroy_nodev_conns(void)
 161{
 162        struct rds_ib_connection *ic, *_ic;
 163        LIST_HEAD(tmp_list);
 164
 165        /* avoid calling conn_destroy with irqs off */
 166        spin_lock_irq(&ib_nodev_conns_lock);
 167        list_splice(&ib_nodev_conns, &tmp_list);
 168        spin_unlock_irq(&ib_nodev_conns_lock);
 169
 170        list_for_each_entry_safe(ic, _ic, &tmp_list, ib_node)
 171                rds_conn_destroy(ic->conn);
 172}
 173
 174void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo)
 175{
 176        struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool;
 177
 178        iinfo->rdma_mr_max = pool_1m->max_items;
 179        iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages;
 180}
 181
 182struct rds_ib_mr *rds_ib_reuse_mr(struct rds_ib_mr_pool *pool)
 183{
 184        struct rds_ib_mr *ibmr = NULL;
 185        struct llist_node *ret;
 186        unsigned long *flag;
 187
 188        preempt_disable();
 189        flag = this_cpu_ptr(&clean_list_grace);
 190        set_bit(CLEAN_LIST_BUSY_BIT, flag);
 191        ret = llist_del_first(&pool->clean_list);
 192        if (ret) {
 193                ibmr = llist_entry(ret, struct rds_ib_mr, llnode);
 194                if (pool->pool_type == RDS_IB_MR_8K_POOL)
 195                        rds_ib_stats_inc(s_ib_rdma_mr_8k_reused);
 196                else
 197                        rds_ib_stats_inc(s_ib_rdma_mr_1m_reused);
 198        }
 199
 200        clear_bit(CLEAN_LIST_BUSY_BIT, flag);
 201        preempt_enable();
 202        return ibmr;
 203}
 204
 205static inline void wait_clean_list_grace(void)
 206{
 207        int cpu;
 208        unsigned long *flag;
 209
 210        for_each_online_cpu(cpu) {
 211                flag = &per_cpu(clean_list_grace, cpu);
 212                while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
 213                        cpu_relax();
 214        }
 215}
 216
 217void rds_ib_sync_mr(void *trans_private, int direction)
 218{
 219        struct rds_ib_mr *ibmr = trans_private;
 220        struct rds_ib_device *rds_ibdev = ibmr->device;
 221
 222        switch (direction) {
 223        case DMA_FROM_DEVICE:
 224                ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
 225                        ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
 226                break;
 227        case DMA_TO_DEVICE:
 228                ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
 229                        ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
 230                break;
 231        }
 232}
 233
 234void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 235{
 236        struct rds_ib_device *rds_ibdev = ibmr->device;
 237
 238        if (ibmr->sg_dma_len) {
 239                ib_dma_unmap_sg(rds_ibdev->dev,
 240                                ibmr->sg, ibmr->sg_len,
 241                                DMA_BIDIRECTIONAL);
 242                ibmr->sg_dma_len = 0;
 243        }
 244
 245        /* Release the s/g list */
 246        if (ibmr->sg_len) {
 247                unsigned int i;
 248
 249                for (i = 0; i < ibmr->sg_len; ++i) {
 250                        struct page *page = sg_page(&ibmr->sg[i]);
 251
 252                        /* FIXME we need a way to tell a r/w MR
 253                         * from a r/o MR */
 254                        WARN_ON(!page->mapping && irqs_disabled());
 255                        set_page_dirty(page);
 256                        put_page(page);
 257                }
 258                kfree(ibmr->sg);
 259
 260                ibmr->sg = NULL;
 261                ibmr->sg_len = 0;
 262        }
 263}
 264
 265void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 266{
 267        unsigned int pinned = ibmr->sg_len;
 268
 269        __rds_ib_teardown_mr(ibmr);
 270        if (pinned) {
 271                struct rds_ib_mr_pool *pool = ibmr->pool;
 272
 273                atomic_sub(pinned, &pool->free_pinned);
 274        }
 275}
 276
 277static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int free_all)
 278{
 279        unsigned int item_count;
 280
 281        item_count = atomic_read(&pool->item_count);
 282        if (free_all)
 283                return item_count;
 284
 285        return 0;
 286}
 287
 288/*
 289 * given an llist of mrs, put them all into the list_head for more processing
 290 */
 291static unsigned int llist_append_to_list(struct llist_head *llist,
 292                                         struct list_head *list)
 293{
 294        struct rds_ib_mr *ibmr;
 295        struct llist_node *node;
 296        struct llist_node *next;
 297        unsigned int count = 0;
 298
 299        node = llist_del_all(llist);
 300        while (node) {
 301                next = node->next;
 302                ibmr = llist_entry(node, struct rds_ib_mr, llnode);
 303                list_add_tail(&ibmr->unmap_list, list);
 304                node = next;
 305                count++;
 306        }
 307        return count;
 308}
 309
 310/*
 311 * this takes a list head of mrs and turns it into linked llist nodes
 312 * of clusters.  Each cluster has linked llist nodes of
 313 * MR_CLUSTER_SIZE mrs that are ready for reuse.
 314 */
 315static void list_to_llist_nodes(struct rds_ib_mr_pool *pool,
 316                                struct list_head *list,
 317                                struct llist_node **nodes_head,
 318                                struct llist_node **nodes_tail)
 319{
 320        struct rds_ib_mr *ibmr;
 321        struct llist_node *cur = NULL;
 322        struct llist_node **next = nodes_head;
 323
 324        list_for_each_entry(ibmr, list, unmap_list) {
 325                cur = &ibmr->llnode;
 326                *next = cur;
 327                next = &cur->next;
 328        }
 329        *next = NULL;
 330        *nodes_tail = cur;
 331}
 332
 333/*
 334 * Flush our pool of MRs.
 335 * At a minimum, all currently unused MRs are unmapped.
 336 * If the number of MRs allocated exceeds the limit, we also try
 337 * to free as many MRs as needed to get back to this limit.
 338 */
 339int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 340                         int free_all, struct rds_ib_mr **ibmr_ret)
 341{
 342        struct rds_ib_mr *ibmr;
 343        struct llist_node *clean_nodes;
 344        struct llist_node *clean_tail;
 345        LIST_HEAD(unmap_list);
 346        unsigned long unpinned = 0;
 347        unsigned int nfreed = 0, dirty_to_clean = 0, free_goal;
 348
 349        if (pool->pool_type == RDS_IB_MR_8K_POOL)
 350                rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush);
 351        else
 352                rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush);
 353
 354        if (ibmr_ret) {
 355                DEFINE_WAIT(wait);
 356                while (!mutex_trylock(&pool->flush_lock)) {
 357                        ibmr = rds_ib_reuse_mr(pool);
 358                        if (ibmr) {
 359                                *ibmr_ret = ibmr;
 360                                finish_wait(&pool->flush_wait, &wait);
 361                                goto out_nolock;
 362                        }
 363
 364                        prepare_to_wait(&pool->flush_wait, &wait,
 365                                        TASK_UNINTERRUPTIBLE);
 366                        if (llist_empty(&pool->clean_list))
 367                                schedule();
 368
 369                        ibmr = rds_ib_reuse_mr(pool);
 370                        if (ibmr) {
 371                                *ibmr_ret = ibmr;
 372                                finish_wait(&pool->flush_wait, &wait);
 373                                goto out_nolock;
 374                        }
 375                }
 376                finish_wait(&pool->flush_wait, &wait);
 377        } else
 378                mutex_lock(&pool->flush_lock);
 379
 380        if (ibmr_ret) {
 381                ibmr = rds_ib_reuse_mr(pool);
 382                if (ibmr) {
 383                        *ibmr_ret = ibmr;
 384                        goto out;
 385                }
 386        }
 387
 388        /* Get the list of all MRs to be dropped. Ordering matters -
 389         * we want to put drop_list ahead of free_list.
 390         */
 391        dirty_to_clean = llist_append_to_list(&pool->drop_list, &unmap_list);
 392        dirty_to_clean += llist_append_to_list(&pool->free_list, &unmap_list);
 393        if (free_all)
 394                llist_append_to_list(&pool->clean_list, &unmap_list);
 395
 396        free_goal = rds_ib_flush_goal(pool, free_all);
 397
 398        if (list_empty(&unmap_list))
 399                goto out;
 400
 401        if (pool->use_fastreg)
 402                rds_ib_unreg_frmr(&unmap_list, &nfreed, &unpinned, free_goal);
 403        else
 404                rds_ib_unreg_fmr(&unmap_list, &nfreed, &unpinned, free_goal);
 405
 406        if (!list_empty(&unmap_list)) {
 407                /* we have to make sure that none of the things we're about
 408                 * to put on the clean list would race with other cpus trying
 409                 * to pull items off.  The llist would explode if we managed to
 410                 * remove something from the clean list and then add it back again
 411                 * while another CPU was spinning on that same item in llist_del_first.
 412                 *
 413                 * This is pretty unlikely, but just in case  wait for an llist grace period
 414                 * here before adding anything back into the clean list.
 415                 */
 416                wait_clean_list_grace();
 417
 418                list_to_llist_nodes(pool, &unmap_list, &clean_nodes, &clean_tail);
 419                if (ibmr_ret)
 420                        *ibmr_ret = llist_entry(clean_nodes, struct rds_ib_mr, llnode);
 421
 422                /* more than one entry in llist nodes */
 423                if (clean_nodes->next)
 424                        llist_add_batch(clean_nodes->next, clean_tail, &pool->clean_list);
 425
 426        }
 427
 428        atomic_sub(unpinned, &pool->free_pinned);
 429        atomic_sub(dirty_to_clean, &pool->dirty_count);
 430        atomic_sub(nfreed, &pool->item_count);
 431
 432out:
 433        mutex_unlock(&pool->flush_lock);
 434        if (waitqueue_active(&pool->flush_wait))
 435                wake_up(&pool->flush_wait);
 436out_nolock:
 437        return 0;
 438}
 439
 440struct rds_ib_mr *rds_ib_try_reuse_ibmr(struct rds_ib_mr_pool *pool)
 441{
 442        struct rds_ib_mr *ibmr = NULL;
 443        int iter = 0;
 444
 445        if (atomic_read(&pool->dirty_count) >= pool->max_items_soft / 10)
 446                queue_delayed_work(rds_ib_mr_wq, &pool->flush_worker, 10);
 447
 448        while (1) {
 449                ibmr = rds_ib_reuse_mr(pool);
 450                if (ibmr)
 451                        return ibmr;
 452
 453                if (atomic_inc_return(&pool->item_count) <= pool->max_items)
 454                        break;
 455
 456                atomic_dec(&pool->item_count);
 457
 458                if (++iter > 2) {
 459                        if (pool->pool_type == RDS_IB_MR_8K_POOL)
 460                                rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
 461                        else
 462                                rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
 463                        return ERR_PTR(-EAGAIN);
 464                }
 465
 466                /* We do have some empty MRs. Flush them out. */
 467                if (pool->pool_type == RDS_IB_MR_8K_POOL)
 468                        rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
 469                else
 470                        rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
 471
 472                rds_ib_flush_mr_pool(pool, 0, &ibmr);
 473                if (ibmr)
 474                        return ibmr;
 475        }
 476
 477        return ibmr;
 478}
 479
 480static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
 481{
 482        struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker.work);
 483
 484        rds_ib_flush_mr_pool(pool, 0, NULL);
 485}
 486
 487void rds_ib_free_mr(void *trans_private, int invalidate)
 488{
 489        struct rds_ib_mr *ibmr = trans_private;
 490        struct rds_ib_mr_pool *pool = ibmr->pool;
 491        struct rds_ib_device *rds_ibdev = ibmr->device;
 492
 493        rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
 494
 495        /* Return it to the pool's free list */
 496        if (rds_ibdev->use_fastreg)
 497                rds_ib_free_frmr_list(ibmr);
 498        else
 499                rds_ib_free_fmr_list(ibmr);
 500
 501        atomic_add(ibmr->sg_len, &pool->free_pinned);
 502        atomic_inc(&pool->dirty_count);
 503
 504        /* If we've pinned too many pages, request a flush */
 505        if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
 506            atomic_read(&pool->dirty_count) >= pool->max_items / 5)
 507                queue_delayed_work(rds_ib_mr_wq, &pool->flush_worker, 10);
 508
 509        if (invalidate) {
 510                if (likely(!in_interrupt())) {
 511                        rds_ib_flush_mr_pool(pool, 0, NULL);
 512                } else {
 513                        /* We get here if the user created a MR marked
 514                         * as use_once and invalidate at the same time.
 515                         */
 516                        queue_delayed_work(rds_ib_mr_wq,
 517                                           &pool->flush_worker, 10);
 518                }
 519        }
 520
 521        rds_ib_dev_put(rds_ibdev);
 522}
 523
 524void rds_ib_flush_mrs(void)
 525{
 526        struct rds_ib_device *rds_ibdev;
 527
 528        down_read(&rds_ib_devices_lock);
 529        list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
 530                if (rds_ibdev->mr_8k_pool)
 531                        rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL);
 532
 533                if (rds_ibdev->mr_1m_pool)
 534                        rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL);
 535        }
 536        up_read(&rds_ib_devices_lock);
 537}
 538
 539void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 540                    struct rds_sock *rs, u32 *key_ret,
 541                    struct rds_connection *conn)
 542{
 543        struct rds_ib_device *rds_ibdev;
 544        struct rds_ib_mr *ibmr = NULL;
 545        struct rds_ib_connection *ic = NULL;
 546        int ret;
 547
 548        rds_ibdev = rds_ib_get_device(rs->rs_bound_addr);
 549        if (!rds_ibdev) {
 550                ret = -ENODEV;
 551                goto out;
 552        }
 553
 554        if (conn)
 555                ic = conn->c_transport_data;
 556
 557        if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) {
 558                ret = -ENODEV;
 559                goto out;
 560        }
 561
 562        if (rds_ibdev->use_fastreg)
 563                ibmr = rds_ib_reg_frmr(rds_ibdev, ic, sg, nents, key_ret);
 564        else
 565                ibmr = rds_ib_reg_fmr(rds_ibdev, sg, nents, key_ret);
 566        if (IS_ERR(ibmr)) {
 567                ret = PTR_ERR(ibmr);
 568                pr_warn("RDS/IB: rds_ib_get_mr failed (errno=%d)\n", ret);
 569        } else {
 570                return ibmr;
 571        }
 572
 573 out:
 574        if (rds_ibdev)
 575                rds_ib_dev_put(rds_ibdev);
 576
 577        return ERR_PTR(ret);
 578}
 579
 580void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
 581{
 582        cancel_delayed_work_sync(&pool->flush_worker);
 583        rds_ib_flush_mr_pool(pool, 1, NULL);
 584        WARN_ON(atomic_read(&pool->item_count));
 585        WARN_ON(atomic_read(&pool->free_pinned));
 586        kfree(pool);
 587}
 588
 589struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
 590                                             int pool_type)
 591{
 592        struct rds_ib_mr_pool *pool;
 593
 594        pool = kzalloc(sizeof(*pool), GFP_KERNEL);
 595        if (!pool)
 596                return ERR_PTR(-ENOMEM);
 597
 598        pool->pool_type = pool_type;
 599        init_llist_head(&pool->free_list);
 600        init_llist_head(&pool->drop_list);
 601        init_llist_head(&pool->clean_list);
 602        mutex_init(&pool->flush_lock);
 603        init_waitqueue_head(&pool->flush_wait);
 604        INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 605
 606        if (pool_type == RDS_IB_MR_1M_POOL) {
 607                /* +1 allows for unaligned MRs */
 608                pool->fmr_attr.max_pages = RDS_MR_1M_MSG_SIZE + 1;
 609                pool->max_items = rds_ibdev->max_1m_mrs;
 610        } else {
 611                /* pool_type == RDS_IB_MR_8K_POOL */
 612                pool->fmr_attr.max_pages = RDS_MR_8K_MSG_SIZE + 1;
 613                pool->max_items = rds_ibdev->max_8k_mrs;
 614        }
 615
 616        pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
 617        pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 618        pool->fmr_attr.page_shift = PAGE_SHIFT;
 619        pool->max_items_soft = rds_ibdev->max_mrs * 3 / 4;
 620        pool->use_fastreg = rds_ibdev->use_fastreg;
 621
 622        return pool;
 623}
 624
 625int rds_ib_mr_init(void)
 626{
 627        rds_ib_mr_wq = alloc_workqueue("rds_mr_flushd", WQ_MEM_RECLAIM, 0);
 628        if (!rds_ib_mr_wq)
 629                return -ENOMEM;
 630        return 0;
 631}
 632
 633/* By the time this is called all the IB devices should have been torn down and
 634 * had their pools freed.  As each pool is freed its work struct is waited on,
 635 * so the pool flushing work queue should be idle by the time we get here.
 636 */
 637void rds_ib_mr_exit(void)
 638{
 639        destroy_workqueue(rds_ib_mr_wq);
 640}
 641