linux/net/rds/send.c
   1/*
   2 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/moduleparam.h>
  35#include <linux/gfp.h>
  36#include <net/sock.h>
  37#include <linux/in.h>
  38#include <linux/list.h>
  39#include <linux/ratelimit.h>
  40#include <linux/export.h>
  41#include <linux/sizes.h>
  42
  43#include "rds.h"
  44
  45/* When transmitting messages in rds_send_xmit, we need to emerge from
   46 * time to time and briefly release the CPU. Otherwise the soft lockup watchdog
  47 * will kick our shin.
  48 * Also, it seems fairer to not let one busy connection stall all the
  49 * others.
  50 *
  51 * send_batch_count is the number of times we'll loop in send_xmit. Setting
  52 * it to 0 will restore the old behavior (where we looped until we had
  53 * drained the queue).
  54 */
  55static int send_batch_count = SZ_1K;
  56module_param(send_batch_count, int, 0444);
  57MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
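/* Example (assuming RDS is built as the "rds" module): the batch size can be
 * set at load time with "modprobe rds send_batch_count=2048", or with
 * "rds.send_batch_count=2048" on the kernel command line when built in; the
 * 0444 permission exposes it read-only under
 * /sys/module/rds/parameters/send_batch_count.
 */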
  58
  59static void rds_send_remove_from_sock(struct list_head *messages, int status);
  60
  61/*
  62 * Reset the send state.  Callers must ensure that this doesn't race with
  63 * rds_send_xmit().
  64 */
  65void rds_send_path_reset(struct rds_conn_path *cp)
  66{
  67        struct rds_message *rm, *tmp;
  68        unsigned long flags;
  69
  70        if (cp->cp_xmit_rm) {
  71                rm = cp->cp_xmit_rm;
  72                cp->cp_xmit_rm = NULL;
  73                /* Tell the user the RDMA op is no longer mapped by the
  74                 * transport. This isn't entirely true (it's flushed out
  75                 * independently) but as the connection is down, there's
  76                 * no ongoing RDMA to/from that memory */
  77                rds_message_unmapped(rm);
  78                rds_message_put(rm);
  79        }
  80
  81        cp->cp_xmit_sg = 0;
  82        cp->cp_xmit_hdr_off = 0;
  83        cp->cp_xmit_data_off = 0;
  84        cp->cp_xmit_atomic_sent = 0;
  85        cp->cp_xmit_rdma_sent = 0;
  86        cp->cp_xmit_data_sent = 0;
  87
  88        cp->cp_conn->c_map_queued = 0;
  89
  90        cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
  91        cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
  92
  93        /* Mark messages as retransmissions, and move them to the send q */
  94        spin_lock_irqsave(&cp->cp_lock, flags);
  95        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
  96                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
  97                set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
  98        }
  99        list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
 100        spin_unlock_irqrestore(&cp->cp_lock, flags);
 101}
 102EXPORT_SYMBOL_GPL(rds_send_path_reset);
 103
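/* RDS_IN_XMIT acts as a per-path transmit gate: acquire_in_xmit() is a
 * trylock-style test_and_set_bit() so that only one caller works the send
 * queue at a time, and release_in_xmit() clears the bit and wakes anyone
 * sleeping on cp_waitq for the path to go idle.
 */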
 104static int acquire_in_xmit(struct rds_conn_path *cp)
 105{
 106        return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 107}
 108
 109static void release_in_xmit(struct rds_conn_path *cp)
 110{
 111        clear_bit(RDS_IN_XMIT, &cp->cp_flags);
 112        smp_mb__after_atomic();
 113        /*
 114         * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
 115         * hot path and finding waiters is very rare.  We don't want to walk
 116         * the system-wide hashed waitqueue buckets in the fast path only to
 117         * almost never find waiters.
 118         */
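        /* The smp_mb__after_atomic() above orders clearing RDS_IN_XMIT
         * before the waitqueue_active() check; this is the usual pattern
         * for avoiding missed wakeups when waitqueue_active() is used
         * instead of an unconditional wake_up().
         */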
 119        if (waitqueue_active(&cp->cp_waitq))
 120                wake_up_all(&cp->cp_waitq);
 121}
 122
 123/*
 124 * We're making the conscious trade-off here to only send one message
 125 * down the connection at a time.
 126 *   Pro:
 127 *      - tx queueing is a simple fifo list
 128 *      - reassembly is optional and easily done by transports per conn
 129 *      - no per flow rx lookup at all, straight to the socket
 130 *      - less per-frag memory and wire overhead
 131 *   Con:
 132 *      - queued acks can be delayed behind large messages
 133 *   Depends:
 134 *      - small message latency is higher behind queued large messages
 135 *      - large message latency isn't starved by intervening small sends
 136 */
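/*
 * Return value summary as used in this file: 0 when the send queue was
 * drained or the path is not up, -ENOMEM when another caller already holds
 * RDS_IN_XMIT (callers requeue the send work), -ENETUNREACH when the
 * connection is being torn down, or whatever error the transport's xmit
 * hooks returned.
 */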
 137int rds_send_xmit(struct rds_conn_path *cp)
 138{
 139        struct rds_connection *conn = cp->cp_conn;
 140        struct rds_message *rm;
 141        unsigned long flags;
 142        unsigned int tmp;
 143        struct scatterlist *sg;
 144        int ret = 0;
 145        LIST_HEAD(to_be_dropped);
 146        int batch_count;
 147        unsigned long send_gen = 0;
 148
 149restart:
 150        batch_count = 0;
 151
 152        /*
 153         * sendmsg calls here after having queued its message on the send
 154         * queue.  We only have one task feeding the connection at a time.  If
 155         * another thread is already feeding the queue then we back off.  This
 156         * avoids blocking the caller and trading per-connection data between
 157         * caches per message.
 158         */
 159        if (!acquire_in_xmit(cp)) {
 160                rds_stats_inc(s_send_lock_contention);
 161                ret = -ENOMEM;
 162                goto out;
 163        }
 164
 165        if (rds_destroy_pending(cp->cp_conn)) {
 166                release_in_xmit(cp);
 167                ret = -ENETUNREACH; /* dont requeue send work */
 168                goto out;
 169        }
 170
 171        /*
 172         * we record the send generation after doing the xmit acquire.
 173         * if someone else manages to jump in and do some work, we'll use
 174         * this to avoid a goto restart farther down.
 175         *
 176         * The acquire_in_xmit() check above ensures that only one
 177         * caller can increment c_send_gen at any time.
 178         */
 179        send_gen = READ_ONCE(cp->cp_send_gen) + 1;
 180        WRITE_ONCE(cp->cp_send_gen, send_gen);
 181
 182        /*
 183         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 184         * we do the opposite to avoid races.
 185         */
 186        if (!rds_conn_path_up(cp)) {
 187                release_in_xmit(cp);
 188                ret = 0;
 189                goto out;
 190        }
 191
 192        if (conn->c_trans->xmit_path_prepare)
 193                conn->c_trans->xmit_path_prepare(cp);
 194
 195        /*
 196         * spin trying to push headers and data down the connection until
 197         * the connection doesn't make forward progress.
 198         */
 199        while (1) {
 200
 201                rm = cp->cp_xmit_rm;
 202
 203                /*
 204                 * If between sending messages, we can send a pending congestion
 205                 * map update.
 206                 */
 207                if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
 208                        rm = rds_cong_update_alloc(conn);
 209                        if (IS_ERR(rm)) {
 210                                ret = PTR_ERR(rm);
 211                                break;
 212                        }
 213                        rm->data.op_active = 1;
 214                        rm->m_inc.i_conn_path = cp;
 215                        rm->m_inc.i_conn = cp->cp_conn;
 216
 217                        cp->cp_xmit_rm = rm;
 218                }
 219
 220                /*
 221                 * If not already working on one, grab the next message.
 222                 *
 223                 * cp_xmit_rm holds a ref while we're sending this message down
  224                 * the connection.  We can use this ref while holding the
  225                 * send_sem; rds_send_path_reset() is serialized with it.
 226                 */
 227                if (!rm) {
 228                        unsigned int len;
 229
 230                        batch_count++;
 231
 232                        /* we want to process as big a batch as we can, but
 233                         * we also want to avoid softlockups.  If we've been
  234                         * through a lot of messages, let's back off and see
  235                         * if anyone else jumps in.
 236                         */
 237                        if (batch_count >= send_batch_count)
 238                                goto over_batch;
 239
 240                        spin_lock_irqsave(&cp->cp_lock, flags);
 241
 242                        if (!list_empty(&cp->cp_send_queue)) {
 243                                rm = list_entry(cp->cp_send_queue.next,
 244                                                struct rds_message,
 245                                                m_conn_item);
 246                                rds_message_addref(rm);
 247
 248                                /*
 249                                 * Move the message from the send queue to the retransmit
 250                                 * list right away.
 251                                 */
 252                                list_move_tail(&rm->m_conn_item,
 253                                               &cp->cp_retrans);
 254                        }
 255
 256                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 257
 258                        if (!rm)
 259                                break;
 260
 261                        /* Unfortunately, the way Infiniband deals with
 262                         * RDMA to a bad MR key is by moving the entire
  263                         * queue pair to error state. We could possibly
 264                         * recover from that, but right now we drop the
 265                         * connection.
 266                         * Therefore, we never retransmit messages with RDMA ops.
 267                         */
 268                        if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
 269                            (rm->rdma.op_active &&
 270                            test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
 271                                spin_lock_irqsave(&cp->cp_lock, flags);
 272                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 273                                        list_move(&rm->m_conn_item, &to_be_dropped);
 274                                spin_unlock_irqrestore(&cp->cp_lock, flags);
 275                                continue;
 276                        }
 277
 278                        /* Require an ACK every once in a while */
 279                        len = ntohl(rm->m_inc.i_hdr.h_len);
 280                        if (cp->cp_unacked_packets == 0 ||
 281                            cp->cp_unacked_bytes < len) {
 282                                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 283
 284                                cp->cp_unacked_packets =
 285                                        rds_sysctl_max_unacked_packets;
 286                                cp->cp_unacked_bytes =
 287                                        rds_sysctl_max_unacked_bytes;
 288                                rds_stats_inc(s_send_ack_required);
 289                        } else {
 290                                cp->cp_unacked_bytes -= len;
 291                                cp->cp_unacked_packets--;
 292                        }
 293
 294                        cp->cp_xmit_rm = rm;
 295                }
 296
 297                /* The transport either sends the whole rdma or none of it */
 298                if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
 299                        rm->m_final_op = &rm->rdma;
 300                        /* The transport owns the mapped memory for now.
 301                         * You can't unmap it while it's on the send queue
 302                         */
 303                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 304                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 305                        if (ret) {
 306                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 307                                wake_up_interruptible(&rm->m_flush_wait);
 308                                break;
 309                        }
 310                        cp->cp_xmit_rdma_sent = 1;
 311
 312                }
 313
 314                if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
 315                        rm->m_final_op = &rm->atomic;
 316                        /* The transport owns the mapped memory for now.
 317                         * You can't unmap it while it's on the send queue
 318                         */
 319                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 320                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
 321                        if (ret) {
 322                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 323                                wake_up_interruptible(&rm->m_flush_wait);
 324                                break;
 325                        }
 326                        cp->cp_xmit_atomic_sent = 1;
 327
 328                }
 329
 330                /*
 331                 * A number of cases require an RDS header to be sent
 332                 * even if there is no data.
 333                 * We permit 0-byte sends; rds-ping depends on this.
 334                 * However, if there are exclusively attached silent ops,
 335                 * we skip the hdr/data send, to enable silent operation.
 336                 */
 337                if (rm->data.op_nents == 0) {
 338                        int ops_present;
 339                        int all_ops_are_silent = 1;
 340
 341                        ops_present = (rm->atomic.op_active || rm->rdma.op_active);
 342                        if (rm->atomic.op_active && !rm->atomic.op_silent)
 343                                all_ops_are_silent = 0;
 344                        if (rm->rdma.op_active && !rm->rdma.op_silent)
 345                                all_ops_are_silent = 0;
 346
 347                        if (ops_present && all_ops_are_silent
 348                            && !rm->m_rdma_cookie)
 349                                rm->data.op_active = 0;
 350                }
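                /* For example, a message carrying only a silent RDMA op and
                 * no rdma cookie has op_active cleared here, so no header or
                 * data is emitted for it; the RDMA itself was already pushed
                 * via xmit_rdma above.
                 */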
 351
 352                if (rm->data.op_active && !cp->cp_xmit_data_sent) {
 353                        rm->m_final_op = &rm->data;
 354
 355                        ret = conn->c_trans->xmit(conn, rm,
 356                                                  cp->cp_xmit_hdr_off,
 357                                                  cp->cp_xmit_sg,
 358                                                  cp->cp_xmit_data_off);
 359                        if (ret <= 0)
 360                                break;
 361
 362                        if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
 363                                tmp = min_t(int, ret,
 364                                            sizeof(struct rds_header) -
 365                                            cp->cp_xmit_hdr_off);
 366                                cp->cp_xmit_hdr_off += tmp;
 367                                ret -= tmp;
 368                        }
 369
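                        /* Charge whatever the transport consumed beyond the
                         * header against the data scatterlist, advancing
                         * cp_xmit_sg/cp_xmit_data_off so a later call can
                         * resume mid-message.
                         */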
 370                        sg = &rm->data.op_sg[cp->cp_xmit_sg];
 371                        while (ret) {
 372                                tmp = min_t(int, ret, sg->length -
 373                                                      cp->cp_xmit_data_off);
 374                                cp->cp_xmit_data_off += tmp;
 375                                ret -= tmp;
 376                                if (cp->cp_xmit_data_off == sg->length) {
 377                                        cp->cp_xmit_data_off = 0;
 378                                        sg++;
 379                                        cp->cp_xmit_sg++;
 380                                        BUG_ON(ret != 0 && cp->cp_xmit_sg ==
 381                                               rm->data.op_nents);
 382                                }
 383                        }
 384
 385                        if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
 386                            (cp->cp_xmit_sg == rm->data.op_nents))
 387                                cp->cp_xmit_data_sent = 1;
 388                }
 389
 390                /*
 391                 * A rm will only take multiple times through this loop
 392                 * if there is a data op. Thus, if the data is sent (or there was
 393                 * none), then we're done with the rm.
 394                 */
 395                if (!rm->data.op_active || cp->cp_xmit_data_sent) {
 396                        cp->cp_xmit_rm = NULL;
 397                        cp->cp_xmit_sg = 0;
 398                        cp->cp_xmit_hdr_off = 0;
 399                        cp->cp_xmit_data_off = 0;
 400                        cp->cp_xmit_rdma_sent = 0;
 401                        cp->cp_xmit_atomic_sent = 0;
 402                        cp->cp_xmit_data_sent = 0;
 403
 404                        rds_message_put(rm);
 405                }
 406        }
 407
 408over_batch:
 409        if (conn->c_trans->xmit_path_complete)
 410                conn->c_trans->xmit_path_complete(cp);
 411        release_in_xmit(cp);
 412
 413        /* Nuke any messages we decided not to retransmit. */
 414        if (!list_empty(&to_be_dropped)) {
 415                /* irqs on here, so we can put(), unlike above */
 416                list_for_each_entry(rm, &to_be_dropped, m_conn_item)
 417                        rds_message_put(rm);
 418                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 419        }
 420
 421        /*
 422         * Other senders can queue a message after we last test the send queue
 423         * but before we clear RDS_IN_XMIT.  In that case they'd back off and
  424         * not try to send their newly queued message.  We need to check the
 425         * send queue after having cleared RDS_IN_XMIT so that their message
 426         * doesn't get stuck on the send queue.
 427         *
  428         * If the transport cannot continue (i.e. ret != 0), then it must
 429         * call us when more room is available, such as from the tx
 430         * completion handler.
 431         *
 432         * We have an extra generation check here so that if someone manages
 433         * to jump in after our release_in_xmit, we'll see that they have done
  434         * some work and we will skip our goto restart.
 435         */
 436        if (ret == 0) {
 437                bool raced;
 438
 439                smp_mb();
 440                raced = send_gen != READ_ONCE(cp->cp_send_gen);
 441
 442                if ((test_bit(0, &conn->c_map_queued) ||
 443                    !list_empty(&cp->cp_send_queue)) && !raced) {
 444                        if (batch_count < send_batch_count)
 445                                goto restart;
 446                        rcu_read_lock();
 447                        if (rds_destroy_pending(cp->cp_conn))
 448                                ret = -ENETUNREACH;
 449                        else
 450                                queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 451                        rcu_read_unlock();
 452                } else if (raced) {
 453                        rds_stats_inc(s_send_lock_queue_raced);
 454                }
 455        }
 456out:
 457        return ret;
 458}
 459EXPORT_SYMBOL_GPL(rds_send_xmit);
 460
 461static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
 462{
 463        u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 464
 465        assert_spin_locked(&rs->rs_lock);
 466
 467        BUG_ON(rs->rs_snd_bytes < len);
 468        rs->rs_snd_bytes -= len;
 469
 470        if (rs->rs_snd_bytes == 0)
 471                rds_stats_inc(s_send_queue_empty);
 472}
 473
 474static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
 475                                    is_acked_func is_acked)
 476{
 477        if (is_acked)
 478                return is_acked(rm, ack);
 479        return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
 480}
 481
 482/*
 483 * This is pretty similar to what happens below in the ACK
 484 * handling code - except that we call here as soon as we get
 485 * the IB send completion on the RDMA op and the accompanying
 486 * message.
 487 */
 488void rds_rdma_send_complete(struct rds_message *rm, int status)
 489{
 490        struct rds_sock *rs = NULL;
 491        struct rm_rdma_op *ro;
 492        struct rds_notifier *notifier;
 493        unsigned long flags;
 494        unsigned int notify = 0;
 495
 496        spin_lock_irqsave(&rm->m_rs_lock, flags);
 497
 498        notify =  rm->rdma.op_notify | rm->data.op_notify;
 499        ro = &rm->rdma;
 500        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
 501            ro->op_active && notify && ro->op_notifier) {
 502                notifier = ro->op_notifier;
 503                rs = rm->m_rs;
 504                sock_hold(rds_rs_to_sk(rs));
 505
 506                notifier->n_status = status;
 507                spin_lock(&rs->rs_lock);
 508                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 509                spin_unlock(&rs->rs_lock);
 510
 511                ro->op_notifier = NULL;
 512        }
 513
 514        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 515
 516        if (rs) {
 517                rds_wake_sk_sleep(rs);
 518                sock_put(rds_rs_to_sk(rs));
 519        }
 520}
 521EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 522
 523/*
 524 * Just like above, except looks at atomic op
 525 */
 526void rds_atomic_send_complete(struct rds_message *rm, int status)
 527{
 528        struct rds_sock *rs = NULL;
 529        struct rm_atomic_op *ao;
 530        struct rds_notifier *notifier;
 531        unsigned long flags;
 532
 533        spin_lock_irqsave(&rm->m_rs_lock, flags);
 534
 535        ao = &rm->atomic;
 536        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
 537            && ao->op_active && ao->op_notify && ao->op_notifier) {
 538                notifier = ao->op_notifier;
 539                rs = rm->m_rs;
 540                sock_hold(rds_rs_to_sk(rs));
 541
 542                notifier->n_status = status;
 543                spin_lock(&rs->rs_lock);
 544                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 545                spin_unlock(&rs->rs_lock);
 546
 547                ao->op_notifier = NULL;
 548        }
 549
 550        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 551
 552        if (rs) {
 553                rds_wake_sk_sleep(rs);
 554                sock_put(rds_rs_to_sk(rs));
 555        }
 556}
 557EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
 558
 559/*
 560 * This is the same as rds_rdma_send_complete except we
 561 * don't do any locking - we have all the ingredients (message,
 562 * socket, socket lock) and can just move the notifier.
 563 */
 564static inline void
 565__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
 566{
 567        struct rm_rdma_op *ro;
 568        struct rm_atomic_op *ao;
 569
 570        ro = &rm->rdma;
 571        if (ro->op_active && ro->op_notify && ro->op_notifier) {
 572                ro->op_notifier->n_status = status;
 573                list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
 574                ro->op_notifier = NULL;
 575        }
 576
 577        ao = &rm->atomic;
 578        if (ao->op_active && ao->op_notify && ao->op_notifier) {
 579                ao->op_notifier->n_status = status;
 580                list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
 581                ao->op_notifier = NULL;
 582        }
 583
 584        /* No need to wake the app - caller does this */
 585}
 586
 587/*
 588 * This removes messages from the socket's list if they're on it.  The list
 589 * argument must be private to the caller, we must be able to modify it
 590 * without locks.  The messages must have a reference held for their
 591 * position on the list.  This function will drop that reference after
  592 * removing the messages from the 'messages' list, regardless of whether it
  593 * found the messages on the socket list.
 594 */
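/* Note the double rds_message_put() at the bottom of the loop: one reference
 * was held for the message's position on the caller's list, and a second is
 * dropped only if the message was still on the socket (was_on_sock).
 */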
 595static void rds_send_remove_from_sock(struct list_head *messages, int status)
 596{
 597        unsigned long flags;
 598        struct rds_sock *rs = NULL;
 599        struct rds_message *rm;
 600
 601        while (!list_empty(messages)) {
 602                int was_on_sock = 0;
 603
 604                rm = list_entry(messages->next, struct rds_message,
 605                                m_conn_item);
 606                list_del_init(&rm->m_conn_item);
 607
 608                /*
 609                 * If we see this flag cleared then we're *sure* that someone
 610                 * else beat us to removing it from the sock.  If we race
 611                 * with their flag update we'll get the lock and then really
 612                 * see that the flag has been cleared.
 613                 *
 614                 * The message spinlock makes sure nobody clears rm->m_rs
 615                 * while we're messing with it. It does not prevent the
 616                 * message from being removed from the socket, though.
 617                 */
 618                spin_lock_irqsave(&rm->m_rs_lock, flags);
 619                if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
 620                        goto unlock_and_drop;
 621
 622                if (rs != rm->m_rs) {
 623                        if (rs) {
 624                                rds_wake_sk_sleep(rs);
 625                                sock_put(rds_rs_to_sk(rs));
 626                        }
 627                        rs = rm->m_rs;
 628                        if (rs)
 629                                sock_hold(rds_rs_to_sk(rs));
 630                }
 631                if (!rs)
 632                        goto unlock_and_drop;
 633                spin_lock(&rs->rs_lock);
 634
 635                if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
 636                        struct rm_rdma_op *ro = &rm->rdma;
 637                        struct rds_notifier *notifier;
 638
 639                        list_del_init(&rm->m_sock_item);
 640                        rds_send_sndbuf_remove(rs, rm);
 641
 642                        if (ro->op_active && ro->op_notifier &&
 643                               (ro->op_notify || (ro->op_recverr && status))) {
 644                                notifier = ro->op_notifier;
 645                                list_add_tail(&notifier->n_list,
 646                                                &rs->rs_notify_queue);
 647                                if (!notifier->n_status)
 648                                        notifier->n_status = status;
 649                                rm->rdma.op_notifier = NULL;
 650                        }
 651                        was_on_sock = 1;
 652                }
 653                spin_unlock(&rs->rs_lock);
 654
 655unlock_and_drop:
 656                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 657                rds_message_put(rm);
 658                if (was_on_sock)
 659                        rds_message_put(rm);
 660        }
 661
 662        if (rs) {
 663                rds_wake_sk_sleep(rs);
 664                sock_put(rds_rs_to_sk(rs));
 665        }
 666}
 667
 668/*
 669 * Transports call here when they've determined that the receiver queued
 670 * messages up to, and including, the given sequence number.  Messages are
 671 * moved to the retrans queue when rds_send_xmit picks them off the send
 672 * queue. This means that in the TCP case, the message may not have been
 673 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 674 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 675 */
 676void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
 677                              is_acked_func is_acked)
 678{
 679        struct rds_message *rm, *tmp;
 680        unsigned long flags;
 681        LIST_HEAD(list);
 682
 683        spin_lock_irqsave(&cp->cp_lock, flags);
 684
 685        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
 686                if (!rds_send_is_acked(rm, ack, is_acked))
 687                        break;
 688
 689                list_move(&rm->m_conn_item, &list);
 690                clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 691        }
 692
 693        /* order flag updates with spin locks */
 694        if (!list_empty(&list))
 695                smp_mb__after_atomic();
 696
 697        spin_unlock_irqrestore(&cp->cp_lock, flags);
 698
 699        /* now remove the messages from the sock list as needed */
 700        rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
 701}
 702EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
 703
 704void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
 705                         is_acked_func is_acked)
 706{
 707        WARN_ON(conn->c_trans->t_mp_capable);
 708        rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
 709}
 710EXPORT_SYMBOL_GPL(rds_send_drop_acked);
 711
 712void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
 713{
 714        struct rds_message *rm, *tmp;
 715        struct rds_connection *conn;
 716        struct rds_conn_path *cp;
 717        unsigned long flags;
 718        LIST_HEAD(list);
 719
 720        /* get all the messages we're dropping under the rs lock */
 721        spin_lock_irqsave(&rs->rs_lock, flags);
 722
 723        list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
 724                if (dest &&
 725                    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
 726                     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
 727                        continue;
 728
 729                list_move(&rm->m_sock_item, &list);
 730                rds_send_sndbuf_remove(rs, rm);
 731                clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 732        }
 733
 734        /* order flag updates with the rs lock */
 735        smp_mb__after_atomic();
 736
 737        spin_unlock_irqrestore(&rs->rs_lock, flags);
 738
 739        if (list_empty(&list))
 740                return;
 741
 742        /* Remove the messages from the conn */
 743        list_for_each_entry(rm, &list, m_sock_item) {
 744
 745                conn = rm->m_inc.i_conn;
 746                if (conn->c_trans->t_mp_capable)
 747                        cp = rm->m_inc.i_conn_path;
 748                else
 749                        cp = &conn->c_path[0];
 750
 751                spin_lock_irqsave(&cp->cp_lock, flags);
 752                /*
 753                 * Maybe someone else beat us to removing rm from the conn.
 754                 * If we race with their flag update we'll get the lock and
 755                 * then really see that the flag has been cleared.
 756                 */
 757                if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
 758                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 759                        continue;
 760                }
 761                list_del_init(&rm->m_conn_item);
 762                spin_unlock_irqrestore(&cp->cp_lock, flags);
 763
 764                /*
 765                 * Couldn't grab m_rs_lock in top loop (lock ordering),
 766                 * but we can now.
 767                 */
 768                spin_lock_irqsave(&rm->m_rs_lock, flags);
 769
 770                spin_lock(&rs->rs_lock);
 771                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 772                spin_unlock(&rs->rs_lock);
 773
 774                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 775
 776                rds_message_put(rm);
 777        }
 778
 779        rds_wake_sk_sleep(rs);
 780
 781        while (!list_empty(&list)) {
 782                rm = list_entry(list.next, struct rds_message, m_sock_item);
 783                list_del_init(&rm->m_sock_item);
 784                rds_message_wait(rm);
 785
 786                /* just in case the code above skipped this message
  787                 * because RDS_MSG_ON_CONN wasn't set, run it again here.
  788                 * Taking m_rs_lock is the only thing that keeps us
 789                 * from racing with ack processing.
 790                 */
 791                spin_lock_irqsave(&rm->m_rs_lock, flags);
 792
 793                spin_lock(&rs->rs_lock);
 794                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 795                spin_unlock(&rs->rs_lock);
 796
 797                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 798
 799                rds_message_put(rm);
 800        }
 801}
 802
 803/*
  804 * We only want this to fire once, so we use the caller's 'queued'.  It's
 805 * possible that another thread can race with us and remove the
 806 * message from the flow with RDS_CANCEL_SENT_TO.
 807 */
 808static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
 809                             struct rds_conn_path *cp,
 810                             struct rds_message *rm, __be16 sport,
 811                             __be16 dport, int *queued)
 812{
 813        unsigned long flags;
 814        u32 len;
 815
 816        if (*queued)
 817                goto out;
 818
 819        len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 820
 821        /* this is the only place which holds both the socket's rs_lock
 822         * and the connection's c_lock */
 823        spin_lock_irqsave(&rs->rs_lock, flags);
 824
 825        /*
 826         * If there is a little space in sndbuf, we don't queue anything,
 827         * and userspace gets -EAGAIN. But poll() indicates there's send
 828         * room. This can lead to bad behavior (spinning) if snd_bytes isn't
 829         * freed up by incoming acks. So we check the *old* value of
 830         * rs_snd_bytes here to allow the last msg to exceed the buffer,
 831         * and poll() now knows no more data can be sent.
 832         */
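        /* Illustration (made-up numbers): with a 64KB sndbuf and rs_snd_bytes
         * at 60KB, a 16KB message is still queued and rs_snd_bytes grows to
         * 76KB; a later send then finds rs_snd_bytes over the limit, queues
         * nothing, and the caller sees -EAGAIN or blocks until acks shrink it.
         */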
 833        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
 834                rs->rs_snd_bytes += len;
 835
 836                /* let recv side know we are close to send space exhaustion.
 837                 * This is probably not the optimal way to do it, as this
 838                 * means we set the flag on *all* messages as soon as our
 839                 * throughput hits a certain threshold.
 840                 */
 841                if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
 842                        set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 843
 844                list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
 845                set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 846                rds_message_addref(rm);
 847                sock_hold(rds_rs_to_sk(rs));
 848                rm->m_rs = rs;
 849
 850                /* The code ordering is a little weird, but we're
 851                   trying to minimize the time we hold c_lock */
 852                rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
 853                rm->m_inc.i_conn = conn;
 854                rm->m_inc.i_conn_path = cp;
 855                rds_message_addref(rm);
 856
 857                spin_lock(&cp->cp_lock);
 858                rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
 859                list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
 860                set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 861                spin_unlock(&cp->cp_lock);
 862
 863                rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
 864                         rm, len, rs, rs->rs_snd_bytes,
 865                         (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
 866
 867                *queued = 1;
 868        }
 869
 870        spin_unlock_irqrestore(&rs->rs_lock, flags);
 871out:
 872        return *queued;
 873}
 874
 875/*
 876 * rds_message is getting to be quite complicated, and we'd like to allocate
 877 * it all in one go. This figures out how big it needs to be up front.
 878 */
 879static int rds_rm_size(struct msghdr *msg, int num_sgs)
 880{
 881        struct cmsghdr *cmsg;
 882        int size = 0;
 883        int cmsg_groups = 0;
 884        int retval;
 885        bool zcopy_cookie = false;
 886
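        /* cmsg_groups is a two-bit mask: bit 0 marks cmsgs that add work to
         * the message (RDMA args, atomics), bit 1 marks cookie-style cmsgs
         * (RDMA dest/map, zcopy cookie).  Both bits set at once is rejected
         * at the bottom of this function.
         */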
 887        for_each_cmsghdr(cmsg, msg) {
 888                if (!CMSG_OK(msg, cmsg))
 889                        return -EINVAL;
 890
 891                if (cmsg->cmsg_level != SOL_RDS)
 892                        continue;
 893
 894                switch (cmsg->cmsg_type) {
 895                case RDS_CMSG_RDMA_ARGS:
 896                        cmsg_groups |= 1;
 897                        retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
 898                        if (retval < 0)
 899                                return retval;
 900                        size += retval;
 901
 902                        break;
 903
 904                case RDS_CMSG_ZCOPY_COOKIE:
 905                        zcopy_cookie = true;
 906                        /* fall through */
 907
 908                case RDS_CMSG_RDMA_DEST:
 909                case RDS_CMSG_RDMA_MAP:
 910                        cmsg_groups |= 2;
  911                        /* these are valid but do not add any size */
 912                        break;
 913
 914                case RDS_CMSG_ATOMIC_CSWP:
 915                case RDS_CMSG_ATOMIC_FADD:
 916                case RDS_CMSG_MASKED_ATOMIC_CSWP:
 917                case RDS_CMSG_MASKED_ATOMIC_FADD:
 918                        cmsg_groups |= 1;
 919                        size += sizeof(struct scatterlist);
 920                        break;
 921
 922                default:
 923                        return -EINVAL;
 924                }
 925
 926        }
 927
 928        if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
 929                return -EINVAL;
 930
 931        size += num_sgs * sizeof(struct scatterlist);
 932
 933        /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
 934        if (cmsg_groups == 3)
 935                return -EINVAL;
 936
 937        return size;
 938}
 939
 940static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
 941                          struct cmsghdr *cmsg)
 942{
 943        u32 *cookie;
 944
 945        if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
 946            !rm->data.op_mmp_znotifier)
 947                return -EINVAL;
 948        cookie = CMSG_DATA(cmsg);
 949        rm->data.op_mmp_znotifier->z_cookie = *cookie;
 950        return 0;
 951}
 952
 953static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 954                         struct msghdr *msg, int *allocated_mr)
 955{
 956        struct cmsghdr *cmsg;
 957        int ret = 0;
 958
 959        for_each_cmsghdr(cmsg, msg) {
 960                if (!CMSG_OK(msg, cmsg))
 961                        return -EINVAL;
 962
 963                if (cmsg->cmsg_level != SOL_RDS)
 964                        continue;
 965
 966                /* As a side effect, RDMA_DEST and RDMA_MAP will set
 967                 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
 968                 */
 969                switch (cmsg->cmsg_type) {
 970                case RDS_CMSG_RDMA_ARGS:
 971                        ret = rds_cmsg_rdma_args(rs, rm, cmsg);
 972                        break;
 973
 974                case RDS_CMSG_RDMA_DEST:
 975                        ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
 976                        break;
 977
 978                case RDS_CMSG_RDMA_MAP:
 979                        ret = rds_cmsg_rdma_map(rs, rm, cmsg);
 980                        if (!ret)
 981                                *allocated_mr = 1;
 982                        else if (ret == -ENODEV)
 983                                /* Accommodate the get_mr() case which can fail
 984                                 * if connection isn't established yet.
 985                                 */
 986                                ret = -EAGAIN;
 987                        break;
 988                case RDS_CMSG_ATOMIC_CSWP:
 989                case RDS_CMSG_ATOMIC_FADD:
 990                case RDS_CMSG_MASKED_ATOMIC_CSWP:
 991                case RDS_CMSG_MASKED_ATOMIC_FADD:
 992                        ret = rds_cmsg_atomic(rs, rm, cmsg);
 993                        break;
 994
 995                case RDS_CMSG_ZCOPY_COOKIE:
 996                        ret = rds_cmsg_zcopy(rs, rm, cmsg);
 997                        break;
 998
 999                default:
1000                        return -EINVAL;
1001                }
1002
1003                if (ret)
1004                        break;
1005        }
1006
1007        return ret;
1008}
1009
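/* Pick the conn path ("lane") for an MP-capable transport by hashing on the
 * socket (RDS_MPATH_HASH).  Until the handshake ping tells us how many paths
 * the peer supports (c_npaths == 0), only path 0 is known to be safe, so we
 * either wait for the handshake to finish or fall back to path 0.
 */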
1010static int rds_send_mprds_hash(struct rds_sock *rs,
1011                               struct rds_connection *conn, int nonblock)
1012{
1013        int hash;
1014
1015        if (conn->c_npaths == 0)
1016                hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
1017        else
1018                hash = RDS_MPATH_HASH(rs, conn->c_npaths);
1019        if (conn->c_npaths == 0 && hash != 0) {
1020                rds_send_ping(conn, 0);
1021
1022                /* The underlying connection is not up yet.  Need to wait
1023                 * until it is up to be sure that the non-zero c_path can be
1024                 * used.  But if we are interrupted, we have to use the zero
1025                 * c_path in case the connection ends up being non-MP capable.
1026                 */
1027                if (conn->c_npaths == 0) {
 1028                        /* Cannot wait for the connection to be made, so just use
1029                         * the base c_path.
1030                         */
1031                        if (nonblock)
1032                                return 0;
1033                        if (wait_event_interruptible(conn->c_hs_waitq,
1034                                                     conn->c_npaths != 0))
1035                                hash = 0;
1036                }
1037                if (conn->c_npaths == 1)
1038                        hash = 0;
1039        }
1040        return hash;
1041}
1042
1043static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
1044{
1045        struct rds_rdma_args *args;
1046        struct cmsghdr *cmsg;
1047
1048        for_each_cmsghdr(cmsg, msg) {
1049                if (!CMSG_OK(msg, cmsg))
1050                        return -EINVAL;
1051
1052                if (cmsg->cmsg_level != SOL_RDS)
1053                        continue;
1054
1055                if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
1056                        if (cmsg->cmsg_len <
1057                            CMSG_LEN(sizeof(struct rds_rdma_args)))
1058                                return -EINVAL;
1059                        args = CMSG_DATA(cmsg);
1060                        *rdma_bytes += args->remote_vec.bytes;
1061                }
1062        }
1063        return 0;
1064}
1065
1066int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1067{
1068        struct sock *sk = sock->sk;
1069        struct rds_sock *rs = rds_sk_to_rs(sk);
1070        DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
1071        DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
1072        __be16 dport;
1073        struct rds_message *rm = NULL;
1074        struct rds_connection *conn;
1075        int ret = 0;
1076        int queued = 0, allocated_mr = 0;
1077        int nonblock = msg->msg_flags & MSG_DONTWAIT;
1078        long timeo = sock_sndtimeo(sk, nonblock);
1079        struct rds_conn_path *cpath;
1080        struct in6_addr daddr;
1081        __u32 scope_id = 0;
1082        size_t total_payload_len = payload_len, rdma_payload_len = 0;
1083        bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
1084                      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
1085        int num_sgs = ceil(payload_len, PAGE_SIZE);
1086        int namelen;
1087
 1088        /* Mirror how Linux UDP mirrors BSD error message compatibility */
1089        /* XXX: Perhaps MSG_MORE someday */
1090        if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
1091                ret = -EOPNOTSUPP;
1092                goto out;
1093        }
1094
1095        namelen = msg->msg_namelen;
1096        if (namelen != 0) {
1097                if (namelen < sizeof(*usin)) {
1098                        ret = -EINVAL;
1099                        goto out;
1100                }
1101                switch (usin->sin_family) {
1102                case AF_INET:
1103                        if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
1104                            usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
1105                            IN_MULTICAST(ntohl(usin->sin_addr.s_addr))) {
1106                                ret = -EINVAL;
1107                                goto out;
1108                        }
1109                        ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
1110                        dport = usin->sin_port;
1111                        break;
1112
1113#if IS_ENABLED(CONFIG_IPV6)
1114                case AF_INET6: {
1115                        int addr_type;
1116
1117                        if (namelen < sizeof(*sin6)) {
1118                                ret = -EINVAL;
1119                                goto out;
1120                        }
1121                        addr_type = ipv6_addr_type(&sin6->sin6_addr);
1122                        if (!(addr_type & IPV6_ADDR_UNICAST)) {
1123                                __be32 addr4;
1124
1125                                if (!(addr_type & IPV6_ADDR_MAPPED)) {
1126                                        ret = -EINVAL;
1127                                        goto out;
1128                                }
1129
1130                                /* It is a mapped address.  Need to do some
1131                                 * sanity checks.
1132                                 */
1133                                addr4 = sin6->sin6_addr.s6_addr32[3];
1134                                if (addr4 == htonl(INADDR_ANY) ||
1135                                    addr4 == htonl(INADDR_BROADCAST) ||
1136                                    IN_MULTICAST(ntohl(addr4))) {
1137                                        ret = -EINVAL;
1138                                        goto out;
1139                                }
1140                        }
1141                        if (addr_type & IPV6_ADDR_LINKLOCAL) {
1142                                if (sin6->sin6_scope_id == 0) {
1143                                        ret = -EINVAL;
1144                                        goto out;
1145                                }
1146                                scope_id = sin6->sin6_scope_id;
1147                        }
1148
1149                        daddr = sin6->sin6_addr;
1150                        dport = sin6->sin6_port;
1151                        break;
1152                }
1153#endif
1154
1155                default:
1156                        ret = -EINVAL;
1157                        goto out;
1158                }
1159        } else {
1160                /* We only care about consistency with ->connect() */
1161                lock_sock(sk);
1162                daddr = rs->rs_conn_addr;
1163                dport = rs->rs_conn_port;
1164                scope_id = rs->rs_bound_scope_id;
1165                release_sock(sk);
1166        }
1167
1168        lock_sock(sk);
1169        if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
1170                release_sock(sk);
1171                ret = -ENOTCONN;
1172                goto out;
1173        } else if (namelen != 0) {
1174                /* Cannot send to an IPv4 address using an IPv6 source
1175                 * address and cannot send to an IPv6 address using an
1176                 * IPv4 source address.
1177                 */
1178                if (ipv6_addr_v4mapped(&daddr) ^
1179                    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
1180                        release_sock(sk);
1181                        ret = -EOPNOTSUPP;
1182                        goto out;
1183                }
1184                /* If the socket is already bound to a link local address,
1185                 * it can only send to peers on the same link.  But allow
 1186                 * communicating between link-local and non-link-local addresses.
1187                 */
1188                if (scope_id != rs->rs_bound_scope_id) {
1189                        if (!scope_id) {
1190                                scope_id = rs->rs_bound_scope_id;
1191                        } else if (rs->rs_bound_scope_id) {
1192                                release_sock(sk);
1193                                ret = -EINVAL;
1194                                goto out;
1195                        }
1196                }
1197        }
1198        release_sock(sk);
1199
1200        ret = rds_rdma_bytes(msg, &rdma_payload_len);
1201        if (ret)
1202                goto out;
1203
1204        total_payload_len += rdma_payload_len;
1205        if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
1206                ret = -EMSGSIZE;
1207                goto out;
1208        }
1209
1210        if (payload_len > rds_sk_sndbuf(rs)) {
1211                ret = -EMSGSIZE;
1212                goto out;
1213        }
1214
1215        if (zcopy) {
1216                if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
1217                        ret = -EOPNOTSUPP;
1218                        goto out;
1219                }
1220                num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
1221        }
1222        /* size of rm including all sgs */
1223        ret = rds_rm_size(msg, num_sgs);
1224        if (ret < 0)
1225                goto out;
1226
1227        rm = rds_message_alloc(ret, GFP_KERNEL);
1228        if (!rm) {
1229                ret = -ENOMEM;
1230                goto out;
1231        }
1232
1233        /* Attach data to the rm */
1234        if (payload_len) {
1235                rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
1236                if (!rm->data.op_sg) {
1237                        ret = -ENOMEM;
1238                        goto out;
1239                }
1240                ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
1241                if (ret)
1242                        goto out;
1243        }
1244        rm->data.op_active = 1;
1245
1246        rm->m_daddr = daddr;
1247
1248        /* rds_conn_create has a spinlock that runs with IRQ off.
1249         * Caching the conn in the socket helps a lot. */
1250        if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr))
1251                conn = rs->rs_conn;
1252        else {
1253                conn = rds_conn_create_outgoing(sock_net(sock->sk),
1254                                                &rs->rs_bound_addr, &daddr,
1255                                                rs->rs_transport,
1256                                                sock->sk->sk_allocation,
1257                                                scope_id);
1258                if (IS_ERR(conn)) {
1259                        ret = PTR_ERR(conn);
1260                        goto out;
1261                }
1262                rs->rs_conn = conn;
1263        }
1264
1265        if (conn->c_trans->t_mp_capable)
1266                cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
1267        else
1268                cpath = &conn->c_path[0];
1269
1270        rm->m_conn_path = cpath;
1271
1272        /* Parse any control messages the user may have included. */
1273        ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
1274        if (ret) {
 1275                /* Trigger connection so that it's ready for the next retry */
1276                if (ret ==  -EAGAIN)
1277                        rds_conn_connect_if_down(conn);
1278                goto out;
1279        }
1280
1281        if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
1282                printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
1283                               &rm->rdma, conn->c_trans->xmit_rdma);
1284                ret = -EOPNOTSUPP;
1285                goto out;
1286        }
1287
1288        if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
1289                printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
1290                               &rm->atomic, conn->c_trans->xmit_atomic);
1291                ret = -EOPNOTSUPP;
1292                goto out;
1293        }
1294
1295        if (rds_destroy_pending(conn)) {
1296                ret = -EAGAIN;
1297                goto out;
1298        }
1299
1300        rds_conn_path_connect_if_down(cpath);
1301
1302        ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
1303        if (ret) {
1304                rs->rs_seen_congestion = 1;
1305                goto out;
1306        }
1307        while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
1308                                  dport, &queued)) {
1309                rds_stats_inc(s_send_queue_full);
1310
1311                if (nonblock) {
1312                        ret = -EAGAIN;
1313                        goto out;
1314                }
1315
1316                timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
1317                                        rds_send_queue_rm(rs, conn, cpath, rm,
1318                                                          rs->rs_bound_port,
1319                                                          dport,
1320                                                          &queued),
1321                                        timeo);
1322                rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
1323                if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
1324                        continue;
1325
1326                ret = timeo;
1327                if (ret == 0)
1328                        ret = -ETIMEDOUT;
1329                goto out;
1330        }
1331
1332        /*
1333         * By now we've committed to the send.  We reuse rds_send_worker()
1334         * to retry sends in the rds thread if the transport asks us to.
1335         */
1336        rds_stats_inc(s_send_queued);
1337
1338        ret = rds_send_xmit(cpath);
1339        if (ret == -ENOMEM || ret == -EAGAIN) {
1340                ret = 0;
1341                rcu_read_lock();
1342                if (rds_destroy_pending(cpath->cp_conn))
1343                        ret = -ENETUNREACH;
1344                else
1345                        queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
1346                rcu_read_unlock();
1347        }
1348        if (ret)
1349                goto out;
1350        rds_message_put(rm);
1351        return payload_len;
1352
1353out:
 1354        /* If the user included an RDMA_MAP cmsg, we allocated an MR on the fly.
1355         * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
1356         * or in any other way, we need to destroy the MR again */
1357        if (allocated_mr)
1358                rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
1359
1360        if (rm)
1361                rds_message_put(rm);
1362        return ret;
1363}
1364
1365/*
1366 * send out a probe. Can be shared by rds_send_ping,
1367 * rds_send_pong, rds_send_hb.
1368 * rds_send_hb should use h_flags
1369 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
1370 * or
1371 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
1372 */
1373static int
1374rds_send_probe(struct rds_conn_path *cp, __be16 sport,
1375               __be16 dport, u8 h_flags)
1376{
1377        struct rds_message *rm;
1378        unsigned long flags;
1379        int ret = 0;
1380
1381        rm = rds_message_alloc(0, GFP_ATOMIC);
1382        if (!rm) {
1383                ret = -ENOMEM;
1384                goto out;
1385        }
1386
1387        rm->m_daddr = cp->cp_conn->c_faddr;
1388        rm->data.op_active = 1;
1389
1390        rds_conn_path_connect_if_down(cp);
1391
1392        ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
1393        if (ret)
1394                goto out;
1395
1396        spin_lock_irqsave(&cp->cp_lock, flags);
1397        list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
1398        set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
1399        rds_message_addref(rm);
1400        rm->m_inc.i_conn = cp->cp_conn;
1401        rm->m_inc.i_conn_path = cp;
1402
1403        rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
1404                                    cp->cp_next_tx_seq);
1405        rm->m_inc.i_hdr.h_flags |= h_flags;
1406        cp->cp_next_tx_seq++;
1407
1408        if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
1409            cp->cp_conn->c_trans->t_mp_capable) {
1410                u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
1411                u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
1412
1413                rds_message_add_extension(&rm->m_inc.i_hdr,
1414                                          RDS_EXTHDR_NPATHS, &npaths,
1415                                          sizeof(npaths));
1416                rds_message_add_extension(&rm->m_inc.i_hdr,
1417                                          RDS_EXTHDR_GEN_NUM,
1418                                          &my_gen_num,
1419                                          sizeof(u32));
1420        }
1421        spin_unlock_irqrestore(&cp->cp_lock, flags);
1422
1423        rds_stats_inc(s_send_queued);
1424        rds_stats_inc(s_send_pong);
1425
1426        /* schedule the send work on rds_wq */
1427        rcu_read_lock();
1428        if (!rds_destroy_pending(cp->cp_conn))
1429                queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
1430        rcu_read_unlock();
1431
1432        rds_message_put(rm);
1433        return 0;
1434
1435out:
1436        if (rm)
1437                rds_message_put(rm);
1438        return ret;
1439}
1440
1441int
1442rds_send_pong(struct rds_conn_path *cp, __be16 dport)
1443{
1444        return rds_send_probe(cp, 0, dport, 0);
1445}
1446
1447void
1448rds_send_ping(struct rds_connection *conn, int cp_index)
1449{
1450        unsigned long flags;
1451        struct rds_conn_path *cp = &conn->c_path[cp_index];
1452
1453        spin_lock_irqsave(&cp->cp_lock, flags);
1454        if (conn->c_ping_triggered) {
1455                spin_unlock_irqrestore(&cp->cp_lock, flags);
1456                return;
1457        }
1458        conn->c_ping_triggered = 1;
1459        spin_unlock_irqrestore(&cp->cp_lock, flags);
1460        rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
1461}
1462EXPORT_SYMBOL_GPL(rds_send_ping);
1463