linux/net/rds/recv.c
/*
 * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/export.h>
#include <linux/time.h>
#include <linux/rds.h>

#include "rds.h"

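/*
 * Set up a freshly allocated rds_incoming for @conn: take the initial
 * reference, record the peer address the message arrived from and clear
 * the per-message user-copy state (RDMA cookie, receive timestamp).
 */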
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
                  struct in6_addr *saddr)
{
        refcount_set(&inc->i_refcount, 1);
        INIT_LIST_HEAD(&inc->i_item);
        inc->i_conn = conn;
        inc->i_saddr = *saddr;
        inc->i_usercopy.rdma_cookie = 0;
        inc->i_usercopy.rx_tstamp = ktime_set(0, 0);

        memset(inc->i_rx_lat_trace, 0, sizeof(inc->i_rx_lat_trace));
}
EXPORT_SYMBOL_GPL(rds_inc_init);

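/*
 * As rds_inc_init(), but for multipath-capable transports: also remember
 * the specific connection path the message arrived on.
 */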
void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
                       struct in6_addr *saddr)
{
        refcount_set(&inc->i_refcount, 1);
        INIT_LIST_HEAD(&inc->i_item);
        inc->i_conn = cp->cp_conn;
        inc->i_conn_path = cp;
        inc->i_saddr = *saddr;
        inc->i_usercopy.rdma_cookie = 0;
        inc->i_usercopy.rx_tstamp = ktime_set(0, 0);
}
EXPORT_SYMBOL_GPL(rds_inc_path_init);

static void rds_inc_addref(struct rds_incoming *inc)
{
        rdsdebug("addref inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
        refcount_inc(&inc->i_refcount);
}

void rds_inc_put(struct rds_incoming *inc)
{
        rdsdebug("put inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
        if (refcount_dec_and_test(&inc->i_refcount)) {
                BUG_ON(!list_empty(&inc->i_item));

                inc->i_conn->c_trans->inc_free(inc);
        }
}
EXPORT_SYMBOL_GPL(rds_inc_put);

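/*
 * Account @delta bytes moving on or off the socket receive queue and keep
 * the congestion map bit for this socket's bound port in sync: set it once
 * the queued bytes exceed the socket's rcvbuf, and only clear it again when
 * we drop below half of it, so the congested state doesn't flap.
 */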
static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
                                  struct rds_cong_map *map,
                                  int delta, __be16 port)
{
        int now_congested;

        if (delta == 0)
                return;

        rs->rs_rcv_bytes += delta;
        if (delta > 0)
                rds_stats_add(s_recv_bytes_added_to_socket, delta);
        else
                rds_stats_add(s_recv_bytes_removed_from_socket, -delta);

        /* loop transport doesn't send/recv congestion updates */
        if (rs->rs_transport->t_type == RDS_TRANS_LOOP)
                return;

        now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);

        rdsdebug("rs %p (%pI6c:%u) recv bytes %d buf %d "
          "now_cong %d delta %d\n",
          rs, &rs->rs_bound_addr,
          ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
          rds_sk_rcvbuf(rs), now_congested, delta);

        /* wasn't -> am congested */
        if (!rs->rs_congested && now_congested) {
                rs->rs_congested = 1;
                rds_cong_set_bit(map, port);
                rds_cong_queue_updates(map);
        }
        /* was -> aren't congested */
        /* Require more free space before reporting uncongested to prevent
           bouncing cong/uncong state too often */
        else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
                rs->rs_congested = 0;
                rds_cong_clear_bit(map, port);
                rds_cong_queue_updates(map);
        }

        /* do nothing if no change in cong state */
}

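/*
 * A changed peer generation number (only carried by the TCP transport)
 * indicates the peer has restarted.  Reset every path's sequence numbers
 * and mark all messages still on the retransmit queues with RDS_MSG_FLUSH
 * so that stale retransmissions are dropped rather than replayed.
 */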
static void rds_conn_peer_gen_update(struct rds_connection *conn,
                                     u32 peer_gen_num)
{
        int i;
        struct rds_message *rm, *tmp;
        unsigned long flags;

        WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
        if (peer_gen_num != 0) {
                if (conn->c_peer_gen_num != 0 &&
                    peer_gen_num != conn->c_peer_gen_num) {
                        for (i = 0; i < RDS_MPATH_WORKERS; i++) {
                                struct rds_conn_path *cp;

                                cp = &conn->c_path[i];
                                spin_lock_irqsave(&cp->cp_lock, flags);
                                cp->cp_next_tx_seq = 1;
                                cp->cp_next_rx_seq = 0;
                                list_for_each_entry_safe(rm, tmp,
                                                         &cp->cp_retrans,
                                                         m_conn_item) {
                                        set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                                }
                                spin_unlock_irqrestore(&cp->cp_lock, flags);
                        }
                }
                conn->c_peer_gen_num = peer_gen_num;
        }
}

/*
 * Process all extension headers that come with this message.
 */
static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
{
        struct rds_header *hdr = &inc->i_hdr;
        unsigned int pos = 0, type, len;
        union {
                struct rds_ext_header_version version;
                struct rds_ext_header_rdma rdma;
                struct rds_ext_header_rdma_dest rdma_dest;
        } buffer;

        while (1) {
                len = sizeof(buffer);
                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
                if (type == RDS_EXTHDR_NONE)
                        break;
                /* Process extension header here */
                switch (type) {
                case RDS_EXTHDR_RDMA:
                        rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
                        break;

                case RDS_EXTHDR_RDMA_DEST:
                        /* We ignore the size for now. We could stash it
                         * somewhere and use it for error checking. */
                        inc->i_usercopy.rdma_cookie = rds_rdma_make_cookie(
                                        be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
                                        be32_to_cpu(buffer.rdma_dest.h_rdma_offset));

                        break;
                }
        }
}

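/*
 * Process the extension headers carried by a handshake probe: the peer's
 * supported number of paths (RDS_EXTHDR_NPATHS) and its generation number
 * (RDS_EXTHDR_GEN_NUM).
 */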
static void rds_recv_hs_exthdrs(struct rds_header *hdr,
                                struct rds_connection *conn)
{
        unsigned int pos = 0, type, len;
        union {
                struct rds_ext_header_version version;
                u16 rds_npaths;
                u32 rds_gen_num;
        } buffer;
        u32 new_peer_gen_num = 0;

        while (1) {
                len = sizeof(buffer);
                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
                if (type == RDS_EXTHDR_NONE)
                        break;
                /* Process extension header here */
                switch (type) {
                case RDS_EXTHDR_NPATHS:
                        conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
                                               be16_to_cpu(buffer.rds_npaths));
                        break;
                case RDS_EXTHDR_GEN_NUM:
                        new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
                        break;
                default:
                        pr_warn_ratelimited("ignoring unknown exthdr type "
                                             "0x%x\n", type);
                }
        }
        /* if RDS_EXTHDR_NPATHS was not found, default to a single path */
        conn->c_npaths = max_t(int, conn->c_npaths, 1);
        conn->c_ping_triggered = 0;
        rds_conn_peer_gen_update(conn, new_peer_gen_num);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
 * The scheme is based on the following rules:
 *
 * 1. rds_sendmsg on first connect attempt sends the probe ping, with the
 *    sender's npaths (s_npaths)
 * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
 *    sends back a probe-pong with r_npaths. After that, if rcvr is the
 *    smaller ip addr, it starts rds_conn_path_connect_if_down on all
 *    mprds_paths.
 * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
 *    If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
 *    called after reception of the probe-pong on all mprds_paths.
 *    Otherwise (sender of probe-ping is not the smaller ip addr): just call
 *    rds_conn_path_connect_if_down on the hashed path. (see rule 4)
 * 4. rds_connect_worker must only trigger a connection if laddr < faddr.
 * 5. sender may end up queuing the packet on the cp; it will get sent out
 *    later, when the connection is completed.
 */
static void rds_start_mprds(struct rds_connection *conn)
{
        int i;
        struct rds_conn_path *cp;

        if (conn->c_npaths > 1 &&
            rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) < 0) {
                for (i = 0; i < conn->c_npaths; i++) {
                        cp = &conn->c_path[i];
                        rds_conn_path_connect_if_down(cp);
                }
        }
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn.  This lets loopback, who only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
void rds_recv_incoming(struct rds_connection *conn, struct in6_addr *saddr,
                       struct in6_addr *daddr,
                       struct rds_incoming *inc, gfp_t gfp)
{
        struct rds_sock *rs = NULL;
        struct sock *sk;
        unsigned long flags;
        struct rds_conn_path *cp;

        inc->i_conn = conn;
        inc->i_rx_jiffies = jiffies;
        if (conn->c_trans->t_mp_capable)
                cp = inc->i_conn_path;
        else
                cp = &conn->c_path[0];

        rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
                 "flags 0x%x rx_jiffies %lu\n", conn,
                 (unsigned long long)cp->cp_next_rx_seq,
                 inc,
                 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
                 be32_to_cpu(inc->i_hdr.h_len),
                 be16_to_cpu(inc->i_hdr.h_sport),
                 be16_to_cpu(inc->i_hdr.h_dport),
                 inc->i_hdr.h_flags,
                 inc->i_rx_jiffies);

        /*
         * Sequence numbers should only increase.  Messages get their
         * sequence number as they're queued in a sending conn.  They
         * can be dropped, though, if the sending socket is closed before
         * they hit the wire.  So sequence numbers can skip forward
         * under normal operation.  They can also drop back in the conn
         * failover case as previously sent messages are resent down the
         * new instance of a conn.  We drop those, otherwise we have
         * to assume that the next valid seq does not come after a
         * hole in the fragment stream.
         *
         * The headers don't give us a way to realize if fragments of
         * a message have been dropped.  We assume that frags that arrive
         * to a flow are part of the current message on the flow that is
         * being reassembled.  This means that senders can't drop messages
         * from the sending conn until all their frags are sent.
         *
         * XXX we could spend more on the wire to get more robust failure
         * detection, arguably worth it to avoid data corruption.
         */
        if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
            (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
                rds_stats_inc(s_recv_drop_old_seq);
                goto out;
        }
        cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;

        if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
                if (inc->i_hdr.h_sport == 0) {
                        rdsdebug("ignore ping with 0 sport from %pI6c\n",
                                 saddr);
                        goto out;
                }
                rds_stats_inc(s_recv_ping);
                rds_send_pong(cp, inc->i_hdr.h_sport);
                /* if this is a handshake ping, start multipath if necessary */
                if (RDS_HS_PROBE(be16_to_cpu(inc->i_hdr.h_sport),
                                 be16_to_cpu(inc->i_hdr.h_dport))) {
                        rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
                        rds_start_mprds(cp->cp_conn);
                }
                goto out;
        }

        if (be16_to_cpu(inc->i_hdr.h_dport) == RDS_FLAG_PROBE_PORT &&
            inc->i_hdr.h_sport == 0) {
                rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
                /* if this is a handshake pong, start multipath if necessary */
                rds_start_mprds(cp->cp_conn);
                wake_up(&cp->cp_conn->c_hs_waitq);
                goto out;
        }

        rs = rds_find_bound(daddr, inc->i_hdr.h_dport, conn->c_bound_if);
        if (!rs) {
                rds_stats_inc(s_recv_drop_no_sock);
                goto out;
        }

        /* Process extension headers */
        rds_recv_incoming_exthdrs(inc, rs);

        /* We can be racing with rds_release() which marks the socket dead. */
        sk = rds_rs_to_sk(rs);

        /* serialize with rds_release -> sock_orphan */
        write_lock_irqsave(&rs->rs_recv_lock, flags);
        if (!sock_flag(sk, SOCK_DEAD)) {
                rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
                rds_stats_inc(s_recv_queued);
                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                      be32_to_cpu(inc->i_hdr.h_len),
                                      inc->i_hdr.h_dport);
                if (sock_flag(sk, SOCK_RCVTSTAMP))
                        inc->i_usercopy.rx_tstamp = ktime_get_real();
                rds_inc_addref(inc);
                inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock();
                list_add_tail(&inc->i_item, &rs->rs_recv_queue);
                __rds_wake_sk_sleep(sk);
        } else {
                rds_stats_inc(s_recv_drop_dead_sock);
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);

out:
        if (rs)
                rds_sock_put(rs);
}
EXPORT_SYMBOL_GPL(rds_recv_incoming);

/*
 * Be very careful here.  This is used as the condition in wait_event_*()
 * and so needs to cope with being called many times.
 */
static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
        unsigned long flags;

        if (!*inc) {
                read_lock_irqsave(&rs->rs_recv_lock, flags);
                if (!list_empty(&rs->rs_recv_queue)) {
                        *inc = list_entry(rs->rs_recv_queue.next,
                                          struct rds_incoming,
                                          i_item);
                        rds_inc_addref(*inc);
                }
                read_unlock_irqrestore(&rs->rs_recv_lock, flags);
        }

        return *inc != NULL;
}

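/*
 * Check whether @inc is still on the socket's receive queue; a racing
 * receiver may have taken it already.  If @drop is set, dequeue it and
 * give back its rcvbuf accounting.  Returns nonzero if it was still queued.
 */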
static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
                            int drop)
{
        struct sock *sk = rds_rs_to_sk(rs);
        int ret = 0;
        unsigned long flags;

        write_lock_irqsave(&rs->rs_recv_lock, flags);
        if (!list_empty(&inc->i_item)) {
                ret = 1;
                if (drop) {
                        /* XXX make sure this i_conn is reliable */
                        rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                              -be32_to_cpu(inc->i_hdr.h_len),
                                              inc->i_hdr.h_dport);
                        list_del_init(&inc->i_item);
                        rds_inc_put(inc);
                }
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);

        rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
        return ret;
}

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
{
        struct rds_notifier *notifier;
        struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
        unsigned int count = 0, max_messages = ~0U;
        unsigned long flags;
        LIST_HEAD(copy);
        int err = 0;

        /* put_cmsg copies to user space and thus may sleep. We can't do this
         * with rs_lock held, so first grab as many notifications as we can stuff
         * in the user-provided cmsg buffer. We don't try to copy more, to avoid
         * losing notifications - except when the buffer is so small that it
         * wouldn't even hold a single notification. Then we give the caller as
         * much of this single msg as we can squeeze in, and set MSG_CTRUNC.
         */
        if (msghdr) {
                max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
                if (!max_messages)
                        max_messages = 1;
        }

        spin_lock_irqsave(&rs->rs_lock, flags);
        while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
                notifier = list_entry(rs->rs_notify_queue.next,
                                struct rds_notifier, n_list);
                list_move(&notifier->n_list, &copy);
                count++;
        }
        spin_unlock_irqrestore(&rs->rs_lock, flags);

        if (!count)
                return 0;

        while (!list_empty(&copy)) {
                notifier = list_entry(copy.next, struct rds_notifier, n_list);

                if (msghdr) {
                        cmsg.user_token = notifier->n_user_token;
                        cmsg.status = notifier->n_status;

                        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
                                       sizeof(cmsg), &cmsg);
                        if (err)
                                break;
                }

                list_del_init(&notifier->n_list);
                kfree(notifier);
        }

        /* If we bailed out because of an error in put_cmsg,
         * we may be left with one or more notifications that we
         * didn't process. Return them to the head of the list. */
        if (!list_empty(&copy)) {
                spin_lock_irqsave(&rs->rs_lock, flags);
                list_splice(&copy, &rs->rs_notify_queue);
                spin_unlock_irqrestore(&rs->rs_lock, flags);
        }

        return err;
}

/*
 * Queue a congestion notification
 */
static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
{
        uint64_t notify = rs->rs_cong_notify;
        unsigned long flags;
        int err;

        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
                        sizeof(notify), &notify);
        if (err)
                return err;

        spin_lock_irqsave(&rs->rs_lock, flags);
        rs->rs_cong_notify &= ~notify;
        spin_unlock_irqrestore(&rs->rs_lock, flags);

        return 0;
}

/*
 * Receive any control messages.
 */
static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
                         struct rds_sock *rs)
{
        int ret = 0;

        if (inc->i_usercopy.rdma_cookie) {
                ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
                                sizeof(inc->i_usercopy.rdma_cookie),
                                &inc->i_usercopy.rdma_cookie);
                if (ret)
                        goto out;
        }

        if ((inc->i_usercopy.rx_tstamp != 0) &&
            sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
                struct __kernel_old_timeval tv =
                        ns_to_kernel_old_timeval(inc->i_usercopy.rx_tstamp);

                if (!sock_flag(rds_rs_to_sk(rs), SOCK_TSTAMP_NEW)) {
                        ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_OLD,
                                       sizeof(tv), &tv);
                } else {
                        struct __kernel_sock_timeval sk_tv;

                        sk_tv.tv_sec = tv.tv_sec;
                        sk_tv.tv_usec = tv.tv_usec;

                        ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_NEW,
                                       sizeof(sk_tv), &sk_tv);
                }

                if (ret)
                        goto out;
        }

        if (rs->rs_rx_traces) {
                struct rds_cmsg_rx_trace t;
                int i, j;

                memset(&t, 0, sizeof(t));
                inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock();
                t.rx_traces = rs->rs_rx_traces;
                for (i = 0; i < rs->rs_rx_traces; i++) {
                        j = rs->rs_rx_trace[i];
                        t.rx_trace_pos[i] = j;
                        t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] -
                                          inc->i_rx_lat_trace[j];
                }

                ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY,
                               sizeof(t), &t);
                if (ret)
                        goto out;
        }

out:
        return ret;
}

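/*
 * If the socket has zerocopy enabled and there is room in the control
 * buffer, hand the oldest batch of pending zerocopy completion cookies to
 * the caller as an RDS_CMSG_ZCOPY_COMPLETION cmsg.  Returns true if a
 * batch was delivered; on put_cmsg() failure the batch is put back.
 */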
static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
{
        struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
        struct rds_msg_zcopy_info *info = NULL;
        struct rds_zcopy_cookies *done;
        unsigned long flags;

        if (!msg->msg_control)
                return false;

        if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
            msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
                return false;

        spin_lock_irqsave(&q->lock, flags);
        if (!list_empty(&q->zcookie_head)) {
                info = list_entry(q->zcookie_head.next,
                                  struct rds_msg_zcopy_info, rs_zcookie_next);
                list_del(&info->rs_zcookie_next);
        }
        spin_unlock_irqrestore(&q->lock, flags);
        if (!info)
                return false;
        done = &info->zcookies;
        if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
                     done)) {
                spin_lock_irqsave(&q->lock, flags);
                list_add(&info->rs_zcookie_next, &q->zcookie_head);
                spin_unlock_irqrestore(&q->lock, flags);
                return false;
        }
        kfree(info);
        return true;
}

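/*
 * recvmsg() entry point for RDS sockets.  Pending RDMA notifications and
 * congestion updates are reported before any data.  Otherwise the next
 * queued message is copied to userspace, control messages are attached,
 * and the sender's address is filled in if the caller asked for it.
 * Without MSG_DONTWAIT we block until a message or notification arrives.
 */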
int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
                int msg_flags)
{
        struct sock *sk = sock->sk;
        struct rds_sock *rs = rds_sk_to_rs(sk);
        long timeo;
        int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
        DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
        DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
        struct rds_incoming *inc = NULL;

        /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
        timeo = sock_rcvtimeo(sk, nonblock);

        rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);

        if (msg_flags & MSG_OOB)
                goto out;
        if (msg_flags & MSG_ERRQUEUE)
                return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);

        while (1) {
                /* If there are pending notifications, do those - and nothing else */
                if (!list_empty(&rs->rs_notify_queue)) {
                        ret = rds_notify_queue_get(rs, msg);
                        break;
                }

                if (rs->rs_cong_notify) {
                        ret = rds_notify_cong(rs, msg);
                        break;
                }

                if (!rds_next_incoming(rs, &inc)) {
                        if (nonblock) {
                                bool reaped = rds_recvmsg_zcookie(rs, msg);

                                ret = reaped ? 0 : -EAGAIN;
                                break;
                        }

                        timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
                                        (!list_empty(&rs->rs_notify_queue) ||
                                         rs->rs_cong_notify ||
                                         rds_next_incoming(rs, &inc)), timeo);
                        rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
                                 timeo);
                        if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
                                continue;

                        ret = timeo;
                        if (ret == 0)
                                ret = -ETIMEDOUT;
                        break;
                }

                rdsdebug("copying inc %p from %pI6c:%u to user\n", inc,
                         &inc->i_conn->c_faddr,
                         ntohs(inc->i_hdr.h_sport));
                ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
                if (ret < 0)
                        break;

                /*
                 * if the message we just copied isn't at the head of the
                 * recv queue then someone else raced us to return it, try
                 * to get the next message.
                 */
                if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
                        rds_inc_put(inc);
                        inc = NULL;
                        rds_stats_inc(s_recv_deliver_raced);
                        iov_iter_revert(&msg->msg_iter, ret);
                        continue;
                }

                if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
                        if (msg_flags & MSG_TRUNC)
                                ret = be32_to_cpu(inc->i_hdr.h_len);
                        msg->msg_flags |= MSG_TRUNC;
                }

                if (rds_cmsg_recv(inc, msg, rs)) {
                        ret = -EFAULT;
                        goto out;
                }
                rds_recvmsg_zcookie(rs, msg);

                rds_stats_inc(s_recv_delivered);

                if (msg->msg_name) {
                        if (ipv6_addr_v4mapped(&inc->i_saddr)) {
                                sin = (struct sockaddr_in *)msg->msg_name;

                                sin->sin_family = AF_INET;
                                sin->sin_port = inc->i_hdr.h_sport;
                                sin->sin_addr.s_addr =
                                    inc->i_saddr.s6_addr32[3];
                                memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
                                msg->msg_namelen = sizeof(*sin);
                        } else {
                                sin6 = (struct sockaddr_in6 *)msg->msg_name;

                                sin6->sin6_family = AF_INET6;
                                sin6->sin6_port = inc->i_hdr.h_sport;
                                sin6->sin6_addr = inc->i_saddr;
                                sin6->sin6_flowinfo = 0;
                                sin6->sin6_scope_id = rs->rs_bound_scope_id;
                                msg->msg_namelen = sizeof(*sin6);
                        }
                }
                break;
        }

        if (inc)
                rds_inc_put(inc);

out:
        return ret;
}

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void rds_clear_recv_queue(struct rds_sock *rs)
{
        struct sock *sk = rds_rs_to_sk(rs);
        struct rds_incoming *inc, *tmp;
        unsigned long flags;

        write_lock_irqsave(&rs->rs_recv_lock, flags);
        list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                      -be32_to_cpu(inc->i_hdr.h_len),
                                      inc->i_hdr.h_dport);
                list_del_init(&inc->i_item);
                rds_inc_put(inc);
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
void rds_inc_info_copy(struct rds_incoming *inc,
                       struct rds_info_iterator *iter,
                       __be32 saddr, __be32 daddr, int flip)
{
        struct rds_info_message minfo;

        minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
        minfo.len = be32_to_cpu(inc->i_hdr.h_len);
        minfo.tos = inc->i_conn->c_tos;

        if (flip) {
                minfo.laddr = daddr;
                minfo.faddr = saddr;
                minfo.lport = inc->i_hdr.h_dport;
                minfo.fport = inc->i_hdr.h_sport;
        } else {
                minfo.laddr = saddr;
                minfo.faddr = daddr;
                minfo.lport = inc->i_hdr.h_sport;
                minfo.fport = inc->i_hdr.h_dport;
        }

        minfo.flags = 0;

        rds_info_copy(iter, &minfo, sizeof(minfo));
}

#if IS_ENABLED(CONFIG_IPV6)
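/*
 * IPv6 counterpart of rds_inc_info_copy(): report a queued message with
 * full struct in6_addr addresses.
 */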
void rds6_inc_info_copy(struct rds_incoming *inc,
                        struct rds_info_iterator *iter,
                        struct in6_addr *saddr, struct in6_addr *daddr,
                        int flip)
{
        struct rds6_info_message minfo6;

        minfo6.seq = be64_to_cpu(inc->i_hdr.h_sequence);
        minfo6.len = be32_to_cpu(inc->i_hdr.h_len);
        minfo6.tos = inc->i_conn->c_tos;

        if (flip) {
                minfo6.laddr = *daddr;
                minfo6.faddr = *saddr;
                minfo6.lport = inc->i_hdr.h_dport;
                minfo6.fport = inc->i_hdr.h_sport;
        } else {
                minfo6.laddr = *saddr;
                minfo6.faddr = *daddr;
                minfo6.lport = inc->i_hdr.h_sport;
                minfo6.fport = inc->i_hdr.h_dport;
        }

        minfo6.flags = 0;

        rds_info_copy(iter, &minfo6, sizeof(minfo6));
}
#endif