linux/net/sunrpc/xprtsock.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/xprtsock.c
   3 *
   4 * Client-side transport implementation for sockets.
   5 *
   6 * TCP callback races fixes (C) 1998 Red Hat
   7 * TCP send fixes (C) 1998 Red Hat
   8 * TCP NFS related read + write fixes
   9 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  10 *
  11 * Rewrite of large parts of the code in order to stabilize TCP stuff.
  12 * Fix behaviour when socket buffer is full.
  13 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  14 *
  15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
  16 *
  17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
  18 *   <gilles.quillard@bull.net>
  19 */
  20
  21#include <linux/types.h>
  22#include <linux/string.h>
  23#include <linux/slab.h>
  24#include <linux/module.h>
  25#include <linux/capability.h>
  26#include <linux/pagemap.h>
  27#include <linux/errno.h>
  28#include <linux/socket.h>
  29#include <linux/in.h>
  30#include <linux/net.h>
  31#include <linux/mm.h>
  32#include <linux/un.h>
  33#include <linux/udp.h>
  34#include <linux/tcp.h>
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/sched.h>
  37#include <linux/sunrpc/svcsock.h>
  38#include <linux/sunrpc/xprtsock.h>
  39#include <linux/file.h>
  40#ifdef CONFIG_NFS_V4_1
  41#include <linux/sunrpc/bc_xprt.h>
  42#endif
  43
  44#include <net/sock.h>
  45#include <net/checksum.h>
  46#include <net/udp.h>
  47#include <net/tcp.h>
  48
  49#include "sunrpc.h"
  50
  /*
   * Forward declaration: xs_close() is defined further down in this file
   * but is already called by xs_local_send_request().
   */
  51static void xs_close(struct rpc_xprt *xprt);
  52
  53/*
  54 * xprtsock tunables
  55 */
  /*
   * Slot table sizes for UDP and TCP transports.  Both start out at
   * RPC_DEF_SLOT_TABLE and are non-static, so they are visible outside
   * this file.
   */
  56unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
  57unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
  58
  /*
   * Lower and upper values published as the "min_resvport" and
   * "max_resvport" sysctl entries when RPC_DEBUG is defined.
   */
  59unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
  60unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
  61
  /*
   * Initial value of xs_tcp_fin_timeout: 15 * HZ.  The variable is what the
   * "tcp_fin_timeout" sysctl entry reads and writes (via
   * proc_dointvec_jiffies) when RPC_DEBUG is defined.
   */
  62#define XS_TCP_LINGER_TO        (15U * HZ)
  63static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
  64
  65/*
  66 * We can register our own files under /proc/sys/sunrpc by
  67 * calling register_sysctl_table() again.  The files in that
  68 * directory become the union of all files registered there.
  69 *
  70 * We simply need to make sure that we don't collide with
  71 * someone else's file names!
  72 */
  73
  74#ifdef RPC_DEBUG
  75
  /*
   * Range limits handed to proc_dointvec_minmax through .extra1/.extra2 of
   * the table entries below.
   */
  76static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
  77static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
  78static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
  79static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
  80
  /*
   * NOTE(review): presumably holds the handle returned when sunrpc_table is
   * registered; the registration code is outside this excerpt.
   */
  81static struct ctl_table_header *sunrpc_table_header;
  82
  83/*
  84 * FIXME: changing the UDP slot table size should also resize the UDP
  85 *        socket buffers for existing UDP transports
  86 */
  /*
   * One entry per tunable.  All entries are mode 0644; the first four are
   * range-checked against the limits above, while tcp_fin_timeout uses the
   * jiffies handler and has no explicit bounds.
   */
  87static ctl_table xs_tunables_table[] = {
  88        {
  89                .procname       = "udp_slot_table_entries",
  90                .data           = &xprt_udp_slot_table_entries,
  91                .maxlen         = sizeof(unsigned int),
  92                .mode           = 0644,
  93                .proc_handler   = proc_dointvec_minmax,
  94                .extra1         = &min_slot_table_size,
  95                .extra2         = &max_slot_table_size
  96        },
  97        {
  98                .procname       = "tcp_slot_table_entries",
  99                .data           = &xprt_tcp_slot_table_entries,
 100                .maxlen         = sizeof(unsigned int),
 101                .mode           = 0644,
 102                .proc_handler   = proc_dointvec_minmax,
 103                .extra1         = &min_slot_table_size,
 104                .extra2         = &max_slot_table_size
 105        },
 106        {
 107                .procname       = "min_resvport",
 108                .data           = &xprt_min_resvport,
 109                .maxlen         = sizeof(unsigned int),
 110                .mode           = 0644,
 111                .proc_handler   = proc_dointvec_minmax,
 112                .extra1         = &xprt_min_resvport_limit,
 113                .extra2         = &xprt_max_resvport_limit
 114        },
 115        {
 116                .procname       = "max_resvport",
 117                .data           = &xprt_max_resvport,
 118                .maxlen         = sizeof(unsigned int),
 119                .mode           = 0644,
 120                .proc_handler   = proc_dointvec_minmax,
 121                .extra1         = &xprt_min_resvport_limit,
 122                .extra2         = &xprt_max_resvport_limit
 123        },
 124        {
 125                .procname       = "tcp_fin_timeout",
 126                .data           = &xs_tcp_fin_timeout,
 127                .maxlen         = sizeof(xs_tcp_fin_timeout),
 128                .mode           = 0644,
 129                .proc_handler   = proc_dointvec_jiffies,
 130        },
 131        { },
 132};
 133
 /*
  * Parent directory entry: a read-only (0555) "sunrpc" directory whose
  * children are the tunables above.
  */
 134static ctl_table sunrpc_table[] = {
 135        {
 136                .procname       = "sunrpc",
 137                .mode           = 0555,
 138                .child          = xs_tunables_table
 139        },
 140        { },
 141};
 142
 143#endif
 144
 145/*
 146 * Wait duration for a reply from the RPC portmapper.
 147 */
 148#define XS_BIND_TO              (60U * HZ)	/* 60 * HZ, i.e. 60 seconds */
 149
 150/*
 151 * Delay if a UDP socket connect error occurs.  This is most likely some
 152 * kind of resource problem on the local host.
 153 */
 154#define XS_UDP_REEST_TO         (2U * HZ)	/* 2 seconds */
 155
 156/*
 157 * The reestablish timeout allows clients to delay for a bit before attempting
 158 * to reconnect to a server that just dropped our connection.
 159 *
 160 * We implement an exponential backoff when trying to reestablish a TCP
 161 * transport connection with the server.  Some servers like to drop a TCP
 162 * connection when they are overworked, so we start with a short timeout and
 163 * increase over time if the server is down or not responding.
 164 */
 165#define XS_TCP_INIT_REEST_TO    (3U * HZ)	/* initial delay: 3 seconds */
 166#define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)	/* upper bound: 5 minutes */
 167
 168/*
 169 * TCP idle timeout; client drops the transport socket if it is idle
 170 * for this long.  Note that we also timeout UDP sockets to prevent
 171 * holding port numbers when there is no RPC traffic.
 172 */
 173#define XS_IDLE_DISC_TO         (5U * 60 * HZ)	/* 5 minutes */
 174
 /*
  * When RPC_DEBUG is defined, RPC_DEBUG_DATA is forcibly undefined, so the
  * xs_pktdump() below compiles to its no-op variant, and RPCDBG_FACILITY is
  * set to RPCDBG_TRANS.
  * NOTE(review): RPCDBG_FACILITY is presumably what dprintk() consults to
  * decide whether to print; dprintk() is defined outside this file.
  */
 175#ifdef RPC_DEBUG
 176# undef  RPC_DEBUG_DATA
 177# define RPCDBG_FACILITY        RPCDBG_TRANS
 178#endif
 179
 180#ifdef RPC_DEBUG_DATA
 181static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 182{
 183        u8 *buf = (u8 *) packet;
 184        int j;
 185
 186        dprintk("RPC:       %s\n", msg);
 187        for (j = 0; j < count && j < 128; j += 4) {
 188                if (!(j & 31)) {
 189                        if (j)
 190                                dprintk("\n");
 191                        dprintk("0x%04x ", j);
 192                }
 193                dprintk("%02x%02x%02x%02x ",
 194                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
 195        }
 196        dprintk("\n");
 197}
 198#else
 199static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 200{
 201        /* NOP */
 202}
 203#endif
 204
 205struct sock_xprt {
 206        struct rpc_xprt         xprt;
 207
 208        /*
 209         * Network layer
 210         */
 211        struct socket *         sock;
 212        struct sock *           inet;
 213
 214        /*
 215         * State of TCP reply receive
 216         */
 217        __be32                  tcp_fraghdr,
 218                                tcp_xid,
 219                                tcp_calldir;
 220
 221        u32                     tcp_offset,
 222                                tcp_reclen;
 223
 224        unsigned long           tcp_copied,
 225                                tcp_flags;
 226
 227        /*
 228         * Connection of transports
 229         */
 230        struct delayed_work     connect_worker;
 231        struct sockaddr_storage srcaddr;
 232        unsigned short          srcport;
 233
 234        /*
 235         * UDP socket buffer size parameters
 236         */
 237        size_t                  rcvsize,
 238                                sndsize;
 239
 240        /*
 241         * Saved socket callback addresses
 242         */
 243        void                    (*old_data_ready)(struct sock *, int);
 244        void                    (*old_state_change)(struct sock *);
 245        void                    (*old_write_space)(struct sock *);
 246        void                    (*old_error_report)(struct sock *);
 247};
 248
 /*
  * Single-bit masks of type unsigned long.
  * NOTE(review): presumably kept in sock_xprt.tcp_flags -- confirm against
  * the TCP receive path, which lies outside this excerpt.
  */
 249/*
 250 * TCP receive state flags
 251 */
 252#define TCP_RCV_LAST_FRAG       (1UL << 0)
 253#define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
 254#define TCP_RCV_COPY_XID        (1UL << 2)
 255#define TCP_RCV_COPY_DATA       (1UL << 3)
 256#define TCP_RCV_READ_CALLDIR    (1UL << 4)
 257#define TCP_RCV_COPY_CALLDIR    (1UL << 5)
 258
 259/*
 260 * TCP RPC flags
 261 */
 /* Continues the bit numbering of the receive state flags above (bit 6). */
 262#define TCP_RPC_REPLY           (1UL << 6)
 263
 264static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
 265{
 266        return (struct sockaddr *) &xprt->addr;
 267}
 268
 269static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
 270{
 271        return (struct sockaddr_un *) &xprt->addr;
 272}
 273
 274static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
 275{
 276        return (struct sockaddr_in *) &xprt->addr;
 277}
 278
 279static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
 280{
 281        return (struct sockaddr_in6 *) &xprt->addr;
 282}
 283
 284static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
 285{
 286        struct sockaddr *sap = xs_addr(xprt);
 287        struct sockaddr_in6 *sin6;
 288        struct sockaddr_in *sin;
 289        struct sockaddr_un *sun;
 290        char buf[128];
 291
 292        switch (sap->sa_family) {
 293        case AF_LOCAL:
 294                sun = xs_addr_un(xprt);
 295                strlcpy(buf, sun->sun_path, sizeof(buf));
 296                xprt->address_strings[RPC_DISPLAY_ADDR] =
 297                                                kstrdup(buf, GFP_KERNEL);
 298                break;
 299        case AF_INET:
 300                (void)rpc_ntop(sap, buf, sizeof(buf));
 301                xprt->address_strings[RPC_DISPLAY_ADDR] =
 302                                                kstrdup(buf, GFP_KERNEL);
 303                sin = xs_addr_in(xprt);
 304                snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 305                break;
 306        case AF_INET6:
 307                (void)rpc_ntop(sap, buf, sizeof(buf));
 308                xprt->address_strings[RPC_DISPLAY_ADDR] =
 309                                                kstrdup(buf, GFP_KERNEL);
 310                sin6 = xs_addr_in6(xprt);
 311                snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 312                break;
 313        default:
 314                BUG();
 315        }
 316
 317        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 318}
 319
 320static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
 321{
 322        struct sockaddr *sap = xs_addr(xprt);
 323        char buf[128];
 324
 325        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 326        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 327
 328        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 329        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 330}
 331
 332static void xs_format_peer_addresses(struct rpc_xprt *xprt,
 333                                     const char *protocol,
 334                                     const char *netid)
 335{
 336        xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
 337        xprt->address_strings[RPC_DISPLAY_NETID] = netid;
 338        xs_format_common_peer_addresses(xprt);
 339        xs_format_common_peer_ports(xprt);
 340}
 341
 342static void xs_update_peer_port(struct rpc_xprt *xprt)
 343{
 344        kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 345        kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 346
 347        xs_format_common_peer_ports(xprt);
 348}
 349
 350static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 351{
 352        unsigned int i;
 353
 354        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 355                switch (i) {
 356                case RPC_DISPLAY_PROTO:
 357                case RPC_DISPLAY_NETID:
 358                        continue;
 359                default:
 360                        kfree(xprt->address_strings[i]);
 361                }
 362}
 363
 364#define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
 365
 366static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
 367{
 368        struct msghdr msg = {
 369                .msg_name       = addr,
 370                .msg_namelen    = addrlen,
 371                .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
 372        };
 373        struct kvec iov = {
 374                .iov_base       = vec->iov_base + base,
 375                .iov_len        = vec->iov_len - base,
 376        };
 377
 378        if (iov.iov_len != 0)
 379                return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
 380        return kernel_sendmsg(sock, &msg, NULL, 0, 0);
 381}
 382
 383static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
 384{
 385        struct page **ppage;
 386        unsigned int remainder;
 387        int err, sent = 0;
 388
 389        remainder = xdr->page_len - base;
 390        base += xdr->page_base;
 391        ppage = xdr->pages + (base >> PAGE_SHIFT);
 392        base &= ~PAGE_MASK;
 393        for(;;) {
 394                unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
 395                int flags = XS_SENDMSG_FLAGS;
 396
 397                remainder -= len;
 398                if (remainder != 0 || more)
 399                        flags |= MSG_MORE;
 400                err = sock->ops->sendpage(sock, *ppage, base, len, flags);
 401                if (remainder == 0 || err != len)
 402                        break;
 403                sent += err;
 404                ppage++;
 405                base = 0;
 406        }
 407        if (sent == 0)
 408                return err;
 409        if (err > 0)
 410                sent += err;
 411        return sent;
 412}
 413
 414/**
 415 * xs_sendpages - write pages directly to a socket
 416 * @sock: socket to send on
 417 * @addr: UDP only -- address of destination
 418 * @addrlen: UDP only -- length of destination address
 419 * @xdr: buffer containing this request
 420 * @base: starting position in the buffer
 421 *
 422 */
 423static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
 424{
 425        unsigned int remainder = xdr->len - base;
 426        int err, sent = 0;
 427
 428        if (unlikely(!sock))
 429                return -ENOTSOCK;
 430
 431        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
 432        if (base != 0) {
 433                addr = NULL;
 434                addrlen = 0;
 435        }
 436
 437        if (base < xdr->head[0].iov_len || addr != NULL) {
 438                unsigned int len = xdr->head[0].iov_len - base;
 439                remainder -= len;
 440                err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
 441                if (remainder == 0 || err != len)
 442                        goto out;
 443                sent += err;
 444                base = 0;
 445        } else
 446                base -= xdr->head[0].iov_len;
 447
 448        if (base < xdr->page_len) {
 449                unsigned int len = xdr->page_len - base;
 450                remainder -= len;
 451                err = xs_send_pagedata(sock, xdr, base, remainder != 0);
 452                if (remainder == 0 || err != len)
 453                        goto out;
 454                sent += err;
 455                base = 0;
 456        } else
 457                base -= xdr->page_len;
 458
 459        if (base >= xdr->tail[0].iov_len)
 460                return sent;
 461        err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
 462out:
 463        if (sent == 0)
 464                return err;
 465        if (err > 0)
 466                sent += err;
 467        return sent;
 468}
 469
 470static void xs_nospace_callback(struct rpc_task *task)
 471{
 472        struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
 473
 474        transport->inet->sk_write_pending--;
 475        clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 476}
 477
 478/**
 479 * xs_nospace - place task on wait queue if transmit was incomplete
 480 * @task: task to put to sleep
 481 *
 482 */
 483static int xs_nospace(struct rpc_task *task)
 484{
 485        struct rpc_rqst *req = task->tk_rqstp;
 486        struct rpc_xprt *xprt = req->rq_xprt;
 487        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 488        int ret = 0;
 489
 490        dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
 491                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
 492                        req->rq_slen);
 493
 494        /* Protect against races with write_space */
 495        spin_lock_bh(&xprt->transport_lock);
 496
 497        /* Don't race with disconnect */
 498        if (xprt_connected(xprt)) {
 499                if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
 500                        ret = -EAGAIN;
 501                        /*
 502                         * Notify TCP that we're limited by the application
 503                         * window size
 504                         */
 505                        set_bit(SOCK_NOSPACE, &transport->sock->flags);
 506                        transport->inet->sk_write_pending++;
 507                        /* ...and wait for more buffer space */
 508                        xprt_wait_for_buffer_space(task, xs_nospace_callback);
 509                }
 510        } else {
 511                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 512                ret = -ENOTCONN;
 513        }
 514
 515        spin_unlock_bh(&xprt->transport_lock);
 516        return ret;
 517}
 518
 519/*
 520 * Construct a stream transport record marker in @buf.
 521 */
 522static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
 523{
 524        u32 reclen = buf->len - sizeof(rpc_fraghdr);
 525        rpc_fraghdr *base = buf->head[0].iov_base;
 526        *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
 527}
 528
 529/**
 530 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 531 * @task: RPC task that manages the state of an RPC request
 532 *
 533 * Return values:
 534 *        0:    The request has been sent
 535 *   EAGAIN:    The socket was blocked, please call again later to
 536 *              complete the request
 537 * ENOTCONN:    Caller needs to invoke connect logic then call again
 538 *    other:    Some other error occured, the request was not sent
 539 */
 540static int xs_local_send_request(struct rpc_task *task)
 541{
 542        struct rpc_rqst *req = task->tk_rqstp;
 543        struct rpc_xprt *xprt = req->rq_xprt;
 544        struct sock_xprt *transport =
 545                                container_of(xprt, struct sock_xprt, xprt);
 546        struct xdr_buf *xdr = &req->rq_snd_buf;
 547        int status;
 548
 549        xs_encode_stream_record_marker(&req->rq_snd_buf);
 550
 551        xs_pktdump("packet data:",
 552                        req->rq_svec->iov_base, req->rq_svec->iov_len);
 553
 554        status = xs_sendpages(transport->sock, NULL, 0,
 555                                                xdr, req->rq_bytes_sent);
 556        dprintk("RPC:       %s(%u) = %d\n",
 557                        __func__, xdr->len - req->rq_bytes_sent, status);
 558        if (likely(status >= 0)) {
 559                req->rq_bytes_sent += status;
 560                req->rq_xmit_bytes_sent += status;
 561                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 562                        req->rq_bytes_sent = 0;
 563                        return 0;
 564                }
 565                status = -EAGAIN;
 566        }
 567
 568        switch (status) {
 569        case -EAGAIN:
 570                status = xs_nospace(task);
 571                break;
 572        default:
 573                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 574                        -status);
 575        case -EPIPE:
 576                xs_close(xprt);
 577                status = -ENOTCONN;
 578        }
 579
 580        return status;
 581}
 582
 /**
  * xs_udp_send_request - write an RPC request to a UDP socket
  * @task: address of RPC task that manages the state of an RPC request
  *
  * Return values:
  *        0:    The request has been sent
  *   EAGAIN:    The socket was blocked, please call again later to
  *              complete the request
  * ENOTCONN:    Caller needs to invoke connect logic then call again
  *    other:    Some other error occurred, the request was not sent
  */
 static int xs_udp_send_request(struct rpc_task *task)
 {
         struct rpc_rqst *req = task->tk_rqstp;
         struct rpc_xprt *xprt = req->rq_xprt;
         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
         struct xdr_buf *xdr = &req->rq_snd_buf;
         int status;

         xs_pktdump("packet data:",
                                 req->rq_svec->iov_base,
                                 req->rq_svec->iov_len);

         if (!xprt_bound(xprt))
                 return -ENOTCONN;

         status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
                               xdr, req->rq_bytes_sent);

         dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
                         xdr->len - req->rq_bytes_sent, status);

         if (status >= 0) {
                 req->rq_xmit_bytes_sent += status;
                 if (status >= req->rq_slen)
                         return 0;
                 /* Still some bytes left; set up for a retry later. */
                 status = -EAGAIN;
         }

         /* Should we call xs_close() on -ENOTSOCK? */
         if (status == -ENOTSOCK)
                 return -ENOTCONN;
         if (status == -EAGAIN)
                 return xs_nospace(task);

         /*
          * -ENETUNREACH, -EPIPE and -ECONNREFUSED are expected (when the
          * server has died, an ICMP port unreachable message prompts
          * ECONNREFUSED); any other error is logged first.
          */
         if (status != -ENETUNREACH && status != -EPIPE &&
             status != -ECONNREFUSED)
                 dprintk("RPC:       sendmsg returned unrecognized error %d\n",
                         -status);
         clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
         return status;
 }
 645
 646/**
 647 * xs_tcp_shutdown - gracefully shut down a TCP socket
 648 * @xprt: transport
 649 *
 650 * Initiates a graceful shutdown of the TCP socket by calling the
 651 * equivalent of shutdown(SHUT_WR);
 652 */
 653static void xs_tcp_shutdown(struct rpc_xprt *xprt)
 654{
 655        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 656        struct socket *sock = transport->sock;
 657
 658        if (sock != NULL)
 659                kernel_sock_shutdown(sock, SHUT_WR);
 660}
 661
 /**
  * xs_tcp_send_request - write an RPC request to a TCP socket
  * @task: address of RPC task that manages the state of an RPC request
  *
  * Return values:
  *        0:    The request has been sent
  *   EAGAIN:    The socket was blocked, please call again later to
  *              complete the request
  * ENOTCONN:    Caller needs to invoke connect logic then call again
  *    other:    Some other error occurred, the request was not sent
  *
  * XXX: In the case of soft timeouts, should we eventually give up
  *      if sendmsg is not able to make progress?
  */
 static int xs_tcp_send_request(struct rpc_task *task)
 {
         struct rpc_rqst *req = task->tk_rqstp;
         struct rpc_xprt *xprt = req->rq_xprt;
         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
         struct xdr_buf *xdr = &req->rq_snd_buf;
         int status;

         xs_encode_stream_record_marker(xdr);

         xs_pktdump("packet data:",
                                 req->rq_svec->iov_base,
                                 req->rq_svec->iov_len);

         /* Continue transmitting the packet/record. We must be careful
          * to cope with writespace callbacks arriving _after_ we have
          * called sendmsg(). */
         for (;;) {
                 status = xs_sendpages(transport->sock,
                                         NULL, 0, xdr, req->rq_bytes_sent);

                 dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
                                 xdr->len - req->rq_bytes_sent, status);

                 if (unlikely(status < 0))
                         break;

                 /* If we've sent the entire packet, immediately
                  * reset the count of bytes sent. */
                 req->rq_bytes_sent += status;
                 req->rq_xmit_bytes_sent += status;
                 if (likely(req->rq_bytes_sent >= req->rq_slen)) {
                         req->rq_bytes_sent = 0;
                         return 0;
                 }

                 /* No progress at all: the socket is blocked. */
                 if (status == 0) {
                         status = -EAGAIN;
                         break;
                 }
         }

         switch (status) {
         case -ENOTSOCK:
                 status = -ENOTCONN;
                 /* Should we call xs_close() here? */
                 break;
         case -EAGAIN:
                 status = xs_nospace(task);
                 break;
         default:
                 dprintk("RPC:       sendmsg returned unrecognized error %d\n",
                         -status);
                 /* fall through */
         case -ECONNRESET:
         case -EPIPE:
                 xs_tcp_shutdown(xprt);
                 /* fall through */
         case -ECONNREFUSED:
         case -ENOTCONN:
                 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
         }

         return status;
 }
 739
 740/**
 741 * xs_tcp_release_xprt - clean up after a tcp transmission
 742 * @xprt: transport
 743 * @task: rpc task
 744 *
 745 * This cleans up if an error causes us to abort the transmission of a request.
 746 * In this case, the socket may need to be reset in order to avoid confusing
 747 * the server.
 748 */
 749static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 750{
 751        struct rpc_rqst *req;
 752
 753        if (task != xprt->snd_task)
 754                return;
 755        if (task == NULL)
 756                goto out_release;
 757        req = task->tk_rqstp;
 758        if (req->rq_bytes_sent == 0)
 759                goto out_release;
 760        if (req->rq_bytes_sent == req->rq_snd_buf.len)
 761                goto out_release;
 762        set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
 763out_release:
 764        xprt_release_xprt(xprt, task);
 765}
 766
 767static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 768{
 769        transport->old_data_ready = sk->sk_data_ready;
 770        transport->old_state_change = sk->sk_state_change;
 771        transport->old_write_space = sk->sk_write_space;
 772        transport->old_error_report = sk->sk_error_report;
 773}
 774
 775static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 776{
 777        sk->sk_data_ready = transport->old_data_ready;
 778        sk->sk_state_change = transport->old_state_change;
 779        sk->sk_write_space = transport->old_write_space;
 780        sk->sk_error_report = transport->old_error_report;
 781}
 782
 783static void xs_reset_transport(struct sock_xprt *transport)
 784{
 785        struct socket *sock = transport->sock;
 786        struct sock *sk = transport->inet;
 787
 788        if (sk == NULL)
 789                return;
 790
 791        transport->srcport = 0;
 792
 793        write_lock_bh(&sk->sk_callback_lock);
 794        transport->inet = NULL;
 795        transport->sock = NULL;
 796
 797        sk->sk_user_data = NULL;
 798
 799        xs_restore_old_callbacks(transport, sk);
 800        write_unlock_bh(&sk->sk_callback_lock);
 801
 802        sk->sk_no_check = 0;
 803
 804        sock_release(sock);
 805}
 806
 807/**
 808 * xs_close - close a socket
 809 * @xprt: transport
 810 *
 811 * This is used when all requests are complete; ie, no DRC state remains
 812 * on the server we want to save.
 813 *
 814 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 815 * xs_reset_transport() zeroing the socket from underneath a writer.
 816 */
 817static void xs_close(struct rpc_xprt *xprt)
 818{
 819        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 820
 821        dprintk("RPC:       xs_close xprt %p\n", xprt);
 822
 823        xs_reset_transport(transport);
 824        xprt->reestablish_timeout = 0;
 825
 826        smp_mb__before_clear_bit();
 827        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
 828        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 829        clear_bit(XPRT_CLOSING, &xprt->state);
 830        smp_mb__after_clear_bit();
 831        xprt_disconnect_done(xprt);
 832}
 833
 834static void xs_tcp_close(struct rpc_xprt *xprt)
 835{
 836        if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
 837                xs_close(xprt);
 838        else
 839                xs_tcp_shutdown(xprt);
 840}
 841
 842/**
 843 * xs_destroy - prepare to shutdown a transport
 844 * @xprt: doomed transport
 845 *
 846 */
 847static void xs_destroy(struct rpc_xprt *xprt)
 848{
 849        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 850
 851        dprintk("RPC:       xs_destroy xprt %p\n", xprt);
 852
 853        cancel_delayed_work_sync(&transport->connect_worker);
 854
 855        xs_close(xprt);
 856        xs_free_peer_addresses(xprt);
 857        xprt_free(xprt);
 858        module_put(THIS_MODULE);
 859}
 860
 861static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
 862{
 863        return (struct rpc_xprt *) sk->sk_user_data;
 864}
 865
 866static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
 867{
 868        struct xdr_skb_reader desc = {
 869                .skb            = skb,
 870                .offset         = sizeof(rpc_fraghdr),
 871                .count          = skb->len - sizeof(rpc_fraghdr),
 872        };
 873
 874        if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
 875                return -1;
 876        if (desc.count)
 877                return -1;
 878        return 0;
 879}
 880
 881/**
 882 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
 883 * @sk: socket with data to read
 884 * @len: how much data to read
 885 *
 886 * Currently this assumes we can read the whole reply in a single gulp.
 887 */
 888static void xs_local_data_ready(struct sock *sk, int len)
 889{
 890        struct rpc_task *task;
 891        struct rpc_xprt *xprt;
 892        struct rpc_rqst *rovr;
 893        struct sk_buff *skb;
 894        int err, repsize, copied;
 895        u32 _xid;
 896        __be32 *xp;
 897
 898        read_lock_bh(&sk->sk_callback_lock);
 899        dprintk("RPC:       %s...\n", __func__);
 900        xprt = xprt_from_sock(sk);
 901        if (xprt == NULL)
 902                goto out;
 903
 904        skb = skb_recv_datagram(sk, 0, 1, &err);
 905        if (skb == NULL)
 906                goto out;
 907
 908        if (xprt->shutdown)
 909                goto dropit;
 910
 911        repsize = skb->len - sizeof(rpc_fraghdr);
 912        if (repsize < 4) {
 913                dprintk("RPC:       impossible RPC reply size %d\n", repsize);
 914                goto dropit;
 915        }
 916
 917        /* Copy the XID from the skb... */
 918        xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
 919        if (xp == NULL)
 920                goto dropit;
 921
 922        /* Look up and lock the request corresponding to the given XID */
 923        spin_lock(&xprt->transport_lock);
 924        rovr = xprt_lookup_rqst(xprt, *xp);
 925        if (!rovr)
 926                goto out_unlock;
 927        task = rovr->rq_task;
 928
 929        copied = rovr->rq_private_buf.buflen;
 930        if (copied > repsize)
 931                copied = repsize;
 932
 933        if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 934                dprintk("RPC:       sk_buff copy failed\n");
 935                goto out_unlock;
 936        }
 937
 938        xprt_complete_rqst(task, copied);
 939
 940 out_unlock:
 941        spin_unlock(&xprt->transport_lock);
 942 dropit:
 943        skb_free_datagram(sk, skb);
 944 out:
 945        read_unlock_bh(&sk->sk_callback_lock);
 946}
 947
 948/**
 949 * xs_udp_data_ready - "data ready" callback for UDP sockets
 950 * @sk: socket with data to read
 951 * @len: how much data to read
 952 *
 953 */
 954static void xs_udp_data_ready(struct sock *sk, int len)
 955{
 956        struct rpc_task *task;
 957        struct rpc_xprt *xprt;
 958        struct rpc_rqst *rovr;
 959        struct sk_buff *skb;
 960        int err, repsize, copied;
 961        u32 _xid;
 962        __be32 *xp;
 963
 964        read_lock_bh(&sk->sk_callback_lock);
 965        dprintk("RPC:       xs_udp_data_ready...\n");
 966        if (!(xprt = xprt_from_sock(sk)))
 967                goto out;
 968
 969        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
 970                goto out;
 971
 972        if (xprt->shutdown)
 973                goto dropit;
 974
 975        repsize = skb->len - sizeof(struct udphdr);
 976        if (repsize < 4) {
 977                dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
 978                goto dropit;
 979        }
 980
 981        /* Copy the XID from the skb... */
 982        xp = skb_header_pointer(skb, sizeof(struct udphdr),
 983                                sizeof(_xid), &_xid);
 984        if (xp == NULL)
 985                goto dropit;
 986
 987        /* Look up and lock the request corresponding to the given XID */
 988        spin_lock(&xprt->transport_lock);
 989        rovr = xprt_lookup_rqst(xprt, *xp);
 990        if (!rovr)
 991                goto out_unlock;
 992        task = rovr->rq_task;
 993
 994        if ((copied = rovr->rq_private_buf.buflen) > repsize)
 995                copied = repsize;
 996
 997        /* Suck it into the iovec, verify checksum if not done by hw. */
 998        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 999                UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
1000                goto out_unlock;
1001        }
1002
1003        UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
1004
1005        /* Something worked... */
1006        dst_confirm(skb_dst(skb));
1007
1008        xprt_adjust_cwnd(task, copied);
1009        xprt_complete_rqst(task, copied);
1010
1011 out_unlock:
1012        spin_unlock(&xprt->transport_lock);
1013 dropit:
1014        skb_free_datagram(sk, skb);
1015 out:
1016        read_unlock_bh(&sk->sk_callback_lock);
1017}
1018
1019static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
1020{
1021        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1022        size_t len, used;
1023        char *p;
1024
1025        p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
1026        len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
1027        used = xdr_skb_read_bits(desc, p, len);
1028        transport->tcp_offset += used;
1029        if (used != len)
1030                return;
1031
1032        transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
1033        if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
1034                transport->tcp_flags |= TCP_RCV_LAST_FRAG;
1035        else
1036                transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
1037        transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
1038
1039        transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
1040        transport->tcp_offset = 0;
1041
1042        /* Sanity check of the record length */
1043        if (unlikely(transport->tcp_reclen < 8)) {
1044                dprintk("RPC:       invalid TCP record fragment length\n");
1045                xprt_force_disconnect(xprt);
1046                return;
1047        }
1048        dprintk("RPC:       reading TCP record fragment of length %d\n",
1049                        transport->tcp_reclen);
1050}
1051
1052static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
1053{
1054        if (transport->tcp_offset == transport->tcp_reclen) {
1055                transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
1056                transport->tcp_offset = 0;
1057                if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
1058                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1059                        transport->tcp_flags |= TCP_RCV_COPY_XID;
1060                        transport->tcp_copied = 0;
1061                }
1062        }
1063}
1064
1065static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1066{
1067        size_t len, used;
1068        char *p;
1069
1070        len = sizeof(transport->tcp_xid) - transport->tcp_offset;
1071        dprintk("RPC:       reading XID (%Zu bytes)\n", len);
1072        p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
1073        used = xdr_skb_read_bits(desc, p, len);
1074        transport->tcp_offset += used;
1075        if (used != len)
1076                return;
1077        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1078        transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1079        transport->tcp_copied = 4;
1080        dprintk("RPC:       reading %s XID %08x\n",
1081                        (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1082                                                              : "request with",
1083                        ntohl(transport->tcp_xid));
1084        xs_tcp_check_fraghdr(transport);
1085}
1086
1087static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1088                                       struct xdr_skb_reader *desc)
1089{
1090        size_t len, used;
1091        u32 offset;
1092        char *p;
1093
1094        /*
1095         * We want transport->tcp_offset to be 8 at the end of this routine
1096         * (4 bytes for the xid and 4 bytes for the call/reply flag).
1097         * When this function is called for the first time,
1098         * transport->tcp_offset is 4 (after having already read the xid).
1099         */
1100        offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1101        len = sizeof(transport->tcp_calldir) - offset;
1102        dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1103        p = ((char *) &transport->tcp_calldir) + offset;
1104        used = xdr_skb_read_bits(desc, p, len);
1105        transport->tcp_offset += used;
1106        if (used != len)
1107                return;
1108        transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1109        /*
1110         * We don't yet have the XDR buffer, so we will write the calldir
1111         * out after we get the buffer from the 'struct rpc_rqst'
1112         */
1113        switch (ntohl(transport->tcp_calldir)) {
1114        case RPC_REPLY:
1115                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1116                transport->tcp_flags |= TCP_RCV_COPY_DATA;
1117                transport->tcp_flags |= TCP_RPC_REPLY;
1118                break;
1119        case RPC_CALL:
1120                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1121                transport->tcp_flags |= TCP_RCV_COPY_DATA;
1122                transport->tcp_flags &= ~TCP_RPC_REPLY;
1123                break;
1124        default:
1125                dprintk("RPC:       invalid request message type\n");
1126                xprt_force_disconnect(&transport->xprt);
1127        }
1128        xs_tcp_check_fraghdr(transport);
1129}
1130
1131static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1132                                     struct xdr_skb_reader *desc,
1133                                     struct rpc_rqst *req)
1134{
1135        struct sock_xprt *transport =
1136                                container_of(xprt, struct sock_xprt, xprt);
1137        struct xdr_buf *rcvbuf;
1138        size_t len;
1139        ssize_t r;
1140
1141        rcvbuf = &req->rq_private_buf;
1142
1143        if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1144                /*
1145                 * Save the RPC direction in the XDR buffer
1146                 */
1147                memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1148                        &transport->tcp_calldir,
1149                        sizeof(transport->tcp_calldir));
1150                transport->tcp_copied += sizeof(transport->tcp_calldir);
1151                transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1152        }
1153
1154        len = desc->count;
1155        if (len > transport->tcp_reclen - transport->tcp_offset) {
1156                struct xdr_skb_reader my_desc;
1157
1158                len = transport->tcp_reclen - transport->tcp_offset;
1159                memcpy(&my_desc, desc, sizeof(my_desc));
1160                my_desc.count = len;
1161                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1162                                          &my_desc, xdr_skb_read_bits);
1163                desc->count -= r;
1164                desc->offset += r;
1165        } else
1166                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1167                                          desc, xdr_skb_read_bits);
1168
1169        if (r > 0) {
1170                transport->tcp_copied += r;
1171                transport->tcp_offset += r;
1172        }
1173        if (r != len) {
1174                /* Error when copying to the receive buffer,
1175                 * usually because we weren't able to allocate
1176                 * additional buffer pages. All we can do now
1177                 * is turn off TCP_RCV_COPY_DATA, so the request
1178                 * will not receive any additional updates,
1179                 * and time out.
1180                 * Any remaining data from this record will
1181                 * be discarded.
1182                 */
1183                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1184                dprintk("RPC:       XID %08x truncated request\n",
1185                                ntohl(transport->tcp_xid));
1186                dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1187                                "tcp_offset = %u, tcp_reclen = %u\n",
1188                                xprt, transport->tcp_copied,
1189                                transport->tcp_offset, transport->tcp_reclen);
1190                return;
1191        }
1192
1193        dprintk("RPC:       XID %08x read %Zd bytes\n",
1194                        ntohl(transport->tcp_xid), r);
1195        dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1196                        "tcp_reclen = %u\n", xprt, transport->tcp_copied,
1197                        transport->tcp_offset, transport->tcp_reclen);
1198
1199        if (transport->tcp_copied == req->rq_private_buf.buflen)
1200                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1201        else if (transport->tcp_offset == transport->tcp_reclen) {
1202                if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1203                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1204        }
1205}
1206
1207/*
1208 * Finds the request corresponding to the RPC xid and invokes the common
1209 * tcp read code to read the data.
1210 */
1211static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1212                                    struct xdr_skb_reader *desc)
1213{
1214        struct sock_xprt *transport =
1215                                container_of(xprt, struct sock_xprt, xprt);
1216        struct rpc_rqst *req;
1217
1218        dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1219
1220        /* Find and lock the request corresponding to this xid */
1221        spin_lock(&xprt->transport_lock);
1222        req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1223        if (!req) {
1224                dprintk("RPC:       XID %08x request not found!\n",
1225                                ntohl(transport->tcp_xid));
1226                spin_unlock(&xprt->transport_lock);
1227                return -1;
1228        }
1229
1230        xs_tcp_read_common(xprt, desc, req);
1231
1232        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1233                xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1234
1235        spin_unlock(&xprt->transport_lock);
1236        return 0;
1237}
1238
1239#if defined(CONFIG_NFS_V4_1)
1240/*
1241 * Obtains an rpc_rqst previously allocated and invokes the common
1242 * tcp read code to read the data.  The result is placed in the callback
1243 * queue.
1244 * If we're unable to obtain the rpc_rqst we schedule the closing of the
1245 * connection and return -1.
1246 */
1247static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1248                                       struct xdr_skb_reader *desc)
1249{
1250        struct sock_xprt *transport =
1251                                container_of(xprt, struct sock_xprt, xprt);
1252        struct rpc_rqst *req;
1253
1254        req = xprt_alloc_bc_request(xprt);
1255        if (req == NULL) {
1256                printk(KERN_WARNING "Callback slot table overflowed\n");
1257                xprt_force_disconnect(xprt);
1258                return -1;
1259        }
1260
1261        req->rq_xid = transport->tcp_xid;
1262        dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1263        xs_tcp_read_common(xprt, desc, req);
1264
1265        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1266                struct svc_serv *bc_serv = xprt->bc_serv;
1267
1268                /*
1269                 * Add callback request to callback list.  The callback
1270                 * service sleeps on the sv_cb_waitq waiting for new
1271                 * requests.  Wake it up after adding enqueing the
1272                 * request.
1273                 */
1274                dprintk("RPC:       add callback request to list\n");
1275                spin_lock(&bc_serv->sv_cb_lock);
1276                list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1277                spin_unlock(&bc_serv->sv_cb_lock);
1278                wake_up(&bc_serv->sv_cb_waitq);
1279        }
1280
1281        req->rq_private_buf.len = transport->tcp_copied;
1282
1283        return 0;
1284}
1285
1286static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1287                                        struct xdr_skb_reader *desc)
1288{
1289        struct sock_xprt *transport =
1290                                container_of(xprt, struct sock_xprt, xprt);
1291
1292        return (transport->tcp_flags & TCP_RPC_REPLY) ?
1293                xs_tcp_read_reply(xprt, desc) :
1294                xs_tcp_read_callback(xprt, desc);
1295}
1296#else
/* Without CONFIG_NFS_V4_1 all incoming data is handled as a reply. */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
					struct xdr_skb_reader *desc)
{
	int status = xs_tcp_read_reply(xprt, desc);

