linux/net/ceph/messenger.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/crc32c.h>
   5#include <linux/ctype.h>
   6#include <linux/highmem.h>
   7#include <linux/inet.h>
   8#include <linux/kthread.h>
   9#include <linux/net.h>
  10#include <linux/nsproxy.h>
  11#include <linux/sched/mm.h>
  12#include <linux/slab.h>
  13#include <linux/socket.h>
  14#include <linux/string.h>
  15#ifdef  CONFIG_BLOCK
  16#include <linux/bio.h>
  17#endif  /* CONFIG_BLOCK */
  18#include <linux/dns_resolver.h>
  19#include <net/tcp.h>
  20
  21#include <linux/ceph/ceph_features.h>
  22#include <linux/ceph/libceph.h>
  23#include <linux/ceph/messenger.h>
  24#include <linux/ceph/decode.h>
  25#include <linux/ceph/pagelist.h>
  26#include <linux/export.h>
  27
  28/*
  29 * Ceph uses the messenger to exchange ceph_msg messages with other
  30 * hosts in the system.  The messenger provides ordered and reliable
  31 * delivery.  We tolerate TCP disconnects by reconnecting (with
  32 * exponential backoff) in the case of a fault (disconnection, bad
  33 * crc, protocol error).  Acks allow sent messages to be discarded by
  34 * the sender.
  35 */
  36
  37/*
  38 * We track the state of the socket on a given connection using
  39 * values defined below.  The transition to a new socket state is
  40 * handled by a function which verifies we aren't coming from an
  41 * unexpected state.
  42 *
  43 *      --------
  44 *      | NEW* |  transient initial state
  45 *      --------
  46 *          | con_sock_state_init()
  47 *          v
  48 *      ----------
  49 *      | CLOSED |  initialized, but no socket (and no
  50 *      ----------  TCP connection)
  51 *       ^      \
  52 *       |       \ con_sock_state_connecting()
  53 *       |        ----------------------
  54 *       |                              \
  55 *       + con_sock_state_closed()       \
  56 *       |+---------------------------    \
  57 *       | \                          \    \
  58 *       |  -----------                \    \
  59 *       |  | CLOSING |  socket event;  \    \
  60 *       |  -----------  await close     \    \
  61 *       |       ^                        \   |
  62 *       |       |                         \  |
  63 *       |       + con_sock_state_closing() \ |
  64 *       |      / \                         | |
  65 *       |     /   ---------------          | |
  66 *       |    /                   \         v v
  67 *       |   /                    --------------
  68 *       |  /    -----------------| CONNECTING |  socket created, TCP
  69 *       |  |   /                 --------------  connect initiated
  70 *       |  |   | con_sock_state_connected()
  71 *       |  |   v
  72 *      -------------
  73 *      | CONNECTED |  TCP connection established
  74 *      -------------
  75 *
  76 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
  77 */
  78
  /*
 * Values for ceph_connection->sock_state.  The note after each value lists
 * the successors the con_sock_state_*() helpers accept without warning.
 * NEW must remain 0: ceph_con_init() memset()s the connection and
 * con_sock_state_init() then expects to find NEW.  Re-entering CLOSING
 * (con_sock_state_closing()) or CLOSED (con_sock_state_closed()) is
 * tolerated as well.
 */
#define CON_SOCK_STATE_NEW              0       /* -> CLOSED */
#define CON_SOCK_STATE_CLOSED           1       /* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING       2       /* -> CONNECTED, CLOSING or CLOSED */
#define CON_SOCK_STATE_CONNECTED        3       /* -> CLOSING or CLOSED */
#define CON_SOCK_STATE_CLOSING          4       /* -> CLOSED */
  84
  /*
 * Values for ceph_connection->state, the connection-level state machine
 * (distinct from ceph_connection->sock_state).  The expected successors
 * of each state are noted beside it.
 */
#define CON_STATE_CLOSED	1	/* next: PREOPEN */
#define CON_STATE_PREOPEN	2	/* next: CONNECTING or CLOSED */
#define CON_STATE_CONNECTING	3	/* next: NEGOTIATING or CLOSED */
#define CON_STATE_NEGOTIATING	4	/* next: OPEN or CLOSED */
#define CON_STATE_OPEN		5	/* next: STANDBY or CLOSED */
#define CON_STATE_STANDBY	6	/* next: PREOPEN or CLOSED */
  94
  /*
 * Bit numbers used in ceph_connection->flags.  con_flag_valid() accepts
 * exactly these values, so a new bit must be added there as well.
 */
#define CON_FLAG_LOSSYTX		0	/* on error, the channel may be closed or messages dropped */
#define CON_FLAG_KEEPALIVE_PENDING	1	/* a keepalive still has to be sent */
#define CON_FLAG_WRITE_PENDING		2	/* outgoing data is ready to be sent */
#define CON_FLAG_SOCK_CLOSED		3	/* the socket reported a change to closed */
#define CON_FLAG_BACKOFF		4	/* delayed work must be queued again */
 104
 105static bool con_flag_valid(unsigned long con_flag)
 106{
 107        switch (con_flag) {
 108        case CON_FLAG_LOSSYTX:
 109        case CON_FLAG_KEEPALIVE_PENDING:
 110        case CON_FLAG_WRITE_PENDING:
 111        case CON_FLAG_SOCK_CLOSED:
 112        case CON_FLAG_BACKOFF:
 113                return true;
 114        default:
 115                return false;
 116        }
 117}
 118
 119static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
 120{
 121        BUG_ON(!con_flag_valid(con_flag));
 122
 123        clear_bit(con_flag, &con->flags);
 124}
 125
 126static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
 127{
 128        BUG_ON(!con_flag_valid(con_flag));
 129
 130        set_bit(con_flag, &con->flags);
 131}
 132
 133static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
 134{
 135        BUG_ON(!con_flag_valid(con_flag));
 136
 137        return test_bit(con_flag, &con->flags);
 138}
 139
 140static bool con_flag_test_and_clear(struct ceph_connection *con,
 141                                        unsigned long con_flag)
 142{
 143        BUG_ON(!con_flag_valid(con_flag));
 144
 145        return test_and_clear_bit(con_flag, &con->flags);
 146}
 147
 148static bool con_flag_test_and_set(struct ceph_connection *con,
 149                                        unsigned long con_flag)
 150{
 151        BUG_ON(!con_flag_valid(con_flag));
 152
 153        return test_and_set_bit(con_flag, &con->flags);
 154}
 155
 156/* Slab caches for frequently-allocated structures */
 157
 158static struct kmem_cache        *ceph_msg_cache;
 159
 160/* static tag bytes (protocol control messages) */
 161static char tag_msg = CEPH_MSGR_TAG_MSG;
 162static char tag_ack = CEPH_MSGR_TAG_ACK;
 163static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 164static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
 165
 166#ifdef CONFIG_LOCKDEP
 167static struct lock_class_key socket_class;
 168#endif
 169
 170static void queue_con(struct ceph_connection *con);
 171static void cancel_con(struct ceph_connection *con);
 172static void ceph_con_workfn(struct work_struct *);
 173static void con_fault(struct ceph_connection *con);
 174
 175/*
 176 * Nicely render a sockaddr as a string.  An array of formatted
 177 * strings is used, to approximate reentrancy.
 178 */
 179#define ADDR_STR_COUNT_LOG      5       /* log2(# address strings in array) */
 180#define ADDR_STR_COUNT          (1 << ADDR_STR_COUNT_LOG)
 181#define ADDR_STR_COUNT_MASK     (ADDR_STR_COUNT - 1)
 182#define MAX_ADDR_STR_LEN        64      /* 54 is enough */
 183
 184static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
 185static atomic_t addr_str_seq = ATOMIC_INIT(0);
 186
 187static struct page *zero_page;          /* used in certain error cases */
 188
 189const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
 190{
 191        int i;
 192        char *s;
 193        struct sockaddr_storage ss = addr->in_addr; /* align */
 194        struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
 195        struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
 196
 197        i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
 198        s = addr_str[i];
 199
 200        switch (ss.ss_family) {
 201        case AF_INET:
 202                snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
 203                         le32_to_cpu(addr->type), &in4->sin_addr,
 204                         ntohs(in4->sin_port));
 205                break;
 206
 207        case AF_INET6:
 208                snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
 209                         le32_to_cpu(addr->type), &in6->sin6_addr,
 210                         ntohs(in6->sin6_port));
 211                break;
 212
 213        default:
 214                snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
 215                         ss.ss_family);
 216        }
 217
 218        return s;
 219}
 220EXPORT_SYMBOL(ceph_pr_addr);
 221
 222static void encode_my_addr(struct ceph_messenger *msgr)
 223{
 224        memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
 225        ceph_encode_banner_addr(&msgr->my_enc_addr);
 226}
 227
 /*
 * Workqueue for all reading from and writing to the connection sockets.
 * It is allocated in ceph_msgr_init() and destroyed in _ceph_msgr_exit().
 */
