linux/net/rxrpc/ar-recvmsg.c
<<
>>
Prefs
   1/* RxRPC recvmsg() implementation
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/net.h>
  13#include <linux/skbuff.h>
  14#include <linux/export.h>
  15#include <net/sock.h>
  16#include <net/af_rxrpc.h>
  17#include "ar-internal.h"
  18
  19/*
  20 * removal a call's user ID from the socket tree to make the user ID available
  21 * again and so that it won't be seen again in association with that call
  22 */
  23void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
  24{
  25        _debug("RELEASE CALL %d", call->debug_id);
  26
  27        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
  28                write_lock_bh(&rx->call_lock);
  29                rb_erase(&call->sock_node, &call->socket->calls);
  30                clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
  31                write_unlock_bh(&rx->call_lock);
  32        }
  33
  34        read_lock_bh(&call->state_lock);
  35        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
  36            !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
  37                rxrpc_queue_call(call);
  38        read_unlock_bh(&call->state_lock);
  39}
  40
  41/*
  42 * receive a message from an RxRPC socket
  43 * - we need to be careful about two or more threads calling recvmsg
  44 *   simultaneously
  45 */
  46int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
  47                  int flags)
  48{
  49        struct rxrpc_skb_priv *sp;
  50        struct rxrpc_call *call = NULL, *continue_call = NULL;
  51        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
  52        struct sk_buff *skb;
  53        long timeo;
  54        int copy, ret, ullen, offset, copied = 0;
  55        u32 abort_code;
  56
  57        DEFINE_WAIT(wait);
  58
  59        _enter(",,,%zu,%d", len, flags);
  60
  61        if (flags & (MSG_OOB | MSG_TRUNC))
  62                return -EOPNOTSUPP;
  63
  64        ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
  65
  66        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
  67        msg->msg_flags |= MSG_MORE;
  68
  69        lock_sock(&rx->sk);
  70
  71        for (;;) {
  72                /* return immediately if a client socket has no outstanding
  73                 * calls */
  74                if (RB_EMPTY_ROOT(&rx->calls)) {
  75                        if (copied)
  76                                goto out;
  77                        if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
  78                                release_sock(&rx->sk);
  79                                if (continue_call)
  80                                        rxrpc_put_call(continue_call);
  81                                return -ENODATA;
  82                        }
  83                }
  84
  85                /* get the next message on the Rx queue */
  86                skb = skb_peek(&rx->sk.sk_receive_queue);
  87                if (!skb) {
  88                        /* nothing remains on the queue */
  89                        if (copied &&
  90                            (flags & MSG_PEEK || timeo == 0))
  91                                goto out;
  92
  93                        /* wait for a message to turn up */
  94                        release_sock(&rx->sk);
  95                        prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
  96                                                  TASK_INTERRUPTIBLE);
  97                        ret = sock_error(&rx->sk);
  98                        if (ret)
  99                                goto wait_error;
 100
 101                        if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
 102                                if (signal_pending(current))
 103                                        goto wait_interrupted;
 104                                timeo = schedule_timeout(timeo);
 105                        }
 106                        finish_wait(sk_sleep(&rx->sk), &wait);
 107                        lock_sock(&rx->sk);
 108                        continue;
 109                }
 110
 111        peek_next_packet:
 112                sp = rxrpc_skb(skb);
 113                call = sp->call;
 114                ASSERT(call != NULL);
 115
 116                _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
 117
 118                /* make sure we wait for the state to be updated in this call */
 119                spin_lock_bh(&call->lock);
 120                spin_unlock_bh(&call->lock);
 121
 122                if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
 123                        _debug("packet from released call");
 124                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 125                                BUG();
 126                        rxrpc_free_skb(skb);
 127                        continue;
 128                }
 129
 130                /* determine whether to continue last data receive */
 131                if (continue_call) {
 132                        _debug("maybe cont");
 133                        if (call != continue_call ||
 134                            skb->mark != RXRPC_SKB_MARK_DATA) {
 135                                release_sock(&rx->sk);
 136                                rxrpc_put_call(continue_call);
 137                                _leave(" = %d [noncont]", copied);
 138                                return copied;
 139                        }
 140                }
 141
 142                rxrpc_get_call(call);
 143
 144                /* copy the peer address and timestamp */
 145                if (!continue_call) {
 146                        if (msg->msg_name) {
 147                                size_t len =
 148                                        sizeof(call->conn->trans->peer->srx);
 149                                memcpy(msg->msg_name,
 150                                       &call->conn->trans->peer->srx, len);
 151                                msg->msg_namelen = len;
 152                        }
 153                        sock_recv_timestamp(msg, &rx->sk, skb);
 154                }
 155
 156                /* receive the message */
 157                if (skb->mark != RXRPC_SKB_MARK_DATA)
 158                        goto receive_non_data_message;
 159
 160                _debug("recvmsg DATA #%u { %d, %d }",
 161                       ntohl(sp->hdr.seq), skb->len, sp->offset);
 162
 163                if (!continue_call) {
 164                        /* only set the control data once per recvmsg() */
 165                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 166                                       ullen, &call->user_call_ID);
 167                        if (ret < 0)
 168                                goto copy_error;
 169                        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 170                }
 171
 172                ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
 173                ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
 174                call->rx_data_recv = ntohl(sp->hdr.seq);
 175
 176                ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
 177
 178                offset = sp->offset;
 179                copy = skb->len - offset;
 180                if (copy > len - copied)
 181                        copy = len - copied;
 182
 183                ret = skb_copy_datagram_msg(skb, offset, msg, copy);
 184
 185                if (ret < 0)
 186                        goto copy_error;
 187
 188                /* handle piecemeal consumption of data packets */
 189                _debug("copied %d+%d", copy, copied);
 190
 191                offset += copy;
 192                copied += copy;
 193
 194                if (!(flags & MSG_PEEK))
 195                        sp->offset = offset;
 196
 197                if (sp->offset < skb->len) {
 198                        _debug("buffer full");
 199                        ASSERTCMP(copied, ==, len);
 200                        break;
 201                }
 202
 203                /* we transferred the whole data packet */
 204                if (sp->hdr.flags & RXRPC_LAST_PACKET) {
 205                        _debug("last");
 206                        if (call->conn->out_clientflag) {
 207                                 /* last byte of reply received */
 208                                ret = copied;
 209                                goto terminal_message;
 210                        }
 211
 212                        /* last bit of request received */
 213                        if (!(flags & MSG_PEEK)) {
 214                                _debug("eat packet");
 215                                if (skb_dequeue(&rx->sk.sk_receive_queue) !=
 216                                    skb)
 217                                        BUG();
 218                                rxrpc_free_skb(skb);
 219                        }
 220                        msg->msg_flags &= ~MSG_MORE;
 221                        break;
 222                }
 223
 224                /* move on to the next data message */
 225                _debug("next");
 226                if (!continue_call)
 227                        continue_call = sp->call;
 228                else
 229                        rxrpc_put_call(call);
 230                call = NULL;
 231
 232                if (flags & MSG_PEEK) {
 233                        _debug("peek next");
 234                        skb = skb->next;
 235                        if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
 236                                break;
 237                        goto peek_next_packet;
 238                }
 239
 240                _debug("eat packet");
 241                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 242                        BUG();
 243                rxrpc_free_skb(skb);
 244        }
 245
 246        /* end of non-terminal data packet reception for the moment */
 247        _debug("end rcv data");
 248out:
 249        release_sock(&rx->sk);
 250        if (call)
 251                rxrpc_put_call(call);
 252        if (continue_call)
 253                rxrpc_put_call(continue_call);
 254        _leave(" = %d [data]", copied);
 255        return copied;
 256
 257        /* handle non-DATA messages such as aborts, incoming connections and
 258         * final ACKs */
 259receive_non_data_message:
 260        _debug("non-data");
 261
 262        if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
 263                _debug("RECV NEW CALL");
 264                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
 265                if (ret < 0)
 266                        goto copy_error;
 267                if (!(flags & MSG_PEEK)) {
 268                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 269                                BUG();
 270                        rxrpc_free_skb(skb);
 271                }
 272                goto out;
 273        }
 274
 275        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 276                       ullen, &call->user_call_ID);
 277        if (ret < 0)
 278                goto copy_error;
 279        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 280
 281        switch (skb->mark) {
 282        case RXRPC_SKB_MARK_DATA:
 283                BUG();
 284        case RXRPC_SKB_MARK_FINAL_ACK:
 285                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
 286                break;
 287        case RXRPC_SKB_MARK_BUSY:
 288                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
 289                break;
 290        case RXRPC_SKB_MARK_REMOTE_ABORT:
 291                abort_code = call->abort_code;
 292                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
 293                break;
 294        case RXRPC_SKB_MARK_NET_ERROR:
 295                _debug("RECV NET ERROR %d", sp->error);
 296                abort_code = sp->error;
 297                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
 298                break;
 299        case RXRPC_SKB_MARK_LOCAL_ERROR:
 300                _debug("RECV LOCAL ERROR %d", sp->error);
 301                abort_code = sp->error;
 302                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
 303                               &abort_code);
 304                break;
 305        default:
 306                BUG();
 307                break;
 308        }
 309
 310        if (ret < 0)
 311                goto copy_error;
 312
 313terminal_message:
 314        _debug("terminal");
 315        msg->msg_flags &= ~MSG_MORE;
 316        msg->msg_flags |= MSG_EOR;
 317
 318        if (!(flags & MSG_PEEK)) {
 319                _net("free terminal skb %p", skb);
 320                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 321                        BUG();
 322                rxrpc_free_skb(skb);
 323                rxrpc_remove_user_ID(rx, call);
 324        }
 325
 326        release_sock(&rx->sk);
 327        rxrpc_put_call(call);
 328        if (continue_call)
 329                rxrpc_put_call(continue_call);
 330        _leave(" = %d", ret);
 331        return ret;
 332
 333copy_error:
 334        _debug("copy error");
 335        release_sock(&rx->sk);
 336        rxrpc_put_call(call);
 337        if (continue_call)
 338                rxrpc_put_call(continue_call);
 339        _leave(" = %d", ret);
 340        return ret;
 341
 342wait_interrupted:
 343        ret = sock_intr_errno(timeo);
 344wait_error:
 345        finish_wait(sk_sleep(&rx->sk), &wait);
 346        if (continue_call)
 347                rxrpc_put_call(continue_call);
 348        if (copied)
 349                copied = ret;
 350        _leave(" = %d [waitfail %d]", copied, ret);
 351        return copied;
 352
 353}
 354
 355/**
 356 * rxrpc_kernel_data_delivered - Record delivery of data message
 357 * @skb: Message holding data
 358 *
 359 * Record the delivery of a data message.  This permits RxRPC to keep its
 360 * tracking correct.  The socket buffer will be deleted.
 361 */
 362void rxrpc_kernel_data_delivered(struct sk_buff *skb)
 363{
 364        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 365        struct rxrpc_call *call = sp->call;
 366
 367        ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
 368        ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
 369        call->rx_data_recv = ntohl(sp->hdr.seq);
 370
 371        ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
 372        rxrpc_free_skb(skb);
 373}
 374
 375EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
 376
 377/**
 378 * rxrpc_kernel_is_data_last - Determine if data message is last one
 379 * @skb: Message holding data
 380 *
 381 * Determine if data message is last one for the parent call.
 382 */
 383bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
 384{
 385        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 386
 387        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
 388
 389        return sp->hdr.flags & RXRPC_LAST_PACKET;
 390}
 391
 392EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
 393
 394/**
 395 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
 396 * @skb: Message indicating an abort
 397 *
 398 * Get the abort code from an RxRPC abort message.
 399 */
 400u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
 401{
 402        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 403
 404        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
 405
 406        return sp->call->abort_code;
 407}
 408
 409EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
 410
 411/**
 412 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
 413 * @skb: Message indicating an error
 414 *
 415 * Get the error number from an RxRPC error message.
 416 */
 417int rxrpc_kernel_get_error_number(struct sk_buff *skb)
 418{
 419        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 420
 421        return sp->error;
 422}
 423
 424EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
 425