linux/net/ceph/messenger_v1.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/bvec.h>
   5#include <linux/crc32c.h>
   6#include <linux/net.h>
   7#include <linux/socket.h>
   8#include <net/sock.h>
   9
  10#include <linux/ceph/ceph_features.h>
  11#include <linux/ceph/decode.h>
  12#include <linux/ceph/libceph.h>
  13#include <linux/ceph/messenger.h>
  14
  15/* static tag bytes (protocol control messages) */
  16static char tag_msg = CEPH_MSGR_TAG_MSG;
  17static char tag_ack = CEPH_MSGR_TAG_ACK;
  18static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
  19static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
  20
  21/*
  22 * If @buf is NULL, discard up to @len bytes.
  23 */
  24static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
  25{
  26        struct kvec iov = {buf, len};
  27        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  28        int r;
  29
  30        if (!buf)
  31                msg.msg_flags |= MSG_TRUNC;
  32
  33        iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
  34        r = sock_recvmsg(sock, &msg, msg.msg_flags);
  35        if (r == -EAGAIN)
  36                r = 0;
  37        return r;
  38}
  39
  40static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
  41                     int page_offset, size_t length)
  42{
  43        struct bio_vec bvec = {
  44                .bv_page = page,
  45                .bv_offset = page_offset,
  46                .bv_len = length
  47        };
  48        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  49        int r;
  50
  51        BUG_ON(page_offset + length > PAGE_SIZE);
  52        iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
  53        r = sock_recvmsg(sock, &msg, msg.msg_flags);
  54        if (r == -EAGAIN)
  55                r = 0;
  56        return r;
  57}
  58
  59/*
  60 * write something.  @more is true if caller will be sending more data
  61 * shortly.
  62 */
  63static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
  64                            size_t kvlen, size_t len, bool more)
  65{
  66        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  67        int r;
  68
  69        if (more)
  70                msg.msg_flags |= MSG_MORE;
  71        else
  72                msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
  73
  74        r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
  75        if (r == -EAGAIN)
  76                r = 0;
  77        return r;
  78}
  79
  80/*
  81 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
  82 */
  83static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
  84                             int offset, size_t size, int more)
  85{
  86        ssize_t (*sendpage)(struct socket *sock, struct page *page,
  87                            int offset, size_t size, int flags);
  88        int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
  89        int ret;
  90
  91        /*
  92         * sendpage cannot properly handle pages with page_count == 0,
  93         * we need to fall back to sendmsg if that's the case.
  94         *
  95         * Same goes for slab pages: skb_can_coalesce() allows
  96         * coalescing neighboring slab objects into a single frag which
  97         * triggers one of hardened usercopy checks.
  98         */
  99        if (sendpage_ok(page))
 100                sendpage = sock->ops->sendpage;
 101        else
 102                sendpage = sock_no_sendpage;
 103
 104        ret = sendpage(sock, page, offset, size, flags);
 105        if (ret == -EAGAIN)
 106                ret = 0;
 107
 108        return ret;
 109}
 110
 111static void con_out_kvec_reset(struct ceph_connection *con)
 112{
 113        BUG_ON(con->v1.out_skip);
 114
 115        con->v1.out_kvec_left = 0;
 116        con->v1.out_kvec_bytes = 0;
 117        con->v1.out_kvec_cur = &con->v1.out_kvec[0];
 118}
 119
 120static void con_out_kvec_add(struct ceph_connection *con,
 121                                size_t size, void *data)
 122{
 123        int index = con->v1.out_kvec_left;
 124
 125        BUG_ON(con->v1.out_skip);
 126        BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
 127
 128        con->v1.out_kvec[index].iov_len = size;
 129        con->v1.out_kvec[index].iov_base = data;
 130        con->v1.out_kvec_left++;
 131        con->v1.out_kvec_bytes += size;
 132}
 133
 134/*
 135 * Chop off a kvec from the end.  Return residual number of bytes for
 136 * that kvec, i.e. how many bytes would have been written if the kvec
 137 * hadn't been nuked.
 138 */
 139static int con_out_kvec_skip(struct ceph_connection *con)
 140{
 141        int skip = 0;
 142
 143        if (con->v1.out_kvec_bytes > 0) {
 144                skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
 145                BUG_ON(con->v1.out_kvec_bytes < skip);
 146                BUG_ON(!con->v1.out_kvec_left);
 147                con->v1.out_kvec_bytes -= skip;
 148                con->v1.out_kvec_left--;
 149        }
 150
 151        return skip;
 152}
 153
 154static size_t sizeof_footer(struct ceph_connection *con)
 155{
 156        return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
 157            sizeof(struct ceph_msg_footer) :
 158            sizeof(struct ceph_msg_footer_old);
 159}
 160
 161static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 162{
 163        /* Initialize data cursor */
 164
 165        ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
 166}
 167
 168/*
 169 * Prepare footer for currently outgoing message, and finish things
 170 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 171 */
 172static void prepare_write_message_footer(struct ceph_connection *con)
 173{
 174        struct ceph_msg *m = con->out_msg;
 175
 176        m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 177
 178        dout("prepare_write_message_footer %p\n", con);
 179        con_out_kvec_add(con, sizeof_footer(con), &m->footer);
 180        if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
 181                if (con->ops->sign_message)
 182                        con->ops->sign_message(m);
 183                else
 184                        m->footer.sig = 0;
 185        } else {
 186                m->old_footer.flags = m->footer.flags;
 187        }
 188        con->v1.out_more = m->more_to_follow;
 189        con->v1.out_msg_done = true;
 190}
 191
 192/*
 193 * Prepare headers for the next outgoing message.
 194 */
 195static void prepare_write_message(struct ceph_connection *con)
 196{
 197        struct ceph_msg *m;
 198        u32 crc;
 199
 200        con_out_kvec_reset(con);
 201        con->v1.out_msg_done = false;
 202
 203        /* Sneak an ack in there first?  If we can get it into the same
 204         * TCP packet that's a good thing. */
 205        if (con->in_seq > con->in_seq_acked) {
 206                con->in_seq_acked = con->in_seq;
 207                con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 208                con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 209                con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 210                        &con->v1.out_temp_ack);
 211        }
 212
 213        ceph_con_get_out_msg(con);
 214        m = con->out_msg;
 215
 216        dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
 217             m, con->out_seq, le16_to_cpu(m->hdr.type),
 218             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
 219             m->data_length);
 220        WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
 221        WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
 222
 223        /* tag + hdr + front + middle */
 224        con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
 225        con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
 226        con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 227
 228        if (m->middle)
 229                con_out_kvec_add(con, m->middle->vec.iov_len,
 230                        m->middle->vec.iov_base);
 231
 232        /* fill in hdr crc and finalize hdr */
 233        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
 234        con->out_msg->hdr.crc = cpu_to_le32(crc);
 235        memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
 236
 237        /* fill in front and middle crc, footer */
 238        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
 239        con->out_msg->footer.front_crc = cpu_to_le32(crc);
 240        if (m->middle) {
 241                crc = crc32c(0, m->middle->vec.iov_base,
 242                                m->middle->vec.iov_len);
 243                con->out_msg->footer.middle_crc = cpu_to_le32(crc);
 244        } else
 245                con->out_msg->footer.middle_crc = 0;
 246        dout("%s front_crc %u middle_crc %u\n", __func__,
 247             le32_to_cpu(con->out_msg->footer.front_crc),
 248             le32_to_cpu(con->out_msg->footer.middle_crc));
 249        con->out_msg->footer.flags = 0;
 250
 251        /* is there a data payload? */
 252        con->out_msg->footer.data_crc = 0;
 253        if (m->data_length) {
 254                prepare_message_data(con->out_msg, m->data_length);
 255                con->v1.out_more = 1;  /* data + footer will follow */
 256        } else {
 257                /* no, queue up footer too and be done */
 258                prepare_write_message_footer(con);
 259        }
 260
 261        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 262}
 263
 264/*
 265 * Prepare an ack.
 266 */
 267static void prepare_write_ack(struct ceph_connection *con)
 268{
 269        dout("prepare_write_ack %p %llu -> %llu\n", con,
 270             con->in_seq_acked, con->in_seq);
 271        con->in_seq_acked = con->in_seq;
 272
 273        con_out_kvec_reset(con);
 274
 275        con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 276
 277        con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 278        con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 279                         &con->v1.out_temp_ack);
 280
 281        con->v1.out_more = 1;  /* more will follow.. eventually.. */
 282        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 283}
 284
 285/*
 286 * Prepare to share the seq during handshake
 287 */
 288static void prepare_write_seq(struct ceph_connection *con)
 289{
 290        dout("prepare_write_seq %p %llu -> %llu\n", con,
 291             con->in_seq_acked, con->in_seq);
 292        con->in_seq_acked = con->in_seq;
 293
 294        con_out_kvec_reset(con);
 295
 296        con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 297        con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 298                         &con->v1.out_temp_ack);
 299
 300        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 301}
 302
 303/*
 304 * Prepare to write keepalive byte.
 305 */
 306static void prepare_write_keepalive(struct ceph_connection *con)
 307{
 308        dout("prepare_write_keepalive %p\n", con);
 309        con_out_kvec_reset(con);
 310        if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
 311                struct timespec64 now;
 312
 313                ktime_get_real_ts64(&now);
 314                con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
 315                ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
 316                con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
 317                                 &con->v1.out_temp_keepalive2);
 318        } else {
 319                con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
 320        }
 321        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 322}
 323
 324/*
 325 * Connection negotiation.
 326 */
 327
 328static int get_connect_authorizer(struct ceph_connection *con)
 329{
 330        struct ceph_auth_handshake *auth;
 331        int auth_proto;
 332
 333        if (!con->ops->get_authorizer) {
 334                con->v1.auth = NULL;
 335                con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
 336                con->v1.out_connect.authorizer_len = 0;
 337                return 0;
 338        }
 339
 340        auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
 341        if (IS_ERR(auth))
 342                return PTR_ERR(auth);
 343
 344        con->v1.auth = auth;
 345        con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
 346        con->v1.out_connect.authorizer_len =
 347                cpu_to_le32(auth->authorizer_buf_len);
 348        return 0;
 349}
 350
 351/*
 352 * We connected to a peer and are saying hello.
 353 */
 354static void prepare_write_banner(struct ceph_connection *con)
 355{
 356        con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
 357        con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
 358                                        &con->msgr->my_enc_addr);
 359
 360        con->v1.out_more = 0;
 361        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 362}
 363
 364static void __prepare_write_connect(struct ceph_connection *con)
 365{
 366        con_out_kvec_add(con, sizeof(con->v1.out_connect),
 367                         &con->v1.out_connect);
 368        if (con->v1.auth)
 369                con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
 370                                 con->v1.auth->authorizer_buf);
 371
 372        con->v1.out_more = 0;
 373        ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 374}
 375
 376static int prepare_write_connect(struct ceph_connection *con)
 377{
 378        unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
 379        int proto;
 380        int ret;
 381
 382        switch (con->peer_name.type) {
 383        case CEPH_ENTITY_TYPE_MON:
 384                proto = CEPH_MONC_PROTOCOL;
 385                break;
 386        case CEPH_ENTITY_TYPE_OSD:
 387                proto = CEPH_OSDC_PROTOCOL;
 388                break;
 389        case CEPH_ENTITY_TYPE_MDS:
 390                proto = CEPH_MDSC_PROTOCOL;
 391                break;
 392        default:
 393                BUG();
 394        }
 395
 396        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
 397             con->v1.connect_seq, global_seq, proto);
 398
 399        con->v1.out_connect.features =
 400                cpu_to_le64(from_msgr(con->msgr)->supported_features);
 401        con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
 402        con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
 403        con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
 404        con->v1.out_connect.protocol_version = cpu_to_le32(proto);
 405        con->v1.out_connect.flags = 0;
 406
 407        ret = get_connect_authorizer(con);
 408        if (ret)
 409                return ret;
 410
 411        __prepare_write_connect(con);
 412        return 0;
 413}
 414
 415/*
 416 * write as much of pending kvecs to the socket as we can.
 417 *  1 -> done
 418 *  0 -> socket full, but more to do
 419 * <0 -> error
 420 */
 421static int write_partial_kvec(struct ceph_connection *con)
 422{
 423        int ret;
 424
 425        dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
 426        while (con->v1.out_kvec_bytes > 0) {
 427                ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
 428                                       con->v1.out_kvec_left,
 429                                       con->v1.out_kvec_bytes,
 430                                       con->v1.out_more);
 431                if (ret <= 0)
 432                        goto out;
 433                con->v1.out_kvec_bytes -= ret;
 434                if (!con->v1.out_kvec_bytes)
 435                        break;            /* done */
 436
 437                /* account for full iov entries consumed */
 438                while (ret >= con->v1.out_kvec_cur->iov_len) {
 439                        BUG_ON(!con->v1.out_kvec_left);
 440                        ret -= con->v1.out_kvec_cur->iov_len;
 441                        con->v1.out_kvec_cur++;
 442                        con->v1.out_kvec_left--;
 443                }
 444                /* and for a partially-consumed entry */
 445                if (ret) {
 446                        con->v1.out_kvec_cur->iov_len -= ret;
 447                        con->v1.out_kvec_cur->iov_base += ret;
 448                }
 449        }
 450        con->v1.out_kvec_left = 0;
 451        ret = 1;
 452out:
 453        dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
 454             con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
 455        return ret;  /* done! */
 456}
 457
 458/*
 459 * Write as much message data payload as we can.  If we finish, queue
 460 * up the footer.
 461 *  1 -> done, footer is now queued in out_kvec[].
 462 *  0 -> socket full, but more to do
 463 * <0 -> error
 464 */
 465static int write_partial_message_data(struct ceph_connection *con)
 466{
 467        struct ceph_msg *msg = con->out_msg;
 468        struct ceph_msg_data_cursor *cursor = &msg->cursor;
 469        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
 470        int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 471        u32 crc;
 472
 473        dout("%s %p msg %p\n", __func__, con, msg);
 474
 475        if (!msg->num_data_items)
 476                return -EINVAL;
 477
 478        /*
 479         * Iterate through each page that contains data to be
 480         * written, and send as much as possible for each.
 481         *
 482         * If we are calculating the data crc (the default), we will
 483         * need to map the page.  If we have no pages, they have
 484         * been revoked, so use the zero page.
 485         */
 486        crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
 487        while (cursor->total_resid) {
 488                struct page *page;
 489                size_t page_offset;
 490                size_t length;
 491                int ret;
 492
 493                if (!cursor->resid) {
 494                        ceph_msg_data_advance(cursor, 0);
 495                        continue;
 496                }
 497
 498                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
 499                if (length == cursor->total_resid)
 500                        more = MSG_MORE;
 501                ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
 502                                        more);
 503                if (ret <= 0) {
 504                        if (do_datacrc)
 505                                msg->footer.data_crc = cpu_to_le32(crc);
 506
 507                        return ret;
 508                }
 509                if (do_datacrc && cursor->need_crc)
 510                        crc = ceph_crc32c_page(crc, page, page_offset, length);
 511                ceph_msg_data_advance(cursor, (size_t)ret);
 512        }
 513
 514        dout("%s %p msg %p done\n", __func__, con, msg);
 515
 516        /* prepare and queue up footer, too */
 517        if (do_datacrc)
 518                msg->footer.data_crc = cpu_to_le32(crc);
 519        else
 520                msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
 521        con_out_kvec_reset(con);
 522        prepare_write_message_footer(con);
 523
 524        return 1;       /* must return > 0 to indicate success */
 525}
 526
 527/*
 528 * write some zeros
 529 */
 530static int write_partial_skip(struct ceph_connection *con)
 531{
 532        int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 533        int ret;
 534
 535        dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
 536        while (con->v1.out_skip > 0) {
 537                size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
 538
 539                if (size == con->v1.out_skip)
 540                        more = MSG_MORE;
 541                ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
 542                                        more);
 543                if (ret <= 0)
 544                        goto out;
 545                con->v1.out_skip -= ret;
 546        }
 547        ret = 1;
 548out:
 549        return ret;
 550}
 551
 552/*
 553 * Prepare to read connection handshake, or an ack.
 554 */
 555static void prepare_read_banner(struct ceph_connection *con)
 556{
 557        dout("prepare_read_banner %p\n", con);
 558        con->v1.in_base_pos = 0;
 559}
 560
 561static void prepare_read_connect(struct ceph_connection *con)
 562{
 563        dout("prepare_read_connect %p\n", con);
 564        con->v1.in_base_pos = 0;
 565}
 566
 567static void prepare_read_ack(struct ceph_connection *con)
 568{
 569        dout("prepare_read_ack %p\n", con);
 570        con->v1.in_base_pos = 0;
 571}
 572
 573static void prepare_read_seq(struct ceph_connection *con)
 574{
 575        dout("prepare_read_seq %p\n", con);
 576        con->v1.in_base_pos = 0;
 577        con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
 578}
 579
 580static void prepare_read_tag(struct ceph_connection *con)
 581{
 582        dout("prepare_read_tag %p\n", con);
 583        con->v1.in_base_pos = 0;
 584        con->v1.in_tag = CEPH_MSGR_TAG_READY;
 585}
 586
 587static void prepare_read_keepalive_ack(struct ceph_connection *con)
 588{
 589        dout("prepare_read_keepalive_ack %p\n", con);
 590        con->v1.in_base_pos = 0;
 591}
 592
 593/*
 594 * Prepare to read a message.
 595 */
 596static int prepare_read_message(struct ceph_connection *con)
 597{
 598        dout("prepare_read_message %p\n", con);
 599        BUG_ON(con->in_msg != NULL);
 600        con->v1.in_base_pos = 0;
 601        con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
 602        return 0;
 603}
 604
 605static int read_partial(struct ceph_connection *con,
 606                        int end, int size, void *object)
 607{
 608        while (con->v1.in_base_pos < end) {
 609                int left = end - con->v1.in_base_pos;
 610                int have = size - left;
 611                int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
 612                if (ret <= 0)
 613                        return ret;
 614                con->v1.in_base_pos += ret;
 615        }
 616        return 1;
 617}
 618
 619/*
 620 * Read all or part of the connect-side handshake on a new connection
 621 */
 622static int read_partial_banner(struct ceph_connection *con)
 623{
 624        int size;
 625        int end;
 626        int ret;
 627
 628        dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
 629
 630        /* peer's banner */
 631        size = strlen(CEPH_BANNER);
 632        end = size;
 633        ret = read_partial(con, end, size, con->v1.in_banner);
 634        if (ret <= 0)
 635                goto out;
 636
 637        size = sizeof(con->v1.actual_peer_addr);
 638        end += size;
 639        ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
 640        if (ret <= 0)
 641                goto out;
 642        ceph_decode_banner_addr(&con->v1.actual_peer_addr);
 643
 644        size = sizeof(con->v1.peer_addr_for_me);
 645        end += size;
 646        ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
 647        if (ret <= 0)
 648                goto out;
 649        ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
 650
 651out:
 652        return ret;
 653}
 654
 655static int read_partial_connect(struct ceph_connection *con)
 656{
 657        int size;
 658        int end;
 659        int ret;
 660
 661        dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
 662
 663        size = sizeof(con->v1.in_reply);
 664        end = size;
 665        ret = read_partial(con, end, size, &con->v1.in_reply);
 666        if (ret <= 0)
 667                goto out;
 668
 669        if (con->v1.auth) {
 670                size = le32_to_cpu(con->v1.in_reply.authorizer_len);
 671                if (size > con->v1.auth->authorizer_reply_buf_len) {
 672                        pr_err("authorizer reply too big: %d > %zu\n", size,
 673                               con->v1.auth->authorizer_reply_buf_len);
 674                        ret = -EINVAL;
 675                        goto out;
 676                }
 677
 678                end += size;
 679                ret = read_partial(con, end, size,
 680                                   con->v1.auth->authorizer_reply_buf);
 681                if (ret <= 0)
 682                        goto out;
 683        }
 684
 685        dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
 686             con, con->v1.in_reply.tag,
 687             le32_to_cpu(con->v1.in_reply.connect_seq),
 688             le32_to_cpu(con->v1.in_reply.global_seq));
 689out:
 690        return ret;
 691}
 692
 693/*
 694 * Verify the hello banner looks okay.
 695 */
 696static int verify_hello(struct ceph_connection *con)
 697{
 698        if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
 699                pr_err("connect to %s got bad banner\n",
 700                       ceph_pr_addr(&con->peer_addr));
 701                con->error_msg = "protocol error, bad banner";
 702                return -1;
 703        }
 704        return 0;
 705}
 706
 707static int process_banner(struct ceph_connection *con)
 708{
 709        struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
 710
 711        dout("process_banner on %p\n", con);
 712
 713        if (verify_hello(con) < 0)
 714                return -1;
 715
 716        /*
 717         * Make sure the other end is who we wanted.  note that the other
 718         * end may not yet know their ip address, so if it's 0.0.0.0, give
 719         * them the benefit of the doubt.
 720         */
 721        if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
 722                   sizeof(con->peer_addr)) != 0 &&
 723            !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
 724              con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
 725                pr_warn("wrong peer, want %s/%u, got %s/%u\n",
 726                        ceph_pr_addr(&con->peer_addr),
 727                        le32_to_cpu(con->peer_addr.nonce),
 728                        ceph_pr_addr(&con->v1.actual_peer_addr),
 729                        le32_to_cpu(con->v1.actual_peer_addr.nonce));
 730                con->error_msg = "wrong peer at address";
 731                return -1;
 732        }
 733
 734        /*
 735         * did we learn our address?
 736         */
 737        if (ceph_addr_is_blank(my_addr)) {
 738                memcpy(&my_addr->in_addr,
 739                       &con->v1.peer_addr_for_me.in_addr,
 740                       sizeof(con->v1.peer_addr_for_me.in_addr));
 741                ceph_addr_set_port(my_addr, 0);
 742                ceph_encode_my_addr(con->msgr);
 743                dout("process_banner learned my addr is %s\n",
 744                     ceph_pr_addr(my_addr));
 745        }
 746
 747        return 0;
 748}
 749
 750static int process_connect(struct ceph_connection *con)
 751{
 752        u64 sup_feat = from_msgr(con->msgr)->supported_features;
 753        u64 req_feat = from_msgr(con->msgr)->required_features;
 754        u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
 755        int ret;
 756
 757        dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
 758
 759        if (con->v1.auth) {
 760                int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
 761
 762                /*
 763                 * Any connection that defines ->get_authorizer()
 764                 * should also define ->add_authorizer_challenge() and
 765                 * ->verify_authorizer_reply().
 766                 *
 767                 * See get_connect_authorizer().
 768                 */
 769                if (con->v1.in_reply.tag ==
 770                                CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
 771                        ret = con->ops->add_authorizer_challenge(
 772                                con, con->v1.auth->authorizer_reply_buf, len);
 773                        if (ret < 0)
 774                                return ret;
 775
 776                        con_out_kvec_reset(con);
 777                        __prepare_write_connect(con);
 778                        prepare_read_connect(con);
 779                        return 0;
 780                }
 781
 782                if (len) {
 783                        ret = con->ops->verify_authorizer_reply(con);
 784                        if (ret < 0) {
 785                                con->error_msg = "bad authorize reply";
 786                                return ret;
 787                        }
 788                }
 789        }
 790
 791        switch (con->v1.in_reply.tag) {
 792        case CEPH_MSGR_TAG_FEATURES:
 793                pr_err("%s%lld %s feature set mismatch,"
 794                       " my %llx < server's %llx, missing %llx\n",
 795                       ENTITY_NAME(con->peer_name),
 796                       ceph_pr_addr(&con->peer_addr),
 797                       sup_feat, server_feat, server_feat & ~sup_feat);
 798                con->error_msg = "missing required protocol features";
 799                return -1;
 800
 801        case CEPH_MSGR_TAG_BADPROTOVER:
 802                pr_err("%s%lld %s protocol version mismatch,"
 803                       " my %d != server's %d\n",
 804                       ENTITY_NAME(con->peer_name),
 805                       ceph_pr_addr(&con->peer_addr),
 806                       le32_to_cpu(con->v1.out_connect.protocol_version),
 807                       le32_to_cpu(con->v1.in_reply.protocol_version));
 808                con->error_msg = "protocol version mismatch";
 809                return -1;
 810
 811        case CEPH_MSGR_TAG_BADAUTHORIZER:
 812                con->v1.auth_retry++;
 813                dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
 814                     con->v1.auth_retry);
 815                if (con->v1.auth_retry == 2) {
 816                        con->error_msg = "connect authorization failure";
 817                        return -1;
 818                }
 819                con_out_kvec_reset(con);
 820                ret = prepare_write_connect(con);
 821                if (ret < 0)
 822                        return ret;
 823                prepare_read_connect(con);
 824                break;
 825
 826        case CEPH_MSGR_TAG_RESETSESSION:
 827                /*
 828                 * If we connected with a large connect_seq but the peer
 829                 * has no record of a session with us (no connection, or
 830                 * connect_seq == 0), they will send RESETSESION to indicate
 831                 * that they must have reset their session, and may have
 832                 * dropped messages.
 833                 */
 834                dout("process_connect got RESET peer seq %u\n",
 835                     le32_to_cpu(con->v1.in_reply.connect_seq));
 836                pr_info("%s%lld %s session reset\n",
 837                        ENTITY_NAME(con->peer_name),
 838                        ceph_pr_addr(&con->peer_addr));
 839                ceph_con_reset_session(con);
 840                con_out_kvec_reset(con);
 841                ret = prepare_write_connect(con);
 842                if (ret < 0)
 843                        return ret;
 844                prepare_read_connect(con);
 845
 846                /* Tell ceph about it. */
 847                mutex_unlock(&con->mutex);
 848                if (con->ops->peer_reset)
 849                        con->ops->peer_reset(con);
 850                mutex_lock(&con->mutex);
 851                if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
 852                        return -EAGAIN;
 853                break;
 854
 855        case CEPH_MSGR_TAG_RETRY_SESSION:
 856                /*
 857                 * If we sent a smaller connect_seq than the peer has, try
 858                 * again with a larger value.
 859                 */
 860                dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
 861                     le32_to_cpu(con->v1.out_connect.connect_seq),
 862                     le32_to_cpu(con->v1.in_reply.connect_seq));
 863                con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
 864                con_out_kvec_reset(con);
 865                ret = prepare_write_connect(con);
 866                if (ret < 0)
 867                        return ret;
 868                prepare_read_connect(con);
 869                break;
 870
 871        case CEPH_MSGR_TAG_RETRY_GLOBAL:
 872                /*
 873                 * If we sent a smaller global_seq than the peer has, try
 874                 * again with a larger value.
 875                 */
 876                dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
 877                     con->v1.peer_global_seq,
 878                     le32_to_cpu(con->v1.in_reply.global_seq));
 879                ceph_get_global_seq(con->msgr,
 880                                    le32_to_cpu(con->v1.in_reply.global_seq));
 881                con_out_kvec_reset(con);
 882                ret = prepare_write_connect(con);
 883                if (ret < 0)
 884                        return ret;
 885                prepare_read_connect(con);
 886                break;
 887
 888        case CEPH_MSGR_TAG_SEQ:
 889        case CEPH_MSGR_TAG_READY:
 890                if (req_feat & ~server_feat) {
 891                        pr_err("%s%lld %s protocol feature mismatch,"
 892                               " my required %llx > server's %llx, need %llx\n",
 893                               ENTITY_NAME(con->peer_name),
 894                               ceph_pr_addr(&con->peer_addr),
 895                               req_feat, server_feat, req_feat & ~server_feat);
 896                        con->error_msg = "missing required protocol features";
 897                        return -1;
 898                }
 899
 900                WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
 901                con->state = CEPH_CON_S_OPEN;
 902                con->v1.auth_retry = 0;    /* we authenticated; clear flag */
 903                con->v1.peer_global_seq =
 904                        le32_to_cpu(con->v1.in_reply.global_seq);
 905                con->v1.connect_seq++;
 906                con->peer_features = server_feat;
 907                dout("process_connect got READY gseq %d cseq %d (%d)\n",
 908                     con->v1.peer_global_seq,
 909                     le32_to_cpu(con->v1.in_reply.connect_seq),
 910                     con->v1.connect_seq);
 911                WARN_ON(con->v1.connect_seq !=
 912                        le32_to_cpu(con->v1.in_reply.connect_seq));
 913
 914                if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
 915                        ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
 916
 917                con->delay = 0;      /* reset backoff memory */
 918
 919                if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
 920                        prepare_write_seq(con);
 921                        prepare_read_seq(con);
 922                } else {
 923                        prepare_read_tag(con);
 924                }
 925                break;
 926
 927        case CEPH_MSGR_TAG_WAIT:
 928                /*
 929                 * If there is a connection race (we are opening
 930                 * connections to each other), one of us may just have
 931                 * to WAIT.  This shouldn't happen if we are the
 932                 * client.
 933                 */
 934                con->error_msg = "protocol error, got WAIT as client";
 935                return -1;
 936
 937        default:
 938                con->error_msg = "protocol error, garbage tag during connect";
 939                return -1;
 940        }
 941        return 0;
 942}
 943
 944/*
 945 * read (part of) an ack
 946 */
 947static int read_partial_ack(struct ceph_connection *con)
 948{
 949        int size = sizeof(con->v1.in_temp_ack);
 950        int end = size;
 951
 952        return read_partial(con, end, size, &con->v1.in_temp_ack);
 953}
 954
 955/*
 956 * We can finally discard anything that's been acked.
 957 */
 958static void process_ack(struct ceph_connection *con)
 959{
 960        u64 ack = le64_to_cpu(con->v1.in_temp_ack);
 961
 962        if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
 963                ceph_con_discard_sent(con, ack);
 964        else
 965                ceph_con_discard_requeued(con, ack);
 966
 967        prepare_read_tag(con);
 968}
 969
 970static int read_partial_message_section(struct ceph_connection *con,
 971                                        struct kvec *section,
 972                                        unsigned int sec_len, u32 *crc)
 973{
 974        int ret, left;
 975
 976        BUG_ON(!section);
 977
 978        while (section->iov_len < sec_len) {
 979                BUG_ON(section->iov_base == NULL);
 980                left = sec_len - section->iov_len;
 981                ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
 982                                       section->iov_len, left);
 983                if (ret <= 0)
 984                        return ret;
 985                section->iov_len += ret;
 986        }
 987        if (section->iov_len == sec_len)
 988                *crc = crc32c(0, section->iov_base, section->iov_len);
 989
 990        return 1;
 991}
 992
 993static int read_partial_msg_data(struct ceph_connection *con)
 994{
 995        struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
 996        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
 997        struct page *page;
 998        size_t page_offset;
 999        size_t length;
1000        u32 crc = 0;
1001        int ret;
1002
1003        if (do_datacrc)
1004                crc = con->in_data_crc;
1005        while (cursor->total_resid) {
1006                if (!cursor->resid) {
1007                        ceph_msg_data_advance(cursor, 0);
1008                        continue;
1009                }
1010
1011                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
1012                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1013                if (ret <= 0) {
1014                        if (do_datacrc)
1015                                con->in_data_crc = crc;
1016
1017                        return ret;
1018                }
1019
1020                if (do_datacrc)
1021                        crc = ceph_crc32c_page(crc, page, page_offset, ret);
1022                ceph_msg_data_advance(cursor, (size_t)ret);
1023        }
1024        if (do_datacrc)
1025                con->in_data_crc = crc;
1026
1027        return 1;       /* must return > 0 to indicate success */
1028}
1029
1030static int read_partial_msg_data_bounce(struct ceph_connection *con)
1031{
1032        struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1033        struct page *page;
1034        size_t off, len;
1035        u32 crc;
1036        int ret;
1037
1038        if (unlikely(!con->bounce_page)) {
1039                con->bounce_page = alloc_page(GFP_NOIO);
1040                if (!con->bounce_page) {
1041                        pr_err("failed to allocate bounce page\n");
1042                        return -ENOMEM;
1043                }
1044        }
1045
1046        crc = con->in_data_crc;
1047        while (cursor->total_resid) {
1048                if (!cursor->resid) {
1049                        ceph_msg_data_advance(cursor, 0);
1050                        continue;
1051                }
1052
1053                page = ceph_msg_data_next(cursor, &off, &len, NULL);
1054                ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
1055                if (ret <= 0) {
1056                        con->in_data_crc = crc;
1057                        return ret;
1058                }
1059
1060                crc = crc32c(crc, page_address(con->bounce_page), ret);
1061                memcpy_to_page(page, off, page_address(con->bounce_page), ret);
1062
1063                ceph_msg_data_advance(cursor, ret);
1064        }
1065        con->in_data_crc = crc;
1066
1067        return 1;       /* must return > 0 to indicate success */
1068}
1069
1070/*
1071 * read (part of) a message.
1072 */
1073static int read_partial_message(struct ceph_connection *con)
1074{
1075        struct ceph_msg *m = con->in_msg;
1076        int size;
1077        int end;
1078        int ret;
1079        unsigned int front_len, middle_len, data_len;
1080        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1081        bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1082        u64 seq;
1083        u32 crc;
1084
1085        dout("read_partial_message con %p msg %p\n", con, m);
1086
1087        /* header */
1088        size = sizeof(con->v1.in_hdr);
1089        end = size;
1090        ret = read_partial(con, end, size, &con->v1.in_hdr);
1091        if (ret <= 0)
1092                return ret;
1093
1094        crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1095        if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1096                pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1097                       crc, con->v1.in_hdr.crc);
1098                return -EBADMSG;
1099        }
1100
1101        front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1102        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1103                return -EIO;
1104        middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1105        if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1106                return -EIO;
1107        data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1108        if (data_len > CEPH_MSG_MAX_DATA_LEN)
1109                return -EIO;
1110
1111        /* verify seq# */
1112        seq = le64_to_cpu(con->v1.in_hdr.seq);
1113        if ((s64)seq - (s64)con->in_seq < 1) {
1114                pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1115                        ENTITY_NAME(con->peer_name),
1116                        ceph_pr_addr(&con->peer_addr),
1117                        seq, con->in_seq + 1);
1118                con->v1.in_base_pos = -front_len - middle_len - data_len -
1119                                      sizeof_footer(con);
1120                con->v1.in_tag = CEPH_MSGR_TAG_READY;
1121                return 1;
1122        } else if ((s64)seq - (s64)con->in_seq > 1) {
1123                pr_err("read_partial_message bad seq %lld expected %lld\n",
1124                       seq, con->in_seq + 1);
1125                con->error_msg = "bad message sequence # for incoming message";
1126                return -EBADE;
1127        }
1128
1129        /* allocate message? */
1130        if (!con->in_msg) {
1131                int skip = 0;
1132
1133                dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1134                     front_len, data_len);
1135                ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1136                if (ret < 0)
1137                        return ret;
1138
1139                BUG_ON((!con->in_msg) ^ skip);
1140                if (skip) {
1141                        /* skip this message */
1142                        dout("alloc_msg said skip message\n");
1143                        con->v1.in_base_pos = -front_len - middle_len -
1144                                              data_len - sizeof_footer(con);
1145                        con->v1.in_tag = CEPH_MSGR_TAG_READY;
1146                        con->in_seq++;
1147                        return 1;
1148                }
1149
1150                BUG_ON(!con->in_msg);
1151                BUG_ON(con->in_msg->con != con);
1152                m = con->in_msg;
1153                m->front.iov_len = 0;    /* haven't read it yet */
1154                if (m->middle)
1155                        m->middle->vec.iov_len = 0;
1156
1157                /* prepare for data payload, if any */
1158
1159                if (data_len)
1160                        prepare_message_data(con->in_msg, data_len);
1161        }
1162
1163        /* front */
1164        ret = read_partial_message_section(con, &m->front, front_len,
1165                                           &con->in_front_crc);
1166        if (ret <= 0)
1167                return ret;
1168
1169        /* middle */
1170        if (m->middle) {
1171                ret = read_partial_message_section(con, &m->middle->vec,
1172                                                   middle_len,
1173                                                   &con->in_middle_crc);
1174                if (ret <= 0)
1175                        return ret;
1176        }
1177
1178        /* (page) data */
1179        if (data_len) {
1180                if (!m->num_data_items)
1181                        return -EIO;
1182
1183                if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
1184                        ret = read_partial_msg_data_bounce(con);
1185                else
1186                        ret = read_partial_msg_data(con);
1187                if (ret <= 0)
1188                        return ret;
1189        }
1190
1191        /* footer */
1192        size = sizeof_footer(con);
1193        end += size;
1194        ret = read_partial(con, end, size, &m->footer);
1195        if (ret <= 0)
1196                return ret;
1197
1198        if (!need_sign) {
1199                m->footer.flags = m->old_footer.flags;
1200                m->footer.sig = 0;
1201        }
1202
1203        dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1204             m, front_len, m->footer.front_crc, middle_len,
1205             m->footer.middle_crc, data_len, m->footer.data_crc);
1206
1207        /* crc ok? */
1208        if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1209                pr_err("read_partial_message %p front crc %u != exp. %u\n",
1210                       m, con->in_front_crc, m->footer.front_crc);
1211                return -EBADMSG;
1212        }
1213        if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1214                pr_err("read_partial_message %p middle crc %u != exp %u\n",
1215                       m, con->in_middle_crc, m->footer.middle_crc);
1216                return -EBADMSG;
1217        }
1218        if (do_datacrc &&
1219            (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1220            con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1221                pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1222                       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1223                return -EBADMSG;
1224        }
1225
1226        if (need_sign && con->ops->check_message_signature &&
1227            con->ops->check_message_signature(m)) {
1228                pr_err("read_partial_message %p signature check failed\n", m);
1229                return -EBADMSG;
1230        }
1231
1232        return 1; /* done! */
1233}
1234
1235static int read_keepalive_ack(struct ceph_connection *con)
1236{
1237        struct ceph_timespec ceph_ts;
1238        size_t size = sizeof(ceph_ts);
1239        int ret = read_partial(con, size, size, &ceph_ts);
1240        if (ret <= 0)
1241                return ret;
1242        ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1243        prepare_read_tag(con);
1244        return 1;
1245}
1246
1247/*
1248 * Read what we can from the socket.
1249 */
1250int ceph_con_v1_try_read(struct ceph_connection *con)
1251{
1252        int ret = -1;
1253
1254more:
1255        dout("try_read start %p state %d\n", con, con->state);
1256        if (con->state != CEPH_CON_S_V1_BANNER &&
1257            con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1258            con->state != CEPH_CON_S_OPEN)
1259                return 0;
1260
1261        BUG_ON(!con->sock);
1262
1263        dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1264             con->v1.in_base_pos);
1265
1266        if (con->state == CEPH_CON_S_V1_BANNER) {
1267                ret = read_partial_banner(con);
1268                if (ret <= 0)
1269                        goto out;
1270                ret = process_banner(con);
1271                if (ret < 0)
1272                        goto out;
1273
1274                con->state = CEPH_CON_S_V1_CONNECT_MSG;
1275
1276                /*
1277                 * Received banner is good, exchange connection info.
1278                 * Do not reset out_kvec, as sending our banner raced
1279                 * with receiving peer banner after connect completed.
1280                 */
1281                ret = prepare_write_connect(con);
1282                if (ret < 0)
1283                        goto out;
1284                prepare_read_connect(con);
1285
1286                /* Send connection info before awaiting response */
1287                goto out;
1288        }
1289
1290        if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1291                ret = read_partial_connect(con);
1292                if (ret <= 0)
1293                        goto out;
1294                ret = process_connect(con);
1295                if (ret < 0)
1296                        goto out;
1297                goto more;
1298        }
1299
1300        WARN_ON(con->state != CEPH_CON_S_OPEN);
1301
1302        if (con->v1.in_base_pos < 0) {
1303                /*
1304                 * skipping + discarding content.
1305                 */
1306                ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1307                if (ret <= 0)
1308                        goto out;
1309                dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1310                con->v1.in_base_pos += ret;
1311                if (con->v1.in_base_pos)
1312                        goto more;
1313        }
1314        if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1315                /*
1316                 * what's next?
1317                 */
1318                ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1319                if (ret <= 0)
1320                        goto out;
1321                dout("try_read got tag %d\n", con->v1.in_tag);
1322                switch (con->v1.in_tag) {
1323                case CEPH_MSGR_TAG_MSG:
1324                        prepare_read_message(con);
1325                        break;
1326                case CEPH_MSGR_TAG_ACK:
1327                        prepare_read_ack(con);
1328                        break;
1329                case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1330                        prepare_read_keepalive_ack(con);
1331                        break;
1332                case CEPH_MSGR_TAG_CLOSE:
1333                        ceph_con_close_socket(con);
1334                        con->state = CEPH_CON_S_CLOSED;
1335                        goto out;
1336                default:
1337                        goto bad_tag;
1338                }
1339        }
1340        if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1341                ret = read_partial_message(con);
1342                if (ret <= 0) {
1343                        switch (ret) {
1344                        case -EBADMSG:
1345                                con->error_msg = "bad crc/signature";
1346                                fallthrough;
1347                        case -EBADE:
1348                                ret = -EIO;
1349                                break;
1350                        case -EIO:
1351                                con->error_msg = "io error";
1352                                break;
1353                        }
1354                        goto out;
1355                }
1356                if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1357                        goto more;
1358                ceph_con_process_message(con);
1359                if (con->state == CEPH_CON_S_OPEN)
1360                        prepare_read_tag(con);
1361                goto more;
1362        }
1363        if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1364            con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1365                /*
1366                 * the final handshake seq exchange is semantically
1367                 * equivalent to an ACK
1368                 */
1369                ret = read_partial_ack(con);
1370                if (ret <= 0)
1371                        goto out;
1372                process_ack(con);
1373                goto more;
1374        }
1375        if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1376                ret = read_keepalive_ack(con);
1377                if (ret <= 0)
1378                        goto out;
1379                goto more;
1380        }
1381
1382out:
1383        dout("try_read done on %p ret %d\n", con, ret);
1384        return ret;
1385
1386bad_tag:
1387        pr_err("try_read bad tag %d\n", con->v1.in_tag);
1388        con->error_msg = "protocol error, garbage tag";
1389        ret = -1;
1390        goto out;
1391}
1392
1393/*
1394 * Write something to the socket.  Called in a worker thread when the
1395 * socket appears to be writeable and we have something ready to send.
1396 */
1397int ceph_con_v1_try_write(struct ceph_connection *con)
1398{
1399        int ret = 1;
1400
1401        dout("try_write start %p state %d\n", con, con->state);
1402        if (con->state != CEPH_CON_S_PREOPEN &&
1403            con->state != CEPH_CON_S_V1_BANNER &&
1404            con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1405            con->state != CEPH_CON_S_OPEN)
1406                return 0;
1407
1408        /* open the socket first? */
1409        if (con->state == CEPH_CON_S_PREOPEN) {
1410                BUG_ON(con->sock);
1411                con->state = CEPH_CON_S_V1_BANNER;
1412
1413                con_out_kvec_reset(con);
1414                prepare_write_banner(con);
1415                prepare_read_banner(con);
1416
1417                BUG_ON(con->in_msg);
1418                con->v1.in_tag = CEPH_MSGR_TAG_READY;
1419                dout("try_write initiating connect on %p new state %d\n",
1420                     con, con->state);
1421                ret = ceph_tcp_connect(con);
1422                if (ret < 0) {
1423                        con->error_msg = "connect error";
1424                        goto out;
1425                }
1426        }
1427
1428more:
1429        dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1430        BUG_ON(!con->sock);
1431
1432        /* kvec data queued? */
1433        if (con->v1.out_kvec_left) {
1434                ret = write_partial_kvec(con);
1435                if (ret <= 0)
1436                        goto out;
1437        }
1438        if (con->v1.out_skip) {
1439                ret = write_partial_skip(con);
1440                if (ret <= 0)
1441                        goto out;
1442        }
1443
1444        /* msg pages? */
1445        if (con->out_msg) {
1446                if (con->v1.out_msg_done) {
1447                        ceph_msg_put(con->out_msg);
1448                        con->out_msg = NULL;   /* we're done with this one */
1449                        goto do_next;
1450                }
1451
1452                ret = write_partial_message_data(con);
1453                if (ret == 1)
1454                        goto more;  /* we need to send the footer, too! */
1455                if (ret == 0)
1456                        goto out;
1457                if (ret < 0) {
1458                        dout("try_write write_partial_message_data err %d\n",
1459                             ret);
1460                        goto out;
1461                }
1462        }
1463
1464do_next:
1465        if (con->state == CEPH_CON_S_OPEN) {
1466                if (ceph_con_flag_test_and_clear(con,
1467                                CEPH_CON_F_KEEPALIVE_PENDING)) {
1468                        prepare_write_keepalive(con);
1469                        goto more;
1470                }
1471                /* is anything else pending? */
1472                if (!list_empty(&con->out_queue)) {
1473                        prepare_write_message(con);
1474                        goto more;
1475                }
1476                if (con->in_seq > con->in_seq_acked) {
1477                        prepare_write_ack(con);
1478                        goto more;
1479                }
1480        }
1481
1482        /* Nothing to do! */
1483        ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1484        dout("try_write nothing else to write.\n");
1485        ret = 0;
1486out:
1487        dout("try_write done on %p ret %d\n", con, ret);
1488        return ret;
1489}
1490
1491void ceph_con_v1_revoke(struct ceph_connection *con)
1492{
1493        struct ceph_msg *msg = con->out_msg;
1494
1495        WARN_ON(con->v1.out_skip);
1496        /* footer */
1497        if (con->v1.out_msg_done) {
1498                con->v1.out_skip += con_out_kvec_skip(con);
1499        } else {
1500                WARN_ON(!msg->data_length);
1501                con->v1.out_skip += sizeof_footer(con);
1502        }
1503        /* data, middle, front */
1504        if (msg->data_length)
1505                con->v1.out_skip += msg->cursor.total_resid;
1506        if (msg->middle)
1507                con->v1.out_skip += con_out_kvec_skip(con);
1508        con->v1.out_skip += con_out_kvec_skip(con);
1509
1510        dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1511             con->v1.out_kvec_bytes, con->v1.out_skip);
1512}
1513
1514void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1515{
1516        unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1517        unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1518        unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1519
1520        /* skip rest of message */
1521        con->v1.in_base_pos = con->v1.in_base_pos -
1522                        sizeof(struct ceph_msg_header) -
1523                        front_len -
1524                        middle_len -
1525                        data_len -
1526                        sizeof(struct ceph_msg_footer);
1527
1528        con->v1.in_tag = CEPH_MSGR_TAG_READY;
1529        con->in_seq++;
1530
1531        dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1532}
1533
1534bool ceph_con_v1_opened(struct ceph_connection *con)
1535{
1536        return con->v1.connect_seq;
1537}
1538
1539void ceph_con_v1_reset_session(struct ceph_connection *con)
1540{
1541        con->v1.connect_seq = 0;
1542        con->v1.peer_global_seq = 0;
1543}
1544
1545void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1546{
1547        con->v1.out_skip = 0;
1548}
1549