linux/net/rds/send.c
   1/*
   2 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/moduleparam.h>
  35#include <linux/gfp.h>
  36#include <net/sock.h>
  37#include <linux/in.h>
  38#include <linux/list.h>
  39#include <linux/ratelimit.h>
  40#include <linux/export.h>
  41#include <linux/sizes.h>
  42
  43#include "rds.h"
  44
  45/* When transmitting messages in rds_send_xmit, we need to emerge from
   46 * time to time and briefly release the CPU. Otherwise the softlockup watchdog
  47 * will kick our shin.
  48 * Also, it seems fairer to not let one busy connection stall all the
  49 * others.
  50 *
  51 * send_batch_count is the number of times we'll loop in send_xmit. Setting
  52 * it to 0 will restore the old behavior (where we looped until we had
  53 * drained the queue).
  54 */
  55static int send_batch_count = SZ_1K;
  56module_param(send_batch_count, int, 0444);
  57MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
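
/*
 * Usage note (editorial illustration, not part of the upstream file): because
 * send_batch_count is registered with mode 0444 it cannot be changed through
 * sysfs at runtime, only at module load time, e.g. "modprobe rds
 * send_batch_count=2048" or, when RDS is built in, "rds.send_batch_count=2048"
 * on the kernel command line.  The current value should then be readable from
 * /sys/module/rds/parameters/send_batch_count.
 */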
  58
  59static void rds_send_remove_from_sock(struct list_head *messages, int status);
  60
  61/*
  62 * Reset the send state.  Callers must ensure that this doesn't race with
  63 * rds_send_xmit().
  64 */
  65void rds_send_path_reset(struct rds_conn_path *cp)
  66{
  67        struct rds_message *rm, *tmp;
  68        unsigned long flags;
  69
  70        if (cp->cp_xmit_rm) {
  71                rm = cp->cp_xmit_rm;
  72                cp->cp_xmit_rm = NULL;
  73                /* Tell the user the RDMA op is no longer mapped by the
  74                 * transport. This isn't entirely true (it's flushed out
  75                 * independently) but as the connection is down, there's
  76                 * no ongoing RDMA to/from that memory */
  77                rds_message_unmapped(rm);
  78                rds_message_put(rm);
  79        }
  80
  81        cp->cp_xmit_sg = 0;
  82        cp->cp_xmit_hdr_off = 0;
  83        cp->cp_xmit_data_off = 0;
  84        cp->cp_xmit_atomic_sent = 0;
  85        cp->cp_xmit_rdma_sent = 0;
  86        cp->cp_xmit_data_sent = 0;
  87
  88        cp->cp_conn->c_map_queued = 0;
  89
  90        cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
  91        cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
  92
  93        /* Mark messages as retransmissions, and move them to the send q */
  94        spin_lock_irqsave(&cp->cp_lock, flags);
  95        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
  96                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
  97                set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
  98        }
  99        list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
 100        spin_unlock_irqrestore(&cp->cp_lock, flags);
 101}
 102EXPORT_SYMBOL_GPL(rds_send_path_reset);
 103
 104static int acquire_in_xmit(struct rds_conn_path *cp)
 105{
 106        return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 107}
 108
 109static void release_in_xmit(struct rds_conn_path *cp)
 110{
 111        clear_bit(RDS_IN_XMIT, &cp->cp_flags);
 112        smp_mb__after_atomic();
 113        /*
 114         * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
 115         * hot path and finding waiters is very rare.  We don't want to walk
 116         * the system-wide hashed waitqueue buckets in the fast path only to
 117         * almost never find waiters.
 118         */
 119        if (waitqueue_active(&cp->cp_waitq))
 120                wake_up_all(&cp->cp_waitq);
 121}
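
/*
 * Illustrative sketch (editorial addition, not from the upstream file) of the
 * waiter side that the waitqueue_active()/wake_up_all() pairing above serves:
 * code that must not run concurrently with rds_send_xmit(), such as a
 * shutdown path before it calls rds_send_path_reset(), can sleep on cp_waitq
 * until the RDS_IN_XMIT bit-lock is free.
 */
#if 0	/* example only */
static void example_wait_for_xmit_idle(struct rds_conn_path *cp)
{
	/* wait_event() re-checks the condition after every wake_up_all() */
	wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
}
#endif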
 122
 123/*
 124 * We're making the conscious trade-off here to only send one message
 125 * down the connection at a time.
 126 *   Pro:
 127 *      - tx queueing is a simple fifo list
 128 *      - reassembly is optional and easily done by transports per conn
 129 *      - no per flow rx lookup at all, straight to the socket
 130 *      - less per-frag memory and wire overhead
 131 *   Con:
 132 *      - queued acks can be delayed behind large messages
 133 *   Depends:
 134 *      - small message latency is higher behind queued large messages
 135 *      - large message latency isn't starved by intervening small sends
 136 */
 137int rds_send_xmit(struct rds_conn_path *cp)
 138{
 139        struct rds_connection *conn = cp->cp_conn;
 140        struct rds_message *rm;
 141        unsigned long flags;
 142        unsigned int tmp;
 143        struct scatterlist *sg;
 144        int ret = 0;
 145        LIST_HEAD(to_be_dropped);
 146        int batch_count;
 147        unsigned long send_gen = 0;
 148
 149restart:
 150        batch_count = 0;
 151
 152        /*
 153         * sendmsg calls here after having queued its message on the send
 154         * queue.  We only have one task feeding the connection at a time.  If
 155         * another thread is already feeding the queue then we back off.  This
 156         * avoids blocking the caller and trading per-connection data between
 157         * caches per message.
 158         */
 159        if (!acquire_in_xmit(cp)) {
 160                rds_stats_inc(s_send_lock_contention);
 161                ret = -ENOMEM;
 162                goto out;
 163        }
 164
 165        if (rds_destroy_pending(cp->cp_conn)) {
 166                release_in_xmit(cp);
  167                ret = -ENETUNREACH; /* don't requeue send work */
 168                goto out;
 169        }
 170
 171        /*
  172         * We record the send generation after doing the xmit acquire.
  173         * If someone else manages to jump in and do some work, we'll use
  174         * this to avoid a goto restart farther down.
 175         *
 176         * The acquire_in_xmit() check above ensures that only one
 177         * caller can increment c_send_gen at any time.
 178         */
 179        send_gen = READ_ONCE(cp->cp_send_gen) + 1;
 180        WRITE_ONCE(cp->cp_send_gen, send_gen);
 181
 182        /*
 183         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 184         * we do the opposite to avoid races.
 185         */
 186        if (!rds_conn_path_up(cp)) {
 187                release_in_xmit(cp);
 188                ret = 0;
 189                goto out;
 190        }
 191
 192        if (conn->c_trans->xmit_path_prepare)
 193                conn->c_trans->xmit_path_prepare(cp);
 194
 195        /*
 196         * spin trying to push headers and data down the connection until
 197         * the connection doesn't make forward progress.
 198         */
 199        while (1) {
 200
 201                rm = cp->cp_xmit_rm;
 202
 203                /*
 204                 * If between sending messages, we can send a pending congestion
 205                 * map update.
 206                 */
 207                if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
 208                        rm = rds_cong_update_alloc(conn);
 209                        if (IS_ERR(rm)) {
 210                                ret = PTR_ERR(rm);
 211                                break;
 212                        }
 213                        rm->data.op_active = 1;
 214                        rm->m_inc.i_conn_path = cp;
 215                        rm->m_inc.i_conn = cp->cp_conn;
 216
 217                        cp->cp_xmit_rm = rm;
 218                }
 219
 220                /*
 221                 * If not already working on one, grab the next message.
 222                 *
  223                 * cp_xmit_rm holds a ref while we're sending this message down
  224                 * the connection.  We can use this ref while holding the
  225                 * send_sem; rds_send_path_reset() is serialized with it.
 226                 */
 227                if (!rm) {
 228                        unsigned int len;
 229
 230                        batch_count++;
 231
 232                        /* we want to process as big a batch as we can, but
 233                         * we also want to avoid softlockups.  If we've been
  234                         * through a lot of messages, let's back off and see
  235                         * if anyone else jumps in.
 236                         */
 237                        if (batch_count >= send_batch_count)
 238                                goto over_batch;
 239
 240                        spin_lock_irqsave(&cp->cp_lock, flags);
 241
 242                        if (!list_empty(&cp->cp_send_queue)) {
 243                                rm = list_entry(cp->cp_send_queue.next,
 244                                                struct rds_message,
 245                                                m_conn_item);
 246                                rds_message_addref(rm);
 247
 248                                /*
 249                                 * Move the message from the send queue to the retransmit
 250                                 * list right away.
 251                                 */
 252                                list_move_tail(&rm->m_conn_item,
 253                                               &cp->cp_retrans);
 254                        }
 255
 256                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 257
 258                        if (!rm)
 259                                break;
 260
  261                        /* Unfortunately, the way InfiniBand deals with
  262                         * RDMA to a bad MR key is by moving the entire
  263                         * queue pair to the error state. We could possibly
 264                         * recover from that, but right now we drop the
 265                         * connection.
 266                         * Therefore, we never retransmit messages with RDMA ops.
 267                         */
 268                        if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
 269                            (rm->rdma.op_active &&
 270                            test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
 271                                spin_lock_irqsave(&cp->cp_lock, flags);
 272                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 273                                        list_move(&rm->m_conn_item, &to_be_dropped);
 274                                spin_unlock_irqrestore(&cp->cp_lock, flags);
 275                                continue;
 276                        }
 277
 278                        /* Require an ACK every once in a while */
 279                        len = ntohl(rm->m_inc.i_hdr.h_len);
 280                        if (cp->cp_unacked_packets == 0 ||
 281                            cp->cp_unacked_bytes < len) {
 282                                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 283
 284                                cp->cp_unacked_packets =
 285                                        rds_sysctl_max_unacked_packets;
 286                                cp->cp_unacked_bytes =
 287                                        rds_sysctl_max_unacked_bytes;
 288                                rds_stats_inc(s_send_ack_required);
 289                        } else {
 290                                cp->cp_unacked_bytes -= len;
 291                                cp->cp_unacked_packets--;
 292                        }
 293
 294                        cp->cp_xmit_rm = rm;
 295                }
 296
 297                /* The transport either sends the whole rdma or none of it */
 298                if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
 299                        rm->m_final_op = &rm->rdma;
 300                        /* The transport owns the mapped memory for now.
 301                         * You can't unmap it while it's on the send queue
 302                         */
 303                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 304                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 305                        if (ret) {
 306                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 307                                wake_up_interruptible(&rm->m_flush_wait);
 308                                break;
 309                        }
 310                        cp->cp_xmit_rdma_sent = 1;
 311
 312                }
 313
 314                if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
 315                        rm->m_final_op = &rm->atomic;
 316                        /* The transport owns the mapped memory for now.
 317                         * You can't unmap it while it's on the send queue
 318                         */
 319                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 320                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
 321                        if (ret) {
 322                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 323                                wake_up_interruptible(&rm->m_flush_wait);
 324                                break;
 325                        }
 326                        cp->cp_xmit_atomic_sent = 1;
 327
 328                }
 329
 330                /*
 331                 * A number of cases require an RDS header to be sent
 332                 * even if there is no data.
 333                 * We permit 0-byte sends; rds-ping depends on this.
 334                 * However, if there are exclusively attached silent ops,
 335                 * we skip the hdr/data send, to enable silent operation.
 336                 */
 337                if (rm->data.op_nents == 0) {
 338                        int ops_present;
 339                        int all_ops_are_silent = 1;
 340
 341                        ops_present = (rm->atomic.op_active || rm->rdma.op_active);
 342                        if (rm->atomic.op_active && !rm->atomic.op_silent)
 343                                all_ops_are_silent = 0;
 344                        if (rm->rdma.op_active && !rm->rdma.op_silent)
 345                                all_ops_are_silent = 0;
 346
 347                        if (ops_present && all_ops_are_silent
 348                            && !rm->m_rdma_cookie)
 349                                rm->data.op_active = 0;
 350                }
 351
 352                if (rm->data.op_active && !cp->cp_xmit_data_sent) {
 353                        rm->m_final_op = &rm->data;
 354
 355                        ret = conn->c_trans->xmit(conn, rm,
 356                                                  cp->cp_xmit_hdr_off,
 357                                                  cp->cp_xmit_sg,
 358                                                  cp->cp_xmit_data_off);
 359                        if (ret <= 0)
 360                                break;
 361
 362                        if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
 363                                tmp = min_t(int, ret,
 364                                            sizeof(struct rds_header) -
 365                                            cp->cp_xmit_hdr_off);
 366                                cp->cp_xmit_hdr_off += tmp;
 367                                ret -= tmp;
 368                        }
 369
 370                        sg = &rm->data.op_sg[cp->cp_xmit_sg];
 371                        while (ret) {
 372                                tmp = min_t(int, ret, sg->length -
 373                                                      cp->cp_xmit_data_off);
 374                                cp->cp_xmit_data_off += tmp;
 375                                ret -= tmp;
 376                                if (cp->cp_xmit_data_off == sg->length) {
 377                                        cp->cp_xmit_data_off = 0;
 378                                        sg++;
 379                                        cp->cp_xmit_sg++;
 380                                        BUG_ON(ret != 0 && cp->cp_xmit_sg ==
 381                                               rm->data.op_nents);
 382                                }
 383                        }
 384
 385                        if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
 386                            (cp->cp_xmit_sg == rm->data.op_nents))
 387                                cp->cp_xmit_data_sent = 1;
 388                }
 389
 390                /*
  391                 * An rm will only take multiple trips through this loop
 392                 * if there is a data op. Thus, if the data is sent (or there was
 393                 * none), then we're done with the rm.
 394                 */
 395                if (!rm->data.op_active || cp->cp_xmit_data_sent) {
 396                        cp->cp_xmit_rm = NULL;
 397                        cp->cp_xmit_sg = 0;
 398                        cp->cp_xmit_hdr_off = 0;
 399                        cp->cp_xmit_data_off = 0;
 400                        cp->cp_xmit_rdma_sent = 0;
 401                        cp->cp_xmit_atomic_sent = 0;
 402                        cp->cp_xmit_data_sent = 0;
 403
 404                        rds_message_put(rm);
 405                }
 406        }
 407
 408over_batch:
 409        if (conn->c_trans->xmit_path_complete)
 410                conn->c_trans->xmit_path_complete(cp);
 411        release_in_xmit(cp);
 412
 413        /* Nuke any messages we decided not to retransmit. */
 414        if (!list_empty(&to_be_dropped)) {
 415                /* irqs on here, so we can put(), unlike above */
 416                list_for_each_entry(rm, &to_be_dropped, m_conn_item)
 417                        rds_message_put(rm);
 418                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 419        }
 420
 421        /*
 422         * Other senders can queue a message after we last test the send queue
 423         * but before we clear RDS_IN_XMIT.  In that case they'd back off and
 424         * not try and send their newly queued message.  We need to check the
 425         * send queue after having cleared RDS_IN_XMIT so that their message
 426         * doesn't get stuck on the send queue.
 427         *
  428         * If the transport cannot continue (i.e., ret != 0), then it must
 429         * call us when more room is available, such as from the tx
 430         * completion handler.
 431         *
 432         * We have an extra generation check here so that if someone manages
 433         * to jump in after our release_in_xmit, we'll see that they have done
  434         * some work and we will skip our goto.
 435         */
 436        if (ret == 0) {
 437                bool raced;
 438
 439                smp_mb();
 440                raced = send_gen != READ_ONCE(cp->cp_send_gen);
 441
 442                if ((test_bit(0, &conn->c_map_queued) ||
 443                    !list_empty(&cp->cp_send_queue)) && !raced) {
 444                        if (batch_count < send_batch_count)
 445                                goto restart;
 446                        rcu_read_lock();
 447                        if (rds_destroy_pending(cp->cp_conn))
 448                                ret = -ENETUNREACH;
 449                        else
 450                                queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 451                        rcu_read_unlock();
 452                } else if (raced) {
 453                        rds_stats_inc(s_send_lock_queue_raced);
 454                }
 455        }
 456out:
 457        return ret;
 458}
 459EXPORT_SYMBOL_GPL(rds_send_xmit);
 460
 461static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
 462{
 463        u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 464
 465        assert_spin_locked(&rs->rs_lock);
 466
 467        BUG_ON(rs->rs_snd_bytes < len);
 468        rs->rs_snd_bytes -= len;
 469
 470        if (rs->rs_snd_bytes == 0)
 471                rds_stats_inc(s_send_queue_empty);
 472}
 473
 474static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
 475                                    is_acked_func is_acked)
 476{
 477        if (is_acked)
 478                return is_acked(rm, ack);
 479        return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
 480}
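
/*
 * Illustrative sketch (editorial addition, not from the upstream file) of an
 * is_acked_func a transport might plug in here, along the lines of the TCP
 * behaviour described above rds_send_path_drop_acked() further down: only
 * messages that already carry a transport ack sequence are considered, and
 * the comparison tolerates 32-bit sequence wrap-around.
 */
#if 0	/* example only */
static int example_trans_is_acked(struct rds_message *rm, u64 ack)
{
	if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags))
		return 0;
	return (s32)((u32)rm->m_ack_seq - (u32)ack) <= 0;
}
#endif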
 481
 482/*
 483 * This is pretty similar to what happens below in the ACK
 484 * handling code - except that we call here as soon as we get
 485 * the IB send completion on the RDMA op and the accompanying
 486 * message.
 487 */
 488void rds_rdma_send_complete(struct rds_message *rm, int status)
 489{
 490        struct rds_sock *rs = NULL;
 491        struct rm_rdma_op *ro;
 492        struct rds_notifier *notifier;
 493        unsigned long flags;
 494        unsigned int notify = 0;
 495
 496        spin_lock_irqsave(&rm->m_rs_lock, flags);
 497
 498        notify =  rm->rdma.op_notify | rm->data.op_notify;
 499        ro = &rm->rdma;
 500        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
 501            ro->op_active && notify && ro->op_notifier) {
 502                notifier = ro->op_notifier;
 503                rs = rm->m_rs;
 504                sock_hold(rds_rs_to_sk(rs));
 505
 506                notifier->n_status = status;
 507                spin_lock(&rs->rs_lock);
 508                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 509                spin_unlock(&rs->rs_lock);
 510
 511                ro->op_notifier = NULL;
 512        }
 513
 514        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 515
 516        if (rs) {
 517                rds_wake_sk_sleep(rs);
 518                sock_put(rds_rs_to_sk(rs));
 519        }
 520}
 521EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 522
 523/*
 524 * Just like above, except looks at atomic op
 525 */
 526void rds_atomic_send_complete(struct rds_message *rm, int status)
 527{
 528        struct rds_sock *rs = NULL;
 529        struct rm_atomic_op *ao;
 530        struct rds_notifier *notifier;
 531        unsigned long flags;
 532
 533        spin_lock_irqsave(&rm->m_rs_lock, flags);
 534
 535        ao = &rm->atomic;
 536        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
 537            && ao->op_active && ao->op_notify && ao->op_notifier) {
 538                notifier = ao->op_notifier;
 539                rs = rm->m_rs;
 540                sock_hold(rds_rs_to_sk(rs));
 541
 542                notifier->n_status = status;
 543                spin_lock(&rs->rs_lock);
 544                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 545                spin_unlock(&rs->rs_lock);
 546
 547                ao->op_notifier = NULL;
 548        }
 549
 550        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 551
 552        if (rs) {
 553                rds_wake_sk_sleep(rs);
 554                sock_put(rds_rs_to_sk(rs));
 555        }
 556}
 557EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
 558
 559/*
 560 * This is the same as rds_rdma_send_complete except we
 561 * don't do any locking - we have all the ingredients (message,
 562 * socket, socket lock) and can just move the notifier.
 563 */
 564static inline void
 565__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
 566{
 567        struct rm_rdma_op *ro;
 568        struct rm_atomic_op *ao;
 569
 570        ro = &rm->rdma;
 571        if (ro->op_active && ro->op_notify && ro->op_notifier) {
 572                ro->op_notifier->n_status = status;
 573                list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
 574                ro->op_notifier = NULL;
 575        }
 576
 577        ao = &rm->atomic;
 578        if (ao->op_active && ao->op_notify && ao->op_notifier) {
 579                ao->op_notifier->n_status = status;
 580                list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
 581                ao->op_notifier = NULL;
 582        }
 583
 584        /* No need to wake the app - caller does this */
 585}
 586
 587/*
 588 * This removes messages from the socket's list if they're on it.  The list
 589 * argument must be private to the caller, we must be able to modify it
 590 * without locks.  The messages must have a reference held for their
 591 * position on the list.  This function will drop that reference after
 592 * removing the messages from the 'messages' list regardless of if it found
 593 * the messages on the socket list or not.
 594 */
 595static void rds_send_remove_from_sock(struct list_head *messages, int status)
 596{
 597        unsigned long flags;
 598        struct rds_sock *rs = NULL;
 599        struct rds_message *rm;
 600
 601        while (!list_empty(messages)) {
 602                int was_on_sock = 0;
 603
 604                rm = list_entry(messages->next, struct rds_message,
 605                                m_conn_item);
 606                list_del_init(&rm->m_conn_item);
 607
 608                /*
 609                 * If we see this flag cleared then we're *sure* that someone
 610                 * else beat us to removing it from the sock.  If we race
 611                 * with their flag update we'll get the lock and then really
 612                 * see that the flag has been cleared.
 613                 *
 614                 * The message spinlock makes sure nobody clears rm->m_rs
 615                 * while we're messing with it. It does not prevent the
 616                 * message from being removed from the socket, though.
 617                 */
 618                spin_lock_irqsave(&rm->m_rs_lock, flags);
 619                if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
 620                        goto unlock_and_drop;
 621
 622                if (rs != rm->m_rs) {
 623                        if (rs) {
 624                                rds_wake_sk_sleep(rs);
 625                                sock_put(rds_rs_to_sk(rs));
 626                        }
 627                        rs = rm->m_rs;
 628                        if (rs)
 629                                sock_hold(rds_rs_to_sk(rs));
 630                }
 631                if (!rs)
 632                        goto unlock_and_drop;
 633                spin_lock(&rs->rs_lock);
 634
 635                if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
 636                        struct rm_rdma_op *ro = &rm->rdma;
 637                        struct rds_notifier *notifier;
 638
 639                        list_del_init(&rm->m_sock_item);
 640                        rds_send_sndbuf_remove(rs, rm);
 641
 642                        if (ro->op_active && ro->op_notifier &&
 643                               (ro->op_notify || (ro->op_recverr && status))) {
 644                                notifier = ro->op_notifier;
 645                                list_add_tail(&notifier->n_list,
 646                                                &rs->rs_notify_queue);
 647                                if (!notifier->n_status)
 648                                        notifier->n_status = status;
 649                                rm->rdma.op_notifier = NULL;
 650                        }
 651                        was_on_sock = 1;
 652                }
 653                spin_unlock(&rs->rs_lock);
 654
 655unlock_and_drop:
 656                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 657                rds_message_put(rm);
 658                if (was_on_sock)
 659                        rds_message_put(rm);
 660        }
 661
 662        if (rs) {
 663                rds_wake_sk_sleep(rs);
 664                sock_put(rds_rs_to_sk(rs));
 665        }
 666}
 667
 668/*
 669 * Transports call here when they've determined that the receiver queued
 670 * messages up to, and including, the given sequence number.  Messages are
 671 * moved to the retrans queue when rds_send_xmit picks them off the send
 672 * queue. This means that in the TCP case, the message may not have been
 673 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 674 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 675 */
 676void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
 677                              is_acked_func is_acked)
 678{
 679        struct rds_message *rm, *tmp;
 680        unsigned long flags;
 681        LIST_HEAD(list);
 682
 683        spin_lock_irqsave(&cp->cp_lock, flags);
 684
 685        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
 686                if (!rds_send_is_acked(rm, ack, is_acked))
 687                        break;
 688
 689                list_move(&rm->m_conn_item, &list);
 690                clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 691        }
 692
 693        /* order flag updates with spin locks */
 694        if (!list_empty(&list))
 695                smp_mb__after_atomic();
 696
 697        spin_unlock_irqrestore(&cp->cp_lock, flags);
 698
 699        /* now remove the messages from the sock list as needed */
 700        rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
 701}
 702EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
 703
 704void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
 705                         is_acked_func is_acked)
 706{
 707        WARN_ON(conn->c_trans->t_mp_capable);
 708        rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
 709}
 710EXPORT_SYMBOL_GPL(rds_send_drop_acked);
 711
 712void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
 713{
 714        struct rds_message *rm, *tmp;
 715        struct rds_connection *conn;
 716        struct rds_conn_path *cp;
 717        unsigned long flags;
 718        LIST_HEAD(list);
 719
 720        /* get all the messages we're dropping under the rs lock */
 721        spin_lock_irqsave(&rs->rs_lock, flags);
 722
 723        list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
 724                if (dest &&
 725                    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
 726                     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
 727                        continue;
 728
 729                list_move(&rm->m_sock_item, &list);
 730                rds_send_sndbuf_remove(rs, rm);
 731                clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 732        }
 733
 734        /* order flag updates with the rs lock */
 735        smp_mb__after_atomic();
 736
 737        spin_unlock_irqrestore(&rs->rs_lock, flags);
 738
 739        if (list_empty(&list))
 740                return;
 741
 742        /* Remove the messages from the conn */
 743        list_for_each_entry(rm, &list, m_sock_item) {
 744
 745                conn = rm->m_inc.i_conn;
 746                if (conn->c_trans->t_mp_capable)
 747                        cp = rm->m_inc.i_conn_path;
 748                else
 749                        cp = &conn->c_path[0];
 750
 751                spin_lock_irqsave(&cp->cp_lock, flags);
 752                /*
 753                 * Maybe someone else beat us to removing rm from the conn.
 754                 * If we race with their flag update we'll get the lock and
 755                 * then really see that the flag has been cleared.
 756                 */
 757                if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
 758                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 759                        continue;
 760                }
 761                list_del_init(&rm->m_conn_item);
 762                spin_unlock_irqrestore(&cp->cp_lock, flags);
 763
 764                /*
 765                 * Couldn't grab m_rs_lock in top loop (lock ordering),
 766                 * but we can now.
 767                 */
 768                spin_lock_irqsave(&rm->m_rs_lock, flags);
 769
 770                spin_lock(&rs->rs_lock);
 771                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 772                spin_unlock(&rs->rs_lock);
 773
 774                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 775
 776                rds_message_put(rm);
 777        }
 778
 779        rds_wake_sk_sleep(rs);
 780
 781        while (!list_empty(&list)) {
 782                rm = list_entry(list.next, struct rds_message, m_sock_item);
 783                list_del_init(&rm->m_sock_item);
 784                rds_message_wait(rm);
 785
 786                /* just in case the code above skipped this message
  787                 * because RDS_MSG_ON_CONN wasn't set, run it again here.
  788                 * Taking m_rs_lock is the only thing that keeps us
 789                 * from racing with ack processing.
 790                 */
 791                spin_lock_irqsave(&rm->m_rs_lock, flags);
 792
 793                spin_lock(&rs->rs_lock);
 794                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 795                spin_unlock(&rs->rs_lock);
 796
 797                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 798
 799                rds_message_put(rm);
 800        }
 801}
 802
 803/*
  804 * We only want this to fire once, so we use the caller's 'queued'.  It's
 805 * possible that another thread can race with us and remove the
 806 * message from the flow with RDS_CANCEL_SENT_TO.
 807 */
 808static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
 809                             struct rds_conn_path *cp,
 810                             struct rds_message *rm, __be16 sport,
 811                             __be16 dport, int *queued)
 812{
 813        unsigned long flags;
 814        u32 len;
 815
 816        if (*queued)
 817                goto out;
 818
 819        len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 820
 821        /* this is the only place which holds both the socket's rs_lock
 822         * and the connection's c_lock */
 823        spin_lock_irqsave(&rs->rs_lock, flags);
 824
 825        /*
  826         * If we refused to queue when only a little sndbuf space remained,
  827         * userspace would get -EAGAIN while poll() still indicated send
  828         * room. That can lead to bad behavior (spinning) if snd_bytes isn't
  829         * freed up by incoming acks. So we check the *old* value of
  830         * rs_snd_bytes here: the last msg is allowed to exceed the buffer,
  831         * and poll() then knows no more data can be sent.
 832         */
 833        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
 834                rs->rs_snd_bytes += len;
 835
 836                /* let recv side know we are close to send space exhaustion.
 837                 * This is probably not the optimal way to do it, as this
 838                 * means we set the flag on *all* messages as soon as our
 839                 * throughput hits a certain threshold.
 840                 */
 841                if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
 842                        set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 843
 844                list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
 845                set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 846                rds_message_addref(rm);
 847                sock_hold(rds_rs_to_sk(rs));
 848                rm->m_rs = rs;
 849
 850                /* The code ordering is a little weird, but we're
  851                 * trying to minimize the time we hold c_lock */
 852                rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
 853                rm->m_inc.i_conn = conn;
 854                rm->m_inc.i_conn_path = cp;
 855                rds_message_addref(rm);
 856
 857                spin_lock(&cp->cp_lock);
 858                rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
 859                list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
 860                set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 861                spin_unlock(&cp->cp_lock);
 862
 863                rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
 864                         rm, len, rs, rs->rs_snd_bytes,
 865                         (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
 866
 867                *queued = 1;
 868        }
 869
 870        spin_unlock_irqrestore(&rs->rs_lock, flags);
 871out:
 872        return *queued;
 873}
 874
 875/*
 876 * rds_message is getting to be quite complicated, and we'd like to allocate
 877 * it all in one go. This figures out how big it needs to be up front.
 878 */
 879static int rds_rm_size(struct msghdr *msg, int num_sgs,
 880                       struct rds_iov_vector_arr *vct)
 881{
 882        struct cmsghdr *cmsg;
 883        int size = 0;
 884        int cmsg_groups = 0;
 885        int retval;
 886        bool zcopy_cookie = false;
 887        struct rds_iov_vector *iov, *tmp_iov;
 888
 889        if (num_sgs < 0)
 890                return -EINVAL;
 891
 892        for_each_cmsghdr(cmsg, msg) {
 893                if (!CMSG_OK(msg, cmsg))
 894                        return -EINVAL;
 895
 896                if (cmsg->cmsg_level != SOL_RDS)
 897                        continue;
 898
 899                switch (cmsg->cmsg_type) {
 900                case RDS_CMSG_RDMA_ARGS:
 901                        if (vct->indx >= vct->len) {
 902                                vct->len += vct->incr;
 903                                tmp_iov =
 904                                        krealloc(vct->vec,
 905                                                 vct->len *
 906                                                 sizeof(struct rds_iov_vector),
 907                                                 GFP_KERNEL);
 908                                if (!tmp_iov) {
 909                                        vct->len -= vct->incr;
 910                                        return -ENOMEM;
 911                                }
 912                                vct->vec = tmp_iov;
 913                        }
 914                        iov = &vct->vec[vct->indx];
 915                        memset(iov, 0, sizeof(struct rds_iov_vector));
 916                        vct->indx++;
 917                        cmsg_groups |= 1;
 918                        retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
 919                        if (retval < 0)
 920                                return retval;
 921                        size += retval;
 922
 923                        break;
 924
 925                case RDS_CMSG_ZCOPY_COOKIE:
 926                        zcopy_cookie = true;
 927                        /* fall through */
 928
 929                case RDS_CMSG_RDMA_DEST:
 930                case RDS_CMSG_RDMA_MAP:
 931                        cmsg_groups |= 2;
  932                        /* these are valid but do not add any size */
 933                        break;
 934
 935                case RDS_CMSG_ATOMIC_CSWP:
 936                case RDS_CMSG_ATOMIC_FADD:
 937                case RDS_CMSG_MASKED_ATOMIC_CSWP:
 938                case RDS_CMSG_MASKED_ATOMIC_FADD:
 939                        cmsg_groups |= 1;
 940                        size += sizeof(struct scatterlist);
 941                        break;
 942
 943                default:
 944                        return -EINVAL;
 945                }
 946
 947        }
 948
 949        if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
 950                return -EINVAL;
 951
 952        size += num_sgs * sizeof(struct scatterlist);
 953
 954        /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
 955        if (cmsg_groups == 3)
 956                return -EINVAL;
 957
 958        return size;
 959}
 960
 961static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
 962                          struct cmsghdr *cmsg)
 963{
 964        u32 *cookie;
 965
 966        if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
 967            !rm->data.op_mmp_znotifier)
 968                return -EINVAL;
 969        cookie = CMSG_DATA(cmsg);
 970        rm->data.op_mmp_znotifier->z_cookie = *cookie;
 971        return 0;
 972}
 973
 974static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 975                         struct msghdr *msg, int *allocated_mr,
 976                         struct rds_iov_vector_arr *vct)
 977{
 978        struct cmsghdr *cmsg;
 979        int ret = 0, ind = 0;
 980
 981        for_each_cmsghdr(cmsg, msg) {
 982                if (!CMSG_OK(msg, cmsg))
 983                        return -EINVAL;
 984
 985                if (cmsg->cmsg_level != SOL_RDS)
 986                        continue;
 987
 988                /* As a side effect, RDMA_DEST and RDMA_MAP will set
 989                 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
 990                 */
 991                switch (cmsg->cmsg_type) {
 992                case RDS_CMSG_RDMA_ARGS:
 993                        if (ind >= vct->indx)
 994                                return -ENOMEM;
 995                        ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
 996                        ind++;
 997                        break;
 998
 999                case RDS_CMSG_RDMA_DEST:
1000                        ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
1001                        break;
1002
1003                case RDS_CMSG_RDMA_MAP:
1004                        ret = rds_cmsg_rdma_map(rs, rm, cmsg);
1005                        if (!ret)
1006                                *allocated_mr = 1;
1007                        else if (ret == -ENODEV)
1008                                /* Accommodate the get_mr() case which can fail
 1009                                 * if the connection isn't established yet.
1010                                 */
1011                                ret = -EAGAIN;
1012                        break;
1013                case RDS_CMSG_ATOMIC_CSWP:
1014                case RDS_CMSG_ATOMIC_FADD:
1015                case RDS_CMSG_MASKED_ATOMIC_CSWP:
1016                case RDS_CMSG_MASKED_ATOMIC_FADD:
1017                        ret = rds_cmsg_atomic(rs, rm, cmsg);
1018                        break;
1019
1020                case RDS_CMSG_ZCOPY_COOKIE:
1021                        ret = rds_cmsg_zcopy(rs, rm, cmsg);
1022                        break;
1023
1024                default:
1025                        return -EINVAL;
1026                }
1027
1028                if (ret)
1029                        break;
1030        }
1031
1032        return ret;
1033}
1034
1035static int rds_send_mprds_hash(struct rds_sock *rs,
1036                               struct rds_connection *conn, int nonblock)
1037{
1038        int hash;
1039
1040        if (conn->c_npaths == 0)
1041                hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
1042        else
1043                hash = RDS_MPATH_HASH(rs, conn->c_npaths);
1044        if (conn->c_npaths == 0 && hash != 0) {
1045                rds_send_ping(conn, 0);
1046
1047                /* The underlying connection is not up yet.  Need to wait
1048                 * until it is up to be sure that the non-zero c_path can be
1049                 * used.  But if we are interrupted, we have to use the zero
1050                 * c_path in case the connection ends up being non-MP capable.
1051                 */
1052                if (conn->c_npaths == 0) {
 1053                        /* Cannot wait for the connection to be made, so just use
1054                         * the base c_path.
1055                         */
1056                        if (nonblock)
1057                                return 0;
1058                        if (wait_event_interruptible(conn->c_hs_waitq,
1059                                                     conn->c_npaths != 0))
1060                                hash = 0;
1061                }
1062                if (conn->c_npaths == 1)
1063                        hash = 0;
1064        }
1065        return hash;
1066}
1067
1068static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
1069{
1070        struct rds_rdma_args *args;
1071        struct cmsghdr *cmsg;
1072
1073        for_each_cmsghdr(cmsg, msg) {
1074                if (!CMSG_OK(msg, cmsg))
1075                        return -EINVAL;
1076
1077                if (cmsg->cmsg_level != SOL_RDS)
1078                        continue;
1079
1080                if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
1081                        if (cmsg->cmsg_len <
1082                            CMSG_LEN(sizeof(struct rds_rdma_args)))
1083                                return -EINVAL;
1084                        args = CMSG_DATA(cmsg);
1085                        *rdma_bytes += args->remote_vec.bytes;
1086                }
1087        }
1088        return 0;
1089}
1090
1091int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1092{
1093        struct sock *sk = sock->sk;
1094        struct rds_sock *rs = rds_sk_to_rs(sk);
1095        DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
1096        DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
1097        __be16 dport;
1098        struct rds_message *rm = NULL;
1099        struct rds_connection *conn;
1100        int ret = 0;
1101        int queued = 0, allocated_mr = 0;
1102        int nonblock = msg->msg_flags & MSG_DONTWAIT;
1103        long timeo = sock_sndtimeo(sk, nonblock);
1104        struct rds_conn_path *cpath;
1105        struct in6_addr daddr;
1106        __u32 scope_id = 0;
1107        size_t total_payload_len = payload_len, rdma_payload_len = 0;
1108        bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
1109                      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
1110        int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
1111        int namelen;
1112        struct rds_iov_vector_arr vct;
1113        int ind;
1114
1115        memset(&vct, 0, sizeof(vct));
1116
 1117        /* Expect one RDMA cmsg per rds_sendmsg; this can still grow if more are needed. */
1118        vct.incr = 1;
1119
 1120        /* Mirror Linux UDP's mirroring of BSD error message compatibility */
1121        /* XXX: Perhaps MSG_MORE someday */
1122        if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
1123                ret = -EOPNOTSUPP;
1124                goto out;
1125        }
1126
1127        namelen = msg->msg_namelen;
1128        if (namelen != 0) {
1129                if (namelen < sizeof(*usin)) {
1130                        ret = -EINVAL;
1131                        goto out;
1132                }
1133                switch (usin->sin_family) {
1134                case AF_INET:
1135                        if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
1136                            usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
1137                            IN_MULTICAST(ntohl(usin->sin_addr.s_addr))) {
1138                                ret = -EINVAL;
1139                                goto out;
1140                        }
1141                        ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
1142                        dport = usin->sin_port;
1143                        break;
1144
1145#if IS_ENABLED(CONFIG_IPV6)
1146                case AF_INET6: {
1147                        int addr_type;
1148
1149                        if (namelen < sizeof(*sin6)) {
1150                                ret = -EINVAL;
1151                                goto out;
1152                        }
1153                        addr_type = ipv6_addr_type(&sin6->sin6_addr);
1154                        if (!(addr_type & IPV6_ADDR_UNICAST)) {
1155                                __be32 addr4;
1156
1157                                if (!(addr_type & IPV6_ADDR_MAPPED)) {
1158                                        ret = -EINVAL;
1159                                        goto out;
1160                                }
1161
1162                                /* It is a mapped address.  Need to do some
1163                                 * sanity checks.
1164                                 */
1165                                addr4 = sin6->sin6_addr.s6_addr32[3];
1166                                if (addr4 == htonl(INADDR_ANY) ||
1167                                    addr4 == htonl(INADDR_BROADCAST) ||
1168                                    IN_MULTICAST(ntohl(addr4))) {
1169                                        ret = -EINVAL;
1170                                        goto out;
1171                                }
1172                        }
1173                        if (addr_type & IPV6_ADDR_LINKLOCAL) {
1174                                if (sin6->sin6_scope_id == 0) {
1175                                        ret = -EINVAL;
1176                                        goto out;
1177                                }
1178                                scope_id = sin6->sin6_scope_id;
1179                        }
1180
1181                        daddr = sin6->sin6_addr;
1182                        dport = sin6->sin6_port;
1183                        break;
1184                }
1185#endif
1186
1187                default:
1188                        ret = -EINVAL;
1189                        goto out;
1190                }
1191        } else {
1192                /* We only care about consistency with ->connect() */
1193                lock_sock(sk);
1194                daddr = rs->rs_conn_addr;
1195                dport = rs->rs_conn_port;
1196                scope_id = rs->rs_bound_scope_id;
1197                release_sock(sk);
1198        }
1199
1200        lock_sock(sk);
1201        if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
1202                release_sock(sk);
1203                ret = -ENOTCONN;
1204                goto out;
1205        } else if (namelen != 0) {
1206                /* Cannot send to an IPv4 address using an IPv6 source
1207                 * address and cannot send to an IPv6 address using an
1208                 * IPv4 source address.
1209                 */
1210                if (ipv6_addr_v4mapped(&daddr) ^
1211                    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
1212                        release_sock(sk);
1213                        ret = -EOPNOTSUPP;
1214                        goto out;
1215                }
 1216                /* If the socket is already bound to a link-local address,
 1217                 * it can only send to peers on the same link.  But allow
 1218                 * communicating between link-local and non-link-local addresses.
1219                 */
1220                if (scope_id != rs->rs_bound_scope_id) {
1221                        if (!scope_id) {
1222                                scope_id = rs->rs_bound_scope_id;
1223                        } else if (rs->rs_bound_scope_id) {
1224                                release_sock(sk);
1225                                ret = -EINVAL;
1226                                goto out;
1227                        }
1228                }
1229        }
1230        release_sock(sk);
1231
1232        ret = rds_rdma_bytes(msg, &rdma_payload_len);
1233        if (ret)
1234                goto out;
1235
1236        total_payload_len += rdma_payload_len;
1237        if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
1238                ret = -EMSGSIZE;
1239                goto out;
1240        }
1241
1242        if (payload_len > rds_sk_sndbuf(rs)) {
1243                ret = -EMSGSIZE;
1244                goto out;
1245        }
1246
1247        if (zcopy) {
1248                if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
1249                        ret = -EOPNOTSUPP;
1250                        goto out;
1251                }
1252                num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
1253        }
1254        /* size of rm including all sgs */
1255        ret = rds_rm_size(msg, num_sgs, &vct);
1256        if (ret < 0)
1257                goto out;
1258
1259        rm = rds_message_alloc(ret, GFP_KERNEL);
1260        if (!rm) {
1261                ret = -ENOMEM;
1262                goto out;
1263        }
1264
1265        /* Attach data to the rm */
1266        if (payload_len) {
1267                rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs, &ret);
1268                if (!rm->data.op_sg)
1269                        goto out;
1270                ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
1271                if (ret)
1272                        goto out;
1273        }
1274        rm->data.op_active = 1;
1275
1276        rm->m_daddr = daddr;
1277
1278        /* rds_conn_create has a spinlock that runs with IRQ off.
1279         * Caching the conn in the socket helps a lot. */
1280        if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
1281            rs->rs_tos == rs->rs_conn->c_tos) {
1282                conn = rs->rs_conn;
1283        } else {
1284                conn = rds_conn_create_outgoing(sock_net(sock->sk),
1285                                                &rs->rs_bound_addr, &daddr,
1286                                                rs->rs_transport, rs->rs_tos,
1287                                                sock->sk->sk_allocation,
1288                                                scope_id);
1289                if (IS_ERR(conn)) {
1290                        ret = PTR_ERR(conn);
1291                        goto out;
1292                }
1293                rs->rs_conn = conn;
1294        }
1295
1296        if (conn->c_trans->t_mp_capable)
1297                cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
1298        else
1299                cpath = &conn->c_path[0];
1300
1301        rm->m_conn_path = cpath;
1302
1303        /* Parse any control messages the user may have included. */
1304        ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
1305        if (ret) {
 1306                /* Trigger the connection so that it's ready for the next retry */
1307                if (ret ==  -EAGAIN)
1308                        rds_conn_connect_if_down(conn);
1309                goto out;
1310        }
1311
1312        if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
1313                printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
1314                               &rm->rdma, conn->c_trans->xmit_rdma);
1315                ret = -EOPNOTSUPP;
1316                goto out;
1317        }
1318
1319        if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
1320                printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
1321                               &rm->atomic, conn->c_trans->xmit_atomic);
1322                ret = -EOPNOTSUPP;
1323                goto out;
1324        }
1325
1326        if (rds_destroy_pending(conn)) {
1327                ret = -EAGAIN;
1328                goto out;
1329        }
1330
1331        rds_conn_path_connect_if_down(cpath);
1332
1333        ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
1334        if (ret) {
1335                rs->rs_seen_congestion = 1;
1336                goto out;
1337        }
1338        while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
1339                                  dport, &queued)) {
1340                rds_stats_inc(s_send_queue_full);
1341
1342                if (nonblock) {
1343                        ret = -EAGAIN;
1344                        goto out;
1345                }
1346
1347                timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
1348                                        rds_send_queue_rm(rs, conn, cpath, rm,
1349                                                          rs->rs_bound_port,
1350                                                          dport,
1351                                                          &queued),
1352                                        timeo);
1353                rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
1354                if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
1355                        continue;
1356
1357                ret = timeo;
1358                if (ret == 0)
1359                        ret = -ETIMEDOUT;
1360                goto out;
1361        }
1362
1363        /*
1364         * By now we've committed to the send.  We reuse rds_send_worker()
1365         * to retry sends in the rds thread if the transport asks us to.
1366         */
1367        rds_stats_inc(s_send_queued);
1368
1369        ret = rds_send_xmit(cpath);
1370        if (ret == -ENOMEM || ret == -EAGAIN) {
1371                ret = 0;
1372                rcu_read_lock();
1373                if (rds_destroy_pending(cpath->cp_conn))
1374                        ret = -ENETUNREACH;
1375                else
1376                        queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
1377                rcu_read_unlock();
1378        }
1379        if (ret)
1380                goto out;
1381        rds_message_put(rm);
1382
1383        for (ind = 0; ind < vct.indx; ind++)
1384                kfree(vct.vec[ind].iov);
1385        kfree(vct.vec);
1386
1387        return payload_len;
1388
1389out:
1390        for (ind = 0; ind < vct.indx; ind++)
1391                kfree(vct.vec[ind].iov);
1392        kfree(vct.vec);
1393
 1394        /* If the user included an RDMA_MAP cmsg, we allocated an MR on the fly.
 1395         * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
 1396         * or in any other way, we need to destroy the MR again. */
1397        if (allocated_mr)
1398                rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
1399
1400        if (rm)
1401                rds_message_put(rm);
1402        return ret;
1403}
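
/*
 * Illustrative userspace sketch (editorial addition, not part of the upstream
 * file) of how a datagram reaches rds_sendmsg() above: an RDS socket is
 * created with PF_RDS/SOCK_SEQPACKET, bound to a local IPv4 address and port,
 * and sendmsg() is called with the peer's sockaddr in msg_name.  Addresses
 * and ports are placeholders and error handling is omitted; on older systems
 * PF_RDS may need to be defined by hand.
 */
#if 0	/* example only, builds against userspace headers */
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <unistd.h>

static ssize_t example_rds_send(const void *buf, size_t len)
{
	int fd = socket(PF_RDS, SOCK_SEQPACKET, 0);
	struct sockaddr_in laddr = {
		.sin_family = AF_INET,
		.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
		.sin_port = htons(4000),	/* local RDS port */
	};
	struct sockaddr_in daddr = {
		.sin_family = AF_INET,
		.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
		.sin_port = htons(4001),	/* peer RDS port */
	};
	struct iovec iov = { .iov_base = (void *)buf, .iov_len = len };
	struct msghdr msg = {
		.msg_name = &daddr,
		.msg_namelen = sizeof(daddr),
		.msg_iov = &iov,
		.msg_iovlen = 1,
	};
	ssize_t ret;

	bind(fd, (struct sockaddr *)&laddr, sizeof(laddr));
	ret = sendmsg(fd, &msg, 0);
	close(fd);
	return ret;
}
#endif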
1404
1405/*
1406 * send out a probe. Can be shared by rds_send_ping,
1407 * rds_send_pong, rds_send_hb.
1408 * rds_send_hb should use h_flags
1409 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
1410 * or
1411 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
1412 */
1413static int
1414rds_send_probe(struct rds_conn_path *cp, __be16 sport,
1415               __be16 dport, u8 h_flags)
1416{
1417        struct rds_message *rm;
1418        unsigned long flags;
1419        int ret = 0;
1420
1421        rm = rds_message_alloc(0, GFP_ATOMIC);
1422        if (!rm) {
1423                ret = -ENOMEM;
1424                goto out;
1425        }
1426
1427        rm->m_daddr = cp->cp_conn->c_faddr;
1428        rm->data.op_active = 1;
1429
1430        rds_conn_path_connect_if_down(cp);
1431
1432        ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
1433        if (ret)
1434                goto out;
1435
1436        spin_lock_irqsave(&cp->cp_lock, flags);
1437        list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
1438        set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
1439        rds_message_addref(rm);
1440        rm->m_inc.i_conn = cp->cp_conn;
1441        rm->m_inc.i_conn_path = cp;
1442
1443        rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
1444                                    cp->cp_next_tx_seq);
1445        rm->m_inc.i_hdr.h_flags |= h_flags;
1446        cp->cp_next_tx_seq++;
1447
1448        if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
1449            cp->cp_conn->c_trans->t_mp_capable) {
1450                u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
1451                u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
1452
1453                rds_message_add_extension(&rm->m_inc.i_hdr,
1454                                          RDS_EXTHDR_NPATHS, &npaths,
1455                                          sizeof(npaths));
1456                rds_message_add_extension(&rm->m_inc.i_hdr,
1457                                          RDS_EXTHDR_GEN_NUM,
1458                                          &my_gen_num,
1459                                          sizeof(u32));
1460        }
1461        spin_unlock_irqrestore(&cp->cp_lock, flags);
1462
1463        rds_stats_inc(s_send_queued);
1464        rds_stats_inc(s_send_pong);
1465
1466        /* schedule the send work on rds_wq */
1467        rcu_read_lock();
1468        if (!rds_destroy_pending(cp->cp_conn))
1469                queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
1470        rcu_read_unlock();
1471
1472        rds_message_put(rm);
1473        return 0;
1474
1475out:
1476        if (rm)
1477                rds_message_put(rm);
1478        return ret;
1479}
1480
1481int
1482rds_send_pong(struct rds_conn_path *cp, __be16 dport)
1483{
1484        return rds_send_probe(cp, 0, dport, 0);
1485}
1486
1487void
1488rds_send_ping(struct rds_connection *conn, int cp_index)
1489{
1490        unsigned long flags;
1491        struct rds_conn_path *cp = &conn->c_path[cp_index];
1492
1493        spin_lock_irqsave(&cp->cp_lock, flags);
1494        if (conn->c_ping_triggered) {
1495                spin_unlock_irqrestore(&cp->cp_lock, flags);
1496                return;
1497        }
1498        conn->c_ping_triggered = 1;
1499        spin_unlock_irqrestore(&cp->cp_lock, flags);
1500        rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
1501}
1502EXPORT_SYMBOL_GPL(rds_send_ping);
1503