linux/net/rds/recv.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/slab.h>
  35#include <net/sock.h>
  36#include <linux/in.h>
  37#include <linux/export.h>
  38#include <linux/time.h>
  39#include <linux/rds.h>
  40
  41#include "rds.h"
  42
  43void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
  44                  __be32 saddr)
  45{
  46        atomic_set(&inc->i_refcount, 1);
  47        INIT_LIST_HEAD(&inc->i_item);
  48        inc->i_conn = conn;
  49        inc->i_saddr = saddr;
  50        inc->i_rdma_cookie = 0;
  51        inc->i_rx_tstamp.tv_sec = 0;
  52        inc->i_rx_tstamp.tv_usec = 0;
  53}
  54EXPORT_SYMBOL_GPL(rds_inc_init);
  55
  56void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
  57                       __be32 saddr)
  58{
  59        atomic_set(&inc->i_refcount, 1);
  60        INIT_LIST_HEAD(&inc->i_item);
  61        inc->i_conn = cp->cp_conn;
  62        inc->i_conn_path = cp;
  63        inc->i_saddr = saddr;
  64        inc->i_rdma_cookie = 0;
  65        inc->i_rx_tstamp.tv_sec = 0;
  66        inc->i_rx_tstamp.tv_usec = 0;
  67}
  68EXPORT_SYMBOL_GPL(rds_inc_path_init);
  69
  70static void rds_inc_addref(struct rds_incoming *inc)
  71{
  72        rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
  73        atomic_inc(&inc->i_refcount);
  74}
  75
  76void rds_inc_put(struct rds_incoming *inc)
  77{
  78        rdsdebug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
  79        if (atomic_dec_and_test(&inc->i_refcount)) {
  80                BUG_ON(!list_empty(&inc->i_item));
  81
  82                inc->i_conn->c_trans->inc_free(inc);
  83        }
  84}
  85EXPORT_SYMBOL_GPL(rds_inc_put);
  86
  87static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
  88                                  struct rds_cong_map *map,
  89                                  int delta, __be16 port)
  90{
  91        int now_congested;
  92
  93        if (delta == 0)
  94                return;
  95
  96        rs->rs_rcv_bytes += delta;
  97        now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);
  98
  99        rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d "
 100          "now_cong %d delta %d\n",
 101          rs, &rs->rs_bound_addr,
 102          ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
 103          rds_sk_rcvbuf(rs), now_congested, delta);
 104
 105        /* wasn't -> am congested */
 106        if (!rs->rs_congested && now_congested) {
 107                rs->rs_congested = 1;
 108                rds_cong_set_bit(map, port);
 109                rds_cong_queue_updates(map);
 110        }
 111        /* was -> aren't congested */
 112        /* Require more free space before reporting uncongested to prevent
 113           bouncing cong/uncong state too often */
 114        else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
 115                rs->rs_congested = 0;
 116                rds_cong_clear_bit(map, port);
 117                rds_cong_queue_updates(map);
 118        }
 119
 120        /* do nothing if no change in cong state */
 121}
 122
 123static void rds_conn_peer_gen_update(struct rds_connection *conn,
 124                                     u32 peer_gen_num)
 125{
 126        int i;
 127        struct rds_message *rm, *tmp;
 128        unsigned long flags;
 129
 130        WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
 131        if (peer_gen_num != 0) {
 132                if (conn->c_peer_gen_num != 0 &&
 133                    peer_gen_num != conn->c_peer_gen_num) {
 134                        for (i = 0; i < RDS_MPATH_WORKERS; i++) {
 135                                struct rds_conn_path *cp;
 136
 137                                cp = &conn->c_path[i];
 138                                spin_lock_irqsave(&cp->cp_lock, flags);
 139                                cp->cp_next_tx_seq = 1;
 140                                cp->cp_next_rx_seq = 0;
 141                                list_for_each_entry_safe(rm, tmp,
 142                                                         &cp->cp_retrans,
 143                                                         m_conn_item) {
 144                                        set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 145                                }
 146                                spin_unlock_irqrestore(&cp->cp_lock, flags);
 147                        }
 148                }
 149                conn->c_peer_gen_num = peer_gen_num;
 150        }
 151}
 152
 153/*
 154 * Process all extension headers that come with this message.
 155 */
 156static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
 157{
 158        struct rds_header *hdr = &inc->i_hdr;
 159        unsigned int pos = 0, type, len;
 160        union {
 161                struct rds_ext_header_version version;
 162                struct rds_ext_header_rdma rdma;
 163                struct rds_ext_header_rdma_dest rdma_dest;
 164        } buffer;
 165
 166        while (1) {
 167                len = sizeof(buffer);
 168                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
 169                if (type == RDS_EXTHDR_NONE)
 170                        break;
 171                /* Process extension header here */
 172                switch (type) {
 173                case RDS_EXTHDR_RDMA:
 174                        rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
 175                        break;
 176
 177                case RDS_EXTHDR_RDMA_DEST:
 178                        /* We ignore the size for now. We could stash it
 179                         * somewhere and use it for error checking. */
 180                        inc->i_rdma_cookie = rds_rdma_make_cookie(
 181                                        be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
 182                                        be32_to_cpu(buffer.rdma_dest.h_rdma_offset));
 183
 184                        break;
 185                }
 186        }
 187}
 188
 189static void rds_recv_hs_exthdrs(struct rds_header *hdr,
 190                                struct rds_connection *conn)
 191{
 192        unsigned int pos = 0, type, len;
 193        union {
 194                struct rds_ext_header_version version;
 195                u16 rds_npaths;
 196                u32 rds_gen_num;
 197        } buffer;
 198        u32 new_peer_gen_num = 0;
 199
 200        while (1) {
 201                len = sizeof(buffer);
 202                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
 203                if (type == RDS_EXTHDR_NONE)
 204                        break;
 205                /* Process extension header here */
 206                switch (type) {
 207                case RDS_EXTHDR_NPATHS:
 208                        conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
 209                                               buffer.rds_npaths);
 210                        break;
 211                case RDS_EXTHDR_GEN_NUM:
 212                        new_peer_gen_num = buffer.rds_gen_num;
 213                        break;
 214                default:
 215                        pr_warn_ratelimited("ignoring unknown exthdr type "
 216                                             "0x%x\n", type);
 217                }
 218        }
 219        /* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
 220        conn->c_npaths = max_t(int, conn->c_npaths, 1);
 221        rds_conn_peer_gen_update(conn, new_peer_gen_num);
 222}
 223
 224/* rds_start_mprds() will synchronously start multiple paths when appropriate.
 225 * The scheme is based on the following rules:
 226 *
 227 * 1. rds_sendmsg on first connect attempt sends the probe ping, with the
 228 *    sender's npaths (s_npaths)
 229 * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
 230 *    sends back a probe-pong with r_npaths. After that, if rcvr is the
 231 *    smaller ip addr, it starts rds_conn_path_connect_if_down on all
 232 *    mprds_paths.
 233 * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
 234 *    If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
 235 *    called after reception of the probe-pong on all mprds_paths.
 236 *    Otherwise (sender of probe-ping is not the smaller ip addr): just call
 237 *    rds_conn_path_connect_if_down on the hashed path. (see rule 4)
 238 * 4. when cp_index > 0, rds_connect_worker must only trigger
 239 *    a connection if laddr < faddr.
 240 * 5. sender may end up queuing the packet on the cp. will get sent out later.
 241 *    when connection is completed.
 242 */
 243static void rds_start_mprds(struct rds_connection *conn)
 244{
 245        int i;
 246        struct rds_conn_path *cp;
 247
 248        if (conn->c_npaths > 1 && conn->c_laddr < conn->c_faddr) {
 249                for (i = 1; i < conn->c_npaths; i++) {
 250                        cp = &conn->c_path[i];
 251                        rds_conn_path_connect_if_down(cp);
 252                }
 253        }
 254}
 255
 256/*
 257 * The transport must make sure that this is serialized against other
 258 * rx and conn reset on this specific conn.
 259 *
 260 * We currently assert that only one fragmented message will be sent
 261 * down a connection at a time.  This lets us reassemble in the conn
 262 * instead of per-flow which means that we don't have to go digging through
 263 * flows to tear down partial reassembly progress on conn failure and
 264 * we save flow lookup and locking for each frag arrival.  It does mean
 265 * that small messages will wait behind large ones.  Fragmenting at all
 266 * is only to reduce the memory consumption of pre-posted buffers.
 267 *
 268 * The caller passes in saddr and daddr instead of us getting it from the
 269 * conn.  This lets loopback, who only has one conn for both directions,
 270 * tell us which roles the addrs in the conn are playing for this message.
 271 */
 272void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 273                       struct rds_incoming *inc, gfp_t gfp)
 274{
 275        struct rds_sock *rs = NULL;
 276        struct sock *sk;
 277        unsigned long flags;
 278        struct rds_conn_path *cp;
 279
 280        inc->i_conn = conn;
 281        inc->i_rx_jiffies = jiffies;
 282        if (conn->c_trans->t_mp_capable)
 283                cp = inc->i_conn_path;
 284        else
 285                cp = &conn->c_path[0];
 286
 287        rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
 288                 "flags 0x%x rx_jiffies %lu\n", conn,
 289                 (unsigned long long)cp->cp_next_rx_seq,
 290                 inc,
 291                 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
 292                 be32_to_cpu(inc->i_hdr.h_len),
 293                 be16_to_cpu(inc->i_hdr.h_sport),
 294                 be16_to_cpu(inc->i_hdr.h_dport),
 295                 inc->i_hdr.h_flags,
 296                 inc->i_rx_jiffies);
 297
 298        /*
 299         * Sequence numbers should only increase.  Messages get their
 300         * sequence number as they're queued in a sending conn.  They
 301         * can be dropped, though, if the sending socket is closed before
 302         * they hit the wire.  So sequence numbers can skip forward
 303         * under normal operation.  They can also drop back in the conn
 304         * failover case as previously sent messages are resent down the
 305         * new instance of a conn.  We drop those, otherwise we have
 306         * to assume that the next valid seq does not come after a
 307         * hole in the fragment stream.
 308         *
 309         * The headers don't give us a way to realize if fragments of
 310         * a message have been dropped.  We assume that frags that arrive
 311         * to a flow are part of the current message on the flow that is
 312         * being reassembled.  This means that senders can't drop messages
 313         * from the sending conn until all their frags are sent.
 314         *
 315         * XXX we could spend more on the wire to get more robust failure
 316         * detection, arguably worth it to avoid data corruption.
 317         */
 318        if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
 319            (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
 320                rds_stats_inc(s_recv_drop_old_seq);
 321                goto out;
 322        }
 323        cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
 324
 325        if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
 326                if (inc->i_hdr.h_sport == 0) {
 327                        rdsdebug("ignore ping with 0 sport from 0x%x\n", saddr);
 328                        goto out;
 329                }
 330                rds_stats_inc(s_recv_ping);
 331                rds_send_pong(cp, inc->i_hdr.h_sport);
 332                /* if this is a handshake ping, start multipath if necessary */
 333                if (RDS_HS_PROBE(inc->i_hdr.h_sport, inc->i_hdr.h_dport)) {
 334                        rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
 335                        rds_start_mprds(cp->cp_conn);
 336                }
 337                goto out;
 338        }
 339
 340        if (inc->i_hdr.h_dport ==  RDS_FLAG_PROBE_PORT &&
 341            inc->i_hdr.h_sport == 0) {
 342                rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
 343                /* if this is a handshake pong, start multipath if necessary */
 344                rds_start_mprds(cp->cp_conn);
 345                wake_up(&cp->cp_conn->c_hs_waitq);
 346                goto out;
 347        }
 348
 349        rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
 350        if (!rs) {
 351                rds_stats_inc(s_recv_drop_no_sock);
 352                goto out;
 353        }
 354
 355        /* Process extension headers */
 356        rds_recv_incoming_exthdrs(inc, rs);
 357
 358        /* We can be racing with rds_release() which marks the socket dead. */
 359        sk = rds_rs_to_sk(rs);
 360
 361        /* serialize with rds_release -> sock_orphan */
 362        write_lock_irqsave(&rs->rs_recv_lock, flags);
 363        if (!sock_flag(sk, SOCK_DEAD)) {
 364                rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
 365                rds_stats_inc(s_recv_queued);
 366                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 367                                      be32_to_cpu(inc->i_hdr.h_len),
 368                                      inc->i_hdr.h_dport);
 369                if (sock_flag(sk, SOCK_RCVTSTAMP))
 370                        do_gettimeofday(&inc->i_rx_tstamp);
 371                rds_inc_addref(inc);
 372                list_add_tail(&inc->i_item, &rs->rs_recv_queue);
 373                __rds_wake_sk_sleep(sk);
 374        } else {
 375                rds_stats_inc(s_recv_drop_dead_sock);
 376        }
 377        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 378
 379out:
 380        if (rs)
 381                rds_sock_put(rs);
 382}
 383EXPORT_SYMBOL_GPL(rds_recv_incoming);
 384
 385/*
 386 * be very careful here.  This is being called as the condition in
 387 * wait_event_*() needs to cope with being called many times.
 388 */
 389static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
 390{
 391        unsigned long flags;
 392
 393        if (!*inc) {
 394                read_lock_irqsave(&rs->rs_recv_lock, flags);
 395                if (!list_empty(&rs->rs_recv_queue)) {
 396                        *inc = list_entry(rs->rs_recv_queue.next,
 397                                          struct rds_incoming,
 398                                          i_item);
 399                        rds_inc_addref(*inc);
 400                }
 401                read_unlock_irqrestore(&rs->rs_recv_lock, flags);
 402        }
 403
 404        return *inc != NULL;
 405}
 406
 407static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
 408                            int drop)
 409{
 410        struct sock *sk = rds_rs_to_sk(rs);
 411        int ret = 0;
 412        unsigned long flags;
 413
 414        write_lock_irqsave(&rs->rs_recv_lock, flags);
 415        if (!list_empty(&inc->i_item)) {
 416                ret = 1;
 417                if (drop) {
 418                        /* XXX make sure this i_conn is reliable */
 419                        rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 420                                              -be32_to_cpu(inc->i_hdr.h_len),
 421                                              inc->i_hdr.h_dport);
 422                        list_del_init(&inc->i_item);
 423                        rds_inc_put(inc);
 424                }
 425        }
 426        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 427
 428        rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
 429        return ret;
 430}
 431
 432/*
 433 * Pull errors off the error queue.
 434 * If msghdr is NULL, we will just purge the error queue.
 435 */
 436int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
 437{
 438        struct rds_notifier *notifier;
 439        struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
 440        unsigned int count = 0, max_messages = ~0U;
 441        unsigned long flags;
 442        LIST_HEAD(copy);
 443        int err = 0;
 444
 445
 446        /* put_cmsg copies to user space and thus may sleep. We can't do this
 447         * with rs_lock held, so first grab as many notifications as we can stuff
 448         * in the user provided cmsg buffer. We don't try to copy more, to avoid
 449         * losing notifications - except when the buffer is so small that it wouldn't
 450         * even hold a single notification. Then we give him as much of this single
 451         * msg as we can squeeze in, and set MSG_CTRUNC.
 452         */
 453        if (msghdr) {
 454                max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
 455                if (!max_messages)
 456                        max_messages = 1;
 457        }
 458
 459        spin_lock_irqsave(&rs->rs_lock, flags);
 460        while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
 461                notifier = list_entry(rs->rs_notify_queue.next,
 462                                struct rds_notifier, n_list);
 463                list_move(&notifier->n_list, &copy);
 464                count++;
 465        }
 466        spin_unlock_irqrestore(&rs->rs_lock, flags);
 467
 468        if (!count)
 469                return 0;
 470
 471        while (!list_empty(&copy)) {
 472                notifier = list_entry(copy.next, struct rds_notifier, n_list);
 473
 474                if (msghdr) {
 475                        cmsg.user_token = notifier->n_user_token;
 476                        cmsg.status = notifier->n_status;
 477
 478                        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
 479                                       sizeof(cmsg), &cmsg);
 480                        if (err)
 481                                break;
 482                }
 483
 484                list_del_init(&notifier->n_list);
 485                kfree(notifier);
 486        }
 487
 488        /* If we bailed out because of an error in put_cmsg,
 489         * we may be left with one or more notifications that we
 490         * didn't process. Return them to the head of the list. */
 491        if (!list_empty(&copy)) {
 492                spin_lock_irqsave(&rs->rs_lock, flags);
 493                list_splice(&copy, &rs->rs_notify_queue);
 494                spin_unlock_irqrestore(&rs->rs_lock, flags);
 495        }
 496
 497        return err;
 498}
 499
 500/*
 501 * Queue a congestion notification
 502 */
 503static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
 504{
 505        uint64_t notify = rs->rs_cong_notify;
 506        unsigned long flags;
 507        int err;
 508
 509        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
 510                        sizeof(notify), &notify);
 511        if (err)
 512                return err;
 513
 514        spin_lock_irqsave(&rs->rs_lock, flags);
 515        rs->rs_cong_notify &= ~notify;
 516        spin_unlock_irqrestore(&rs->rs_lock, flags);
 517
 518        return 0;
 519}
 520
 521/*
 522 * Receive any control messages.
 523 */
 524static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
 525                         struct rds_sock *rs)
 526{
 527        int ret = 0;
 528
 529        if (inc->i_rdma_cookie) {
 530                ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
 531                                sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
 532                if (ret)
 533                        return ret;
 534        }
 535
 536        if ((inc->i_rx_tstamp.tv_sec != 0) &&
 537            sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
 538                ret = put_cmsg(msg, SOL_SOCKET, SCM_TIMESTAMP,
 539                               sizeof(struct timeval),
 540                               &inc->i_rx_tstamp);
 541                if (ret)
 542                        return ret;
 543        }
 544
 545        return 0;
 546}
 547
 548int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 549                size_t size, int msg_flags)
 550{
 551        struct sock *sk = sock->sk;
 552        struct rds_sock *rs = rds_sk_to_rs(sk);
 553        long timeo;
 554        int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
 555        struct sockaddr_in *sin;
 556        struct rds_incoming *inc = NULL;
 557
 558        /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
 559        timeo = sock_rcvtimeo(sk, nonblock);
 560
 561        rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);
 562
 563        if (msg_flags & MSG_OOB)
 564                goto out;
 565
 566        while (1) {
 567                /* If there are pending notifications, do those - and nothing else */
 568                if (!list_empty(&rs->rs_notify_queue)) {
 569                        ret = rds_notify_queue_get(rs, msg);
 570                        break;
 571                }
 572
 573                if (rs->rs_cong_notify) {
 574                        ret = rds_notify_cong(rs, msg);
 575                        break;
 576                }
 577
 578                if (!rds_next_incoming(rs, &inc)) {
 579                        if (nonblock) {
 580                                ret = -EAGAIN;
 581                                break;
 582                        }
 583
 584                        timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
 585                                        (!list_empty(&rs->rs_notify_queue) ||
 586                                         rs->rs_cong_notify ||
 587                                         rds_next_incoming(rs, &inc)), timeo);
 588                        rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
 589                                 timeo);
 590                        if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
 591                                continue;
 592
 593                        ret = timeo;
 594                        if (ret == 0)
 595                                ret = -ETIMEDOUT;
 596                        break;
 597                }
 598
 599                rdsdebug("copying inc %p from %pI4:%u to user\n", inc,
 600                         &inc->i_conn->c_faddr,
 601                         ntohs(inc->i_hdr.h_sport));
 602                ret = inc->i_conn->c_trans->inc_copy_to_user(inc, msg->msg_iov,
 603                                                             size);
 604                if (ret < 0)
 605                        break;
 606
 607                /*
 608                 * if the message we just copied isn't at the head of the
 609                 * recv queue then someone else raced us to return it, try
 610                 * to get the next message.
 611                 */
 612                if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
 613                        rds_inc_put(inc);
 614                        inc = NULL;
 615                        rds_stats_inc(s_recv_deliver_raced);
 616                        continue;
 617                }
 618
 619                if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
 620                        if (msg_flags & MSG_TRUNC)
 621                                ret = be32_to_cpu(inc->i_hdr.h_len);
 622                        msg->msg_flags |= MSG_TRUNC;
 623                }
 624
 625                if (rds_cmsg_recv(inc, msg, rs)) {
 626                        ret = -EFAULT;
 627                        goto out;
 628                }
 629
 630                rds_stats_inc(s_recv_delivered);
 631
 632                sin = (struct sockaddr_in *)msg->msg_name;
 633                if (sin) {
 634                        sin->sin_family = AF_INET;
 635                        sin->sin_port = inc->i_hdr.h_sport;
 636                        sin->sin_addr.s_addr = inc->i_saddr;
 637                        memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
 638                        msg->msg_namelen = sizeof(*sin);
 639                }
 640                break;
 641        }
 642
 643        if (inc)
 644                rds_inc_put(inc);
 645
 646out:
 647        return ret;
 648}
 649
 650/*
 651 * The socket is being shut down and we're asked to drop messages that were
 652 * queued for recvmsg.  The caller has unbound the socket so the receive path
 653 * won't queue any more incoming fragments or messages on the socket.
 654 */
 655void rds_clear_recv_queue(struct rds_sock *rs)
 656{
 657        struct sock *sk = rds_rs_to_sk(rs);
 658        struct rds_incoming *inc, *tmp;
 659        unsigned long flags;
 660
 661        write_lock_irqsave(&rs->rs_recv_lock, flags);
 662        list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
 663                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 664                                      -be32_to_cpu(inc->i_hdr.h_len),
 665                                      inc->i_hdr.h_dport);
 666                list_del_init(&inc->i_item);
 667                rds_inc_put(inc);
 668        }
 669        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 670}
 671
 672/*
 673 * inc->i_saddr isn't used here because it is only set in the receive
 674 * path.
 675 */
 676void rds_inc_info_copy(struct rds_incoming *inc,
 677                       struct rds_info_iterator *iter,
 678                       __be32 saddr, __be32 daddr, int flip)
 679{
 680        struct rds_info_message minfo;
 681
 682        minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
 683        minfo.len = be32_to_cpu(inc->i_hdr.h_len);
 684
 685        if (flip) {
 686                minfo.laddr = daddr;
 687                minfo.faddr = saddr;
 688                minfo.lport = inc->i_hdr.h_dport;
 689                minfo.fport = inc->i_hdr.h_sport;
 690        } else {
 691                minfo.laddr = saddr;
 692                minfo.faddr = daddr;
 693                minfo.lport = inc->i_hdr.h_sport;
 694                minfo.fport = inc->i_hdr.h_dport;
 695        }
 696
 697        minfo.flags = 0;
 698
 699        rds_info_copy(iter, &minfo, sizeof(minfo));
 700}
 701