static struct workqueue_struct *ceph_msgr_wq;
 232
 233static int ceph_msgr_slab_init(void)
 234{
 235        BUG_ON(ceph_msg_cache);
 236        ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
 237        if (!ceph_msg_cache)
 238                return -ENOMEM;
 239
 240        return 0;
 241}
 242
 243static void ceph_msgr_slab_exit(void)
 244{
 245        BUG_ON(!ceph_msg_cache);
 246        kmem_cache_destroy(ceph_msg_cache);
 247        ceph_msg_cache = NULL;
 248}
 249
 250static void _ceph_msgr_exit(void)
 251{
 252        if (ceph_msgr_wq) {
 253                destroy_workqueue(ceph_msgr_wq);
 254                ceph_msgr_wq = NULL;
 255        }
 256
 257        BUG_ON(zero_page == NULL);
 258        put_page(zero_page);
 259        zero_page = NULL;
 260
 261        ceph_msgr_slab_exit();
 262}
 263
 264int __init ceph_msgr_init(void)
 265{
 266        if (ceph_msgr_slab_init())
 267                return -ENOMEM;
 268
 269        BUG_ON(zero_page != NULL);
 270        zero_page = ZERO_PAGE(0);
 271        get_page(zero_page);
 272
 273        /*
 274         * The number of active work items is limited by the number of
 275         * connections, so leave @max_active at default.
 276         */
 277        ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
 278        if (ceph_msgr_wq)
 279                return 0;
 280
 281        pr_err("msgr_init failed to create workqueue\n");
 282        _ceph_msgr_exit();
 283
 284        return -ENOMEM;
 285}
 286
 287void ceph_msgr_exit(void)
 288{
 289        BUG_ON(ceph_msgr_wq == NULL);
 290
 291        _ceph_msgr_exit();
 292}
 293
 294void ceph_msgr_flush(void)
 295{
 296        flush_workqueue(ceph_msgr_wq);
 297}
 298EXPORT_SYMBOL(ceph_msgr_flush);
 299
 300/* Connection socket state transition functions */
 301
 302static void con_sock_state_init(struct ceph_connection *con)
 303{
 304        int old_state;
 305
 306        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
 307        if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
 308                printk("%s: unexpected old state %d\n", __func__, old_state);
 309        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 310             CON_SOCK_STATE_CLOSED);
 311}
 312
 313static void con_sock_state_connecting(struct ceph_connection *con)
 314{
 315        int old_state;
 316
 317        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
 318        if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
 319                printk("%s: unexpected old state %d\n", __func__, old_state);
 320        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 321             CON_SOCK_STATE_CONNECTING);
 322}
 323
 324static void con_sock_state_connected(struct ceph_connection *con)
 325{
 326        int old_state;
 327
 328        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
 329        if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
 330                printk("%s: unexpected old state %d\n", __func__, old_state);
 331        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 332             CON_SOCK_STATE_CONNECTED);
 333}
 334
 335static void con_sock_state_closing(struct ceph_connection *con)
 336{
 337        int old_state;
 338
 339        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
 340        if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
 341                        old_state != CON_SOCK_STATE_CONNECTED &&
 342                        old_state != CON_SOCK_STATE_CLOSING))
 343                printk("%s: unexpected old state %d\n", __func__, old_state);
 344        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 345             CON_SOCK_STATE_CLOSING);
 346}
 347
 348static void con_sock_state_closed(struct ceph_connection *con)
 349{
 350        int old_state;
 351
 352        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
 353        if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
 354                    old_state != CON_SOCK_STATE_CLOSING &&
 355                    old_state != CON_SOCK_STATE_CONNECTING &&
 356                    old_state != CON_SOCK_STATE_CLOSED))
 357                printk("%s: unexpected old state %d\n", __func__, old_state);
 358        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 359             CON_SOCK_STATE_CLOSED);
 360}
 361
 362/*
 363 * socket callback functions
 364 */
 365
 366/* data available on socket, or listen socket received a connect */
 367static void ceph_sock_data_ready(struct sock *sk)
 368{
 369        struct ceph_connection *con = sk->sk_user_data;
 370        if (atomic_read(&con->msgr->stopping)) {
 371                return;
 372        }
 373
 374        if (sk->sk_state != TCP_CLOSE_WAIT) {
 375                dout("%s on %p state = %lu, queueing work\n", __func__,
 376                     con, con->state);
 377                queue_con(con);
 378        }
 379}
 380
 381/* socket has buffer space for writing */
 382static void ceph_sock_write_space(struct sock *sk)
 383{
 384        struct ceph_connection *con = sk->sk_user_data;
 385
 386        /* only queue to workqueue if there is data we want to write,
 387         * and there is sufficient space in the socket buffer to accept
 388         * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
 389         * doesn't get called again until try_write() fills the socket
 390         * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
 391         * and net/core/stream.c:sk_stream_write_space().
 392         */
 393        if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
 394                if (sk_stream_is_writeable(sk)) {
 395                        dout("%s %p queueing write work\n", __func__, con);
 396                        clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 397                        queue_con(con);
 398                }
 399        } else {
 400                dout("%s %p nothing to write\n", __func__, con);
 401        }
 402}
 403
 404/* socket's state has changed */
 405static void ceph_sock_state_change(struct sock *sk)
 406{
 407        struct ceph_connection *con = sk->sk_user_data;
 408
 409        dout("%s %p state = %lu sk_state = %u\n", __func__,
 410             con, con->state, sk->sk_state);
 411
 412        switch (sk->sk_state) {
 413        case TCP_CLOSE:
 414                dout("%s TCP_CLOSE\n", __func__);
 415                fallthrough;
 416        case TCP_CLOSE_WAIT:
 417                dout("%s TCP_CLOSE_WAIT\n", __func__);
 418                con_sock_state_closing(con);
 419                con_flag_set(con, CON_FLAG_SOCK_CLOSED);
 420                queue_con(con);
 421                break;
 422        case TCP_ESTABLISHED:
 423                dout("%s TCP_ESTABLISHED\n", __func__);
 424                con_sock_state_connected(con);
 425                queue_con(con);
 426                break;
 427        default:        /* Everything else is uninteresting */
 428                break;
 429        }
 430}
 431
 432/*
 433 * set up socket callbacks
 434 */
 435static void set_sock_callbacks(struct socket *sock,
 436                               struct ceph_connection *con)
 437{
 438        struct sock *sk = sock->sk;
 439        sk->sk_user_data = con;
 440        sk->sk_data_ready = ceph_sock_data_ready;
 441        sk->sk_write_space = ceph_sock_write_space;
 442        sk->sk_state_change = ceph_sock_state_change;
 443}
 444
 445
 446/*
 447 * socket helpers
 448 */
 449
 450/*
 451 * initiate connection to a remote socket.
 452 */
 453static int ceph_tcp_connect(struct ceph_connection *con)
 454{
 455        struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
 456        struct socket *sock;
 457        unsigned int noio_flag;
 458        int ret;
 459
 460        BUG_ON(con->sock);
 461
 462        /* sock_create_kern() allocates with GFP_KERNEL */
 463        noio_flag = memalloc_noio_save();
 464        ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
 465                               SOCK_STREAM, IPPROTO_TCP, &sock);
 466        memalloc_noio_restore(noio_flag);
 467        if (ret)
 468                return ret;
 469        sock->sk->sk_allocation = GFP_NOFS;
 470
 471#ifdef CONFIG_LOCKDEP
 472        lockdep_set_class(&sock->sk->sk_lock, &socket_class);
 473#endif
 474
 475        set_sock_callbacks(sock, con);
 476
 477        dout("connect %s\n", ceph_pr_addr(&con->peer_addr));
 478
 479        con_sock_state_connecting(con);
 480        ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss),
 481                                 O_NONBLOCK);
 482        if (ret == -EINPROGRESS) {
 483                dout("connect %s EINPROGRESS sk_state = %u\n",
 484                     ceph_pr_addr(&con->peer_addr),
 485                     sock->sk->sk_state);
 486        } else if (ret < 0) {
 487                pr_err("connect %s error %d\n",
 488                       ceph_pr_addr(&con->peer_addr), ret);
 489                sock_release(sock);
 490                return ret;
 491        }
 492
 493        if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
 494                tcp_sock_set_nodelay(sock->sk);
 495
 496        con->sock = sock;
 497        return 0;
 498}
 499
 500/*
 501 * If @buf is NULL, discard up to @len bytes.
 502 */
 503static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 504{
 505        struct kvec iov = {buf, len};
 506        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 507        int r;
 508
 509        if (!buf)
 510                msg.msg_flags |= MSG_TRUNC;
 511
 512        iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
 513        r = sock_recvmsg(sock, &msg, msg.msg_flags);
 514        if (r == -EAGAIN)
 515                r = 0;
 516        return r;
 517}
 518
 519static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
 520                     int page_offset, size_t length)
 521{
 522        struct bio_vec bvec = {
 523                .bv_page = page,
 524                .bv_offset = page_offset,
 525                .bv_len = length
 526        };
 527        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 528        int r;
 529
 530        BUG_ON(page_offset + length > PAGE_SIZE);
 531        iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
 532        r = sock_recvmsg(sock, &msg, msg.msg_flags);
 533        if (r == -EAGAIN)
 534                r = 0;
 535        return r;
 536}
 537
 538/*
 539 * write something.  @more is true if caller will be sending more data
 540 * shortly.
 541 */
 542static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
 543                            size_t kvlen, size_t len, bool more)
 544{
 545        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 546        int r;
 547
 548        if (more)
 549                msg.msg_flags |= MSG_MORE;
 550        else
 551                msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
 552
 553        r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
 554        if (r == -EAGAIN)
 555                r = 0;
 556        return r;
 557}
 558
 559/*
 560 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
 561 */
 562static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
 563                             int offset, size_t size, int more)
 564{
 565        ssize_t (*sendpage)(struct socket *sock, struct page *page,
 566                            int offset, size_t size, int flags);
 567        int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
 568        int ret;
 569
 570        /*
 571         * sendpage cannot properly handle pages with page_count == 0,
 572         * we need to fall back to sendmsg if that's the case.
 573         *
 574         * Same goes for slab pages: skb_can_coalesce() allows
 575         * coalescing neighboring slab objects into a single frag which
 576         * triggers one of hardened usercopy checks.
 577         */
 578        if (sendpage_ok(page))
 579                sendpage = sock->ops->sendpage;
 580        else
 581                sendpage = sock_no_sendpage;
 582
 583        ret = sendpage(sock, page, offset, size, flags);
 584        if (ret == -EAGAIN)
 585                ret = 0;
 586
 587        return ret;
 588}
 589
 590/*
 591 * Shutdown/close the socket for the given connection.
 592 */
 593static int con_close_socket(struct ceph_connection *con)
 594{
 595        int rc = 0;
 596
 597        dout("con_close_socket on %p sock %p\n", con, con->sock);
 598        if (con->sock) {
 599                rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
 600                sock_release(con->sock);
 601                con->sock = NULL;
 602        }
 603
 604        /*
 605         * Forcibly clear the SOCK_CLOSED flag.  It gets set
 606         * independent of the connection mutex, and we could have
 607         * received a socket close event before we had the chance to
 608         * shut the socket down.
 609         */
 610        con_flag_clear(con, CON_FLAG_SOCK_CLOSED);
 611
 612        con_sock_state_closed(con);
 613        return rc;
 614}
 615
 616/*
 617 * Reset a connection.  Discard all incoming and outgoing messages
 618 * and clear *_seq state.
 619 */
 620static void ceph_msg_remove(struct ceph_msg *msg)
 621{
 622        list_del_init(&msg->list_head);
 623
 624        ceph_msg_put(msg);
 625}
 626static void ceph_msg_remove_list(struct list_head *head)
 627{
 628        while (!list_empty(head)) {
 629                struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
 630                                                        list_head);
 631                ceph_msg_remove(msg);
 632        }
 633}
 634
 635static void reset_connection(struct ceph_connection *con)
 636{
 637        /* reset connection, out_queue, msg_ and connect_seq */
 638        /* discard existing out_queue and msg_seq */
 639        dout("reset_connection %p\n", con);
 640        ceph_msg_remove_list(&con->out_queue);
 641        ceph_msg_remove_list(&con->out_sent);
 642
 643        if (con->in_msg) {
 644                BUG_ON(con->in_msg->con != con);
 645                ceph_msg_put(con->in_msg);
 646                con->in_msg = NULL;
 647        }
 648
 649        con->connect_seq = 0;
 650        con->out_seq = 0;
 651        if (con->out_msg) {
 652                BUG_ON(con->out_msg->con != con);
 653                ceph_msg_put(con->out_msg);
 654                con->out_msg = NULL;
 655        }
 656        con->in_seq = 0;
 657        con->in_seq_acked = 0;
 658
 659        con->out_skip = 0;
 660}
 661
 662/*
 663 * mark a peer down.  drop any open connections.
 664 */
 665void ceph_con_close(struct ceph_connection *con)
 666{
 667        mutex_lock(&con->mutex);
 668        dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
 669        con->state = CON_STATE_CLOSED;
 670
 671        con_flag_clear(con, CON_FLAG_LOSSYTX);  /* so we retry next connect */
 672        con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
 673        con_flag_clear(con, CON_FLAG_WRITE_PENDING);
 674        con_flag_clear(con, CON_FLAG_BACKOFF);
 675
 676        reset_connection(con);
 677        con->peer_global_seq = 0;
 678        cancel_con(con);
 679        con_close_socket(con);
 680        mutex_unlock(&con->mutex);
 681}
 682EXPORT_SYMBOL(ceph_con_close);
 683
 684/*
 685 * Reopen a closed connection, with a new peer address.
 686 */
 687void ceph_con_open(struct ceph_connection *con,
 688                   __u8 entity_type, __u64 entity_num,
 689                   struct ceph_entity_addr *addr)
 690{
 691        mutex_lock(&con->mutex);
 692        dout("con_open %p %s\n", con, ceph_pr_addr(addr));
 693
 694        WARN_ON(con->state != CON_STATE_CLOSED);
 695        con->state = CON_STATE_PREOPEN;
 696
 697        con->peer_name.type = (__u8) entity_type;
 698        con->peer_name.num = cpu_to_le64(entity_num);
 699
 700        memcpy(&con->peer_addr, addr, sizeof(*addr));
 701        con->delay = 0;      /* reset backoff memory */
 702        mutex_unlock(&con->mutex);
 703        queue_con(con);
 704}
 705EXPORT_SYMBOL(ceph_con_open);
 706
 707/*
 708 * return true if this connection ever successfully opened
 709 */
 710bool ceph_con_opened(struct ceph_connection *con)
 711{
 712        return con->connect_seq > 0;
 713}
 714
 715/*
 716 * initialize a new connection.
 717 */
 718void ceph_con_init(struct ceph_connection *con, void *private,
 719        const struct ceph_connection_operations *ops,
 720        struct ceph_messenger *msgr)
 721{
 722        dout("con_init %p\n", con);
 723        memset(con, 0, sizeof(*con));
 724        con->private = private;
 725        con->ops = ops;
 726        con->msgr = msgr;
 727
 728        con_sock_state_init(con);
 729
 730        mutex_init(&con->mutex);
 731        INIT_LIST_HEAD(&con->out_queue);
 732        INIT_LIST_HEAD(&con->out_sent);
 733        INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
 734
 735        con->state = CON_STATE_CLOSED;
 736}
 737EXPORT_SYMBOL(ceph_con_init);
 738
 739
 740/*
 741 * We maintain a global counter to order connection attempts.  Get
 742 * a unique seq greater than @gt.
 743 */
 744static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
 745{
 746        u32 ret;
 747
 748        spin_lock(&msgr->global_seq_lock);
 749        if (msgr->global_seq < gt)
 750                msgr->global_seq = gt;
 751        ret = ++msgr->global_seq;
 752        spin_unlock(&msgr->global_seq_lock);
 753        return ret;
 754}
 755
 756static void con_out_kvec_reset(struct ceph_connection *con)
 757{
 758        BUG_ON(con->out_skip);
 759
 760        con->out_kvec_left = 0;
 761        con->out_kvec_bytes = 0;
 762        con->out_kvec_cur = &con->out_kvec[0];
 763}
 764
 765static void con_out_kvec_add(struct ceph_connection *con,
 766                                size_t size, void *data)
 767{
 768        int index = con->out_kvec_left;
 769
 770        BUG_ON(con->out_skip);
 771        BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
 772
 773        con->out_kvec[index].iov_len = size;
 774        con->out_kvec[index].iov_base = data;
 775        con->out_kvec_left++;
 776        con->out_kvec_bytes += size;
 777}
 778
 779/*
 780 * Chop off a kvec from the end.  Return residual number of bytes for
 781 * that kvec, i.e. how many bytes would have been written if the kvec
 782 * hadn't been nuked.
 783 */
 784static int con_out_kvec_skip(struct ceph_connection *con)
 785{
 786        int off = con->out_kvec_cur - con->out_kvec;
 787        int skip = 0;
 788
 789        if (con->out_kvec_bytes > 0) {
 790                skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
 791                BUG_ON(con->out_kvec_bytes < skip);
 792                BUG_ON(!con->out_kvec_left);
 793                con->out_kvec_bytes -= skip;
 794                con->out_kvec_left--;
 795        }
 796
 797        return skip;
 798}
 799
 800#ifdef CONFIG_BLOCK
 801
 802/*
 803 * For a bio data item, a piece is whatever remains of the next
 804 * entry in the current bio iovec, or the first entry in the next
 805 * bio in the list.
 806 */
 807static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
 808                                        size_t length)
 809{
 810        struct ceph_msg_data *data = cursor->data;
 811        struct ceph_bio_iter *it = &cursor->bio_iter;
 812
 813        cursor->resid = min_t(size_t, length, data->bio_length);
 814        *it = data->bio_pos;
 815        if (cursor->resid < it->iter.bi_size)
 816                it->iter.bi_size = cursor->resid;
 817
 818        BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
 819        cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 820}
 821
 822static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
 823                                                size_t *page_offset,
 824                                                size_t *length)
 825{
 826        struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
 827                                           cursor->bio_iter.iter);
 828
 829        *page_offset = bv.bv_offset;
 830        *length = bv.bv_len;
 831        return bv.bv_page;
 832}
 833
 /*
 * Advance a bio cursor by @bytes; @bytes may not exceed cursor->resid nor
 * the length of the current piece (both are BUG_ON-checked).
 *
 * Returns true when the cursor has moved on to a new piece (last_piece is
 * recomputed for it), false when all data has been consumed or the
 * current piece still has bytes left to process.
 */
 834static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
 835                                        size_t bytes)
 836{
 837        struct ceph_bio_iter *it = &cursor->bio_iter;
        /* page before advancing, to detect below whether we left it */
 838        struct page *page = bio_iter_page(it->bio, it->iter);
 839
 840        BUG_ON(bytes > cursor->resid);
 841        BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
 842        cursor->resid -= bytes;
 843        bio_advance_iter(it->bio, &it->iter, bytes);
 844
 845        if (!cursor->resid) {
 846                BUG_ON(!cursor->last_piece);
 847                return false;   /* no more data */
 848        }
 849
        /*
         * A zero-byte advance, or one that leaves us partway through the
         * same bio_vec on the same page, stays on the current piece.
         */
 850        if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
 851                       page == bio_iter_page(it->bio, it->iter)))
 852                return false;   /* more bytes to process in this segment */
 853
        /*
         * Current bio exhausted: continue with the next bio in the chain,
         * clamping its iterator to the bytes still covered by the cursor.
         * NOTE(review): assumes bi_next is non-NULL whenever resid > 0.
         */
 854        if (!it->iter.bi_size) {
 855                it->bio = it->bio->bi_next;
 856                it->iter = it->bio->bi_iter;
 857                if (cursor->resid < it->iter.bi_size)
 858                        it->iter.bi_size = cursor->resid;
 859        }
 860
        /* Entered a new piece: the old one must not have been the last. */
 861        BUG_ON(cursor->last_piece);
 862        BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
 863        cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 864        return true;
 865}
 866#endif /* CONFIG_BLOCK */
 867
 868static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
 869                                        size_t length)
 870{
 871        struct ceph_msg_data *data = cursor->data;
 872        struct bio_vec *bvecs = data->bvec_pos.bvecs;
 873
 874        cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
 875        cursor->bvec_iter = data->bvec_pos.iter;
 876        cursor->bvec_iter.bi_size = cursor->resid;
 877
 878        BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
 879        cursor->last_piece =
 880            cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
 881}
 882
 883static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
 884                                                size_t *page_offset,
 885                                                size_t *length)
 886{
 887        struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
 888                                           cursor->bvec_iter);
 889
 890        *page_offset = bv.bv_offset;
 891        *length = bv.bv_len;
 892        return bv.bv_page;
 893}
 894
 /*
 * Advance a bvec-array cursor by @bytes; @bytes may not exceed
 * cursor->resid nor the length of the current piece (both are
 * BUG_ON-checked).
 *
 * Returns true when the cursor has moved on to a new piece (last_piece is
 * recomputed for it), false when all data has been consumed or the
 * current piece still has bytes left to process.
 */
 895static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
 896                                        size_t bytes)
 897{
 898        struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
        /* page before advancing, to detect below whether we left it */
 899        struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);
 900
 901        BUG_ON(bytes > cursor->resid);
 902        BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
 903        cursor->resid -= bytes;
 904        bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
 905
 906        if (!cursor->resid) {
 907                BUG_ON(!cursor->last_piece);
 908                return false;   /* no more data */
 909        }
 910
        /*
         * A zero-byte advance, or one that leaves us partway through the
         * same bio_vec on the same page, stays on the current piece.
         */
 911        if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
 912                       page == bvec_iter_page(bvecs, cursor->bvec_iter)))
 913                return false;   /* more bytes to process in this segment */
 914
        /* Entered a new piece: the old one must not have been the last. */
 915        BUG_ON(cursor->last_piece);
 916        BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
 917        cursor->last_piece =
 918            cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
 919        return true;
 920}
 921
 922/*
 923 * For a page array, a piece comes from the first page in the array
 924 * that has not already been fully consumed.
 925 */
 926static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
 927                                        size_t length)
 928{
 929        struct ceph_msg_data *data = cursor->data;
 930        int page_count;
 931
 932        BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
 933
 934        BUG_ON(!data->pages);
 935        BUG_ON(!data->length);
 936
 937        cursor->resid = min(length, data->length);
 938        page_count = calc_pages_for(data->alignment, (u64)data->length);
 939        cursor->page_offset = data->alignment & ~PAGE_MASK;
 940        cursor->page_index = 0;
 941        BUG_ON(page_count > (int)USHRT_MAX);
 942        cursor->page_count = (unsigned short)page_count;
 943        BUG_ON(length > SIZE_MAX - cursor->page_offset);
 944        cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
 945}
 946
 947static struct page *
 948ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
 949                                        size_t *page_offset, size_t *length)
 950{
 951        struct ceph_msg_data *data = cursor->data;
 952
 953        BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
 954
 955        BUG_ON(cursor->page_index >= cursor->page_count);
 956        BUG_ON(cursor->page_offset >= PAGE_SIZE);
 957
 958        *page_offset = cursor->page_offset;
 959        if (cursor->last_piece)
 960                *length = cursor->resid;
 961        else
 962                *length = PAGE_SIZE - *page_offset;
 963
 964        return data->pages[cursor->page_index];
 965}
 966
 967static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
 968                                                size_t bytes)
 969{
 970        BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
 971
 972        BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
 973
 974        /* Advance the cursor page offset */
 975
 976        cursor->resid -= bytes;
 977        cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
 978        if (!bytes || cursor->page_offset)
 979                return false;   /* more bytes to process in the current page */
 980
 981        if (!cursor->resid)
 982                return false;   /* no more data */
 983
 984        /* Move on to the next page; offset is already at 0 */
 985
 986        BUG_ON(cursor->page_index >= cursor->page_count);
 987        cursor->page_index++;
 988        cursor->last_piece = cursor->resid <= PAGE_SIZE;
 989
 990        return true;
 991}
 992
 993/*
 994 * For a pagelist, a piece is whatever remains to be consumed in the
 995 * first page in the list, or the front of the next page.
 996 */
 997static void
 998ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
 999                                        size_t length)
