linux/net/rds/send.c
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/moduleparam.h>
  35#include <linux/gfp.h>
  36#include <net/sock.h>
  37#include <linux/in.h>
  38#include <linux/list.h>
  39#include <linux/ratelimit.h>
  40#include <linux/export.h>
  41#include <linux/sizes.h>
  42
  43#include "rds.h"
  44
  45/* When transmitting messages in rds_send_xmit, we need to emerge from
   46 * time to time and briefly release the CPU. Otherwise the soft lockup
   47 * watchdog will kick our shin.
  48 * Also, it seems fairer to not let one busy connection stall all the
  49 * others.
  50 *
  51 * send_batch_count is the number of times we'll loop in send_xmit. Setting
  52 * it to 0 will restore the old behavior (where we looped until we had
  53 * drained the queue).
  54 */
  55static int send_batch_count = SZ_1K;
  56module_param(send_batch_count, int, 0444);
   57MODULE_PARM_DESC(send_batch_count, "batch factor when working the send queue");
  58
  59static void rds_send_remove_from_sock(struct list_head *messages, int status);
  60
  61/*
  62 * Reset the send state.  Callers must ensure that this doesn't race with
  63 * rds_send_xmit().
  64 */
  65void rds_send_path_reset(struct rds_conn_path *cp)
  66{
  67        struct rds_message *rm, *tmp;
  68        unsigned long flags;
  69
  70        if (cp->cp_xmit_rm) {
  71                rm = cp->cp_xmit_rm;
  72                cp->cp_xmit_rm = NULL;
  73                /* Tell the user the RDMA op is no longer mapped by the
  74                 * transport. This isn't entirely true (it's flushed out
  75                 * independently) but as the connection is down, there's
  76                 * no ongoing RDMA to/from that memory */
  77                rds_message_unmapped(rm);
  78                rds_message_put(rm);
  79        }
  80
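             /* Rewind the per-path transmit cursor so the next message starts
              * from a clean state.
              */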
  81        cp->cp_xmit_sg = 0;
  82        cp->cp_xmit_hdr_off = 0;
  83        cp->cp_xmit_data_off = 0;
  84        cp->cp_xmit_atomic_sent = 0;
  85        cp->cp_xmit_rdma_sent = 0;
  86        cp->cp_xmit_data_sent = 0;
  87
  88        cp->cp_conn->c_map_queued = 0;
  89
  90        cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
  91        cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
  92
  93        /* Mark messages as retransmissions, and move them to the send q */
  94        spin_lock_irqsave(&cp->cp_lock, flags);
  95        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
  96                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
  97                set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
  98        }
  99        list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
 100        spin_unlock_irqrestore(&cp->cp_lock, flags);
 101}
 102EXPORT_SYMBOL_GPL(rds_send_path_reset);
 103
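     /* acquire_in_xmit()/release_in_xmit() implement a simple, non-sleeping
      * mutex around the transmit path: whoever atomically sets RDS_IN_XMIT
      * owns the path's transmit state until the bit is cleared again.
      * acquire_in_xmit() returns nonzero on success.
      */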
 104static int acquire_in_xmit(struct rds_conn_path *cp)
 105{
 106        return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 107}
 108
 109static void release_in_xmit(struct rds_conn_path *cp)
 110{
 111        clear_bit(RDS_IN_XMIT, &cp->cp_flags);
 112        smp_mb__after_atomic();
 113        /*
 114         * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
 115         * hot path and finding waiters is very rare.  We don't want to walk
 116         * the system-wide hashed waitqueue buckets in the fast path only to
 117         * almost never find waiters.
 118         */
 119        if (waitqueue_active(&cp->cp_waitq))
 120                wake_up_all(&cp->cp_waitq);
 121}
 122
 123/*
 124 * We're making the conscious trade-off here to only send one message
 125 * down the connection at a time.
 126 *   Pro:
 127 *      - tx queueing is a simple fifo list
 128 *      - reassembly is optional and easily done by transports per conn
 129 *      - no per flow rx lookup at all, straight to the socket
 130 *      - less per-frag memory and wire overhead
 131 *   Con:
 132 *      - queued acks can be delayed behind large messages
 133 *   Depends:
 134 *      - small message latency is higher behind queued large messages
 135 *      - large message latency isn't starved by intervening small sends
 136 */
 137int rds_send_xmit(struct rds_conn_path *cp)
 138{
 139        struct rds_connection *conn = cp->cp_conn;
 140        struct rds_message *rm;
 141        unsigned long flags;
 142        unsigned int tmp;
 143        struct scatterlist *sg;
 144        int ret = 0;
 145        LIST_HEAD(to_be_dropped);
 146        int batch_count;
 147        unsigned long send_gen = 0;
 148
 149restart:
 150        batch_count = 0;
 151
 152        /*
 153         * sendmsg calls here after having queued its message on the send
 154         * queue.  We only have one task feeding the connection at a time.  If
 155         * another thread is already feeding the queue then we back off.  This
 156         * avoids blocking the caller and trading per-connection data between
 157         * caches per message.
 158         */
 159        if (!acquire_in_xmit(cp)) {
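                     /* Returning -ENOMEM here is not a real allocation failure;
                      * it tells rds_sendmsg() and the send worker to requeue the
                      * send work so the queued message is picked up once the
                      * current RDS_IN_XMIT owner finishes.
                      */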
 160                rds_stats_inc(s_send_lock_contention);
 161                ret = -ENOMEM;
 162                goto out;
 163        }
 164
 165        /*
 166         * we record the send generation after doing the xmit acquire.
 167         * if someone else manages to jump in and do some work, we'll use
 168         * this to avoid a goto restart farther down.
 169         *
 170         * The acquire_in_xmit() check above ensures that only one
 171         * caller can increment c_send_gen at any time.
 172         */
 173        send_gen = READ_ONCE(cp->cp_send_gen) + 1;
 174        WRITE_ONCE(cp->cp_send_gen, send_gen);
 175
 176        /*
 177         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 178         * we do the opposite to avoid races.
 179         */
 180        if (!rds_conn_path_up(cp)) {
 181                release_in_xmit(cp);
 182                ret = 0;
 183                goto out;
 184        }
 185
 186        if (conn->c_trans->xmit_path_prepare)
 187                conn->c_trans->xmit_path_prepare(cp);
 188
 189        /*
 190         * spin trying to push headers and data down the connection until
 191         * the connection doesn't make forward progress.
 192         */
 193        while (1) {
 194
 195                rm = cp->cp_xmit_rm;
 196
 197                /*
 198                 * If between sending messages, we can send a pending congestion
 199                 * map update.
 200                 */
 201                if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
 202                        rm = rds_cong_update_alloc(conn);
 203                        if (IS_ERR(rm)) {
 204                                ret = PTR_ERR(rm);
 205                                break;
 206                        }
 207                        rm->data.op_active = 1;
 208                        rm->m_inc.i_conn_path = cp;
 209                        rm->m_inc.i_conn = cp->cp_conn;
 210
 211                        cp->cp_xmit_rm = rm;
 212                }
 213
 214                /*
 215                 * If not already working on one, grab the next message.
 216                 *
 217                 * cp_xmit_rm holds a ref while we're sending this message down
  218                 * the connection.  We can use this ref while holding
  219                 * RDS_IN_XMIT; rds_send_path_reset() is serialized with it.
 220                 */
 221                if (!rm) {
 222                        unsigned int len;
 223
 224                        batch_count++;
 225
 226                        /* we want to process as big a batch as we can, but
 227                         * we also want to avoid softlockups.  If we've been
  228                 * through a lot of messages, let's back off and see
 229                         * if anyone else jumps in
 230                         */
 231                        if (batch_count >= send_batch_count)
 232                                goto over_batch;
 233
 234                        spin_lock_irqsave(&cp->cp_lock, flags);
 235
 236                        if (!list_empty(&cp->cp_send_queue)) {
 237                                rm = list_entry(cp->cp_send_queue.next,
 238                                                struct rds_message,
 239                                                m_conn_item);
 240                                rds_message_addref(rm);
 241
 242                                /*
 243                                 * Move the message from the send queue to the retransmit
 244                                 * list right away.
 245                                 */
 246                                list_move_tail(&rm->m_conn_item,
 247                                               &cp->cp_retrans);
 248                        }
 249
 250                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 251
 252                        if (!rm)
 253                                break;
 254
 255                        /* Unfortunately, the way Infiniband deals with
 256                         * RDMA to a bad MR key is by moving the entire
  257                         * queue pair to error state. We could possibly
 258                         * recover from that, but right now we drop the
 259                         * connection.
 260                         * Therefore, we never retransmit messages with RDMA ops.
 261                         */
 262                        if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
 263                            (rm->rdma.op_active &&
 264                            test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
 265                                spin_lock_irqsave(&cp->cp_lock, flags);
 266                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 267                                        list_move(&rm->m_conn_item, &to_be_dropped);
 268                                spin_unlock_irqrestore(&cp->cp_lock, flags);
 269                                continue;
 270                        }
 271
 272                        /* Require an ACK every once in a while */
 273                        len = ntohl(rm->m_inc.i_hdr.h_len);
 274                        if (cp->cp_unacked_packets == 0 ||
 275                            cp->cp_unacked_bytes < len) {
 276                                __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 277
 278                                cp->cp_unacked_packets =
 279                                        rds_sysctl_max_unacked_packets;
 280                                cp->cp_unacked_bytes =
 281                                        rds_sysctl_max_unacked_bytes;
 282                                rds_stats_inc(s_send_ack_required);
 283                        } else {
 284                                cp->cp_unacked_bytes -= len;
 285                                cp->cp_unacked_packets--;
 286                        }
 287
 288                        cp->cp_xmit_rm = rm;
 289                }
 290
 291                /* The transport either sends the whole rdma or none of it */
 292                if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
 293                        rm->m_final_op = &rm->rdma;
 294                        /* The transport owns the mapped memory for now.
 295                         * You can't unmap it while it's on the send queue
 296                         */
 297                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 298                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 299                        if (ret) {
 300                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 301                                wake_up_interruptible(&rm->m_flush_wait);
 302                                break;
 303                        }
 304                        cp->cp_xmit_rdma_sent = 1;
 305
 306                }
 307
 308                if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
 309                        rm->m_final_op = &rm->atomic;
 310                        /* The transport owns the mapped memory for now.
 311                         * You can't unmap it while it's on the send queue
 312                         */
 313                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 314                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
 315                        if (ret) {
 316                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 317                                wake_up_interruptible(&rm->m_flush_wait);
 318                                break;
 319                        }
 320                        cp->cp_xmit_atomic_sent = 1;
 321
 322                }
 323
 324                /*
 325                 * A number of cases require an RDS header to be sent
 326                 * even if there is no data.
 327                 * We permit 0-byte sends; rds-ping depends on this.
  328                 * However, if the message carries only silent ops, we skip
  329                 * the hdr/data send so the operation stays silent.
 330                 */
 331                if (rm->data.op_nents == 0) {
 332                        int ops_present;
 333                        int all_ops_are_silent = 1;
 334
 335                        ops_present = (rm->atomic.op_active || rm->rdma.op_active);
 336                        if (rm->atomic.op_active && !rm->atomic.op_silent)
 337                                all_ops_are_silent = 0;
 338                        if (rm->rdma.op_active && !rm->rdma.op_silent)
 339                                all_ops_are_silent = 0;
 340
  341                        if (ops_present && all_ops_are_silent &&
  342                            !rm->m_rdma_cookie)
 343                                rm->data.op_active = 0;
 344                }
 345
 346                if (rm->data.op_active && !cp->cp_xmit_data_sent) {
 347                        rm->m_final_op = &rm->data;
 348
 349                        ret = conn->c_trans->xmit(conn, rm,
 350                                                  cp->cp_xmit_hdr_off,
 351                                                  cp->cp_xmit_sg,
 352                                                  cp->cp_xmit_data_off);
 353                        if (ret <= 0)
 354                                break;
 355
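                             /* ->xmit() returned the number of bytes it accepted.
                              * Charge them against the header first, then walk the
                              * data scatterlist; the cp_xmit_* offsets remember how
                              * far we got so a partial send can resume here later.
                              */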
 356                        if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
 357                                tmp = min_t(int, ret,
 358                                            sizeof(struct rds_header) -
 359                                            cp->cp_xmit_hdr_off);
 360                                cp->cp_xmit_hdr_off += tmp;
 361                                ret -= tmp;
 362                        }
 363
 364                        sg = &rm->data.op_sg[cp->cp_xmit_sg];
 365                        while (ret) {
 366                                tmp = min_t(int, ret, sg->length -
 367                                                      cp->cp_xmit_data_off);
 368                                cp->cp_xmit_data_off += tmp;
 369                                ret -= tmp;
 370                                if (cp->cp_xmit_data_off == sg->length) {
 371                                        cp->cp_xmit_data_off = 0;
 372                                        sg++;
 373                                        cp->cp_xmit_sg++;
 374                                        BUG_ON(ret != 0 && cp->cp_xmit_sg ==
 375                                               rm->data.op_nents);
 376                                }
 377                        }
 378
 379                        if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
 380                            (cp->cp_xmit_sg == rm->data.op_nents))
 381                                cp->cp_xmit_data_sent = 1;
 382                }
 383
 384                /*
  385                 * An rm only takes multiple trips through this loop
  386                 * if there is a data op. Thus, once the data is sent (or there was
  387                 * none), we're done with the rm.
 388                 */
 389                if (!rm->data.op_active || cp->cp_xmit_data_sent) {
 390                        cp->cp_xmit_rm = NULL;
 391                        cp->cp_xmit_sg = 0;
 392                        cp->cp_xmit_hdr_off = 0;
 393                        cp->cp_xmit_data_off = 0;
 394                        cp->cp_xmit_rdma_sent = 0;
 395                        cp->cp_xmit_atomic_sent = 0;
 396                        cp->cp_xmit_data_sent = 0;
 397
 398                        rds_message_put(rm);
 399                }
 400        }
 401
 402over_batch:
 403        if (conn->c_trans->xmit_path_complete)
 404                conn->c_trans->xmit_path_complete(cp);
 405        release_in_xmit(cp);
 406
 407        /* Nuke any messages we decided not to retransmit. */
 408        if (!list_empty(&to_be_dropped)) {
 409                /* irqs on here, so we can put(), unlike above */
 410                list_for_each_entry(rm, &to_be_dropped, m_conn_item)
 411                        rds_message_put(rm);
 412                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 413        }
 414
 415        /*
 416         * Other senders can queue a message after we last test the send queue
 417         * but before we clear RDS_IN_XMIT.  In that case they'd back off and
 418         * not try and send their newly queued message.  We need to check the
 419         * send queue after having cleared RDS_IN_XMIT so that their message
 420         * doesn't get stuck on the send queue.
 421         *
  422         * If the transport cannot continue (i.e. ret != 0), then it must
 423         * call us when more room is available, such as from the tx
 424         * completion handler.
 425         *
 426         * We have an extra generation check here so that if someone manages
 427         * to jump in after our release_in_xmit, we'll see that they have done
 428         * some work and we will skip our goto
 429         */
 430        if (ret == 0) {
 431                smp_mb();
 432                if ((test_bit(0, &conn->c_map_queued) ||
 433                     !list_empty(&cp->cp_send_queue)) &&
 434                        send_gen == READ_ONCE(cp->cp_send_gen)) {
 435                        rds_stats_inc(s_send_lock_queue_raced);
 436                        if (batch_count < send_batch_count)
 437                                goto restart;
 438                        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 439                }
 440        }
 441out:
 442        return ret;
 443}
 444EXPORT_SYMBOL_GPL(rds_send_xmit);
 445
 446static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
 447{
 448        u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 449
 450        assert_spin_locked(&rs->rs_lock);
 451
 452        BUG_ON(rs->rs_snd_bytes < len);
 453        rs->rs_snd_bytes -= len;
 454
 455        if (rs->rs_snd_bytes == 0)
 456                rds_stats_inc(s_send_queue_empty);
 457}
 458
 459static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
 460                                    is_acked_func is_acked)
 461{
 462        if (is_acked)
 463                return is_acked(rm, ack);
 464        return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
 465}
 466
 467/*
 468 * This is pretty similar to what happens below in the ACK
 469 * handling code - except that we call here as soon as we get
 470 * the IB send completion on the RDMA op and the accompanying
 471 * message.
 472 */
 473void rds_rdma_send_complete(struct rds_message *rm, int status)
 474{
 475        struct rds_sock *rs = NULL;
 476        struct rm_rdma_op *ro;
 477        struct rds_notifier *notifier;
 478        unsigned long flags;
 479        unsigned int notify = 0;
 480
 481        spin_lock_irqsave(&rm->m_rs_lock, flags);
 482
  483        notify = rm->rdma.op_notify | rm->data.op_notify;
 484        ro = &rm->rdma;
 485        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
 486            ro->op_active && notify && ro->op_notifier) {
 487                notifier = ro->op_notifier;
 488                rs = rm->m_rs;
 489                sock_hold(rds_rs_to_sk(rs));
 490
 491                notifier->n_status = status;
 492                spin_lock(&rs->rs_lock);
 493                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 494                spin_unlock(&rs->rs_lock);
 495
 496                ro->op_notifier = NULL;
 497        }
 498
 499        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 500
 501        if (rs) {
 502                rds_wake_sk_sleep(rs);
 503                sock_put(rds_rs_to_sk(rs));
 504        }
 505}
 506EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 507
 508/*
 509 * Just like above, except looks at atomic op
 510 */
 511void rds_atomic_send_complete(struct rds_message *rm, int status)
 512{
 513        struct rds_sock *rs = NULL;
 514        struct rm_atomic_op *ao;
 515        struct rds_notifier *notifier;
 516        unsigned long flags;
 517
 518        spin_lock_irqsave(&rm->m_rs_lock, flags);
 519
 520        ao = &rm->atomic;
  521        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
  522            ao->op_active && ao->op_notify && ao->op_notifier) {
 523                notifier = ao->op_notifier;
 524                rs = rm->m_rs;
 525                sock_hold(rds_rs_to_sk(rs));
 526
 527                notifier->n_status = status;
 528                spin_lock(&rs->rs_lock);
 529                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 530                spin_unlock(&rs->rs_lock);
 531
 532                ao->op_notifier = NULL;
 533        }
 534
 535        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 536
 537        if (rs) {
 538                rds_wake_sk_sleep(rs);
 539                sock_put(rds_rs_to_sk(rs));
 540        }
 541}
 542EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
 543
 544/*
 545 * This is the same as rds_rdma_send_complete except we
 546 * don't do any locking - we have all the ingredients (message,
 547 * socket, socket lock) and can just move the notifier.
 548 */
 549static inline void
 550__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
 551{
 552        struct rm_rdma_op *ro;
 553        struct rm_atomic_op *ao;
 554
 555        ro = &rm->rdma;
 556        if (ro->op_active && ro->op_notify && ro->op_notifier) {
 557                ro->op_notifier->n_status = status;
 558                list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
 559                ro->op_notifier = NULL;
 560        }
 561
 562        ao = &rm->atomic;
 563        if (ao->op_active && ao->op_notify && ao->op_notifier) {
 564                ao->op_notifier->n_status = status;
 565                list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
 566                ao->op_notifier = NULL;
 567        }
 568
 569        /* No need to wake the app - caller does this */
 570}
 571
 572/*
 573 * This removes messages from the socket's list if they're on it.  The list
 574 * argument must be private to the caller, we must be able to modify it
 575 * without locks.  The messages must have a reference held for their
 576 * position on the list.  This function will drop that reference after
 577 * removing the messages from the 'messages' list regardless of if it found
  578 * removing the messages from the 'messages' list, whether or not it found
  579 * them on the socket list.
 580static void rds_send_remove_from_sock(struct list_head *messages, int status)
 581{
 582        unsigned long flags;
 583        struct rds_sock *rs = NULL;
 584        struct rds_message *rm;
 585
 586        while (!list_empty(messages)) {
 587                int was_on_sock = 0;
 588
 589                rm = list_entry(messages->next, struct rds_message,
 590                                m_conn_item);
 591                list_del_init(&rm->m_conn_item);
 592
 593                /*
 594                 * If we see this flag cleared then we're *sure* that someone
 595                 * else beat us to removing it from the sock.  If we race
 596                 * with their flag update we'll get the lock and then really
 597                 * see that the flag has been cleared.
 598                 *
 599                 * The message spinlock makes sure nobody clears rm->m_rs
 600                 * while we're messing with it. It does not prevent the
 601                 * message from being removed from the socket, though.
 602                 */
 603                spin_lock_irqsave(&rm->m_rs_lock, flags);
 604                if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
 605                        goto unlock_and_drop;
 606
 607                if (rs != rm->m_rs) {
 608                        if (rs) {
 609                                rds_wake_sk_sleep(rs);
 610                                sock_put(rds_rs_to_sk(rs));
 611                        }
 612                        rs = rm->m_rs;
 613                        if (rs)
 614                                sock_hold(rds_rs_to_sk(rs));
 615                }
 616                if (!rs)
 617                        goto unlock_and_drop;
 618                spin_lock(&rs->rs_lock);
 619
 620                if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
 621                        struct rm_rdma_op *ro = &rm->rdma;
 622                        struct rds_notifier *notifier;
 623
 624                        list_del_init(&rm->m_sock_item);
 625                        rds_send_sndbuf_remove(rs, rm);
 626
 627                        if (ro->op_active && ro->op_notifier &&
 628                               (ro->op_notify || (ro->op_recverr && status))) {
 629                                notifier = ro->op_notifier;
 630                                list_add_tail(&notifier->n_list,
 631                                                &rs->rs_notify_queue);
 632                                if (!notifier->n_status)
 633                                        notifier->n_status = status;
 634                                rm->rdma.op_notifier = NULL;
 635                        }
 636                        was_on_sock = 1;
 637                        rm->m_rs = NULL;
 638                }
 639                spin_unlock(&rs->rs_lock);
 640
 641unlock_and_drop:
 642                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
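                     /* Drop the reference held for the caller's list; if the
                      * message was still on the socket, also drop the reference
                      * the socket queue held.
                      */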
 643                rds_message_put(rm);
 644                if (was_on_sock)
 645                        rds_message_put(rm);
 646        }
 647
 648        if (rs) {
 649                rds_wake_sk_sleep(rs);
 650                sock_put(rds_rs_to_sk(rs));
 651        }
 652}
 653
 654/*
 655 * Transports call here when they've determined that the receiver queued
 656 * messages up to, and including, the given sequence number.  Messages are
 657 * moved to the retrans queue when rds_send_xmit picks them off the send
 658 * queue. This means that in the TCP case, the message may not have been
 659 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 660 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 661 */
 662void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
 663                              is_acked_func is_acked)
 664{
 665        struct rds_message *rm, *tmp;
 666        unsigned long flags;
 667        LIST_HEAD(list);
 668
 669        spin_lock_irqsave(&cp->cp_lock, flags);
 670
 671        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
 672                if (!rds_send_is_acked(rm, ack, is_acked))
 673                        break;
 674
 675                list_move(&rm->m_conn_item, &list);
 676                clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 677        }
 678
 679        /* order flag updates with spin locks */
 680        if (!list_empty(&list))
 681                smp_mb__after_atomic();
 682
 683        spin_unlock_irqrestore(&cp->cp_lock, flags);
 684
 685        /* now remove the messages from the sock list as needed */
 686        rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
 687}
 688EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
 689
 690void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
 691                         is_acked_func is_acked)
 692{
 693        WARN_ON(conn->c_trans->t_mp_capable);
 694        rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
 695}
 696EXPORT_SYMBOL_GPL(rds_send_drop_acked);
 697
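     /* Drop queued messages addressed to @dest (all destinations when @dest is
      * NULL).  This is used by the RDS_CANCEL_SENT_TO socket option and by
      * socket release: matching messages are pulled off the socket, then off
      * their connection, and their notifiers complete with RDS_RDMA_CANCELED.
      */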
 698void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 699{
 700        struct rds_message *rm, *tmp;
 701        struct rds_connection *conn;
 702        struct rds_conn_path *cp;
 703        unsigned long flags;
 704        LIST_HEAD(list);
 705
 706        /* get all the messages we're dropping under the rs lock */
 707        spin_lock_irqsave(&rs->rs_lock, flags);
 708
 709        list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
 710                if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
 711                             dest->sin_port != rm->m_inc.i_hdr.h_dport))
 712                        continue;
 713
 714                list_move(&rm->m_sock_item, &list);
 715                rds_send_sndbuf_remove(rs, rm);
 716                clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 717        }
 718
 719        /* order flag updates with the rs lock */
 720        smp_mb__after_atomic();
 721
 722        spin_unlock_irqrestore(&rs->rs_lock, flags);
 723
 724        if (list_empty(&list))
 725                return;
 726
 727        /* Remove the messages from the conn */
 728        list_for_each_entry(rm, &list, m_sock_item) {
 729
 730                conn = rm->m_inc.i_conn;
 731                if (conn->c_trans->t_mp_capable)
 732                        cp = rm->m_inc.i_conn_path;
 733                else
 734                        cp = &conn->c_path[0];
 735
 736                spin_lock_irqsave(&cp->cp_lock, flags);
 737                /*
 738                 * Maybe someone else beat us to removing rm from the conn.
 739                 * If we race with their flag update we'll get the lock and
 740                 * then really see that the flag has been cleared.
 741                 */
 742                if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
 743                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 744                        spin_lock_irqsave(&rm->m_rs_lock, flags);
 745                        rm->m_rs = NULL;
 746                        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 747                        continue;
 748                }
 749                list_del_init(&rm->m_conn_item);
 750                spin_unlock_irqrestore(&cp->cp_lock, flags);
 751
 752                /*
 753                 * Couldn't grab m_rs_lock in top loop (lock ordering),
 754                 * but we can now.
 755                 */
 756                spin_lock_irqsave(&rm->m_rs_lock, flags);
 757
 758                spin_lock(&rs->rs_lock);
 759                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 760                spin_unlock(&rs->rs_lock);
 761
 762                rm->m_rs = NULL;
 763                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 764
 765                rds_message_put(rm);
 766        }
 767
 768        rds_wake_sk_sleep(rs);
 769
 770        while (!list_empty(&list)) {
 771                rm = list_entry(list.next, struct rds_message, m_sock_item);
 772                list_del_init(&rm->m_sock_item);
 773                rds_message_wait(rm);
 774
 775                /* just in case the code above skipped this message
  776                 * because RDS_MSG_ON_CONN wasn't set, run it again here.
  777                 * Taking m_rs_lock is the only thing that keeps us
 778                 * from racing with ack processing.
 779                 */
 780                spin_lock_irqsave(&rm->m_rs_lock, flags);
 781
 782                spin_lock(&rs->rs_lock);
 783                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 784                spin_unlock(&rs->rs_lock);
 785
 786                rm->m_rs = NULL;
 787                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 788
 789                rds_message_put(rm);
 790        }
 791}
 792
 793/*
  794 * We only want this to fire once, so we use the caller's 'queued'.  It's
 795 * possible that another thread can race with us and remove the
 796 * message from the flow with RDS_CANCEL_SENT_TO.
 797 */
 798static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
 799                             struct rds_conn_path *cp,
 800                             struct rds_message *rm, __be16 sport,
 801                             __be16 dport, int *queued)
 802{
 803        unsigned long flags;
 804        u32 len;
 805
 806        if (*queued)
 807                goto out;
 808
 809        len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 810
 811        /* this is the only place which holds both the socket's rs_lock
 812         * and the connection's c_lock */
 813        spin_lock_irqsave(&rs->rs_lock, flags);
 814
 815        /*
  816         * If we refused to queue anything whenever only a little sndbuf
  817         * space was left, userspace would get -EAGAIN while poll() still
  818         * indicated send room, which can lead to bad behavior (spinning)
  819         * if snd_bytes isn't freed up by incoming acks. So we check the
  820         * *old* value of rs_snd_bytes here to allow the last msg to exceed
  821         * the buffer; poll() then knows no more data can be sent.
 822         */
 823        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
 824                rs->rs_snd_bytes += len;
 825
 826                /* let recv side know we are close to send space exhaustion.
 827                 * This is probably not the optimal way to do it, as this
 828                 * means we set the flag on *all* messages as soon as our
 829                 * throughput hits a certain threshold.
 830                 */
 831                if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
 832                        __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 833
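                     /* One reference is taken here for the socket's send queue
                      * and another below for the connection's send queue; each
                      * queue drops its reference when the message leaves it.
                      */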
 834                list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
 835                set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 836                rds_message_addref(rm);
 837                rm->m_rs = rs;
 838
 839                /* The code ordering is a little weird, but we're
 840                   trying to minimize the time we hold c_lock */
 841                rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
 842                rm->m_inc.i_conn = conn;
 843                rm->m_inc.i_conn_path = cp;
 844                rds_message_addref(rm);
 845
 846                spin_lock(&cp->cp_lock);
 847                rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
 848                list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
 849                set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 850                spin_unlock(&cp->cp_lock);
 851
 852                rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
 853                         rm, len, rs, rs->rs_snd_bytes,
 854                         (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
 855
 856                *queued = 1;
 857        }
 858
 859        spin_unlock_irqrestore(&rs->rs_lock, flags);
 860out:
 861        return *queued;
 862}
 863
 864/*
 865 * rds_message is getting to be quite complicated, and we'd like to allocate
 866 * it all in one go. This figures out how big it needs to be up front.
 867 */
 868static int rds_rm_size(struct msghdr *msg, int data_len)
 869{
 870        struct cmsghdr *cmsg;
 871        int size = 0;
 872        int cmsg_groups = 0;
 873        int retval;
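             /* cmsg_groups is a two-bit mask: bit 0 marks RDMA_ARGS/atomic
              * cmsgs, bit 1 marks RDMA_DEST/RDMA_MAP.  A final value of 3 means
              * the caller mixed the two groups, which is rejected below.
              */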
 874
 875        for_each_cmsghdr(cmsg, msg) {
 876                if (!CMSG_OK(msg, cmsg))
 877                        return -EINVAL;
 878
 879                if (cmsg->cmsg_level != SOL_RDS)
 880                        continue;
 881
 882                switch (cmsg->cmsg_type) {
 883                case RDS_CMSG_RDMA_ARGS:
 884                        cmsg_groups |= 1;
 885                        retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
 886                        if (retval < 0)
 887                                return retval;
 888                        size += retval;
 889
 890                        break;
 891
 892                case RDS_CMSG_RDMA_DEST:
 893                case RDS_CMSG_RDMA_MAP:
 894                        cmsg_groups |= 2;
  895                        /* these are valid but do not add any size */
 896                        break;
 897
 898                case RDS_CMSG_ATOMIC_CSWP:
 899                case RDS_CMSG_ATOMIC_FADD:
 900                case RDS_CMSG_MASKED_ATOMIC_CSWP:
 901                case RDS_CMSG_MASKED_ATOMIC_FADD:
 902                        cmsg_groups |= 1;
 903                        size += sizeof(struct scatterlist);
 904                        break;
 905
 906                default:
 907                        return -EINVAL;
 908                }
 909
 910        }
 911
 912        size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
 913
 914        /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
 915        if (cmsg_groups == 3)
 916                return -EINVAL;
 917
 918        return size;
 919}
 920
 921static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 922                         struct msghdr *msg, int *allocated_mr)
 923{
 924        struct cmsghdr *cmsg;
 925        int ret = 0;
 926
 927        for_each_cmsghdr(cmsg, msg) {
 928                if (!CMSG_OK(msg, cmsg))
 929                        return -EINVAL;
 930
 931                if (cmsg->cmsg_level != SOL_RDS)
 932                        continue;
 933
 934                /* As a side effect, RDMA_DEST and RDMA_MAP will set
  935                 * rm->m_rdma_cookie and rm->rdma.op_rdma_mr.
 936                 */
 937                switch (cmsg->cmsg_type) {
 938                case RDS_CMSG_RDMA_ARGS:
 939                        ret = rds_cmsg_rdma_args(rs, rm, cmsg);
 940                        break;
 941
 942                case RDS_CMSG_RDMA_DEST:
 943                        ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
 944                        break;
 945
 946                case RDS_CMSG_RDMA_MAP:
 947                        ret = rds_cmsg_rdma_map(rs, rm, cmsg);
 948                        if (!ret)
 949                                *allocated_mr = 1;
 950                        else if (ret == -ENODEV)
 951                                /* Accommodate the get_mr() case which can fail
 952                                 * if connection isn't established yet.
 953                                 */
 954                                ret = -EAGAIN;
 955                        break;
 956                case RDS_CMSG_ATOMIC_CSWP:
 957                case RDS_CMSG_ATOMIC_FADD:
 958                case RDS_CMSG_MASKED_ATOMIC_CSWP:
 959                case RDS_CMSG_MASKED_ATOMIC_FADD:
 960                        ret = rds_cmsg_atomic(rs, rm, cmsg);
 961                        break;
 962
 963                default:
 964                        return -EINVAL;
 965                }
 966
 967                if (ret)
 968                        break;
 969        }
 970
 971        return ret;
 972}
 973
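     /* Pick an MP-RDS lane for this socket.  If we don't yet know how many
      * paths the peer supports (c_npaths == 0) and the hash picked a lane
      * other than 0, send a ping to start the path handshake and wait
      * (interruptibly) until the path count is known; a single-path peer
      * forces everything back onto lane 0.
      */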
 974static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
 975{
 976        int hash;
 977
 978        if (conn->c_npaths == 0)
 979                hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
 980        else
 981                hash = RDS_MPATH_HASH(rs, conn->c_npaths);
 982        if (conn->c_npaths == 0 && hash != 0) {
 983                rds_send_ping(conn, 0);
 984
 985                if (conn->c_npaths == 0) {
 986                        wait_event_interruptible(conn->c_hs_waitq,
 987                                                 (conn->c_npaths != 0));
 988                }
 989                if (conn->c_npaths == 1)
 990                        hash = 0;
 991        }
 992        return hash;
 993}
 994
 995static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
 996{
 997        struct rds_rdma_args *args;
 998        struct cmsghdr *cmsg;
 999
1000        for_each_cmsghdr(cmsg, msg) {
1001                if (!CMSG_OK(msg, cmsg))
1002                        return -EINVAL;
1003
1004                if (cmsg->cmsg_level != SOL_RDS)
1005                        continue;
1006
1007                if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
1008                        args = CMSG_DATA(cmsg);
1009                        *rdma_bytes += args->remote_vec.bytes;
1010                }
1011        }
1012        return 0;
1013}
1014
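     /* A minimal userspace sketch of reaching this path (illustrative only;
      * error handling omitted, local_ip/peer_ip/iov are placeholders):
      *
      *     int fd = socket(AF_RDS, SOCK_SEQPACKET, 0);
      *     struct sockaddr_in src = { .sin_family = AF_INET,
      *                                .sin_addr.s_addr = local_ip };
      *     bind(fd, (struct sockaddr *)&src, sizeof(src));
      *     struct sockaddr_in dst = { .sin_family = AF_INET,
      *                                .sin_port = htons(4000),
      *                                .sin_addr.s_addr = peer_ip };
      *     struct msghdr msg = { .msg_name = &dst, .msg_namelen = sizeof(dst),
      *                           .msg_iov = &iov, .msg_iovlen = 1 };
      *     sendmsg(fd, &msg, 0);
      */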
1015int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1016{
1017        struct sock *sk = sock->sk;
1018        struct rds_sock *rs = rds_sk_to_rs(sk);
1019        DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
1020        __be32 daddr;
1021        __be16 dport;
1022        struct rds_message *rm = NULL;
1023        struct rds_connection *conn;
1024        int ret = 0;
1025        int queued = 0, allocated_mr = 0;
1026        int nonblock = msg->msg_flags & MSG_DONTWAIT;
1027        long timeo = sock_sndtimeo(sk, nonblock);
1028        struct rds_conn_path *cpath;
1029        size_t total_payload_len = payload_len, rdma_payload_len = 0;
1030
 1031        /* Mirror how Linux UDP mirrors BSD's error message compatibility */
1032        /* XXX: Perhaps MSG_MORE someday */
1033        if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
1034                ret = -EOPNOTSUPP;
1035                goto out;
1036        }
1037
1038        if (msg->msg_namelen) {
1039                /* XXX fail non-unicast destination IPs? */
1040                if (msg->msg_namelen < sizeof(*usin) || usin->sin_family != AF_INET) {
1041                        ret = -EINVAL;
1042                        goto out;
1043                }
1044                daddr = usin->sin_addr.s_addr;
1045                dport = usin->sin_port;
1046        } else {
1047                /* We only care about consistency with ->connect() */
1048                lock_sock(sk);
1049                daddr = rs->rs_conn_addr;
1050                dport = rs->rs_conn_port;
1051                release_sock(sk);
1052        }
1053
1054        lock_sock(sk);
1055        if (daddr == 0 || rs->rs_bound_addr == 0) {
1056                release_sock(sk);
1057                ret = -ENOTCONN; /* XXX not a great errno */
1058                goto out;
1059        }
1060        release_sock(sk);
1061
1062        ret = rds_rdma_bytes(msg, &rdma_payload_len);
1063        if (ret)
1064                goto out;
1065
1066        total_payload_len += rdma_payload_len;
1067        if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
1068                ret = -EMSGSIZE;
1069                goto out;
1070        }
1071
1072        if (payload_len > rds_sk_sndbuf(rs)) {
1073                ret = -EMSGSIZE;
1074                goto out;
1075        }
1076
1077        /* size of rm including all sgs */
1078        ret = rds_rm_size(msg, payload_len);
1079        if (ret < 0)
1080                goto out;
1081
1082        rm = rds_message_alloc(ret, GFP_KERNEL);
1083        if (!rm) {
1084                ret = -ENOMEM;
1085                goto out;
1086        }
1087
1088        /* Attach data to the rm */
1089        if (payload_len) {
1090                rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
1091                if (!rm->data.op_sg) {
1092                        ret = -ENOMEM;
1093                        goto out;
1094                }
1095                ret = rds_message_copy_from_user(rm, &msg->msg_iter);
1096                if (ret)
1097                        goto out;
1098        }
1099        rm->data.op_active = 1;
1100
1101        rm->m_daddr = daddr;
1102
1103        /* rds_conn_create has a spinlock that runs with IRQ off.
1104         * Caching the conn in the socket helps a lot. */
1105        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
1106                conn = rs->rs_conn;
1107        else {
1108                conn = rds_conn_create_outgoing(sock_net(sock->sk),
1109                                                rs->rs_bound_addr, daddr,
1110                                        rs->rs_transport,
1111                                        sock->sk->sk_allocation);
1112                if (IS_ERR(conn)) {
1113                        ret = PTR_ERR(conn);
1114                        goto out;
1115                }
1116                rs->rs_conn = conn;
1117        }
1118
1119        /* Parse any control messages the user may have included. */
1120        ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
1121        if (ret) {
 1122                /* Trigger the connection so that it's ready for the next retry */
 1123                if (ret == -EAGAIN)
1124                        rds_conn_connect_if_down(conn);
1125                goto out;
1126        }
1127
1128        if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
1129                printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
1130                               &rm->rdma, conn->c_trans->xmit_rdma);
1131                ret = -EOPNOTSUPP;
1132                goto out;
1133        }
1134
1135        if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
1136                printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
1137                               &rm->atomic, conn->c_trans->xmit_atomic);
1138                ret = -EOPNOTSUPP;
1139                goto out;
1140        }
1141
1142        if (conn->c_trans->t_mp_capable)
1143                cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
1144        else
1145                cpath = &conn->c_path[0];
1146
1147        rds_conn_path_connect_if_down(cpath);
1148
1149        ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
1150        if (ret) {
1151                rs->rs_seen_congestion = 1;
1152                goto out;
1153        }
1154        while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
1155                                  dport, &queued)) {
1156                rds_stats_inc(s_send_queue_full);
1157
1158                if (nonblock) {
1159                        ret = -EAGAIN;
1160                        goto out;
1161                }
1162
1163                timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
1164                                        rds_send_queue_rm(rs, conn, cpath, rm,
1165                                                          rs->rs_bound_port,
1166                                                          dport,
1167                                                          &queued),
1168                                        timeo);
1169                rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
1170                if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
1171                        continue;
1172
1173                ret = timeo;
1174                if (ret == 0)
1175                        ret = -ETIMEDOUT;
1176                goto out;
1177        }
1178
1179        /*
1180         * By now we've committed to the send.  We reuse rds_send_worker()
1181         * to retry sends in the rds thread if the transport asks us to.
1182         */
1183        rds_stats_inc(s_send_queued);
1184
1185        ret = rds_send_xmit(cpath);
1186        if (ret == -ENOMEM || ret == -EAGAIN)
1187                queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
1188
1189        rds_message_put(rm);
1190        return payload_len;
1191
1192out:
 1193        /* If the user included an RDMA_MAP cmsg, we allocated an MR on the fly.
1194         * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
1195         * or in any other way, we need to destroy the MR again */
1196        if (allocated_mr)
1197                rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
1198
1199        if (rm)
1200                rds_message_put(rm);
1201        return ret;
1202}
1203
1204/*
1205 * send out a probe. Can be shared by rds_send_ping,
1206 * rds_send_pong, rds_send_hb.
1207 * rds_send_hb should use h_flags
1208 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
1209 * or
1210 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
1211 */
1212static int
1213rds_send_probe(struct rds_conn_path *cp, __be16 sport,
1214               __be16 dport, u8 h_flags)
1215{
1216        struct rds_message *rm;
1217        unsigned long flags;
1218        int ret = 0;
1219
1220        rm = rds_message_alloc(0, GFP_ATOMIC);
1221        if (!rm) {
1222                ret = -ENOMEM;
1223                goto out;
1224        }
1225
1226        rm->m_daddr = cp->cp_conn->c_faddr;
1227        rm->data.op_active = 1;
1228
1229        rds_conn_path_connect_if_down(cp);
1230
1231        ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
1232        if (ret)
1233                goto out;
1234
1235        spin_lock_irqsave(&cp->cp_lock, flags);
1236        list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
1237        set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
1238        rds_message_addref(rm);
1239        rm->m_inc.i_conn = cp->cp_conn;
1240        rm->m_inc.i_conn_path = cp;
1241
1242        rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
1243                                    cp->cp_next_tx_seq);
1244        rm->m_inc.i_hdr.h_flags |= h_flags;
1245        cp->cp_next_tx_seq++;
1246
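             /* A handshake probe also advertises how many paths we support and
              * our generation number so an MP-RDS capable peer can bring up the
              * remaining lanes.
              */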
1247        if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
1248            cp->cp_conn->c_trans->t_mp_capable) {
 1249                __be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
 1250                __be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
1251
1252                rds_message_add_extension(&rm->m_inc.i_hdr,
1253                                          RDS_EXTHDR_NPATHS, &npaths,
1254                                          sizeof(npaths));
1255                rds_message_add_extension(&rm->m_inc.i_hdr,
1256                                          RDS_EXTHDR_GEN_NUM,
1257                                          &my_gen_num,
1258                                          sizeof(u32));
1259        }
1260        spin_unlock_irqrestore(&cp->cp_lock, flags);
1261
1262        rds_stats_inc(s_send_queued);
1263        rds_stats_inc(s_send_pong);
1264
1265        /* schedule the send work on rds_wq */
1266        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
1267
1268        rds_message_put(rm);
1269        return 0;
1270
1271out:
1272        if (rm)
1273                rds_message_put(rm);
1274        return ret;
1275}
1276
1277int
1278rds_send_pong(struct rds_conn_path *cp, __be16 dport)
1279{
1280        return rds_send_probe(cp, 0, dport, 0);
1281}
1282
1283void
1284rds_send_ping(struct rds_connection *conn, int cp_index)
1285{
1286        unsigned long flags;
1287        struct rds_conn_path *cp = &conn->c_path[cp_index];
1288
1289        spin_lock_irqsave(&cp->cp_lock, flags);
1290        if (conn->c_ping_triggered) {
1291                spin_unlock_irqrestore(&cp->cp_lock, flags);
1292                return;
1293        }
1294        conn->c_ping_triggered = 1;
1295        spin_unlock_irqrestore(&cp->cp_lock, flags);
1296        rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
1297}
1298EXPORT_SYMBOL_GPL(rds_send_ping);
1299