linux/fs/afs/rxrpc.c
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/slab.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"
#include "protocol_yfs.h"

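/* Workqueue on which asynchronous calls are processed outside of rxrpc's
 * notification context.
 */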
struct workqueue_struct *afs_async_calls;

static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static long afs_wait_for_call_to_complete(struct afs_call *, struct afs_addr_cursor *);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_delete_async_call(struct work_struct *);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
static int afs_deliver_cm_op_id(struct afs_call *);

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
        .name           = "CB.xxxx",
        .deliver        = afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
int afs_open_socket(struct afs_net *net)
{
        struct sockaddr_rxrpc srx;
        struct socket *socket;
        unsigned int min_level;
        int ret;

        _enter("");

        ret = sock_create_kern(net->net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
        if (ret < 0)
                goto error_1;

        socket->sk->sk_allocation = GFP_NOFS;

        /* bind the callback manager's address to make this a server socket */
        memset(&srx, 0, sizeof(srx));
        srx.srx_family                  = AF_RXRPC;
        srx.srx_service                 = CM_SERVICE;
        srx.transport_type              = SOCK_DGRAM;
        srx.transport_len               = sizeof(srx.transport.sin6);
        srx.transport.sin6.sin6_family  = AF_INET6;
        srx.transport.sin6.sin6_port    = htons(AFS_CM_PORT);

        min_level = RXRPC_SECURITY_ENCRYPT;
        ret = kernel_setsockopt(socket, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
                                (void *)&min_level, sizeof(min_level));
        if (ret < 0)
                goto error_2;

        ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
        if (ret == -EADDRINUSE) {
                srx.transport.sin6.sin6_port = 0;
                ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
        }
        if (ret < 0)
                goto error_2;

        srx.srx_service = YFS_CM_SERVICE;
        ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
        if (ret < 0)
                goto error_2;

        /* Ideally, we'd turn on service upgrade here, but we can't because
         * OpenAFS is buggy and leaks the userStatus field from packet to
         * packet and between FS packets and CB packets - so if we try to do an
         * upgrade on an FS packet, OpenAFS will leak that into the CB packet
         * it sends back to us.
         */

        rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
                                           afs_rx_discard_new_call);

        ret = kernel_listen(socket, INT_MAX);
        if (ret < 0)
                goto error_2;

        net->socket = socket;
        afs_charge_preallocation(&net->charge_preallocation_work);
        _leave(" = 0");
        return 0;

error_2:
        sock_release(socket);
error_1:
        _leave(" = %d", ret);
        return ret;
}

/*
 * close the RxRPC socket AFS was using
 */
void afs_close_socket(struct afs_net *net)
{
        _enter("");

        kernel_listen(net->socket, 0);
        flush_workqueue(afs_async_calls);

        if (net->spare_incoming_call) {
                afs_put_call(net->spare_incoming_call);
                net->spare_incoming_call = NULL;
        }

        _debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
        wait_var_event(&net->nr_outstanding_calls,
                       !atomic_read(&net->nr_outstanding_calls));
        _debug("no outstanding calls");

        kernel_sock_shutdown(net->socket, SHUT_RDWR);
        flush_workqueue(afs_async_calls);
        sock_release(net->socket);

        _debug("dework");
        _leave("");
}

/*
 * Allocate a call.
 */
static struct afs_call *afs_alloc_call(struct afs_net *net,
                                       const struct afs_call_type *type,
                                       gfp_t gfp)
{
        struct afs_call *call;
        int o;

        call = kzalloc(sizeof(*call), gfp);
        if (!call)
                return NULL;

        call->type = type;
        call->net = net;
        call->debug_id = atomic_inc_return(&rxrpc_debug_id);
        atomic_set(&call->usage, 1);
        INIT_WORK(&call->async_work, afs_process_async_call);
        init_waitqueue_head(&call->waitq);
        spin_lock_init(&call->state_lock);
        call->_iter = &call->iter;

        o = atomic_inc_return(&net->nr_outstanding_calls);
        trace_afs_call(call, afs_call_trace_alloc, 1, o,
                       __builtin_return_address(0));
        return call;
}

/*
 * Dispose of a reference on a call.
 */
void afs_put_call(struct afs_call *call)
{
        struct afs_net *net = call->net;
        int n = atomic_dec_return(&call->usage);
        int o = atomic_read(&net->nr_outstanding_calls);

        trace_afs_call(call, afs_call_trace_put, n + 1, o,
                       __builtin_return_address(0));

        ASSERTCMP(n, >=, 0);
        if (n == 0) {
                ASSERT(!work_pending(&call->async_work));
                ASSERT(call->type->name != NULL);

                if (call->rxcall) {
                        rxrpc_kernel_end_call(net->socket, call->rxcall);
                        call->rxcall = NULL;
                }
                if (call->type->destructor)
                        call->type->destructor(call);

                afs_put_server(call->net, call->cm_server);
                afs_put_cb_interest(call->net, call->cbi);
                afs_put_addrlist(call->alist);
                kfree(call->request);

                trace_afs_call(call, afs_call_trace_free, 0, o,
                               __builtin_return_address(0));
                kfree(call);

                o = atomic_dec_return(&net->nr_outstanding_calls);
                if (o == 0)
                        wake_up_var(&net->nr_outstanding_calls);
        }
}

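/*
 * Take an extra reference on a call, noting why in the trace log.
 */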
static struct afs_call *afs_get_call(struct afs_call *call,
                                     enum afs_call_trace why)
{
        int u = atomic_inc_return(&call->usage);

        trace_afs_call(call, why, u,
                       atomic_read(&call->net->nr_outstanding_calls),
                       __builtin_return_address(0));
        return call;
}

/*
 * Queue the call for actual work.
 */
static void afs_queue_call_work(struct afs_call *call)
{
        if (call->type->work) {
                INIT_WORK(&call->work, call->type->work);

                afs_get_call(call, afs_call_trace_work);
                if (!queue_work(afs_wq, &call->work))
                        afs_put_call(call);
        }
}

/*
 * allocate a call with flat request and reply buffers
 */
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
                                     const struct afs_call_type *type,
                                     size_t request_size, size_t reply_max)
{
        struct afs_call *call;

        call = afs_alloc_call(net, type, GFP_NOFS);
        if (!call)
                goto nomem_call;

        if (request_size) {
                call->request_size = request_size;
                call->request = kmalloc(request_size, GFP_NOFS);
                if (!call->request)
                        goto nomem_free;
        }

        if (reply_max) {
                call->reply_max = reply_max;
                call->buffer = kmalloc(reply_max, GFP_NOFS);
                if (!call->buffer)
                        goto nomem_free;
        }

        afs_extract_to_buf(call, call->reply_max);
        call->operation_ID = type->op;
        init_waitqueue_head(&call->waitq);
        return call;

nomem_free:
        afs_put_call(call);
nomem_call:
        return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
        _enter("");

        kfree(call->request);
        call->request = NULL;
        kfree(call->buffer);
        call->buffer = NULL;
}

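/* Maximum number of pages loaded into one bio_vec array per send batch. */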
#define AFS_BVEC_MAX 8

/*
 * Load the given bvec with the next few pages.
 */
static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
                          struct bio_vec *bv, pgoff_t first, pgoff_t last,
                          unsigned offset)
{
        struct page *pages[AFS_BVEC_MAX];
        unsigned int nr, n, i, to, bytes = 0;

        nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
        n = find_get_pages_contig(call->mapping, first, nr, pages);
        ASSERTCMP(n, ==, nr);

        msg->msg_flags |= MSG_MORE;
        for (i = 0; i < nr; i++) {
                to = PAGE_SIZE;
                if (first + i >= last) {
                        to = call->last_to;
                        msg->msg_flags &= ~MSG_MORE;
                }
                bv[i].bv_page = pages[i];
                bv[i].bv_len = to - offset;
                bv[i].bv_offset = offset;
                bytes += to - offset;
                offset = 0;
        }

        iov_iter_bvec(&msg->msg_iter, WRITE, bv, nr, bytes);
}

/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
                                      struct rxrpc_call *rxcall,
                                      unsigned long call_user_ID)
{
        struct afs_call *call = (struct afs_call *)call_user_ID;

        afs_set_call_state(call, AFS_CALL_CL_REQUESTING, AFS_CALL_CL_AWAIT_REPLY);
}

/*
 * attach the data from a bunch of pages on an inode to a call
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
{
        struct bio_vec bv[AFS_BVEC_MAX];
        unsigned int bytes, nr, loop, offset;
        pgoff_t first = call->first, last = call->last;
        int ret;

        offset = call->first_offset;
        call->first_offset = 0;

        do {
                afs_load_bvec(call, msg, bv, first, last, offset);
                trace_afs_send_pages(call, msg, first, last, offset);

                offset = 0;
                bytes = msg->msg_iter.count;
                nr = msg->msg_iter.nr_segs;

                ret = rxrpc_kernel_send_data(call->net->socket, call->rxcall, msg,
                                             bytes, afs_notify_end_request_tx);
                for (loop = 0; loop < nr; loop++)
                        put_page(bv[loop].bv_page);
                if (ret < 0)
                        break;

                first += nr;
        } while (first <= last);

        trace_afs_sent_pages(call, call->first, last, first, ret);
        return ret;
}

/*
 * initiate a call
 */
long afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call,
                   gfp_t gfp, bool async)
{
        struct sockaddr_rxrpc *srx = &ac->alist->addrs[ac->index];
        struct rxrpc_call *rxcall;
        struct msghdr msg;
        struct kvec iov[1];
        s64 tx_total_len;
        int ret;

        _enter(",{%pISp},", &srx->transport);

        ASSERT(call->type != NULL);
        ASSERT(call->type->name != NULL);

        _debug("____MAKE %p{%s,%x} [%d]____",
               call, call->type->name, key_serial(call->key),
               atomic_read(&call->net->nr_outstanding_calls));

        call->async = async;
        call->addr_ix = ac->index;
        call->alist = afs_get_addrlist(ac->alist);

        /* Work out the length we're going to transmit.  This is awkward for
         * calls such as FS.StoreData where there's an extra injection of data
         * after the initial fixed part.
         */
        tx_total_len = call->request_size;
        if (call->send_pages) {
                if (call->last == call->first) {
                        tx_total_len += call->last_to - call->first_offset;
                } else {
                        /* It looks mathematically like you should be able to
                         * combine the following lines with the ones above, but
                         * unsigned arithmetic is fun when it wraps...
                         */
                        tx_total_len += PAGE_SIZE - call->first_offset;
                        tx_total_len += call->last_to;
                        tx_total_len += (call->last - call->first - 1) * PAGE_SIZE;
                }
        }

        /* If the call is going to be asynchronous, we need an extra ref for
         * the call to hold itself so the caller need not hang on to its ref.
         */
        if (call->async)
                afs_get_call(call, afs_call_trace_get);

        /* create a call */
        rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
                                         (unsigned long)call,
                                         tx_total_len, gfp,
                                         (async ?
                                          afs_wake_up_async_call :
                                          afs_wake_up_call_waiter),
                                         call->upgrade,
                                         call->debug_id);
        if (IS_ERR(rxcall)) {
                ret = PTR_ERR(rxcall);
                call->error = ret;
                goto error_kill_call;
        }

        call->rxcall = rxcall;

        /* send the request */
        iov[0].iov_base = call->request;
        iov[0].iov_len  = call->request_size;

        msg.msg_name            = NULL;
        msg.msg_namelen         = 0;
        iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, call->request_size);
        msg.msg_control         = NULL;
        msg.msg_controllen      = 0;
        msg.msg_flags           = MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);

        ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
                                     &msg, call->request_size,
                                     afs_notify_end_request_tx);
        if (ret < 0)
                goto error_do_abort;

        if (call->send_pages) {
                ret = afs_send_pages(call, &msg);
                if (ret < 0)
                        goto error_do_abort;
        }

        /* Note that at this point, we may have received the reply or an abort
         * - and an asynchronous call may already have completed.
         */
        if (call->async) {
                afs_put_call(call);
                return -EINPROGRESS;
        }

        return afs_wait_for_call_to_complete(call, ac);

error_do_abort:
        if (ret != -ECONNABORTED) {
                rxrpc_kernel_abort_call(call->net->socket, rxcall,
                                        RX_USER_ABORT, ret, "KSD");
        } else {
                iov_iter_kvec(&msg.msg_iter, READ, NULL, 0, 0);
                rxrpc_kernel_recv_data(call->net->socket, rxcall,
                                       &msg.msg_iter, false,
                                       &call->abort_code, &call->service_id);
                ac->abort_code = call->abort_code;
                ac->responded = true;
        }
        call->error = ret;
        trace_afs_call_done(call);
error_kill_call:
        if (call->type->done)
                call->type->done(call);

        /* We need to dispose of the extra ref we grabbed for an async call.
         * The call, however, might be queued on afs_async_calls and we need to
         * make sure we don't get any more notifications that might requeue it.
         */
        if (call->rxcall) {
                rxrpc_kernel_end_call(call->net->socket, call->rxcall);
                call->rxcall = NULL;
        }
        if (call->async) {
                if (cancel_work_sync(&call->async_work))
                        afs_put_call(call);
                afs_put_call(call);
        }

        ac->error = ret;
        call->state = AFS_CALL_COMPLETE;
        afs_put_call(call);
        _leave(" = %d", ret);
        return ret;
}

/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
        enum afs_call_state state;
        u32 abort_code, remote_abort = 0;
        int ret;

        _enter("%s", call->type->name);

        while (state = READ_ONCE(call->state),
               state == AFS_CALL_CL_AWAIT_REPLY ||
               state == AFS_CALL_SV_AWAIT_OP_ID ||
               state == AFS_CALL_SV_AWAIT_REQUEST ||
               state == AFS_CALL_SV_AWAIT_ACK
               ) {
                if (state == AFS_CALL_SV_AWAIT_ACK) {
                        iov_iter_kvec(&call->iter, READ, NULL, 0, 0);
                        ret = rxrpc_kernel_recv_data(call->net->socket,
                                                     call->rxcall, &call->iter,
                                                     false, &remote_abort,
                                                     &call->service_id);
                        trace_afs_receive_data(call, &call->iter, false, ret);

                        if (ret == -EINPROGRESS || ret == -EAGAIN)
                                return;
                        if (ret < 0 || ret == 1) {
                                if (ret == 1)
                                        ret = 0;
                                goto call_complete;
                        }
                        return;
                }

                if (call->want_reply_time &&
                    rxrpc_kernel_get_reply_time(call->net->socket,
                                                call->rxcall,
                                                &call->reply_time))
                        call->want_reply_time = false;

                ret = call->type->deliver(call);
                state = READ_ONCE(call->state);
                switch (ret) {
                case 0:
                        afs_queue_call_work(call);
                        if (state == AFS_CALL_CL_PROC_REPLY) {
                                if (call->cbi)
                                        set_bit(AFS_SERVER_FL_MAY_HAVE_CB,
                                                &call->cbi->server->flags);
                                goto call_complete;
                        }
                        ASSERTCMP(state, >, AFS_CALL_CL_PROC_REPLY);
                        goto done;
                case -EINPROGRESS:
                case -EAGAIN:
                        goto out;
                case -ECONNABORTED:
                        ASSERTCMP(state, ==, AFS_CALL_COMPLETE);
                        goto done;
                case -ENOTSUPP:
                        abort_code = RXGEN_OPCODE;
                        rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
                                                abort_code, ret, "KIV");
                        goto local_abort;
                case -EIO:
                        pr_err("kAFS: Call %u in bad state %u\n",
                               call->debug_id, state);
                        /* Fall through */
                case -ENODATA:
                case -EBADMSG:
                case -EMSGSIZE:
                default:
                        abort_code = RXGEN_CC_UNMARSHAL;
                        if (state != AFS_CALL_CL_AWAIT_REPLY)
                                abort_code = RXGEN_SS_UNMARSHAL;
                        rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
                                                abort_code, ret, "KUM");
                        goto local_abort;
                }
        }

done:
        if (call->type->done)
                call->type->done(call);
        if (state == AFS_CALL_COMPLETE && call->incoming)
                afs_put_call(call);
out:
        _leave("");
        return;

local_abort:
        abort_code = 0;
call_complete:
        afs_set_call_complete(call, ret, remote_abort);
        state = AFS_CALL_COMPLETE;
        goto done;
}

/*
 * wait synchronously for a call to complete
 */
static long afs_wait_for_call_to_complete(struct afs_call *call,
                                          struct afs_addr_cursor *ac)
{
        signed long rtt2, timeout;
        long ret;
        bool stalled = false;
        u64 rtt;
        u32 life, last_life;

        DECLARE_WAITQUEUE(myself, current);

        _enter("");

        rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
        rtt2 = nsecs_to_jiffies64(rtt) * 2;
        if (rtt2 < 2)
                rtt2 = 2;

        timeout = rtt2;
        last_life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);

        add_wait_queue(&call->waitq, &myself);
        for (;;) {
                set_current_state(TASK_UNINTERRUPTIBLE);

                /* deliver any messages that are in the queue */
                if (!afs_check_call_state(call, AFS_CALL_COMPLETE) &&
                    call->need_attention) {
                        call->need_attention = false;
                        __set_current_state(TASK_RUNNING);
                        afs_deliver_to_call(call);
                        continue;
                }

                if (afs_check_call_state(call, AFS_CALL_COMPLETE))
                        break;

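                /* If the call has made no progress over the last timeout
                 * period and the user has signalled, give the peer one prod to
                 * see if it's still alive; if it still shows no sign of life
                 * after a second period, give up waiting.
                 */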
                life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);
                if (timeout == 0 &&
                    life == last_life && signal_pending(current)) {
                        if (stalled)
                                break;
                        __set_current_state(TASK_RUNNING);
                        rxrpc_kernel_probe_life(call->net->socket, call->rxcall);
                        timeout = rtt2;
                        stalled = true;
                        continue;
                }

                if (life != last_life) {
                        timeout = rtt2;
                        last_life = life;
                        stalled = false;
                }

                timeout = schedule_timeout(timeout);
        }

        remove_wait_queue(&call->waitq, &myself);
        __set_current_state(TASK_RUNNING);

        /* Kill off the call if it's still live. */
        if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
                _debug("call interrupted");
                if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
                                            RX_USER_ABORT, -EINTR, "KWI"))
                        afs_set_call_complete(call, -EINTR, 0);
        }

        spin_lock_bh(&call->state_lock);
        ac->abort_code = call->abort_code;
        ac->error = call->error;
        spin_unlock_bh(&call->state_lock);

        ret = ac->error;
        switch (ret) {
        case 0:
                if (call->ret_reply0) {
                        ret = (long)call->reply[0];
                        call->reply[0] = NULL;
                }
                /* Fall through */
        case -ECONNABORTED:
                ac->responded = true;
                break;
        }

        _debug("call complete");
        afs_put_call(call);
        _leave(" = %p", (void *)ret);
        return ret;
}

/*
 * wake up a waiting call
 */
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
                                    unsigned long call_user_ID)
{
        struct afs_call *call = (struct afs_call *)call_user_ID;

        call->need_attention = true;
        wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
                                   unsigned long call_user_ID)
{
        struct afs_call *call = (struct afs_call *)call_user_ID;
        int u;

        trace_afs_notify_call(rxcall, call);
        call->need_attention = true;

        u = atomic_fetch_add_unless(&call->usage, 1, 0);
        if (u != 0) {
                trace_afs_call(call, afs_call_trace_wake, u,
                               atomic_read(&call->net->nr_outstanding_calls),
                               __builtin_return_address(0));

                if (!queue_work(afs_async_calls, &call->async_work))
                        afs_put_call(call);
        }
}

/*
 * Delete an asynchronous call.  The work item carries a ref to the call struct
 * that we need to release.
 */
static void afs_delete_async_call(struct work_struct *work)
{
        struct afs_call *call = container_of(work, struct afs_call, async_work);

        _enter("");

        afs_put_call(call);

        _leave("");
}

/*
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
        struct afs_call *call = container_of(work, struct afs_call, async_work);

        _enter("");

        if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
                call->need_attention = false;
                afs_deliver_to_call(call);
        }

        if (call->state == AFS_CALL_COMPLETE) {
                /* We have two refs to release - one from the alloc and one
                 * queued with the work item - and we can't just deallocate the
                 * call because the work item may be queued again.
                 */
                call->async_work.func = afs_delete_async_call;
                if (!queue_work(afs_async_calls, &call->async_work))
                        afs_put_call(call);
        }

        afs_put_call(call);
        _leave("");
}

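/*
 * Attach an rxrpc call to a preallocated incoming AFS call when rxrpc accepts
 * the call on our behalf.
 */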
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
        struct afs_call *call = (struct afs_call *)user_call_ID;

        call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 */
void afs_charge_preallocation(struct work_struct *work)
{
        struct afs_net *net =
                container_of(work, struct afs_net, charge_preallocation_work);
        struct afs_call *call = net->spare_incoming_call;

        for (;;) {
                if (!call) {
                        call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
                        if (!call)
                                break;

                        call->async = true;
                        call->state = AFS_CALL_SV_AWAIT_OP_ID;
                        init_waitqueue_head(&call->waitq);
                        afs_extract_to_tmp(call);
                }

                if (rxrpc_kernel_charge_accept(net->socket,
                                               afs_wake_up_async_call,
                                               afs_rx_attach,
                                               (unsigned long)call,
                                               GFP_KERNEL,
                                               call->debug_id) < 0)
                        break;
                call = NULL;
        }
        net->spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
                                    unsigned long user_call_ID)
{
        struct afs_call *call = (struct afs_call *)user_call_ID;

        call->rxcall = NULL;
        afs_put_call(call);
}

/*
 * Notification of an incoming call.
 */
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
                            unsigned long user_call_ID)
{
        struct afs_net *net = afs_sock2net(sk);

        queue_work(afs_wq, &net->charge_preallocation_work);
}

/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
        int ret;

        _enter("{%zu}", iov_iter_count(call->_iter));

        /* the operation ID forms the first four bytes of the request data */
        ret = afs_extract_data(call, true);
        if (ret < 0)
                return ret;

        call->operation_ID = ntohl(call->tmp);
        afs_set_call_state(call, AFS_CALL_SV_AWAIT_OP_ID, AFS_CALL_SV_AWAIT_REQUEST);

        /* ask the cache manager to route the call (it'll change the call type
         * if successful) */
        if (!afs_cm_incoming_call(call))
                return -ENOTSUPP;

        trace_afs_cb_call(call);

        /* pass responsibility for the remainder of this message off to the
         * cache manager op */
        return call->type->deliver(call);
}

/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
                                    struct rxrpc_call *rxcall,
                                    unsigned long call_user_ID)
{
        struct afs_call *call = (struct afs_call *)call_user_ID;

        afs_set_call_state(call, AFS_CALL_SV_REPLYING, AFS_CALL_SV_AWAIT_ACK);
}

/*
 * send an empty reply
 */
void afs_send_empty_reply(struct afs_call *call)
{
        struct afs_net *net = call->net;
        struct msghdr msg;

        _enter("");

        rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);

        msg.msg_name            = NULL;
        msg.msg_namelen         = 0;
        iov_iter_kvec(&msg.msg_iter, WRITE, NULL, 0, 0);
        msg.msg_control         = NULL;
        msg.msg_controllen      = 0;
        msg.msg_flags           = 0;

        switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
                                       afs_notify_end_reply_tx)) {
        case 0:
                _leave(" [replied]");
                return;

        case -ENOMEM:
                _debug("oom");
                rxrpc_kernel_abort_call(net->socket, call->rxcall,
                                        RX_USER_ABORT, -ENOMEM, "KOO");
        default:
                _leave(" [error]");
                return;
        }
}

/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
        struct afs_net *net = call->net;
        struct msghdr msg;
        struct kvec iov[1];
        int n;

        _enter("");

        rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

        iov[0].iov_base         = (void *) buf;
        iov[0].iov_len          = len;
        msg.msg_name            = NULL;
        msg.msg_namelen         = 0;
        iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
        msg.msg_control         = NULL;
        msg.msg_controllen      = 0;
        msg.msg_flags           = 0;

        n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
                                   afs_notify_end_reply_tx);
        if (n >= 0) {
                /* Success */
                _leave(" [replied]");
                return;
        }

        if (n == -ENOMEM) {
                _debug("oom");
                rxrpc_kernel_abort_call(net->socket, call->rxcall,
                                        RX_USER_ABORT, -ENOMEM, "KOO");
        }
        _leave(" [error]");
}

/*
 * Extract a piece of data from the received data socket buffers.
 */
int afs_extract_data(struct afs_call *call, bool want_more)
{
        struct afs_net *net = call->net;
        struct iov_iter *iter = call->_iter;
        enum afs_call_state state;
        u32 remote_abort = 0;
        int ret;

        _enter("{%s,%zu},%d", call->type->name, iov_iter_count(iter), want_more);

        ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
                                     want_more, &remote_abort,
                                     &call->service_id);
        if (ret == 0 || ret == -EAGAIN)
                return ret;

        state = READ_ONCE(call->state);
        if (ret == 1) {
                switch (state) {
                case AFS_CALL_CL_AWAIT_REPLY:
                        afs_set_call_state(call, state, AFS_CALL_CL_PROC_REPLY);
                        break;
                case AFS_CALL_SV_AWAIT_REQUEST:
                        afs_set_call_state(call, state, AFS_CALL_SV_REPLYING);
                        break;
                case AFS_CALL_COMPLETE:
                        kdebug("prem complete %d", call->error);
                        return afs_io_error(call, afs_io_error_extract);
                default:
                        break;
                }
                return 0;
        }

        afs_set_call_complete(call, ret, remote_abort);
        return ret;
}

/*
 * Log protocol error production.
 */
noinline int afs_protocol_error(struct afs_call *call, int error,
                                enum afs_eproto_cause cause)
{
        trace_afs_protocol_error(call, error, cause);
        return error;
}