1000{
1001        struct ceph_msg_data *data = cursor->data;
1002        struct ceph_pagelist *pagelist;
1003        struct page *page;
1004
1005        BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
1006
1007        pagelist = data->pagelist;
1008        BUG_ON(!pagelist);
1009
1010        if (!length)
1011                return;         /* pagelist can be assigned but empty */
1012
1013        BUG_ON(list_empty(&pagelist->head));
1014        page = list_first_entry(&pagelist->head, struct page, lru);
1015
1016        cursor->resid = min(length, pagelist->length);
1017        cursor->page = page;
1018        cursor->offset = 0;
1019        cursor->last_piece = cursor->resid <= PAGE_SIZE;
1020}
1021
1022static struct page *
1023ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
1024                                size_t *page_offset, size_t *length)
1025{
1026        struct ceph_msg_data *data = cursor->data;
1027        struct ceph_pagelist *pagelist;
1028
1029        BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
1030
1031        pagelist = data->pagelist;
1032        BUG_ON(!pagelist);
1033
1034        BUG_ON(!cursor->page);
1035        BUG_ON(cursor->offset + cursor->resid != pagelist->length);
1036
1037        /* offset of first page in pagelist is always 0 */
1038        *page_offset = cursor->offset & ~PAGE_MASK;
1039        if (cursor->last_piece)
1040                *length = cursor->resid;
1041        else
1042                *length = PAGE_SIZE - *page_offset;
1043
1044        return cursor->page;
1045}
1046
1047static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
1048                                                size_t bytes)
1049{
1050        struct ceph_msg_data *data = cursor->data;
1051        struct ceph_pagelist *pagelist;
1052
1053        BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
1054
1055        pagelist = data->pagelist;
1056        BUG_ON(!pagelist);
1057
1058        BUG_ON(cursor->offset + cursor->resid != pagelist->length);
1059        BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
1060
1061        /* Advance the cursor offset */
1062
1063        cursor->resid -= bytes;
1064        cursor->offset += bytes;
1065        /* offset of first page in pagelist is always 0 */
1066        if (!bytes || cursor->offset & ~PAGE_MASK)
1067                return false;   /* more bytes to process in the current page */
1068
1069        if (!cursor->resid)
1070                return false;   /* no more data */
1071
1072        /* Move on to the next page */
1073
1074        BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
1075        cursor->page = list_next_entry(cursor->page, lru);
1076        cursor->last_piece = cursor->resid <= PAGE_SIZE;
1077
1078        return true;
1079}
1080
1081/*
1082 * Message data is handled (sent or received) in pieces, where each
1083 * piece resides on a single page.  The network layer might not
1084 * consume an entire piece at once.  A data item's cursor keeps
1085 * track of which piece is next to process and how much remains to
1086 * be processed in that piece.  It also tracks whether the current
1087 * piece is the last one in the data item.
1088 */
1089static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
1090{
1091        size_t length = cursor->total_resid;
1092
1093        switch (cursor->data->type) {
1094        case CEPH_MSG_DATA_PAGELIST:
1095                ceph_msg_data_pagelist_cursor_init(cursor, length);
1096                break;
1097        case CEPH_MSG_DATA_PAGES:
1098                ceph_msg_data_pages_cursor_init(cursor, length);
1099                break;
1100#ifdef CONFIG_BLOCK
1101        case CEPH_MSG_DATA_BIO:
1102                ceph_msg_data_bio_cursor_init(cursor, length);
1103                break;
1104#endif /* CONFIG_BLOCK */
1105        case CEPH_MSG_DATA_BVECS:
1106                ceph_msg_data_bvecs_cursor_init(cursor, length);
1107                break;
1108        case CEPH_MSG_DATA_NONE:
1109        default:
1110                /* BUG(); */
1111                break;
1112        }
1113        cursor->need_crc = true;
1114}
1115
1116static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1117{
1118        struct ceph_msg_data_cursor *cursor = &msg->cursor;
1119
1120        BUG_ON(!length);
1121        BUG_ON(length > msg->data_length);
1122        BUG_ON(!msg->num_data_items);
1123
1124        cursor->total_resid = length;
1125        cursor->data = msg->data;
1126
1127        __ceph_msg_data_cursor_init(cursor);
1128}
1129
1130/*
1131 * Return the page containing the next piece to process for a given
1132 * data item, and supply the page offset and length of that piece.
1133 * Indicate whether this is the last piece in this data item.
1134 */
1135static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
1136                                        size_t *page_offset, size_t *length,
1137                                        bool *last_piece)
1138{
1139        struct page *page;
1140
1141        switch (cursor->data->type) {
1142        case CEPH_MSG_DATA_PAGELIST:
1143                page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
1144                break;
1145        case CEPH_MSG_DATA_PAGES:
1146                page = ceph_msg_data_pages_next(cursor, page_offset, length);
1147                break;
1148#ifdef CONFIG_BLOCK
1149        case CEPH_MSG_DATA_BIO:
1150                page = ceph_msg_data_bio_next(cursor, page_offset, length);
1151                break;
1152#endif /* CONFIG_BLOCK */
1153        case CEPH_MSG_DATA_BVECS:
1154                page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
1155                break;
1156        case CEPH_MSG_DATA_NONE:
1157        default:
1158                page = NULL;
1159                break;
1160        }
1161
1162        BUG_ON(!page);
1163        BUG_ON(*page_offset + *length > PAGE_SIZE);
1164        BUG_ON(!*length);
1165        BUG_ON(*length > cursor->resid);
1166        if (last_piece)
1167                *last_piece = cursor->last_piece;
1168
1169        return page;
1170}
1171
1172/*
1173 * Returns true if the result moves the cursor on to the next piece
1174 * of the data item.
1175 */
1176static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1177                                  size_t bytes)
1178{
1179        bool new_piece;
1180
1181        BUG_ON(bytes > cursor->resid);
1182        switch (cursor->data->type) {
1183        case CEPH_MSG_DATA_PAGELIST:
1184                new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
1185                break;
1186        case CEPH_MSG_DATA_PAGES:
1187                new_piece = ceph_msg_data_pages_advance(cursor, bytes);
1188                break;
1189#ifdef CONFIG_BLOCK
1190        case CEPH_MSG_DATA_BIO:
1191                new_piece = ceph_msg_data_bio_advance(cursor, bytes);
1192                break;
1193#endif /* CONFIG_BLOCK */
1194        case CEPH_MSG_DATA_BVECS:
1195                new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
1196                break;
1197        case CEPH_MSG_DATA_NONE:
1198        default:
1199                BUG();
1200                break;
1201        }
1202        cursor->total_resid -= bytes;
1203
1204        if (!cursor->resid && cursor->total_resid) {
1205                WARN_ON(!cursor->last_piece);
1206                cursor->data++;
1207                __ceph_msg_data_cursor_init(cursor);
1208                new_piece = true;
1209        }
1210        cursor->need_crc = new_piece;
1211}
1212
1213static size_t sizeof_footer(struct ceph_connection *con)
1214{
1215        return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
1216            sizeof(struct ceph_msg_footer) :
1217            sizeof(struct ceph_msg_footer_old);
1218}
1219
1220static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1221{
1222        /* Initialize data cursor */
1223
1224        ceph_msg_data_cursor_init(msg, (size_t)data_len);
1225}
1226
1227/*
1228 * Prepare footer for currently outgoing message, and finish things
1229 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
1230 */
1231static void prepare_write_message_footer(struct ceph_connection *con)
1232{
1233        struct ceph_msg *m = con->out_msg;
1234
1235        m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
1236
1237        dout("prepare_write_message_footer %p\n", con);
1238        con_out_kvec_add(con, sizeof_footer(con), &m->footer);
1239        if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1240                if (con->ops->sign_message)
1241                        con->ops->sign_message(m);
1242                else
1243                        m->footer.sig = 0;
1244        } else {
1245                m->old_footer.flags = m->footer.flags;
1246        }
1247        con->out_more = m->more_to_follow;
1248        con->out_msg_done = true;
1249}
1250
1251/*
1252 * Prepare headers for the next outgoing message.
1253 */
1254static void prepare_write_message(struct ceph_connection *con)
1255{
1256        struct ceph_msg *m;
1257        u32 crc;
1258
1259        con_out_kvec_reset(con);
1260        con->out_msg_done = false;
1261
1262        /* Sneak an ack in there first?  If we can get it into the same
1263         * TCP packet that's a good thing. */
1264        if (con->in_seq > con->in_seq_acked) {
1265                con->in_seq_acked = con->in_seq;
1266                con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
1267                con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1268                con_out_kvec_add(con, sizeof (con->out_temp_ack),
1269                        &con->out_temp_ack);
1270        }
1271
1272        BUG_ON(list_empty(&con->out_queue));
1273        m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
1274        con->out_msg = m;
1275        BUG_ON(m->con != con);
1276
1277        /* put message on sent list */
1278        ceph_msg_get(m);
1279        list_move_tail(&m->list_head, &con->out_sent);
1280
1281        /*
1282         * only assign outgoing seq # if we haven't sent this message
1283         * yet.  if it is requeued, resend with it's original seq.
1284         */
1285        if (m->needs_out_seq) {
1286                m->hdr.seq = cpu_to_le64(++con->out_seq);
1287                m->needs_out_seq = false;
1288
1289                if (con->ops->reencode_message)
1290                        con->ops->reencode_message(m);
1291        }
1292
1293        dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
1294             m, con->out_seq, le16_to_cpu(m->hdr.type),
1295             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
1296             m->data_length);
1297        WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
1298        WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
1299
1300        /* tag + hdr + front + middle */
1301        con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
1302        con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
1303        con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
1304
1305        if (m->middle)
1306                con_out_kvec_add(con, m->middle->vec.iov_len,
1307                        m->middle->vec.iov_base);
1308
1309        /* fill in hdr crc and finalize hdr */
1310        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
1311        con->out_msg->hdr.crc = cpu_to_le32(crc);
1312        memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
1313
1314        /* fill in front and middle crc, footer */
1315        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
1316        con->out_msg->footer.front_crc = cpu_to_le32(crc);
1317        if (m->middle) {
1318                crc = crc32c(0, m->middle->vec.iov_base,
1319                                m->middle->vec.iov_len);
1320                con->out_msg->footer.middle_crc = cpu_to_le32(crc);
1321        } else
1322                con->out_msg->footer.middle_crc = 0;
1323        dout("%s front_crc %u middle_crc %u\n", __func__,
1324             le32_to_cpu(con->out_msg->footer.front_crc),
1325             le32_to_cpu(con->out_msg->footer.middle_crc));
1326        con->out_msg->footer.flags = 0;
1327
1328        /* is there a data payload? */
1329        con->out_msg->footer.data_crc = 0;
1330        if (m->data_length) {
1331                prepare_message_data(con->out_msg, m->data_length);
1332                con->out_more = 1;  /* data + footer will follow */
1333        } else {
1334                /* no, queue up footer too and be done */
1335                prepare_write_message_footer(con);
1336        }
1337
1338        con_flag_set(con, CON_FLAG_WRITE_PENDING);
1339}
1340
1341/*
1342 * Prepare an ack.
1343 */
1344static void prepare_write_ack(struct ceph_connection *con)
1345{
1346        dout("prepare_write_ack %p %llu -> %llu\n", con,
1347             con->in_seq_acked, con->in_seq);
1348        con->in_seq_acked = con->in_seq;
1349
1350        con_out_kvec_reset(con);
1351
1352        con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
1353
1354        con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1355        con_out_kvec_add(con, sizeof (con->out_temp_ack),
1356                                &con->out_temp_ack);
1357
1358        con->out_more = 1;  /* more will follow.. eventually.. */
1359        con_flag_set(con, CON_FLAG_WRITE_PENDING);
1360}
1361
1362/*
1363 * Prepare to share the seq during handshake
1364 */
1365static void prepare_write_seq(struct ceph_connection *con)
1366{
1367        dout("prepare_write_seq %p %llu -> %llu\n", con,
1368             con->in_seq_acked, con->in_seq);
1369        con->in_seq_acked = con->in_seq;
1370
1371        con_out_kvec_reset(con);
1372
1373        con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1374        con_out_kvec_add(con, sizeof (con->out_temp_ack),
1375                         &con->out_temp_ack);
1376
1377        con_flag_set(con, CON_FLAG_WRITE_PENDING);
1378}
1379
1380/*
1381 * Prepare to write keepalive byte.
1382 */
1383static void prepare_write_keepalive(struct ceph_connection *con)
1384{
1385        dout("prepare_write_keepalive %p\n", con);
1386        con_out_kvec_reset(con);
1387        if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
1388                struct timespec64 now;
1389
1390                ktime_get_real_ts64(&now);
1391                con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
1392                ceph_encode_timespec64(&con->out_temp_keepalive2, &now);
1393                con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
1394                                 &con->out_temp_keepalive2);
1395        } else {
1396                con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
1397        }
1398        con_flag_set(con, CON_FLAG_WRITE_PENDING);
1399}
1400
1401/*
1402 * Connection negotiation.
1403 */
1404
1405static int get_connect_authorizer(struct ceph_connection *con)
1406{
1407        struct ceph_auth_handshake *auth;
1408        int auth_proto;
1409
1410        if (!con->ops->get_authorizer) {
1411                con->auth = NULL;
1412                con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
1413                con->out_connect.authorizer_len = 0;
1414                return 0;
1415        }
1416
1417        auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry);
1418        if (IS_ERR(auth))
1419                return PTR_ERR(auth);
1420
1421        con->auth = auth;
1422        con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
1423        con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len);
1424        return 0;
1425}
1426
1427/*
1428 * We connected to a peer and are saying hello.
1429 */
1430static void prepare_write_banner(struct ceph_connection *con)
1431{
1432        con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
1433        con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
1434                                        &con->msgr->my_enc_addr);
1435
1436        con->out_more = 0;
1437        con_flag_set(con, CON_FLAG_WRITE_PENDING);
1438}
1439
1440static void __prepare_write_connect(struct ceph_connection *con)
1441{
1442        con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect);
1443        if (con->auth)
1444                con_out_kvec_add(con, con->auth->authorizer_buf_len,
1445                                 con->auth->authorizer_buf);
1446
1447        con->out_more = 0;
1448        con_flag_set(con, CON_FLAG_WRITE_PENDING);
1449}
1450
1451static int prepare_write_connect(struct ceph_connection *con)
1452{
1453        unsigned int global_seq = get_global_seq(con->msgr, 0);
1454        int proto;
1455        int ret;
1456
1457        switch (con->peer_name.type) {
1458        case CEPH_ENTITY_TYPE_MON:
1459                proto = CEPH_MONC_PROTOCOL;
1460                break;
1461        case CEPH_ENTITY_TYPE_OSD:
1462                proto = CEPH_OSDC_PROTOCOL;
1463                break;
1464        case CEPH_ENTITY_TYPE_MDS:
1465                proto = CEPH_MDSC_PROTOCOL;
1466                break;
1467        default:
1468                BUG();
1469        }
1470
1471        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
1472             con->connect_seq, global_seq, proto);
1473
1474        con->out_connect.features =
1475            cpu_to_le64(from_msgr(con->msgr)->supported_features);
1476        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
1477        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
1478        con->out_connect.global_seq = cpu_to_le32(global_seq);
1479        con->out_connect.protocol_version = cpu_to_le32(proto);
1480        con->out_connect.flags = 0;
1481
1482        ret = get_connect_authorizer(con);
1483        if (ret)
1484                return ret;
1485
1486        __prepare_write_connect(con);
1487        return 0;
1488}
1489
1490/*
1491 * write as much of pending kvecs to the socket as we can.
1492 *  1 -> done
1493 *  0 -> socket full, but more to do
1494 * <0 -> error
1495 */
1496static int write_partial_kvec(struct ceph_connection *con)
1497{
1498        int ret;
1499
1500        dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
1501        while (con->out_kvec_bytes > 0) {
1502                ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
1503                                       con->out_kvec_left, con->out_kvec_bytes,
1504                                       con->out_more);
1505                if (ret <= 0)
1506                        goto out;
1507                con->out_kvec_bytes -= ret;
1508                if (con->out_kvec_bytes == 0)
1509                        break;            /* done */
1510
1511                /* account for full iov entries consumed */
1512                while (ret >= con->out_kvec_cur->iov_len) {
1513                        BUG_ON(!con->out_kvec_left);
1514                        ret -= con->out_kvec_cur->iov_len;
1515                        con->out_kvec_cur++;
1516                        con->out_kvec_left--;
1517                }
1518                /* and for a partially-consumed entry */
1519                if (ret) {
1520                        con->out_kvec_cur->iov_len -= ret;
1521                        con->out_kvec_cur->iov_base += ret;
1522                }
1523        }
1524        con->out_kvec_left = 0;
1525        ret = 1;
1526out:
1527        dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
1528             con->out_kvec_bytes, con->out_kvec_left, ret);
1529        return ret;  /* done! */
1530}
1531
1532static u32 ceph_crc32c_page(u32 crc, struct page *page,
1533                                unsigned int page_offset,
1534                                unsigned int length)
1535{
1536        char *kaddr;
1537
1538        kaddr = kmap(page);
1539        BUG_ON(kaddr == NULL);
1540        crc = crc32c(crc, kaddr + page_offset, length);
1541        kunmap(page);
1542
1543        return crc;
1544}
1545/*
1546 * Write as much message data payload as we can.  If we finish, queue
1547 * up the footer.
1548 *  1 -> done, footer is now queued in out_kvec[].
1549 *  0 -> socket full, but more to do
1550 * <0 -> error
1551 */
1552static int write_partial_message_data(struct ceph_connection *con)
1553{
1554        struct ceph_msg *msg = con->out_msg;
1555        struct ceph_msg_data_cursor *cursor = &msg->cursor;
1556        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1557        int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
1558        u32 crc;
1559
1560        dout("%s %p msg %p\n", __func__, con, msg);
1561
1562        if (!msg->num_data_items)
1563                return -EINVAL;
1564
1565        /*
1566         * Iterate through each page that contains data to be
1567         * written, and send as much as possible for each.
1568         *
1569         * If we are calculating the data crc (the default), we will
1570         * need to map the page.  If we have no pages, they have
1571         * been revoked, so use the zero page.
1572         */
1573        crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
1574        while (cursor->total_resid) {
1575                struct page *page;
1576                size_t page_offset;
1577                size_t length;
1578                int ret;
1579
1580                if (!cursor->resid) {
1581                        ceph_msg_data_advance(cursor, 0);
1582                        continue;
1583                }
1584
1585                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
1586                if (length == cursor->total_resid)
1587                        more = MSG_MORE;
1588                ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
1589                                        more);
1590                if (ret <= 0) {
1591                        if (do_datacrc)
1592                                msg->footer.data_crc = cpu_to_le32(crc);
1593
1594                        return ret;
1595                }
1596                if (do_datacrc && cursor->need_crc)
1597                        crc = ceph_crc32c_page(crc, page, page_offset, length);
1598                ceph_msg_data_advance(cursor, (size_t)ret);
1599        }
1600
1601        dout("%s %p msg %p done\n", __func__, con, msg);
1602
1603        /* prepare and queue up footer, too */
1604        if (do_datacrc)
1605                msg->footer.data_crc = cpu_to_le32(crc);
1606        else
1607                msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
1608        con_out_kvec_reset(con);
1609        prepare_write_message_footer(con);
1610
1611        return 1;       /* must return > 0 to indicate success */
1612}
1613
1614/*
1615 * write some zeros
1616 */
1617static int write_partial_skip(struct ceph_connection *con)
1618{
1619        int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
1620        int ret;
1621
1622        dout("%s %p %d left\n", __func__, con, con->out_skip);
1623        while (con->out_skip > 0) {
1624                size_t size = min(con->out_skip, (int) PAGE_SIZE);
1625
1626                if (size == con->out_skip)
1627                        more = MSG_MORE;
1628                ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, more);
1629                if (ret <= 0)
1630                        goto out;
1631                con->out_skip -= ret;
1632        }
1633        ret = 1;
1634out:
1635        return ret;
1636}
1637
1638/*
1639 * Prepare to read connection handshake, or an ack.
1640 */
1641static void prepare_read_banner(struct ceph_connection *con)
1642{
1643        dout("prepare_read_banner %p\n", con);
1644        con->in_base_pos = 0;
1645}
1646
1647static void prepare_read_connect(struct ceph_connection *con)
1648{
1649        dout("prepare_read_connect %p\n", con);
1650        con->in_base_pos = 0;
1651}
1652
1653static void prepare_read_ack(struct ceph_connection *con)
1654{
1655        dout("prepare_read_ack %p\n", con);
1656        con->in_base_pos = 0;
1657}
1658
1659static void prepare_read_seq(struct ceph_connection *con)
1660{
1661        dout("prepare_read_seq %p\n", con);
1662        con->in_base_pos = 0;
1663        con->in_tag = CEPH_MSGR_TAG_SEQ;
1664}
1665
1666static void prepare_read_tag(struct ceph_connection *con)
1667{
1668        dout("prepare_read_tag %p\n", con);
1669        con->in_base_pos = 0;
1670        con->in_tag = CEPH_MSGR_TAG_READY;
1671}
1672
1673static void prepare_read_keepalive_ack(struct ceph_connection *con)
1674{
1675        dout("prepare_read_keepalive_ack %p\n", con);
1676        con->in_base_pos = 0;
1677}
1678
1679/*
1680 * Prepare to read a message.
1681 */
1682static int prepare_read_message(struct ceph_connection *con)
1683{
1684        dout("prepare_read_message %p\n", con);
1685        BUG_ON(con->in_msg != NULL);
1686        con->in_base_pos = 0;
1687        con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
1688        return 0;
1689}
1690
1691
1692static int read_partial(struct ceph_connection *con,
1693                        int end, int size, void *object)
1694{
1695        while (con->in_base_pos < end) {
1696                int left = end - con->in_base_pos;
1697                int have = size - left;
1698                int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1699                if (ret <= 0)
1700                        return ret;
1701                con->in_base_pos += ret;
1702        }
1703        return 1;
1704}
1705
1706
1707/*
1708 * Read all or part of the connect-side handshake on a new connection
1709 */
1710static int read_partial_banner(struct ceph_connection *con)
1711{
1712        int size;
1713        int end;
1714        int ret;
1715
1716        dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
1717
1718        /* peer's banner */
1719        size = strlen(CEPH_BANNER);
1720        end = size;
1721        ret = read_partial(con, end, size, con->in_banner);
1722        if (ret <= 0)
1723                goto out;
1724
1725        size = sizeof (con->actual_peer_addr);
1726        end += size;
1727        ret = read_partial(con, end, size, &con->actual_peer_addr);
1728        if (ret <= 0)
1729                goto out;
1730        ceph_decode_banner_addr(&con->actual_peer_addr);
1731
1732        size = sizeof (con->peer_addr_for_me);
1733        end += size;
1734        ret = read_partial(con, end, size, &con->peer_addr_for_me);
1735        if (ret <= 0)
1736                goto out;
1737        ceph_decode_banner_addr(&con->peer_addr_for_me);
1738
1739out:
1740        return ret;
1741}
1742
1743static int read_partial_connect(struct ceph_connection *con)
1744{
1745        int size;
1746        int end;
1747        int ret;
1748
1749        dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1750
1751        size = sizeof (con->in_reply);
1752        end = size;
1753        ret = read_partial(con, end, size, &con->in_reply);
1754        if (ret <= 0)
1755                goto out;
1756
1757        if (con->auth) {
1758                size = le32_to_cpu(con->in_reply.authorizer_len);
1759                if (size > con->auth->authorizer_reply_buf_len) {
1760                        pr_err("authorizer reply too big: %d > %zu\n", size,
1761                               con->auth->authorizer_reply_buf_len);
1762                        ret = -EINVAL;
1763                        goto out;
1764                }
1765
1766                end += size;
1767                ret = read_partial(con, end, size,
1768                                   con->auth->authorizer_reply_buf);
1769                if (ret <= 0)
1770                        goto out;
1771        }
1772
1773        dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
1774             con, (int)con->in_reply.tag,
1775             le32_to_cpu(con->in_reply.connect_seq),
1776             le32_to_cpu(con->in_reply.global_seq));
1777out:
1778        return ret;
1779}
1780
1781/*
1782 * Verify the hello banner looks okay.
1783 */
1784static int verify_hello(struct ceph_connection *con)
1785{
1786        if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1787                pr_err("connect to %s got bad banner\n",
1788                       ceph_pr_addr(&con->peer_addr));
1789                con->error_msg = "protocol error, bad banner";
1790                return -1;
1791        }
1792        return 0;
1793}
1794
1795static bool addr_is_blank(struct ceph_entity_addr *addr)
1796{
1797        struct sockaddr_storage ss = addr->in_addr; /* align */
1798        struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
1799        struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;
1800
1801        switch (ss.ss_family) {
1802        case AF_INET:
1803                return addr4->s_addr == htonl(INADDR_ANY);
1804        case AF_INET6:
1805                return ipv6_addr_any(addr6);
1806        default:
1807                return true;
1808        }
1809}
1810
1811static int addr_port(struct ceph_entity_addr *addr)
1812{
1813        switch (get_unaligned(&addr->in_addr.ss_family)) {
1814        case AF_INET:
1815                return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
1816        case AF_INET6:
1817                return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
1818        }
1819        return 0;
1820}
1821
1822static void addr_set_port(struct ceph_entity_addr *addr, int p)
1823{
1824        switch (get_unaligned(&addr->in_addr.ss_family)) {
1825        case AF_INET:
1826                put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
1827                break;
1828        case AF_INET6:
1829                put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
1830                break;
1831        }
1832}
1833
1834/*
1835 * Unlike other *_pton function semantics, zero indicates success.
1836 */
1837static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
1838                char delim, const char **ipend)
1839{
1840        memset(&addr->in_addr, 0, sizeof(addr->in_addr));
1841
1842        if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
1843                put_unaligned(AF_INET, &addr->in_addr.ss_family);
1844                return 0;
1845        }
1846
1847        if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
1848                put_unaligned(AF_INET6, &addr->in_addr.ss_family);
1849                return 0;
1850        }
1851
1852        return -EINVAL;
1853}
1854
1855/*
1856 * Extract hostname string and resolve using kernel DNS facility.
1857 */
1858#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
1859static int ceph_dns_resolve_name(const char *name, size_t namelen,
1860                struct ceph_entity_addr *addr, char delim, const char **ipend)
1861{
1862        const char *end, *delim_p;
1863        char *colon_p, *ip_addr = NULL;
1864        int ip_len, ret;
1865
1866        /*
1867         * The end of the hostname occurs immediately preceding the delimiter or
1868         * the port marker (':') where the delimiter takes precedence.
1869         */
1870        delim_p = memchr(name, delim, namelen);
1871        colon_p = memchr(name, ':', namelen);
1872
1873        if (delim_p && colon_p)
1874                end = delim_p < colon_p ? delim_p : colon_p;
1875        else if (!delim_p && colon_p)
1876                end = colon_p;
1877        else {
1878                end = delim_p;
1879                if (!end) /* case: hostname:/ */
1880                        end = name + namelen;
1881        }
1882
1883        if (end <= name)
1884                return -EINVAL;
1885
1886        /* do dns_resolve upcall */
1887        ip_len = dns_query(current->nsproxy->net_ns,
1888                           NULL, name, end - name, NULL, &ip_addr, NULL, false);
1889        if (ip_len > 0)
1890                ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
1891        else
1892                ret = -ESRCH;
1893
1894        kfree(ip_addr);
1895
1896        *ipend = end;
1897
1898        pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
1899                        ret, ret ? "failed" : ceph_pr_addr(addr));
1900
1901        return ret;
1902}
1903#else
1904static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
1905                struct ceph_entity_addr *addr, char delim, const char **ipend)
1906{
1907        return -EINVAL;
1908}
1909#endif
1910
1911/*
1912 * Parse a server name (IP or hostname). If a valid IP address is not found
1913 * then try to extract a hostname to resolve using userspace DNS upcall.
1914 */
1915static int ceph_parse_server_name(const char *name, size_t namelen,
1916                struct ceph_entity_addr *addr, char delim, const char **ipend)
1917{
1918        int ret;
1919
1920        ret = ceph_pton(name, namelen, addr, delim, ipend);
1921        if (ret)
1922                ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);
1923
1924        return ret;
1925}
1926
1927/*
1928 * Parse an ip[:port] list into an addr array.  Use the default
1929 * monitor port if a port isn't specified.
1930 */
1931int ceph_parse_ips(const char *c, const char *end,
1932                   struct ceph_entity_addr *addr,
1933                   int max_count, int *count)
1934{
1935        int i, ret = -EINVAL;
1936        const char *p = c;
1937
1938        dout("parse_ips on '%.*s'\n", (int)(end-c), c);
1939        for (i = 0; i < max_count; i++) {
1940                const char *ipend;
1941                int port;
1942                char delim = ',';
1943
1944                if (*p == '[') {
1945                        delim = ']';
1946                        p++;
1947                }
1948
1949                ret = ceph_parse_server_name(p, end - p, &addr[i], delim, &ipend);
1950                if (ret)
1951                        goto bad;
1952                ret = -EINVAL;
1953
1954                p = ipend;
1955
1956                if (delim == ']') {
1957                        if (*p != ']') {
1958                                dout("missing matching ']'\n");
1959                                goto bad;
1960                        }
1961                        p++;
1962                }
1963
1964                /* port? */
1965                if (p < end && *p == ':') {
1966                        port = 0;
1967                        p++;
1968                        while (p < end && *p >= '0' && *p <= '9') {
1969                                port = (port * 10) + (*p - '0');
1970                                p++;
1971                        }
1972                        if (port == 0)
1973                                port = CEPH_MON_PORT;
1974                        else if (port > 65535)
1975                                goto bad;
1976                } else {
1977                        port = CEPH_MON_PORT;
1978                }
1979
1980                addr_set_port(&addr[i], port);
1981                addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
1982
1983                dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));
1984
1985                if (p == end)
1986                        break;
1987                if (*p != ',')
1988                        goto bad;
1989                p++;
1990        }
1991
1992        if (p != end)
1993                goto bad;
1994
1995        if (count)
1996                *count = i + 1;
1997        return 0;
1998
1999bad:
2000        return ret;
2001}
2002
2003static int process_banner(struct ceph_connection *con)
2004{
2005        dout("process_banner on %p\n", con);
2006
2007        if (verify_hello(con) < 0)
2008                return -1;
2009
2010        /*
2011         * Make sure the other end is who we wanted.  note that the other
2012         * end may not yet know their ip address, so if it's 0.0.0.0, give
2013         * them the benefit of the doubt.
2014         */
2015        if (memcmp(&con->peer_addr, &con->actual_peer_addr,
2016                   sizeof(con->peer_addr)) != 0 &&
2017            !(addr_is_blank(&con->actual_peer_addr) &&
2018              con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
2019                pr_warn("wrong peer, want %s/%u, got %s/%u\n",
2020                        ceph_pr_addr(&con->peer_addr),
2021                        le32_to_cpu(con->peer_addr.nonce),
2022                        ceph_pr_addr(&con->actual_peer_addr),
2023                        le32_to_cpu(con->actual_peer_addr.nonce));
2024                con->error_msg = "wrong peer at address";
2025                return -1;
2026        }
2027
2028        /*
2029         * did we learn our address?
2030         */
2031        if (addr_is_blank(&con->msgr->inst.addr)) {
2032                int port = addr_port(&con->msgr->inst.addr);
2033
2034                memcpy(&con->msgr->inst.addr.in_addr,
2035                       &con->peer_addr_for_me.in_addr,
2036                       sizeof(con->peer_addr_for_me.in_addr));
2037                addr_set_port(&con->msgr->inst.addr, port);
2038                encode_my_addr(con->msgr);
2039                dout("process_banner learned my addr is %s\n",
2040                     ceph_pr_addr(&con->msgr->inst.addr));
2041        }
2042
2043        return 0;
2044}
2045
/*
 * Act on the connect reply previously read into con->in_reply.
 *
 * If an authorizer is in use, first handle an authorizer challenge (which
 * restarts the connect exchange) or verify a non-empty authorizer reply.
 * Then dispatch on con->in_reply.tag:
 *  - FEATURES, BADPROTOVER, WAIT and unknown tags are fatal;
 *  - BADAUTHORIZER, RESETSESSION, RETRY_SESSION and RETRY_GLOBAL queue a
 *    new connect request and wait for another reply;
 *  - SEQ and READY complete the handshake and move the connection to
 *    CON_STATE_OPEN.
 *
 * Called with con->mutex held; for RESETSESSION the mutex is dropped
 * around ->peer_reset() and re-taken.
 *
 * Returns 0 to keep going, -EAGAIN if the connection left
 * CON_STATE_NEGOTIATING while the mutex was dropped, or a negative value
 * on failure (con->error_msg is set on most of those paths).
 */
static int process_connect(struct ceph_connection *con)
{
	u64 sup_feat = from_msgr(con->msgr)->supported_features;
	u64 req_feat = from_msgr(con->msgr)->required_features;
	u64 server_feat = le64_to_cpu(con->in_reply.features);
	int ret;

	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);

	if (con->auth) {
		int len = le32_to_cpu(con->in_reply.authorizer_len);

		/*
		 * Any connection that defines ->get_authorizer()
		 * should also define ->add_authorizer_challenge() and
		 * ->verify_authorizer_reply().
		 *
		 * See get_connect_authorizer().
		 */
		if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
			/*
			 * Hand the challenge (left in authorizer_reply_buf by
			 * read_partial_connect()) to the auth layer, then send
			 * a new connect request and wait for another reply.
			 */
			ret = con->ops->add_authorizer_challenge(
				    con, con->auth->authorizer_reply_buf, len);
			if (ret < 0)
				return ret;

			con_out_kvec_reset(con);
			__prepare_write_connect(con);
			prepare_read_connect(con);
			return 0;
		}

		/* only verify when the peer actually sent an authorizer reply */
		if (len) {
			ret = con->ops->verify_authorizer_reply(con);
			if (ret < 0) {
				con->error_msg = "bad authorize reply";
				return ret;
			}
		}
	}

	switch (con->in_reply.tag) {
	case CEPH_MSGR_TAG_FEATURES:
		/* the peer requires features we do not support: fatal */
		pr_err("%s%lld %s feature set mismatch,"
		       " my %llx < server's %llx, missing %llx\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr),
		       sup_feat, server_feat, server_feat & ~sup_feat);
		con->error_msg = "missing required protocol features";
		reset_connection(con);
		return -1;

	case CEPH_MSGR_TAG_BADPROTOVER:
		pr_err("%s%lld %s protocol version mismatch,"
		       " my %d != server's %d\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr),
		       le32_to_cpu(con->out_connect.protocol_version),
		       le32_to_cpu(con->in_reply.protocol_version));
		con->error_msg = "protocol version mismatch";
		reset_connection(con);
		return -1;

	case CEPH_MSGR_TAG_BADAUTHORIZER:
		/*
		 * Retry once with a fresh connect request; a second
		 * consecutive BADAUTHORIZER is fatal (within this function
		 * auth_retry is only cleared on SEQ/READY).
		 */
		con->auth_retry++;
		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
		     con->auth_retry);
		if (con->auth_retry == 2) {
			con->error_msg = "connect authorization failure";
			return -1;
		}
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RESETSESSION:
		/*
		 * If we connected with a large connect_seq but the peer
		 * has no record of a session with us (no connection, or
		 * connect_seq == 0), they will send RESETSESSION to indicate
		 * that they must have reset their session, and may have
		 * dropped messages.
		 */
		dout("process_connect got RESET peer seq %u\n",
		     le32_to_cpu(con->in_reply.connect_seq));
		pr_err("%s%lld %s connection reset\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr));
		reset_connection(con);
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);

		/*
		 * Tell ceph about it.  The mutex is dropped around the
		 * callback, so the connection may change state meanwhile;
		 * bail out with -EAGAIN if it is no longer negotiating.
		 */
		mutex_unlock(&con->mutex);
		pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
		if (con->ops->peer_reset)
			con->ops->peer_reset(con);
		mutex_lock(&con->mutex);
		if (con->state != CON_STATE_NEGOTIATING)
			return -EAGAIN;
		break;

	case CEPH_MSGR_TAG_RETRY_SESSION:
		/*
		 * If we sent a smaller connect_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
		     le32_to_cpu(con->out_connect.connect_seq),
		     le32_to_cpu(con->in_reply.connect_seq));
		con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RETRY_GLOBAL:
		/*
		 * If we sent a smaller global_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_reply.global_seq));
		get_global_seq(con->msgr,
			       le32_to_cpu(con->in_reply.global_seq));
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_SEQ:
	case CEPH_MSGR_TAG_READY:
		/* the peer accepted; make sure it offers what we require */
		if (req_feat & ~server_feat) {
			pr_err("%s%lld %s protocol feature mismatch,"
			       " my required %llx > server's %llx, need %llx\n",
			       ENTITY_NAME(con->peer_name),
			       ceph_pr_addr(&con->peer_addr),
			       req_feat, server_feat, req_feat & ~server_feat);
			con->error_msg = "missing required protocol features";
			reset_connection(con);
			return -1;
		}

		WARN_ON(con->state != CON_STATE_NEGOTIATING);
		con->state = CON_STATE_OPEN;
		con->auth_retry = 0;    /* we authenticated; clear flag */
		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
		con->connect_seq++;
		con->peer_features = server_feat;
		dout("process_connect got READY gseq %d cseq %d (%d)\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_reply.connect_seq),
		     con->connect_seq);
		WARN_ON(con->connect_seq !=
			le32_to_cpu(con->in_reply.connect_seq));

		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
			con_flag_set(con, CON_FLAG_LOSSYTX);

		con->delay = 0;      /* reset backoff memory */

		/*
		 * SEQ: seq values are exchanged (prepare_write_seq() /
		 * prepare_read_seq()) before normal tag processing resumes.
		 */
		if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
			prepare_write_seq(con);
			prepare_read_seq(con);
		} else {
			prepare_read_tag(con);
		}
		break;

	case CEPH_MSGR_TAG_WAIT:
		/*
		 * If there is a connection race (we are opening
		 * connections to each other), one of us may just have
		 * to WAIT.  This shouldn't happen if we are the
		 * client.
		 */
		con->error_msg = "protocol error, got WAIT as client";
		return -1;

	default:
		con->error_msg = "protocol error, garbage tag during connect";
		return -1;
	}
	return 0;
}
2241
2242
2243/*
2244 * read (part of) an ack
2245 */
2246static int read_partial_ack(struct ceph_connection *con)
2247{
2248        int size = sizeof (con->in_temp_ack);
2249        int end = size;
2250
2251        return read_partial(con, end, size, &con->in_temp_ack);
2252}
2253
2254/*
2255 * We can finally discard anything that's been acked.
2256 */
2257static void process_ack(struct ceph_connection *con)
2258{
2259        struct ceph_msg *m;
2260        u64 ack = le64_to_cpu(con->in_temp_ack);
2261        u64 seq;
2262        bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ);
2263        struct list_head *list = reconnect ? &con->out_queue : &con->out_sent;
2264
2265        /*
2266         * In the reconnect case, con_fault() has requeued messages
2267         * in out_sent. We should cleanup old messages according to
2268         * the reconnect seq.
2269         */
2270        while (!list_empty(list)) {
2271                m = list_first_entry(list, struct ceph_msg, list_head);
2272                if (reconnect && m->needs_out_seq)
2273                        break;
2274                seq = le64_to_cpu(m->hdr.seq);
2275                if (seq > ack)
2276                        break;
2277                dout("got ack for seq %llu type %d at %p\n", seq,
2278                     le16_to_cpu(m->hdr.type), m);
2279                m->ack_stamp = jiffies;
2280                ceph_msg_remove(m);
2281        }
2282
2283        prepare_read_tag(con);
2284}
2285
2286
2287static int read_partial_message_section(struct ceph_connection *con,
2288                                        struct kvec *section,
2289                                        unsigned int sec_len, u32 *crc)
2290{
2291        int ret, left;
2292
2293        BUG_ON(!section);
2294
2295        while (section->iov_len < sec_len) {
2296                BUG_ON(section->iov_base == NULL);
2297                left = sec_len - section->iov_len;
2298                ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
2299                                       section->iov_len, left);
2300                if (ret <= 0)
2301                        return ret;
2302                section->iov_len += ret;
2303        }
2304        if (section->iov_len == sec_len)
2305                *crc = crc32c(0, section->iov_base, section->iov_len);
2306
2307        return 1;
2308}
2309
2310static int read_partial_msg_data(struct ceph_connection *con)
2311{
2312        struct ceph_msg *msg = con->in_msg;
2313        struct ceph_msg_data_cursor *cursor = &msg->cursor;
2314        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
2315        struct page *page;
2316        size_t page_offset;
2317        size_t length;
2318        u32 crc = 0;
2319        int ret;
2320
2321        if (!msg->num_data_items)
2322                return -EIO;
2323
2324        if (do_datacrc)
2325                crc = con->in_data_crc;
2326        while (cursor->total_resid) {
2327                if (!cursor->resid) {
2328                        ceph_msg_data_advance(cursor, 0);
2329                        continue;
2330                }
2331
2332                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
2333                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2334                if (ret <= 0) {
2335                        if (do_datacrc)
2336                                con->in_data_crc = crc;
2337
2338                        return ret;
2339                }
2340
2341                if (do_datacrc)
2342                        crc = ceph_crc32c_page(crc, page, page_offset, ret);
2343                ceph_msg_data_advance(cursor, (size_t)ret);
2344        }
2345        if (do_datacrc)
2346                con->in_data_crc = crc;
2347
2348        return 1;       /* must return > 0 to indicate success */
2349}
2350
/*
 * Forward declaration: ceph_con_in_msg_alloc() is defined further down in
 * this file and is needed by read_partial_message() below to allocate (or
 * decide to skip) the incoming message.
 */