	return status;
}
1302#endif /* CONFIG_NFS_V4_1 */
1303
1304/*
1305 * Read data off the transport.  This can be either an RPC_CALL or an
1306 * RPC_REPLY.  Relay the processing to helper functions.
1307 */
1308static void xs_tcp_read_data(struct rpc_xprt *xprt,
1309                                    struct xdr_skb_reader *desc)
1310{
1311        struct sock_xprt *transport =
1312                                container_of(xprt, struct sock_xprt, xprt);
1313
1314        if (_xs_tcp_read_data(xprt, desc) == 0)
1315                xs_tcp_check_fraghdr(transport);
1316        else {
1317                /*
1318                 * The transport_lock protects the request handling.
1319                 * There's no need to hold it to update the tcp_flags.
1320                 */
1321                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1322        }
1323}
1324
1325static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1326{
1327        size_t len;
1328
1329        len = transport->tcp_reclen - transport->tcp_offset;
1330        if (len > desc->count)
1331                len = desc->count;
1332        desc->count -= len;
1333        desc->offset += len;
1334        transport->tcp_offset += len;
1335        dprintk("RPC:       discarded %Zu bytes\n", len);
1336        xs_tcp_check_fraghdr(transport);
1337}
1338
1339static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1340{
1341        struct rpc_xprt *xprt = rd_desc->arg.data;
1342        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1343        struct xdr_skb_reader desc = {
1344                .skb    = skb,
1345                .offset = offset,
1346                .count  = len,
1347        };
1348
1349        dprintk("RPC:       xs_tcp_data_recv started\n");
1350        do {
1351                /* Read in a new fragment marker if necessary */
1352                /* Can we ever really expect to get completely empty fragments? */
1353                if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1354                        xs_tcp_read_fraghdr(xprt, &desc);
1355                        continue;
1356                }
1357                /* Read in the xid if necessary */
1358                if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1359                        xs_tcp_read_xid(transport, &desc);
1360                        continue;
1361                }
1362                /* Read in the call/reply flag */
1363                if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1364                        xs_tcp_read_calldir(transport, &desc);
1365                        continue;
1366                }
1367                /* Read in the request data */
1368                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1369                        xs_tcp_read_data(xprt, &desc);
1370                        continue;
1371                }
1372                /* Skip over any trailing bytes on short reads */
1373                xs_tcp_read_discard(transport, &desc);
1374        } while (desc.count);
1375        dprintk("RPC:       xs_tcp_data_recv done\n");
1376        return len - desc.count;
1377}
1378
1379/**
1380 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1381 * @sk: socket with data to read
1382 * @bytes: how much data to read
1383 *
1384 */
1385static void xs_tcp_data_ready(struct sock *sk, int bytes)
1386{
1387        struct rpc_xprt *xprt;
1388        read_descriptor_t rd_desc;
1389        int read;
1390
1391        dprintk("RPC:       xs_tcp_data_ready...\n");
1392
1393        read_lock_bh(&sk->sk_callback_lock);
1394        if (!(xprt = xprt_from_sock(sk)))
1395                goto out;
1396        if (xprt->shutdown)
1397                goto out;
1398
1399        /* Any data means we had a useful conversation, so
1400         * the we don't need to delay the next reconnect
1401         */
1402        if (xprt->reestablish_timeout)
1403                xprt->reestablish_timeout = 0;
1404
1405        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1406        rd_desc.arg.data = xprt;
1407        do {
1408                rd_desc.count = 65536;
1409                read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1410        } while (read > 0);
1411out:
1412        read_unlock_bh(&sk->sk_callback_lock);
1413}
1414
1415/*
1416 * Do the equivalent of linger/linger2 handling for dealing with
1417 * broken servers that don't close the socket in a timely
1418 * fashion
1419 */
1420static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1421                unsigned long timeout)
1422{
1423        struct sock_xprt *transport;
1424
1425        if (xprt_test_and_set_connecting(xprt))
1426                return;
1427        set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1428        transport = container_of(xprt, struct sock_xprt, xprt);
1429        queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1430                           timeout);
1431}
1432
1433static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1434{
1435        struct sock_xprt *transport;
1436
1437        transport = container_of(xprt, struct sock_xprt, xprt);
1438
1439        if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1440            !cancel_delayed_work(&transport->connect_worker))
1441                return;
1442        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1443        xprt_clear_connecting(xprt);
1444}
1445
1446static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1447{
1448        smp_mb__before_clear_bit();
1449        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1450        clear_bit(XPRT_CLOSING, &xprt->state);
1451        smp_mb__after_clear_bit();
1452        /* Mark transport as closed and wake up all pending tasks */
1453        xprt_disconnect_done(xprt);
1454}
1455
1456/**
1457 * xs_tcp_state_change - callback to handle TCP socket state changes
1458 * @sk: socket whose state has changed
1459 *
1460 */
1461static void xs_tcp_state_change(struct sock *sk)
1462{
1463        struct rpc_xprt *xprt;
1464
1465        read_lock_bh(&sk->sk_callback_lock);
1466        if (!(xprt = xprt_from_sock(sk)))
1467                goto out;
1468        dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1469        dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
1470                        sk->sk_state, xprt_connected(xprt),
1471                        sock_flag(sk, SOCK_DEAD),
1472                        sock_flag(sk, SOCK_ZAPPED),
1473                        sk->sk_shutdown);
1474
1475        switch (sk->sk_state) {
1476        case TCP_ESTABLISHED:
1477                spin_lock(&xprt->transport_lock);
1478                if (!xprt_test_and_set_connected(xprt)) {
1479                        struct sock_xprt *transport = container_of(xprt,
1480                                        struct sock_xprt, xprt);
1481
1482                        /* Reset TCP record info */
1483                        transport->tcp_offset = 0;
1484                        transport->tcp_reclen = 0;
1485                        transport->tcp_copied = 0;
1486                        transport->tcp_flags =
1487                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1488
1489                        xprt_wake_pending_tasks(xprt, -EAGAIN);
1490                }
1491                spin_unlock(&xprt->transport_lock);
1492                break;
1493        case TCP_FIN_WAIT1:
1494                /* The client initiated a shutdown of the socket */
1495                xprt->connect_cookie++;
1496                xprt->reestablish_timeout = 0;
1497                set_bit(XPRT_CLOSING, &xprt->state);
1498                smp_mb__before_clear_bit();
1499                clear_bit(XPRT_CONNECTED, &xprt->state);
1500                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1501                smp_mb__after_clear_bit();
1502                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1503                break;
1504        case TCP_CLOSE_WAIT:
1505                /* The server initiated a shutdown of the socket */
1506                xprt_force_disconnect(xprt);
1507                xprt->connect_cookie++;
1508        case TCP_CLOSING:
1509                /*
1510                 * If the server closed down the connection, make sure that
1511                 * we back off before reconnecting
1512                 */
1513                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1514                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1515                break;
1516        case TCP_LAST_ACK:
1517                set_bit(XPRT_CLOSING, &xprt->state);
1518                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1519                smp_mb__before_clear_bit();
1520                clear_bit(XPRT_CONNECTED, &xprt->state);
1521                smp_mb__after_clear_bit();
1522                break;
1523        case TCP_CLOSE:
1524                xs_tcp_cancel_linger_timeout(xprt);
1525                xs_sock_mark_closed(xprt);
1526        }
1527 out:
1528        read_unlock_bh(&sk->sk_callback_lock);
1529}
1530
1531/**
1532 * xs_error_report - callback mainly for catching socket errors
1533 * @sk: socket
1534 */
1535static void xs_error_report(struct sock *sk)
1536{
1537        struct rpc_xprt *xprt;
1538
1539        read_lock_bh(&sk->sk_callback_lock);
1540        if (!(xprt = xprt_from_sock(sk)))
1541                goto out;
1542        dprintk("RPC:       %s client %p...\n"
1543                        "RPC:       error %d\n",
1544                        __func__, xprt, sk->sk_err);
1545        xprt_wake_pending_tasks(xprt, -EAGAIN);
1546out:
1547        read_unlock_bh(&sk->sk_callback_lock);
1548}
1549
1550static void xs_write_space(struct sock *sk)
1551{
1552        struct socket *sock;
1553        struct rpc_xprt *xprt;
1554
1555        if (unlikely(!(sock = sk->sk_socket)))
1556                return;
1557        clear_bit(SOCK_NOSPACE, &sock->flags);
1558
1559        if (unlikely(!(xprt = xprt_from_sock(sk))))
1560                return;
1561        if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1562                return;
1563
1564        xprt_write_space(xprt);
1565}
1566
1567/**
1568 * xs_udp_write_space - callback invoked when socket buffer space
1569 *                             becomes available
1570 * @sk: socket whose state has changed
1571 *
1572 * Called when more output buffer space is available for this socket.
1573 * We try not to wake our writers until they can make "significant"
1574 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1575 * with a bunch of small requests.
1576 */
1577static void xs_udp_write_space(struct sock *sk)
1578{
1579        read_lock_bh(&sk->sk_callback_lock);
1580
1581        /* from net/core/sock.c:sock_def_write_space */
1582        if (sock_writeable(sk))
1583                xs_write_space(sk);
1584
1585        read_unlock_bh(&sk->sk_callback_lock);
1586}
1587
1588/**
1589 * xs_tcp_write_space - callback invoked when socket buffer space
1590 *                             becomes available
1591 * @sk: socket whose state has changed
1592 *
1593 * Called when more output buffer space is available for this socket.
1594 * We try not to wake our writers until they can make "significant"
1595 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1596 * with a bunch of small requests.
1597 */
1598static void xs_tcp_write_space(struct sock *sk)
1599{
1600        read_lock_bh(&sk->sk_callback_lock);
1601
1602        /* from net/core/stream.c:sk_stream_write_space */
1603        if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1604                xs_write_space(sk);
1605
1606        read_unlock_bh(&sk->sk_callback_lock);
1607}
1608
1609static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1610{
1611        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1612        struct sock *sk = transport->inet;
1613
1614        if (transport->rcvsize) {
1615                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1616                sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1617        }
1618        if (transport->sndsize) {
1619                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1620                sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1621                sk->sk_write_space(sk);
1622        }
1623}
1624
1625/**
1626 * xs_udp_set_buffer_size - set send and receive limits
1627 * @xprt: generic transport
1628 * @sndsize: requested size of send buffer, in bytes
1629 * @rcvsize: requested size of receive buffer, in bytes
1630 *
1631 * Set socket send and receive buffer size limits.
1632 */
1633static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1634{
1635        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1636
1637        transport->sndsize = 0;
1638        if (sndsize)
1639                transport->sndsize = sndsize + 1024;
1640        transport->rcvsize = 0;
1641        if (rcvsize)
1642                transport->rcvsize = rcvsize + 1024;
1643
1644        xs_udp_do_set_buffer_size(xprt);
1645}
1646
1647/**
1648 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1649 * @task: task that timed out
1650 *
1651 * Adjust the congestion window after a retransmit timeout has occurred.
1652 */
1653static void xs_udp_timer(struct rpc_task *task)
1654{
1655        xprt_adjust_cwnd(task, -ETIMEDOUT);
1656}
1657
1658static unsigned short xs_get_random_port(void)
1659{
1660        unsigned short range = xprt_max_resvport - xprt_min_resvport;
1661        unsigned short rand = (unsigned short) net_random() % range;
1662        return rand + xprt_min_resvport;
1663}
1664
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 * Stores @port in the transport's address and refreshes the peer port
 * information via xs_update_peer_port().
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	struct sockaddr *sap = xs_addr(xprt);

	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	rpc_set_port(sap, port);
	xs_update_peer_port(xprt);
}
1678
1679static unsigned short xs_get_srcport(struct sock_xprt *transport)
1680{
1681        unsigned short port = transport->srcport;
1682
1683        if (port == 0 && transport->xprt.resvport)
1684                port = xs_get_random_port();
1685        return port;
1686}
1687
1688static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1689{
1690        if (transport->srcport != 0)
1691                transport->srcport = 0;
1692        if (!transport->xprt.resvport)
1693                return 0;
1694        if (port <= xprt_min_resvport || port > xprt_max_resvport)
1695                return xprt_max_resvport;
1696        return --port;
1697}
1698static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1699{
1700        struct sockaddr_storage myaddr;
1701        int err, nloop = 0;
1702        unsigned short port = xs_get_srcport(transport);
1703        unsigned short last;
1704
1705        memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
1706        do {
1707                rpc_set_port((struct sockaddr *)&myaddr, port);
1708                err = kernel_bind(sock, (struct sockaddr *)&myaddr,
1709                                transport->xprt.addrlen);
1710                if (port == 0)
1711                        break;
1712                if (err == 0) {
1713                        transport->srcport = port;
1714                        break;
1715                }
1716                last = port;
1717                port = xs_next_srcport(transport, port);
1718                if (port > last)
1719                        nloop++;
1720        } while (err == -EADDRINUSE && nloop != 2);
1721
1722        if (myaddr.ss_family == AF_INET)
1723                dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
1724                                &((struct sockaddr_in *)&myaddr)->sin_addr,
1725                                port, err ? "failed" : "ok", err);
1726        else
1727                dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
1728                                &((struct sockaddr_in6 *)&myaddr)->sin6_addr,
1729                                port, err ? "failed" : "ok", err);
1730        return err;
1731}
1732
1733/*
1734 * We don't support autobind on AF_LOCAL sockets
1735 */
1736static void xs_local_rpcbind(struct rpc_task *task)
1737{
1738        xprt_set_bound(task->tk_xprt);
1739}
1740
/*
 * ->set_port hook for AF_LOCAL transports: intentionally does nothing.
 */
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
}
1744
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Lock class keys for RPC client sockets, one slot per address family:
 *   [0] AF_INET, [1] AF_INET6, [2] AF_LOCAL
 *
 * Every named lock class gets its own key; AF_LOCAL previously shared
 * slot 1 with AF_INET6, registering two differently named classes
 * under the same key.
 */
