linux/fs/afs/rxrpc.c
<<
>>
Prefs
   1/* Maintain an RxRPC server socket to do AFS communications through
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/slab.h>
  13#include <net/sock.h>
  14#include <net/af_rxrpc.h>
  15#include <rxrpc/packet.h>
  16#include "internal.h"
  17#include "afs_cm.h"
  18
  19static struct socket *afs_socket; /* my RxRPC socket */
  20static struct workqueue_struct *afs_async_calls;
  21static atomic_t afs_outstanding_calls;
  22static atomic_t afs_outstanding_skbs;
  23
  24static void afs_wake_up_call_waiter(struct afs_call *);
  25static int afs_wait_for_call_to_complete(struct afs_call *);
  26static void afs_wake_up_async_call(struct afs_call *);
  27static int afs_dont_wait_for_call_to_complete(struct afs_call *);
  28static void afs_process_async_call(struct afs_call *);
  29static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
  30static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);
  31
  32/* synchronous call management */
  33const struct afs_wait_mode afs_sync_call = {
  34        .rx_wakeup      = afs_wake_up_call_waiter,
  35        .wait           = afs_wait_for_call_to_complete,
  36};
  37
  38/* asynchronous call management */
  39const struct afs_wait_mode afs_async_call = {
  40        .rx_wakeup      = afs_wake_up_async_call,
  41        .wait           = afs_dont_wait_for_call_to_complete,
  42};
  43
  44/* asynchronous incoming call management */
  45static const struct afs_wait_mode afs_async_incoming_call = {
  46        .rx_wakeup      = afs_wake_up_async_call,
  47};
  48
  49/* asynchronous incoming call initial processing */
  50static const struct afs_call_type afs_RXCMxxxx = {
  51        .name           = "CB.xxxx",
  52        .deliver        = afs_deliver_cm_op_id,
  53        .abort_to_error = afs_abort_to_error,
  54};
  55
  56static void afs_collect_incoming_call(struct work_struct *);
  57
  58static struct sk_buff_head afs_incoming_calls;
  59static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
  60
  61static void afs_async_workfn(struct work_struct *work)
  62{
  63        struct afs_call *call = container_of(work, struct afs_call, async_work);
  64
  65        call->async_workfn(call);
  66}
  67
  68/*
  69 * open an RxRPC socket and bind it to be a server for callback notifications
  70 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
  71 */
  72int afs_open_socket(void)
  73{
  74        struct sockaddr_rxrpc srx;
  75        struct socket *socket;
  76        int ret;
  77
  78        _enter("");
  79
  80        skb_queue_head_init(&afs_incoming_calls);
  81
  82        afs_async_calls = create_singlethread_workqueue("kafsd");
  83        if (!afs_async_calls) {
  84                _leave(" = -ENOMEM [wq]");
  85                return -ENOMEM;
  86        }
  87
  88        ret = sock_create_kern(AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
  89        if (ret < 0) {
  90                destroy_workqueue(afs_async_calls);
  91                _leave(" = %d [socket]", ret);
  92                return ret;
  93        }
  94
  95        socket->sk->sk_allocation = GFP_NOFS;
  96
  97        /* bind the callback manager's address to make this a server socket */
  98        srx.srx_family                  = AF_RXRPC;
  99        srx.srx_service                 = CM_SERVICE;
 100        srx.transport_type              = SOCK_DGRAM;
 101        srx.transport_len               = sizeof(srx.transport.sin);
 102        srx.transport.sin.sin_family    = AF_INET;
 103        srx.transport.sin.sin_port      = htons(AFS_CM_PORT);
 104        memset(&srx.transport.sin.sin_addr, 0,
 105               sizeof(srx.transport.sin.sin_addr));
 106
 107        ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
 108        if (ret < 0) {
 109                sock_release(socket);
 110                destroy_workqueue(afs_async_calls);
 111                _leave(" = %d [bind]", ret);
 112                return ret;
 113        }
 114
 115        rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);
 116
 117        afs_socket = socket;
 118        _leave(" = 0");
 119        return 0;
 120}
 121
 122/*
 123 * close the RxRPC socket AFS was using
 124 */
 125void afs_close_socket(void)
 126{
 127        _enter("");
 128
 129        sock_release(afs_socket);
 130
 131        _debug("dework");
 132        destroy_workqueue(afs_async_calls);
 133
 134        ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
 135        ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0);
 136        _leave("");
 137}
 138
 139/*
 140 * note that the data in a socket buffer is now delivered and that the buffer
 141 * should be freed
 142 */
 143static void afs_data_delivered(struct sk_buff *skb)
 144{
 145        if (!skb) {
 146                _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
 147                dump_stack();
 148        } else {
 149                _debug("DLVR %p{%u} [%d]",
 150                       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
 151                if (atomic_dec_return(&afs_outstanding_skbs) == -1)
 152                        BUG();
 153                rxrpc_kernel_data_delivered(skb);
 154        }
 155}
 156
 157/*
 158 * free a socket buffer
 159 */
 160static void afs_free_skb(struct sk_buff *skb)
 161{
 162        if (!skb) {
 163                _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
 164                dump_stack();
 165        } else {
 166                _debug("FREE %p{%u} [%d]",
 167                       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
 168                if (atomic_dec_return(&afs_outstanding_skbs) == -1)
 169                        BUG();
 170                rxrpc_kernel_free_skb(skb);
 171        }
 172}
 173
 174/*
 175 * free a call
 176 */
 177static void afs_free_call(struct afs_call *call)
 178{
 179        _debug("DONE %p{%s} [%d]",
 180               call, call->type->name, atomic_read(&afs_outstanding_calls));
 181        if (atomic_dec_return(&afs_outstanding_calls) == -1)
 182                BUG();
 183
 184        ASSERTCMP(call->rxcall, ==, NULL);
 185        ASSERT(!work_pending(&call->async_work));
 186        ASSERT(skb_queue_empty(&call->rx_queue));
 187        ASSERT(call->type->name != NULL);
 188
 189        kfree(call->request);
 190        kfree(call);
 191}
 192
 193/*
 194 * End a call but do not free it
 195 */
 196static void afs_end_call_nofree(struct afs_call *call)
 197{
 198        if (call->rxcall) {
 199                rxrpc_kernel_end_call(call->rxcall);
 200                call->rxcall = NULL;
 201        }
 202        if (call->type->destructor)
 203                call->type->destructor(call);
 204}
 205
 206/*
 207 * End a call and free it
 208 */
 209static void afs_end_call(struct afs_call *call)
 210{
 211        afs_end_call_nofree(call);
 212        afs_free_call(call);
 213}
 214
 215/*
 216 * allocate a call with flat request and reply buffers
 217 */
 218struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
 219                                     size_t request_size, size_t reply_size)
 220{
 221        struct afs_call *call;
 222
 223        call = kzalloc(sizeof(*call), GFP_NOFS);
 224        if (!call)
 225                goto nomem_call;
 226
 227        _debug("CALL %p{%s} [%d]",
 228               call, type->name, atomic_read(&afs_outstanding_calls));
 229        atomic_inc(&afs_outstanding_calls);
 230
 231        call->type = type;
 232        call->request_size = request_size;
 233        call->reply_max = reply_size;
 234
 235        if (request_size) {
 236                call->request = kmalloc(request_size, GFP_NOFS);
 237                if (!call->request)
 238                        goto nomem_free;
 239        }
 240
 241        if (reply_size) {
 242                call->buffer = kmalloc(reply_size, GFP_NOFS);
 243                if (!call->buffer)
 244                        goto nomem_free;
 245        }
 246
 247        init_waitqueue_head(&call->waitq);
 248        skb_queue_head_init(&call->rx_queue);
 249        return call;
 250
 251nomem_free:
 252        afs_free_call(call);
 253nomem_call:
 254        return NULL;
 255}
 256
 257/*
 258 * clean up a call with flat buffer
 259 */
 260void afs_flat_call_destructor(struct afs_call *call)
 261{
 262        _enter("");
 263
 264        kfree(call->request);
 265        call->request = NULL;
 266        kfree(call->buffer);
 267        call->buffer = NULL;
 268}
 269
 270/*
 271 * attach the data from a bunch of pages on an inode to a call
 272 */
 273static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
 274                          struct kvec *iov)
 275{
 276        struct page *pages[8];
 277        unsigned count, n, loop, offset, to;
 278        pgoff_t first = call->first, last = call->last;
 279        int ret;
 280
 281        _enter("");
 282
 283        offset = call->first_offset;
 284        call->first_offset = 0;
 285
 286        do {
 287                _debug("attach %lx-%lx", first, last);
 288
 289                count = last - first + 1;
 290                if (count > ARRAY_SIZE(pages))
 291                        count = ARRAY_SIZE(pages);
 292                n = find_get_pages_contig(call->mapping, first, count, pages);
 293                ASSERTCMP(n, ==, count);
 294
 295                loop = 0;
 296                do {
 297                        msg->msg_flags = 0;
 298                        to = PAGE_SIZE;
 299                        if (first + loop >= last)
 300                                to = call->last_to;
 301                        else
 302                                msg->msg_flags = MSG_MORE;
 303                        iov->iov_base = kmap(pages[loop]) + offset;
 304                        iov->iov_len = to - offset;
 305                        offset = 0;
 306
 307                        _debug("- range %u-%u%s",
 308                               offset, to, msg->msg_flags ? " [more]" : "");
 309                        iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC,
 310                                      iov, 1, to - offset);
 311
 312                        /* have to change the state *before* sending the last
 313                         * packet as RxRPC might give us the reply before it
 314                         * returns from sending the request */
 315                        if (first + loop >= last)
 316                                call->state = AFS_CALL_AWAIT_REPLY;
 317                        ret = rxrpc_kernel_send_data(call->rxcall, msg,
 318                                                     to - offset);
 319                        kunmap(pages[loop]);
 320                        if (ret < 0)
 321                                break;
 322                } while (++loop < count);
 323                first += count;
 324
 325                for (loop = 0; loop < count; loop++)
 326                        put_page(pages[loop]);
 327                if (ret < 0)
 328                        break;
 329        } while (first <= last);
 330
 331        _leave(" = %d", ret);
 332        return ret;
 333}
 334
 335/*
 336 * initiate a call
 337 */
 338int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
 339                  const struct afs_wait_mode *wait_mode)
 340{
 341        struct sockaddr_rxrpc srx;
 342        struct rxrpc_call *rxcall;
 343        struct msghdr msg;
 344        struct kvec iov[1];
 345        int ret;
 346        struct sk_buff *skb;
 347
 348        _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
 349
 350        ASSERT(call->type != NULL);
 351        ASSERT(call->type->name != NULL);
 352
 353        _debug("____MAKE %p{%s,%x} [%d]____",
 354               call, call->type->name, key_serial(call->key),
 355               atomic_read(&afs_outstanding_calls));
 356
 357        call->wait_mode = wait_mode;
 358        call->async_workfn = afs_process_async_call;
 359        INIT_WORK(&call->async_work, afs_async_workfn);
 360
 361        memset(&srx, 0, sizeof(srx));
 362        srx.srx_family = AF_RXRPC;
 363        srx.srx_service = call->service_id;
 364        srx.transport_type = SOCK_DGRAM;
 365        srx.transport_len = sizeof(srx.transport.sin);
 366        srx.transport.sin.sin_family = AF_INET;
 367        srx.transport.sin.sin_port = call->port;
 368        memcpy(&srx.transport.sin.sin_addr, addr, 4);
 369
 370        /* create a call */
 371        rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
 372                                         (unsigned long) call, gfp);
 373        call->key = NULL;
 374        if (IS_ERR(rxcall)) {
 375                ret = PTR_ERR(rxcall);
 376                goto error_kill_call;
 377        }
 378
 379        call->rxcall = rxcall;
 380
 381        /* send the request */
 382        iov[0].iov_base = call->request;
 383        iov[0].iov_len  = call->request_size;
 384
 385        msg.msg_name            = NULL;
 386        msg.msg_namelen         = 0;
 387        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
 388                      call->request_size);
 389        msg.msg_control         = NULL;
 390        msg.msg_controllen      = 0;
 391        msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);
 392
 393        /* have to change the state *before* sending the last packet as RxRPC
 394         * might give us the reply before it returns from sending the
 395         * request */
 396        if (!call->send_pages)
 397                call->state = AFS_CALL_AWAIT_REPLY;
 398        ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
 399        if (ret < 0)
 400                goto error_do_abort;
 401
 402        if (call->send_pages) {
 403                ret = afs_send_pages(call, &msg, iov);
 404                if (ret < 0)
 405                        goto error_do_abort;
 406        }
 407
 408        /* at this point, an async call may no longer exist as it may have
 409         * already completed */
 410        return wait_mode->wait(call);
 411
 412error_do_abort:
 413        rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
 414        while ((skb = skb_dequeue(&call->rx_queue)))
 415                afs_free_skb(skb);
 416error_kill_call:
 417        afs_end_call(call);
 418        _leave(" = %d", ret);
 419        return ret;
 420}
 421
 422/*
 423 * handles intercepted messages that were arriving in the socket's Rx queue
 424 * - called with the socket receive queue lock held to ensure message ordering
 425 * - called with softirqs disabled
 426 */
 427static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
 428                               struct sk_buff *skb)
 429{
 430        struct afs_call *call = (struct afs_call *) user_call_ID;
 431
 432        _enter("%p,,%u", call, skb->mark);
 433
 434        _debug("ICPT %p{%u} [%d]",
 435               skb, skb->mark, atomic_read(&afs_outstanding_skbs));
 436
 437        ASSERTCMP(sk, ==, afs_socket->sk);
 438        atomic_inc(&afs_outstanding_skbs);
 439
 440        if (!call) {
 441                /* its an incoming call for our callback service */
 442                skb_queue_tail(&afs_incoming_calls, skb);
 443                queue_work(afs_wq, &afs_collect_incoming_call_work);
 444        } else {
 445                /* route the messages directly to the appropriate call */
 446                skb_queue_tail(&call->rx_queue, skb);
 447                call->wait_mode->rx_wakeup(call);
 448        }
 449
 450        _leave("");
 451}
 452
 453/*
 454 * deliver messages to a call
 455 */
 456static void afs_deliver_to_call(struct afs_call *call)
 457{
 458        struct sk_buff *skb;
 459        bool last;
 460        u32 abort_code;
 461        int ret;
 462
 463        _enter("");
 464
 465        while ((call->state == AFS_CALL_AWAIT_REPLY ||
 466                call->state == AFS_CALL_AWAIT_OP_ID ||
 467                call->state == AFS_CALL_AWAIT_REQUEST ||
 468                call->state == AFS_CALL_AWAIT_ACK) &&
 469               (skb = skb_dequeue(&call->rx_queue))) {
 470                switch (skb->mark) {
 471                case RXRPC_SKB_MARK_DATA:
 472                        _debug("Rcv DATA");
 473                        last = rxrpc_kernel_is_data_last(skb);
 474                        ret = call->type->deliver(call, skb, last);
 475                        switch (ret) {
 476                        case 0:
 477                                if (last &&
 478                                    call->state == AFS_CALL_AWAIT_REPLY)
 479                                        call->state = AFS_CALL_COMPLETE;
 480                                break;
 481                        case -ENOTCONN:
 482                                abort_code = RX_CALL_DEAD;
 483                                goto do_abort;
 484                        case -ENOTSUPP:
 485                                abort_code = RX_INVALID_OPERATION;
 486                                goto do_abort;
 487                        default:
 488                                abort_code = RXGEN_CC_UNMARSHAL;
 489                                if (call->state != AFS_CALL_AWAIT_REPLY)
 490                                        abort_code = RXGEN_SS_UNMARSHAL;
 491                        do_abort:
 492                                rxrpc_kernel_abort_call(call->rxcall,
 493                                                        abort_code);
 494                                call->error = ret;
 495                                call->state = AFS_CALL_ERROR;
 496                                break;
 497                        }
 498                        afs_data_delivered(skb);
 499                        skb = NULL;
 500                        continue;
 501                case RXRPC_SKB_MARK_FINAL_ACK:
 502                        _debug("Rcv ACK");
 503                        call->state = AFS_CALL_COMPLETE;
 504                        break;
 505                case RXRPC_SKB_MARK_BUSY:
 506                        _debug("Rcv BUSY");
 507                        call->error = -EBUSY;
 508                        call->state = AFS_CALL_BUSY;
 509                        break;
 510                case RXRPC_SKB_MARK_REMOTE_ABORT:
 511                        abort_code = rxrpc_kernel_get_abort_code(skb);
 512                        call->error = call->type->abort_to_error(abort_code);
 513                        call->state = AFS_CALL_ABORTED;
 514                        _debug("Rcv ABORT %u -> %d", abort_code, call->error);
 515                        break;
 516                case RXRPC_SKB_MARK_NET_ERROR:
 517                        call->error = -rxrpc_kernel_get_error_number(skb);
 518                        call->state = AFS_CALL_ERROR;
 519                        _debug("Rcv NET ERROR %d", call->error);
 520                        break;
 521                case RXRPC_SKB_MARK_LOCAL_ERROR:
 522                        call->error = -rxrpc_kernel_get_error_number(skb);
 523                        call->state = AFS_CALL_ERROR;
 524                        _debug("Rcv LOCAL ERROR %d", call->error);
 525                        break;
 526                default:
 527                        BUG();
 528                        break;
 529                }
 530
 531                afs_free_skb(skb);
 532        }
 533
 534        /* make sure the queue is empty if the call is done with (we might have
 535         * aborted the call early because of an unmarshalling error) */
 536        if (call->state >= AFS_CALL_COMPLETE) {
 537                while ((skb = skb_dequeue(&call->rx_queue)))
 538                        afs_free_skb(skb);
 539                if (call->incoming)
 540                        afs_end_call(call);
 541        }
 542
 543        _leave("");
 544}
 545
 546/*
 547 * wait synchronously for a call to complete
 548 */
 549static int afs_wait_for_call_to_complete(struct afs_call *call)
 550{
 551        struct sk_buff *skb;
 552        int ret;
 553
 554        DECLARE_WAITQUEUE(myself, current);
 555
 556        _enter("");
 557
 558        add_wait_queue(&call->waitq, &myself);
 559        for (;;) {
 560                set_current_state(TASK_INTERRUPTIBLE);
 561
 562                /* deliver any messages that are in the queue */
 563                if (!skb_queue_empty(&call->rx_queue)) {
 564                        __set_current_state(TASK_RUNNING);
 565                        afs_deliver_to_call(call);
 566                        continue;
 567                }
 568
 569                ret = call->error;
 570                if (call->state >= AFS_CALL_COMPLETE)
 571                        break;
 572                ret = -EINTR;
 573                if (signal_pending(current))
 574                        break;
 575                schedule();
 576        }
 577
 578        remove_wait_queue(&call->waitq, &myself);
 579        __set_current_state(TASK_RUNNING);
 580
 581        /* kill the call */
 582        if (call->state < AFS_CALL_COMPLETE) {
 583                _debug("call incomplete");
 584                rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
 585                while ((skb = skb_dequeue(&call->rx_queue)))
 586                        afs_free_skb(skb);
 587        }
 588
 589        _debug("call complete");
 590        afs_end_call(call);
 591        _leave(" = %d", ret);
 592        return ret;
 593}
 594
 595/*
 596 * wake up a waiting call
 597 */
 598static void afs_wake_up_call_waiter(struct afs_call *call)
 599{
 600        wake_up(&call->waitq);
 601}
 602
 603/*
 604 * wake up an asynchronous call
 605 */
 606static void afs_wake_up_async_call(struct afs_call *call)
 607{
 608        _enter("");
 609        queue_work(afs_async_calls, &call->async_work);
 610}
 611
 612/*
 613 * put a call into asynchronous mode
 614 * - mustn't touch the call descriptor as the call my have completed by the
 615 *   time we get here
 616 */
 617static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
 618{
 619        _enter("");
 620        return -EINPROGRESS;
 621}
 622
 623/*
 624 * delete an asynchronous call
 625 */
 626static void afs_delete_async_call(struct afs_call *call)
 627{
 628        _enter("");
 629
 630        afs_free_call(call);
 631
 632        _leave("");
 633}
 634
 635/*
 636 * perform processing on an asynchronous call
 637 * - on a multiple-thread workqueue this work item may try to run on several
 638 *   CPUs at the same time
 639 */
 640static void afs_process_async_call(struct afs_call *call)
 641{
 642        _enter("");
 643
 644        if (!skb_queue_empty(&call->rx_queue))
 645                afs_deliver_to_call(call);
 646
 647        if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) {
 648                if (call->wait_mode->async_complete)
 649                        call->wait_mode->async_complete(call->reply,
 650                                                        call->error);
 651                call->reply = NULL;
 652
 653                /* kill the call */
 654                afs_end_call_nofree(call);
 655
 656                /* we can't just delete the call because the work item may be
 657                 * queued */
 658                call->async_workfn = afs_delete_async_call;
 659                queue_work(afs_async_calls, &call->async_work);
 660        }
 661
 662        _leave("");
 663}
 664
 665/*
 666 * empty a socket buffer into a flat reply buffer
 667 */
 668void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb)
 669{
 670        size_t len = skb->len;
 671
 672        if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0)
 673                BUG();
 674        call->reply_size += len;
 675}
 676
 677/*
 678 * accept the backlog of incoming calls
 679 */
 680static void afs_collect_incoming_call(struct work_struct *work)
 681{
 682        struct rxrpc_call *rxcall;
 683        struct afs_call *call = NULL;
 684        struct sk_buff *skb;
 685
 686        while ((skb = skb_dequeue(&afs_incoming_calls))) {
 687                _debug("new call");
 688
 689                /* don't need the notification */
 690                afs_free_skb(skb);
 691
 692                if (!call) {
 693                        call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
 694                        if (!call) {
 695                                rxrpc_kernel_reject_call(afs_socket);
 696                                return;
 697                        }
 698
 699                        call->async_workfn = afs_process_async_call;
 700                        INIT_WORK(&call->async_work, afs_async_workfn);
 701                        call->wait_mode = &afs_async_incoming_call;
 702                        call->type = &afs_RXCMxxxx;
 703                        init_waitqueue_head(&call->waitq);
 704                        skb_queue_head_init(&call->rx_queue);
 705                        call->state = AFS_CALL_AWAIT_OP_ID;
 706
 707                        _debug("CALL %p{%s} [%d]",
 708                               call, call->type->name,
 709                               atomic_read(&afs_outstanding_calls));
 710                        atomic_inc(&afs_outstanding_calls);
 711                }
 712
 713                rxcall = rxrpc_kernel_accept_call(afs_socket,
 714                                                  (unsigned long) call);
 715                if (!IS_ERR(rxcall)) {
 716                        call->rxcall = rxcall;
 717                        call = NULL;
 718                }
 719        }
 720
 721        if (call)
 722                afs_free_call(call);
 723}
 724
 725/*
 726 * grab the operation ID from an incoming cache manager call
 727 */
 728static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb,
 729                                bool last)
 730{
 731        size_t len = skb->len;
 732        void *oibuf = (void *) &call->operation_ID;
 733
 734        _enter("{%u},{%zu},%d", call->offset, len, last);
 735
 736        ASSERTCMP(call->offset, <, 4);
 737
 738        /* the operation ID forms the first four bytes of the request data */
 739        len = min_t(size_t, len, 4 - call->offset);
 740        if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0)
 741                BUG();
 742        if (!pskb_pull(skb, len))
 743                BUG();
 744        call->offset += len;
 745
 746        if (call->offset < 4) {
 747                if (last) {
 748                        _leave(" = -EBADMSG [op ID short]");
 749                        return -EBADMSG;
 750                }
 751                _leave(" = 0 [incomplete]");
 752                return 0;
 753        }
 754
 755        call->state = AFS_CALL_AWAIT_REQUEST;
 756
 757        /* ask the cache manager to route the call (it'll change the call type
 758         * if successful) */
 759        if (!afs_cm_incoming_call(call))
 760                return -ENOTSUPP;
 761
 762        /* pass responsibility for the remainer of this message off to the
 763         * cache manager op */
 764        return call->type->deliver(call, skb, last);
 765}
 766
 767/*
 768 * send an empty reply
 769 */
 770void afs_send_empty_reply(struct afs_call *call)
 771{
 772        struct msghdr msg;
 773
 774        _enter("");
 775
 776        msg.msg_name            = NULL;
 777        msg.msg_namelen         = 0;
 778        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
 779        msg.msg_control         = NULL;
 780        msg.msg_controllen      = 0;
 781        msg.msg_flags           = 0;
 782
 783        call->state = AFS_CALL_AWAIT_ACK;
 784        switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
 785        case 0:
 786                _leave(" [replied]");
 787                return;
 788
 789        case -ENOMEM:
 790                _debug("oom");
 791                rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
 792        default:
 793                afs_end_call(call);
 794                _leave(" [error]");
 795                return;
 796        }
 797}
 798
 799/*
 800 * send a simple reply
 801 */
 802void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
 803{
 804        struct msghdr msg;
 805        struct kvec iov[1];
 806        int n;
 807
 808        _enter("");
 809
 810        iov[0].iov_base         = (void *) buf;
 811        iov[0].iov_len          = len;
 812        msg.msg_name            = NULL;
 813        msg.msg_namelen         = 0;
 814        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
 815        msg.msg_control         = NULL;
 816        msg.msg_controllen      = 0;
 817        msg.msg_flags           = 0;
 818
 819        call->state = AFS_CALL_AWAIT_ACK;
 820        n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
 821        if (n >= 0) {
 822                /* Success */
 823                _leave(" [replied]");
 824                return;
 825        }
 826
 827        if (n == -ENOMEM) {
 828                _debug("oom");
 829                rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
 830        }
 831        afs_end_call(call);
 832        _leave(" [error]");
 833}
 834
 835/*
 836 * extract a piece of data from the received data socket buffers
 837 */
 838int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
 839                     bool last, void *buf, size_t count)
 840{
 841        size_t len = skb->len;
 842
 843        _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count);
 844
 845        ASSERTCMP(call->offset, <, count);
 846
 847        len = min_t(size_t, len, count - call->offset);
 848        if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 ||
 849            !pskb_pull(skb, len))
 850                BUG();
 851        call->offset += len;
 852
 853        if (call->offset < count) {
 854                if (last) {
 855                        _leave(" = -EBADMSG [%d < %zu]", call->offset, count);
 856                        return -EBADMSG;
 857                }
 858                _leave(" = -EAGAIN");
 859                return -EAGAIN;
 860        }
 861        return 0;
 862}
 863