static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2355
2356static int read_partial_message(struct ceph_connection *con)
2357{
2358        struct ceph_msg *m = con->in_msg;
2359        int size;
2360        int end;
2361        int ret;
2362        unsigned int front_len, middle_len, data_len;
2363        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
2364        bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
2365        u64 seq;
2366        u32 crc;
2367
2368        dout("read_partial_message con %p msg %p\n", con, m);
2369
2370        /* header */
2371        size = sizeof (con->in_hdr);
2372        end = size;
2373        ret = read_partial(con, end, size, &con->in_hdr);
2374        if (ret <= 0)
2375                return ret;
2376
2377        crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
2378        if (cpu_to_le32(crc) != con->in_hdr.crc) {
2379                pr_err("read_partial_message bad hdr crc %u != expected %u\n",
2380                       crc, con->in_hdr.crc);
2381                return -EBADMSG;
2382        }
2383
2384        front_len = le32_to_cpu(con->in_hdr.front_len);
2385        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
2386                return -EIO;
2387        middle_len = le32_to_cpu(con->in_hdr.middle_len);
2388        if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
2389                return -EIO;
2390        data_len = le32_to_cpu(con->in_hdr.data_len);
2391        if (data_len > CEPH_MSG_MAX_DATA_LEN)
2392                return -EIO;
2393
2394        /* verify seq# */
2395        seq = le64_to_cpu(con->in_hdr.seq);
2396        if ((s64)seq - (s64)con->in_seq < 1) {
2397                pr_info("skipping %s%lld %s seq %lld expected %lld\n",
2398                        ENTITY_NAME(con->peer_name),
2399                        ceph_pr_addr(&con->peer_addr),
2400                        seq, con->in_seq + 1);
2401                con->in_base_pos = -front_len - middle_len - data_len -
2402                        sizeof_footer(con);
2403                con->in_tag = CEPH_MSGR_TAG_READY;
2404                return 1;
2405        } else if ((s64)seq - (s64)con->in_seq > 1) {
2406                pr_err("read_partial_message bad seq %lld expected %lld\n",
2407                       seq, con->in_seq + 1);
2408                con->error_msg = "bad message sequence # for incoming message";
2409                return -EBADE;
2410        }
2411
2412        /* allocate message? */
2413        if (!con->in_msg) {
2414                int skip = 0;
2415
2416                dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
2417                     front_len, data_len);
2418                ret = ceph_con_in_msg_alloc(con, &skip);
2419                if (ret < 0)
2420                        return ret;
2421
2422                BUG_ON(!con->in_msg ^ skip);
2423                if (skip) {
2424                        /* skip this message */
2425                        dout("alloc_msg said skip message\n");
2426                        con->in_base_pos = -front_len - middle_len - data_len -
2427                                sizeof_footer(con);
2428                        con->in_tag = CEPH_MSGR_TAG_READY;
2429                        con->in_seq++;
2430                        return 1;
2431                }
2432
2433                BUG_ON(!con->in_msg);
2434                BUG_ON(con->in_msg->con != con);
2435                m = con->in_msg;
2436                m->front.iov_len = 0;    /* haven't read it yet */
2437                if (m->middle)
2438                        m->middle->vec.iov_len = 0;
2439
2440                /* prepare for data payload, if any */
2441
2442                if (data_len)
2443                        prepare_message_data(con->in_msg, data_len);
2444        }
2445
2446        /* front */
2447        ret = read_partial_message_section(con, &m->front, front_len,
2448                                           &con->in_front_crc);
2449        if (ret <= 0)
2450                return ret;
2451
2452        /* middle */
2453        if (m->middle) {
2454                ret = read_partial_message_section(con, &m->middle->vec,
2455                                                   middle_len,
2456                                                   &con->in_middle_crc);
2457                if (ret <= 0)
2458                        return ret;
2459        }
2460
2461        /* (page) data */
2462        if (data_len) {
2463                ret = read_partial_msg_data(con);
2464                if (ret <= 0)
2465                        return ret;
2466        }
2467
2468        /* footer */
2469        size = sizeof_footer(con);
2470        end += size;
2471        ret = read_partial(con, end, size, &m->footer);
2472        if (ret <= 0)
2473                return ret;
2474
2475        if (!need_sign) {
2476                m->footer.flags = m->old_footer.flags;
2477                m->footer.sig = 0;
2478        }
2479
2480        dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
2481             m, front_len, m->footer.front_crc, middle_len,
2482             m->footer.middle_crc, data_len, m->footer.data_crc);
2483
2484        /* crc ok? */
2485        if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
2486                pr_err("read_partial_message %p front crc %u != exp. %u\n",
2487                       m, con->in_front_crc, m->footer.front_crc);
2488                return -EBADMSG;
2489        }
2490        if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
2491                pr_err("read_partial_message %p middle crc %u != exp %u\n",
2492                       m, con->in_middle_crc, m->footer.middle_crc);
2493                return -EBADMSG;
2494        }
2495        if (do_datacrc &&
2496            (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
2497            con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
2498                pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
2499                       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
2500                return -EBADMSG;
2501        }
2502
2503        if (need_sign && con->ops->check_message_signature &&
2504            con->ops->check_message_signature(m)) {
2505                pr_err("read_partial_message %p signature check failed\n", m);
2506                return -EBADMSG;
2507        }
2508
2509        return 1; /* done! */
2510}
2511
2512/*
2513 * Process message.  This happens in the worker thread.  The callback should
2514 * be careful not to do anything that waits on other incoming messages or it
2515 * may deadlock.
2516 */
2517static void process_message(struct ceph_connection *con)
2518{
2519        struct ceph_msg *msg = con->in_msg;
2520
2521        BUG_ON(con->in_msg->con != con);
2522        con->in_msg = NULL;
2523
2524        /* if first message, set peer_name */
2525        if (con->peer_name.type == 0)
2526                con->peer_name = msg->hdr.src;
2527
2528        con->in_seq++;
2529        mutex_unlock(&con->mutex);
2530
2531        dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
2532             msg, le64_to_cpu(msg->hdr.seq),
2533             ENTITY_NAME(msg->hdr.src),
2534             le16_to_cpu(msg->hdr.type),
2535             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
2536             le32_to_cpu(msg->hdr.front_len),
2537             le32_to_cpu(msg->hdr.data_len),
2538             con->in_front_crc, con->in_middle_crc, con->in_data_crc);
2539        con->ops->dispatch(con, msg);
2540
2541        mutex_lock(&con->mutex);
2542}
2543
2544static int read_keepalive_ack(struct ceph_connection *con)
2545{
2546        struct ceph_timespec ceph_ts;
2547        size_t size = sizeof(ceph_ts);
2548        int ret = read_partial(con, size, size, &ceph_ts);
2549        if (ret <= 0)
2550                return ret;
2551        ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
2552        prepare_read_tag(con);
2553        return 1;
2554}
2555
/*
 * Write something to the socket.  Called in a worker thread when the
 * socket appears to be writeable and we have something ready to send.
 *
 * Only PREOPEN, CONNECTING, NEGOTIATING and OPEN connections are written;
 * any other state returns 0 immediately.  A PREOPEN connection is first
 * moved to CONNECTING: our banner is queued for writing, the read side is
 * armed for the peer's banner, and ceph_tcp_connect() is started.
 *
 * The "more" loop then flushes, in order, queued kvecs, pending skip
 * bytes and the data of con->out_msg.  Once those are drained, an OPEN
 * connection queues the next item and loops again.  The next item is, by
 * priority: a pending keepalive, then the head of con->out_queue, then an
 * ack if in_seq is ahead of in_seq_acked.  When nothing is left,
 * WRITE_PENDING is cleared.
 *
 * Returns 0 when writing stopped without error.  That covers nothing left
 * to send, or a write helper returning 0 (presumably because the socket
 * would block -- confirm in write_partial_kvec()).  Otherwise a negative
 * error code is returned.  This function itself sets con->error_msg only
 * for a connect failure.
 */