static struct lock_class_key xs_key[3];
static struct lock_class_key xs_slock_key[3];

/* Give an AF_LOCAL RPC socket its own lock classes. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
		&xs_slock_key[2], "sk_lock-AF_LOCAL-RPC", &xs_key[2]);
}

/* Give an AF_INET RPC socket its own lock classes. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/* Give an AF_INET6 RPC socket its own lock classes. */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}

/* Dispatch to the per-family reclassify helper; other families are left alone. */
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
	switch (family) {
	case AF_LOCAL:
		xs_reclassify_socketu(sock);
		break;
	case AF_INET:
		xs_reclassify_socket4(sock);
		break;
	case AF_INET6:
		xs_reclassify_socket6(sock);
		break;
	}
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC the reclassify helpers are no-ops. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}

static inline void xs_reclassify_socket(int family, struct socket *sock)
{
}
#endif
1807
1808static struct socket *xs_create_sock(struct rpc_xprt *xprt,
1809                struct sock_xprt *transport, int family, int type, int protocol)
1810{
1811        struct socket *sock;
1812        int err;
1813
1814        err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
1815        if (err < 0) {
1816                dprintk("RPC:       can't create %d transport socket (%d).\n",
1817                                protocol, -err);
1818                goto out;
1819        }
1820        xs_reclassify_socket(family, sock);
1821
1822        err = xs_bind(transport, sock);
1823        if (err) {
1824                sock_release(sock);
1825                goto out;
1826        }
1827
1828        return sock;
1829out:
1830        return ERR_PTR(err);
1831}
1832
1833static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1834                                      struct socket *sock)
1835{
1836        struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1837                                                                        xprt);
1838
1839        if (!transport->inet) {
1840                struct sock *sk = sock->sk;
1841
1842                write_lock_bh(&sk->sk_callback_lock);
1843
1844                xs_save_old_callbacks(transport, sk);
1845
1846                sk->sk_user_data = xprt;
1847                sk->sk_data_ready = xs_local_data_ready;
1848                sk->sk_write_space = xs_udp_write_space;
1849                sk->sk_error_report = xs_error_report;
1850                sk->sk_allocation = GFP_ATOMIC;
1851
1852                xprt_clear_connected(xprt);
1853
1854                /* Reset to new socket */
1855                transport->sock = sock;
1856                transport->inet = sk;
1857
1858                write_unlock_bh(&sk->sk_callback_lock);
1859        }
1860
1861        /* Tell the socket layer to start connecting... */
1862        xprt->stat.connect_count++;
1863        xprt->stat.connect_start = jiffies;
1864        return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
1865}
1866
1867/**
1868 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
1869 * @xprt: RPC transport to connect
1870 * @transport: socket transport to connect
1871 * @create_sock: function to create a socket of the correct type
1872 *
1873 * Invoked by a work queue tasklet.
1874 */
1875static void xs_local_setup_socket(struct work_struct *work)
1876{
1877        struct sock_xprt *transport =
1878                container_of(work, struct sock_xprt, connect_worker.work);
1879        struct rpc_xprt *xprt = &transport->xprt;
1880        struct socket *sock;
1881        int status = -EIO;
1882
1883        if (xprt->shutdown)
1884                goto out;
1885
1886        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1887        status = __sock_create(xprt->xprt_net, AF_LOCAL,
1888                                        SOCK_STREAM, 0, &sock, 1);
1889        if (status < 0) {
1890                dprintk("RPC:       can't create AF_LOCAL "
1891                        "transport socket (%d).\n", -status);
1892                goto out;
1893        }
1894        xs_reclassify_socketu(sock);
1895
1896        dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
1897                        xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1898
1899        status = xs_local_finish_connecting(xprt, sock);
1900        switch (status) {
1901        case 0:
1902                dprintk("RPC:       xprt %p connected to %s\n",
1903                                xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1904                xprt_set_connected(xprt);
1905                break;
1906        case -ENOENT:
1907                dprintk("RPC:       xprt %p: socket %s does not exist\n",
1908                                xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1909                break;
1910        default:
1911                printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
1912                                __func__, -status,
1913                                xprt->address_strings[RPC_DISPLAY_ADDR]);
1914        }
1915
1916out:
1917        xprt_clear_connecting(xprt);
1918        xprt_wake_pending_tasks(xprt, status);
1919}
1920
1921static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1922{
1923        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1924
1925        if (!transport->inet) {
1926                struct sock *sk = sock->sk;
1927
1928                write_lock_bh(&sk->sk_callback_lock);
1929
1930                xs_save_old_callbacks(transport, sk);
1931
1932                sk->sk_user_data = xprt;
1933                sk->sk_data_ready = xs_udp_data_ready;
1934                sk->sk_write_space = xs_udp_write_space;
1935                sk->sk_error_report = xs_error_report;
1936                sk->sk_no_check = UDP_CSUM_NORCV;
1937                sk->sk_allocation = GFP_ATOMIC;
1938
1939                xprt_set_connected(xprt);
1940
1941                /* Reset to new socket */
1942                transport->sock = sock;
1943                transport->inet = sk;
1944
1945                write_unlock_bh(&sk->sk_callback_lock);
1946        }
1947        xs_udp_do_set_buffer_size(xprt);
1948}
1949
1950static void xs_udp_setup_socket(struct work_struct *work)
1951{
1952        struct sock_xprt *transport =
1953                container_of(work, struct sock_xprt, connect_worker.work);
1954        struct rpc_xprt *xprt = &transport->xprt;
1955        struct socket *sock = transport->sock;
1956        int status = -EIO;
1957
1958        if (xprt->shutdown)
1959                goto out;
1960
1961        /* Start by resetting any existing state */
1962        xs_reset_transport(transport);
1963        sock = xs_create_sock(xprt, transport,
1964                        xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
1965        if (IS_ERR(sock))
1966                goto out;
1967
1968        dprintk("RPC:       worker connecting xprt %p via %s to "
1969                                "%s (port %s)\n", xprt,
1970                        xprt->address_strings[RPC_DISPLAY_PROTO],
1971                        xprt->address_strings[RPC_DISPLAY_ADDR],
1972                        xprt->address_strings[RPC_DISPLAY_PORT]);
1973
1974        xs_udp_finish_connecting(xprt, sock);
1975        status = 0;
1976out:
1977        xprt_clear_connecting(xprt);
1978        xprt_wake_pending_tasks(xprt, status);
1979}
1980
1981/*
1982 * We need to preserve the port number so the reply cache on the server can
1983 * find our cached RPC replies when we get around to reconnecting.
1984 */
1985static void xs_abort_connection(struct sock_xprt *transport)
1986{
1987        int result;
1988        struct sockaddr any;
1989
1990        dprintk("RPC:       disconnecting xprt %p to reuse port\n", transport);
1991
1992        /*
1993         * Disconnect the transport socket by doing a connect operation
1994         * with AF_UNSPEC.  This should return immediately...
1995         */
1996        memset(&any, 0, sizeof(any));
1997        any.sa_family = AF_UNSPEC;
1998        result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1999        if (!result)
2000                xs_sock_mark_closed(&transport->xprt);
2001        else
2002                dprintk("RPC:       AF_UNSPEC connect return code %d\n",
2003                                result);
2004}
2005
2006static void xs_tcp_reuse_connection(struct sock_xprt *transport)
2007{
2008        unsigned int state = transport->inet->sk_state;
2009
2010        if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
2011                /* we don't need to abort the connection if the socket
2012                 * hasn't undergone a shutdown
2013                 */
2014                if (transport->inet->sk_shutdown == 0)
2015                        return;
2016                dprintk("RPC:       %s: TCP_CLOSEd and sk_shutdown set to %d\n",
2017                                __func__, transport->inet->sk_shutdown);
2018        }
2019        if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
2020                /* we don't need to abort the connection if the socket
2021                 * hasn't undergone a shutdown
2022                 */
2023                if (transport->inet->sk_shutdown == 0)
2024                        return;
2025                dprintk("RPC:       %s: ESTABLISHED/SYN_SENT "
2026                                "sk_shutdown set to %d\n",
2027                                __func__, transport->inet->sk_shutdown);
2028        }
2029        xs_abort_connection(transport);
2030}
2031
2032static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2033{
2034        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2035        int ret = -ENOTCONN;
2036
2037        if (!transport->inet) {
2038                struct sock *sk = sock->sk;
2039
2040                write_lock_bh(&sk->sk_callback_lock);
2041
2042                xs_save_old_callbacks(transport, sk);
2043
2044                sk->sk_user_data = xprt;
2045                sk->sk_data_ready = xs_tcp_data_ready;
2046                sk->sk_state_change = xs_tcp_state_change;
2047                sk->sk_write_space = xs_tcp_write_space;
2048                sk->sk_error_report = xs_error_report;
2049                sk->sk_allocation = GFP_ATOMIC;
2050
2051                /* socket options */
2052                sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
2053                sock_reset_flag(sk, SOCK_LINGER);
2054                tcp_sk(sk)->linger2 = 0;
2055                tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
2056
2057                xprt_clear_connected(xprt);
2058
2059                /* Reset to new socket */
2060                transport->sock = sock;
2061                transport->inet = sk;
2062
2063                write_unlock_bh(&sk->sk_callback_lock);
2064        }
2065
2066        if (!xprt_bound(xprt))
2067                goto out;
2068
2069        /* Tell the socket layer to start connecting... */
2070        xprt->stat.connect_count++;
2071        xprt->stat.connect_start = jiffies;
2072        ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
2073        switch (ret) {
2074        case 0:
2075        case -EINPROGRESS:
2076                /* SYN_SENT! */
2077                xprt->connect_cookie++;
2078                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2079                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2080        }
2081out:
2082        return ret;
2083}
2084
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: work item embedded in the sock_xprt's connect_worker
 *
 * Creates a new socket when the transport has none; otherwise "closes"
 * the existing one while preserving its local port.  Depending on the
 * connect status the function either returns with the connect still in
 * flight (pending tasks are not woken here), or wakes the pending tasks
 * with an error: -EIO on shutdown, the socket creation error, -EINVAL,
 * or -EAGAIN.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (sock == NULL) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = xs_create_sock(xprt, transport,
				xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);

		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(transport);

		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);

	switch (status) {
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		/* connect succeeded or is under way */
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
		xprt_clear_connecting(xprt);
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	case -EADDRNOTAVAIL:
		break;
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		break;
	}

	/* We're probably in TIME_WAIT. Get rid of existing socket,
	 * and retry
	 */
	set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
	xprt_force_disconnect(xprt);
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
2166
2167/**
2168 * xs_connect - connect a socket to a remote endpoint
2169 * @task: address of RPC task that manages state of connect request
2170 *
2171 * TCP: If the remote end dropped the connection, delay reconnecting.
2172 *
2173 * UDP socket connects are synchronous, but we use a work queue anyway
2174 * to guarantee that even unprivileged user processes can set up a
2175 * socket on a privileged port.
2176 *
2177 * If a UDP socket connect fails, the delay behavior here prevents
2178 * retry floods (hard mounts).
2179 */
2180static void xs_connect(struct rpc_task *task)
2181{
2182        struct rpc_xprt *xprt = task->tk_xprt;
2183        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2184
2185        if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
2186                dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2187                                "seconds\n",
2188                                xprt, xprt->reestablish_timeout / HZ);
2189                queue_delayed_work(rpciod_workqueue,
2190                                   &transport->connect_worker,
2191                                   xprt->reestablish_timeout);
2192                xprt->reestablish_timeout <<= 1;
2193                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2194                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2195                if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2196                        xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2197        } else {
2198                dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2199                queue_delayed_work(rpciod_workqueue,
2200                                   &transport->connect_worker, 0);
2201        }
2202}
2203
2204/**
2205 * xs_local_print_stats - display AF_LOCAL socket-specifc stats
2206 * @xprt: rpc_xprt struct containing statistics
2207 * @seq: output file
2208 *
2209 */
2210static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2211{
2212        long idle_time = 0;
2213
2214        if (xprt_connected(xprt))
2215                idle_time = (long)(jiffies - xprt->last_used) / HZ;
2216
2217        seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
2218                        "%llu %llu\n",
2219                        xprt->stat.bind_count,
2220                        xprt->stat.connect_count,
2221                        xprt->stat.connect_time,
2222                        idle_time,
2223                        xprt->stat.sends,
2224                        xprt->stat.recvs,
2225                        xprt->stat.bad_xids,
2226                        xprt->stat.req_u,
2227                        xprt->stat.bklog_u);
2228}
2229
2230/**
2231 * xs_udp_print_stats - display UDP socket-specifc stats
2232 * @xprt: rpc_xprt struct containing statistics
2233 * @seq: output file
2234 *
2235 */
2236static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2237{
2238        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2239
2240        seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2241                        transport->srcport,
2242                        xprt->stat.bind_count,
2243                        xprt->stat.sends,
2244                        xprt->stat.recvs,
2245                        xprt->stat.bad_xids,
2246                        xprt->stat.req_u,
2247                        xprt->stat.bklog_u);
2248}
2249
2250/**
2251 * xs_tcp_print_stats - display TCP socket-specifc stats
2252 * @xprt: rpc_xprt struct containing statistics
2253 * @seq: output file
2254 *
2255 */
2256static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2257{
2258        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2259        long idle_time = 0;
2260
2261        if (xprt_connected(xprt))
2262                idle_time = (long)(jiffies - xprt->last_used) / HZ;
2263
2264        seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2265                        transport->srcport,
2266                        xprt->stat.bind_count,
2267                        xprt->stat.connect_count,
2268                        xprt->stat.connect_time,
2269                        idle_time,
2270                        xprt->stat.sends,
2271                        xprt->stat.recvs,
2272                        xprt->stat.bad_xids,
2273                        xprt->stat.req_u,
2274                        xprt->stat.bklog_u);
2275}
2276
2277/*
2278 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2279 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2280 * to use the server side send routines.
2281 */
2282static void *bc_malloc(struct rpc_task *task, size_t size)
2283{
2284        struct page *page;
2285        struct rpc_buffer *buf;
2286
2287        BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2288        page = alloc_page(GFP_KERNEL);
2289
2290        if (!page)
2291                return NULL;
2292
2293        buf = page_address(page);
2294        buf->len = PAGE_SIZE;
2295
2296        return buf->data;
2297}
2298
2299/*
2300 * Free the space allocated in the bc_alloc routine
2301 */
2302static void bc_free(void *buffer)
2303{
2304        struct rpc_buffer *buf;
2305
2306        if (!buffer)
2307                return;
2308
2309        buf = container_of(buffer, struct rpc_buffer, data);
2310        free_page((unsigned long)buf);
2311}
2312
2313/*
2314 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2315 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2316 */
2317static int bc_sendto(struct rpc_rqst *req)
2318{
2319        int len;
2320        struct xdr_buf *xbufp = &req->rq_snd_buf;
2321        struct rpc_xprt *xprt = req->rq_xprt;
2322        struct sock_xprt *transport =
2323                                container_of(xprt, struct sock_xprt, xprt);
2324        struct socket *sock = transport->sock;
2325        unsigned long headoff;
2326        unsigned long tailoff;
2327
2328        xs_encode_stream_record_marker(xbufp);
2329
2330        tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2331        headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2332        len = svc_send_common(sock, xbufp,
2333                              virt_to_page(xbufp->head[0].iov_base), headoff,
2334                              xbufp->tail[0].iov_base, tailoff);
2335
2336        if (len != xbufp->len) {
2337                printk(KERN_NOTICE "Error sending entire callback!\n");
2338                len = -EAGAIN;
2339        }
2340
2341        return len;
2342}
2343
2344/*
2345 * The send routine. Borrows from svc_send
2346 */
2347static int bc_send_request(struct rpc_task *task)
2348{
2349        struct rpc_rqst *req = task->tk_rqstp;
2350        struct svc_xprt *xprt;
2351        struct svc_sock         *svsk;
2352        u32                     len;
2353
2354        dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2355        /*
2356         * Get the server socket associated with this callback xprt
2357         */
2358        xprt = req->rq_xprt->bc_xprt;
2359        svsk = container_of(xprt, struct svc_sock, sk_xprt);
2360
2361        /*
2362         * Grab the mutex to serialize data as the connection is shared
2363         * with the fore channel
2364         */
2365        if (!mutex_trylock(&xprt->xpt_mutex)) {
2366                rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2367                if (!mutex_trylock(&xprt->xpt_mutex))
2368                        return -EAGAIN;
2369                rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2370        }
2371        if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2372                len = -ENOTCONN;
2373        else
2374                len = bc_sendto(req);
2375        mutex_unlock(&xprt->xpt_mutex);
2376
2377        if (len > 0)
2378                len = 0;
2379
2380        return len;
2381}
2382
/*
 * ->close hook for the backchannel transport.  The connection is
 * client initiated, so there is nothing to do here.
 */
