linux/net/rds/recv.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/slab.h>
  35#include <net/sock.h>
  36#include <linux/in.h>
  37#include <linux/export.h>
  38
  39#include "rds.h"
  40
  41void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
  42                  __be32 saddr)
  43{
  44        atomic_set(&inc->i_refcount, 1);
  45        INIT_LIST_HEAD(&inc->i_item);
  46        inc->i_conn = conn;
  47        inc->i_saddr = saddr;
  48        inc->i_rdma_cookie = 0;
  49}
  50EXPORT_SYMBOL_GPL(rds_inc_init);
  51
  52static void rds_inc_addref(struct rds_incoming *inc)
  53{
  54        rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
  55        atomic_inc(&inc->i_refcount);
  56}
  57
  58void rds_inc_put(struct rds_incoming *inc)
  59{
  60        rdsdebug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
  61        if (atomic_dec_and_test(&inc->i_refcount)) {
  62                BUG_ON(!list_empty(&inc->i_item));
  63
  64                inc->i_conn->c_trans->inc_free(inc);
  65        }
  66}
  67EXPORT_SYMBOL_GPL(rds_inc_put);
  68
  69static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
  70                                  struct rds_cong_map *map,
  71                                  int delta, __be16 port)
  72{
  73        int now_congested;
  74
  75        if (delta == 0)
  76                return;
  77
  78        rs->rs_rcv_bytes += delta;
  79        now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);
  80
  81        rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d "
  82          "now_cong %d delta %d\n",
  83          rs, &rs->rs_bound_addr,
  84          ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
  85          rds_sk_rcvbuf(rs), now_congested, delta);
  86
  87        /* wasn't -> am congested */
  88        if (!rs->rs_congested && now_congested) {
  89                rs->rs_congested = 1;
  90                rds_cong_set_bit(map, port);
  91                rds_cong_queue_updates(map);
  92        }
  93        /* was -> aren't congested */
  94        /* Require more free space before reporting uncongested to prevent
  95           bouncing cong/uncong state too often */
  96        else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
  97                rs->rs_congested = 0;
  98                rds_cong_clear_bit(map, port);
  99                rds_cong_queue_updates(map);
 100        }
 101
 102        /* do nothing if no change in cong state */
 103}
 104
 105/*
 106 * Process all extension headers that come with this message.
 107 */
 108static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
 109{
 110        struct rds_header *hdr = &inc->i_hdr;
 111        unsigned int pos = 0, type, len;
 112        union {
 113                struct rds_ext_header_version version;
 114                struct rds_ext_header_rdma rdma;
 115                struct rds_ext_header_rdma_dest rdma_dest;
 116        } buffer;
 117
 118        while (1) {
 119                len = sizeof(buffer);
 120                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
 121                if (type == RDS_EXTHDR_NONE)
 122                        break;
 123                /* Process extension header here */
 124                switch (type) {
 125                case RDS_EXTHDR_RDMA:
 126                        rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
 127                        break;
 128
 129                case RDS_EXTHDR_RDMA_DEST:
 130                        /* We ignore the size for now. We could stash it
 131                         * somewhere and use it for error checking. */
 132                        inc->i_rdma_cookie = rds_rdma_make_cookie(
 133                                        be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
 134                                        be32_to_cpu(buffer.rdma_dest.h_rdma_offset));
 135
 136                        break;
 137                }
 138        }
 139}
 140
 141/*
 142 * The transport must make sure that this is serialized against other
 143 * rx and conn reset on this specific conn.
 144 *
 145 * We currently assert that only one fragmented message will be sent
 146 * down a connection at a time.  This lets us reassemble in the conn
 147 * instead of per-flow which means that we don't have to go digging through
 148 * flows to tear down partial reassembly progress on conn failure and
 149 * we save flow lookup and locking for each frag arrival.  It does mean
 150 * that small messages will wait behind large ones.  Fragmenting at all
 151 * is only to reduce the memory consumption of pre-posted buffers.
 152 *
 153 * The caller passes in saddr and daddr instead of us getting it from the
 154 * conn.  This lets loopback, who only has one conn for both directions,
 155 * tell us which roles the addrs in the conn are playing for this message.
 156 */
 157void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 158                       struct rds_incoming *inc, gfp_t gfp)
 159{
 160        struct rds_sock *rs = NULL;
 161        struct sock *sk;
 162        unsigned long flags;
 163
 164        inc->i_conn = conn;
 165        inc->i_rx_jiffies = jiffies;
 166
 167        rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
 168                 "flags 0x%x rx_jiffies %lu\n", conn,
 169                 (unsigned long long)conn->c_next_rx_seq,
 170                 inc,
 171                 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
 172                 be32_to_cpu(inc->i_hdr.h_len),
 173                 be16_to_cpu(inc->i_hdr.h_sport),
 174                 be16_to_cpu(inc->i_hdr.h_dport),
 175                 inc->i_hdr.h_flags,
 176                 inc->i_rx_jiffies);
 177
 178        /*
 179         * Sequence numbers should only increase.  Messages get their
 180         * sequence number as they're queued in a sending conn.  They
 181         * can be dropped, though, if the sending socket is closed before
 182         * they hit the wire.  So sequence numbers can skip forward
 183         * under normal operation.  They can also drop back in the conn
 184         * failover case as previously sent messages are resent down the
 185         * new instance of a conn.  We drop those, otherwise we have
 186         * to assume that the next valid seq does not come after a
 187         * hole in the fragment stream.
 188         *
 189         * The headers don't give us a way to realize if fragments of
 190         * a message have been dropped.  We assume that frags that arrive
 191         * to a flow are part of the current message on the flow that is
 192         * being reassembled.  This means that senders can't drop messages
 193         * from the sending conn until all their frags are sent.
 194         *
 195         * XXX we could spend more on the wire to get more robust failure
 196         * detection, arguably worth it to avoid data corruption.
 197         */
 198        if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
 199            (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
 200                rds_stats_inc(s_recv_drop_old_seq);
 201                goto out;
 202        }
 203        conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
 204
 205        if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
 206                rds_stats_inc(s_recv_ping);
 207                rds_send_pong(conn, inc->i_hdr.h_sport);
 208                goto out;
 209        }
 210
 211        rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
 212        if (!rs) {
 213                rds_stats_inc(s_recv_drop_no_sock);
 214                goto out;
 215        }
 216
 217        /* Process extension headers */
 218        rds_recv_incoming_exthdrs(inc, rs);
 219
 220        /* We can be racing with rds_release() which marks the socket dead. */
 221        sk = rds_rs_to_sk(rs);
 222
 223        /* serialize with rds_release -> sock_orphan */
 224        write_lock_irqsave(&rs->rs_recv_lock, flags);
 225        if (!sock_flag(sk, SOCK_DEAD)) {
 226                rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
 227                rds_stats_inc(s_recv_queued);
 228                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 229                                      be32_to_cpu(inc->i_hdr.h_len),
 230                                      inc->i_hdr.h_dport);
 231                rds_inc_addref(inc);
 232                list_add_tail(&inc->i_item, &rs->rs_recv_queue);
 233                __rds_wake_sk_sleep(sk);
 234        } else {
 235                rds_stats_inc(s_recv_drop_dead_sock);
 236        }
 237        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 238
 239out:
 240        if (rs)
 241                rds_sock_put(rs);
 242}
 243EXPORT_SYMBOL_GPL(rds_recv_incoming);
 244
 245/*
 246 * be very careful here.  This is being called as the condition in
 247 * wait_event_*() needs to cope with being called many times.
 248 */
 249static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
 250{
 251        unsigned long flags;
 252
 253        if (!*inc) {
 254                read_lock_irqsave(&rs->rs_recv_lock, flags);
 255                if (!list_empty(&rs->rs_recv_queue)) {
 256                        *inc = list_entry(rs->rs_recv_queue.next,
 257                                          struct rds_incoming,
 258                                          i_item);
 259                        rds_inc_addref(*inc);
 260                }
 261                read_unlock_irqrestore(&rs->rs_recv_lock, flags);
 262        }
 263
 264        return *inc != NULL;
 265}
 266
 267static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
 268                            int drop)
 269{
 270        struct sock *sk = rds_rs_to_sk(rs);
 271        int ret = 0;
 272        unsigned long flags;
 273
 274        write_lock_irqsave(&rs->rs_recv_lock, flags);
 275        if (!list_empty(&inc->i_item)) {
 276                ret = 1;
 277                if (drop) {
 278                        /* XXX make sure this i_conn is reliable */
 279                        rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 280                                              -be32_to_cpu(inc->i_hdr.h_len),
 281                                              inc->i_hdr.h_dport);
 282                        list_del_init(&inc->i_item);
 283                        rds_inc_put(inc);
 284                }
 285        }
 286        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 287
 288        rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
 289        return ret;
 290}
 291
 292/*
 293 * Pull errors off the error queue.
 294 * If msghdr is NULL, we will just purge the error queue.
 295 */
 296int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
 297{
 298        struct rds_notifier *notifier;
 299        struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
 300        unsigned int count = 0, max_messages = ~0U;
 301        unsigned long flags;
 302        LIST_HEAD(copy);
 303        int err = 0;
 304
 305
 306        /* put_cmsg copies to user space and thus may sleep. We can't do this
 307         * with rs_lock held, so first grab as many notifications as we can stuff
 308         * in the user provided cmsg buffer. We don't try to copy more, to avoid
 309         * losing notifications - except when the buffer is so small that it wouldn't
 310         * even hold a single notification. Then we give him as much of this single
 311         * msg as we can squeeze in, and set MSG_CTRUNC.
 312         */
 313        if (msghdr) {
 314                max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
 315                if (!max_messages)
 316                        max_messages = 1;
 317        }
 318
 319        spin_lock_irqsave(&rs->rs_lock, flags);
 320        while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
 321                notifier = list_entry(rs->rs_notify_queue.next,
 322                                struct rds_notifier, n_list);
 323                list_move(&notifier->n_list, &copy);
 324                count++;
 325        }
 326        spin_unlock_irqrestore(&rs->rs_lock, flags);
 327
 328        if (!count)
 329                return 0;
 330
 331        while (!list_empty(&copy)) {
 332                notifier = list_entry(copy.next, struct rds_notifier, n_list);
 333
 334                if (msghdr) {
 335                        cmsg.user_token = notifier->n_user_token;
 336                        cmsg.status = notifier->n_status;
 337
 338                        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
 339                                       sizeof(cmsg), &cmsg);
 340                        if (err)
 341                                break;
 342                }
 343
 344                list_del_init(&notifier->n_list);
 345                kfree(notifier);
 346        }
 347
 348        /* If we bailed out because of an error in put_cmsg,
 349         * we may be left with one or more notifications that we
 350         * didn't process. Return them to the head of the list. */
 351        if (!list_empty(&copy)) {
 352                spin_lock_irqsave(&rs->rs_lock, flags);
 353                list_splice(&copy, &rs->rs_notify_queue);
 354                spin_unlock_irqrestore(&rs->rs_lock, flags);
 355        }
 356
 357        return err;
 358}
 359
 360/*
 361 * Queue a congestion notification
 362 */
 363static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
 364{
 365        uint64_t notify = rs->rs_cong_notify;
 366        unsigned long flags;
 367        int err;
 368
 369        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
 370                        sizeof(notify), &notify);
 371        if (err)
 372                return err;
 373
 374        spin_lock_irqsave(&rs->rs_lock, flags);
 375        rs->rs_cong_notify &= ~notify;
 376        spin_unlock_irqrestore(&rs->rs_lock, flags);
 377
 378        return 0;
 379}
 380
 381/*
 382 * Receive any control messages.
 383 */
 384static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg)
 385{
 386        int ret = 0;
 387
 388        if (inc->i_rdma_cookie) {
 389                ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
 390                                sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
 391                if (ret)
 392                        return ret;
 393        }
 394
 395        return 0;
 396}
 397
 398int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 399                size_t size, int msg_flags)
 400{
 401        struct sock *sk = sock->sk;
 402        struct rds_sock *rs = rds_sk_to_rs(sk);
 403        long timeo;
 404        int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
 405        DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
 406        struct rds_incoming *inc = NULL;
 407
 408        /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
 409        timeo = sock_rcvtimeo(sk, nonblock);
 410
 411        rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);
 412
 413        if (msg_flags & MSG_OOB)
 414                goto out;
 415
 416        while (1) {
 417                struct iov_iter save;
 418                /* If there are pending notifications, do those - and nothing else */
 419                if (!list_empty(&rs->rs_notify_queue)) {
 420                        ret = rds_notify_queue_get(rs, msg);
 421                        break;
 422                }
 423
 424                if (rs->rs_cong_notify) {
 425                        ret = rds_notify_cong(rs, msg);
 426                        break;
 427                }
 428
 429                if (!rds_next_incoming(rs, &inc)) {
 430                        if (nonblock) {
 431                                ret = -EAGAIN;
 432                                break;
 433                        }
 434
 435                        timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
 436                                        (!list_empty(&rs->rs_notify_queue) ||
 437                                         rs->rs_cong_notify ||
 438                                         rds_next_incoming(rs, &inc)), timeo);
 439                        rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
 440                                 timeo);
 441                        if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
 442                                continue;
 443
 444                        ret = timeo;
 445                        if (ret == 0)
 446                                ret = -ETIMEDOUT;
 447                        break;
 448                }
 449
 450                rdsdebug("copying inc %p from %pI4:%u to user\n", inc,
 451                         &inc->i_conn->c_faddr,
 452                         ntohs(inc->i_hdr.h_sport));
 453                save = msg->msg_iter;
 454                ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
 455                if (ret < 0)
 456                        break;
 457
 458                /*
 459                 * if the message we just copied isn't at the head of the
 460                 * recv queue then someone else raced us to return it, try
 461                 * to get the next message.
 462                 */
 463                if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
 464                        rds_inc_put(inc);
 465                        inc = NULL;
 466                        rds_stats_inc(s_recv_deliver_raced);
 467                        msg->msg_iter = save;
 468                        continue;
 469                }
 470
 471                if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
 472                        if (msg_flags & MSG_TRUNC)
 473                                ret = be32_to_cpu(inc->i_hdr.h_len);
 474                        msg->msg_flags |= MSG_TRUNC;
 475                }
 476
 477                if (rds_cmsg_recv(inc, msg)) {
 478                        ret = -EFAULT;
 479                        goto out;
 480                }
 481
 482                rds_stats_inc(s_recv_delivered);
 483
 484                if (sin) {
 485                        sin->sin_family = AF_INET;
 486                        sin->sin_port = inc->i_hdr.h_sport;
 487                        sin->sin_addr.s_addr = inc->i_saddr;
 488                        memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
 489                        msg->msg_namelen = sizeof(*sin);
 490                }
 491                break;
 492        }
 493
 494        if (inc)
 495                rds_inc_put(inc);
 496
 497out:
 498        return ret;
 499}
 500
 501/*
 502 * The socket is being shut down and we're asked to drop messages that were
 503 * queued for recvmsg.  The caller has unbound the socket so the receive path
 504 * won't queue any more incoming fragments or messages on the socket.
 505 */
 506void rds_clear_recv_queue(struct rds_sock *rs)
 507{
 508        struct sock *sk = rds_rs_to_sk(rs);
 509        struct rds_incoming *inc, *tmp;
 510        unsigned long flags;
 511
 512        write_lock_irqsave(&rs->rs_recv_lock, flags);
 513        list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
 514                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 515                                      -be32_to_cpu(inc->i_hdr.h_len),
 516                                      inc->i_hdr.h_dport);
 517                list_del_init(&inc->i_item);
 518                rds_inc_put(inc);
 519        }
 520        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 521}
 522
 523/*
 524 * inc->i_saddr isn't used here because it is only set in the receive
 525 * path.
 526 */
 527void rds_inc_info_copy(struct rds_incoming *inc,
 528                       struct rds_info_iterator *iter,
 529                       __be32 saddr, __be32 daddr, int flip)
 530{
 531        struct rds_info_message minfo;
 532
 533        minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
 534        minfo.len = be32_to_cpu(inc->i_hdr.h_len);
 535
 536        if (flip) {
 537                minfo.laddr = daddr;
 538                minfo.faddr = saddr;
 539                minfo.lport = inc->i_hdr.h_dport;
 540                minfo.fport = inc->i_hdr.h_sport;
 541        } else {
 542                minfo.laddr = saddr;
 543                minfo.faddr = daddr;
 544                minfo.lport = inc->i_hdr.h_sport;
 545                minfo.fport = inc->i_hdr.h_dport;
 546        }
 547
 548        rds_info_copy(iter, &minfo, sizeof(minfo));
 549}
 550