linux/net/rxrpc/recvmsg.c
// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
        struct rxrpc_sock *rx;
        struct sock *sk;

        _enter("%d", call->debug_id);

        if (!list_empty(&call->recvmsg_link))
                return;

        rcu_read_lock();

        rx = rcu_dereference(call->socket);
        sk = &rx->sk;
        if (rx && sk->sk_state < RXRPC_CLOSE) {
                if (call->notify_rx) {
                        spin_lock_bh(&call->notify_lock);
                        call->notify_rx(sk, call, call->user_call_ID);
                        spin_unlock_bh(&call->notify_lock);
                } else {
                        write_lock_bh(&rx->recvmsg_lock);
                        if (list_empty(&call->recvmsg_link)) {
                                rxrpc_get_call(call, rxrpc_call_got);
                                list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
                        }
                        write_unlock_bh(&rx->recvmsg_lock);

                        if (!sock_flag(sk, SOCK_DEAD)) {
                                _debug("call %ps", sk->sk_data_ready);
                                sk->sk_data_ready(sk);
                        }
                }
        }

        rcu_read_unlock();
        _leave("");
}
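
/*
 * For illustration only (not part of the original file): a kernel service
 * that supplied a notify_rx callback when it set the call up (for example
 * via rxrpc_kernel_begin_call()) is notified through the first branch above
 * rather than via the socket queue.  A minimal sketch of such a callback,
 * where my_call, MY_CALL_HAS_DATA and the waitqueue are hypothetical
 * service-side state; it runs under call->notify_lock with BHs disabled, so
 * it must not sleep:
 *
 *	static void my_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
 *				 unsigned long user_call_ID)
 *	{
 *		struct my_call *c = (struct my_call *)user_call_ID;
 *
 *		// Only flag state and wake a waiter here; the data is
 *		// pulled later from process context with
 *		// rxrpc_kernel_recv_data().
 *		set_bit(MY_CALL_HAS_DATA, &c->flags);
 *		wake_up(&c->waitq);
 *	}
 */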

/*
 * Transition a call to the complete state.
 */
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
                                 enum rxrpc_call_completion compl,
                                 u32 abort_code,
                                 int error)
{
        if (call->state < RXRPC_CALL_COMPLETE) {
                call->abort_code = abort_code;
                call->error = error;
                call->completion = compl;
                call->state = RXRPC_CALL_COMPLETE;
                trace_rxrpc_call_complete(call);
                wake_up(&call->waitq);
                rxrpc_notify_socket(call);
                return true;
        }
        return false;
}

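/*
 * As above, but taking the call's state lock.  The unlocked peek at
 * call->state is only an optimisation; __rxrpc_set_call_completion()
 * re-tests it under state_lock, so a racing completion is still applied
 * exactly once.
 */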
bool rxrpc_set_call_completion(struct rxrpc_call *call,
                               enum rxrpc_call_completion compl,
                               u32 abort_code,
                               int error)
{
        bool ret = false;

        if (call->state < RXRPC_CALL_COMPLETE) {
                write_lock_bh(&call->state_lock);
                ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
                write_unlock_bh(&call->state_lock);
        }
        return ret;
}

/*
 * Record that a call successfully completed.
 */
bool __rxrpc_call_completed(struct rxrpc_call *call)
{
        return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

bool rxrpc_call_completed(struct rxrpc_call *call)
{
        bool ret = false;

        if (call->state < RXRPC_CALL_COMPLETE) {
                write_lock_bh(&call->state_lock);
                ret = __rxrpc_call_completed(call);
                write_unlock_bh(&call->state_lock);
        }
        return ret;
}

/*
 * Record that a call is locally aborted.
 */
bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
                        rxrpc_seq_t seq, u32 abort_code, int error)
{
        trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
                          abort_code, error);
        return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
                                           abort_code, error);
}

bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
                      rxrpc_seq_t seq, u32 abort_code, int error)
{
        bool ret;

        write_lock_bh(&call->state_lock);
        ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
        write_unlock_bh(&call->state_lock);
        return ret;
}

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
        u32 tmp = 0;
        int ret;

        switch (call->completion) {
        case RXRPC_CALL_SUCCEEDED:
                ret = 0;
                if (rxrpc_is_service_call(call))
                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
                break;
        case RXRPC_CALL_REMOTELY_ABORTED:
                tmp = call->abort_code;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
                break;
        case RXRPC_CALL_LOCALLY_ABORTED:
                tmp = call->abort_code;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
                break;
        case RXRPC_CALL_NETWORK_ERROR:
                tmp = -call->error;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
                break;
        case RXRPC_CALL_LOCAL_ERROR:
                tmp = -call->error;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
                break;
        default:
                pr_err("Invalid terminal call state %u\n", call->state);
                BUG();
                break;
        }

        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
                            call->rx_pkt_offset, call->rx_pkt_len, ret);
        return ret;
}
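
/*
 * For illustration only (not part of the original file): the control
 * messages generated above arrive with the final recvmsg() for the call
 * (the one that carries MSG_EOR).  A userspace caller might pick them out
 * roughly like this; only the control-message handling is shown, data
 * buffers are omitted and "control"/"code" are just local variables:
 *
 *	struct msghdr msg;
 *	unsigned char control[128];
 *	struct cmsghdr *cmsg;
 *	unsigned int code;
 *
 *	memset(&msg, 0, sizeof(msg));
 *	msg.msg_control = control;
 *	msg.msg_controllen = sizeof(control);
 *	if (recvmsg(fd, &msg, 0) < 0)
 *		return -1;
 *
 *	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
 *		if (cmsg->cmsg_level != SOL_RXRPC)
 *			continue;
 *		if (cmsg->cmsg_type == RXRPC_ABORT) {
 *			memcpy(&code, CMSG_DATA(cmsg), sizeof(code));
 *			// code is the Rx abort code (remote or local)
 *		} else if (cmsg->cmsg_type == RXRPC_NET_ERROR ||
 *			   cmsg->cmsg_type == RXRPC_LOCAL_ERROR) {
 *			memcpy(&code, CMSG_DATA(cmsg), sizeof(code));
 *			// code is a positive errno value
 *		}
 *	}
 */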

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
        _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

        trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);

        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
                rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
                                  rxrpc_propose_ack_terminal_ack);
                //rxrpc_send_ack_packet(call, false, NULL);
        }

        write_lock_bh(&call->state_lock);

        switch (call->state) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
                __rxrpc_call_completed(call);
                write_unlock_bh(&call->state_lock);
                break;

        case RXRPC_CALL_SERVER_RECV_REQUEST:
                call->tx_phase = true;
                call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
                call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
                write_unlock_bh(&call->state_lock);
                rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
                                  rxrpc_propose_ack_processing_op);
                break;
        default:
                write_unlock_bh(&call->state_lock);
                break;
        }
}

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        rxrpc_serial_t serial;
        rxrpc_seq_t hard_ack, top;
        bool last = false;
        u8 subpacket;
        int ix;

        _enter("%d", call->debug_id);

        hard_ack = call->rx_hard_ack;
        top = smp_load_acquire(&call->rx_top);
        ASSERT(before(hard_ack, top));

        hard_ack++;
        ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
        skb = call->rxtx_buffer[ix];
        rxrpc_see_skb(skb, rxrpc_skb_rotated);
        sp = rxrpc_skb(skb);

        subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
        serial = sp->hdr.serial + subpacket;

        if (subpacket == sp->nr_subpackets - 1 &&
            sp->rx_flags & RXRPC_SKB_INCL_LAST)
                last = true;

        call->rxtx_buffer[ix] = NULL;
        call->rxtx_annotations[ix] = 0;
        /* Barrier against rxrpc_input_data(). */
        smp_store_release(&call->rx_hard_ack, hard_ack);

        rxrpc_free_skb(skb, rxrpc_skb_freed);

        trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
        if (last) {
                rxrpc_end_rx_phase(call, serial);
        } else {
                /* Check to see if there's an ACK that needs sending. */
                if (after_eq(hard_ack, call->ackr_consumed + 2) ||
                    after_eq(top, call->ackr_seen + 2) ||
                    (hard_ack == top && after(hard_ack, call->ackr_consumed)))
                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
                                          true, true,
                                          rxrpc_propose_ack_rotate_rx);
                if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
                        rxrpc_send_ack_packet(call, false, NULL);
        }
}
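
/*
 * For illustration only (not part of the original file): the Rx ring is
 * indexed modulo its size, so with the usual RXRPC_RXTX_BUFF_SIZE of 64
 * slots the slot for a sequence number is just its low bits:
 *
 *	seq 1   -> ix = 1   & 63 = 1
 *	seq 64  -> ix = 64  & 63 = 0
 *	seq 193 -> ix = 193 & 63 = 1	(193 = 3 * 64 + 1)
 *
 * The advertised receive window is no wider than the ring, so a slot that
 * has not yet been consumed here is not reused by rxrpc_input_data(); the
 * store-release on rx_hard_ack above is what publishes the freed slot.
 */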

/*
 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 * padding, but if this is the case, the packet length will be resident in the
 * socket buffer.  Note that we can't modify the master skb info as the skb may
 * be the home to multiple subpackets.
 */
static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
                               u8 annotation,
                               unsigned int offset, unsigned int len)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        rxrpc_seq_t seq = sp->hdr.seq;
        u16 cksum = sp->hdr.cksum;
        u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;

        _enter("");

        /* For all but the head jumbo subpacket, the security checksum is in a
         * jumbo header immediately prior to the data.
         */
        if (subpacket > 0) {
                __be16 tmp;
                if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
                        BUG();
                cksum = ntohs(tmp);
                seq += subpacket;
        }

        return call->security->verify_packet(call, skb, offset, len,
                                             seq, cksum);
}

/*
 * Locate the data within a packet.  This is complicated by:
 *
 * (1) An skb may contain a jumbo packet - so we have to find the appropriate
 *     subpacket.
 *
 * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
 *     contains an extra header which includes the true length of the data,
 *     excluding any encrypted padding.
 */
static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
                             u8 *_annotation,
                             unsigned int *_offset, unsigned int *_len,
                             bool *_last)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        unsigned int offset = sizeof(struct rxrpc_wire_header);
        unsigned int len;
        bool last = false;
        int ret;
        u8 annotation = *_annotation;
        u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;

        /* Locate the subpacket */
        offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
        len = skb->len - offset;
        if (subpacket < sp->nr_subpackets - 1)
                len = RXRPC_JUMBO_DATALEN;
        else if (sp->rx_flags & RXRPC_SKB_INCL_LAST)
                last = true;

        if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
                ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
                if (ret < 0)
                        return ret;
                *_annotation |= RXRPC_RX_ANNO_VERIFIED;
        }

        *_offset = offset;
        *_len = len;
        *_last = last;
        call->security->locate_data(call, skb, _offset, _len);
        return 0;
}
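
/*
 * For illustration only (not part of the original file): a jumbo DATA
 * packet is the wire header followed by fixed-size steps, each non-final
 * subpacket carrying RXRPC_JUMBO_DATALEN bytes of data followed by a small
 * jumbo header describing the next subpacket (with the usual definitions,
 * 1412 data bytes plus a 4-byte jumbo header per step).  So, roughly:
 *
 *	offset(n) = sizeof(struct rxrpc_wire_header) + n * RXRPC_JUMBO_SUBPKTLEN
 *
 * and the checksum that rxrpc_verify_packet() reads at offset - 2 for
 * subpacket n > 0 is the last field of the jumbo header that immediately
 * precedes that subpacket's data.
 */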

/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
                              struct msghdr *msg, struct iov_iter *iter,
                              size_t len, int flags, size_t *_offset)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        rxrpc_serial_t serial;
        rxrpc_seq_t hard_ack, top, seq;
        size_t remain;
        bool rx_pkt_last;
        unsigned int rx_pkt_offset, rx_pkt_len;
        int ix, copy, ret = -EAGAIN, ret2;

        if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
            call->ackr_reason)
                rxrpc_send_ack_packet(call, false, NULL);

        rx_pkt_offset = call->rx_pkt_offset;
        rx_pkt_len = call->rx_pkt_len;
        rx_pkt_last = call->rx_pkt_last;

        if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
                seq = call->rx_hard_ack;
                ret = 1;
                goto done;
        }

        /* Barriers against rxrpc_input_data(). */
        hard_ack = call->rx_hard_ack;
        seq = hard_ack + 1;

        while (top = smp_load_acquire(&call->rx_top),
               before_eq(seq, top)
               ) {
                ix = seq & RXRPC_RXTX_BUFF_MASK;
                skb = call->rxtx_buffer[ix];
                if (!skb) {
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
                                            rx_pkt_offset, rx_pkt_len, 0);
                        break;
                }
                smp_rmb();
                rxrpc_see_skb(skb, rxrpc_skb_seen);
                sp = rxrpc_skb(skb);

                if (!(flags & MSG_PEEK)) {
                        serial = sp->hdr.serial;
                        serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
                        trace_rxrpc_receive(call, rxrpc_receive_front,
                                            serial, seq);
                }

                if (msg)
                        sock_recv_timestamp(msg, sock->sk, skb);

                if (rx_pkt_offset == 0) {
                        ret2 = rxrpc_locate_data(call, skb,
                                                 &call->rxtx_annotations[ix],
                                                 &rx_pkt_offset, &rx_pkt_len,
                                                 &rx_pkt_last);
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
                                            rx_pkt_offset, rx_pkt_len, ret2);
                        if (ret2 < 0) {
                                ret = ret2;
                                goto out;
                        }
                } else {
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
                                            rx_pkt_offset, rx_pkt_len, 0);
                }

                /* We have to handle short, empty and used-up DATA packets. */
                remain = len - *_offset;
                copy = rx_pkt_len;
                if (copy > remain)
                        copy = remain;
                if (copy > 0) {
                        ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
                                                      copy);
                        if (ret2 < 0) {
                                ret = ret2;
                                goto out;
                        }

                        /* handle piecemeal consumption of data packets */
                        rx_pkt_offset += copy;
                        rx_pkt_len -= copy;
                        *_offset += copy;
                }

                if (rx_pkt_len > 0) {
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
                                            rx_pkt_offset, rx_pkt_len, 0);
                        ASSERTCMP(*_offset, ==, len);
                        ret = 0;
                        break;
                }

                /* The whole packet has been transferred. */
                if (!(flags & MSG_PEEK))
                        rxrpc_rotate_rx_window(call);
                rx_pkt_offset = 0;
                rx_pkt_len = 0;

                if (rx_pkt_last) {
                        ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
                        ret = 1;
                        goto out;
                }

                seq++;
        }

out:
        if (!(flags & MSG_PEEK)) {
                call->rx_pkt_offset = rx_pkt_offset;
                call->rx_pkt_len = rx_pkt_len;
                call->rx_pkt_last = rx_pkt_last;
        }
done:
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
                            rx_pkt_offset, rx_pkt_len, ret);
        if (ret == -EAGAIN)
                set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
        return ret;
}

/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
                  int flags)
{
        struct rxrpc_call *call;
        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
        struct list_head *l;
        size_t copied = 0;
        long timeo;
        int ret;

        DEFINE_WAIT(wait);

        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);

        if (flags & (MSG_OOB | MSG_TRUNC))
                return -EOPNOTSUPP;

        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
        lock_sock(&rx->sk);

        /* Return immediately if a client socket has no outstanding calls */
        if (RB_EMPTY_ROOT(&rx->calls) &&
            list_empty(&rx->recvmsg_q) &&
            rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
                release_sock(&rx->sk);
                return -EAGAIN;
        }

        if (list_empty(&rx->recvmsg_q)) {
                ret = -EWOULDBLOCK;
                if (timeo == 0) {
                        call = NULL;
                        goto error_no_call;
                }

                release_sock(&rx->sk);

                /* Wait for something to happen */
                prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
                                          TASK_INTERRUPTIBLE);
                ret = sock_error(&rx->sk);
                if (ret)
                        goto wait_error;

                if (list_empty(&rx->recvmsg_q)) {
                        if (signal_pending(current))
                                goto wait_interrupted;
                        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
                                            0, 0, 0, 0);
                        timeo = schedule_timeout(timeo);
                }
                finish_wait(sk_sleep(&rx->sk), &wait);
                goto try_again;
        }

        /* Find the next call and dequeue it if we're not just peeking.  If we
         * do dequeue it, that comes with a ref that we will need to release.
         */
        write_lock_bh(&rx->recvmsg_lock);
        l = rx->recvmsg_q.next;
        call = list_entry(l, struct rxrpc_call, recvmsg_link);
        if (!(flags & MSG_PEEK))
                list_del_init(&call->recvmsg_link);
        else
                rxrpc_get_call(call, rxrpc_call_got);
        write_unlock_bh(&rx->recvmsg_lock);

        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);

        /* We're going to drop the socket lock, so we need to lock the call
         * against interference by sendmsg.
         */
        if (!mutex_trylock(&call->user_mutex)) {
                ret = -EWOULDBLOCK;
                if (flags & MSG_DONTWAIT)
                        goto error_requeue_call;
                ret = -ERESTARTSYS;
                if (mutex_lock_interruptible(&call->user_mutex) < 0)
                        goto error_requeue_call;
        }

        release_sock(&rx->sk);

        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
                BUG();

        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
                if (flags & MSG_CMSG_COMPAT) {
                        unsigned int id32 = call->user_call_ID;

                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
                                       sizeof(unsigned int), &id32);
                } else {
                        unsigned long idl = call->user_call_ID;

                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
                                       sizeof(unsigned long), &idl);
                }
                if (ret < 0)
                        goto error_unlock_call;
        }

        if (msg->msg_name && call->peer) {
                struct sockaddr_rxrpc *srx = msg->msg_name;
                size_t len = sizeof(call->peer->srx);

                memcpy(msg->msg_name, &call->peer->srx, len);
                srx->srx_service = call->service_id;
                msg->msg_namelen = len;
        }

        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
        case RXRPC_CALL_SERVER_RECV_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
                ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
                                         flags, &copied);
                if (ret == -EAGAIN)
                        ret = 0;

                if (after(call->rx_top, call->rx_hard_ack) &&
                    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
                        rxrpc_notify_socket(call);
                break;
        default:
                ret = 0;
                break;
        }

        if (ret < 0)
                goto error_unlock_call;

        if (call->state == RXRPC_CALL_COMPLETE) {
                ret = rxrpc_recvmsg_term(call, msg);
                if (ret < 0)
                        goto error_unlock_call;
                if (!(flags & MSG_PEEK))
                        rxrpc_release_call(rx, call);
                msg->msg_flags |= MSG_EOR;
                ret = 1;
        }

        if (ret == 0)
                msg->msg_flags |= MSG_MORE;
        else
                msg->msg_flags &= ~MSG_MORE;
        ret = copied;

error_unlock_call:
        mutex_unlock(&call->user_mutex);
        rxrpc_put_call(call, rxrpc_call_put);
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
        return ret;

error_requeue_call:
        if (!(flags & MSG_PEEK)) {
                write_lock_bh(&rx->recvmsg_lock);
                list_add(&call->recvmsg_link, &rx->recvmsg_q);
                write_unlock_bh(&rx->recvmsg_lock);
                trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
        } else {
                rxrpc_put_call(call, rxrpc_call_put);
        }
error_no_call:
        release_sock(&rx->sk);
error_trace:
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
        return ret;

wait_interrupted:
        ret = sock_intr_errno(timeo);
wait_error:
        finish_wait(sk_sleep(&rx->sk), &wait);
        call = NULL;
        goto error_trace;
}
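
/*
 * For illustration only (not part of the original file): a userspace caller
 * with a single outstanding call typically keeps calling recvmsg() until
 * MSG_EOR is set, matching the MSG_MORE/MSG_EOR handling above.  Roughly:
 *
 *	for (;;) {
 *		n = recvmsg(fd, &msg, 0);
 *		if (n < 0)
 *			break;			// socket error
 *		// ... use the RXRPC_USER_CALL_ID cmsg to identify the call
 *		// and consume n bytes of its data ...
 *		if (msg.msg_flags & MSG_EOR)
 *			break;			// call terminated; terminal
 *						// cmsgs as per
 *						// rxrpc_recvmsg_term()
 *	}
 */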

/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to receive data from
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
                           struct iov_iter *iter, size_t *_len,
                           bool want_more, u32 *_abort, u16 *_service)
{
        size_t offset = 0;
        int ret;

        _enter("{%d,%s},%zu,%d",
               call->debug_id, rxrpc_call_states[call->state],
               *_len, want_more);

        ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);

        mutex_lock(&call->user_mutex);

        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
        case RXRPC_CALL_SERVER_RECV_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
                ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
                                         *_len, 0, &offset);
                *_len -= offset;
                if (ret < 0)
                        goto out;

                /* We can only reach here with a partially full buffer if we
                 * have reached the end of the data.  We must otherwise have a
                 * full buffer or have been given -EAGAIN.
                 */
                if (ret == 1) {
                        if (iov_iter_count(iter) > 0)
                                goto short_data;
                        if (!want_more)
                                goto read_phase_complete;
                        ret = 0;
                        goto out;
                }

                if (!want_more)
                        goto excess_data;
                goto out;

        case RXRPC_CALL_COMPLETE:
                goto call_complete;

        default:
                ret = -EINPROGRESS;
                goto out;
        }

read_phase_complete:
        ret = 1;
out:
        switch (call->ackr_reason) {
        case RXRPC_ACK_IDLE:
                break;
        case RXRPC_ACK_DELAY:
                if (ret != -EAGAIN)
                        break;
                fallthrough;
        default:
                rxrpc_send_ack_packet(call, false, NULL);
        }

        if (_service)
                *_service = call->service_id;
        mutex_unlock(&call->user_mutex);
        _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
        return ret;

short_data:
        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
        ret = -EBADMSG;
        goto out;
excess_data:
        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
        ret = -EMSGSIZE;
        goto out;
call_complete:
        *_abort = call->abort_code;
        ret = call->error;
        if (call->completion == RXRPC_CALL_SUCCEEDED) {
                ret = 1;
                if (iov_iter_count(iter) > 0)
                        ret = -ECONNRESET;
        }
        goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
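
/*
 * For illustration only (not part of the original file): a kernel service
 * typically calls this in a loop from process context, sleeping between
 * attempts until its notify_rx callback (see rxrpc_notify_socket() above)
 * signals that more packets have arrived.  A rough sketch, where "c",
 * c->waitq and MY_CALL_HAS_DATA are hypothetical service-side state and
 * the reply is assumed to be exactly "size" bytes (hence want_more = false;
 * a mismatch yields -EBADMSG or -EMSGSIZE as per the labels above):
 *
 *	struct kvec kv = { .iov_base = buf, .iov_len = size };
 *	struct iov_iter iter;
 *	size_t len = size;
 *	int ret;
 *
 *	iov_iter_kvec(&iter, READ, &kv, 1, size);
 *	for (;;) {
 *		ret = rxrpc_kernel_recv_data(sock, c->rxcall, &iter, &len,
 *					     false, &c->abort_code, NULL);
 *		if (ret == 1)		// whole reply received
 *			break;
 *		if (ret != -EAGAIN)	// e.g. -ECONNABORTED, -EBADMSG
 *			break;
 *		wait_event(c->waitq,	// more packets needed: sleep until
 *			   test_and_clear_bit(MY_CALL_HAS_DATA, &c->flags));
 *	}
 */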

/**
 * rxrpc_kernel_get_reply_time - Get timestamp on first reply packet
 * @sock: The socket that the call exists on
 * @call: The call to query
 * @_ts: Where to put the timestamp
 *
 * Retrieve the timestamp from the first DATA packet of the reply if it is
 * in the ring.  Returns true if successful, false if not.
 */
bool rxrpc_kernel_get_reply_time(struct socket *sock, struct rxrpc_call *call,
                                 ktime_t *_ts)
{
        struct sk_buff *skb;
        rxrpc_seq_t hard_ack, top, seq;
        bool success = false;

        mutex_lock(&call->user_mutex);

        if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_RECV_REPLY)
                goto out;

        hard_ack = call->rx_hard_ack;
        if (hard_ack != 0)
                goto out;

        seq = hard_ack + 1;
        top = smp_load_acquire(&call->rx_top);
        if (after(seq, top))
                goto out;

        skb = call->rxtx_buffer[seq & RXRPC_RXTX_BUFF_MASK];
        if (!skb)
                goto out;

        *_ts = skb_get_ktime(skb);
        success = true;

out:
        mutex_unlock(&call->user_mutex);
        return success;
}
EXPORT_SYMBOL(rxrpc_kernel_get_reply_time);