linux/net/rxrpc/recvmsg.c
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
        struct rxrpc_sock *rx;
        struct sock *sk;

        _enter("%d", call->debug_id);

        if (!list_empty(&call->recvmsg_link))
                return;

        rcu_read_lock();

        rx = rcu_dereference(call->socket);
        sk = &rx->sk;
        if (rx && sk->sk_state < RXRPC_CLOSE) {
                if (call->notify_rx) {
                        call->notify_rx(sk, call, call->user_call_ID);
                } else {
                        write_lock_bh(&rx->recvmsg_lock);
                        if (list_empty(&call->recvmsg_link)) {
                                rxrpc_get_call(call, rxrpc_call_got);
                                list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
                        }
                        write_unlock_bh(&rx->recvmsg_lock);

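                        /* Wake up anything sleeping in poll() or recvmsg()
                         * on this socket.
                         */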
                        if (!sock_flag(sk, SOCK_DEAD)) {
                                _debug("call %ps", sk->sk_data_ready);
                                sk->sk_data_ready(sk);
                        }
                }
        }

        rcu_read_unlock();
        _leave("");
}

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
        u32 tmp = 0;
        int ret;

        switch (call->completion) {
        case RXRPC_CALL_SUCCEEDED:
                ret = 0;
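                /* For a service call that ran to completion, an empty
                 * RXRPC_ACK control message reports that the final ACK was
                 * received from the peer.
                 */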
                if (rxrpc_is_service_call(call))
                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
                break;
        case RXRPC_CALL_REMOTELY_ABORTED:
                tmp = call->abort_code;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
                break;
        case RXRPC_CALL_LOCALLY_ABORTED:
                tmp = call->abort_code;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
                break;
        case RXRPC_CALL_NETWORK_ERROR:
                tmp = call->error;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
                break;
        case RXRPC_CALL_LOCAL_ERROR:
                tmp = call->error;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
                break;
        default:
                pr_err("Invalid terminal call state %u\n", call->state);
                BUG();
                break;
        }

        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
                            call->rx_pkt_offset, call->rx_pkt_len, ret);
        return ret;
}

/*
 * Pass back notification of a new call.  The call is added to the
 * to-be-accepted list.  This means that the next call to be accepted might not
 * be the last call seen awaiting acceptance, but unless we leave this on the
 * front of the queue and block all other messages until someone gives us a
 * user_ID for it, there's not a lot we can do.
 */
static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
                                  struct rxrpc_call *call,
                                  struct msghdr *msg, int flags)
{
        int tmp = 0, ret;

        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);

        if (ret == 0 && !(flags & MSG_PEEK)) {
                _debug("to be accepted");
                write_lock_bh(&rx->recvmsg_lock);
                list_del_init(&call->recvmsg_link);
                write_unlock_bh(&rx->recvmsg_lock);

                rxrpc_get_call(call, rxrpc_call_got);
                write_lock(&rx->call_lock);
                list_add_tail(&call->accept_link, &rx->to_be_accepted);
                write_unlock(&rx->call_lock);
        }

        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
        return ret;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
        _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

        trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);

        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
                rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false,
                                  rxrpc_propose_ack_terminal_ack);
                rxrpc_send_ack_packet(call, false);
        }

        write_lock_bh(&call->state_lock);

        switch (call->state) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
                __rxrpc_call_completed(call);
                write_unlock_bh(&call->state_lock);
                break;

        case RXRPC_CALL_SERVER_RECV_REQUEST:
                call->tx_phase = true;
                call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
                call->ack_at = call->expire_at;
                write_unlock_bh(&call->state_lock);
                rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true,
                                  rxrpc_propose_ack_processing_op);
                break;
        default:
                write_unlock_bh(&call->state_lock);
                break;
        }
}

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        rxrpc_serial_t serial;
        rxrpc_seq_t hard_ack, top;
        u8 flags;
        int ix;

        _enter("%d", call->debug_id);

        hard_ack = call->rx_hard_ack;
        top = smp_load_acquire(&call->rx_top);
        ASSERT(before(hard_ack, top));

        hard_ack++;
        ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
        skb = call->rxtx_buffer[ix];
        rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
        sp = rxrpc_skb(skb);
        flags = sp->hdr.flags;
        serial = sp->hdr.serial;
        if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
                serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;

        call->rxtx_buffer[ix] = NULL;
        call->rxtx_annotations[ix] = 0;
        /* Barrier against rxrpc_input_data(). */
        smp_store_release(&call->rx_hard_ack, hard_ack);

        rxrpc_free_skb(skb, rxrpc_skb_rx_freed);

        _debug("%u,%u,%02x", hard_ack, top, flags);
        trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
        if (flags & RXRPC_LAST_PACKET) {
                rxrpc_end_rx_phase(call, serial);
        } else {
                /* Check to see if there's an ACK that needs sending. */
                if (after_eq(hard_ack, call->ackr_consumed + 2) ||
                    after_eq(top, call->ackr_seen + 2) ||
                    (hard_ack == top && after(hard_ack, call->ackr_consumed)))
                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial,
                                          true, false,
                                          rxrpc_propose_ack_rotate_rx);
                if (call->ackr_reason)
                        rxrpc_send_ack_packet(call, false);
        }
}

/*
 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 * padding, but if this is the case, the packet length will be resident in the
 * socket buffer.  Note that we can't modify the master skb info as the skb may
 * be the home to multiple subpackets.
 */
static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
                               u8 annotation,
                               unsigned int offset, unsigned int len)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        rxrpc_seq_t seq = sp->hdr.seq;
        u16 cksum = sp->hdr.cksum;

        _enter("");

        /* For all but the head jumbo subpacket, the security checksum is in a
         * jumbo header immediately prior to the data.
         */
        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
                __be16 tmp;
                if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
                        BUG();
                cksum = ntohs(tmp);
                seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
        }

        return call->conn->security->verify_packet(call, skb, offset, len,
                                                   seq, cksum);
}

/*
 * Locate the data within a packet.  This is complicated by:
 *
 * (1) An skb may contain a jumbo packet - so we have to find the appropriate
 *     subpacket.
 *
 * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
 *     contains an extra header which includes the true length of the data,
 *     excluding any encrypted padding.
 */
static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
                             u8 *_annotation,
                             unsigned int *_offset, unsigned int *_len)
{
        unsigned int offset = sizeof(struct rxrpc_wire_header);
        unsigned int len = *_len;
        int ret;
        u8 annotation = *_annotation;

        /* Locate the subpacket */
        len = skb->len - offset;
        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
                offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
                           RXRPC_JUMBO_SUBPKTLEN);
                len = (annotation & RXRPC_RX_ANNO_JLAST) ?
                        skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
        }

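        /* Verification (and any decryption) only needs doing once per
         * subpacket; the result is cached in the annotation so that later
         * passes over the same packet can skip it.
         */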
        if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
                ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
                if (ret < 0)
                        return ret;
                *_annotation |= RXRPC_RX_ANNO_VERIFIED;
        }

        *_offset = offset;
        *_len = len;
        call->conn->security->locate_data(call, skb, _offset, _len);
        return 0;
}

/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
                              struct msghdr *msg, struct iov_iter *iter,
                              size_t len, int flags, size_t *_offset)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        rxrpc_seq_t hard_ack, top, seq;
        size_t remain;
        bool last;
        unsigned int rx_pkt_offset, rx_pkt_len;
        int ix, copy, ret = -EAGAIN, ret2;

        rx_pkt_offset = call->rx_pkt_offset;
        rx_pkt_len = call->rx_pkt_len;

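        /* Once the call has advanced past the request/reply reception states,
         * the Rx phase is over and there's no more data to read.
         */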
        if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
                seq = call->rx_hard_ack;
                ret = 1;
                goto done;
        }

        /* Barriers against rxrpc_input_data(). */
        hard_ack = call->rx_hard_ack;
        seq = hard_ack + 1;
        while (top = smp_load_acquire(&call->rx_top),
               before_eq(seq, top)
               ) {
                ix = seq & RXRPC_RXTX_BUFF_MASK;
                skb = call->rxtx_buffer[ix];
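                /* A hole in the buffer means this sequence number hasn't
                 * arrived yet; stop here and wait for it.
                 */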
                if (!skb) {
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
                                            rx_pkt_offset, rx_pkt_len, 0);
                        break;
                }
                smp_rmb();
                rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
                sp = rxrpc_skb(skb);

                if (!(flags & MSG_PEEK))
                        trace_rxrpc_receive(call, rxrpc_receive_front,
                                            sp->hdr.serial, seq);

                if (msg)
                        sock_recv_timestamp(msg, sock->sk, skb);

                if (rx_pkt_offset == 0) {
                        ret2 = rxrpc_locate_data(call, skb,
                                                 &call->rxtx_annotations[ix],
                                                 &rx_pkt_offset, &rx_pkt_len);
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
                                            rx_pkt_offset, rx_pkt_len, ret2);
                        if (ret2 < 0) {
                                ret = ret2;
                                goto out;
                        }
                } else {
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
                                            rx_pkt_offset, rx_pkt_len, 0);
                }

                /* We have to handle short, empty and used-up DATA packets. */
                remain = len - *_offset;
                copy = rx_pkt_len;
                if (copy > remain)
                        copy = remain;
                if (copy > 0) {
                        ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
                                                      copy);
                        if (ret2 < 0) {
                                ret = ret2;
                                goto out;
                        }

                        /* handle piecemeal consumption of data packets */
                        rx_pkt_offset += copy;
                        rx_pkt_len -= copy;
                        *_offset += copy;
                }

                if (rx_pkt_len > 0) {
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
                                            rx_pkt_offset, rx_pkt_len, 0);
                        ASSERTCMP(*_offset, ==, len);
                        ret = 0;
                        break;
                }

                /* The whole packet has been transferred. */
                last = sp->hdr.flags & RXRPC_LAST_PACKET;
                if (!(flags & MSG_PEEK))
                        rxrpc_rotate_rx_window(call);
                rx_pkt_offset = 0;
                rx_pkt_len = 0;

                if (last) {
                        ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
                        ret = 1;
                        goto out;
                }

                seq++;
        }

out:
        if (!(flags & MSG_PEEK)) {
                call->rx_pkt_offset = rx_pkt_offset;
                call->rx_pkt_len = rx_pkt_len;
        }
done:
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
                            rx_pkt_offset, rx_pkt_len, ret);
        return ret;
}

/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
                  int flags)
{
        struct rxrpc_call *call;
        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
        struct list_head *l;
        size_t copied = 0;
        long timeo;
        int ret;

        DEFINE_WAIT(wait);

        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);

        if (flags & (MSG_OOB | MSG_TRUNC))
                return -EOPNOTSUPP;

        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
        lock_sock(&rx->sk);

        /* Return immediately if a client socket has no outstanding calls */
        if (RB_EMPTY_ROOT(&rx->calls) &&
            list_empty(&rx->recvmsg_q) &&
            rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
                release_sock(&rx->sk);
                return -ENODATA;
        }

        if (list_empty(&rx->recvmsg_q)) {
                ret = -EWOULDBLOCK;
                if (timeo == 0) {
                        call = NULL;
                        goto error_no_call;
                }

                release_sock(&rx->sk);

                /* Wait for something to happen */
                prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
                                          TASK_INTERRUPTIBLE);
                ret = sock_error(&rx->sk);
                if (ret)
                        goto wait_error;

                if (list_empty(&rx->recvmsg_q)) {
                        if (signal_pending(current))
                                goto wait_interrupted;
                        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
                                            0, 0, 0, 0);
                        timeo = schedule_timeout(timeo);
                }
                finish_wait(sk_sleep(&rx->sk), &wait);
                goto try_again;
        }

        /* Find the next call and dequeue it if we're not just peeking.  If we
         * do dequeue it, that comes with a ref that we will need to release.
         */
        write_lock_bh(&rx->recvmsg_lock);
        l = rx->recvmsg_q.next;
        call = list_entry(l, struct rxrpc_call, recvmsg_link);
        if (!(flags & MSG_PEEK))
                list_del_init(&call->recvmsg_link);
        else
                rxrpc_get_call(call, rxrpc_call_got);
        write_unlock_bh(&rx->recvmsg_lock);

        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);

        /* We're going to drop the socket lock, so we need to lock the call
         * against interference by sendmsg.
         */
        if (!mutex_trylock(&call->user_mutex)) {
                ret = -EWOULDBLOCK;
                if (flags & MSG_DONTWAIT)
                        goto error_requeue_call;
                ret = -ERESTARTSYS;
                if (mutex_lock_interruptible(&call->user_mutex) < 0)
                        goto error_requeue_call;
        }

        release_sock(&rx->sk);

        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
                BUG();

        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
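                /* A 32-bit (compat) recipient can only be given a 32-bit user
                 * call ID.
                 */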
                if (flags & MSG_CMSG_COMPAT) {
                        unsigned int id32 = call->user_call_ID;

                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
                                       sizeof(unsigned int), &id32);
                } else {
                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
                                       sizeof(unsigned long),
                                       &call->user_call_ID);
                }
                if (ret < 0)
                        goto error_unlock_call;
        }

        if (msg->msg_name) {
                size_t len = sizeof(call->conn->params.peer->srx);
                memcpy(msg->msg_name, &call->conn->params.peer->srx, len);
                msg->msg_namelen = len;
        }

        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_SERVER_ACCEPTING:
                ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
                break;
        case RXRPC_CALL_CLIENT_RECV_REPLY:
        case RXRPC_CALL_SERVER_RECV_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
                ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
                                         flags, &copied);
                if (ret == -EAGAIN)
                        ret = 0;

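                /* If more data is already waiting in the Rx ring, requeue the
                 * call for notification so that a further recvmsg() will pick
                 * it up.
                 */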
                if (after(call->rx_top, call->rx_hard_ack) &&
                    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
                        rxrpc_notify_socket(call);
                break;
        default:
                ret = 0;
                break;
        }

        if (ret < 0)
                goto error_unlock_call;

        if (call->state == RXRPC_CALL_COMPLETE) {
                ret = rxrpc_recvmsg_term(call, msg);
                if (ret < 0)
                        goto error_unlock_call;
                if (!(flags & MSG_PEEK))
                        rxrpc_release_call(rx, call);
                msg->msg_flags |= MSG_EOR;
                ret = 1;
        }

        if (ret == 0)
                msg->msg_flags |= MSG_MORE;
        else
                msg->msg_flags &= ~MSG_MORE;
        ret = copied;

error_unlock_call:
        mutex_unlock(&call->user_mutex);
        rxrpc_put_call(call, rxrpc_call_put);
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
        return ret;

error_requeue_call:
        if (!(flags & MSG_PEEK)) {
                write_lock_bh(&rx->recvmsg_lock);
                list_add(&call->recvmsg_link, &rx->recvmsg_q);
                write_unlock_bh(&rx->recvmsg_lock);
                trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
        } else {
                rxrpc_put_call(call, rxrpc_call_put);
        }
error_no_call:
        release_sock(&rx->sk);
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
        return ret;

wait_interrupted:
        ret = sock_intr_errno(timeo);
wait_error:
        finish_wait(sk_sleep(&rx->sk), &wait);
        call = NULL;
        goto error_no_call;
}

/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to receive data from
 * @buf: The buffer to receive into
 * @size: The size of the buffer, including data already read
 * @_offset: The running offset into the buffer.
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * This function adds the amount it transfers to *_offset, so this should be
 * precleared as appropriate.  Note that the amount remaining in the buffer is
 * taken to be size - *_offset.
 *
 * *_abort should also be initialised to 0.
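 *
 * A minimal usage sketch for a caller that expects a reply of exactly
 * sizeof(buffer) bytes (the caller-side names here are illustrative only):
 *
 *      size_t offset = 0;
 *      u32 abort_code = 0;
 *      int ret;
 *
 *      ret = rxrpc_kernel_recv_data(sock, call, buffer, sizeof(buffer),
 *                                   &offset, false, &abort_code);
 *      if (ret == 1)
 *              ;       // got the whole reply; Rx phase complete
 *      else if (ret == -EAGAIN)
 *              ;       // more packets needed; wait and call again
 *      else if (ret < 0)
 *              ;       // error or abort (see abort_code)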
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
                           void *buf, size_t size, size_t *_offset,
                           bool want_more, u32 *_abort)
{
        struct iov_iter iter;
        struct kvec iov;
        int ret;

        _enter("{%d,%s},%zu/%zu,%d",
               call->debug_id, rxrpc_call_states[call->state],
               *_offset, size, want_more);

        ASSERTCMP(*_offset, <=, size);
        ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);

        iov.iov_base = buf + *_offset;
        iov.iov_len = size - *_offset;
        iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);

        mutex_lock(&call->user_mutex);

        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
        case RXRPC_CALL_SERVER_RECV_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
                ret = rxrpc_recvmsg_data(sock, call, NULL, &iter, size, 0,
                                         _offset);
                if (ret < 0)
                        goto out;

                /* We can only reach here with a partially full buffer if we
                 * have reached the end of the data.  We must otherwise have a
                 * full buffer or have been given -EAGAIN.
                 */
                if (ret == 1) {
                        if (*_offset < size)
                                goto short_data;
                        if (!want_more)
                                goto read_phase_complete;
                        ret = 0;
                        goto out;
                }

                if (!want_more)
                        goto excess_data;
                goto out;

        case RXRPC_CALL_COMPLETE:
                goto call_complete;

        default:
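                /* The call hasn't yet reached a state in which data can be
                 * received (e.g. the request is still being sent).
                 */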
                ret = -EINPROGRESS;
                goto out;
        }

read_phase_complete:
        ret = 1;
out:
        mutex_unlock(&call->user_mutex);
        _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
        return ret;

short_data:
        ret = -EBADMSG;
        goto out;
excess_data:
        ret = -EMSGSIZE;
        goto out;
call_complete:
        *_abort = call->abort_code;
        ret = -call->error;
        if (call->completion == RXRPC_CALL_SUCCEEDED) {
                ret = 1;
                if (size > 0)
                        ret = -ECONNRESET;
        }
        goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);