linux/net/rds/iw_recv.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/slab.h>
  35#include <linux/pci.h>
  36#include <linux/dma-mapping.h>
  37#include <rdma/rdma_cm.h>
  38
  39#include "rds.h"
  40#include "iw.h"
  41
  42static struct kmem_cache *rds_iw_incoming_slab;
  43static struct kmem_cache *rds_iw_frag_slab;
  44static atomic_t rds_iw_allocation = ATOMIC_INIT(0);
  45
  46static void rds_iw_frag_drop_page(struct rds_page_frag *frag)
  47{
  48        rdsdebug("frag %p page %p\n", frag, frag->f_page);
  49        __free_page(frag->f_page);
  50        frag->f_page = NULL;
  51}
  52
  53static void rds_iw_frag_free(struct rds_page_frag *frag)
  54{
  55        rdsdebug("frag %p page %p\n", frag, frag->f_page);
  56        BUG_ON(frag->f_page);
  57        kmem_cache_free(rds_iw_frag_slab, frag);
  58}
  59
  60/*
  61 * We map a page at a time.  Its fragments are posted in order.  This
  62 * is called in fragment order as the fragments get send completion events.
  63 * Only the last frag in the page performs the unmapping.
  64 *
  65 * It's OK for ring cleanup to call this in whatever order it likes because
  66 * DMA is not in flight and so we can unmap while other ring entries still
  67 * hold page references in their frags.
  68 */
  69static void rds_iw_recv_unmap_page(struct rds_iw_connection *ic,
  70                                   struct rds_iw_recv_work *recv)
  71{
  72        struct rds_page_frag *frag = recv->r_frag;
  73
  74        rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
  75        if (frag->f_mapped)
  76                ib_dma_unmap_page(ic->i_cm_id->device,
  77                               frag->f_mapped,
  78                               RDS_FRAG_SIZE, DMA_FROM_DEVICE);
  79        frag->f_mapped = 0;
  80}
  81
  82void rds_iw_recv_init_ring(struct rds_iw_connection *ic)
  83{
  84        struct rds_iw_recv_work *recv;
  85        u32 i;
  86
  87        for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
  88                struct ib_sge *sge;
  89
  90                recv->r_iwinc = NULL;
  91                recv->r_frag = NULL;
  92
  93                recv->r_wr.next = NULL;
  94                recv->r_wr.wr_id = i;
  95                recv->r_wr.sg_list = recv->r_sge;
  96                recv->r_wr.num_sge = RDS_IW_RECV_SGE;
  97
  98                sge = rds_iw_data_sge(ic, recv->r_sge);
  99                sge->addr = 0;
 100                sge->length = RDS_FRAG_SIZE;
 101                sge->lkey = 0;
 102
 103                sge = rds_iw_header_sge(ic, recv->r_sge);
 104                sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
 105                sge->length = sizeof(struct rds_header);
 106                sge->lkey = 0;
 107        }
 108}
 109
 110static void rds_iw_recv_clear_one(struct rds_iw_connection *ic,
 111                                  struct rds_iw_recv_work *recv)
 112{
 113        if (recv->r_iwinc) {
 114                rds_inc_put(&recv->r_iwinc->ii_inc);
 115                recv->r_iwinc = NULL;
 116        }
 117        if (recv->r_frag) {
 118                rds_iw_recv_unmap_page(ic, recv);
 119                if (recv->r_frag->f_page)
 120                        rds_iw_frag_drop_page(recv->r_frag);
 121                rds_iw_frag_free(recv->r_frag);
 122                recv->r_frag = NULL;
 123        }
 124}
 125
 126void rds_iw_recv_clear_ring(struct rds_iw_connection *ic)
 127{
 128        u32 i;
 129
 130        for (i = 0; i < ic->i_recv_ring.w_nr; i++)
 131                rds_iw_recv_clear_one(ic, &ic->i_recvs[i]);
 132
 133        if (ic->i_frag.f_page)
 134                rds_iw_frag_drop_page(&ic->i_frag);
 135}
 136
 137static int rds_iw_recv_refill_one(struct rds_connection *conn,
 138                                  struct rds_iw_recv_work *recv,
 139                                  gfp_t kptr_gfp, gfp_t page_gfp)
 140{
 141        struct rds_iw_connection *ic = conn->c_transport_data;
 142        dma_addr_t dma_addr;
 143        struct ib_sge *sge;
 144        int ret = -ENOMEM;
 145
 146        if (!recv->r_iwinc) {
 147                if (!atomic_add_unless(&rds_iw_allocation, 1, rds_iw_sysctl_max_recv_allocation)) {
 148                        rds_iw_stats_inc(s_iw_rx_alloc_limit);
 149                        goto out;
 150                }
 151                recv->r_iwinc = kmem_cache_alloc(rds_iw_incoming_slab,
 152                                                 kptr_gfp);
 153                if (!recv->r_iwinc) {
 154                        atomic_dec(&rds_iw_allocation);
 155                        goto out;
 156                }
 157                INIT_LIST_HEAD(&recv->r_iwinc->ii_frags);
 158                rds_inc_init(&recv->r_iwinc->ii_inc, conn, conn->c_faddr);
 159        }
 160
 161        if (!recv->r_frag) {
 162                recv->r_frag = kmem_cache_alloc(rds_iw_frag_slab, kptr_gfp);
 163                if (!recv->r_frag)
 164                        goto out;
 165                INIT_LIST_HEAD(&recv->r_frag->f_item);
 166                recv->r_frag->f_page = NULL;
 167        }
 168
 169        if (!ic->i_frag.f_page) {
 170                ic->i_frag.f_page = alloc_page(page_gfp);
 171                if (!ic->i_frag.f_page)
 172                        goto out;
 173                ic->i_frag.f_offset = 0;
 174        }
 175
 176        dma_addr = ib_dma_map_page(ic->i_cm_id->device,
 177                                  ic->i_frag.f_page,
 178                                  ic->i_frag.f_offset,
 179                                  RDS_FRAG_SIZE,
 180                                  DMA_FROM_DEVICE);
 181        if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
 182                goto out;
 183
 184        /*
 185         * Once we get the RDS_PAGE_LAST_OFF frag then rds_iw_frag_unmap()
 186         * must be called on this recv.  This happens as completions hit
 187         * in order or on connection shutdown.
 188         */
 189        recv->r_frag->f_page = ic->i_frag.f_page;
 190        recv->r_frag->f_offset = ic->i_frag.f_offset;
 191        recv->r_frag->f_mapped = dma_addr;
 192
 193        sge = rds_iw_data_sge(ic, recv->r_sge);
 194        sge->addr = dma_addr;
 195        sge->length = RDS_FRAG_SIZE;
 196
 197        sge = rds_iw_header_sge(ic, recv->r_sge);
 198        sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
 199        sge->length = sizeof(struct rds_header);
 200
 201        get_page(recv->r_frag->f_page);
 202
 203        if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
 204                ic->i_frag.f_offset += RDS_FRAG_SIZE;
 205        } else {
 206                put_page(ic->i_frag.f_page);
 207                ic->i_frag.f_page = NULL;
 208                ic->i_frag.f_offset = 0;
 209        }
 210
 211        ret = 0;
 212out:
 213        return ret;
 214}
 215
 216/*
 217 * This tries to allocate and post unused work requests after making sure that
 218 * they have all the allocations they need to queue received fragments into
 219 * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
 220 * pairs don't go unmatched.
 221 *
 222 * -1 is returned if posting fails due to temporary resource exhaustion.
 223 */
 224int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
 225                       gfp_t page_gfp, int prefill)
 226{
 227        struct rds_iw_connection *ic = conn->c_transport_data;
 228        struct rds_iw_recv_work *recv;
 229        struct ib_recv_wr *failed_wr;
 230        unsigned int posted = 0;
 231        int ret = 0;
 232        u32 pos;
 233
 234        while ((prefill || rds_conn_up(conn)) &&
 235               rds_iw_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
 236                if (pos >= ic->i_recv_ring.w_nr) {
 237                        printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
 238                                        pos);
 239                        ret = -EINVAL;
 240                        break;
 241                }
 242
 243                recv = &ic->i_recvs[pos];
 244                ret = rds_iw_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
 245                if (ret) {
 246                        ret = -1;
 247                        break;
 248                }
 249
 250                /* XXX when can this fail? */
 251                ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
 252                rdsdebug("recv %p iwinc %p page %p addr %lu ret %d\n", recv,
 253                         recv->r_iwinc, recv->r_frag->f_page,
 254                         (long) recv->r_frag->f_mapped, ret);
 255                if (ret) {
 256                        rds_iw_conn_error(conn, "recv post on "
 257                               "%pI4 returned %d, disconnecting and "
 258                               "reconnecting\n", &conn->c_faddr,
 259                               ret);
 260                        ret = -1;
 261                        break;
 262                }
 263
 264                posted++;
 265        }
 266
 267        /* We're doing flow control - update the window. */
 268        if (ic->i_flowctl && posted)
 269                rds_iw_advertise_credits(conn, posted);
 270
 271        if (ret)
 272                rds_iw_ring_unalloc(&ic->i_recv_ring, 1);
 273        return ret;
 274}
 275
 276static void rds_iw_inc_purge(struct rds_incoming *inc)
 277{
 278        struct rds_iw_incoming *iwinc;
 279        struct rds_page_frag *frag;
 280        struct rds_page_frag *pos;
 281
 282        iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
 283        rdsdebug("purging iwinc %p inc %p\n", iwinc, inc);
 284
 285        list_for_each_entry_safe(frag, pos, &iwinc->ii_frags, f_item) {
 286                list_del_init(&frag->f_item);
 287                rds_iw_frag_drop_page(frag);
 288                rds_iw_frag_free(frag);
 289        }
 290}
 291
 292void rds_iw_inc_free(struct rds_incoming *inc)
 293{
 294        struct rds_iw_incoming *iwinc;
 295
 296        iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
 297
 298        rds_iw_inc_purge(inc);
 299        rdsdebug("freeing iwinc %p inc %p\n", iwinc, inc);
 300        BUG_ON(!list_empty(&iwinc->ii_frags));
 301        kmem_cache_free(rds_iw_incoming_slab, iwinc);
 302        atomic_dec(&rds_iw_allocation);
 303        BUG_ON(atomic_read(&rds_iw_allocation) < 0);
 304}
 305
 306int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
 307                            size_t size)
 308{
 309        struct rds_iw_incoming *iwinc;
 310        struct rds_page_frag *frag;
 311        struct iovec *iov = first_iov;
 312        unsigned long to_copy;
 313        unsigned long frag_off = 0;
 314        unsigned long iov_off = 0;
 315        int copied = 0;
 316        int ret;
 317        u32 len;
 318
 319        iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
 320        frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
 321        len = be32_to_cpu(inc->i_hdr.h_len);
 322
 323        while (copied < size && copied < len) {
 324                if (frag_off == RDS_FRAG_SIZE) {
 325                        frag = list_entry(frag->f_item.next,
 326                                          struct rds_page_frag, f_item);
 327                        frag_off = 0;
 328                }
 329                while (iov_off == iov->iov_len) {
 330                        iov_off = 0;
 331                        iov++;
 332                }
 333
 334                to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - frag_off);
 335                to_copy = min_t(size_t, to_copy, size - copied);
 336                to_copy = min_t(unsigned long, to_copy, len - copied);
 337
 338                rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
 339                         "[%p, %lu] + %lu\n",
 340                         to_copy, iov->iov_base, iov->iov_len, iov_off,
 341                         frag->f_page, frag->f_offset, frag_off);
 342
 343                /* XXX needs + offset for multiple recvs per page */
 344                ret = rds_page_copy_to_user(frag->f_page,
 345                                            frag->f_offset + frag_off,
 346                                            iov->iov_base + iov_off,
 347                                            to_copy);
 348                if (ret) {
 349                        copied = ret;
 350                        break;
 351                }
 352
 353                iov_off += to_copy;
 354                frag_off += to_copy;
 355                copied += to_copy;
 356        }
 357
 358        return copied;
 359}
 360
 361/* ic starts out kzalloc()ed */
 362void rds_iw_recv_init_ack(struct rds_iw_connection *ic)
 363{
 364        struct ib_send_wr *wr = &ic->i_ack_wr;
 365        struct ib_sge *sge = &ic->i_ack_sge;
 366
 367        sge->addr = ic->i_ack_dma;
 368        sge->length = sizeof(struct rds_header);
 369        sge->lkey = rds_iw_local_dma_lkey(ic);
 370
 371        wr->sg_list = sge;
 372        wr->num_sge = 1;
 373        wr->opcode = IB_WR_SEND;
 374        wr->wr_id = RDS_IW_ACK_WR_ID;
 375        wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 376}
 377
 378/*
 379 * You'd think that with reliable IB connections you wouldn't need to ack
 380 * messages that have been received.  The problem is that IB hardware generates
 381 * an ack message before it has DMAed the message into memory.  This creates a
 382 * potential message loss if the HCA is disabled for any reason between when it
 383 * sends the ack and before the message is DMAed and processed.  This is only a
 384 * potential issue if another HCA is available for fail-over.
 385 *
 386 * When the remote host receives our ack they'll free the sent message from
 387 * their send queue.  To decrease the latency of this we always send an ack
 388 * immediately after we've received messages.
 389 *
 390 * For simplicity, we only have one ack in flight at a time.  This puts
 391 * pressure on senders to have deep enough send queues to absorb the latency of
 392 * a single ack frame being in flight.  This might not be good enough.
 393 *
  394 * This is implemented by having a long-lived send_wr and sge which point to a
 395 * statically allocated ack frame.  This ack wr does not fall under the ring
 396 * accounting that the tx and rx wrs do.  The QP attribute specifically makes
 397 * room for it beyond the ring size.  Send completion notices its special
 398 * wr_id and avoids working with the ring in that case.
 399 */
 400#ifndef KERNEL_HAS_ATOMIC64
 401static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
 402                                int ack_required)
 403{
 404        unsigned long flags;
 405
 406        spin_lock_irqsave(&ic->i_ack_lock, flags);
 407        ic->i_ack_next = seq;
 408        if (ack_required)
 409                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 410        spin_unlock_irqrestore(&ic->i_ack_lock, flags);
 411}
 412
 413static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
 414{
 415        unsigned long flags;
 416        u64 seq;
 417
 418        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 419
 420        spin_lock_irqsave(&ic->i_ack_lock, flags);
 421        seq = ic->i_ack_next;
 422        spin_unlock_irqrestore(&ic->i_ack_lock, flags);
 423
 424        return seq;
 425}
 426#else
 427static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
 428                                int ack_required)
 429{
 430        atomic64_set(&ic->i_ack_next, seq);
 431        if (ack_required) {
 432                smp_mb__before_clear_bit();
 433                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 434        }
 435}
 436
 437static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
 438{
 439        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 440        smp_mb__after_clear_bit();
 441
 442        return atomic64_read(&ic->i_ack_next);
 443}
 444#endif
 445
 446
 447static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits)
 448{
 449        struct rds_header *hdr = ic->i_ack;
 450        struct ib_send_wr *failed_wr;
 451        u64 seq;
 452        int ret;
 453
 454        seq = rds_iw_get_ack(ic);
 455
 456        rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
 457        rds_message_populate_header(hdr, 0, 0, 0);
 458        hdr->h_ack = cpu_to_be64(seq);
 459        hdr->h_credit = adv_credits;
 460        rds_message_make_checksum(hdr);
 461        ic->i_ack_queued = jiffies;
 462
 463        ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
 464        if (unlikely(ret)) {
 465                /* Failed to send. Release the WR, and
 466                 * force another ACK.
 467                 */
 468                clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
 469                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 470
 471                rds_iw_stats_inc(s_iw_ack_send_failure);
 472
 473                rds_iw_conn_error(ic->conn, "sending ack failed\n");
 474        } else
 475                rds_iw_stats_inc(s_iw_ack_sent);
 476}
 477
 478/*
 479 * There are 3 ways of getting acknowledgements to the peer:
 480 *  1.  We call rds_iw_attempt_ack from the recv completion handler
 481 *      to send an ACK-only frame.
 482 *      However, there can be only one such frame in the send queue
 483 *      at any time, so we may have to postpone it.
 484 *  2.  When another (data) packet is transmitted while there's
 485 *      an ACK in the queue, we piggyback the ACK sequence number
 486 *      on the data packet.
 487 *  3.  If the ACK WR is done sending, we get called from the
 488 *      send queue completion handler, and check whether there's
 489 *      another ACK pending (postponed because the WR was on the
 490 *      queue). If so, we transmit it.
 491 *
 492 * We maintain 2 variables:
 493 *  -   i_ack_flags, which keeps track of whether the ACK WR
 494 *      is currently in the send queue or not (IB_ACK_IN_FLIGHT)
 495 *  -   i_ack_next, which is the last sequence number we received
 496 *
 497 * Potentially, send queue and receive queue handlers can run concurrently.
 498 * It would be nice to not have to use a spinlock to synchronize things,
 499 * but the one problem that rules this out is that 64bit updates are
 500 * not atomic on all platforms. Things would be a lot simpler if
 501 * we had atomic64 or maybe cmpxchg64 everywhere.
 502 *
 503 * Reconnecting complicates this picture just slightly. When we
 504 * reconnect, we may be seeing duplicate packets. The peer
 505 * is retransmitting them, because it hasn't seen an ACK for
 506 * them. It is important that we ACK these.
 507 *
 508 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
 509 * this flag set *MUST* be acknowledged immediately.
 510 */
 511
 512/*
 513 * When we get here, we're called from the recv queue handler.
 514 * Check whether we ought to transmit an ACK.
 515 */
 516void rds_iw_attempt_ack(struct rds_iw_connection *ic)
 517{
 518        unsigned int adv_credits;
 519
 520        if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
 521                return;
 522
 523        if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
 524                rds_iw_stats_inc(s_iw_ack_send_delayed);
 525                return;
 526        }
 527
 528        /* Can we get a send credit? */
 529        if (!rds_iw_send_grab_credits(ic, 1, &adv_credits, 0, RDS_MAX_ADV_CREDIT)) {
 530                rds_iw_stats_inc(s_iw_tx_throttle);
 531                clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
 532                return;
 533        }
 534
 535        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 536        rds_iw_send_ack(ic, adv_credits);
 537}
 538
 539/*
 540 * We get here from the send completion handler, when the
 541 * adapter tells us the ACK frame was sent.
 542 */
 543void rds_iw_ack_send_complete(struct rds_iw_connection *ic)
 544{
 545        clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
 546        rds_iw_attempt_ack(ic);
 547}
 548
 549/*
 550 * This is called by the regular xmit code when it wants to piggyback
 551 * an ACK on an outgoing frame.
 552 */
 553u64 rds_iw_piggyb_ack(struct rds_iw_connection *ic)
 554{
 555        if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
 556                rds_iw_stats_inc(s_iw_ack_send_piggybacked);
 557        return rds_iw_get_ack(ic);
 558}
 559
 560/*
 561 * It's kind of lame that we're copying from the posted receive pages into
 562 * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
 563 * them.  But receiving new congestion bitmaps should be a *rare* event, so
 564 * hopefully we won't need to invest that complexity in making it more
 565 * efficient.  By copying we can share a simpler core with TCP which has to
 566 * copy.
 567 */
 568static void rds_iw_cong_recv(struct rds_connection *conn,
 569                              struct rds_iw_incoming *iwinc)
 570{
 571        struct rds_cong_map *map;
 572        unsigned int map_off;
 573        unsigned int map_page;
 574        struct rds_page_frag *frag;
 575        unsigned long frag_off;
 576        unsigned long to_copy;
 577        unsigned long copied;
 578        uint64_t uncongested = 0;
 579        void *addr;
 580
 581        /* catch completely corrupt packets */
 582        if (be32_to_cpu(iwinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
 583                return;
 584
 585        map = conn->c_fcong;
 586        map_page = 0;
 587        map_off = 0;
 588
 589        frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
 590        frag_off = 0;
 591
 592        copied = 0;
 593
 594        while (copied < RDS_CONG_MAP_BYTES) {
 595                uint64_t *src, *dst;
 596                unsigned int k;
 597
 598                to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
 599                BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
 600
 601                addr = kmap_atomic(frag->f_page, KM_SOFTIRQ0);
 602
 603                src = addr + frag_off;
 604                dst = (void *)map->m_page_addrs[map_page] + map_off;
 605                for (k = 0; k < to_copy; k += 8) {
 606                        /* Record ports that became uncongested, ie
 607                         * bits that changed from 0 to 1. */
 608                        uncongested |= ~(*src) & *dst;
 609                        *dst++ = *src++;
 610                }
 611                kunmap_atomic(addr, KM_SOFTIRQ0);
 612
 613                copied += to_copy;
 614
 615                map_off += to_copy;
 616                if (map_off == PAGE_SIZE) {
 617                        map_off = 0;
 618                        map_page++;
 619                }
 620
 621                frag_off += to_copy;
 622                if (frag_off == RDS_FRAG_SIZE) {
 623                        frag = list_entry(frag->f_item.next,
 624                                          struct rds_page_frag, f_item);
 625                        frag_off = 0;
 626                }
 627        }
 628
 629        /* the congestion map is in little endian order */
 630        uncongested = le64_to_cpu(uncongested);
 631
 632        rds_cong_map_updated(map, uncongested);
 633}
 634
 635/*
 636 * Rings are posted with all the allocations they'll need to queue the
 637 * incoming message to the receiving socket so this can't fail.
 638 * All fragments start with a header, so we can make sure we're not receiving
 639 * garbage, and we can tell a small 8 byte fragment from an ACK frame.
 640 */
 641struct rds_iw_ack_state {
 642        u64             ack_next;
 643        u64             ack_recv;
 644        unsigned int    ack_required:1;
 645        unsigned int    ack_next_valid:1;
 646        unsigned int    ack_recv_valid:1;
 647};
 648
 649static void rds_iw_process_recv(struct rds_connection *conn,
 650                                struct rds_iw_recv_work *recv, u32 byte_len,
 651                                struct rds_iw_ack_state *state)
 652{
 653        struct rds_iw_connection *ic = conn->c_transport_data;
 654        struct rds_iw_incoming *iwinc = ic->i_iwinc;
 655        struct rds_header *ihdr, *hdr;
 656
 657        /* XXX shut down the connection if port 0,0 are seen? */
 658
 659        rdsdebug("ic %p iwinc %p recv %p byte len %u\n", ic, iwinc, recv,
 660                 byte_len);
 661
 662        if (byte_len < sizeof(struct rds_header)) {
 663                rds_iw_conn_error(conn, "incoming message "
 664                       "from %pI4 didn't inclue a "
 665                       "header, disconnecting and "
 666                       "reconnecting\n",
 667                       &conn->c_faddr);
 668                return;
 669        }
 670        byte_len -= sizeof(struct rds_header);
 671
 672        ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
 673
 674        /* Validate the checksum. */
 675        if (!rds_message_verify_checksum(ihdr)) {
 676                rds_iw_conn_error(conn, "incoming message "
 677                       "from %pI4 has corrupted header - "
 678                       "forcing a reconnect\n",
 679                       &conn->c_faddr);
 680                rds_stats_inc(s_recv_drop_bad_checksum);
 681                return;
 682        }
 683
 684        /* Process the ACK sequence which comes with every packet */
 685        state->ack_recv = be64_to_cpu(ihdr->h_ack);
 686        state->ack_recv_valid = 1;
 687
 688        /* Process the credits update if there was one */
 689        if (ihdr->h_credit)
 690                rds_iw_send_add_credits(conn, ihdr->h_credit);
 691
 692        if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
 693                /* This is an ACK-only packet. The fact that it gets
 694                 * special treatment here is that historically, ACKs
 695                 * were rather special beasts.
 696                 */
 697                rds_iw_stats_inc(s_iw_ack_received);
 698
 699                /*
 700                 * Usually the frags make their way on to incs and are then freed as
 701                 * the inc is freed.  We don't go that route, so we have to drop the
 702                 * page ref ourselves.  We can't just leave the page on the recv
 703                 * because that confuses the dma mapping of pages and each recv's use
 704                 * of a partial page.  We can leave the frag, though, it will be
 705                 * reused.
 706                 *
 707                 * FIXME: Fold this into the code path below.
 708                 */
 709                rds_iw_frag_drop_page(recv->r_frag);
 710                return;
 711        }
 712
 713        /*
 714         * If we don't already have an inc on the connection then this
 715         * fragment has a header and starts a message.. copy its header
 716         * into the inc and save the inc so we can hang upcoming fragments
 717         * off its list.
 718         */
 719        if (!iwinc) {
 720                iwinc = recv->r_iwinc;
 721                recv->r_iwinc = NULL;
 722                ic->i_iwinc = iwinc;
 723
 724                hdr = &iwinc->ii_inc.i_hdr;
 725                memcpy(hdr, ihdr, sizeof(*hdr));
 726                ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
 727
 728                rdsdebug("ic %p iwinc %p rem %u flag 0x%x\n", ic, iwinc,
 729                         ic->i_recv_data_rem, hdr->h_flags);
 730        } else {
 731                hdr = &iwinc->ii_inc.i_hdr;
 732                /* We can't just use memcmp here; fragments of a
 733                 * single message may carry different ACKs */
 734                if (hdr->h_sequence != ihdr->h_sequence ||
 735                    hdr->h_len != ihdr->h_len ||
 736                    hdr->h_sport != ihdr->h_sport ||
 737                    hdr->h_dport != ihdr->h_dport) {
 738                        rds_iw_conn_error(conn,
 739                                "fragment header mismatch; forcing reconnect\n");
 740                        return;
 741                }
 742        }
 743
 744        list_add_tail(&recv->r_frag->f_item, &iwinc->ii_frags);
 745        recv->r_frag = NULL;
 746
 747        if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
 748                ic->i_recv_data_rem -= RDS_FRAG_SIZE;
 749        else {
 750                ic->i_recv_data_rem = 0;
 751                ic->i_iwinc = NULL;
 752
 753                if (iwinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
 754                        rds_iw_cong_recv(conn, iwinc);
 755                else {
 756                        rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
 757                                          &iwinc->ii_inc, GFP_ATOMIC,
 758                                          KM_SOFTIRQ0);
 759                        state->ack_next = be64_to_cpu(hdr->h_sequence);
 760                        state->ack_next_valid = 1;
 761                }
 762
 763                /* Evaluate the ACK_REQUIRED flag *after* we received
 764                 * the complete frame, and after bumping the next_rx
 765                 * sequence. */
 766                if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
 767                        rds_stats_inc(s_recv_ack_required);
 768                        state->ack_required = 1;
 769                }
 770
 771                rds_inc_put(&iwinc->ii_inc);
 772        }
 773}
 774
 775/*
 776 * Plucking the oldest entry from the ring can be done concurrently with
 777 * the thread refilling the ring.  Each ring operation is protected by
 778 * spinlocks and the transient state of refilling doesn't change the
 779 * recording of which entry is oldest.
 780 *
 781 * This relies on IB only calling one cq comp_handler for each cq so that
 782 * there will only be one caller of rds_recv_incoming() per RDS connection.
 783 */
 784void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 785{
 786        struct rds_connection *conn = context;
 787        struct rds_iw_connection *ic = conn->c_transport_data;
 788
 789        rdsdebug("conn %p cq %p\n", conn, cq);
 790
 791        rds_iw_stats_inc(s_iw_rx_cq_call);
 792
 793        tasklet_schedule(&ic->i_recv_tasklet);
 794}
 795
 796static inline void rds_poll_cq(struct rds_iw_connection *ic,
 797                               struct rds_iw_ack_state *state)
 798{
 799        struct rds_connection *conn = ic->conn;
 800        struct ib_wc wc;
 801        struct rds_iw_recv_work *recv;
 802
 803        while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
 804                rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 805                         (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
 806                         be32_to_cpu(wc.ex.imm_data));
 807                rds_iw_stats_inc(s_iw_rx_cq_event);
 808
 809                recv = &ic->i_recvs[rds_iw_ring_oldest(&ic->i_recv_ring)];
 810
 811                rds_iw_recv_unmap_page(ic, recv);
 812
 813                /*
 814                 * Also process recvs in connecting state because it is possible
 815                 * to get a recv completion _before_ the rdmacm ESTABLISHED
 816                 * event is processed.
 817                 */
 818                if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
 819                        /* We expect errors as the qp is drained during shutdown */
 820                        if (wc.status == IB_WC_SUCCESS) {
 821                                rds_iw_process_recv(conn, recv, wc.byte_len, state);
 822                        } else {
 823                                rds_iw_conn_error(conn, "recv completion on "
 824                                       "%pI4 had status %u, disconnecting and "
 825                                       "reconnecting\n", &conn->c_faddr,
 826                                       wc.status);
 827                        }
 828                }
 829
 830                rds_iw_ring_free(&ic->i_recv_ring, 1);
 831        }
 832}
 833
 834void rds_iw_recv_tasklet_fn(unsigned long data)
 835{
 836        struct rds_iw_connection *ic = (struct rds_iw_connection *) data;
 837        struct rds_connection *conn = ic->conn;
 838        struct rds_iw_ack_state state = { 0, };
 839
 840        rds_poll_cq(ic, &state);
 841        ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
 842        rds_poll_cq(ic, &state);
 843
 844        if (state.ack_next_valid)
 845                rds_iw_set_ack(ic, state.ack_next, state.ack_required);
 846        if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
 847                rds_send_drop_acked(conn, state.ack_recv, NULL);
 848                ic->i_ack_recv = state.ack_recv;
 849        }
 850        if (rds_conn_up(conn))
 851                rds_iw_attempt_ack(ic);
 852
 853        /* If we ever end up with a really empty receive ring, we're
 854         * in deep trouble, as the sender will definitely see RNR
 855         * timeouts. */
 856        if (rds_iw_ring_empty(&ic->i_recv_ring))
 857                rds_iw_stats_inc(s_iw_rx_ring_empty);
 858
 859        /*
 860         * If the ring is running low, then schedule the thread to refill.
 861         */
 862        if (rds_iw_ring_low(&ic->i_recv_ring))
 863                queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
 864}
 865
 866int rds_iw_recv(struct rds_connection *conn)
 867{
 868        struct rds_iw_connection *ic = conn->c_transport_data;
 869        int ret = 0;
 870
 871        rdsdebug("conn %p\n", conn);
 872
 873        /*
 874         * If we get a temporary posting failure in this context then
 875         * we're really low and we want the caller to back off for a bit.
 876         */
 877        mutex_lock(&ic->i_recv_mutex);
 878        if (rds_iw_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
 879                ret = -ENOMEM;
 880        else
 881                rds_iw_stats_inc(s_iw_rx_refill_from_thread);
 882        mutex_unlock(&ic->i_recv_mutex);
 883
 884        if (rds_conn_up(conn))
 885                rds_iw_attempt_ack(ic);
 886
 887        return ret;
 888}
 889
 890int rds_iw_recv_init(void)
 891{
 892        struct sysinfo si;
 893        int ret = -ENOMEM;
 894
 895        /* Default to 30% of all available RAM for recv memory */
 896        si_meminfo(&si);
 897        rds_iw_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;
 898
 899        rds_iw_incoming_slab = kmem_cache_create("rds_iw_incoming",
 900                                        sizeof(struct rds_iw_incoming),
 901                                        0, 0, NULL);
 902        if (!rds_iw_incoming_slab)
 903                goto out;
 904
 905        rds_iw_frag_slab = kmem_cache_create("rds_iw_frag",
 906                                        sizeof(struct rds_page_frag),
 907                                        0, 0, NULL);
 908        if (!rds_iw_frag_slab)
 909                kmem_cache_destroy(rds_iw_incoming_slab);
 910        else
 911                ret = 0;
 912out:
 913        return ret;
 914}
 915
 916void rds_iw_recv_exit(void)
 917{
 918        kmem_cache_destroy(rds_iw_incoming_slab);
 919        kmem_cache_destroy(rds_iw_frag_slab);
 920}
 921