static void bc_close(struct rpc_xprt *xprt)
{
}
2390
/*
 * ->destroy hook for the backchannel transport.  As with bc_close, the
 * connection is client initiated, so there is nothing to do here.
 */
static void bc_destroy(struct rpc_xprt *xprt)
{
}
2399
2400static struct rpc_xprt_ops xs_local_ops = {
2401        .reserve_xprt           = xprt_reserve_xprt,
2402        .release_xprt           = xs_tcp_release_xprt,
2403        .rpcbind                = xs_local_rpcbind,
2404        .set_port               = xs_local_set_port,
2405        .connect                = xs_connect,
2406        .buf_alloc              = rpc_malloc,
2407        .buf_free               = rpc_free,
2408        .send_request           = xs_local_send_request,
2409        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2410        .close                  = xs_close,
2411        .destroy                = xs_destroy,
2412        .print_stats            = xs_local_print_stats,
2413};
2414
2415static struct rpc_xprt_ops xs_udp_ops = {
2416        .set_buffer_size        = xs_udp_set_buffer_size,
2417        .reserve_xprt           = xprt_reserve_xprt_cong,
2418        .release_xprt           = xprt_release_xprt_cong,
2419        .rpcbind                = rpcb_getport_async,
2420        .set_port               = xs_set_port,
2421        .connect                = xs_connect,
2422        .buf_alloc              = rpc_malloc,
2423        .buf_free               = rpc_free,
2424        .send_request           = xs_udp_send_request,
2425        .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
2426        .timer                  = xs_udp_timer,
2427        .release_request        = xprt_release_rqst_cong,
2428        .close                  = xs_close,
2429        .destroy                = xs_destroy,
2430        .print_stats            = xs_udp_print_stats,
2431};
2432
2433static struct rpc_xprt_ops xs_tcp_ops = {
2434        .reserve_xprt           = xprt_reserve_xprt,
2435        .release_xprt           = xs_tcp_release_xprt,
2436        .rpcbind                = rpcb_getport_async,
2437        .set_port               = xs_set_port,
2438        .connect                = xs_connect,
2439        .buf_alloc              = rpc_malloc,
2440        .buf_free               = rpc_free,
2441        .send_request           = xs_tcp_send_request,
2442        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2443        .close                  = xs_tcp_close,
2444        .destroy                = xs_destroy,
2445        .print_stats            = xs_tcp_print_stats,
2446};
2447
2448/*
2449 * The rpc_xprt_ops for the server backchannel
2450 */
2451
2452static struct rpc_xprt_ops bc_tcp_ops = {
2453        .reserve_xprt           = xprt_reserve_xprt,
2454        .release_xprt           = xprt_release_xprt,
2455        .buf_alloc              = bc_malloc,
2456        .buf_free               = bc_free,
2457        .send_request           = bc_send_request,
2458        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2459        .close                  = bc_close,
2460        .destroy                = bc_destroy,
2461        .print_stats            = xs_tcp_print_stats,
2462};
2463
2464static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2465{
2466        static const struct sockaddr_in sin = {
2467                .sin_family             = AF_INET,
2468                .sin_addr.s_addr        = htonl(INADDR_ANY),
2469        };
2470        static const struct sockaddr_in6 sin6 = {
2471                .sin6_family            = AF_INET6,
2472                .sin6_addr              = IN6ADDR_ANY_INIT,
2473        };
2474
2475        switch (family) {
2476        case AF_LOCAL:
2477                break;
2478        case AF_INET:
2479                memcpy(sap, &sin, sizeof(sin));
2480                break;
2481        case AF_INET6:
2482                memcpy(sap, &sin6, sizeof(sin6));
2483                break;
2484        default:
2485                dprintk("RPC:       %s: Bad address family\n", __func__);
2486                return -EAFNOSUPPORT;
2487        }
2488        return 0;
2489}
2490
2491static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2492                                      unsigned int slot_table_size)
2493{
2494        struct rpc_xprt *xprt;
2495        struct sock_xprt *new;
2496
2497        if (args->addrlen > sizeof(xprt->addr)) {
2498                dprintk("RPC:       xs_setup_xprt: address too large\n");
2499                return ERR_PTR(-EBADF);
2500        }
2501
2502        xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size);
2503        if (xprt == NULL) {
2504                dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2505                                "rpc_xprt\n");
2506                return ERR_PTR(-ENOMEM);
2507        }
2508
2509        new = container_of(xprt, struct sock_xprt, xprt);
2510        memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2511        xprt->addrlen = args->addrlen;
2512        if (args->srcaddr)
2513                memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2514        else {
2515                int err;
2516                err = xs_init_anyaddr(args->dstaddr->sa_family,
2517                                        (struct sockaddr *)&new->srcaddr);
2518                if (err != 0)
2519                        return ERR_PTR(err);
2520        }
2521
2522        return xprt;
2523}
2524
2525static const struct rpc_timeout xs_local_default_timeout = {
2526        .to_initval = 10 * HZ,
2527        .to_maxval = 10 * HZ,
2528        .to_retries = 2,
2529};
2530
2531/**
2532 * xs_setup_local - Set up transport to use an AF_LOCAL socket
2533 * @args: rpc transport creation arguments
2534 *
2535 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2536 */
2537static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2538{
2539        struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
2540        struct sock_xprt *transport;
2541        struct rpc_xprt *xprt;
2542        struct rpc_xprt *ret;
2543
2544        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2545        if (IS_ERR(xprt))
2546                return xprt;
2547        transport = container_of(xprt, struct sock_xprt, xprt);
2548
2549        xprt->prot = 0;
2550        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2551        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2552
2553        xprt->bind_timeout = XS_BIND_TO;
2554        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2555        xprt->idle_timeout = XS_IDLE_DISC_TO;
2556
2557        xprt->ops = &xs_local_ops;
2558        xprt->timeout = &xs_local_default_timeout;
2559
2560        switch (sun->sun_family) {
2561        case AF_LOCAL:
2562                if (sun->sun_path[0] != '/') {
2563                        dprintk("RPC:       bad AF_LOCAL address: %s\n",
2564                                        sun->sun_path);
2565                        ret = ERR_PTR(-EINVAL);
2566                        goto out_err;
2567                }
2568                xprt_set_bound(xprt);
2569                INIT_DELAYED_WORK(&transport->connect_worker,
2570                                        xs_local_setup_socket);
2571                xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2572                break;
2573        default:
2574                ret = ERR_PTR(-EAFNOSUPPORT);
2575                goto out_err;
2576        }
2577
2578        dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
2579                        xprt->address_strings[RPC_DISPLAY_ADDR]);
2580
2581        if (try_module_get(THIS_MODULE))
2582                return xprt;
2583        ret = ERR_PTR(-EINVAL);
2584out_err:
2585        xprt_free(xprt);
2586        return ret;
2587}
2588
2589static const struct rpc_timeout xs_udp_default_timeout = {
2590        .to_initval = 5 * HZ,
2591        .to_maxval = 30 * HZ,
2592        .to_increment = 5 * HZ,
2593        .to_retries = 5,
2594};
2595
2596/**
2597 * xs_setup_udp - Set up transport to use a UDP socket
2598 * @args: rpc transport creation arguments
2599 *
2600 */
2601static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2602{
2603        struct sockaddr *addr = args->dstaddr;
2604        struct rpc_xprt *xprt;
2605        struct sock_xprt *transport;
2606        struct rpc_xprt *ret;
2607
2608        xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2609        if (IS_ERR(xprt))
2610                return xprt;
2611        transport = container_of(xprt, struct sock_xprt, xprt);
2612
2613        xprt->prot = IPPROTO_UDP;
2614        xprt->tsh_size = 0;
2615        /* XXX: header size can vary due to auth type, IPv6, etc. */
2616        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2617
2618        xprt->bind_timeout = XS_BIND_TO;
2619        xprt->reestablish_timeout = XS_UDP_REEST_TO;
2620        xprt->idle_timeout = XS_IDLE_DISC_TO;
2621
2622        xprt->ops = &xs_udp_ops;
2623
2624        xprt->timeout = &xs_udp_default_timeout;
2625
2626        switch (addr->sa_family) {
2627        case AF_INET:
2628                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2629                        xprt_set_bound(xprt);
2630
2631                INIT_DELAYED_WORK(&transport->connect_worker,
2632                                        xs_udp_setup_socket);
2633                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2634                break;
2635        case AF_INET6:
2636                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2637                        xprt_set_bound(xprt);
2638
2639                INIT_DELAYED_WORK(&transport->connect_worker,
2640                                        xs_udp_setup_socket);
2641                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2642                break;
2643        default:
2644                ret = ERR_PTR(-EAFNOSUPPORT);
2645                goto out_err;
2646        }
2647
2648        if (xprt_bound(xprt))
2649                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2650                                xprt->address_strings[RPC_DISPLAY_ADDR],
2651                                xprt->address_strings[RPC_DISPLAY_PORT],
2652                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2653        else
2654                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2655                                xprt->address_strings[RPC_DISPLAY_ADDR],
2656                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2657
2658        if (try_module_get(THIS_MODULE))
2659                return xprt;
2660        ret = ERR_PTR(-EINVAL);
2661out_err:
2662        xprt_free(xprt);
2663        return ret;
2664}
2665
2666static const struct rpc_timeout xs_tcp_default_timeout = {
2667        .to_initval = 60 * HZ,
2668        .to_maxval = 60 * HZ,
2669        .to_retries = 2,
2670};
2671
2672/**
2673 * xs_setup_tcp - Set up transport to use a TCP socket
2674 * @args: rpc transport creation arguments
2675 *
2676 */
2677static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2678{
2679        struct sockaddr *addr = args->dstaddr;
2680        struct rpc_xprt *xprt;
2681        struct sock_xprt *transport;
2682        struct rpc_xprt *ret;
2683
2684        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2685        if (IS_ERR(xprt))
2686                return xprt;
2687        transport = container_of(xprt, struct sock_xprt, xprt);
2688
2689        xprt->prot = IPPROTO_TCP;
2690        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2691        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2692
2693        xprt->bind_timeout = XS_BIND_TO;
2694        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2695        xprt->idle_timeout = XS_IDLE_DISC_TO;
2696
2697        xprt->ops = &xs_tcp_ops;
2698        xprt->timeout = &xs_tcp_default_timeout;
2699
2700        switch (addr->sa_family) {
2701        case AF_INET:
2702                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2703                        xprt_set_bound(xprt);
2704
2705                INIT_DELAYED_WORK(&transport->connect_worker,
2706                                        xs_tcp_setup_socket);
2707                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2708                break;
2709        case AF_INET6:
2710                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2711                        xprt_set_bound(xprt);
2712
2713                INIT_DELAYED_WORK(&transport->connect_worker,
2714                                        xs_tcp_setup_socket);
2715                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2716                break;
2717        default:
2718                ret = ERR_PTR(-EAFNOSUPPORT);
2719                goto out_err;
2720        }
2721
2722        if (xprt_bound(xprt))
2723                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2724                                xprt->address_strings[RPC_DISPLAY_ADDR],
2725                                xprt->address_strings[RPC_DISPLAY_PORT],
2726                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2727        else
2728                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2729                                xprt->address_strings[RPC_DISPLAY_ADDR],
2730                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2731
2732
2733        if (try_module_get(THIS_MODULE))
2734                return xprt;
2735        ret = ERR_PTR(-EINVAL);
2736out_err:
2737        xprt_free(xprt);
2738        return ret;
2739}
2740
2741/**
2742 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2743 * @args: rpc transport creation arguments
2744 *
2745 */
2746static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2747{
2748        struct sockaddr *addr = args->dstaddr;
2749        struct rpc_xprt *xprt;
2750        struct sock_xprt *transport;
2751        struct svc_sock *bc_sock;
2752        struct rpc_xprt *ret;
2753
2754        if (args->bc_xprt->xpt_bc_xprt) {
2755                /*
2756                 * This server connection already has a backchannel
2757                 * export; we can't create a new one, as we wouldn't be
2758                 * able to match replies based on xid any more.  So,
2759                 * reuse the already-existing one:
2760                 */
2761                 return args->bc_xprt->xpt_bc_xprt;
2762        }
2763        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2764        if (IS_ERR(xprt))
2765                return xprt;
2766        transport = container_of(xprt, struct sock_xprt, xprt);
2767
2768        xprt->prot = IPPROTO_TCP;
2769        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2770        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2771        xprt->timeout = &xs_tcp_default_timeout;
2772
2773        /* backchannel */
2774        xprt_set_bound(xprt);
2775        xprt->bind_timeout = 0;
2776        xprt->reestablish_timeout = 0;
2777        xprt->idle_timeout = 0;
2778
2779        xprt->ops = &bc_tcp_ops;
2780
2781        switch (addr->sa_family) {
2782        case AF_INET:
2783                xs_format_peer_addresses(xprt, "tcp",
2784                                         RPCBIND_NETID_TCP);
2785                break;
2786        case AF_INET6:
2787                xs_format_peer_addresses(xprt, "tcp",
2788                                   RPCBIND_NETID_TCP6);
2789                break;
2790        default:
2791                ret = ERR_PTR(-EAFNOSUPPORT);
2792                goto out_err;
2793        }
2794
2795        dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2796                        xprt->address_strings[RPC_DISPLAY_ADDR],
2797                        xprt->address_strings[RPC_DISPLAY_PORT],
2798                        xprt->address_strings[RPC_DISPLAY_PROTO]);
2799
2800        /*
2801         * Once we've associated a backchannel xprt with a connection,
2802         * we want to keep it around as long as long as the connection
2803         * lasts, in case we need to start using it for a backchannel
2804         * again; this reference won't be dropped until bc_xprt is
2805         * destroyed.
2806         */
2807        xprt_get(xprt);
2808        args->bc_xprt->xpt_bc_xprt = xprt;
2809        xprt->bc_xprt = args->bc_xprt;
2810        bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2811        transport->sock = bc_sock->sk_sock;
2812        transport->inet = bc_sock->sk_sk;
2813
2814        /*
2815         * Since we don't want connections for the backchannel, we set
2816         * the xprt status to connected
2817         */
2818        xprt_set_connected(xprt);
2819
2820
2821        if (try_module_get(THIS_MODULE))
2822                return xprt;
2823        xprt_put(xprt);
2824        ret = ERR_PTR(-EINVAL);
2825out_err:
2826        xprt_free(xprt);
2827        return ret;
2828}
2829
2830static struct xprt_class        xs_local_transport = {
2831        .list           = LIST_HEAD_INIT(xs_local_transport.list),
2832        .name           = "named UNIX socket",
2833        .owner          = THIS_MODULE,
2834        .ident          = XPRT_TRANSPORT_LOCAL,
2835        .setup          = xs_setup_local,
2836};
2837
2838static struct xprt_class        xs_udp_transport = {
2839        .list           = LIST_HEAD_INIT(xs_udp_transport.list),
2840        .name           = "udp",
2841        .owner          = THIS_MODULE,
2842        .ident          = XPRT_TRANSPORT_UDP,
2843        .setup          = xs_setup_udp,
2844};
2845
2846static struct xprt_class        xs_tcp_transport = {
2847        .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
2848        .name           = "tcp",
2849        .owner          = THIS_MODULE,
2850        .ident          = XPRT_TRANSPORT_TCP,
2851        .setup          = xs_setup_tcp,
2852};
2853
2854static struct xprt_class        xs_bc_tcp_transport = {
2855        .list           = LIST_HEAD_INIT(xs_bc_tcp_transport.list),
2856        .name           = "tcp NFSv4.1 backchannel",
2857        .owner          = THIS_MODULE,
2858        .ident          = XPRT_TRANSPORT_BC_TCP,
2859        .setup          = xs_setup_bc_tcp,
2860};
2861
2862/**
2863 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2864 *
2865 */
2866int init_socket_xprt(void)
2867{
2868#ifdef RPC_DEBUG
2869        if (!sunrpc_table_header)
2870                sunrpc_table_header = register_sysctl_table(sunrpc_table);
2871#endif
2872
2873        xprt_register_transport(&xs_local_transport);
2874        xprt_register_transport(&xs_udp_transport);
2875        xprt_register_transport(&xs_tcp_transport);
2876        xprt_register_transport(&xs_bc_tcp_transport);
2877
2878        return 0;
2879}
2880
2881/**
2882 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2883 *
2884 */
2885void cleanup_socket_xprt(void)
2886{
2887#ifdef RPC_DEBUG
2888        if (sunrpc_table_header) {
2889                unregister_sysctl_table(sunrpc_table_header);
2890                sunrpc_table_header = NULL;
2891        }
2892#endif
2893
2894        xprt_unregister_transport(&xs_local_transport);
2895        xprt_unregister_transport(&xs_udp_transport);
2896        xprt_unregister_transport(&xs_tcp_transport);
2897        xprt_unregister_transport(&xs_bc_tcp_transport);
2898}
2899
2900static int param_set_uint_minmax(const char *val,
2901                const struct kernel_param *kp,
2902                unsigned int min, unsigned int max)
2903{
2904        unsigned long num;
2905        int ret;
2906
2907        if (!val)
2908                return -EINVAL;
2909        ret = strict_strtoul(val, 0, &num);
2910        if (ret == -EINVAL || num < min || num > max)
2911                return -EINVAL;
2912        *((unsigned int *)kp->arg) = num;
2913        return 0;
2914}
2915
2916static int param_set_portnr(const char *val, const struct kernel_param *kp)
2917{
2918        return param_set_uint_minmax(val, kp,
2919                        RPC_MIN_RESVPORT,
2920                        RPC_MAX_RESVPORT);
2921}
2922
2923static struct kernel_param_ops param_ops_portnr = {
2924        .set = param_set_portnr,
2925        .get = param_get_uint,
2926};
2927
2928#define param_check_portnr(name, p) \
2929        __param_check(name, p, unsigned int);
2930
2931module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
2932module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
2933
2934static int param_set_slot_table_size(const char *val,
2935                                     const struct kernel_param *kp)
2936{
2937        return param_set_uint_minmax(val, kp,
2938                        RPC_MIN_SLOT_TABLE,
2939                        RPC_MAX_SLOT_TABLE);
2940}
2941
2942static struct kernel_param_ops param_ops_slot_table_size = {
2943        .set = param_set_slot_table_size,
2944        .get = param_get_uint,
2945};
2946
2947#define param_check_slot_table_size(name, p) \
2948        __param_check(name, p, unsigned int);
2949
2950module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
2951                   slot_table_size, 0644);
2952module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
2953                   slot_table_size, 0644);
2954
2955