static int try_write(struct ceph_connection *con)
{
	int ret = 1;

	dout("try_write start %p state %lu\n", con, con->state);
	if (con->state != CON_STATE_PREOPEN &&
	    con->state != CON_STATE_CONNECTING &&
	    con->state != CON_STATE_NEGOTIATING &&
	    con->state != CON_STATE_OPEN)
		return 0;

	/* open the socket first? */
	if (con->state == CON_STATE_PREOPEN) {
		BUG_ON(con->sock);
		con->state = CON_STATE_CONNECTING;

		/* start a fresh out_kvec holding our banner */
		con_out_kvec_reset(con);
		prepare_write_banner(con);
		prepare_read_banner(con);

		BUG_ON(con->in_msg);
		con->in_tag = CEPH_MSGR_TAG_READY;
		dout("try_write initiating connect on %p new state %lu\n",
		     con, con->state);
		ret = ceph_tcp_connect(con);
		if (ret < 0) {
			con->error_msg = "connect error";
			goto out;
		}
	}

more:
	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
	BUG_ON(!con->sock);

	/* kvec data queued? */
	if (con->out_kvec_left) {
		ret = write_partial_kvec(con);
		if (ret <= 0)
			goto out;
	}
	/* skip bytes owed on the wire (e.g. set by ceph_msg_revoke())? */
	if (con->out_skip) {
		ret = write_partial_skip(con);
		if (ret <= 0)
			goto out;
	}

	/* msg pages? */
	if (con->out_msg) {
		if (con->out_msg_done) {
			ceph_msg_put(con->out_msg);
			con->out_msg = NULL;   /* we're done with this one */
			goto do_next;
		}

		ret = write_partial_message_data(con);
		if (ret == 1)
			goto more;  /* we need to send the footer, too! */
		if (ret == 0)
			goto out;
		if (ret < 0) {
			dout("try_write write_partial_message_data err %d\n",
			     ret);
			goto out;
		}
	}

do_next:
	/* only an OPEN connection may start new keepalives/messages/acks */
	if (con->state == CON_STATE_OPEN) {
		if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
			prepare_write_keepalive(con);
			goto more;
		}
		/* is anything else pending? */
		if (!list_empty(&con->out_queue)) {
			prepare_write_message(con);
			goto more;
		}
		/* received messages not yet acknowledged to the peer */
		if (con->in_seq > con->in_seq_acked) {
			prepare_write_ack(con);
			goto more;
		}
	}

	/* Nothing to do! */
	con_flag_clear(con, CON_FLAG_WRITE_PENDING);
	dout("try_write nothing else to write.\n");
	ret = 0;
