linux/net/rds/ib_send.c
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/in.h>
  35#include <linux/device.h>
  36#include <linux/dmapool.h>
  37#include <linux/ratelimit.h>
  38
  39#include "rds_single_path.h"
  40#include "rds.h"
  41#include "ib.h"
  42
  43/*
  44 * Convert IB-specific error message to RDS error message and call core
  45 * completion handler.
  46 */
  47static void rds_ib_send_complete(struct rds_message *rm,
  48                                 int wc_status,
  49                                 void (*complete)(struct rds_message *rm, int status))
  50{
  51        int notify_status;
  52
  53        switch (wc_status) {
  54        case IB_WC_WR_FLUSH_ERR:
  55                return;
  56
  57        case IB_WC_SUCCESS:
  58                notify_status = RDS_RDMA_SUCCESS;
  59                break;
  60
  61        case IB_WC_REM_ACCESS_ERR:
  62                notify_status = RDS_RDMA_REMOTE_ERROR;
  63                break;
  64
  65        default:
  66                notify_status = RDS_RDMA_OTHER_ERROR;
  67                break;
  68        }
  69        complete(rm, notify_status);
  70}
  71
  72static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
  73                                   struct rm_rdma_op *op,
  74                                   int wc_status)
  75{
  76        if (op->op_mapped) {
  77                ib_dma_unmap_sg(ic->i_cm_id->device,
  78                                op->op_sg, op->op_nents,
  79                                op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
  80                op->op_mapped = 0;
  81        }
  82
  83        /* If the user asked for a completion notification on this
  84         * message, we can implement three different semantics:
   85         *  1.  Notify when we receive the ACK on the RDS message
  86         *      that was queued with the RDMA. This provides reliable
  87         *      notification of RDMA status at the expense of a one-way
  88         *      packet delay.
  89         *  2.  Notify when the IB stack gives us the completion event for
  90         *      the RDMA operation.
  91         *  3.  Notify when the IB stack gives us the completion event for
  92         *      the accompanying RDS messages.
  93         * Here, we implement approach #3. To implement approach #2,
  94         * we would need to take an event for the rdma WR. To implement #1,
  95         * don't call rds_rdma_send_complete at all, and fall back to the notify
  96         * handling in the ACK processing code.
  97         *
  98         * Note: There's no need to explicitly sync any RDMA buffers using
  99         * ib_dma_sync_sg_for_cpu - the completion for the RDMA
 100         * operation itself unmapped the RDMA buffers, which takes care
 101         * of synching.
 102         */
 103        rds_ib_send_complete(container_of(op, struct rds_message, rdma),
 104                             wc_status, rds_rdma_send_complete);
 105
 106        if (op->op_write)
 107                rds_stats_add(s_send_rdma_bytes, op->op_bytes);
 108        else
 109                rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
 110}
 111
 112static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
 113                                     struct rm_atomic_op *op,
 114                                     int wc_status)
 115{
 116        /* unmap atomic recvbuf */
 117        if (op->op_mapped) {
 118                ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
 119                                DMA_FROM_DEVICE);
 120                op->op_mapped = 0;
 121        }
 122
 123        rds_ib_send_complete(container_of(op, struct rds_message, atomic),
 124                             wc_status, rds_atomic_send_complete);
 125
 126        if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
 127                rds_ib_stats_inc(s_ib_atomic_cswp);
 128        else
 129                rds_ib_stats_inc(s_ib_atomic_fadd);
 130}
 131
 132static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
 133                                   struct rm_data_op *op,
 134                                   int wc_status)
 135{
 136        struct rds_message *rm = container_of(op, struct rds_message, data);
 137
 138        if (op->op_nents)
 139                ib_dma_unmap_sg(ic->i_cm_id->device,
 140                                op->op_sg, op->op_nents,
 141                                DMA_TO_DEVICE);
 142
 143        if (rm->rdma.op_active && rm->data.op_notify)
 144                rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status);
 145}
 146
 147/*
 148 * Unmap the resources associated with a struct send_work.
 149 *
  150 * Returns the rm because the caller, the event handler, needs it, and
  151 * because, currently, the rm can only be recovered here by switching
  152 * on wr.opcode.
 153 */
 154static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
 155                                                struct rds_ib_send_work *send,
 156                                                int wc_status)
 157{
 158        struct rds_message *rm = NULL;
 159
 160        /* In the error case, wc.opcode sometimes contains garbage */
 161        switch (send->s_wr.opcode) {
 162        case IB_WR_SEND:
 163                if (send->s_op) {
 164                        rm = container_of(send->s_op, struct rds_message, data);
 165                        rds_ib_send_unmap_data(ic, send->s_op, wc_status);
 166                }
 167                break;
 168        case IB_WR_RDMA_WRITE:
 169        case IB_WR_RDMA_READ:
 170                if (send->s_op) {
 171                        rm = container_of(send->s_op, struct rds_message, rdma);
 172                        rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
 173                }
 174                break;
 175        case IB_WR_ATOMIC_FETCH_AND_ADD:
 176        case IB_WR_ATOMIC_CMP_AND_SWP:
 177                if (send->s_op) {
 178                        rm = container_of(send->s_op, struct rds_message, atomic);
 179                        rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
 180                }
 181                break;
 182        default:
 183                printk_ratelimited(KERN_NOTICE
 184                               "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
 185                               __func__, send->s_wr.opcode);
 186                break;
 187        }
 188
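        /* 0xdead marks this WR as already unmapped; rds_ib_send_clear_ring()
         * checks for it so a flushed entry is not unmapped a second time. */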
 189        send->s_wr.opcode = 0xdead;
 190
 191        return rm;
 192}
 193
 194void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 195{
 196        struct rds_ib_send_work *send;
 197        u32 i;
 198
 199        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 200                struct ib_sge *sge;
 201
 202                send->s_op = NULL;
 203
 204                send->s_wr.wr_id = i;
 205                send->s_wr.sg_list = send->s_sge;
 206                send->s_wr.ex.imm_data = 0;
 207
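                /* s_sge[0] always describes this slot's pre-mapped rds_header
                 * (i_send_hdrs_dma below); s_sge[1] is filled in per fragment
                 * by rds_ib_xmit(). */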
 208                sge = &send->s_sge[0];
 209                sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
 210                sge->length = sizeof(struct rds_header);
 211                sge->lkey = ic->i_pd->local_dma_lkey;
 212
 213                send->s_sge[1].lkey = ic->i_pd->local_dma_lkey;
 214        }
 215}
 216
 217void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 218{
 219        struct rds_ib_send_work *send;
 220        u32 i;
 221
 222        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 223                if (send->s_op && send->s_wr.opcode != 0xdead)
 224                        rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
 225        }
 226}
 227
 228/*
 229 * The only fast path caller always has a non-zero nr, so we don't
 230 * bother testing nr before performing the atomic sub.
 231 */
 232static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
 233{
 234        if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
 235            waitqueue_active(&rds_ib_ring_empty_wait))
 236                wake_up(&rds_ib_ring_empty_wait);
 237        BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
 238}
 239
 240/*
 241 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 242 * operations performed in the send path.  As the sender allocs and potentially
 243 * unallocs the next free entry in the ring it doesn't alter which is
 244 * the next to be freed, which is what this is concerned with.
 245 */
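/*
 * Concretely (a sketch, assuming the ring helpers in ib_ring.c behave as
 * their names suggest): the send path only advances the allocation side of
 * the ring via rds_ib_ring_alloc()/rds_ib_ring_unalloc(), while this
 * completion path only advances the free side via rds_ib_ring_free(), so
 * the two sides never need to lock against each other.
 */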
 246void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 247{
 248        struct rds_message *rm = NULL;
 249        struct rds_connection *conn = ic->conn;
 250        struct rds_ib_send_work *send;
 251        u32 completed;
 252        u32 oldest;
 253        u32 i = 0;
 254        int nr_sig = 0;
 255
 256
 257        rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
 258                 (unsigned long long)wc->wr_id, wc->status,
 259                 ib_wc_status_msg(wc->status), wc->byte_len,
 260                 be32_to_cpu(wc->ex.imm_data));
 261        rds_ib_stats_inc(s_ib_tx_cq_event);
 262
 263        if (wc->wr_id == RDS_IB_ACK_WR_ID) {
 264                if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
 265                        rds_ib_stats_inc(s_ib_tx_stalled);
 266                rds_ib_ack_send_complete(ic);
 267                return;
 268        }
 269
 270        oldest = rds_ib_ring_oldest(&ic->i_send_ring);
 271
 272        completed = rds_ib_ring_completed(&ic->i_send_ring, wc->wr_id, oldest);
 273
 274        for (i = 0; i < completed; i++) {
 275                send = &ic->i_sends[oldest];
 276                if (send->s_wr.send_flags & IB_SEND_SIGNALED)
 277                        nr_sig++;
 278
 279                rm = rds_ib_send_unmap_op(ic, send, wc->status);
 280
 281                if (time_after(jiffies, send->s_queued + HZ / 2))
 282                        rds_ib_stats_inc(s_ib_tx_stalled);
 283
 284                if (send->s_op) {
 285                        if (send->s_op == rm->m_final_op) {
 286                                /* If anyone waited for this message to get
 287                                 * flushed out, wake them up now
 288                                 */
 289                                rds_message_unmapped(rm);
 290                        }
 291                        rds_message_put(rm);
 292                        send->s_op = NULL;
 293                }
 294
 295                oldest = (oldest + 1) % ic->i_send_ring.w_nr;
 296        }
 297
 298        rds_ib_ring_free(&ic->i_send_ring, completed);
 299        rds_ib_sub_signaled(ic, nr_sig);
 300        nr_sig = 0;
 301
 302        if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
 303            test_bit(0, &conn->c_map_queued))
 304                queue_delayed_work(rds_wq, &conn->c_send_w, 0);
 305
 306        /* We expect errors as the qp is drained during shutdown */
 307        if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
 308                rds_ib_conn_error(conn, "send completion on <%pI4,%pI4> had status %u (%s), disconnecting and reconnecting\n",
 309                                  &conn->c_laddr, &conn->c_faddr, wc->status,
 310                                  ib_wc_status_msg(wc->status));
 311        }
 312}
 313
 314/*
 315 * This is the main function for allocating credits when sending
 316 * messages.
 317 *
 318 * Conceptually, we have two counters:
 319 *  -   send credits: this tells us how many WRs we're allowed
  320 *      to submit without overrunning the receiver's queue. For
 321 *      each SEND WR we post, we decrement this by one.
 322 *
 323 *  -   posted credits: this tells us how many WRs we recently
 324 *      posted to the receive queue. This value is transferred
  325 *      to the peer as a "credit update" in an RDS header field.
 326 *      Every time we transmit credits to the peer, we subtract
 327 *      the amount of transferred credits from this counter.
 328 *
 329 * It is essential that we avoid situations where both sides have
 330 * exhausted their send credits, and are unable to send new credits
 331 * to the peer. We achieve this by requiring that we send at least
 332 * one credit update to the peer before exhausting our credits.
 333 * When new credits arrive, we subtract one credit that is withheld
 334 * until we've posted new buffers and are ready to transmit these
 335 * credits (see rds_ib_send_add_credits below).
 336 *
 337 * The RDS send code is essentially single-threaded; rds_send_xmit
 338 * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
 339 * However, the ACK sending code is independent and can race with
 340 * message SENDs.
 341 *
 342 * In the send path, we need to update the counters for send credits
 343 * and the counter of posted buffers atomically - when we use the
 344 * last available credit, we cannot allow another thread to race us
 345 * and grab the posted credits counter.  Hence, we have to use a
 346 * spinlock to protect the credit counter, or use atomics.
 347 *
 348 * Spinlocks shared between the send and the receive path are bad,
 349 * because they create unnecessary delays. An early implementation
 350 * using a spinlock showed a 5% degradation in throughput at some
 351 * loads.
 352 *
 353 * This implementation avoids spinlocks completely, putting both
 354 * counters into a single atomic, and updating that atomic using
 355 * atomic_add (in the receive path, when receiving fresh credits),
 356 * and using atomic_cmpxchg when updating the two counters.
 357 */
 358int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
 359                             u32 wanted, u32 *adv_credits, int need_posted, int max_posted)
 360{
 361        unsigned int avail, posted, got = 0, advertise;
 362        long oldval, newval;
 363
 364        *adv_credits = 0;
 365        if (!ic->i_flowctl)
 366                return wanted;
 367
 368try_again:
 369        advertise = 0;
 370        oldval = newval = atomic_read(&ic->i_credits);
 371        posted = IB_GET_POST_CREDITS(oldval);
 372        avail = IB_GET_SEND_CREDITS(oldval);
 373
 374        rdsdebug("wanted=%u credits=%u posted=%u\n",
 375                        wanted, avail, posted);
 376
 377        /* The last credit must be used to send a credit update. */
 378        if (avail && !posted)
 379                avail--;
 380
 381        if (avail < wanted) {
 382                struct rds_connection *conn = ic->i_cm_id->context;
 383
 384                /* Oops, there aren't that many credits left! */
 385                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 386                got = avail;
 387        } else {
 388                /* Sometimes you get what you want, lalala. */
 389                got = wanted;
 390        }
 391        newval -= IB_SET_SEND_CREDITS(got);
 392
 393        /*
  394         * If need_posted is non-zero, then the caller wants the posted
  395         * credits advertised regardless of whether any send credits are
  396         * available.
 397         */
 398        if (posted && (got || need_posted)) {
 399                advertise = min_t(unsigned int, posted, max_posted);
 400                newval -= IB_SET_POST_CREDITS(advertise);
 401        }
 402
 403        /* Finally bill everything */
 404        if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
 405                goto try_again;
 406
 407        *adv_credits = advertise;
 408        return got;
 409}
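
/*
 * Worked example of the packed credit word (a sketch, assuming the
 * IB_SET_SEND_CREDITS()/IB_SET_POST_CREDITS() macros in ib.h keep send
 * credits in the low 16 bits and posted credits in the high 16 bits):
 *
 *     atomic_set(&ic->i_credits,
 *                IB_SET_SEND_CREDITS(3) | IB_SET_POST_CREDITS(2));
 *     got = rds_ib_send_grab_credits(ic, 5, &adv, 0, RDS_MAX_ADV_CREDIT);
 *
 * Here avail = 3 and posted = 2; because posted credits are pending, no
 * send credit is withheld.  Only 3 of the 5 wanted credits exist, so the
 * connection is marked RDS_LL_SEND_FULL, got is 3, adv becomes
 * min(2, RDS_MAX_ADV_CREDIT) = 2, and a single atomic_cmpxchg() drops the
 * word to zero send and zero posted credits.
 */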
 410
 411void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 412{
 413        struct rds_ib_connection *ic = conn->c_transport_data;
 414
 415        if (credits == 0)
 416                return;
 417
 418        rdsdebug("credits=%u current=%u%s\n",
 419                        credits,
 420                        IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
 421                        test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
 422
 423        atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
 424        if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
 425                queue_delayed_work(rds_wq, &conn->c_send_w, 0);
 426
 427        WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 428
 429        rds_ib_stats_inc(s_ib_rx_credit_updates);
 430}
 431
 432void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
 433{
 434        struct rds_ib_connection *ic = conn->c_transport_data;
 435
 436        if (posted == 0)
 437                return;
 438
 439        atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
 440
 441        /* Decide whether to send an update to the peer now.
 442         * If we would send a credit update for every single buffer we
 443         * post, we would end up with an ACK storm (ACK arrives,
 444         * consumes buffer, we refill the ring, send ACK to remote
 445         * advertising the newly posted buffer... ad inf)
 446         *
 447         * Performance pretty much depends on how often we send
 448         * credit updates - too frequent updates mean lots of ACKs.
 449         * Too infrequent updates, and the peer will run out of
  450 * credits and have to throttle.
 451         * For the time being, 16 seems to be a good compromise.
 452         */
 453        if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
 454                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 455}
 456
 457static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
 458                                             struct rds_ib_send_work *send,
 459                                             bool notify)
 460{
 461        /*
 462         * We want to delay signaling completions just enough to get
 463         * the batching benefits but not so much that we create dead time
 464         * on the wire.
 465         */
 466        if (ic->i_unsignaled_wrs-- == 0 || notify) {
 467                ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
 468                send->s_wr.send_flags |= IB_SEND_SIGNALED;
 469                return 1;
 470        }
 471        return 0;
 472}
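
/*
 * With the counter reset to rds_ib_sysctl_max_unsig_wrs (defined in
 * ib_sysctl.c), roughly one WR in every max_unsig_wrs + 1 is posted with
 * IB_SEND_SIGNALED unless @notify forces a completion; callers accumulate
 * the 0/1 return value into nr_sig and, once the WRs are posted, into
 * ic->i_signaled_sends.
 */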
 473
 474/*
 475 * This can be called multiple times for a given message.  The first time
 476 * we see a message we map its scatterlist into the IB device so that
 477 * we can provide that mapped address to the IB scatter gather entries
 478 * in the IB work requests.  We translate the scatterlist into a series
 479 * of work requests that fragment the message.  These work requests complete
 480 * in order so we pass ownership of the message to the completion handler
 481 * once we send the final fragment.
 482 *
  483 * The RDS core sets RDS_IN_XMIT so that only one thread enters this
  484 * function per connection at a time.  This makes sure that the tx ring
  485 * alloc/unalloc pairs don't get out of sync and confuse the ring.
 486 */
 487int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 488                unsigned int hdr_off, unsigned int sg, unsigned int off)
 489{
 490        struct rds_ib_connection *ic = conn->c_transport_data;
 491        struct ib_device *dev = ic->i_cm_id->device;
 492        struct rds_ib_send_work *send = NULL;
 493        struct rds_ib_send_work *first;
 494        struct rds_ib_send_work *prev;
 495        struct ib_send_wr *failed_wr;
 496        struct scatterlist *scat;
 497        u32 pos;
 498        u32 i;
 499        u32 work_alloc;
 500        u32 credit_alloc = 0;
 501        u32 posted;
 502        u32 adv_credits = 0;
 503        int send_flags = 0;
 504        int bytes_sent = 0;
 505        int ret;
 506        int flow_controlled = 0;
 507        int nr_sig = 0;
 508
 509        BUG_ON(off % RDS_FRAG_SIZE);
 510        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
 511
 512        /* Do not send cong updates to IB loopback */
 513        if (conn->c_loopback
 514            && rm->m_inc.i_hdr.h_flags & RDS_FLAG_CONG_BITMAP) {
 515                rds_cong_map_updated(conn->c_fcong, ~(u64) 0);
 516                scat = &rm->data.op_sg[sg];
 517                ret = max_t(int, RDS_CONG_MAP_BYTES, scat->length);
 518                return sizeof(struct rds_header) + ret;
 519        }
 520
 521        /* FIXME we may overallocate here */
 522        if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
 523                i = 1;
 524        else
 525                i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
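        /* e.g. with RDS_FRAG_SIZE at its usual 4096 (1 << RDS_FRAG_SHIFT in
         * rds.h), a 9000-byte message needs ceil(9000, 4096) = 3 fragments
         * and therefore 3 send ring entries. */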
 526
 527        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 528        if (work_alloc == 0) {
 529                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 530                rds_ib_stats_inc(s_ib_tx_ring_full);
 531                ret = -ENOMEM;
 532                goto out;
 533        }
 534
 535        if (ic->i_flowctl) {
 536                credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
 537                adv_credits += posted;
 538                if (credit_alloc < work_alloc) {
 539                        rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
 540                        work_alloc = credit_alloc;
 541                        flow_controlled = 1;
 542                }
 543                if (work_alloc == 0) {
 544                        set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 545                        rds_ib_stats_inc(s_ib_tx_throttle);
 546                        ret = -ENOMEM;
 547                        goto out;
 548                }
 549        }
 550
 551        /* map the message the first time we see it */
 552        if (!ic->i_data_op) {
 553                if (rm->data.op_nents) {
 554                        rm->data.op_count = ib_dma_map_sg(dev,
 555                                                          rm->data.op_sg,
 556                                                          rm->data.op_nents,
 557                                                          DMA_TO_DEVICE);
 558                        rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count);
 559                        if (rm->data.op_count == 0) {
 560                                rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
 561                                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 562                                ret = -ENOMEM; /* XXX ? */
 563                                goto out;
 564                        }
 565                } else {
 566                        rm->data.op_count = 0;
 567                }
 568
 569                rds_message_addref(rm);
 570                rm->data.op_dmasg = 0;
 571                rm->data.op_dmaoff = 0;
 572                ic->i_data_op = &rm->data;
 573
 574                /* Finalize the header */
 575                if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
 576                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
 577                if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
 578                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
 579
  580                /* If it has an RDMA op, tell the peer we did it. This is
 581                 * used by the peer to release use-once RDMA MRs. */
 582                if (rm->rdma.op_active) {
 583                        struct rds_ext_header_rdma ext_hdr;
 584
 585                        ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
 586                        rds_message_add_extension(&rm->m_inc.i_hdr,
 587                                        RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
 588                }
 589                if (rm->m_rdma_cookie) {
 590                        rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
 591                                        rds_rdma_cookie_key(rm->m_rdma_cookie),
 592                                        rds_rdma_cookie_offset(rm->m_rdma_cookie));
 593                }
 594
  595                /* Note - rds_ib_piggyb_ack clears the ACK_REQUIRED bit, so
  596                 * we should not do this unless we have a chance of at least
  597                 * sticking the header into the send ring, which is why we
  598                 * call rds_ib_ring_alloc first. */
 599                rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_ib_piggyb_ack(ic));
 600                rds_message_make_checksum(&rm->m_inc.i_hdr);
 601
 602                /*
 603                 * Update adv_credits since we reset the ACK_REQUIRED bit.
 604                 */
 605                if (ic->i_flowctl) {
 606                        rds_ib_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits);
 607                        adv_credits += posted;
 608                        BUG_ON(adv_credits > 255);
 609                }
 610        }
 611
 612        /* Sometimes you want to put a fence between an RDMA
 613         * READ and the following SEND.
 614         * We could either do this all the time
 615         * or when requested by the user. Right now, we let
 616         * the application choose.
 617         */
 618        if (rm->rdma.op_active && rm->rdma.op_fence)
 619                send_flags = IB_SEND_FENCE;
 620
 621        /* Each frag gets a header. Msgs may be 0 bytes */
 622        send = &ic->i_sends[pos];
 623        first = send;
 624        prev = NULL;
 625        scat = &ic->i_data_op->op_sg[rm->data.op_dmasg];
 626        i = 0;
 627        do {
 628                unsigned int len = 0;
 629
 630                /* Set up the header */
 631                send->s_wr.send_flags = send_flags;
 632                send->s_wr.opcode = IB_WR_SEND;
 633                send->s_wr.num_sge = 1;
 634                send->s_wr.next = NULL;
 635                send->s_queued = jiffies;
 636                send->s_op = NULL;
 637
 638                send->s_sge[0].addr = ic->i_send_hdrs_dma
 639                        + (pos * sizeof(struct rds_header));
 640                send->s_sge[0].length = sizeof(struct rds_header);
 641
 642                memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
 643
 644                /* Set up the data, if present */
 645                if (i < work_alloc
 646                    && scat != &rm->data.op_sg[rm->data.op_count]) {
 647                        len = min(RDS_FRAG_SIZE,
 648                                ib_sg_dma_len(dev, scat) - rm->data.op_dmaoff);
 649                        send->s_wr.num_sge = 2;
 650
 651                        send->s_sge[1].addr = ib_sg_dma_address(dev, scat);
 652                        send->s_sge[1].addr += rm->data.op_dmaoff;
 653                        send->s_sge[1].length = len;
 654
 655                        bytes_sent += len;
 656                        rm->data.op_dmaoff += len;
 657                        if (rm->data.op_dmaoff == ib_sg_dma_len(dev, scat)) {
 658                                scat++;
 659                                rm->data.op_dmasg++;
 660                                rm->data.op_dmaoff = 0;
 661                        }
 662                }
 663
 664                rds_ib_set_wr_signal_state(ic, send, false);
 665
 666                /*
 667                 * Always signal the last one if we're stopping due to flow control.
 668                 */
 669                if (ic->i_flowctl && flow_controlled && i == (work_alloc - 1)) {
 670                        rds_ib_set_wr_signal_state(ic, send, true);
 671                        send->s_wr.send_flags |= IB_SEND_SOLICITED;
 672                }
 673
 674                if (send->s_wr.send_flags & IB_SEND_SIGNALED)
 675                        nr_sig++;
 676
 677                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 678                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 679
 680                if (ic->i_flowctl && adv_credits) {
 681                        struct rds_header *hdr = &ic->i_send_hdrs[pos];
 682
 683                        /* add credit and redo the header checksum */
 684                        hdr->h_credit = adv_credits;
 685                        rds_message_make_checksum(hdr);
 686                        adv_credits = 0;
 687                        rds_ib_stats_inc(s_ib_tx_credit_updates);
 688                }
 689
 690                if (prev)
 691                        prev->s_wr.next = &send->s_wr;
 692                prev = send;
 693
 694                pos = (pos + 1) % ic->i_send_ring.w_nr;
 695                send = &ic->i_sends[pos];
 696                i++;
 697
 698        } while (i < work_alloc
 699                 && scat != &rm->data.op_sg[rm->data.op_count]);
 700
 701        /* Account the RDS header in the number of bytes we sent, but just once.
 702         * The caller has no concept of fragmentation. */
 703        if (hdr_off == 0)
 704                bytes_sent += sizeof(struct rds_header);
 705
 706        /* if we finished the message then send completion owns it */
 707        if (scat == &rm->data.op_sg[rm->data.op_count]) {
 708                prev->s_op = ic->i_data_op;
 709                prev->s_wr.send_flags |= IB_SEND_SOLICITED;
 710                if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED))
 711                        nr_sig += rds_ib_set_wr_signal_state(ic, prev, true);
 712                ic->i_data_op = NULL;
 713        }
 714
 715        /* Put back wrs & credits we didn't use */
 716        if (i < work_alloc) {
 717                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 718                work_alloc = i;
 719        }
 720        if (ic->i_flowctl && i < credit_alloc)
 721                rds_ib_send_add_credits(conn, credit_alloc - i);
 722
 723        if (nr_sig)
 724                atomic_add(nr_sig, &ic->i_signaled_sends);
 725
 726        /* XXX need to worry about failed_wr and partial sends. */
 727        failed_wr = &first->s_wr;
 728        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
 729        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
 730                 first, &first->s_wr, ret, failed_wr);
 731        BUG_ON(failed_wr != &first->s_wr);
 732        if (ret) {
 733                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 734                       "returned %d\n", &conn->c_faddr, ret);
 735                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 736                rds_ib_sub_signaled(ic, nr_sig);
 737                if (prev->s_op) {
 738                        ic->i_data_op = prev->s_op;
 739                        prev->s_op = NULL;
 740                }
 741
 742                rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
 743                goto out;
 744        }
 745
 746        ret = bytes_sent;
 747out:
 748        BUG_ON(adv_credits);
 749        return ret;
 750}
 751
 752/*
 753 * Issue atomic operation.
 754 * A simplified version of the rdma case, we always map 1 SG, and
 755 * only 8 bytes, for the return value from the atomic operation.
 756 */
 757int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 758{
 759        struct rds_ib_connection *ic = conn->c_transport_data;
 760        struct rds_ib_send_work *send = NULL;
 761        struct ib_send_wr *failed_wr;
 762        struct rds_ib_device *rds_ibdev;
 763        u32 pos;
 764        u32 work_alloc;
 765        int ret;
 766        int nr_sig = 0;
 767
 768        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 769
 770        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
 771        if (work_alloc != 1) {
 772                rds_ib_stats_inc(s_ib_tx_ring_full);
 773                ret = -ENOMEM;
 774                goto out;
 775        }
 776
 777        /* address of send request in ring */
 778        send = &ic->i_sends[pos];
 779        send->s_queued = jiffies;
 780
 781        if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
 782                send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
 783                send->s_atomic_wr.compare_add = op->op_m_cswp.compare;
 784                send->s_atomic_wr.swap = op->op_m_cswp.swap;
 785                send->s_atomic_wr.compare_add_mask = op->op_m_cswp.compare_mask;
 786                send->s_atomic_wr.swap_mask = op->op_m_cswp.swap_mask;
 787        } else { /* FADD */
 788                send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
 789                send->s_atomic_wr.compare_add = op->op_m_fadd.add;
 790                send->s_atomic_wr.swap = 0;
 791                send->s_atomic_wr.compare_add_mask = op->op_m_fadd.nocarry_mask;
 792                send->s_atomic_wr.swap_mask = 0;
 793        }
 794        send->s_wr.send_flags = 0;
 795        nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 796        send->s_atomic_wr.wr.num_sge = 1;
 797        send->s_atomic_wr.wr.next = NULL;
 798        send->s_atomic_wr.remote_addr = op->op_remote_addr;
 799        send->s_atomic_wr.rkey = op->op_rkey;
 800        send->s_op = op;
 801        rds_message_addref(container_of(send->s_op, struct rds_message, atomic));
 802
 803        /* map 8 byte retval buffer to the device */
 804        ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
 805        rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
 806        if (ret != 1) {
 807                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 808                rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
 809                ret = -ENOMEM; /* XXX ? */
 810                goto out;
 811        }
 812
 813        /* Convert our struct scatterlist to struct ib_sge */
 814        send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg);
 815        send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg);
 816        send->s_sge[0].lkey = ic->i_pd->local_dma_lkey;
 817
 818        rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
 819                 send->s_sge[0].addr, send->s_sge[0].length);
 820
 821        if (nr_sig)
 822                atomic_add(nr_sig, &ic->i_signaled_sends);
 823
 824        failed_wr = &send->s_atomic_wr.wr;
 825        ret = ib_post_send(ic->i_cm_id->qp, &send->s_atomic_wr.wr, &failed_wr);
 826        rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
 827                 send, &send->s_atomic_wr, ret, failed_wr);
 828        BUG_ON(failed_wr != &send->s_atomic_wr.wr);
 829        if (ret) {
 830                printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
 831                       "returned %d\n", &conn->c_faddr, ret);
 832                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 833                rds_ib_sub_signaled(ic, nr_sig);
 834                goto out;
 835        }
 836
 837        if (unlikely(failed_wr != &send->s_atomic_wr.wr)) {
 838                printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
 839                BUG_ON(failed_wr != &send->s_atomic_wr.wr);
 840        }
 841
 842out:
 843        return ret;
 844}
 845
 846int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 847{
 848        struct rds_ib_connection *ic = conn->c_transport_data;
 849        struct rds_ib_send_work *send = NULL;
 850        struct rds_ib_send_work *first;
 851        struct rds_ib_send_work *prev;
 852        struct ib_send_wr *failed_wr;
 853        struct scatterlist *scat;
 854        unsigned long len;
 855        u64 remote_addr = op->op_remote_addr;
 856        u32 max_sge = ic->rds_ibdev->max_sge;
 857        u32 pos;
 858        u32 work_alloc;
 859        u32 i;
 860        u32 j;
 861        int sent;
 862        int ret;
 863        int num_sge;
 864        int nr_sig = 0;
 865
 866        /* map the op the first time we see it */
 867        if (!op->op_mapped) {
 868                op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
 869                                             op->op_sg, op->op_nents, (op->op_write) ?
 870                                             DMA_TO_DEVICE : DMA_FROM_DEVICE);
 871                rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
 872                if (op->op_count == 0) {
 873                        rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
 874                        ret = -ENOMEM; /* XXX ? */
 875                        goto out;
 876                }
 877
 878                op->op_mapped = 1;
 879        }
 880
 881        /*
  882         * Rather than supporting partial RDMA reads/writes, we insist
  883         * that there be enough work requests to send the entire message.
 884         */
 885        i = ceil(op->op_count, max_sge);
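        /* e.g. op_count = 70 mapped SG entries with max_sge = 30 gives
         * ceil(70, 30) = 3 RDMA work requests. */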
 886
 887        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 888        if (work_alloc != i) {
 889                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 890                rds_ib_stats_inc(s_ib_tx_ring_full);
 891                ret = -ENOMEM;
 892                goto out;
 893        }
 894
 895        send = &ic->i_sends[pos];
 896        first = send;
 897        prev = NULL;
 898        scat = &op->op_sg[0];
 899        sent = 0;
 900        num_sge = op->op_count;
 901
 902        for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
 903                send->s_wr.send_flags = 0;
 904                send->s_queued = jiffies;
 905                send->s_op = NULL;
 906
 907                nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 908
 909                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
 910                send->s_rdma_wr.remote_addr = remote_addr;
 911                send->s_rdma_wr.rkey = op->op_rkey;
 912
 913                if (num_sge > max_sge) {
 914                        send->s_rdma_wr.wr.num_sge = max_sge;
 915                        num_sge -= max_sge;
 916                } else {
 917                        send->s_rdma_wr.wr.num_sge = num_sge;
 918                }
 919
 920                send->s_rdma_wr.wr.next = NULL;
 921
 922                if (prev)
 923                        prev->s_rdma_wr.wr.next = &send->s_rdma_wr.wr;
 924
 925                for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
 926                     scat != &op->op_sg[op->op_count]; j++) {
 927                        len = ib_sg_dma_len(ic->i_cm_id->device, scat);
 928                        send->s_sge[j].addr =
 929                                 ib_sg_dma_address(ic->i_cm_id->device, scat);
 930                        send->s_sge[j].length = len;
 931                        send->s_sge[j].lkey = ic->i_pd->local_dma_lkey;
 932
 933                        sent += len;
 934                        rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
 935
 936                        remote_addr += len;
 937                        scat++;
 938                }
 939
 940                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 941                        &send->s_rdma_wr.wr,
 942                        send->s_rdma_wr.wr.num_sge,
 943                        send->s_rdma_wr.wr.next);
 944
 945                prev = send;
 946                if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
 947                        send = ic->i_sends;
 948        }
 949
 950        /* give a reference to the last op */
 951        if (scat == &op->op_sg[op->op_count]) {
 952                prev->s_op = op;
 953                rds_message_addref(container_of(op, struct rds_message, rdma));
 954        }
 955
 956        if (i < work_alloc) {
 957                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 958                work_alloc = i;
 959        }
 960
 961        if (nr_sig)
 962                atomic_add(nr_sig, &ic->i_signaled_sends);
 963
 964        failed_wr = &first->s_rdma_wr.wr;
 965        ret = ib_post_send(ic->i_cm_id->qp, &first->s_rdma_wr.wr, &failed_wr);
 966        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
 967                 first, &first->s_rdma_wr.wr, ret, failed_wr);
 968        BUG_ON(failed_wr != &first->s_rdma_wr.wr);
 969        if (ret) {
 970                printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
 971                       "returned %d\n", &conn->c_faddr, ret);
 972                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 973                rds_ib_sub_signaled(ic, nr_sig);
 974                goto out;
 975        }
 976
 977        if (unlikely(failed_wr != &first->s_rdma_wr.wr)) {
 978                printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
 979                BUG_ON(failed_wr != &first->s_rdma_wr.wr);
 980        }
 981
 982
 983out:
 984        return ret;
 985}
 986
 987void rds_ib_xmit_path_complete(struct rds_conn_path *cp)
 988{
 989        struct rds_connection *conn = cp->cp_conn;
 990        struct rds_ib_connection *ic = conn->c_transport_data;
 991
 992        /* We may have a pending ACK or window update we were unable
 993         * to send previously (due to flow control). Try again. */
 994        rds_ib_attempt_ack(ic);
 995}
 996