linux/net/rds/ib_send.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/in.h>
  35#include <linux/device.h>
  36#include <linux/dmapool.h>
  37#include <linux/ratelimit.h>
  38
  39#include "rds.h"
  40#include "ib.h"
  41
  42static char *rds_ib_wc_status_strings[] = {
  43#define RDS_IB_WC_STATUS_STR(foo) \
  44                [IB_WC_##foo] = __stringify(IB_WC_##foo)
  45        RDS_IB_WC_STATUS_STR(SUCCESS),
  46        RDS_IB_WC_STATUS_STR(LOC_LEN_ERR),
  47        RDS_IB_WC_STATUS_STR(LOC_QP_OP_ERR),
  48        RDS_IB_WC_STATUS_STR(LOC_EEC_OP_ERR),
  49        RDS_IB_WC_STATUS_STR(LOC_PROT_ERR),
  50        RDS_IB_WC_STATUS_STR(WR_FLUSH_ERR),
  51        RDS_IB_WC_STATUS_STR(MW_BIND_ERR),
  52        RDS_IB_WC_STATUS_STR(BAD_RESP_ERR),
  53        RDS_IB_WC_STATUS_STR(LOC_ACCESS_ERR),
  54        RDS_IB_WC_STATUS_STR(REM_INV_REQ_ERR),
  55        RDS_IB_WC_STATUS_STR(REM_ACCESS_ERR),
  56        RDS_IB_WC_STATUS_STR(REM_OP_ERR),
  57        RDS_IB_WC_STATUS_STR(RETRY_EXC_ERR),
  58        RDS_IB_WC_STATUS_STR(RNR_RETRY_EXC_ERR),
  59        RDS_IB_WC_STATUS_STR(LOC_RDD_VIOL_ERR),
  60        RDS_IB_WC_STATUS_STR(REM_INV_RD_REQ_ERR),
  61        RDS_IB_WC_STATUS_STR(REM_ABORT_ERR),
  62        RDS_IB_WC_STATUS_STR(INV_EECN_ERR),
  63        RDS_IB_WC_STATUS_STR(INV_EEC_STATE_ERR),
  64        RDS_IB_WC_STATUS_STR(FATAL_ERR),
  65        RDS_IB_WC_STATUS_STR(RESP_TIMEOUT_ERR),
  66        RDS_IB_WC_STATUS_STR(GENERAL_ERR),
  67#undef RDS_IB_WC_STATUS_STR
  68};
  69
  70char *rds_ib_wc_status_str(enum ib_wc_status status)
  71{
  72        return rds_str_array(rds_ib_wc_status_strings,
  73                             ARRAY_SIZE(rds_ib_wc_status_strings), status);
  74}
  75
  76/*
  77 * Convert IB-specific error message to RDS error message and call core
  78 * completion handler.
  79 */
  80static void rds_ib_send_complete(struct rds_message *rm,
  81                                 int wc_status,
  82                                 void (*complete)(struct rds_message *rm, int status))
  83{
  84        int notify_status;
  85
  86        switch (wc_status) {
  87        case IB_WC_WR_FLUSH_ERR:
  88                return;
  89
  90        case IB_WC_SUCCESS:
  91                notify_status = RDS_RDMA_SUCCESS;
  92                break;
  93
  94        case IB_WC_REM_ACCESS_ERR:
  95                notify_status = RDS_RDMA_REMOTE_ERROR;
  96                break;
  97
  98        default:
  99                notify_status = RDS_RDMA_OTHER_ERROR;
 100                break;
 101        }
 102        complete(rm, notify_status);
 103}
 104
 105static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
 106                                   struct rm_data_op *op,
 107                                   int wc_status)
 108{
 109        if (op->op_nents)
 110                ib_dma_unmap_sg(ic->i_cm_id->device,
 111                                op->op_sg, op->op_nents,
 112                                DMA_TO_DEVICE);
 113}
 114
 115static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
 116                                   struct rm_rdma_op *op,
 117                                   int wc_status)
 118{
 119        if (op->op_mapped) {
 120                ib_dma_unmap_sg(ic->i_cm_id->device,
 121                                op->op_sg, op->op_nents,
 122                                op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
 123                op->op_mapped = 0;
 124        }
 125
 126        /* If the user asked for a completion notification on this
 127         * message, we can implement three different semantics:
 128         *  1.  Notify when we received the ACK on the RDS message
 129         *      that was queued with the RDMA. This provides reliable
 130         *      notification of RDMA status at the expense of a one-way
 131         *      packet delay.
 132         *  2.  Notify when the IB stack gives us the completion event for
 133         *      the RDMA operation.
 134         *  3.  Notify when the IB stack gives us the completion event for
 135         *      the accompanying RDS messages.
 136         * Here, we implement approach #3. To implement approach #2,
 137         * we would need to take an event for the rdma WR. To implement #1,
 138         * don't call rds_rdma_send_complete at all, and fall back to the notify
 139         * handling in the ACK processing code.
 140         *
 141         * Note: There's no need to explicitly sync any RDMA buffers using
 142         * ib_dma_sync_sg_for_cpu - the completion for the RDMA
 143         * operation itself unmapped the RDMA buffers, which takes care
 144         * of synching.
 145         */
 146        rds_ib_send_complete(container_of(op, struct rds_message, rdma),
 147                             wc_status, rds_rdma_send_complete);
 148
 149        if (op->op_write)
 150                rds_stats_add(s_send_rdma_bytes, op->op_bytes);
 151        else
 152                rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
 153}
 154
 155static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
 156                                     struct rm_atomic_op *op,
 157                                     int wc_status)
 158{
 159        /* unmap atomic recvbuf */
 160        if (op->op_mapped) {
 161                ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
 162                                DMA_FROM_DEVICE);
 163                op->op_mapped = 0;
 164        }
 165
 166        rds_ib_send_complete(container_of(op, struct rds_message, atomic),
 167                             wc_status, rds_atomic_send_complete);
 168
 169        if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
 170                rds_ib_stats_inc(s_ib_atomic_cswp);
 171        else
 172                rds_ib_stats_inc(s_ib_atomic_fadd);
 173}
 174
 175/*
 176 * Unmap the resources associated with a struct send_work.
 177 *
 178 * Returns the rm for no good reason other than it is unobtainable
 179 * other than by switching on wr.opcode, currently, and the caller,
 180 * the event handler, needs it.
 181 */
 182static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
 183                                                struct rds_ib_send_work *send,
 184                                                int wc_status)
 185{
 186        struct rds_message *rm = NULL;
 187
 188        /* In the error case, wc.opcode sometimes contains garbage */
 189        switch (send->s_wr.opcode) {
 190        case IB_WR_SEND:
 191                if (send->s_op) {
 192                        rm = container_of(send->s_op, struct rds_message, data);
 193                        rds_ib_send_unmap_data(ic, send->s_op, wc_status);
 194                }
 195                break;
 196        case IB_WR_RDMA_WRITE:
 197        case IB_WR_RDMA_READ:
 198                if (send->s_op) {
 199                        rm = container_of(send->s_op, struct rds_message, rdma);
 200                        rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
 201                }
 202                break;
 203        case IB_WR_ATOMIC_FETCH_AND_ADD:
 204        case IB_WR_ATOMIC_CMP_AND_SWP:
 205                if (send->s_op) {
 206                        rm = container_of(send->s_op, struct rds_message, atomic);
 207                        rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
 208                }
 209                break;
 210        default:
 211                printk_ratelimited(KERN_NOTICE
 212                               "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
 213                               __func__, send->s_wr.opcode);
 214                break;
 215        }
 216
 217        send->s_wr.opcode = 0xdead;
 218
 219        return rm;
 220}
 221
 222void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 223{
 224        struct rds_ib_send_work *send;
 225        u32 i;
 226
 227        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 228                struct ib_sge *sge;
 229
 230                send->s_op = NULL;
 231
 232                send->s_wr.wr_id = i;
 233                send->s_wr.sg_list = send->s_sge;
 234                send->s_wr.ex.imm_data = 0;
 235
 236                sge = &send->s_sge[0];
 237                sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
 238                sge->length = sizeof(struct rds_header);
 239                sge->lkey = ic->i_mr->lkey;
 240
 241                send->s_sge[1].lkey = ic->i_mr->lkey;
 242        }
 243}
 244
 245void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 246{
 247        struct rds_ib_send_work *send;
 248        u32 i;
 249
 250        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 251                if (send->s_op && send->s_wr.opcode != 0xdead)
 252                        rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
 253        }
 254}
 255
 256/*
 257 * The only fast path caller always has a non-zero nr, so we don't
 258 * bother testing nr before performing the atomic sub.
 259 */
 260static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
 261{
 262        if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
 263            waitqueue_active(&rds_ib_ring_empty_wait))
 264                wake_up(&rds_ib_ring_empty_wait);
 265        BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
 266}
 267
 268/*
 269 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 270 * operations performed in the send path.  As the sender allocs and potentially
 271 * unallocs the next free entry in the ring it doesn't alter which is
 272 * the next to be freed, which is what this is concerned with.
 273 */
 274void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 275{
 276        struct rds_connection *conn = context;
 277        struct rds_ib_connection *ic = conn->c_transport_data;
 278        struct rds_message *rm = NULL;
 279        struct ib_wc wc;
 280        struct rds_ib_send_work *send;
 281        u32 completed;
 282        u32 oldest;
 283        u32 i = 0;
 284        int ret;
 285        int nr_sig = 0;
 286
 287        rdsdebug("cq %p conn %p\n", cq, conn);
 288        rds_ib_stats_inc(s_ib_tx_cq_call);
 289        ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
 290        if (ret)
 291                rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
 292
 293        while (ib_poll_cq(cq, 1, &wc) > 0) {
 294                rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
 295                         (unsigned long long)wc.wr_id, wc.status,
 296                         rds_ib_wc_status_str(wc.status), wc.byte_len,
 297                         be32_to_cpu(wc.ex.imm_data));
 298                rds_ib_stats_inc(s_ib_tx_cq_event);
 299
 300                if (wc.wr_id == RDS_IB_ACK_WR_ID) {
 301                        if (ic->i_ack_queued + HZ/2 < jiffies)
 302                                rds_ib_stats_inc(s_ib_tx_stalled);
 303                        rds_ib_ack_send_complete(ic);
 304                        continue;
 305                }
 306
 307                oldest = rds_ib_ring_oldest(&ic->i_send_ring);
 308
 309                completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
 310
 311                for (i = 0; i < completed; i++) {
 312                        send = &ic->i_sends[oldest];
 313                        if (send->s_wr.send_flags & IB_SEND_SIGNALED)
 314                                nr_sig++;
 315
 316                        rm = rds_ib_send_unmap_op(ic, send, wc.status);
 317
 318                        if (send->s_queued + HZ/2 < jiffies)
 319                                rds_ib_stats_inc(s_ib_tx_stalled);
 320
 321                        if (send->s_op) {
 322                                if (send->s_op == rm->m_final_op) {
 323                                        /* If anyone waited for this message to get flushed out, wake
 324                                         * them up now */
 325                                        rds_message_unmapped(rm);
 326                                }
 327                                rds_message_put(rm);
 328                                send->s_op = NULL;
 329                        }
 330
 331                        oldest = (oldest + 1) % ic->i_send_ring.w_nr;
 332                }
 333
 334                rds_ib_ring_free(&ic->i_send_ring, completed);
 335                rds_ib_sub_signaled(ic, nr_sig);
 336                nr_sig = 0;
 337
 338                if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
 339                    test_bit(0, &conn->c_map_queued))
 340                        queue_delayed_work(rds_wq, &conn->c_send_w, 0);
 341
 342                /* We expect errors as the qp is drained during shutdown */
 343                if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
 344                        rds_ib_conn_error(conn, "send completion on %pI4 had status "
 345                                          "%u (%s), disconnecting and reconnecting\n",
 346                                          &conn->c_faddr, wc.status,
 347                                          rds_ib_wc_status_str(wc.status));
 348                }
 349        }
 350}
 351
 352/*
 353 * This is the main function for allocating credits when sending
 354 * messages.
 355 *
 356 * Conceptually, we have two counters:
 357 *  -   send credits: this tells us how many WRs we're allowed
 358 *      to submit without overruning the receiver's queue. For
 359 *      each SEND WR we post, we decrement this by one.
 360 *
 361 *  -   posted credits: this tells us how many WRs we recently
 362 *      posted to the receive queue. This value is transferred
 363 *      to the peer as a "credit update" in a RDS header field.
 364 *      Every time we transmit credits to the peer, we subtract
 365 *      the amount of transferred credits from this counter.
 366 *
 367 * It is essential that we avoid situations where both sides have
 368 * exhausted their send credits, and are unable to send new credits
 369 * to the peer. We achieve this by requiring that we send at least
 370 * one credit update to the peer before exhausting our credits.
 371 * When new credits arrive, we subtract one credit that is withheld
 372 * until we've posted new buffers and are ready to transmit these
 373 * credits (see rds_ib_send_add_credits below).
 374 *
 375 * The RDS send code is essentially single-threaded; rds_send_xmit
 376 * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
 377 * However, the ACK sending code is independent and can race with
 378 * message SENDs.
 379 *
 380 * In the send path, we need to update the counters for send credits
 381 * and the counter of posted buffers atomically - when we use the
 382 * last available credit, we cannot allow another thread to race us
 383 * and grab the posted credits counter.  Hence, we have to use a
 384 * spinlock to protect the credit counter, or use atomics.
 385 *
 386 * Spinlocks shared between the send and the receive path are bad,
 387 * because they create unnecessary delays. An early implementation
 388 * using a spinlock showed a 5% degradation in throughput at some
 389 * loads.
 390 *
 391 * This implementation avoids spinlocks completely, putting both
 392 * counters into a single atomic, and updating that atomic using
 393 * atomic_add (in the receive path, when receiving fresh credits),
 394 * and using atomic_cmpxchg when updating the two counters.
 395 */
 396int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
 397                             u32 wanted, u32 *adv_credits, int need_posted, int max_posted)
 398{
 399        unsigned int avail, posted, got = 0, advertise;
 400        long oldval, newval;
 401
 402        *adv_credits = 0;
 403        if (!ic->i_flowctl)
 404                return wanted;
 405
 406try_again:
 407        advertise = 0;
 408        oldval = newval = atomic_read(&ic->i_credits);
 409        posted = IB_GET_POST_CREDITS(oldval);
 410        avail = IB_GET_SEND_CREDITS(oldval);
 411
 412        rdsdebug("rds_ib_send_grab_credits(%u): credits=%u posted=%u\n",
 413                        wanted, avail, posted);
 414
 415        /* The last credit must be used to send a credit update. */
 416        if (avail && !posted)
 417                avail--;
 418
 419        if (avail < wanted) {
 420                struct rds_connection *conn = ic->i_cm_id->context;
 421
 422                /* Oops, there aren't that many credits left! */
 423                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 424                got = avail;
 425        } else {
 426                /* Sometimes you get what you want, lalala. */
 427                got = wanted;
 428        }
 429        newval -= IB_SET_SEND_CREDITS(got);
 430
 431        /*
 432         * If need_posted is non-zero, then the caller wants
 433         * the posted regardless of whether any send credits are
 434         * available.
 435         */
 436        if (posted && (got || need_posted)) {
 437                advertise = min_t(unsigned int, posted, max_posted);
 438                newval -= IB_SET_POST_CREDITS(advertise);
 439        }
 440
 441        /* Finally bill everything */
 442        if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
 443                goto try_again;
 444
 445        *adv_credits = advertise;
 446        return got;
 447}
 448
 449void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 450{
 451        struct rds_ib_connection *ic = conn->c_transport_data;
 452
 453        if (credits == 0)
 454                return;
 455
 456        rdsdebug("rds_ib_send_add_credits(%u): current=%u%s\n",
 457                        credits,
 458                        IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
 459                        test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
 460
 461        atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
 462        if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
 463                queue_delayed_work(rds_wq, &conn->c_send_w, 0);
 464
 465        WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 466
 467        rds_ib_stats_inc(s_ib_rx_credit_updates);
 468}
 469
 470void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
 471{
 472        struct rds_ib_connection *ic = conn->c_transport_data;
 473
 474        if (posted == 0)
 475                return;
 476
 477        atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
 478
 479        /* Decide whether to send an update to the peer now.
 480         * If we would send a credit update for every single buffer we
 481         * post, we would end up with an ACK storm (ACK arrives,
 482         * consumes buffer, we refill the ring, send ACK to remote
 483         * advertising the newly posted buffer... ad inf)
 484         *
 485         * Performance pretty much depends on how often we send
 486         * credit updates - too frequent updates mean lots of ACKs.
 487         * Too infrequent updates, and the peer will run out of
 488         * credits and has to throttle.
 489         * For the time being, 16 seems to be a good compromise.
 490         */
 491        if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
 492                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 493}
 494
 495static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
 496                                             struct rds_ib_send_work *send,
 497                                             bool notify)
 498{
 499        /*
 500         * We want to delay signaling completions just enough to get
 501         * the batching benefits but not so much that we create dead time
 502         * on the wire.
 503         */
 504        if (ic->i_unsignaled_wrs-- == 0 || notify) {
 505                ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
 506                send->s_wr.send_flags |= IB_SEND_SIGNALED;
 507                return 1;
 508        }
 509        return 0;
 510}
 511
 512/*
 513 * This can be called multiple times for a given message.  The first time
 514 * we see a message we map its scatterlist into the IB device so that
 515 * we can provide that mapped address to the IB scatter gather entries
 516 * in the IB work requests.  We translate the scatterlist into a series
 517 * of work requests that fragment the message.  These work requests complete
 518 * in order so we pass ownership of the message to the completion handler
 519 * once we send the final fragment.
 520 *
 521 * The RDS core uses the c_send_lock to only enter this function once
 522 * per connection.  This makes sure that the tx ring alloc/unalloc pairs
 523 * don't get out of sync and confuse the ring.
 524 */
 525int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 526                unsigned int hdr_off, unsigned int sg, unsigned int off)
 527{
 528        struct rds_ib_connection *ic = conn->c_transport_data;
 529        struct ib_device *dev = ic->i_cm_id->device;
 530        struct rds_ib_send_work *send = NULL;
 531        struct rds_ib_send_work *first;
 532        struct rds_ib_send_work *prev;
 533        struct ib_send_wr *failed_wr;
 534        struct scatterlist *scat;
 535        u32 pos;
 536        u32 i;
 537        u32 work_alloc;
 538        u32 credit_alloc = 0;
 539        u32 posted;
 540        u32 adv_credits = 0;
 541        int send_flags = 0;
 542        int bytes_sent = 0;
 543        int ret;
 544        int flow_controlled = 0;
 545        int nr_sig = 0;
 546
 547        BUG_ON(off % RDS_FRAG_SIZE);
 548        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
 549
 550        /* Do not send cong updates to IB loopback */
 551        if (conn->c_loopback
 552            && rm->m_inc.i_hdr.h_flags & RDS_FLAG_CONG_BITMAP) {
 553                rds_cong_map_updated(conn->c_fcong, ~(u64) 0);
 554                scat = &rm->data.op_sg[sg];
 555                ret = sizeof(struct rds_header) + RDS_CONG_MAP_BYTES;
 556                ret = min_t(int, ret, scat->length - conn->c_xmit_data_off);
 557                return ret;
 558        }
 559
 560        /* FIXME we may overallocate here */
 561        if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
 562                i = 1;
 563        else
 564                i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
 565
 566        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 567        if (work_alloc == 0) {
 568                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 569                rds_ib_stats_inc(s_ib_tx_ring_full);
 570                ret = -ENOMEM;
 571                goto out;
 572        }
 573
 574        if (ic->i_flowctl) {
 575                credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
 576                adv_credits += posted;
 577                if (credit_alloc < work_alloc) {
 578                        rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
 579                        work_alloc = credit_alloc;
 580                        flow_controlled = 1;
 581                }
 582                if (work_alloc == 0) {
 583                        set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 584                        rds_ib_stats_inc(s_ib_tx_throttle);
 585                        ret = -ENOMEM;
 586                        goto out;
 587                }
 588        }
 589
 590        /* map the message the first time we see it */
 591        if (!ic->i_data_op) {
 592                if (rm->data.op_nents) {
 593                        rm->data.op_count = ib_dma_map_sg(dev,
 594                                                          rm->data.op_sg,
 595                                                          rm->data.op_nents,
 596                                                          DMA_TO_DEVICE);
 597                        rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count);
 598                        if (rm->data.op_count == 0) {
 599                                rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
 600                                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 601                                ret = -ENOMEM; /* XXX ? */
 602                                goto out;
 603                        }
 604                } else {
 605                        rm->data.op_count = 0;
 606                }
 607
 608                rds_message_addref(rm);
 609                ic->i_data_op = &rm->data;
 610
 611                /* Finalize the header */
 612                if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
 613                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
 614                if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
 615                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
 616
 617                /* If it has a RDMA op, tell the peer we did it. This is
 618                 * used by the peer to release use-once RDMA MRs. */
 619                if (rm->rdma.op_active) {
 620                        struct rds_ext_header_rdma ext_hdr;
 621
 622                        ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
 623                        rds_message_add_extension(&rm->m_inc.i_hdr,
 624                                        RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
 625                }
 626                if (rm->m_rdma_cookie) {
 627                        rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
 628                                        rds_rdma_cookie_key(rm->m_rdma_cookie),
 629                                        rds_rdma_cookie_offset(rm->m_rdma_cookie));
 630                }
 631
 632                /* Note - rds_ib_piggyb_ack clears the ACK_REQUIRED bit, so
 633                 * we should not do this unless we have a chance of at least
 634                 * sticking the header into the send ring. Which is why we
 635                 * should call rds_ib_ring_alloc first. */
 636                rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_ib_piggyb_ack(ic));
 637                rds_message_make_checksum(&rm->m_inc.i_hdr);
 638
 639                /*
 640                 * Update adv_credits since we reset the ACK_REQUIRED bit.
 641                 */
 642                if (ic->i_flowctl) {
 643                        rds_ib_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits);
 644                        adv_credits += posted;
 645                        BUG_ON(adv_credits > 255);
 646                }
 647        }
 648
 649        /* Sometimes you want to put a fence between an RDMA
 650         * READ and the following SEND.
 651         * We could either do this all the time
 652         * or when requested by the user. Right now, we let
 653         * the application choose.
 654         */
 655        if (rm->rdma.op_active && rm->rdma.op_fence)
 656                send_flags = IB_SEND_FENCE;
 657
 658        /* Each frag gets a header. Msgs may be 0 bytes */
 659        send = &ic->i_sends[pos];
 660        first = send;
 661        prev = NULL;
 662        scat = &ic->i_data_op->op_sg[sg];
 663        i = 0;
 664        do {
 665                unsigned int len = 0;
 666
 667                /* Set up the header */
 668                send->s_wr.send_flags = send_flags;
 669                send->s_wr.opcode = IB_WR_SEND;
 670                send->s_wr.num_sge = 1;
 671                send->s_wr.next = NULL;
 672                send->s_queued = jiffies;
 673                send->s_op = NULL;
 674
 675                send->s_sge[0].addr = ic->i_send_hdrs_dma
 676                        + (pos * sizeof(struct rds_header));
 677                send->s_sge[0].length = sizeof(struct rds_header);
 678
 679                memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
 680
 681                /* Set up the data, if present */
 682                if (i < work_alloc
 683                    && scat != &rm->data.op_sg[rm->data.op_count]) {
 684                        len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
 685                        send->s_wr.num_sge = 2;
 686
 687                        send->s_sge[1].addr = ib_sg_dma_address(dev, scat) + off;
 688                        send->s_sge[1].length = len;
 689
 690                        bytes_sent += len;
 691                        off += len;
 692                        if (off == ib_sg_dma_len(dev, scat)) {
 693                                scat++;
 694                                off = 0;
 695                        }
 696                }
 697
 698                rds_ib_set_wr_signal_state(ic, send, 0);
 699
 700                /*
 701                 * Always signal the last one if we're stopping due to flow control.
 702                 */
 703                if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
 704                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 705
 706                if (send->s_wr.send_flags & IB_SEND_SIGNALED)
 707                        nr_sig++;
 708
 709                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 710                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 711
 712                if (ic->i_flowctl && adv_credits) {
 713                        struct rds_header *hdr = &ic->i_send_hdrs[pos];
 714
 715                        /* add credit and redo the header checksum */
 716                        hdr->h_credit = adv_credits;
 717                        rds_message_make_checksum(hdr);
 718                        adv_credits = 0;
 719                        rds_ib_stats_inc(s_ib_tx_credit_updates);
 720                }
 721
 722                if (prev)
 723                        prev->s_wr.next = &send->s_wr;
 724                prev = send;
 725
 726                pos = (pos + 1) % ic->i_send_ring.w_nr;
 727                send = &ic->i_sends[pos];
 728                i++;
 729
 730        } while (i < work_alloc
 731                 && scat != &rm->data.op_sg[rm->data.op_count]);
 732
 733        /* Account the RDS header in the number of bytes we sent, but just once.
 734         * The caller has no concept of fragmentation. */
 735        if (hdr_off == 0)
 736                bytes_sent += sizeof(struct rds_header);
 737
 738        /* if we finished the message then send completion owns it */
 739        if (scat == &rm->data.op_sg[rm->data.op_count]) {
 740                prev->s_op = ic->i_data_op;
 741                prev->s_wr.send_flags |= IB_SEND_SOLICITED;
 742                ic->i_data_op = NULL;
 743        }
 744
 745        /* Put back wrs & credits we didn't use */
 746        if (i < work_alloc) {
 747                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 748                work_alloc = i;
 749        }
 750        if (ic->i_flowctl && i < credit_alloc)
 751                rds_ib_send_add_credits(conn, credit_alloc - i);
 752
 753        if (nr_sig)
 754                atomic_add(nr_sig, &ic->i_signaled_sends);
 755
 756        /* XXX need to worry about failed_wr and partial sends. */
 757        failed_wr = &first->s_wr;
 758        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
 759        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
 760                 first, &first->s_wr, ret, failed_wr);
 761        BUG_ON(failed_wr != &first->s_wr);
 762        if (ret) {
 763                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 764                       "returned %d\n", &conn->c_faddr, ret);
 765                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 766                rds_ib_sub_signaled(ic, nr_sig);
 767                if (prev->s_op) {
 768                        ic->i_data_op = prev->s_op;
 769                        prev->s_op = NULL;
 770                }
 771
 772                rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
 773                goto out;
 774        }
 775
 776        ret = bytes_sent;
 777out:
 778        BUG_ON(adv_credits);
 779        return ret;
 780}
 781
 782/*
 783 * Issue atomic operation.
 784 * A simplified version of the rdma case, we always map 1 SG, and
 785 * only 8 bytes, for the return value from the atomic operation.
 786 */
 787int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 788{
 789        struct rds_ib_connection *ic = conn->c_transport_data;
 790        struct rds_ib_send_work *send = NULL;
 791        struct ib_send_wr *failed_wr;
 792        struct rds_ib_device *rds_ibdev;
 793        u32 pos;
 794        u32 work_alloc;
 795        int ret;
 796        int nr_sig = 0;
 797
 798        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 799
 800        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
 801        if (work_alloc != 1) {
 802                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 803                rds_ib_stats_inc(s_ib_tx_ring_full);
 804                ret = -ENOMEM;
 805                goto out;
 806        }
 807
 808        /* address of send request in ring */
 809        send = &ic->i_sends[pos];
 810        send->s_queued = jiffies;
 811
 812        if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
 813                send->s_wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
 814                send->s_wr.wr.atomic.compare_add = op->op_m_cswp.compare;
 815                send->s_wr.wr.atomic.swap = op->op_m_cswp.swap;
 816                send->s_wr.wr.atomic.compare_add_mask = op->op_m_cswp.compare_mask;
 817                send->s_wr.wr.atomic.swap_mask = op->op_m_cswp.swap_mask;
 818        } else { /* FADD */
 819                send->s_wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
 820                send->s_wr.wr.atomic.compare_add = op->op_m_fadd.add;
 821                send->s_wr.wr.atomic.swap = 0;
 822                send->s_wr.wr.atomic.compare_add_mask = op->op_m_fadd.nocarry_mask;
 823                send->s_wr.wr.atomic.swap_mask = 0;
 824        }
 825        nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 826        send->s_wr.num_sge = 1;
 827        send->s_wr.next = NULL;
 828        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
 829        send->s_wr.wr.atomic.rkey = op->op_rkey;
 830        send->s_op = op;
 831        rds_message_addref(container_of(send->s_op, struct rds_message, atomic));
 832
 833        /* map 8 byte retval buffer to the device */
 834        ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
 835        rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
 836        if (ret != 1) {
 837                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 838                rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
 839                ret = -ENOMEM; /* XXX ? */
 840                goto out;
 841        }
 842
 843        /* Convert our struct scatterlist to struct ib_sge */
 844        send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg);
 845        send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg);
 846        send->s_sge[0].lkey = ic->i_mr->lkey;
 847
 848        rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
 849                 send->s_sge[0].addr, send->s_sge[0].length);
 850
 851        if (nr_sig)
 852                atomic_add(nr_sig, &ic->i_signaled_sends);
 853
 854        failed_wr = &send->s_wr;
 855        ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
 856        rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
 857                 send, &send->s_wr, ret, failed_wr);
 858        BUG_ON(failed_wr != &send->s_wr);
 859        if (ret) {
 860                printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
 861                       "returned %d\n", &conn->c_faddr, ret);
 862                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 863                rds_ib_sub_signaled(ic, nr_sig);
 864                goto out;
 865        }
 866
 867        if (unlikely(failed_wr != &send->s_wr)) {
 868                printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
 869                BUG_ON(failed_wr != &send->s_wr);
 870        }
 871
 872out:
 873        return ret;
 874}
 875
 876int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 877{
 878        struct rds_ib_connection *ic = conn->c_transport_data;
 879        struct rds_ib_send_work *send = NULL;
 880        struct rds_ib_send_work *first;
 881        struct rds_ib_send_work *prev;
 882        struct ib_send_wr *failed_wr;
 883        struct scatterlist *scat;
 884        unsigned long len;
 885        u64 remote_addr = op->op_remote_addr;
 886        u32 max_sge = ic->rds_ibdev->max_sge;
 887        u32 pos;
 888        u32 work_alloc;
 889        u32 i;
 890        u32 j;
 891        int sent;
 892        int ret;
 893        int num_sge;
 894        int nr_sig = 0;
 895
 896        /* map the op the first time we see it */
 897        if (!op->op_mapped) {
 898                op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
 899                                             op->op_sg, op->op_nents, (op->op_write) ?
 900                                             DMA_TO_DEVICE : DMA_FROM_DEVICE);
 901                rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
 902                if (op->op_count == 0) {
 903                        rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
 904                        ret = -ENOMEM; /* XXX ? */
 905                        goto out;
 906                }
 907
 908                op->op_mapped = 1;
 909        }
 910
 911        /*
 912         * Instead of knowing how to return a partial rdma read/write we insist that there
 913         * be enough work requests to send the entire message.
 914         */
 915        i = ceil(op->op_count, max_sge);
 916
 917        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 918        if (work_alloc != i) {
 919                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 920                rds_ib_stats_inc(s_ib_tx_ring_full);
 921                ret = -ENOMEM;
 922                goto out;
 923        }
 924
 925        send = &ic->i_sends[pos];
 926        first = send;
 927        prev = NULL;
 928        scat = &op->op_sg[0];
 929        sent = 0;
 930        num_sge = op->op_count;
 931
 932        for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
 933                send->s_wr.send_flags = 0;
 934                send->s_queued = jiffies;
 935                send->s_op = NULL;
 936
 937                nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 938
 939                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
 940                send->s_wr.wr.rdma.remote_addr = remote_addr;
 941                send->s_wr.wr.rdma.rkey = op->op_rkey;
 942
 943                if (num_sge > max_sge) {
 944                        send->s_wr.num_sge = max_sge;
 945                        num_sge -= max_sge;
 946                } else {
 947                        send->s_wr.num_sge = num_sge;
 948                }
 949
 950                send->s_wr.next = NULL;
 951
 952                if (prev)
 953                        prev->s_wr.next = &send->s_wr;
 954
 955                for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {
 956                        len = ib_sg_dma_len(ic->i_cm_id->device, scat);
 957                        send->s_sge[j].addr =
 958                                 ib_sg_dma_address(ic->i_cm_id->device, scat);
 959                        send->s_sge[j].length = len;
 960                        send->s_sge[j].lkey = ic->i_mr->lkey;
 961
 962                        sent += len;
 963                        rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
 964
 965                        remote_addr += len;
 966                        scat++;
 967                }
 968
 969                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 970                        &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 971
 972                prev = send;
 973                if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
 974                        send = ic->i_sends;
 975        }
 976
 977        /* give a reference to the last op */
 978        if (scat == &op->op_sg[op->op_count]) {
 979                prev->s_op = op;
 980                rds_message_addref(container_of(op, struct rds_message, rdma));
 981        }
 982
 983        if (i < work_alloc) {
 984                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 985                work_alloc = i;
 986        }
 987
 988        if (nr_sig)
 989                atomic_add(nr_sig, &ic->i_signaled_sends);
 990
 991        failed_wr = &first->s_wr;
 992        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
 993        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
 994                 first, &first->s_wr, ret, failed_wr);
 995        BUG_ON(failed_wr != &first->s_wr);
 996        if (ret) {
 997                printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
 998                       "returned %d\n", &conn->c_faddr, ret);
 999                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
1000                rds_ib_sub_signaled(ic, nr_sig);
1001                goto out;
1002        }
1003
1004        if (unlikely(failed_wr != &first->s_wr)) {
1005                printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
1006                BUG_ON(failed_wr != &first->s_wr);
1007        }
1008
1009
1010out:
1011        return ret;
1012}
1013
1014void rds_ib_xmit_complete(struct rds_connection *conn)
1015{
1016        struct rds_ib_connection *ic = conn->c_transport_data;
1017
1018        /* We may have a pending ACK or window update we were unable
1019         * to send previously (due to flow control). Try again. */
1020        rds_ib_attempt_ack(ic);
1021}
1022