out:
	dout("try_write done on %p ret %d\n", con, ret);
	return ret;
}
2652
/*
 * Read what we can from the socket.
 *
 * The work done depends on con->state:
 *  - CONNECTING: read and process the peer's banner, switch to
 *    NEGOTIATING, and queue our connect info.  Then return, so that the
 *    connect info gets written before the reply is awaited.
 *  - NEGOTIATING: read and process the connect reply, then loop.
 *  - OPEN: first discard pending skip bytes (a negative in_base_pos).
 *    If in_tag is READY, read one tag byte.  Then read the message, ack
 *    (or handshake SEQ) or KEEPALIVE2 ack payload the tag announced.
 * Any other state returns 0 immediately.
 *
 * Only negative return values are errors.  ceph_con_workfn() retries on
 * -EAGAIN and treats every other negative value as a fault.
 * read_partial_message() errors are mapped as follows:
 *  - -EBADMSG sets "bad crc/signature" and becomes -EIO;
 *  - -EBADE becomes -EIO;
 *  - -EIO sets "io error".
 * A garbage tag sets a protocol-error message and returns -1.
 */
static int try_read(struct ceph_connection *con)
{
	int ret = -1;

more:
	dout("try_read start on %p state %lu\n", con, con->state);
	if (con->state != CON_STATE_CONNECTING &&
	    con->state != CON_STATE_NEGOTIATING &&
	    con->state != CON_STATE_OPEN)
		return 0;

	BUG_ON(!con->sock);

	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
	     con->in_base_pos);

	if (con->state == CON_STATE_CONNECTING) {
		dout("try_read connecting\n");
		ret = read_partial_banner(con);
		if (ret <= 0)
			goto out;
		ret = process_banner(con);
		if (ret < 0)
			goto out;

		con->state = CON_STATE_NEGOTIATING;

		/*
		 * Received banner is good, exchange connection info.
		 * Do not reset out_kvec, as sending our banner raced
		 * with receiving peer banner after connect completed.
		 */
		ret = prepare_write_connect(con);
		if (ret < 0)
			goto out;
		prepare_read_connect(con);

		/* Send connection info before awaiting response */
		goto out;
	}

	if (con->state == CON_STATE_NEGOTIATING) {
		dout("try_read negotiating\n");
		ret = read_partial_connect(con);
		if (ret <= 0)
			goto out;
		ret = process_connect(con);
		if (ret < 0)
			goto out;
		goto more;
	}

	WARN_ON(con->state != CON_STATE_OPEN);

	if (con->in_base_pos < 0) {
		/*
		 * skipping + discarding content.
		 * -in_base_pos is the number of bytes still to throw away.
		 */
		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
		if (ret <= 0)
			goto out;
		dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
		con->in_base_pos += ret;
		if (con->in_base_pos)
			goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_READY) {
		/*
		 * what's next?
		 */
		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
		if (ret <= 0)
			goto out;
		dout("try_read got tag %d\n", (int)con->in_tag);
		switch (con->in_tag) {
		case CEPH_MSGR_TAG_MSG:
			prepare_read_message(con);
			break;
		case CEPH_MSGR_TAG_ACK:
			prepare_read_ack(con);
			break;
		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
			prepare_read_keepalive_ack(con);
			break;
		case CEPH_MSGR_TAG_CLOSE:
			con_close_socket(con);
			con->state = CON_STATE_CLOSED;
			goto out;
		default:
			goto bad_tag;
		}
	}
	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
		ret = read_partial_message(con);
		if (ret <= 0) {
			switch (ret) {
			case -EBADMSG:
				con->error_msg = "bad crc/signature";
				fallthrough;
			case -EBADE:
				ret = -EIO;
				break;
			case -EIO:
				con->error_msg = "io error";
				break;
			}
			goto out;
		}
		/*
		 * Tag already back at READY: presumably the message was
		 * skipped rather than received -- confirm in
		 * read_partial_message().
		 */
		if (con->in_tag == CEPH_MSGR_TAG_READY)
			goto more;
		process_message(con);
		/*
		 * process_message() re-takes con->mutex after ->dispatch(),
		 * so the connection may no longer be OPEN here.
		 */
		if (con->state == CON_STATE_OPEN)
			prepare_read_tag(con);
		goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_ACK ||
	    con->in_tag == CEPH_MSGR_TAG_SEQ) {
		/*
		 * the final handshake seq exchange is semantically
		 * equivalent to an ACK
		 */
		ret = read_partial_ack(con);
		if (ret <= 0)
			goto out;
		process_ack(con);
		goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
		ret = read_keepalive_ack(con);
		if (ret <= 0)
			goto out;
		goto more;
	}

