linux/net/rds/iw_recv.c
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/pci.h>
#include <linux/dma-mapping.h>
#include <rdma/rdma_cm.h>

#include "rds.h"
#include "iw.h"

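/* Slab caches for incoming message descriptors and page fragments, plus a
 * global counter that bounds how many incoming message structures (and
 * hence how much receive memory) may be allocated at once. */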
static struct kmem_cache *rds_iw_incoming_slab;
static struct kmem_cache *rds_iw_frag_slab;
static atomic_t rds_iw_allocation = ATOMIC_INIT(0);

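/* Fragment teardown happens in two steps: drop the page reference first,
 * then return the descriptor to its slab (which insists the page is
 * already gone). */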
static void rds_iw_frag_drop_page(struct rds_page_frag *frag)
{
        rdsdebug("frag %p page %p\n", frag, frag->f_page);
        __free_page(frag->f_page);
        frag->f_page = NULL;
}

static void rds_iw_frag_free(struct rds_page_frag *frag)
{
        rdsdebug("frag %p page %p\n", frag, frag->f_page);
        BUG_ON(frag->f_page != NULL);
        kmem_cache_free(rds_iw_frag_slab, frag);
}

/*
 * We map a page at a time.  Its fragments are posted in order.  This
 * is called in fragment order as the fragments get receive completion events.
 * Only the last frag in the page performs the unmapping.
 *
 * It's OK for ring cleanup to call this in whatever order it likes because
 * DMA is not in flight and so we can unmap while other ring entries still
 * hold page references in their frags.
 */
static void rds_iw_recv_unmap_page(struct rds_iw_connection *ic,
                                   struct rds_iw_recv_work *recv)
{
        struct rds_page_frag *frag = recv->r_frag;

        rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
        if (frag->f_mapped)
                ib_dma_unmap_page(ic->i_cm_id->device,
                               frag->f_mapped,
                               RDS_FRAG_SIZE, DMA_FROM_DEVICE);
        frag->f_mapped = 0;
}

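/*
 * Initialize the constant parts of every recv work request: the wr_id,
 * the scatter/gather list, and the header SGE, which always points at this
 * slot's entry in the DMA-mapped header array.  The data SGE address is
 * filled in later, when rds_iw_recv_refill_one() maps a page fragment for
 * the slot.
 */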
void rds_iw_recv_init_ring(struct rds_iw_connection *ic)
{
        struct rds_iw_recv_work *recv;
        u32 i;

        for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
                struct ib_sge *sge;

                recv->r_iwinc = NULL;
                recv->r_frag = NULL;

                recv->r_wr.next = NULL;
                recv->r_wr.wr_id = i;
                recv->r_wr.sg_list = recv->r_sge;
                recv->r_wr.num_sge = RDS_IW_RECV_SGE;

                sge = rds_iw_data_sge(ic, recv->r_sge);
                sge->addr = 0;
                sge->length = RDS_FRAG_SIZE;
                sge->lkey = 0;

                sge = rds_iw_header_sge(ic, recv->r_sge);
                sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
                sge->length = sizeof(struct rds_header);
                sge->lkey = 0;
        }
}

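/* Release everything a recv slot may hold: the reference on a partially
 * assembled incoming message, the DMA mapping, and the page fragment. */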
static void rds_iw_recv_clear_one(struct rds_iw_connection *ic,
                                  struct rds_iw_recv_work *recv)
{
        if (recv->r_iwinc) {
                rds_inc_put(&recv->r_iwinc->ii_inc);
                recv->r_iwinc = NULL;
        }
        if (recv->r_frag) {
                rds_iw_recv_unmap_page(ic, recv);
                if (recv->r_frag->f_page)
                        rds_iw_frag_drop_page(recv->r_frag);
                rds_iw_frag_free(recv->r_frag);
                recv->r_frag = NULL;
        }
}

void rds_iw_recv_clear_ring(struct rds_iw_connection *ic)
{
        u32 i;

        for (i = 0; i < ic->i_recv_ring.w_nr; i++)
                rds_iw_recv_clear_one(ic, &ic->i_recvs[i]);

        if (ic->i_frag.f_page)
                rds_iw_frag_drop_page(&ic->i_frag);
}

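/*
 * Prepare one recv slot for posting: allocate the rds_iw_incoming and the
 * page fragment if the slot lacks them, carve the next RDS_FRAG_SIZE chunk
 * out of the connection's current page, DMA-map it and point the data and
 * header SGEs at the mapping.  A page is shared by consecutive fragments
 * until it has been fully consumed.
 */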
static int rds_iw_recv_refill_one(struct rds_connection *conn,
                                  struct rds_iw_recv_work *recv,
                                  gfp_t kptr_gfp, gfp_t page_gfp)
{
        struct rds_iw_connection *ic = conn->c_transport_data;
        dma_addr_t dma_addr;
        struct ib_sge *sge;
        int ret = -ENOMEM;

        if (recv->r_iwinc == NULL) {
                if (atomic_read(&rds_iw_allocation) >= rds_iw_sysctl_max_recv_allocation) {
                        rds_iw_stats_inc(s_iw_rx_alloc_limit);
                        goto out;
                }
                recv->r_iwinc = kmem_cache_alloc(rds_iw_incoming_slab,
                                                 kptr_gfp);
                if (recv->r_iwinc == NULL)
                        goto out;
                atomic_inc(&rds_iw_allocation);
                INIT_LIST_HEAD(&recv->r_iwinc->ii_frags);
                rds_inc_init(&recv->r_iwinc->ii_inc, conn, conn->c_faddr);
        }

        if (recv->r_frag == NULL) {
                recv->r_frag = kmem_cache_alloc(rds_iw_frag_slab, kptr_gfp);
                if (recv->r_frag == NULL)
                        goto out;
                INIT_LIST_HEAD(&recv->r_frag->f_item);
                recv->r_frag->f_page = NULL;
        }

        if (ic->i_frag.f_page == NULL) {
                ic->i_frag.f_page = alloc_page(page_gfp);
                if (ic->i_frag.f_page == NULL)
                        goto out;
                ic->i_frag.f_offset = 0;
        }

        dma_addr = ib_dma_map_page(ic->i_cm_id->device,
                                  ic->i_frag.f_page,
                                  ic->i_frag.f_offset,
                                  RDS_FRAG_SIZE,
                                  DMA_FROM_DEVICE);
        if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
                goto out;

        /*
         * Once we get the RDS_PAGE_LAST_OFF frag then rds_iw_recv_unmap_page()
         * must be called on this recv.  This happens as completions hit
         * in order or on connection shutdown.
         */
        recv->r_frag->f_page = ic->i_frag.f_page;
        recv->r_frag->f_offset = ic->i_frag.f_offset;
        recv->r_frag->f_mapped = dma_addr;

        sge = rds_iw_data_sge(ic, recv->r_sge);
        sge->addr = dma_addr;
        sge->length = RDS_FRAG_SIZE;

        sge = rds_iw_header_sge(ic, recv->r_sge);
        sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
        sge->length = sizeof(struct rds_header);

        get_page(recv->r_frag->f_page);

        if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
                ic->i_frag.f_offset += RDS_FRAG_SIZE;
        } else {
                put_page(ic->i_frag.f_page);
                ic->i_frag.f_page = NULL;
                ic->i_frag.f_offset = 0;
        }

        ret = 0;
out:
        return ret;
}

/*
 * This tries to allocate and post unused work requests after making sure that
 * they have all the allocations they need to queue received fragments into
 * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
 * pairs don't go unmatched.
 *
 * -1 is returned if posting fails due to temporary resource exhaustion.
 */
int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
                       gfp_t page_gfp, int prefill)
{
        struct rds_iw_connection *ic = conn->c_transport_data;
        struct rds_iw_recv_work *recv;
        struct ib_recv_wr *failed_wr;
        unsigned int posted = 0;
        int ret = 0;
        u32 pos;

        while ((prefill || rds_conn_up(conn))
                        && rds_iw_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
                if (pos >= ic->i_recv_ring.w_nr) {
                        printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
                                        pos);
                        ret = -EINVAL;
                        break;
                }

                recv = &ic->i_recvs[pos];
                ret = rds_iw_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
                if (ret) {
                        ret = -1;
                        break;
                }

                /* XXX when can this fail? */
                ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
                rdsdebug("recv %p iwinc %p page %p addr %lu ret %d\n", recv,
                         recv->r_iwinc, recv->r_frag->f_page,
                         (long) recv->r_frag->f_mapped, ret);
                if (ret) {
                        rds_iw_conn_error(conn, "recv post on "
                               "%pI4 returned %d, disconnecting and "
                               "reconnecting\n", &conn->c_faddr,
                               ret);
                        ret = -1;
                        break;
                }

                posted++;
        }

        /* We're doing flow control - update the window. */
        if (ic->i_flowctl && posted)
                rds_iw_advertise_credits(conn, posted);

        if (ret)
                rds_iw_ring_unalloc(&ic->i_recv_ring, 1);
        return ret;
}

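/* Drop every fragment (and its page reference) still chained to an incoming
 * message.  Called when the inc is being torn down. */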
void rds_iw_inc_purge(struct rds_incoming *inc)
{
        struct rds_iw_incoming *iwinc;
        struct rds_page_frag *frag;
        struct rds_page_frag *pos;

        iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
        rdsdebug("purging iwinc %p inc %p\n", iwinc, inc);

        list_for_each_entry_safe(frag, pos, &iwinc->ii_frags, f_item) {
                list_del_init(&frag->f_item);
                rds_iw_frag_drop_page(frag);
                rds_iw_frag_free(frag);
        }
}

void rds_iw_inc_free(struct rds_incoming *inc)
{
        struct rds_iw_incoming *iwinc;

        iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);

        rds_iw_inc_purge(inc);
        rdsdebug("freeing iwinc %p inc %p\n", iwinc, inc);
        BUG_ON(!list_empty(&iwinc->ii_frags));
        kmem_cache_free(rds_iw_incoming_slab, iwinc);
        atomic_dec(&rds_iw_allocation);
        BUG_ON(atomic_read(&rds_iw_allocation) < 0);
}

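/*
 * Copy message payload from the receive fragments into the caller's iovec,
 * walking the fragment list and the iovec array in lockstep.  Returns the
 * number of bytes copied, or the error from rds_page_copy_to_user() if the
 * copy to user space faults.
 */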
int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
                            size_t size)
{
        struct rds_iw_incoming *iwinc;
        struct rds_page_frag *frag;
        struct iovec *iov = first_iov;
        unsigned long to_copy;
        unsigned long frag_off = 0;
        unsigned long iov_off = 0;
        int copied = 0;
        int ret;
        u32 len;

        iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
        frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
        len = be32_to_cpu(inc->i_hdr.h_len);

        while (copied < size && copied < len) {
                if (frag_off == RDS_FRAG_SIZE) {
                        frag = list_entry(frag->f_item.next,
                                          struct rds_page_frag, f_item);
                        frag_off = 0;
                }
                while (iov_off == iov->iov_len) {
                        iov_off = 0;
                        iov++;
                }

                to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - frag_off);
                to_copy = min_t(size_t, to_copy, size - copied);
                to_copy = min_t(unsigned long, to_copy, len - copied);

                rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
                         "[%p, %lu] + %lu\n",
                         to_copy, iov->iov_base, iov->iov_len, iov_off,
                         frag->f_page, frag->f_offset, frag_off);

                /* XXX needs + offset for multiple recvs per page */
                ret = rds_page_copy_to_user(frag->f_page,
                                            frag->f_offset + frag_off,
                                            iov->iov_base + iov_off,
                                            to_copy);
                if (ret) {
                        copied = ret;
                        break;
                }

                iov_off += to_copy;
                frag_off += to_copy;
                copied += to_copy;
        }

        return copied;
}

/* ic starts out kzalloc()ed */
void rds_iw_recv_init_ack(struct rds_iw_connection *ic)
{
        struct ib_send_wr *wr = &ic->i_ack_wr;
        struct ib_sge *sge = &ic->i_ack_sge;

        sge->addr = ic->i_ack_dma;
        sge->length = sizeof(struct rds_header);
        sge->lkey = rds_iw_local_dma_lkey(ic);

        wr->sg_list = sge;
        wr->num_sge = 1;
        wr->opcode = IB_WR_SEND;
        wr->wr_id = RDS_IW_ACK_WR_ID;
        wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
}

/*
 * You'd think that with reliable IB connections you wouldn't need to ack
 * messages that have been received.  The problem is that IB hardware generates
 * an ack message before it has DMAed the message into memory.  This creates a
 * potential message loss if the HCA is disabled for any reason between when it
 * sends the ack and before the message is DMAed and processed.  This is only a
 * potential issue if another HCA is available for fail-over.
 *
 * When the remote host receives our ack they'll free the sent message from
 * their send queue.  To decrease the latency of this we always send an ack
 * immediately after we've received messages.
 *
 * For simplicity, we only have one ack in flight at a time.  This puts
 * pressure on senders to have deep enough send queues to absorb the latency of
 * a single ack frame being in flight.  This might not be good enough.
 *
 * This is implemented by having a long-lived send_wr and sge which point to a
 * statically allocated ack frame.  This ack wr does not fall under the ring
 * accounting that the tx and rx wrs do.  The QP attribute specifically makes
 * room for it beyond the ring size.  Send completion notices its special
 * wr_id and avoids working with the ring in that case.
 */
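/*
 * The two variants below track the most recent sequence number we should
 * acknowledge.  Without atomic64_t we fall back to a spinlock-protected
 * 64-bit value; with it, plain atomic64 accesses plus memory barriers
 * around the IB_ACK_REQUESTED bit updates are enough.
 */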
#ifndef KERNEL_HAS_ATOMIC64
static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
                                int ack_required)
{
        unsigned long flags;

        spin_lock_irqsave(&ic->i_ack_lock, flags);
        ic->i_ack_next = seq;
        if (ack_required)
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
        spin_unlock_irqrestore(&ic->i_ack_lock, flags);
}

static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
{
        unsigned long flags;
        u64 seq;

        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);

        spin_lock_irqsave(&ic->i_ack_lock, flags);
        seq = ic->i_ack_next;
        spin_unlock_irqrestore(&ic->i_ack_lock, flags);

        return seq;
}
#else
static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
                                int ack_required)
{
        atomic64_set(&ic->i_ack_next, seq);
        if (ack_required) {
                smp_mb__before_clear_bit();
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
        }
}

static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
{
        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
        smp_mb__after_clear_bit();

        return atomic64_read(&ic->i_ack_next);
}
#endif


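/*
 * Post the single pre-built ACK work request.  The ACK header is rebuilt
 * with the sequence number being acknowledged and any credits we want to
 * advertise; the WR itself lives outside the normal ring accounting.
 */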
static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits)
{
        struct rds_header *hdr = ic->i_ack;
        struct ib_send_wr *failed_wr;
        u64 seq;
        int ret;

        seq = rds_iw_get_ack(ic);

        rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
        rds_message_populate_header(hdr, 0, 0, 0);
        hdr->h_ack = cpu_to_be64(seq);
        hdr->h_credit = adv_credits;
        rds_message_make_checksum(hdr);
        ic->i_ack_queued = jiffies;

        ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
        if (unlikely(ret)) {
                /* Failed to send. Release the WR, and
                 * force another ACK.
                 */
                clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);

                rds_iw_stats_inc(s_iw_ack_send_failure);
                /* Need to finesse this later. */
                BUG();
        } else
                rds_iw_stats_inc(s_iw_ack_sent);
}

/*
 * There are 3 ways of getting acknowledgements to the peer:
 *  1.  We call rds_iw_attempt_ack from the recv completion handler
 *      to send an ACK-only frame.
 *      However, there can be only one such frame in the send queue
 *      at any time, so we may have to postpone it.
 *  2.  When another (data) packet is transmitted while there's
 *      an ACK in the queue, we piggyback the ACK sequence number
 *      on the data packet.
 *  3.  If the ACK WR is done sending, we get called from the
 *      send queue completion handler, and check whether there's
 *      another ACK pending (postponed because the WR was on the
 *      queue). If so, we transmit it.
 *
 * We maintain 2 variables:
 *  -   i_ack_flags, which keeps track of whether the ACK WR
 *      is currently in the send queue or not (IB_ACK_IN_FLIGHT)
 *  -   i_ack_next, which is the last sequence number we received
 *
 * Potentially, send queue and receive queue handlers can run concurrently.
 * It would be nice to not have to use a spinlock to synchronize things,
 * but the one problem that rules this out is that 64bit updates are
 * not atomic on all platforms. Things would be a lot simpler if
 * we had atomic64 or maybe cmpxchg64 everywhere.
 *
 * Reconnecting complicates this picture just slightly. When we
 * reconnect, we may be seeing duplicate packets. The peer
 * is retransmitting them, because it hasn't seen an ACK for
 * them. It is important that we ACK these.
 *
 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
 * this flag set *MUST* be acknowledged immediately.
 */

/*
 * When we get here, we're called from the recv queue handler.
 * Check whether we ought to transmit an ACK.
 */
void rds_iw_attempt_ack(struct rds_iw_connection *ic)
{
        unsigned int adv_credits;

        if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
                return;

        if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
                rds_iw_stats_inc(s_iw_ack_send_delayed);
                return;
        }

        /* Can we get a send credit? */
        if (!rds_iw_send_grab_credits(ic, 1, &adv_credits, 0, RDS_MAX_ADV_CREDIT)) {
                rds_iw_stats_inc(s_iw_tx_throttle);
                clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
                return;
        }

        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
        rds_iw_send_ack(ic, adv_credits);
}

/*
 * We get here from the send completion handler, when the
 * adapter tells us the ACK frame was sent.
 */
void rds_iw_ack_send_complete(struct rds_iw_connection *ic)
{
        clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
        rds_iw_attempt_ack(ic);
}

/*
 * This is called by the regular xmit code when it wants to piggyback
 * an ACK on an outgoing frame.
 */
u64 rds_iw_piggyb_ack(struct rds_iw_connection *ic)
{
        if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
                rds_iw_stats_inc(s_iw_ack_send_piggybacked);
        return rds_iw_get_ack(ic);
}

/*
 * It's kind of lame that we're copying from the posted receive pages into
 * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
 * them.  But receiving new congestion bitmaps should be a *rare* event, so
 * hopefully we won't need to invest that complexity in making it more
 * efficient.  By copying we can share a simpler core with TCP which has to
 * copy.
 */
static void rds_iw_cong_recv(struct rds_connection *conn,
                              struct rds_iw_incoming *iwinc)
{
        struct rds_cong_map *map;
        unsigned int map_off;
        unsigned int map_page;
        struct rds_page_frag *frag;
        unsigned long frag_off;
        unsigned long to_copy;
        unsigned long copied;
        uint64_t uncongested = 0;
        void *addr;

        /* catch completely corrupt packets */
        if (be32_to_cpu(iwinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
                return;

        map = conn->c_fcong;
        map_page = 0;
        map_off = 0;

        frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
        frag_off = 0;

        copied = 0;

        while (copied < RDS_CONG_MAP_BYTES) {
                uint64_t *src, *dst;
                unsigned int k;

                to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
                BUG_ON(to_copy & 7); /* Must be 64bit aligned. */

                addr = kmap_atomic(frag->f_page, KM_SOFTIRQ0);

                src = addr + frag_off;
                dst = (void *)map->m_page_addrs[map_page] + map_off;
                for (k = 0; k < to_copy; k += 8) {
                        /* Record ports that became uncongested, ie
                         * bits that changed from 0 to 1. */
                        uncongested |= ~(*src) & *dst;
                        *dst++ = *src++;
                }
                kunmap_atomic(addr, KM_SOFTIRQ0);

                copied += to_copy;

                map_off += to_copy;
                if (map_off == PAGE_SIZE) {
                        map_off = 0;
                        map_page++;
                }

                frag_off += to_copy;
                if (frag_off == RDS_FRAG_SIZE) {
                        frag = list_entry(frag->f_item.next,
                                          struct rds_page_frag, f_item);
                        frag_off = 0;
                }
        }

        /* the congestion map is in little endian order */
        uncongested = le64_to_cpu(uncongested);

        rds_cong_map_updated(map, uncongested);
}

/*
 * Rings are posted with all the allocations they'll need to queue the
 * incoming message to the receiving socket so this can't fail.
 * All fragments start with a header, so we can make sure we're not receiving
 * garbage, and we can tell a small 8 byte fragment from an ACK frame.
 */
struct rds_iw_ack_state {
        u64             ack_next;
        u64             ack_recv;
        unsigned int    ack_required:1;
        unsigned int    ack_next_valid:1;
        unsigned int    ack_recv_valid:1;
};

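/*
 * Handle one completed receive: validate the header, note the piggybacked
 * ACK and any credit update, then either treat the frame as a pure ACK or
 * chain its fragment onto the current incoming message and deliver the
 * message once the final fragment has arrived.
 */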
static void rds_iw_process_recv(struct rds_connection *conn,
                                struct rds_iw_recv_work *recv, u32 byte_len,
                                struct rds_iw_ack_state *state)
{
        struct rds_iw_connection *ic = conn->c_transport_data;
        struct rds_iw_incoming *iwinc = ic->i_iwinc;
        struct rds_header *ihdr, *hdr;

        /* XXX shut down the connection if port 0,0 are seen? */

        rdsdebug("ic %p iwinc %p recv %p byte len %u\n", ic, iwinc, recv,
                 byte_len);

        if (byte_len < sizeof(struct rds_header)) {
                rds_iw_conn_error(conn, "incoming message "
                       "from %pI4 didn't include a "
                       "header, disconnecting and "
                       "reconnecting\n",
                       &conn->c_faddr);
                return;
        }
        byte_len -= sizeof(struct rds_header);

        ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];

        /* Validate the checksum. */
        if (!rds_message_verify_checksum(ihdr)) {
                rds_iw_conn_error(conn, "incoming message "
                       "from %pI4 has corrupted header - "
                       "forcing a reconnect\n",
                       &conn->c_faddr);
                rds_stats_inc(s_recv_drop_bad_checksum);
                return;
        }

        /* Process the ACK sequence which comes with every packet */
        state->ack_recv = be64_to_cpu(ihdr->h_ack);
        state->ack_recv_valid = 1;

        /* Process the credits update if there was one */
        if (ihdr->h_credit)
                rds_iw_send_add_credits(conn, ihdr->h_credit);

        if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
                /* This is an ACK-only packet. The fact that it gets
                 * special treatment here is that historically, ACKs
                 * were rather special beasts.
                 */
                rds_iw_stats_inc(s_iw_ack_received);

                /*
                 * Usually the frags make their way on to incs and are then freed as
                 * the inc is freed.  We don't go that route, so we have to drop the
                 * page ref ourselves.  We can't just leave the page on the recv
                 * because that confuses the dma mapping of pages and each recv's use
                 * of a partial page.  We can leave the frag, though, it will be
                 * reused.
                 *
                 * FIXME: Fold this into the code path below.
                 */
                rds_iw_frag_drop_page(recv->r_frag);
                return;
        }

        /*
         * If we don't already have an inc on the connection then this
         * fragment has a header and starts a message.  Copy its header
         * into the inc and save the inc so we can hang upcoming fragments
         * off its list.
         */
        if (iwinc == NULL) {
                iwinc = recv->r_iwinc;
                recv->r_iwinc = NULL;
                ic->i_iwinc = iwinc;

                hdr = &iwinc->ii_inc.i_hdr;
                memcpy(hdr, ihdr, sizeof(*hdr));
                ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);

                rdsdebug("ic %p iwinc %p rem %u flag 0x%x\n", ic, iwinc,
                         ic->i_recv_data_rem, hdr->h_flags);
        } else {
                hdr = &iwinc->ii_inc.i_hdr;
                /* We can't just use memcmp here; fragments of a
                 * single message may carry different ACKs */
                if (hdr->h_sequence != ihdr->h_sequence
                 || hdr->h_len != ihdr->h_len
                 || hdr->h_sport != ihdr->h_sport
                 || hdr->h_dport != ihdr->h_dport) {
                        rds_iw_conn_error(conn,
                                "fragment header mismatch; forcing reconnect\n");
                        return;
                }
        }

        list_add_tail(&recv->r_frag->f_item, &iwinc->ii_frags);
        recv->r_frag = NULL;

        if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
                ic->i_recv_data_rem -= RDS_FRAG_SIZE;
        else {
                ic->i_recv_data_rem = 0;
                ic->i_iwinc = NULL;

                if (iwinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
                        rds_iw_cong_recv(conn, iwinc);
                else {
                        rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
                                          &iwinc->ii_inc, GFP_ATOMIC,
                                          KM_SOFTIRQ0);
                        state->ack_next = be64_to_cpu(hdr->h_sequence);
                        state->ack_next_valid = 1;
                }

                /* Evaluate the ACK_REQUIRED flag *after* we received
                 * the complete frame, and after bumping the next_rx
                 * sequence. */
                if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
                        rds_stats_inc(s_recv_ack_required);
                        state->ack_required = 1;
                }

                rds_inc_put(&iwinc->ii_inc);
        }
}

/*
 * Plucking the oldest entry from the ring can be done concurrently with
 * the thread refilling the ring.  Each ring operation is protected by
 * spinlocks and the transient state of refilling doesn't change the
 * recording of which entry is oldest.
 *
 * This relies on IB only calling one cq comp_handler for each cq so that
 * there will only be one caller of rds_recv_incoming() per RDS connection.
 */
void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
{
        struct rds_connection *conn = context;
        struct rds_iw_connection *ic = conn->c_transport_data;
        struct ib_wc wc;
        struct rds_iw_ack_state state = { 0, };
        struct rds_iw_recv_work *recv;

        rdsdebug("conn %p cq %p\n", conn, cq);

        rds_iw_stats_inc(s_iw_rx_cq_call);

        ib_req_notify_cq(cq, IB_CQ_SOLICITED);

        while (ib_poll_cq(cq, 1, &wc) > 0) {
                rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                         (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
                         be32_to_cpu(wc.ex.imm_data));
                rds_iw_stats_inc(s_iw_rx_cq_event);

                recv = &ic->i_recvs[rds_iw_ring_oldest(&ic->i_recv_ring)];

                rds_iw_recv_unmap_page(ic, recv);

                /*
                 * Also process recvs in connecting state because it is possible
                 * to get a recv completion _before_ the rdmacm ESTABLISHED
                 * event is processed.
                 */
                if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
                        /* We expect errors as the qp is drained during shutdown */
                        if (wc.status == IB_WC_SUCCESS) {
                                rds_iw_process_recv(conn, recv, wc.byte_len, &state);
                        } else {
                                rds_iw_conn_error(conn, "recv completion on "
                                       "%pI4 had status %u, disconnecting and "
                                       "reconnecting\n", &conn->c_faddr,
                                       wc.status);
                        }
                }

                rds_iw_ring_free(&ic->i_recv_ring, 1);
        }

        if (state.ack_next_valid)
                rds_iw_set_ack(ic, state.ack_next, state.ack_required);
        if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
                rds_send_drop_acked(conn, state.ack_recv, NULL);
                ic->i_ack_recv = state.ack_recv;
        }
        if (rds_conn_up(conn))
                rds_iw_attempt_ack(ic);

        /* If we ever end up with a really empty receive ring, we're
         * in deep trouble, as the sender will definitely see RNR
         * timeouts. */
        if (rds_iw_ring_empty(&ic->i_recv_ring))
                rds_iw_stats_inc(s_iw_rx_ring_empty);

        /*
         * If the ring is running low, then schedule the thread to refill.
         */
        if (rds_iw_ring_low(&ic->i_recv_ring))
                queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
}

int rds_iw_recv(struct rds_connection *conn)
{
        struct rds_iw_connection *ic = conn->c_transport_data;
        int ret = 0;

        rdsdebug("conn %p\n", conn);

        /*
         * If we get a temporary posting failure in this context then
         * we're really low and we want the caller to back off for a bit.
         */
        mutex_lock(&ic->i_recv_mutex);
        if (rds_iw_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
                ret = -ENOMEM;
        else
                rds_iw_stats_inc(s_iw_rx_refill_from_thread);
        mutex_unlock(&ic->i_recv_mutex);

        if (rds_conn_up(conn))
                rds_iw_attempt_ack(ic);

        return ret;
}

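/*
 * Create the receive-side slab caches and derive the global allocation
 * limit from the amount of RAM in the machine.
 */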
int __init rds_iw_recv_init(void)
{
        struct sysinfo si;
        int ret = -ENOMEM;

        /* Default to about a third of all available RAM for recv memory */
        si_meminfo(&si);
        rds_iw_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;

        rds_iw_incoming_slab = kmem_cache_create("rds_iw_incoming",
                                        sizeof(struct rds_iw_incoming),
                                        0, 0, NULL);
        if (rds_iw_incoming_slab == NULL)
                goto out;

        rds_iw_frag_slab = kmem_cache_create("rds_iw_frag",
                                        sizeof(struct rds_page_frag),
                                        0, 0, NULL);
        if (rds_iw_frag_slab == NULL)
                kmem_cache_destroy(rds_iw_incoming_slab);
        else
                ret = 0;
out:
        return ret;
}

void rds_iw_recv_exit(void)
{
        kmem_cache_destroy(rds_iw_incoming_slab);
        kmem_cache_destroy(rds_iw_frag_slab);
}