linux/net/ceph/messenger_v1.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/bvec.h>
   5#include <linux/crc32c.h>
   6#include <linux/net.h>
   7#include <linux/socket.h>
   8#include <net/sock.h>
   9
  10#include <linux/ceph/ceph_features.h>
  11#include <linux/ceph/decode.h>
  12#include <linux/ceph/libceph.h>
  13#include <linux/ceph/messenger.h>
  14
  15/* static tag bytes (protocol control messages) */
  16static char tag_msg = CEPH_MSGR_TAG_MSG;
  17static char tag_ack = CEPH_MSGR_TAG_ACK;
  18static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
  19static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
  20
  21/*
  22 * If @buf is NULL, discard up to @len bytes.
  23 */
  24static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
  25{
  26        struct kvec iov = {buf, len};
  27        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  28        int r;
  29
  30        if (!buf)
  31                msg.msg_flags |= MSG_TRUNC;
  32
  33        iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
  34        r = sock_recvmsg(sock, &msg, msg.msg_flags);
  35        if (r == -EAGAIN)
  36                r = 0;
  37        return r;
  38}
  39
  40static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
  41                     int page_offset, size_t length)
  42{
  43        struct bio_vec bvec = {
  44                .bv_page = page,
  45                .bv_offset = page_offset,
  46                .bv_len = length
  47        };
  48        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  49        int r;
  50
  51        BUG_ON(page_offset + length > PAGE_SIZE);
  52        iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
  53        r = sock_recvmsg(sock, &msg, msg.msg_flags);
  54        if (r == -EAGAIN)
  55                r = 0;
  56        return r;
  57}
  58
  59/*
  60 * write something.  @more is true if caller will be sending more data
  61 * shortly.
  62 */
  63static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
  64                            size_t kvlen, size_t len, bool more)
  65{
  66        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  67        int r;
  68
  69        if (more)
  70                msg.msg_flags |= MSG_MORE;
  71        else
  72                msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
  73
  74        r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
  75        if (r == -EAGAIN)
  76                r = 0;
  77        return r;
  78}
  79
  80/*
  81 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
  82 */
  83static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
  84                             int offset, size_t size, int more)
  85{
  86        ssize_t (*sendpage)(struct socket *sock, struct page *page,
  87                            int offset, size_t size, int flags);
  88        int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
  89        int ret;
  90
  91        /*
  92         * sendpage cannot properly handle pages with page_count == 0,
  93         * we need to fall back to sendmsg if that's the case.
  94         *
  95         * Same goes for slab pages: skb_can_coalesce() allows
  96         * coalescing neighboring slab objects into a single frag which
  97         * triggers one of hardened usercopy checks.
  98         */
  99        if (sendpage_ok(page))
 100                sendpage = sock->ops->sendpage;
 101        else
 102                sendpage = sock_no_sendpage;
 103
 104        ret = sendpage(sock, page, offset, size, flags);
 105        if (ret == -EAGAIN)
 106                ret = 0;
 107
 108        return ret;
 109}
 110
 111static void con_out_kvec_reset(struct ceph_connection *con)
 112{
 113        BUG_ON(con->v1.out_skip);
 114
 115        con->v1.out_kvec_left = 0;
 116        con->v1.out_kvec_bytes = 0;
 117        con->v1.out_kvec_cur = &con->v1.out_kvec[0];
 118}
 119
 120static void con_out_kvec_add(struct ceph_connection *con,
 121                                size_t size, void *data)
 122{
 123        int index = con->v1.out_kvec_left;
 124
 125        BUG_ON(con->v1.out_skip);
 126        BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
 127
 128        con->v1.out_kvec[index].iov_len = size;
 129        con->v1.out_kvec[index].iov_base = data;
 130        con->v1.out_kvec_left++;
 131        con->v1.out_kvec_bytes += size;
 132}
 133
 134/*
 135 * Chop off a kvec from the end.  Return residual number of bytes for
 136 * that kvec, i.e. how many bytes would have been written if the kvec
 137 * hadn't been nuked.
 138 */
 139static int con_out_kvec_skip(struct ceph_connection *con)
 140{
 141        int skip = 0;
 142
 143        if (con->v1.out_kvec_bytes > 0) {
 144                skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
 145                BUG_ON(con->v1.out_kvec_bytes < skip);
 146                BUG_ON(!con->v1.out_kvec_left);
 147                con->v1.out_kvec_bytes -= skip;
 148                con->v1.out_kvec_left--;
 149        }
 150
 151        return skip;
 152}
 153
 154static size_t sizeof_footer(struct ceph_connection *con)
 155{
 156        return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
 157            sizeof(struct ceph_msg_footer) :
 158            sizeof(struct ceph_msg_footer_old);
 159}
 160
 161static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 162{
 163        /* Initialize data cursor */
 164
 165        ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
 166}
 167
 168/*
 169 * Prepare footer for currently outgoing message, and finish things
 170 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 171 */
 172static void prepare_write_message_footer(struct ceph_connection *con)
 173{
 174        struct ceph_msg *m = con->out_msg;
 175
 176        m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 177
 178        dout("prepare_write_message_footer %p\n", con);
 179        con_out_kvec_add(con, sizeof_footer(con), &m->footer);
 180        if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
 181                if (con->ops->sign_message)
 182                        con->ops->sign_message(m);
 183                else
 184                        m->footer.sig = 0;
 185        } else {
 186                m->old_footer.flags = m->footer.flags;
 187        }
 188        con->v1.out_more = m->more_to_follow;
 189        con->v1.out_msg_done = true;
 190}
 191
 192/*
 193 * Prepare headers for the next outgoing message.
 194 */
 195static void prepare_write_message(struct ceph_connection *con)
 196{
 197        struct ceph_msg *m;
 198        u32 crc;
 199
 200        con_out_kvec_reset(con);
 201        con->v1.out_msg_done = false;
 202
 203        /* Sneak an ack in there first?  If we can get it into the same
 204         * TCP packet that's a good thing. */
 205        if (con->in_seq > con->in_seq_acked) {
 206                con->in_seq_acked = con->in_seq;
 207                con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 208                con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 209                con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 210                        &con->v1.out_temp_ack);
 211        }
 212
 213        ceph_con_get_out_msg(con);
 214        m = con->out_msg;
 215
 216        dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
 217             m, con->out_seq, le16_to_cpu(m->hdr.type),
 218             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
 219             m->data_length);
 220        WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
 221        WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
 222
 223        /* tag + hdr + front + middle */
 224        con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
 225        con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
 226        con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 227
 228        if (m->middle)
 229                con_out_kvec_add(con, m->middle->vec.iov_len,
 230                        m->middle->vec.iov_base);
 231
 232        /* fill in hdr crc and finalize hdr */
 233        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
 234        con->out_msg->hdr.crc = cpu_to_le32(crc);
 235        memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
 236
 237        /* fill in front and middle crc, footer */
 238        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
 239        con->out_msg->footer.front_crc = cpu_to_le32(crc);
 240        if (m->middle) {
 241                crc = crc32c(0, m->middle->vec.iov_base,
 242                                m->middle->vec.iov_len);
 243                con->out_msg->footer.middle_crc = cpu_to_le32(crc);
 244        } else
 245                con->out_msg->footer.middle_crc = 0;
 246        dout("%s front_crc %u middle_crc %u\n", __func__,
 247             le32_to_cpu(con->out_msg->footer.front_crc),
 248             le32_to_cpu(con->out_msg->footer.middle_crc));
 249        con->out_msg->footer.flags = 0;
 250
 251        /* is there a data payload? */
 252        con->out_msg->footer.data_crc = 0;
 253        if (m->data_length) {
 254                prepare_message_data(con->out_msg, m->data_length);
 255                con->v1.out_more = 1;  /* data + footer will follow */
 256        } else {
 257                /* no, queue up footer too and be done */
 258                prepare_write_message_footer(con);
 259        }
 260
 261        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 262}
 263
 264/*
 265 * Prepare an ack.
 266 */
 267static void prepare_write_ack(struct ceph_connection *con)
 268{
 269        dout("prepare_write_ack %p %llu -> %llu\n", con,
 270             con->in_seq_acked, con->in_seq);
 271        con->in_seq_acked = con->in_seq;
 272
 273        con_out_kvec_reset(con);
 274
 275        con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 276
 277        con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 278        con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 279                         &con->v1.out_temp_ack);
 280
 281        con->v1.out_more = 1;  /* more will follow.. eventually.. */
 282        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 283}
 284
 285/*
 286 * Prepare to share the seq during handshake
 287 */
 288static void prepare_write_seq(struct ceph_connection *con)
 289{
 290        dout("prepare_write_seq %p %llu -> %llu\n", con,
 291             con->in_seq_acked, con->in_seq);
 292        con->in_seq_acked = con->in_seq;
 293
 294        con_out_kvec_reset(con);
 295
 296        con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 297        con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 298                         &con->v1.out_temp_ack);
 299
 300        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 301}
 302
 303/*
 304 * Prepare to write keepalive byte.
 305 */
 306static void prepare_write_keepalive(struct ceph_connection *con)
 307{
 308        dout("prepare_write_keepalive %p\n", con);
 309        con_out_kvec_reset(con);
 310        if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
 311                struct timespec64 now;
 312
 313                ktime_get_real_ts64(&now);
 314                con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
 315                ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
 316                con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
 317                                 &con->v1.out_temp_keepalive2);
 318        } else {
 319                con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
 320        }
 321        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 322}
 323
 324/*
 325 * Connection negotiation.
 326 */
 327
 328static int get_connect_authorizer(struct ceph_connection *con)
 329{
 330        struct ceph_auth_handshake *auth;
 331        int auth_proto;
 332
 333        if (!con->ops->get_authorizer) {
 334                con->v1.auth = NULL;
 335                con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
 336                con->v1.out_connect.authorizer_len = 0;
 337                return 0;
 338        }
 339
 340        auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
 341        if (IS_ERR(auth))
 342                return PTR_ERR(auth);
 343
 344        con->v1.auth = auth;
 345        con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
 346        con->v1.out_connect.authorizer_len =
 347                cpu_to_le32(auth->authorizer_buf_len);
 348        return 0;
 349}
 350
 351/*
 352 * We connected to a peer and are saying hello.
 353 */
 354static void prepare_write_banner(struct ceph_connection *con)
 355{
 356        con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
 357        con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
 358                                        &con->msgr->my_enc_addr);
 359
 360        con->v1.out_more = 0;
 361        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 362}
 363
 364static void __prepare_write_connect(struct ceph_connection *con)
 365{
 366        con_out_kvec_add(con, sizeof(con->v1.out_connect),
 367                         &con->v1.out_connect);
 368        if (con->v1.auth)
 369                con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
 370                                 con->v1.auth->authorizer_buf);
 371
 372        con->v1.out_more = 0;
 373        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 374}
 375
 376static int prepare_write_connect(struct ceph_connection *con)
 377{
 378        unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
 379        int proto;
 380        int ret;
 381
 382        switch (con->peer_name.type) {
 383        case CEPH_ENTITY_TYPE_MON:
 384                proto = CEPH_MONC_PROTOCOL;
 385                break;
 386        case CEPH_ENTITY_TYPE_OSD:
 387                proto = CEPH_OSDC_PROTOCOL;
 388                break;
 389        case CEPH_ENTITY_TYPE_MDS:
 390                proto = CEPH_MDSC_PROTOCOL;
 391                break;
 392        default:
 393                BUG();
 394        }
 395
 396        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
 397             con->v1.connect_seq, global_seq, proto);
 398
 399        con->v1.out_connect.features =
 400                cpu_to_le64(from_msgr(con->msgr)->supported_features);
 401        con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
 402        con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
 403        con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
 404        con->v1.out_connect.protocol_version = cpu_to_le32(proto);
 405        con->v1.out_connect.flags = 0;
 406
 407        ret = get_connect_authorizer(con);
 408        if (ret)
 409                return ret;
 410
 411        __prepare_write_connect(con);
 412        return 0;
 413}
 414
 415/*
 416 * write as much of pending kvecs to the socket as we can.
 417 *  1 -> done
 418 *  0 -> socket full, but more to do
 419 * <0 -> error
 420 */
 421static int write_partial_kvec(struct ceph_connection *con)
 422{
 423        int ret;
 424
 425        dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
 426        while (con->v1.out_kvec_bytes > 0) {
 427                ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
 428                                       con->v1.out_kvec_left,
 429                                       con->v1.out_kvec_bytes,
 430                                       con->v1.out_more);
 431                if (ret <= 0)
 432                        goto out;
 433                con->v1.out_kvec_bytes -= ret;
 434                if (!con->v1.out_kvec_bytes)
 435                        break;            /* done */
 436
 437                /* account for full iov entries consumed */
 438                while (ret >= con->v1.out_kvec_cur->iov_len) {
 439                        BUG_ON(!con->v1.out_kvec_left);
 440                        ret -= con->v1.out_kvec_cur->iov_len;
 441                        con->v1.out_kvec_cur++;
 442                        con->v1.out_kvec_left--;
 443                }
 444                /* and for a partially-consumed entry */
 445                if (ret) {
 446                        con->v1.out_kvec_cur->iov_len -= ret;
 447                        con->v1.out_kvec_cur->iov_base += ret;
 448                }
 449        }
 450        con->v1.out_kvec_left = 0;
 451        ret = 1;
 452out:
 453        dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
 454             con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
 455        return ret;  /* done! */
 456}
 457
 458/*
 459 * Write as much message data payload as we can.  If we finish, queue
 460 * up the footer.
 461 *  1 -> done, footer is now queued in out_kvec[].
 462 *  0 -> socket full, but more to do
 463 * <0 -> error
 464 */
 465static int write_partial_message_data(struct ceph_connection *con)
 466{
 467        struct ceph_msg *msg = con->out_msg;
 468        struct ceph_msg_data_cursor *cursor = &msg->cursor;
 469        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
 470        int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 471        u32 crc;
 472
 473        dout("%s %p msg %p\n", __func__, con, msg);
 474
 475        if (!msg->num_data_items)
 476                return -EINVAL;
 477
 478        /*
 479         * Iterate through each page that contains data to be
 480         * written, and send as much as possible for each.
 481         *
 482         * If we are calculating the data crc (the default), we will
 483         * need to map the page.  If we have no pages, they have
 484         * been revoked, so use the zero page.
 485         */
 486        crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
 487        while (cursor->total_resid) {
 488                struct page *page;
 489                size_t page_offset;
 490                size_t length;
 491                int ret;
 492
 493                if (!cursor->resid) {
 494                        ceph_msg_data_advance(cursor, 0);
 495                        continue;
 496                }
 497
 498                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
 499                if (length == cursor->total_resid)
 500                        more = MSG_MORE;
 501                ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
 502                                        more);
 503                if (ret <= 0) {
 504                        if (do_datacrc)
 505                                msg->footer.data_crc = cpu_to_le32(crc);
 506
 507                        return ret;
 508                }
 509                if (do_datacrc && cursor->need_crc)
 510                        crc = ceph_crc32c_page(crc, page, page_offset, length);
 511                ceph_msg_data_advance(cursor, (size_t)ret);
 512        }
 513
 514        dout("%s %p msg %p done\n", __func__, con, msg);
 515
 516        /* prepare and queue up footer, too */
 517        if (do_datacrc)
 518                msg->footer.data_crc = cpu_to_le32(crc);
 519        else
 520                msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
 521        con_out_kvec_reset(con);
 522        prepare_write_message_footer(con);
 523
 524        return 1;       /* must return > 0 to indicate success */
 525}
 526
 527/*
 528 * write some zeros
 529 */
 530static int write_partial_skip(struct ceph_connection *con)
 531{
 532        int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 533        int ret;
 534
 535        dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
 536        while (con->v1.out_skip > 0) {
 537                size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
 538
 539                if (size == con->v1.out_skip)
 540                        more = MSG_MORE;
 541                ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
 542                                        more);
 543                if (ret <= 0)
 544                        goto out;
 545                con->v1.out_skip -= ret;
 546        }
 547        ret = 1;
 548out:
 549        return ret;
 550}
 551
 552/*
 553 * Prepare to read connection handshake, or an ack.
 554 */
 555static void prepare_read_banner(struct ceph_connection *con)
 556{
 557        dout("prepare_read_banner %p\n", con);
 558        con->v1.in_base_pos = 0;
 559}
 560
 561static void prepare_read_connect(struct ceph_connection *con)
 562{
 563        dout("prepare_read_connect %p\n", con);
 564        con->v1.in_base_pos = 0;
 565}
 566
 567static void prepare_read_ack(struct ceph_connection *con)
 568{
 569        dout("prepare_read_ack %p\n", con);
 570        con->v1.in_base_pos = 0;
 571}
 572
 573static void prepare_read_seq(struct ceph_connection *con)
 574{
 575        dout("prepare_read_seq %p\n", con);
 576        con->v1.in_base_pos = 0;
 577        con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
 578}
 579
 580static void prepare_read_tag(struct ceph_connection *con)
 581{
 582        dout("prepare_read_tag %p\n", con);
 583        con->v1.in_base_pos = 0;
 584        con->v1.in_tag = CEPH_MSGR_TAG_READY;
 585}
 586
 587static void prepare_read_keepalive_ack(struct ceph_connection *con)
 588{
 589        dout("prepare_read_keepalive_ack %p\n", con);
 590        con->v1.in_base_pos = 0;
 591}
 592
 593/*
 594 * Prepare to read a message.
 595 */
 596static int prepare_read_message(struct ceph_connection *con)
 597{
 598        dout("prepare_read_message %p\n", con);
 599        BUG_ON(con->in_msg != NULL);
 600        con->v1.in_base_pos = 0;
 601        con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
 602        return 0;
 603}
 604
 605static int read_partial(struct ceph_connection *con,
 606                        int end, int size, void *object)
 607{
 608        while (con->v1.in_base_pos < end) {
 609                int left = end - con->v1.in_base_pos;
 610                int have = size - left;
 611                int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
 612                if (ret <= 0)
 613                        return ret;
 614                con->v1.in_base_pos += ret;
 615        }
 616        return 1;
 617}
 618
 619/*
 620 * Read all or part of the connect-side handshake on a new connection
 621 */
 622static int read_partial_banner(struct ceph_connection *con)
 623{
 624        int size;
 625        int end;
 626        int ret;
 627
 628        dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
 629
 630        /* peer's banner */
 631        size = strlen(CEPH_BANNER);
 632        end = size;
 633        ret = read_partial(con, end, size, con->v1.in_banner);
 634        if (ret <= 0)
 635                goto out;
 636
 637        size = sizeof(con->v1.actual_peer_addr);
 638        end += size;
 639        ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
 640        if (ret <= 0)
 641                goto out;
 642        ceph_decode_banner_addr(&con->v1.actual_peer_addr);
 643
 644        size = sizeof(con->v1.peer_addr_for_me);
 645        end += size;
 646        ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
 647        if (ret <= 0)
 648                goto out;
 649        ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
 650
 651out:
 652        return ret;
 653}
 654
 655static int read_partial_connect(struct ceph_connection *con)
 656{
 657        int size;
 658        int end;
 659        int ret;
 660
 661        dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
 662
 663        size = sizeof(con->v1.in_reply);
 664        end = size;
 665        ret = read_partial(con, end, size, &con->v1.in_reply);
 666        if (ret <= 0)
 667                goto out;
 668
 669        if (con->v1.auth) {
 670                size = le32_to_cpu(con->v1.in_reply.authorizer_len);
 671                if (size > con->v1.auth->authorizer_reply_buf_len) {
 672                        pr_err("authorizer reply too big: %d > %zu\n", size,
 673                               con->v1.auth->authorizer_reply_buf_len);
 674                        ret = -EINVAL;
 675                        goto out;
 676                }
 677
 678                end += size;
 679                ret = read_partial(con, end, size,
 680                                   con->v1.auth->authorizer_reply_buf);
 681                if (ret <= 0)
 682                        goto out;
 683        }
 684
 685        dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
 686             con, con->v1.in_reply.tag,
 687             le32_to_cpu(con->v1.in_reply.connect_seq),
 688             le32_to_cpu(con->v1.in_reply.global_seq));
 689out:
 690        return ret;
 691}
 692
 693/*
 694 * Verify the hello banner looks okay.
 695 */
 696static int verify_hello(struct ceph_connection *con)
 697{
 698        if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
 699                pr_err("connect to %s got bad banner\n",
 700                       ceph_pr_addr(&con->peer_addr));
 701                con->error_msg = "protocol error, bad banner";
 702                return -1;
 703        }
 704        return 0;
 705}
 706
 707static int process_banner(struct ceph_connection *con)
 708{
 709        struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
 710
 711        dout("process_banner on %p\n", con);
 712
 713        if (verify_hello(con) < 0)
 714                return -1;
 715
 716        /*
 717         * Make sure the other end is who we wanted.  note that the other
 718         * end may not yet know their ip address, so if it's 0.0.0.0, give
 719         * them the benefit of the doubt.
 720         */
 721        if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
 722                   sizeof(con->peer_addr)) != 0 &&
 723            !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
 724              con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
 725                pr_warn("wrong peer, want %s/%u, got %s/%u\n",
 726                        ceph_pr_addr(&con->peer_addr),
 727                        le32_to_cpu(con->peer_addr.nonce),
 728                        ceph_pr_addr(&con->v1.actual_peer_addr),
 729                        le32_to_cpu(con->v1.actual_peer_addr.nonce));
 730                con->error_msg = "wrong peer at address";
 731                return -1;
 732        }
 733
 734        /*
 735         * did we learn our address?
 736         */
 737        if (ceph_addr_is_blank(my_addr)) {
 738                memcpy(&my_addr->in_addr,
 739                       &con->v1.peer_addr_for_me.in_addr,
 740                       sizeof(con->v1.peer_addr_for_me.in_addr));
 741                ceph_addr_set_port(my_addr, 0);
 742                ceph_encode_my_addr(con->msgr);
 743                dout("process_banner learned my addr is %s\n",
 744                     ceph_pr_addr(my_addr));
 745        }
 746
 747        return 0;
 748}
 749
 750static int process_connect(struct ceph_connection *con)
 751{
 752        u64 sup_feat = from_msgr(con->msgr)->supported_features;
 753        u64 req_feat = from_msgr(con->msgr)->required_features;
 754        u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
 755        int ret;
 756
 757        dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
 758
 759        if (con->v1.auth) {
 760                int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
 761
 762                /*
 763                 * Any connection that defines ->get_authorizer()
 764                 * should also define ->add_authorizer_challenge() and
 765                 * ->verify_authorizer_reply().
 766                 *
 767                 * See get_connect_authorizer().
 768                 */
 769                if (con->v1.in_reply.tag ==
 770                                CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
 771                        ret = con->ops->add_authorizer_challenge(
 772                                con, con->v1.auth->authorizer_reply_buf, len);
 773                        if (ret < 0)
 774                                return ret;
 775
 776                        con_out_kvec_reset(con);
 777                        __prepare_write_connect(con);
 778                        prepare_read_connect(con);
 779                        return 0;
 780                }
 781
 782                if (len) {
 783                        ret = con->ops->verify_authorizer_reply(con);
 784                        if (ret < 0) {
 785                                con->error_msg = "bad authorize reply";
 786                                return ret;
 787                        }
 788                }
 789        }
 790
 791        switch (con->v1.in_reply.tag) {
 792        case CEPH_MSGR_TAG_FEATURES:
 793                pr_err("%s%lld %s feature set mismatch,"
 794                       " my %llx < server's %llx, missing %llx\n",
 795                       ENTITY_NAME(con->peer_name),
 796                       ceph_pr_addr(&con->peer_addr),
 797                       sup_feat, server_feat, server_feat & ~sup_feat);
 798                con->error_msg = "missing required protocol features";
 799                return -1;
 800
 801        case CEPH_MSGR_TAG_BADPROTOVER:
 802                pr_err("%s%lld %s protocol version mismatch,"
 803                       " my %d != server's %d\n",
 804                       ENTITY_NAME(con->peer_name),
 805                       ceph_pr_addr(&con->peer_addr),
 806                       le32_to_cpu(con->v1.out_connect.protocol_version),
 807                       le32_to_cpu(con->v1.in_reply.protocol_version));
 808                con->error_msg = "protocol version mismatch";
 809                return -1;
 810
 811        case CEPH_MSGR_TAG_BADAUTHORIZER:
 812                con->v1.auth_retry++;
 813                dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
 814                     con->v1.auth_retry);
 815                if (con->v1.auth_retry == 2) {
 816                        con->error_msg = "connect authorization failure";
 817                        return -1;
 818                }
 819                con_out_kvec_reset(con);
 820                ret = prepare_write_connect(con);
 821                if (ret < 0)
 822                        return ret;
 823                prepare_read_connect(con);
 824                break;
 825
 826        case CEPH_MSGR_TAG_RESETSESSION:
 827                /*
 828                 * If we connected with a large connect_seq but the peer
 829                 * has no record of a session with us (no connection, or
 830                 * connect_seq == 0), they will send RESETSESION to indicate
 831                 * that they must have reset their session, and may have
 832                 * dropped messages.
 833                 */
 834                dout("process_connect got RESET peer seq %u\n",
 835                     le32_to_cpu(con->v1.in_reply.connect_seq));
 836                pr_info("%s%lld %s session reset\n",
 837                        ENTITY_NAME(con->peer_name),
 838                        ceph_pr_addr(&con->peer_addr));
 839                ceph_con_reset_session(con);
 840                con_out_kvec_reset(con);
 841                ret = prepare_write_connect(con);
 842                if (ret < 0)
 843                        return ret;
 844                prepare_read_connect(con);
 845
 846                /* Tell ceph about it. */
 847                mutex_unlock(&con->mutex);
 848                if (con->ops->peer_reset)
 849                        con->ops->peer_reset(con);
 850                mutex_lock(&con->mutex);
 851                if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
 852                        return -EAGAIN;
 853                break;
 854
 855        case CEPH_MSGR_TAG_RETRY_SESSION:
 856                /*
 857                 * If we sent a smaller connect_seq than the peer has, try
 858                 * again with a larger value.
 859                 */
 860                dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
 861                     le32_to_cpu(con->v1.out_connect.connect_seq),
 862                     le32_to_cpu(con->v1.in_reply.connect_seq));
 863                con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
 864                con_out_kvec_reset(con);
 865                ret = prepare_write_connect(con);
 866                if (ret < 0)
 867                        return ret;
 868                prepare_read_connect(con);
 869                break;
 870
 871        case CEPH_MSGR_TAG_RETRY_GLOBAL:
 872                /*
 873                 * If we sent a smaller global_seq than the peer has, try
 874                 * again with a larger value.
 875                 */
 876                dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
 877                     con->v1.peer_global_seq,
 878                     le32_to_cpu(con->v1.in_reply.global_seq));
 879                ceph_get_global_seq(con->msgr,
 880                                    le32_to_cpu(con->v1.in_reply.global_seq));
 881                con_out_kvec_reset(con);
 882                ret = prepare_write_connect(con);
 883                if (ret < 0)
 884                        return ret;
 885                prepare_read_connect(con);
 886                break;
 887
 888        case CEPH_MSGR_TAG_SEQ:
 889        case CEPH_MSGR_TAG_READY:
 890                if (req_feat & ~server_feat) {
 891                        pr_err("%s%lld %s protocol feature mismatch,"
 892                               " my required %llx > server's %llx, need %llx\n",
 893                               ENTITY_NAME(con->peer_name),
 894                               ceph_pr_addr(&con->peer_addr),
 895                               req_feat, server_feat, req_feat & ~server_feat);
 896                        con->error_msg = "missing required protocol features";
 897                        return -1;
 898                }
 899
 900                WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
 901                con->state = CEPH_CON_S_OPEN;
 902                con->v1.auth_retry = 0;    /* we authenticated; clear flag */
 903                con->v1.peer_global_seq =
 904                        le32_to_cpu(con->v1.in_reply.global_seq);
 905                con->v1.connect_seq++;
 906                con->peer_features = server_feat;
 907                dout("process_connect got READY gseq %d cseq %d (%d)\n",
 908                     con->v1.peer_global_seq,
 909                     le32_to_cpu(con->v1.in_reply.connect_seq),
 910                     con->v1.connect_seq);
 911                WARN_ON(con->v1.connect_seq !=
 912                        le32_to_cpu(con->v1.in_reply.connect_seq));
 913
 914                if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
 915                        ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
 916
 917                con->delay = 0;      /* reset backoff memory */
 918
 919                if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
 920                        prepare_write_seq(con);
 921                        prepare_read_seq(con);
 922                } else {
 923                        prepare_read_tag(con);
 924                }
 925                break;
 926
 927        case CEPH_MSGR_TAG_WAIT:
 928                /*
 929                 * If there is a connection race (we are opening
 930                 * connections to each other), one of us may just have
 931                 * to WAIT.  This shouldn't happen if we are the
 932                 * client.
 933                 */
 934                con->error_msg = "protocol error, got WAIT as client";
 935                return -1;
 936
 937        default:
 938                con->error_msg = "protocol error, garbage tag during connect";
 939                return -1;
 940        }
 941        return 0;
 942}
 943
 944/*
 945 * read (part of) an ack
 946 */
 947static int read_partial_ack(struct ceph_connection *con)
 948{
 949        int size = sizeof(con->v1.in_temp_ack);
 950        int end = size;
 951
 952        return read_partial(con, end, size, &con->v1.in_temp_ack);
 953}
 954
 955/*
 956 * We can finally discard anything that's been acked.
 957 */
 958static void process_ack(struct ceph_connection *con)
 959{
 960        u64 ack = le64_to_cpu(con->v1.in_temp_ack);
 961
 962        if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
 963                ceph_con_discard_sent(con, ack);
 964        else
 965                ceph_con_discard_requeued(con, ack);
 966
 967        prepare_read_tag(con);
 968}
 969
 970static int read_partial_message_section(struct ceph_connection *con,
 971                                        struct kvec *section,
 972                                        unsigned int sec_len, u32 *crc)
 973{
 974        int ret, left;
 975
 976        BUG_ON(!section);
 977
 978        while (section->iov_len < sec_len) {
 979                BUG_ON(section->iov_base == NULL);
 980                left = sec_len - section->iov_len;
 981                ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
 982                                       section->iov_len, left);
 983                if (ret <= 0)
 984                        return ret;
 985                section->iov_len += ret;
 986        }
 987        if (section->iov_len == sec_len)
 988                *crc = crc32c(0, section->iov_base, section->iov_len);
 989
 990        return 1;
 991}
 992
 993static int read_partial_msg_data(struct ceph_connection *con)
 994{
 995        struct ceph_msg *msg = con->in_msg;
 996        struct ceph_msg_data_cursor *cursor = &msg->cursor;
 997        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
 998        struct page *page;
 999        size_t page_offset;
1000        size_t length;
1001        u32 crc = 0;
1002        int ret;
1003
1004        if (!msg->num_data_items)
1005                return -EIO;
1006
1007        if (do_datacrc)
1008                crc = con->in_data_crc;
1009        while (cursor->total_resid) {
1010                if (!cursor->resid) {
1011                        ceph_msg_data_advance(cursor, 0);
1012                        continue;
1013                }
1014
1015                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
1016                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1017                if (ret <= 0) {
1018                        if (do_datacrc)
1019                                con->in_data_crc = crc;
1020
1021                        return ret;
1022                }
1023
1024                if (do_datacrc)
1025                        crc = ceph_crc32c_page(crc, page, page_offset, ret);
1026                ceph_msg_data_advance(cursor, (size_t)ret);
1027        }
1028        if (do_datacrc)
1029                con->in_data_crc = crc;
1030
1031        return 1;       /* must return > 0 to indicate success */
1032}
1033
1034/*
1035 * read (part of) a message.
1036 */
1037static int read_partial_message(struct ceph_connection *con)
1038{
1039        struct ceph_msg *m = con->in_msg;
1040        int size;
1041        int end;
1042        int ret;
1043        unsigned int front_len, middle_len, data_len;
1044        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1045        bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1046        u64 seq;
1047        u32 crc;
1048
1049        dout("read_partial_message con %p msg %p\n", con, m);
1050
1051        /* header */
1052        size = sizeof(con->v1.in_hdr);
1053        end = size;
1054        ret = read_partial(con, end, size, &con->v1.in_hdr);
1055        if (ret <= 0)
1056                return ret;
1057
1058        crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1059        if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1060                pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1061                       crc, con->v1.in_hdr.crc);
1062                return -EBADMSG;
1063        }
1064
1065        front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1066        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1067                return -EIO;
1068        middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1069        if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1070                return -EIO;
1071        data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1072        if (data_len > CEPH_MSG_MAX_DATA_LEN)
1073                return -EIO;
1074
1075        /* verify seq# */
1076        seq = le64_to_cpu(con->v1.in_hdr.seq);
1077        if ((s64)seq - (s64)con->in_seq < 1) {
1078                pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1079                        ENTITY_NAME(con->peer_name),
1080                        ceph_pr_addr(&con->peer_addr),
1081                        seq, con->in_seq + 1);
1082                con->v1.in_base_pos = -front_len - middle_len - data_len -
1083                                      sizeof_footer(con);
1084                con->v1.in_tag = CEPH_MSGR_TAG_READY;
1085                return 1;
1086        } else if ((s64)seq - (s64)con->in_seq > 1) {
1087                pr_err("read_partial_message bad seq %lld expected %lld\n",
1088                       seq, con->in_seq + 1);
1089                con->error_msg = "bad message sequence # for incoming message";
1090                return -EBADE;
1091        }
1092
1093        /* allocate message? */
1094        if (!con->in_msg) {
1095                int skip = 0;
1096
1097                dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1098                     front_len, data_len);
1099                ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1100                if (ret < 0)
1101                        return ret;
1102
1103                BUG_ON((!con->in_msg) ^ skip);
1104                if (skip) {
1105                        /* skip this message */
1106                        dout("alloc_msg said skip message\n");
1107                        con->v1.in_base_pos = -front_len - middle_len -
1108                                              data_len - sizeof_footer(con);
1109                        con->v1.in_tag = CEPH_MSGR_TAG_READY;
1110                        con->in_seq++;
1111                        return 1;
1112                }
1113
1114                BUG_ON(!con->in_msg);
1115                BUG_ON(con->in_msg->con != con);
1116                m = con->in_msg;
1117                m->front.iov_len = 0;    /* haven't read it yet */
1118                if (m->middle)
1119                        m->middle->vec.iov_len = 0;
1120
1121                /* prepare for data payload, if any */
1122
1123                if (data_len)
1124                        prepare_message_data(con->in_msg, data_len);
1125        }
1126
1127        /* front */
1128        ret = read_partial_message_section(con, &m->front, front_len,
1129                                           &con->in_front_crc);
1130        if (ret <= 0)
1131                return ret;
1132
1133        /* middle */
1134        if (m->middle) {
1135                ret = read_partial_message_section(con, &m->middle->vec,
1136                                                   middle_len,
1137                                                   &con->in_middle_crc);
1138                if (ret <= 0)
1139                        return ret;
1140        }
1141
1142        /* (page) data */
1143        if (data_len) {
1144                ret = read_partial_msg_data(con);
1145                if (ret <= 0)
1146                        return ret;
1147        }
1148
1149        /* footer */
1150        size = sizeof_footer(con);
1151        end += size;
1152        ret = read_partial(con, end, size, &m->footer);
1153        if (ret <= 0)
1154                return ret;
1155
1156        if (!need_sign) {
1157                m->footer.flags = m->old_footer.flags;
1158                m->footer.sig = 0;
1159        }
1160
1161        dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1162             m, front_len, m->footer.front_crc, middle_len,
1163             m->footer.middle_crc, data_len, m->footer.data_crc);
1164
1165        /* crc ok? */
1166        if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1167                pr_err("read_partial_message %p front crc %u != exp. %u\n",
1168                       m, con->in_front_crc, m->footer.front_crc);
1169                return -EBADMSG;
1170        }
1171        if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1172                pr_err("read_partial_message %p middle crc %u != exp %u\n",
1173                       m, con->in_middle_crc, m->footer.middle_crc);
1174                return -EBADMSG;
1175        }
1176        if (do_datacrc &&
1177            (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1178            con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1179                pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1180                       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1181                return -EBADMSG;
1182        }
1183
1184        if (need_sign && con->ops->check_message_signature &&
1185            con->ops->check_message_signature(m)) {
1186                pr_err("read_partial_message %p signature check failed\n", m);
1187                return -EBADMSG;
1188        }
1189
1190        return 1; /* done! */
1191}
1192
1193static int read_keepalive_ack(struct ceph_connection *con)
1194{
1195        struct ceph_timespec ceph_ts;
1196        size_t size = sizeof(ceph_ts);
1197        int ret = read_partial(con, size, size, &ceph_ts);
1198        if (ret <= 0)
1199                return ret;
1200        ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1201        prepare_read_tag(con);
1202        return 1;
1203}
1204
1205/*
1206 * Read what we can from the socket.
1207 */
1208int ceph_con_v1_try_read(struct ceph_connection *con)
1209{
1210        int ret = -1;
1211
1212more:
1213        dout("try_read start %p state %d\n", con, con->state);
1214        if (con->state != CEPH_CON_S_V1_BANNER &&
1215            con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1216            con->state != CEPH_CON_S_OPEN)
1217                return 0;
1218
1219        BUG_ON(!con->sock);
1220
1221        dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1222             con->v1.in_base_pos);
1223
1224        if (con->state == CEPH_CON_S_V1_BANNER) {
1225                ret = read_partial_banner(con);
1226                if (ret <= 0)
1227                        goto out;
1228                ret = process_banner(con);
1229                if (ret < 0)
1230                        goto out;
1231
1232                con->state = CEPH_CON_S_V1_CONNECT_MSG;
1233
1234                /*
1235                 * Received banner is good, exchange connection info.
1236                 * Do not reset out_kvec, as sending our banner raced
1237                 * with receiving peer banner after connect completed.
1238                 */
1239                ret = prepare_write_connect(con);
1240                if (ret < 0)
1241                        goto out;
1242                prepare_read_connect(con);
1243
1244                /* Send connection info before awaiting response */
1245                goto out;
1246        }
1247
1248        if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1249                ret = read_partial_connect(con);
1250                if (ret <= 0)
1251                        goto out;
1252                ret = process_connect(con);
1253                if (ret < 0)
1254                        goto out;
1255                goto more;
1256        }
1257
1258        WARN_ON(con->state != CEPH_CON_S_OPEN);
1259
1260        if (con->v1.in_base_pos < 0) {
1261                /*
1262                 * skipping + discarding content.
1263                 */
1264                ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1265                if (ret <= 0)
1266                        goto out;
1267                dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1268                con->v1.in_base_pos += ret;
1269                if (con->v1.in_base_pos)
1270                        goto more;
1271        }
1272        if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1273                /*
1274                 * what's next?
1275                 */
1276                ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1277                if (ret <= 0)
1278                        goto out;
1279                dout("try_read got tag %d\n", con->v1.in_tag);
1280                switch (con->v1.in_tag) {
1281                case CEPH_MSGR_TAG_MSG:
1282                        prepare_read_message(con);
1283                        break;
1284                case CEPH_MSGR_TAG_ACK:
1285                        prepare_read_ack(con);
1286                        break;
1287                case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1288                        prepare_read_keepalive_ack(con);
1289                        break;
1290                case CEPH_MSGR_TAG_CLOSE:
1291                        ceph_con_close_socket(con);
1292                        con->state = CEPH_CON_S_CLOSED;
1293                        goto out;
1294                default:
1295                        goto bad_tag;
1296                }
1297        }
1298        if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1299                ret = read_partial_message(con);
1300                if (ret <= 0) {
1301                        switch (ret) {
1302                        case -EBADMSG:
1303                                con->error_msg = "bad crc/signature";
1304                                fallthrough;
1305                        case -EBADE:
1306                                ret = -EIO;
1307                                break;
1308                        case -EIO:
1309                                con->error_msg = "io error";
1310                                break;
1311                        }
1312                        goto out;
1313                }
1314                if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1315                        goto more;
1316                ceph_con_process_message(con);
1317                if (con->state == CEPH_CON_S_OPEN)
1318                        prepare_read_tag(con);
1319                goto more;
1320        }
1321        if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1322            con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1323                /*
1324                 * the final handshake seq exchange is semantically
1325                 * equivalent to an ACK
1326                 */
1327                ret = read_partial_ack(con);
1328                if (ret <= 0)
1329                        goto out;
1330                process_ack(con);
1331                goto more;
1332        }
1333        if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1334                ret = read_keepalive_ack(con);
1335                if (ret <= 0)
1336                        goto out;
1337                goto more;
1338        }
1339
1340out:
1341        dout("try_read done on %p ret %d\n", con, ret);
1342        return ret;
1343
1344bad_tag:
1345        pr_err("try_read bad tag %d\n", con->v1.in_tag);
1346        con->error_msg = "protocol error, garbage tag";
1347        ret = -1;
1348        goto out;
1349}
1350
1351/*
1352 * Write something to the socket.  Called in a worker thread when the
1353 * socket appears to be writeable and we have something ready to send.
1354 */
1355int ceph_con_v1_try_write(struct ceph_connection *con)
1356{
1357        int ret = 1;
1358
1359        dout("try_write start %p state %d\n", con, con->state);
1360        if (con->state != CEPH_CON_S_PREOPEN &&
1361            con->state != CEPH_CON_S_V1_BANNER &&
1362            con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1363            con->state != CEPH_CON_S_OPEN)
1364                return 0;
1365
1366        /* open the socket first? */
1367        if (con->state == CEPH_CON_S_PREOPEN) {
1368                BUG_ON(con->sock);
1369                con->state = CEPH_CON_S_V1_BANNER;
1370
1371                con_out_kvec_reset(con);
1372                prepare_write_banner(con);
1373                prepare_read_banner(con);
1374
1375                BUG_ON(con->in_msg);
1376                con->v1.in_tag = CEPH_MSGR_TAG_READY;
1377                dout("try_write initiating connect on %p new state %d\n",
1378                     con, con->state);
1379                ret = ceph_tcp_connect(con);
1380                if (ret < 0) {
1381                        con->error_msg = "connect error";
1382                        goto out;
1383                }
1384        }
1385
1386more:
1387        dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1388        BUG_ON(!con->sock);
1389
1390        /* kvec data queued? */
1391        if (con->v1.out_kvec_left) {
1392                ret = write_partial_kvec(con);
1393                if (ret <= 0)
1394                        goto out;
1395        }
1396        if (con->v1.out_skip) {
1397                ret = write_partial_skip(con);
1398                if (ret <= 0)
1399                        goto out;
1400        }
1401
1402        /* msg pages? */
1403        if (con->out_msg) {
1404                if (con->v1.out_msg_done) {
1405                        ceph_msg_put(con->out_msg);
1406                        con->out_msg = NULL;   /* we're done with this one */
1407                        goto do_next;
1408                }
1409
1410                ret = write_partial_message_data(con);
1411                if (ret == 1)
1412                        goto more;  /* we need to send the footer, too! */
1413                if (ret == 0)
1414                        goto out;
1415                if (ret < 0) {
1416                        dout("try_write write_partial_message_data err %d\n",
1417                             ret);
1418                        goto out;
1419                }
1420        }
1421
1422do_next:
1423        if (con->state == CEPH_CON_S_OPEN) {
1424                if (ceph_con_flag_test_and_clear(con,
1425                                CEPH_CON_F_KEEPALIVE_PENDING)) {
1426                        prepare_write_keepalive(con);
1427                        goto more;
1428                }
1429                /* is anything else pending? */
1430                if (!list_empty(&con->out_queue)) {
1431                        prepare_write_message(con);
1432                        goto more;
1433                }
1434                if (con->in_seq > con->in_seq_acked) {
1435                        prepare_write_ack(con);
1436                        goto more;
1437                }
1438        }
1439
1440        /* Nothing to do! */
1441        ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1442        dout("try_write nothing else to write.\n");
1443        ret = 0;
1444out:
1445        dout("try_write done on %p ret %d\n", con, ret);
1446        return ret;
1447}
1448
1449void ceph_con_v1_revoke(struct ceph_connection *con)
1450{
1451        struct ceph_msg *msg = con->out_msg;
1452
1453        WARN_ON(con->v1.out_skip);
1454        /* footer */
1455        if (con->v1.out_msg_done) {
1456                con->v1.out_skip += con_out_kvec_skip(con);
1457        } else {
1458                WARN_ON(!msg->data_length);
1459                con->v1.out_skip += sizeof_footer(con);
1460        }
1461        /* data, middle, front */
1462        if (msg->data_length)
1463                con->v1.out_skip += msg->cursor.total_resid;
1464        if (msg->middle)
1465                con->v1.out_skip += con_out_kvec_skip(con);
1466        con->v1.out_skip += con_out_kvec_skip(con);
1467
1468        dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1469             con->v1.out_kvec_bytes, con->v1.out_skip);
1470}
1471
1472void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1473{
1474        unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1475        unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1476        unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1477
1478        /* skip rest of message */
1479        con->v1.in_base_pos = con->v1.in_base_pos -
1480                        sizeof(struct ceph_msg_header) -
1481                        front_len -
1482                        middle_len -
1483                        data_len -
1484                        sizeof(struct ceph_msg_footer);
1485
1486        con->v1.in_tag = CEPH_MSGR_TAG_READY;
1487        con->in_seq++;
1488
1489        dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1490}
1491
1492bool ceph_con_v1_opened(struct ceph_connection *con)
1493{
1494        return con->v1.connect_seq;
1495}
1496
1497void ceph_con_v1_reset_session(struct ceph_connection *con)
1498{
1499        con->v1.connect_seq = 0;
1500        con->v1.peer_global_seq = 0;
1501}
1502
1503void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1504{
1505        con->v1.out_skip = 0;
1506}
1507