out:
	dout("try_read done on %p ret %d\n", con, ret);
	return ret;

bad_tag:
	pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
	con->error_msg = "protocol error, garbage tag";
	ret = -1;
	goto out;
}
2800
2801
2802/*
2803 * Atomically queue work on a connection after the specified delay.
2804 * Bump @con reference to avoid races with connection teardown.
2805 * Returns 0 if work was queued, or an error code otherwise.
2806 */
2807static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
2808{
2809        if (!con->ops->get(con)) {
2810                dout("%s %p ref count 0\n", __func__, con);
2811                return -ENOENT;
2812        }
2813
2814        dout("%s %p %lu\n", __func__, con, delay);
2815        if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
2816                dout("%s %p - already queued\n", __func__, con);
2817                con->ops->put(con);
2818                return -EBUSY;
2819        }
2820
2821        return 0;
2822}
2823
/*
 * Queue con->work to run without delay.  Failure (-ENOENT or -EBUSY from
 * queue_con_delay()) is deliberately ignored.
 */
static void queue_con(struct ceph_connection *con)
{
	queue_con_delay(con, 0);
}
2828
2829static void cancel_con(struct ceph_connection *con)
2830{
2831        if (cancel_delayed_work(&con->work)) {
2832                dout("%s %p\n", __func__, con);
2833                con->ops->put(con);
2834        }
2835}
2836
2837static bool con_sock_closed(struct ceph_connection *con)
2838{
2839        if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
2840                return false;
2841
2842#define CASE(x)                                                         \
2843        case CON_STATE_ ## x:                                           \
2844                con->error_msg = "socket closed (con state " #x ")";    \
2845                break;
2846
2847        switch (con->state) {
2848        CASE(CLOSED);
2849        CASE(PREOPEN);
2850        CASE(CONNECTING);
2851        CASE(NEGOTIATING);
2852        CASE(OPEN);
2853        CASE(STANDBY);
2854        default:
2855                pr_warn("%s con %p unrecognized state %lu\n",
2856                        __func__, con, con->state);
2857                con->error_msg = "unrecognized con state";
2858                BUG();
2859                break;
2860        }
2861#undef CASE
2862
2863        return true;
2864}
2865
2866static bool con_backoff(struct ceph_connection *con)
2867{
2868        int ret;
2869
2870        if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
2871                return false;
2872
2873        ret = queue_con_delay(con, round_jiffies_relative(con->delay));
2874        if (ret) {
2875                dout("%s: con %p FAILED to back off %lu\n", __func__,
2876                        con, con->delay);
2877                BUG_ON(ret == -ENOENT);
2878                con_flag_set(con, CON_FLAG_BACKOFF);
2879        }
2880
2881        return true;
2882}
2883
2884/* Finish fault handling; con->mutex must *not* be held here */
2885
2886static void con_fault_finish(struct ceph_connection *con)
2887{
2888        dout("%s %p\n", __func__, con);
2889
2890        /*
2891         * in case we faulted due to authentication, invalidate our
2892         * current tickets so that we can get new ones.
2893         */
2894        if (con->auth_retry) {
2895                dout("auth_retry %d, invalidating\n", con->auth_retry);
2896                if (con->ops->invalidate_authorizer)
2897                        con->ops->invalidate_authorizer(con);
2898                con->auth_retry = 0;
2899        }
2900
2901        if (con->ops->fault)
2902                con->ops->fault(con);
2903}
2904
/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 *
 * The loop runs with con->mutex held.  Helpers such as process_message()
 * and ceph_con_in_msg_alloc() drop and re-take it internally.
 *
 * The loop stops early when:
 *  - the socket was closed (treated as a fault);
 *  - a backoff is pending;
 *  - the connection is in STANDBY or CLOSED.
 * Otherwise it runs try_read() and then try_write(), restarting from the
 * top whenever either returns -EAGAIN.  Any other negative return is a
 * fault.
 *
 * A fault is handled in two steps: con_fault() under the mutex, then
 * con_fault_finish() after releasing it.  Finally, the reference taken by
 * queue_con_delay() when the work was queued is put.
 */
static void ceph_con_workfn(struct work_struct *work)
{
	struct ceph_connection *con = container_of(work, struct ceph_connection,
						   work.work);
	/* assigned at the top of every loop pass, before any break */
	bool fault;

	mutex_lock(&con->mutex);
	while (true) {
		int ret;

		if ((fault = con_sock_closed(con))) {
			dout("%s: con %p SOCK_CLOSED\n", __func__, con);
			break;
		}
		if (con_backoff(con)) {
			dout("%s: con %p BACKOFF\n", __func__, con);
			break;
		}
		if (con->state == CON_STATE_STANDBY) {
			dout("%s: con %p STANDBY\n", __func__, con);
			break;
		}
		if (con->state == CON_STATE_CLOSED) {
			dout("%s: con %p CLOSED\n", __func__, con);
			BUG_ON(con->sock);
			break;
		}
		if (con->state == CON_STATE_PREOPEN) {
			/* try_write() will open the socket */
			dout("%s: con %p PREOPEN\n", __func__, con);
			BUG_ON(con->sock);
		}

		ret = try_read(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on read";
			fault = true;
			break;
		}

		ret = try_write(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on write";
			fault = true;
		}

		break;	/* If we make it to here, we're done */
	}
	if (fault)
		con_fault(con);
	mutex_unlock(&con->mutex);

	/* must run without con->mutex held */
	if (fault)
		con_fault_finish(con);

	con->ops->put(con);
}
2970
2971/*
2972 * Generic error/fault handler.  A retry mechanism is used with
2973 * exponential backoff
2974 */
2975static void con_fault(struct ceph_connection *con)
2976{
2977        dout("fault %p state %lu to peer %s\n",
2978             con, con->state, ceph_pr_addr(&con->peer_addr));
2979
2980        pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2981                ceph_pr_addr(&con->peer_addr), con->error_msg);
2982        con->error_msg = NULL;
2983
2984        WARN_ON(con->state != CON_STATE_CONNECTING &&
2985               con->state != CON_STATE_NEGOTIATING &&
2986               con->state != CON_STATE_OPEN);
2987
2988        con_close_socket(con);
2989
2990        if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
2991                dout("fault on LOSSYTX channel, marking CLOSED\n");
2992                con->state = CON_STATE_CLOSED;
2993                return;
2994        }
2995
2996        if (con->in_msg) {
2997                BUG_ON(con->in_msg->con != con);
2998                ceph_msg_put(con->in_msg);
2999                con->in_msg = NULL;
3000        }
3001        if (con->out_msg) {
3002                BUG_ON(con->out_msg->con != con);
3003                ceph_msg_put(con->out_msg);
3004                con->out_msg = NULL;
3005        }
3006
3007        /* Requeue anything that hasn't been acked */
3008        list_splice_init(&con->out_sent, &con->out_queue);
3009
3010        /* If there are no messages queued or keepalive pending, place
3011         * the connection in a STANDBY state */
3012        if (list_empty(&con->out_queue) &&
3013            !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
3014                dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
3015                con_flag_clear(con, CON_FLAG_WRITE_PENDING);
3016                con->state = CON_STATE_STANDBY;
3017        } else {
3018                /* retry after a delay. */
3019                con->state = CON_STATE_PREOPEN;
3020                if (con->delay == 0)
3021                        con->delay = BASE_DELAY_INTERVAL;
3022                else if (con->delay < MAX_DELAY_INTERVAL)
3023                        con->delay *= 2;
3024                con_flag_set(con, CON_FLAG_BACKOFF);
3025                queue_con(con);
3026        }
3027}
3028
3029
3030void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
3031{
3032        u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
3033        msgr->inst.addr.nonce = cpu_to_le32(nonce);
3034        encode_my_addr(msgr);
3035}
3036
3037/*
3038 * initialize a new messenger instance
3039 */
3040void ceph_messenger_init(struct ceph_messenger *msgr,
3041                         struct ceph_entity_addr *myaddr)
3042{
3043        spin_lock_init(&msgr->global_seq_lock);
3044
3045        if (myaddr)
3046                msgr->inst.addr = *myaddr;
3047
3048        /* select a random nonce */
3049        msgr->inst.addr.type = 0;
3050        get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
3051        encode_my_addr(msgr);
3052
3053        atomic_set(&msgr->stopping, 0);
3054        write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
3055
3056        dout("%s %p\n", __func__, msgr);
3057}
3058EXPORT_SYMBOL(ceph_messenger_init);
3059
3060void ceph_messenger_fini(struct ceph_messenger *msgr)
3061{
3062        put_net(read_pnet(&msgr->net));
3063}
3064EXPORT_SYMBOL(ceph_messenger_fini);
3065
3066static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
3067{
3068        if (msg->con)
3069                msg->con->ops->put(msg->con);
3070
3071        msg->con = con ? con->ops->get(con) : NULL;
3072        BUG_ON(msg->con != con);
3073}
3074
3075static void clear_standby(struct ceph_connection *con)
3076{
3077        /* come back from STANDBY? */
3078        if (con->state == CON_STATE_STANDBY) {
3079                dout("clear_standby %p and ++connect_seq\n", con);
3080                con->state = CON_STATE_PREOPEN;
3081                con->connect_seq++;
3082                WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
3083                WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
3084        }
3085}
3086
3087/*
3088 * Queue up an outgoing message on the given connection.
3089 */
3090void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
3091{
3092        /* set src+dst */
3093        msg->hdr.src = con->msgr->inst.name;
3094        BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
3095        msg->needs_out_seq = true;
3096
3097        mutex_lock(&con->mutex);
3098
3099        if (con->state == CON_STATE_CLOSED) {
3100                dout("con_send %p closed, dropping %p\n", con, msg);
3101                ceph_msg_put(msg);
3102                mutex_unlock(&con->mutex);
3103                return;
3104        }
3105
3106        msg_con_set(msg, con);
3107
3108        BUG_ON(!list_empty(&msg->list_head));
3109        list_add_tail(&msg->list_head, &con->out_queue);
3110        dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
3111             ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
3112             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
3113             le32_to_cpu(msg->hdr.front_len),
3114             le32_to_cpu(msg->hdr.middle_len),
3115             le32_to_cpu(msg->hdr.data_len));
3116
3117        clear_standby(con);
3118        mutex_unlock(&con->mutex);
3119
3120        /* if there wasn't anything waiting to send before, queue
3121         * new work */
3122        if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
3123                queue_con(con);
3124}
3125EXPORT_SYMBOL(ceph_con_send);
3126
/*
 * Revoke a message that was previously queued for send
 *
 * A message with no connection is ignored.  Otherwise, under con->mutex:
 *  - If @msg is still linked on one of the connection's lists (out_queue,
 *    or out_sent, which con_fault() splices back into out_queue), it is
 *    unlinked, its seq is cleared and the list's reference is dropped.
 *  - If @msg is the message currently being written (con->out_msg), its
 *    unsent remainder (footer, data, middle, front) is added to
 *    con->out_skip.  try_write() then emits those bytes through
 *    write_partial_skip() instead of the real payload.  The message is
 *    detached from out_msg and put.
 */
void ceph_msg_revoke(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;		/* Message not in our possession */
	}

	mutex_lock(&con->mutex);
	if (!list_empty(&msg->list_head)) {
		dout("%s %p msg %p - was on queue\n", __func__, con, msg);
		list_del_init(&msg->list_head);
		msg->hdr.seq = 0;

		ceph_msg_put(msg);
	}
	if (con->out_msg == msg) {
		BUG_ON(con->out_skip);
		/*
		 * Walk backwards through the frame.  con_out_kvec_skip()
		 * presumably unqueues the last kvec and returns its unsent
		 * byte count -- confirm against its definition.
		 */
		/* footer */
		if (con->out_msg_done) {
			/* data fully written: footer is already a queued kvec */
			con->out_skip += con_out_kvec_skip(con);
		} else {
			/* still in the data phase: footer not queued yet */
			BUG_ON(!msg->data_length);
			con->out_skip += sizeof_footer(con);
		}
		/* data, middle, front */
		if (msg->data_length)
			con->out_skip += msg->cursor.total_resid;
		if (msg->middle)
			con->out_skip += con_out_kvec_skip(con);
		con->out_skip += con_out_kvec_skip(con);

		dout("%s %p msg %p - was sending, will write %d skip %d\n",
		     __func__, con, msg, con->out_kvec_bytes, con->out_skip);
		msg->hdr.seq = 0;
		con->out_msg = NULL;
		ceph_msg_put(msg);
	}

	mutex_unlock(&con->mutex);
}
3172
3173/*
3174 * Revoke a message that we may be reading data into
3175 */
3176void ceph_msg_revoke_incoming(struct ceph_msg *msg)
3177{
3178        struct ceph_connection *con = msg->con;
3179
3180        if (!con) {
3181                dout("%s msg %p null con\n", __func__, msg);
3182                return;         /* Message not in our possession */
3183        }
3184
3185        mutex_lock(&con->mutex);
3186        if (con->in_msg == msg) {
3187                unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
3188                unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
3189                unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
3190
3191                /* skip rest of message */
3192                dout("%s %p msg %p revoked\n", __func__, con, msg);
3193                con->in_base_pos = con->in_base_pos -
3194                                sizeof(struct ceph_msg_header) -
3195                                front_len -
3196                                middle_len -
3197                                data_len -
3198                                sizeof(struct ceph_msg_footer);
3199                ceph_msg_put(con->in_msg);
3200                con->in_msg = NULL;
3201                con->in_tag = CEPH_MSGR_TAG_READY;
3202                con->in_seq++;
3203        } else {
3204                dout("%s %p in_msg %p msg %p no-op\n",
3205                     __func__, con, con->in_msg, msg);
3206        }
3207        mutex_unlock(&con->mutex);
3208}
3209
3210/*
3211 * Queue a keepalive byte to ensure the tcp connection is alive.
3212 */
3213void ceph_con_keepalive(struct ceph_connection *con)
3214{
3215        dout("con_keepalive %p\n", con);
3216        mutex_lock(&con->mutex);
3217        clear_standby(con);
3218        con_flag_set(con, CON_FLAG_KEEPALIVE_PENDING);
3219        mutex_unlock(&con->mutex);
3220
3221        if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
3222                queue_con(con);
3223}
3224EXPORT_SYMBOL(ceph_con_keepalive);
3225
3226bool ceph_con_keepalive_expired(struct ceph_connection *con,
3227                               unsigned long interval)
3228{
3229        if (interval > 0 &&
3230            (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
3231                struct timespec64 now;
3232                struct timespec64 ts;
3233                ktime_get_real_ts64(&now);
3234                jiffies_to_timespec64(interval, &ts);
3235                ts = timespec64_add(con->last_keepalive_ack, ts);
3236                return timespec64_compare(&now, &ts) >= 0;
3237        }
3238        return false;
3239}
3240
3241static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
3242{
3243        BUG_ON(msg->num_data_items >= msg->max_data_items);
3244        return &msg->data[msg->num_data_items++];
3245}
3246
3247static void ceph_msg_data_destroy(struct ceph_msg_data *data)
3248{
3249        if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
3250                int num_pages = calc_pages_for(data->alignment, data->length);
3251                ceph_release_page_vector(data->pages, num_pages);
3252        } else if (data->type == CEPH_MSG_DATA_PAGELIST) {
3253                ceph_pagelist_release(data->pagelist);
3254        }
3255}
3256
3257void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
3258                             size_t length, size_t alignment, bool own_pages)
3259{
3260        struct ceph_msg_data *data;
3261
3262        BUG_ON(!pages);
3263        BUG_ON(!length);
3264
3265        data = ceph_msg_data_add(msg);
3266        data->type = CEPH_MSG_DATA_PAGES;
3267        data->pages = pages;
3268        data->length = length;
3269        data->alignment = alignment & ~PAGE_MASK;
3270        data->own_pages = own_pages;
3271
3272        msg->data_length += length;
3273}
3274EXPORT_SYMBOL(ceph_msg_data_add_pages);
3275
3276void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
3277                                struct ceph_pagelist *pagelist)
3278{
3279        struct ceph_msg_data *data;
3280
3281        BUG_ON(!pagelist);
3282        BUG_ON(!pagelist->length);
3283
3284        data = ceph_msg_data_add(msg);
3285        data->type = CEPH_MSG_DATA_PAGELIST;
3286        refcount_inc(&pagelist->refcnt);
3287        data->pagelist = pagelist;
3288
3289        msg->data_length += pagelist->length;
3290}
3291EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
3292
#ifdef  CONFIG_BLOCK
/*
 * Append a bio-backed data item of @length bytes, starting at the
 * position described by @bio_pos (which is copied).
 */
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
			   u32 length)
{
	struct ceph_msg_data *item = ceph_msg_data_add(msg);

	item->type = CEPH_MSG_DATA_BIO;
	item->bio_pos = *bio_pos;
	item->bio_length = length;

	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif  /* CONFIG_BLOCK */
3308
3309void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
3310                             struct ceph_bvec_iter *bvec_pos)
3311{
3312        struct ceph_msg_data *data;
3313
3314        data = ceph_msg_data_add(msg);
3315        data->type = CEPH_MSG_DATA_BVECS;
3316        data->bvec_pos = *bvec_pos;
3317
3318        msg->data_length += bvec_pos->iter.bi_size;
3319}
3320EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
3321
3322/*
3323 * construct a new message with given type, size
3324 * the new msg has a ref count of 1.
3325 */
3326struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
3327                               gfp_t flags, bool can_fail)
3328{
3329        struct ceph_msg *m;
3330
3331        m = kmem_cache_zalloc(ceph_msg_cache, flags);
3332        if (m == NULL)
3333                goto out;
3334
3335        m->hdr.type = cpu_to_le16(type);
3336        m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
3337        m->hdr.front_len = cpu_to_le32(front_len);
3338
3339        INIT_LIST_HEAD(&m->list_head);
3340        kref_init(&m->kref);
3341
3342        /* front */
3343        if (front_len) {
3344                m->front.iov_base = ceph_kvmalloc(front_len, flags);
3345                if (m->front.iov_base == NULL) {
3346                        dout("ceph_msg_new can't allocate %d bytes\n",
3347                             front_len);
3348                        goto out2;
3349                }
3350        } else {
3351                m->front.iov_base = NULL;
3352        }
3353        m->front_alloc_len = m->front.iov_len = front_len;
3354
3355        if (max_data_items) {
3356                m->data = kmalloc_array(max_data_items, sizeof(*m->data),
3357                                        flags);
3358                if (!m->data)
3359                        goto out2;
3360
3361                m->max_data_items = max_data_items;
3362        }
3363
3364        dout("ceph_msg_new %p front %d\n", m, front_len);
3365        return m;
3366
3367out2:
3368        ceph_msg_put(m);
3369out:
3370        if (!can_fail) {
3371                pr_err("msg_new can't create type %d front %d\n", type,
3372                       front_len);
3373                WARN_ON(1);
3374        } else {
3375                dout("msg_new can't create type %d front %d\n", type,
3376                     front_len);
3377        }
3378        return NULL;
3379}
3380EXPORT_SYMBOL(ceph_msg_new2);
3381
3382struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3383                              bool can_fail)
3384{
3385        return ceph_msg_new2(type, front_len, 0, flags, can_fail);
3386}
3387EXPORT_SYMBOL(ceph_msg_new);
3388
3389/*
3390 * Allocate "middle" portion of a message, if it is needed and wasn't
3391 * allocated by alloc_msg.  This allows us to read a small fixed-size
3392 * per-type header in the front and then gracefully fail (i.e.,
3393 * propagate the error to the caller based on info in the front) when
3394 * the middle is too large.
3395 */
3396static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
3397{
3398        int type = le16_to_cpu(msg->hdr.type);
3399        int middle_len = le32_to_cpu(msg->hdr.middle_len);
3400
3401        dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
3402             ceph_msg_type_name(type), middle_len);
3403        BUG_ON(!middle_len);
3404        BUG_ON(msg->middle);
3405
3406        msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
3407        if (!msg->middle)
3408                return -ENOMEM;
3409        return 0;
3410}
3411
/*
 * Allocate a message for receiving an incoming message on a
 * connection, and save the result in con->in_msg.  Uses the
 * connection's private alloc_msg op if available.
 *
 * Returns 0 on success, or a negative error code.
 *
 * On success, if we set *skip = 1:
 *  - the next message should be skipped and ignored.
 *  - con->in_msg == NULL
 * or if we set *skip = 0:
 *  - con->in_msg is non-null.
 * On error (ENOMEM, EAGAIN, ...),
 *  - con->in_msg == NULL
 *
 * Called with con->mutex held.  The mutex is released around the
 * ->alloc_msg() call and re-taken afterwards.  If the connection is no
 * longer OPEN by then, any message that was returned is put and -EAGAIN
 * is returned.  con->error_msg is set only when ->alloc_msg() returns
 * NULL without asking to skip.
 */
static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
{
	struct ceph_msg_header *hdr = &con->in_hdr;
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg;
	int ret = 0;

	BUG_ON(con->in_msg != NULL);
	BUG_ON(!con->ops->alloc_msg);

	mutex_unlock(&con->mutex);
	msg = con->ops->alloc_msg(con, hdr, skip);
	mutex_lock(&con->mutex);
	/* the connection may have changed state while unlocked */
	if (con->state != CON_STATE_OPEN) {
		if (msg)
			ceph_msg_put(msg);
		return -EAGAIN;
	}
	if (msg) {
		BUG_ON(*skip);
		msg_con_set(msg, con);
		con->in_msg = msg;
	} else {
		/*
		 * Null message pointer means either we should skip
		 * this message or we couldn't allocate memory.  The
		 * former is not an error.
		 */
		if (*skip)
			return 0;

		con->error_msg = "error allocating memory for incoming message";
		return -ENOMEM;
	}
	memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));

	/* ->alloc_msg() may leave the middle for us to allocate */
	if (middle_len && !con->in_msg->middle) {
		ret = ceph_alloc_middle(con, con->in_msg);
		if (ret < 0) {
			ceph_msg_put(con->in_msg);
			con->in_msg = NULL;
		}
	}

	return ret;
}
3473
3474
3475/*
3476 * Free a generically kmalloc'd message.
3477 */
3478static void ceph_msg_free(struct ceph_msg *m)
3479{
3480        dout("%s %p\n", __func__, m);
3481        kvfree(m->front.iov_base);
3482        kfree(m->data);
3483        kmem_cache_free(ceph_msg_cache, m);
3484}
3485
3486static void ceph_msg_release(struct kref *kref)
3487{
3488        struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
3489        int i;
3490
3491        dout("%s %p\n", __func__, m);
3492        WARN_ON(!list_empty(&m->list_head));
3493
3494        msg_con_set(m, NULL);
3495
3496        /* drop middle, data, if any */
3497        if (m->middle) {
3498                ceph_buffer_put(m->middle);
3499                m->middle = NULL;
3500        }
3501
3502        for (i = 0; i < m->num_data_items; i++)
3503                ceph_msg_data_destroy(&m->data[i]);
3504
3505        if (m->pool)
3506                ceph_msgpool_put(m->pool, m);
3507        else
3508                ceph_msg_free(m);
3509}
3510
3511struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
3512{
3513        dout("%s %p (was %d)\n", __func__, msg,
3514             kref_read(&msg->kref));
3515        kref_get(&msg->kref);
3516        return msg;
3517}
3518EXPORT_SYMBOL(ceph_msg_get);
3519
3520void ceph_msg_put(struct ceph_msg *msg)
3521{
3522        dout("%s %p (was %d)\n", __func__, msg,
3523             kref_read(&msg->kref));
3524        kref_put(&msg->kref, ceph_msg_release);
3525}
3526EXPORT_SYMBOL(ceph_msg_put);
3527
3528void ceph_msg_dump(struct ceph_msg *msg)
3529{
3530        pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
3531                 msg->front_alloc_len, msg->data_length);
3532        print_hex_dump(KERN_DEBUG, "header: ",
3533                       DUMP_PREFIX_OFFSET, 16, 1,
3534                       &msg->hdr, sizeof(msg->hdr), true);
3535        print_hex_dump(KERN_DEBUG, " front: ",
3536                       DUMP_PREFIX_OFFSET, 16, 1,
3537                       msg->front.iov_base, msg->front.iov_len, true);
3538        if (msg->middle)
3539                print_hex_dump(KERN_DEBUG, "middle: ",
3540                               DUMP_PREFIX_OFFSET, 16, 1,
3541                               msg->middle->vec.iov_base,
3542                               msg->middle->vec.iov_len, true);
3543        print_hex_dump(KERN_DEBUG, "footer: ",
3544                       DUMP_PREFIX_OFFSET, 16, 1,
3545                       &msg->footer, sizeof(msg->footer), true);
3546}
3547EXPORT_SYMBOL(ceph_msg_dump);
3548