linux/net/ceph/osd_client.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/module.h>
   4#include <linux/err.h>
   5#include <linux/highmem.h>
   6#include <linux/mm.h>
   7#include <linux/pagemap.h>
   8#include <linux/slab.h>
   9#include <linux/uaccess.h>
  10#ifdef CONFIG_BLOCK
  11#include <linux/bio.h>
  12#endif
  13
  14#include <linux/ceph/libceph.h>
  15#include <linux/ceph/osd_client.h>
  16#include <linux/ceph/messenger.h>
  17#include <linux/ceph/decode.h>
  18#include <linux/ceph/auth.h>
  19#include <linux/ceph/pagelist.h>
  20
  21#define OSD_OP_FRONT_LEN        4096
  22#define OSD_OPREPLY_FRONT_LEN   512
  23
  24static const struct ceph_connection_operations osd_con_ops;
  25static int __kick_requests(struct ceph_osd_client *osdc,
  26                          struct ceph_osd *kickosd);
  27
  28static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
  29
  30static int op_needs_trail(int op)
  31{
  32        switch (op) {
  33        case CEPH_OSD_OP_GETXATTR:
  34        case CEPH_OSD_OP_SETXATTR:
  35        case CEPH_OSD_OP_CMPXATTR:
  36        case CEPH_OSD_OP_CALL:
  37                return 1;
  38        default:
  39                return 0;
  40        }
  41}
  42
  43static int op_has_extent(int op)
  44{
  45        return (op == CEPH_OSD_OP_READ ||
  46                op == CEPH_OSD_OP_WRITE);
  47}
  48
  49void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
  50                        struct ceph_file_layout *layout,
  51                        u64 snapid,
  52                        u64 off, u64 *plen, u64 *bno,
  53                        struct ceph_osd_request *req,
  54                        struct ceph_osd_req_op *op)
  55{
  56        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
  57        u64 orig_len = *plen;
  58        u64 objoff, objlen;    /* extent in object */
  59
  60        reqhead->snapid = cpu_to_le64(snapid);
  61
  62        /* object extent? */
  63        ceph_calc_file_object_mapping(layout, off, plen, bno,
  64                                      &objoff, &objlen);
  65        if (*plen < orig_len)
  66                dout(" skipping last %llu, final file extent %llu~%llu\n",
  67                     orig_len - *plen, off, *plen);
  68
  69        if (op_has_extent(op->op)) {
  70                op->extent.offset = objoff;
  71                op->extent.length = objlen;
  72        }
  73        req->r_num_pages = calc_pages_for(off, *plen);
  74        req->r_page_alignment = off & ~PAGE_MASK;
  75        if (op->op == CEPH_OSD_OP_WRITE)
  76                op->payload_len = *plen;
  77
  78        dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
  79             *bno, objoff, objlen, req->r_num_pages);
  80
  81}
  82EXPORT_SYMBOL(ceph_calc_raw_layout);
  83
  84/*
  85 * Implement client access to distributed object storage cluster.
  86 *
  87 * All data objects are stored within a cluster/cloud of OSDs, or
  88 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
  89 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
  90 * remote daemons serving up and coordinating consistent and safe
  91 * access to storage.
  92 *
  93 * Cluster membership and the mapping of data objects onto storage devices
  94 * are described by the osd map.
  95 *
  96 * We keep track of pending OSD requests (read, write), resubmit
  97 * requests to different OSDs when the cluster topology/data layout
  98 * change, or retry the affected requests when the communications
  99 * channel with an OSD is reset.
 100 */
 101
 102/*
 103 * calculate the mapping of a file extent onto an object, and fill out the
 104 * request accordingly.  shorten extent as necessary if it crosses an
 105 * object boundary.
 106 *
 107 * fill osd op in request message.
 108 */
 109static void calc_layout(struct ceph_osd_client *osdc,
 110                        struct ceph_vino vino,
 111                        struct ceph_file_layout *layout,
 112                        u64 off, u64 *plen,
 113                        struct ceph_osd_request *req,
 114                        struct ceph_osd_req_op *op)
 115{
 116        u64 bno;
 117
 118        ceph_calc_raw_layout(osdc, layout, vino.snap, off,
 119                             plen, &bno, req, op);
 120
 121        sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
 122        req->r_oid_len = strlen(req->r_oid);
 123}
 124
 125/*
 126 * requests
 127 */
 128void ceph_osdc_release_request(struct kref *kref)
 129{
 130        struct ceph_osd_request *req = container_of(kref,
 131                                                    struct ceph_osd_request,
 132                                                    r_kref);
 133
 134        if (req->r_request)
 135                ceph_msg_put(req->r_request);
 136        if (req->r_reply)
 137                ceph_msg_put(req->r_reply);
 138        if (req->r_con_filling_msg) {
 139                dout("release_request revoking pages %p from con %p\n",
 140                     req->r_pages, req->r_con_filling_msg);
 141                ceph_con_revoke_message(req->r_con_filling_msg,
 142                                      req->r_reply);
 143                ceph_con_put(req->r_con_filling_msg);
 144        }
 145        if (req->r_own_pages)
 146                ceph_release_page_vector(req->r_pages,
 147                                         req->r_num_pages);
 148#ifdef CONFIG_BLOCK
 149        if (req->r_bio)
 150                bio_put(req->r_bio);
 151#endif
 152        ceph_put_snap_context(req->r_snapc);
 153        if (req->r_trail) {
 154                ceph_pagelist_release(req->r_trail);
 155                kfree(req->r_trail);
 156        }
 157        if (req->r_mempool)
 158                mempool_free(req, req->r_osdc->req_mempool);
 159        else
 160                kfree(req);
 161}
 162EXPORT_SYMBOL(ceph_osdc_release_request);
 163
 164static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
 165{
 166        int i = 0;
 167
 168        if (needs_trail)
 169                *needs_trail = 0;
 170        while (ops[i].op) {
 171                if (needs_trail && op_needs_trail(ops[i].op))
 172                        *needs_trail = 1;
 173                i++;
 174        }
 175
 176        return i;
 177}
 178
 179struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 180                                               int flags,
 181                                               struct ceph_snap_context *snapc,
 182                                               struct ceph_osd_req_op *ops,
 183                                               bool use_mempool,
 184                                               gfp_t gfp_flags,
 185                                               struct page **pages,
 186                                               struct bio *bio)
 187{
 188        struct ceph_osd_request *req;
 189        struct ceph_msg *msg;
 190        int needs_trail;
 191        int num_op = get_num_ops(ops, &needs_trail);
 192        size_t msg_size = sizeof(struct ceph_osd_request_head);
 193
 194        msg_size += num_op*sizeof(struct ceph_osd_op);
 195
 196        if (use_mempool) {
 197                req = mempool_alloc(osdc->req_mempool, gfp_flags);
 198                memset(req, 0, sizeof(*req));
 199        } else {
 200                req = kzalloc(sizeof(*req), gfp_flags);
 201        }
 202        if (req == NULL)
 203                return NULL;
 204
 205        req->r_osdc = osdc;
 206        req->r_mempool = use_mempool;
 207
 208        kref_init(&req->r_kref);
 209        init_completion(&req->r_completion);
 210        init_completion(&req->r_safe_completion);
 211        INIT_LIST_HEAD(&req->r_unsafe_item);
 212        req->r_flags = flags;
 213
 214        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
 215
 216        /* create reply message */
 217        if (use_mempool)
 218                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 219        else
 220                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
 221                                   OSD_OPREPLY_FRONT_LEN, gfp_flags);
 222        if (!msg) {
 223                ceph_osdc_put_request(req);
 224                return NULL;
 225        }
 226        req->r_reply = msg;
 227
 228        /* allocate space for the trailing data */
 229        if (needs_trail) {
 230                req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
 231                if (!req->r_trail) {
 232                        ceph_osdc_put_request(req);
 233                        return NULL;
 234                }
 235                ceph_pagelist_init(req->r_trail);
 236        }
 237        /* create request message; allow space for oid */
 238        msg_size += 40;
 239        if (snapc)
 240                msg_size += sizeof(u64) * snapc->num_snaps;
 241        if (use_mempool)
 242                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 243        else
 244                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
 245        if (!msg) {
 246                ceph_osdc_put_request(req);
 247                return NULL;
 248        }
 249
 250        msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 251        memset(msg->front.iov_base, 0, msg->front.iov_len);
 252
 253        req->r_request = msg;
 254        req->r_pages = pages;
 255#ifdef CONFIG_BLOCK
 256        if (bio) {
 257                req->r_bio = bio;
 258                bio_get(req->r_bio);
 259        }
 260#endif
 261
 262        return req;
 263}
 264EXPORT_SYMBOL(ceph_osdc_alloc_request);
 265
 266static void osd_req_encode_op(struct ceph_osd_request *req,
 267                              struct ceph_osd_op *dst,
 268                              struct ceph_osd_req_op *src)
 269{
 270        dst->op = cpu_to_le16(src->op);
 271
 272        switch (dst->op) {
 273        case CEPH_OSD_OP_READ:
 274        case CEPH_OSD_OP_WRITE:
 275                dst->extent.offset =
 276                        cpu_to_le64(src->extent.offset);
 277                dst->extent.length =
 278                        cpu_to_le64(src->extent.length);
 279                dst->extent.truncate_size =
 280                        cpu_to_le64(src->extent.truncate_size);
 281                dst->extent.truncate_seq =
 282                        cpu_to_le32(src->extent.truncate_seq);
 283                break;
 284
 285        case CEPH_OSD_OP_GETXATTR:
 286        case CEPH_OSD_OP_SETXATTR:
 287        case CEPH_OSD_OP_CMPXATTR:
 288                BUG_ON(!req->r_trail);
 289
 290                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
 291                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
 292                dst->xattr.cmp_op = src->xattr.cmp_op;
 293                dst->xattr.cmp_mode = src->xattr.cmp_mode;
 294                ceph_pagelist_append(req->r_trail, src->xattr.name,
 295                                     src->xattr.name_len);
 296                ceph_pagelist_append(req->r_trail, src->xattr.val,
 297                                     src->xattr.value_len);
 298                break;
 299        case CEPH_OSD_OP_CALL:
 300                BUG_ON(!req->r_trail);
 301
 302                dst->cls.class_len = src->cls.class_len;
 303                dst->cls.method_len = src->cls.method_len;
 304                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 305
 306                ceph_pagelist_append(req->r_trail, src->cls.class_name,
 307                                     src->cls.class_len);
 308                ceph_pagelist_append(req->r_trail, src->cls.method_name,
 309                                     src->cls.method_len);
 310                ceph_pagelist_append(req->r_trail, src->cls.indata,
 311                                     src->cls.indata_len);
 312                break;
 313        case CEPH_OSD_OP_ROLLBACK:
 314                dst->snap.snapid = cpu_to_le64(src->snap.snapid);
 315                break;
 316        case CEPH_OSD_OP_STARTSYNC:
 317                break;
 318        default:
 319                pr_err("unrecognized osd opcode %d\n", dst->op);
 320                WARN_ON(1);
 321                break;
 322        }
 323        dst->payload_len = cpu_to_le32(src->payload_len);
 324}
 325
 326/*
 327 * build new request AND message
 328 *
 329 */
 330void ceph_osdc_build_request(struct ceph_osd_request *req,
 331                             u64 off, u64 *plen,
 332                             struct ceph_osd_req_op *src_ops,
 333                             struct ceph_snap_context *snapc,
 334                             struct timespec *mtime,
 335                             const char *oid,
 336                             int oid_len)
 337{
 338        struct ceph_msg *msg = req->r_request;
 339        struct ceph_osd_request_head *head;
 340        struct ceph_osd_req_op *src_op;
 341        struct ceph_osd_op *op;
 342        void *p;
 343        int num_op = get_num_ops(src_ops, NULL);
 344        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
 345        int flags = req->r_flags;
 346        u64 data_len = 0;
 347        int i;
 348
 349        head = msg->front.iov_base;
 350        op = (void *)(head + 1);
 351        p = (void *)(op + num_op);
 352
 353        req->r_snapc = ceph_get_snap_context(snapc);
 354
 355        head->client_inc = cpu_to_le32(1); /* always, for now. */
 356        head->flags = cpu_to_le32(flags);
 357        if (flags & CEPH_OSD_FLAG_WRITE)
 358                ceph_encode_timespec(&head->mtime, mtime);
 359        head->num_ops = cpu_to_le16(num_op);
 360
 361
 362        /* fill in oid */
 363        head->object_len = cpu_to_le32(oid_len);
 364        memcpy(p, oid, oid_len);
 365        p += oid_len;
 366
 367        src_op = src_ops;
 368        while (src_op->op) {
 369                osd_req_encode_op(req, op, src_op);
 370                src_op++;
 371                op++;
 372        }
 373
 374        if (req->r_trail)
 375                data_len += req->r_trail->length;
 376
 377        if (snapc) {
 378                head->snap_seq = cpu_to_le64(snapc->seq);
 379                head->num_snaps = cpu_to_le32(snapc->num_snaps);
 380                for (i = 0; i < snapc->num_snaps; i++) {
 381                        put_unaligned_le64(snapc->snaps[i], p);
 382                        p += sizeof(u64);
 383                }
 384        }
 385
 386        if (flags & CEPH_OSD_FLAG_WRITE) {
 387                req->r_request->hdr.data_off = cpu_to_le16(off);
 388                req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
 389        } else if (data_len) {
 390                req->r_request->hdr.data_off = 0;
 391                req->r_request->hdr.data_len = cpu_to_le32(data_len);
 392        }
 393
 394        req->r_request->page_alignment = req->r_page_alignment;
 395
 396        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 397        msg_size = p - msg->front.iov_base;
 398        msg->front.iov_len = msg_size;
 399        msg->hdr.front_len = cpu_to_le32(msg_size);
 400        return;
 401}
 402EXPORT_SYMBOL(ceph_osdc_build_request);
 403
 404/*
 405 * build new request AND message, calculate layout, and adjust file
 406 * extent as needed.
 407 *
 408 * if the file was recently truncated, we include information about its
 409 * old and new size so that the object can be updated appropriately.  (we
 410 * avoid synchronously deleting truncated objects because it's slow.)
 411 *
 412 * if @do_sync, include a 'startsync' command so that the osd will flush
 413 * data quickly.
 414 */
 415struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 416                                               struct ceph_file_layout *layout,
 417                                               struct ceph_vino vino,
 418                                               u64 off, u64 *plen,
 419                                               int opcode, int flags,
 420                                               struct ceph_snap_context *snapc,
 421                                               int do_sync,
 422                                               u32 truncate_seq,
 423                                               u64 truncate_size,
 424                                               struct timespec *mtime,
 425                                               bool use_mempool, int num_reply,
 426                                               int page_align)
 427{
 428        struct ceph_osd_req_op ops[3];
 429        struct ceph_osd_request *req;
 430
 431        ops[0].op = opcode;
 432        ops[0].extent.truncate_seq = truncate_seq;
 433        ops[0].extent.truncate_size = truncate_size;
 434        ops[0].payload_len = 0;
 435
 436        if (do_sync) {
 437                ops[1].op = CEPH_OSD_OP_STARTSYNC;
 438                ops[1].payload_len = 0;
 439                ops[2].op = 0;
 440        } else
 441                ops[1].op = 0;
 442
 443        req = ceph_osdc_alloc_request(osdc, flags,
 444                                         snapc, ops,
 445                                         use_mempool,
 446                                         GFP_NOFS, NULL, NULL);
 447        if (IS_ERR(req))
 448                return req;
 449
 450        /* calculate max write size */
 451        calc_layout(osdc, vino, layout, off, plen, req, ops);
 452        req->r_file_layout = *layout;  /* keep a copy */
 453
 454        /* in case it differs from natural alignment that calc_layout
 455           filled in for us */
 456        req->r_page_alignment = page_align;
 457
 458        ceph_osdc_build_request(req, off, plen, ops,
 459                                snapc,
 460                                mtime,
 461                                req->r_oid, req->r_oid_len);
 462
 463        return req;
 464}
 465EXPORT_SYMBOL(ceph_osdc_new_request);
 466
 467/*
 468 * We keep osd requests in an rbtree, sorted by ->r_tid.
 469 */
 470static void __insert_request(struct ceph_osd_client *osdc,
 471                             struct ceph_osd_request *new)
 472{
 473        struct rb_node **p = &osdc->requests.rb_node;
 474        struct rb_node *parent = NULL;
 475        struct ceph_osd_request *req = NULL;
 476
 477        while (*p) {
 478                parent = *p;
 479                req = rb_entry(parent, struct ceph_osd_request, r_node);
 480                if (new->r_tid < req->r_tid)
 481                        p = &(*p)->rb_left;
 482                else if (new->r_tid > req->r_tid)
 483                        p = &(*p)->rb_right;
 484                else
 485                        BUG();
 486        }
 487
 488        rb_link_node(&new->r_node, parent, p);
 489        rb_insert_color(&new->r_node, &osdc->requests);
 490}
 491
 492static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
 493                                                 u64 tid)
 494{
 495        struct ceph_osd_request *req;
 496        struct rb_node *n = osdc->requests.rb_node;
 497
 498        while (n) {
 499                req = rb_entry(n, struct ceph_osd_request, r_node);
 500                if (tid < req->r_tid)
 501                        n = n->rb_left;
 502                else if (tid > req->r_tid)
 503                        n = n->rb_right;
 504                else
 505                        return req;
 506        }
 507        return NULL;
 508}
 509
 510static struct ceph_osd_request *
 511__lookup_request_ge(struct ceph_osd_client *osdc,
 512                    u64 tid)
 513{
 514        struct ceph_osd_request *req;
 515        struct rb_node *n = osdc->requests.rb_node;
 516
 517        while (n) {
 518                req = rb_entry(n, struct ceph_osd_request, r_node);
 519                if (tid < req->r_tid) {
 520                        if (!n->rb_left)
 521                                return req;
 522                        n = n->rb_left;
 523                } else if (tid > req->r_tid) {
 524                        n = n->rb_right;
 525                } else {
 526                        return req;
 527                }
 528        }
 529        return NULL;
 530}
 531
 532
 533/*
 534 * If the osd connection drops, we need to resubmit all requests.
 535 */
 536static void osd_reset(struct ceph_connection *con)
 537{
 538        struct ceph_osd *osd = con->private;
 539        struct ceph_osd_client *osdc;
 540
 541        if (!osd)
 542                return;
 543        dout("osd_reset osd%d\n", osd->o_osd);
 544        osdc = osd->o_osdc;
 545        down_read(&osdc->map_sem);
 546        kick_requests(osdc, osd);
 547        up_read(&osdc->map_sem);
 548}
 549
 550/*
 551 * Track open sessions with osds.
 552 */
 553static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 554{
 555        struct ceph_osd *osd;
 556
 557        osd = kzalloc(sizeof(*osd), GFP_NOFS);
 558        if (!osd)
 559                return NULL;
 560
 561        atomic_set(&osd->o_ref, 1);
 562        osd->o_osdc = osdc;
 563        INIT_LIST_HEAD(&osd->o_requests);
 564        INIT_LIST_HEAD(&osd->o_osd_lru);
 565        osd->o_incarnation = 1;
 566
 567        ceph_con_init(osdc->client->msgr, &osd->o_con);
 568        osd->o_con.private = osd;
 569        osd->o_con.ops = &osd_con_ops;
 570        osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 571
 572        INIT_LIST_HEAD(&osd->o_keepalive_item);
 573        return osd;
 574}
 575
 576static struct ceph_osd *get_osd(struct ceph_osd *osd)
 577{
 578        if (atomic_inc_not_zero(&osd->o_ref)) {
 579                dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
 580                     atomic_read(&osd->o_ref));
 581                return osd;
 582        } else {
 583                dout("get_osd %p FAIL\n", osd);
 584                return NULL;
 585        }
 586}
 587
 588static void put_osd(struct ceph_osd *osd)
 589{
 590        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
 591             atomic_read(&osd->o_ref) - 1);
 592        if (atomic_dec_and_test(&osd->o_ref)) {
 593                struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
 594
 595                if (osd->o_authorizer)
 596                        ac->ops->destroy_authorizer(ac, osd->o_authorizer);
 597                kfree(osd);
 598        }
 599}
 600
 601/*
 602 * remove an osd from our map
 603 */
 604static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 605{
 606        dout("__remove_osd %p\n", osd);
 607        BUG_ON(!list_empty(&osd->o_requests));
 608        rb_erase(&osd->o_node, &osdc->osds);
 609        list_del_init(&osd->o_osd_lru);
 610        ceph_con_close(&osd->o_con);
 611        put_osd(osd);
 612}
 613
 614static void __move_osd_to_lru(struct ceph_osd_client *osdc,
 615                              struct ceph_osd *osd)
 616{
 617        dout("__move_osd_to_lru %p\n", osd);
 618        BUG_ON(!list_empty(&osd->o_osd_lru));
 619        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
 620        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
 621}
 622
 623static void __remove_osd_from_lru(struct ceph_osd *osd)
 624{
 625        dout("__remove_osd_from_lru %p\n", osd);
 626        if (!list_empty(&osd->o_osd_lru))
 627                list_del_init(&osd->o_osd_lru);
 628}
 629
 630static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
 631{
 632        struct ceph_osd *osd, *nosd;
 633
 634        dout("__remove_old_osds %p\n", osdc);
 635        mutex_lock(&osdc->request_mutex);
 636        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
 637                if (!remove_all && time_before(jiffies, osd->lru_ttl))
 638                        break;
 639                __remove_osd(osdc, osd);
 640        }
 641        mutex_unlock(&osdc->request_mutex);
 642}
 643
 644/*
 645 * reset osd connect
 646 */
 647static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 648{
 649        struct ceph_osd_request *req;
 650        int ret = 0;
 651
 652        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
 653        if (list_empty(&osd->o_requests)) {
 654                __remove_osd(osdc, osd);
 655        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
 656                          &osd->o_con.peer_addr,
 657                          sizeof(osd->o_con.peer_addr)) == 0 &&
 658                   !ceph_con_opened(&osd->o_con)) {
 659                dout(" osd addr hasn't changed and connection never opened,"
 660                     " letting msgr retry");
 661                /* touch each r_stamp for handle_timeout()'s benfit */
 662                list_for_each_entry(req, &osd->o_requests, r_osd_item)
 663                        req->r_stamp = jiffies;
 664                ret = -EAGAIN;
 665        } else {
 666                ceph_con_close(&osd->o_con);
 667                ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
 668                osd->o_incarnation++;
 669        }
 670        return ret;
 671}
 672
 673static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
 674{
 675        struct rb_node **p = &osdc->osds.rb_node;
 676        struct rb_node *parent = NULL;
 677        struct ceph_osd *osd = NULL;
 678
 679        while (*p) {
 680                parent = *p;
 681                osd = rb_entry(parent, struct ceph_osd, o_node);
 682                if (new->o_osd < osd->o_osd)
 683                        p = &(*p)->rb_left;
 684                else if (new->o_osd > osd->o_osd)
 685                        p = &(*p)->rb_right;
 686                else
 687                        BUG();
 688        }
 689
 690        rb_link_node(&new->o_node, parent, p);
 691        rb_insert_color(&new->o_node, &osdc->osds);
 692}
 693
 694static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
 695{
 696        struct ceph_osd *osd;
 697        struct rb_node *n = osdc->osds.rb_node;
 698
 699        while (n) {
 700                osd = rb_entry(n, struct ceph_osd, o_node);
 701                if (o < osd->o_osd)
 702                        n = n->rb_left;
 703                else if (o > osd->o_osd)
 704                        n = n->rb_right;
 705                else
 706                        return osd;
 707        }
 708        return NULL;
 709}
 710
 711static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
 712{
 713        schedule_delayed_work(&osdc->timeout_work,
 714                        osdc->client->options->osd_keepalive_timeout * HZ);
 715}
 716
 717static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
 718{
 719        cancel_delayed_work(&osdc->timeout_work);
 720}
 721
 722/*
 723 * Register request, assign tid.  If this is the first request, set up
 724 * the timeout event.
 725 */
 726static void register_request(struct ceph_osd_client *osdc,
 727                             struct ceph_osd_request *req)
 728{
 729        mutex_lock(&osdc->request_mutex);
 730        req->r_tid = ++osdc->last_tid;
 731        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
 732        INIT_LIST_HEAD(&req->r_req_lru_item);
 733
 734        dout("register_request %p tid %lld\n", req, req->r_tid);
 735        __insert_request(osdc, req);
 736        ceph_osdc_get_request(req);
 737        osdc->num_requests++;
 738
 739        if (osdc->num_requests == 1) {
 740                dout(" first request, scheduling timeout\n");
 741                __schedule_osd_timeout(osdc);
 742        }
 743        mutex_unlock(&osdc->request_mutex);
 744}
 745
 746/*
 747 * called under osdc->request_mutex
 748 */
 749static void __unregister_request(struct ceph_osd_client *osdc,
 750                                 struct ceph_osd_request *req)
 751{
 752        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 753        rb_erase(&req->r_node, &osdc->requests);
 754        osdc->num_requests--;
 755
 756        if (req->r_osd) {
 757                /* make sure the original request isn't in flight. */
 758                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 759
 760                list_del_init(&req->r_osd_item);
 761                if (list_empty(&req->r_osd->o_requests))
 762                        __move_osd_to_lru(osdc, req->r_osd);
 763                req->r_osd = NULL;
 764        }
 765
 766        ceph_osdc_put_request(req);
 767
 768        list_del_init(&req->r_req_lru_item);
 769        if (osdc->num_requests == 0) {
 770                dout(" no requests, canceling timeout\n");
 771                __cancel_osd_timeout(osdc);
 772        }
 773}
 774
 775/*
 776 * Cancel a previously queued request message
 777 */
 778static void __cancel_request(struct ceph_osd_request *req)
 779{
 780        if (req->r_sent && req->r_osd) {
 781                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 782                req->r_sent = 0;
 783        }
 784        list_del_init(&req->r_req_lru_item);
 785}
 786
 787/*
 788 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 789 * (as needed), and set the request r_osd appropriately.  If there is
 790 * no up osd, set r_osd to NULL.
 791 *
 792 * Return 0 if unchanged, 1 if changed, or negative on error.
 793 *
 794 * Caller should hold map_sem for read and request_mutex.
 795 */
 796static int __map_osds(struct ceph_osd_client *osdc,
 797                      struct ceph_osd_request *req)
 798{
 799        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
 800        struct ceph_pg pgid;
 801        int acting[CEPH_PG_MAX_SIZE];
 802        int o = -1, num = 0;
 803        int err;
 804
 805        dout("map_osds %p tid %lld\n", req, req->r_tid);
 806        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
 807                                      &req->r_file_layout, osdc->osdmap);
 808        if (err)
 809                return err;
 810        pgid = reqhead->layout.ol_pgid;
 811        req->r_pgid = pgid;
 812
 813        err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
 814        if (err > 0) {
 815                o = acting[0];
 816                num = err;
 817        }
 818
 819        if ((req->r_osd && req->r_osd->o_osd == o &&
 820             req->r_sent >= req->r_osd->o_incarnation &&
 821             req->r_num_pg_osds == num &&
 822             memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
 823            (req->r_osd == NULL && o == -1))
 824                return 0;  /* no change */
 825
 826        dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
 827             req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
 828             req->r_osd ? req->r_osd->o_osd : -1);
 829
 830        /* record full pg acting set */
 831        memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
 832        req->r_num_pg_osds = num;
 833
 834        if (req->r_osd) {
 835                __cancel_request(req);
 836                list_del_init(&req->r_osd_item);
 837                req->r_osd = NULL;
 838        }
 839
 840        req->r_osd = __lookup_osd(osdc, o);
 841        if (!req->r_osd && o >= 0) {
 842                err = -ENOMEM;
 843                req->r_osd = create_osd(osdc);
 844                if (!req->r_osd)
 845                        goto out;
 846
 847                dout("map_osds osd %p is osd%d\n", req->r_osd, o);
 848                req->r_osd->o_osd = o;
 849                req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
 850                __insert_osd(osdc, req->r_osd);
 851
 852                ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
 853        }
 854
 855        if (req->r_osd) {
 856                __remove_osd_from_lru(req->r_osd);
 857                list_add(&req->r_osd_item, &req->r_osd->o_requests);
 858        }
 859        err = 1;   /* osd or pg changed */
 860
 861out:
 862        return err;
 863}
 864
 865/*
 866 * caller should hold map_sem (for read) and request_mutex
 867 */
 868static int __send_request(struct ceph_osd_client *osdc,
 869                          struct ceph_osd_request *req)
 870{
 871        struct ceph_osd_request_head *reqhead;
 872        int err;
 873
 874        err = __map_osds(osdc, req);
 875        if (err < 0)
 876                return err;
 877        if (req->r_osd == NULL) {
 878                dout("send_request %p no up osds in pg\n", req);
 879                ceph_monc_request_next_osdmap(&osdc->client->monc);
 880                return 0;
 881        }
 882
 883        dout("send_request %p tid %llu to osd%d flags %d\n",
 884             req, req->r_tid, req->r_osd->o_osd, req->r_flags);
 885
 886        reqhead = req->r_request->front.iov_base;
 887        reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
 888        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
 889        reqhead->reassert_version = req->r_reassert_version;
 890
 891        req->r_stamp = jiffies;
 892        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
 893
 894        ceph_msg_get(req->r_request); /* send consumes a ref */
 895        ceph_con_send(&req->r_osd->o_con, req->r_request);
 896        req->r_sent = req->r_osd->o_incarnation;
 897        return 0;
 898}
 899
 900/*
 901 * Timeout callback, called every N seconds when 1 or more osd
 902 * requests has been active for more than N seconds.  When this
 903 * happens, we ping all OSDs with requests who have timed out to
 904 * ensure any communications channel reset is detected.  Reset the
 905 * request timeouts another N seconds in the future as we go.
 906 * Reschedule the timeout event another N seconds in future (unless
 907 * there are no open requests).
 908 */
 909static void handle_timeout(struct work_struct *work)
 910{
 911        struct ceph_osd_client *osdc =
 912                container_of(work, struct ceph_osd_client, timeout_work.work);
 913        struct ceph_osd_request *req, *last_req = NULL;
 914        struct ceph_osd *osd;
 915        unsigned long timeout = osdc->client->options->osd_timeout * HZ;
 916        unsigned long keepalive =
 917                osdc->client->options->osd_keepalive_timeout * HZ;
 918        unsigned long last_stamp = 0;
 919        struct rb_node *p;
 920        struct list_head slow_osds;
 921
 922        dout("timeout\n");
 923        down_read(&osdc->map_sem);
 924
 925        ceph_monc_request_next_osdmap(&osdc->client->monc);
 926
 927        mutex_lock(&osdc->request_mutex);
 928        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
 929                req = rb_entry(p, struct ceph_osd_request, r_node);
 930
 931                if (req->r_resend) {
 932                        int err;
 933
 934                        dout("osdc resending prev failed %lld\n", req->r_tid);
 935                        err = __send_request(osdc, req);
 936                        if (err)
 937                                dout("osdc failed again on %lld\n", req->r_tid);
 938                        else
 939                                req->r_resend = false;
 940                        continue;
 941                }
 942        }
 943
 944        /*
 945         * reset osds that appear to be _really_ unresponsive.  this
 946         * is a failsafe measure.. we really shouldn't be getting to
 947         * this point if the system is working properly.  the monitors
 948         * should mark the osd as failed and we should find out about
 949         * it from an updated osd map.
 950         */
 951        while (timeout && !list_empty(&osdc->req_lru)) {
 952                req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
 953                                 r_req_lru_item);
 954
 955                if (time_before(jiffies, req->r_stamp + timeout))
 956                        break;
 957
 958                BUG_ON(req == last_req && req->r_stamp == last_stamp);
 959                last_req = req;
 960                last_stamp = req->r_stamp;
 961
 962                osd = req->r_osd;
 963                BUG_ON(!osd);
 964                pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
 965                           req->r_tid, osd->o_osd);
 966                __kick_requests(osdc, osd);
 967        }
 968
 969        /*
 970         * ping osds that are a bit slow.  this ensures that if there
 971         * is a break in the TCP connection we will notice, and reopen
 972         * a connection with that osd (from the fault callback).
 973         */
 974        INIT_LIST_HEAD(&slow_osds);
 975        list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
 976                if (time_before(jiffies, req->r_stamp + keepalive))
 977                        break;
 978
 979                osd = req->r_osd;
 980                BUG_ON(!osd);
 981                dout(" tid %llu is slow, will send keepalive on osd%d\n",
 982                     req->r_tid, osd->o_osd);
 983                list_move_tail(&osd->o_keepalive_item, &slow_osds);
 984        }
 985        while (!list_empty(&slow_osds)) {
 986                osd = list_entry(slow_osds.next, struct ceph_osd,
 987                                 o_keepalive_item);
 988                list_del_init(&osd->o_keepalive_item);
 989                ceph_con_keepalive(&osd->o_con);
 990        }
 991
 992        __schedule_osd_timeout(osdc);
 993        mutex_unlock(&osdc->request_mutex);
 994
 995        up_read(&osdc->map_sem);
 996}
 997
 998static void handle_osds_timeout(struct work_struct *work)
 999{
1000        struct ceph_osd_client *osdc =
1001                container_of(work, struct ceph_osd_client,
1002                             osds_timeout_work.work);
1003        unsigned long delay =
1004                osdc->client->options->osd_idle_ttl * HZ >> 2;
1005
1006        dout("osds timeout\n");
1007        down_read(&osdc->map_sem);
1008        remove_old_osds(osdc, 0);
1009        up_read(&osdc->map_sem);
1010
1011        schedule_delayed_work(&osdc->osds_timeout_work,
1012                              round_jiffies_relative(delay));
1013}
1014
1015/*
1016 * handle osd op reply.  either call the callback if it is specified,
1017 * or do the completion to wake up the waiting thread.
1018 */
1019static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1020                         struct ceph_connection *con)
1021{
1022        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1023        struct ceph_osd_request *req;
1024        u64 tid;
1025        int numops, object_len, flags;
1026        s32 result;
1027
1028        tid = le64_to_cpu(msg->hdr.tid);
1029        if (msg->front.iov_len < sizeof(*rhead))
1030                goto bad;
1031        numops = le32_to_cpu(rhead->num_ops);
1032        object_len = le32_to_cpu(rhead->object_len);
1033        result = le32_to_cpu(rhead->result);
1034        if (msg->front.iov_len != sizeof(*rhead) + object_len +
1035            numops * sizeof(struct ceph_osd_op))
1036                goto bad;
1037        dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1038
1039        /* lookup */
1040        mutex_lock(&osdc->request_mutex);
1041        req = __lookup_request(osdc, tid);
1042        if (req == NULL) {
1043                dout("handle_reply tid %llu dne\n", tid);
1044                mutex_unlock(&osdc->request_mutex);
1045                return;
1046        }
1047        ceph_osdc_get_request(req);
1048        flags = le32_to_cpu(rhead->flags);
1049
1050        /*
1051         * if this connection filled our message, drop our reference now, to
1052         * avoid a (safe but slower) revoke later.
1053         */
1054        if (req->r_con_filling_msg == con && req->r_reply == msg) {
1055                dout(" dropping con_filling_msg ref %p\n", con);
1056                req->r_con_filling_msg = NULL;
1057                ceph_con_put(con);
1058        }
1059
1060        if (!req->r_got_reply) {
1061                unsigned bytes;
1062
1063                req->r_result = le32_to_cpu(rhead->result);
1064                bytes = le32_to_cpu(msg->hdr.data_len);
1065                dout("handle_reply result %d bytes %d\n", req->r_result,
1066                     bytes);
1067                if (req->r_result == 0)
1068                        req->r_result = bytes;
1069
1070                /* in case this is a write and we need to replay, */
1071                req->r_reassert_version = rhead->reassert_version;
1072
1073                req->r_got_reply = 1;
1074        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1075                dout("handle_reply tid %llu dup ack\n", tid);
1076                mutex_unlock(&osdc->request_mutex);
1077                goto done;
1078        }
1079
1080        dout("handle_reply tid %llu flags %d\n", tid, flags);
1081
1082        /* either this is a read, or we got the safe response */
1083        if (result < 0 ||
1084            (flags & CEPH_OSD_FLAG_ONDISK) ||
1085            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1086                __unregister_request(osdc, req);
1087
1088        mutex_unlock(&osdc->request_mutex);
1089
1090        if (req->r_callback)
1091                req->r_callback(req, msg);
1092        else
1093                complete_all(&req->r_completion);
1094
1095        if (flags & CEPH_OSD_FLAG_ONDISK) {
1096                if (req->r_safe_callback)
1097                        req->r_safe_callback(req, msg);
1098                complete_all(&req->r_safe_completion);  /* fsync waiter */
1099        }
1100
1101done:
1102        ceph_osdc_put_request(req);
1103        return;
1104
1105bad:
1106        pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1107               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1108               (int)sizeof(*rhead));
1109        ceph_msg_dump(msg);
1110}
1111
1112
1113static int __kick_requests(struct ceph_osd_client *osdc,
1114                          struct ceph_osd *kickosd)
1115{
1116        struct ceph_osd_request *req;
1117        struct rb_node *p, *n;
1118        int needmap = 0;
1119        int err;
1120
1121        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
1122        if (kickosd) {
1123                err = __reset_osd(osdc, kickosd);
1124                if (err == -EAGAIN)
1125                        return 1;
1126        } else {
1127                for (p = rb_first(&osdc->osds); p; p = n) {
1128                        struct ceph_osd *osd =
1129                                rb_entry(p, struct ceph_osd, o_node);
1130
1131                        n = rb_next(p);
1132                        if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1133                            memcmp(&osd->o_con.peer_addr,
1134                                   ceph_osd_addr(osdc->osdmap,
1135                                                 osd->o_osd),
1136                                   sizeof(struct ceph_entity_addr)) != 0)
1137                                __reset_osd(osdc, osd);
1138                }
1139        }
1140
1141        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1142                req = rb_entry(p, struct ceph_osd_request, r_node);
1143
1144                if (req->r_resend) {
1145                        dout(" r_resend set on tid %llu\n", req->r_tid);
1146                        __cancel_request(req);
1147                        goto kick;
1148                }
1149                if (req->r_osd && kickosd == req->r_osd) {
1150                        __cancel_request(req);
1151                        goto kick;
1152                }
1153
1154                err = __map_osds(osdc, req);
1155                if (err == 0)
1156                        continue;  /* no change */
1157                if (err < 0) {
1158                        /*
1159                         * FIXME: really, we should set the request
1160                         * error and fail if this isn't a 'nofail'
1161                         * request, but that's a fair bit more
1162                         * complicated to do.  So retry!
1163                         */
1164                        dout(" setting r_resend on %llu\n", req->r_tid);
1165                        req->r_resend = true;
1166                        continue;
1167                }
1168                if (req->r_osd == NULL) {
1169                        dout("tid %llu maps to no valid osd\n", req->r_tid);
1170                        needmap++;  /* request a newer map */
1171                        continue;
1172                }
1173
1174kick:
1175                dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
1176                     req->r_osd ? req->r_osd->o_osd : -1);
1177                req->r_flags |= CEPH_OSD_FLAG_RETRY;
1178                err = __send_request(osdc, req);
1179                if (err) {
1180                        dout(" setting r_resend on %llu\n", req->r_tid);
1181                        req->r_resend = true;
1182                }
1183        }
1184
1185        return needmap;
1186}
1187
1188/*
1189 * Resubmit osd requests whose osd or osd address has changed.  Request
1190 * a new osd map if osds are down, or we are otherwise unable to determine
1191 * how to direct a request.
1192 *
1193 * Close connections to down osds.
1194 *
1195 * If @who is specified, resubmit requests for that specific osd.
1196 *
1197 * Caller should hold map_sem for read and request_mutex.
1198 */
1199static void kick_requests(struct ceph_osd_client *osdc,
1200                          struct ceph_osd *kickosd)
1201{
1202        int needmap;
1203
1204        mutex_lock(&osdc->request_mutex);
1205        needmap = __kick_requests(osdc, kickosd);
1206        mutex_unlock(&osdc->request_mutex);
1207
1208        if (needmap) {
1209                dout("%d requests for down osds, need new map\n", needmap);
1210                ceph_monc_request_next_osdmap(&osdc->client->monc);
1211        }
1212
1213}
1214/*
1215 * Process updated osd map.
1216 *
1217 * The message contains any number of incremental and full maps, normally
1218 * indicating some sort of topology change in the cluster.  Kick requests
1219 * off to different OSDs as needed.
1220 */
1221void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1222{
1223        void *p, *end, *next;
1224        u32 nr_maps, maplen;
1225        u32 epoch;
1226        struct ceph_osdmap *newmap = NULL, *oldmap;
1227        int err;
1228        struct ceph_fsid fsid;
1229
1230        dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1231        p = msg->front.iov_base;
1232        end = p + msg->front.iov_len;
1233
1234        /* verify fsid */
1235        ceph_decode_need(&p, end, sizeof(fsid), bad);
1236        ceph_decode_copy(&p, &fsid, sizeof(fsid));
1237        if (ceph_check_fsid(osdc->client, &fsid) < 0)
1238                return;
1239
1240        down_write(&osdc->map_sem);
1241
1242        /* incremental maps */
1243        ceph_decode_32_safe(&p, end, nr_maps, bad);
1244        dout(" %d inc maps\n", nr_maps);
1245        while (nr_maps > 0) {
1246                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1247                epoch = ceph_decode_32(&p);
1248                maplen = ceph_decode_32(&p);
1249                ceph_decode_need(&p, end, maplen, bad);
1250                next = p + maplen;
1251                if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1252                        dout("applying incremental map %u len %d\n",
1253                             epoch, maplen);
1254                        newmap = osdmap_apply_incremental(&p, next,
1255                                                          osdc->osdmap,
1256                                                          osdc->client->msgr);
1257                        if (IS_ERR(newmap)) {
1258                                err = PTR_ERR(newmap);
1259                                goto bad;
1260                        }
1261                        BUG_ON(!newmap);
1262                        if (newmap != osdc->osdmap) {
1263                                ceph_osdmap_destroy(osdc->osdmap);
1264                                osdc->osdmap = newmap;
1265                        }
1266                } else {
1267                        dout("ignoring incremental map %u len %d\n",
1268                             epoch, maplen);
1269                }
1270                p = next;
1271                nr_maps--;
1272        }
1273        if (newmap)
1274                goto done;
1275
1276        /* full maps */
1277        ceph_decode_32_safe(&p, end, nr_maps, bad);
1278        dout(" %d full maps\n", nr_maps);
1279        while (nr_maps) {
1280                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1281                epoch = ceph_decode_32(&p);
1282                maplen = ceph_decode_32(&p);
1283                ceph_decode_need(&p, end, maplen, bad);
1284                if (nr_maps > 1) {
1285                        dout("skipping non-latest full map %u len %d\n",
1286                             epoch, maplen);
1287                } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1288                        dout("skipping full map %u len %d, "
1289                             "older than our %u\n", epoch, maplen,
1290                             osdc->osdmap->epoch);
1291                } else {
1292                        dout("taking full map %u len %d\n", epoch, maplen);
1293                        newmap = osdmap_decode(&p, p+maplen);
1294                        if (IS_ERR(newmap)) {
1295                                err = PTR_ERR(newmap);
1296                                goto bad;
1297                        }
1298                        BUG_ON(!newmap);
1299                        oldmap = osdc->osdmap;
1300                        osdc->osdmap = newmap;
1301                        if (oldmap)
1302                                ceph_osdmap_destroy(oldmap);
1303                }
1304                p += maplen;
1305                nr_maps--;
1306        }
1307
1308done:
1309        downgrade_write(&osdc->map_sem);
1310        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1311        if (newmap)
1312                kick_requests(osdc, NULL);
1313        up_read(&osdc->map_sem);
1314        wake_up_all(&osdc->client->auth_wq);
1315        return;
1316
1317bad:
1318        pr_err("osdc handle_map corrupt msg\n");
1319        ceph_msg_dump(msg);
1320        up_write(&osdc->map_sem);
1321        return;
1322}
1323
1324/*
1325 * Register request, send initial attempt.
1326 */
1327int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1328                            struct ceph_osd_request *req,
1329                            bool nofail)
1330{
1331        int rc = 0;
1332
1333        req->r_request->pages = req->r_pages;
1334        req->r_request->nr_pages = req->r_num_pages;
1335#ifdef CONFIG_BLOCK
1336        req->r_request->bio = req->r_bio;
1337#endif
1338        req->r_request->trail = req->r_trail;
1339
1340        register_request(osdc, req);
1341
1342        down_read(&osdc->map_sem);
1343        mutex_lock(&osdc->request_mutex);
1344        /*
1345         * a racing kick_requests() may have sent the message for us
1346         * while we dropped request_mutex above, so only send now if
1347         * the request still han't been touched yet.
1348         */
1349        if (req->r_sent == 0) {
1350                rc = __send_request(osdc, req);
1351                if (rc) {
1352                        if (nofail) {
1353                                dout("osdc_start_request failed send, "
1354                                     " marking %lld\n", req->r_tid);
1355                                req->r_resend = true;
1356                                rc = 0;
1357                        } else {
1358                                __unregister_request(osdc, req);
1359                        }
1360                }
1361        }
1362        mutex_unlock(&osdc->request_mutex);
1363        up_read(&osdc->map_sem);
1364        return rc;
1365}
1366EXPORT_SYMBOL(ceph_osdc_start_request);
1367
1368/*
1369 * wait for a request to complete
1370 */
1371int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1372                           struct ceph_osd_request *req)
1373{
1374        int rc;
1375
1376        rc = wait_for_completion_interruptible(&req->r_completion);
1377        if (rc < 0) {
1378                mutex_lock(&osdc->request_mutex);
1379                __cancel_request(req);
1380                __unregister_request(osdc, req);
1381                mutex_unlock(&osdc->request_mutex);
1382                dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1383                return rc;
1384        }
1385
1386        dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1387        return req->r_result;
1388}
1389EXPORT_SYMBOL(ceph_osdc_wait_request);
1390
1391/*
1392 * sync - wait for all in-flight requests to flush.  avoid starvation.
1393 */
1394void ceph_osdc_sync(struct ceph_osd_client *osdc)
1395{
1396        struct ceph_osd_request *req;
1397        u64 last_tid, next_tid = 0;
1398
1399        mutex_lock(&osdc->request_mutex);
1400        last_tid = osdc->last_tid;
1401        while (1) {
1402                req = __lookup_request_ge(osdc, next_tid);
1403                if (!req)
1404                        break;
1405                if (req->r_tid > last_tid)
1406                        break;
1407
1408                next_tid = req->r_tid + 1;
1409                if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1410                        continue;
1411
1412                ceph_osdc_get_request(req);
1413                mutex_unlock(&osdc->request_mutex);
1414                dout("sync waiting on tid %llu (last is %llu)\n",
1415                     req->r_tid, last_tid);
1416                wait_for_completion(&req->r_safe_completion);
1417                mutex_lock(&osdc->request_mutex);
1418                ceph_osdc_put_request(req);
1419        }
1420        mutex_unlock(&osdc->request_mutex);
1421        dout("sync done (thru tid %llu)\n", last_tid);
1422}
1423EXPORT_SYMBOL(ceph_osdc_sync);
1424
1425/*
1426 * init, shutdown
1427 */
1428int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1429{
1430        int err;
1431
1432        dout("init\n");
1433        osdc->client = client;
1434        osdc->osdmap = NULL;
1435        init_rwsem(&osdc->map_sem);
1436        init_completion(&osdc->map_waiters);
1437        osdc->last_requested_map = 0;
1438        mutex_init(&osdc->request_mutex);
1439        osdc->last_tid = 0;
1440        osdc->osds = RB_ROOT;
1441        INIT_LIST_HEAD(&osdc->osd_lru);
1442        osdc->requests = RB_ROOT;
1443        INIT_LIST_HEAD(&osdc->req_lru);
1444        osdc->num_requests = 0;
1445        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1446        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1447
1448        schedule_delayed_work(&osdc->osds_timeout_work,
1449           round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1450
1451        err = -ENOMEM;
1452        osdc->req_mempool = mempool_create_kmalloc_pool(10,
1453                                        sizeof(struct ceph_osd_request));
1454        if (!osdc->req_mempool)
1455                goto out;
1456
1457        err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1458                                "osd_op");
1459        if (err < 0)
1460                goto out_mempool;
1461        err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1462                                OSD_OPREPLY_FRONT_LEN, 10, true,
1463                                "osd_op_reply");
1464        if (err < 0)
1465                goto out_msgpool;
1466        return 0;
1467
1468out_msgpool:
1469        ceph_msgpool_destroy(&osdc->msgpool_op);
1470out_mempool:
1471        mempool_destroy(osdc->req_mempool);
1472out:
1473        return err;
1474}
1475EXPORT_SYMBOL(ceph_osdc_init);
1476
1477void ceph_osdc_stop(struct ceph_osd_client *osdc)
1478{
1479        cancel_delayed_work_sync(&osdc->timeout_work);
1480        cancel_delayed_work_sync(&osdc->osds_timeout_work);
1481        if (osdc->osdmap) {
1482                ceph_osdmap_destroy(osdc->osdmap);
1483                osdc->osdmap = NULL;
1484        }
1485        remove_old_osds(osdc, 1);
1486        mempool_destroy(osdc->req_mempool);
1487        ceph_msgpool_destroy(&osdc->msgpool_op);
1488        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1489}
1490EXPORT_SYMBOL(ceph_osdc_stop);
1491
1492/*
1493 * Read some contiguous pages.  If we cross a stripe boundary, shorten
1494 * *plen.  Return number of bytes read, or error.
1495 */
1496int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1497                        struct ceph_vino vino, struct ceph_file_layout *layout,
1498                        u64 off, u64 *plen,
1499                        u32 truncate_seq, u64 truncate_size,
1500                        struct page **pages, int num_pages, int page_align)
1501{
1502        struct ceph_osd_request *req;
1503        int rc = 0;
1504
1505        dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1506             vino.snap, off, *plen);
1507        req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1508                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1509                                    NULL, 0, truncate_seq, truncate_size, NULL,
1510                                    false, 1, page_align);
1511        if (!req)
1512                return -ENOMEM;
1513
1514        /* it may be a short read due to an object boundary */
1515        req->r_pages = pages;
1516
1517        dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1518             off, *plen, req->r_num_pages, page_align);
1519
1520        rc = ceph_osdc_start_request(osdc, req, false);
1521        if (!rc)
1522                rc = ceph_osdc_wait_request(osdc, req);
1523
1524        ceph_osdc_put_request(req);
1525        dout("readpages result %d\n", rc);
1526        return rc;
1527}
1528EXPORT_SYMBOL(ceph_osdc_readpages);
1529
1530/*
1531 * do a synchronous write on N pages
1532 */
1533int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1534                         struct ceph_file_layout *layout,
1535                         struct ceph_snap_context *snapc,
1536                         u64 off, u64 len,
1537                         u32 truncate_seq, u64 truncate_size,
1538                         struct timespec *mtime,
1539                         struct page **pages, int num_pages,
1540                         int flags, int do_sync, bool nofail)
1541{
1542        struct ceph_osd_request *req;
1543        int rc = 0;
1544        int page_align = off & ~PAGE_MASK;
1545
1546        BUG_ON(vino.snap != CEPH_NOSNAP);
1547        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1548                                    CEPH_OSD_OP_WRITE,
1549                                    flags | CEPH_OSD_FLAG_ONDISK |
1550                                            CEPH_OSD_FLAG_WRITE,
1551                                    snapc, do_sync,
1552                                    truncate_seq, truncate_size, mtime,
1553                                    nofail, 1, page_align);
1554        if (!req)
1555                return -ENOMEM;
1556
1557        /* it may be a short write due to an object boundary */
1558        req->r_pages = pages;
1559        dout("writepages %llu~%llu (%d pages)\n", off, len,
1560             req->r_num_pages);
1561
1562        rc = ceph_osdc_start_request(osdc, req, nofail);
1563        if (!rc)
1564                rc = ceph_osdc_wait_request(osdc, req);
1565
1566        ceph_osdc_put_request(req);
1567        if (rc == 0)
1568                rc = len;
1569        dout("writepages result %d\n", rc);
1570        return rc;
1571}
1572EXPORT_SYMBOL(ceph_osdc_writepages);
1573
1574/*
1575 * handle incoming message
1576 */
1577static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1578{
1579        struct ceph_osd *osd = con->private;
1580        struct ceph_osd_client *osdc;
1581        int type = le16_to_cpu(msg->hdr.type);
1582
1583        if (!osd)
1584                goto out;
1585        osdc = osd->o_osdc;
1586
1587        switch (type) {
1588        case CEPH_MSG_OSD_MAP:
1589                ceph_osdc_handle_map(osdc, msg);
1590                break;
1591        case CEPH_MSG_OSD_OPREPLY:
1592                handle_reply(osdc, msg, con);
1593                break;
1594
1595        default:
1596                pr_err("received unknown message type %d %s\n", type,
1597                       ceph_msg_type_name(type));
1598        }
1599out:
1600        ceph_msg_put(msg);
1601}
1602
1603/*
1604 * lookup and return message for incoming reply.  set up reply message
1605 * pages.
1606 */
1607static struct ceph_msg *get_reply(struct ceph_connection *con,
1608                                  struct ceph_msg_header *hdr,
1609                                  int *skip)
1610{
1611        struct ceph_osd *osd = con->private;
1612        struct ceph_osd_client *osdc = osd->o_osdc;
1613        struct ceph_msg *m;
1614        struct ceph_osd_request *req;
1615        int front = le32_to_cpu(hdr->front_len);
1616        int data_len = le32_to_cpu(hdr->data_len);
1617        u64 tid;
1618
1619        tid = le64_to_cpu(hdr->tid);
1620        mutex_lock(&osdc->request_mutex);
1621        req = __lookup_request(osdc, tid);
1622        if (!req) {
1623                *skip = 1;
1624                m = NULL;
1625                pr_info("get_reply unknown tid %llu from osd%d\n", tid,
1626                        osd->o_osd);
1627                goto out;
1628        }
1629
1630        if (req->r_con_filling_msg) {
1631                dout("get_reply revoking msg %p from old con %p\n",
1632                     req->r_reply, req->r_con_filling_msg);
1633                ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1634                ceph_con_put(req->r_con_filling_msg);
1635                req->r_con_filling_msg = NULL;
1636        }
1637
1638        if (front > req->r_reply->front.iov_len) {
1639                pr_warning("get_reply front %d > preallocated %d\n",
1640                           front, (int)req->r_reply->front.iov_len);
1641                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
1642                if (!m)
1643                        goto out;
1644                ceph_msg_put(req->r_reply);
1645                req->r_reply = m;
1646        }
1647        m = ceph_msg_get(req->r_reply);
1648
1649        if (data_len > 0) {
1650                int want = calc_pages_for(req->r_page_alignment, data_len);
1651
1652                if (unlikely(req->r_num_pages < want)) {
1653                        pr_warning("tid %lld reply %d > expected %d pages\n",
1654                                   tid, want, m->nr_pages);
1655                        *skip = 1;
1656                        ceph_msg_put(m);
1657                        m = NULL;
1658                        goto out;
1659                }
1660                m->pages = req->r_pages;
1661                m->nr_pages = req->r_num_pages;
1662                m->page_alignment = req->r_page_alignment;
1663#ifdef CONFIG_BLOCK
1664                m->bio = req->r_bio;
1665#endif
1666        }
1667        *skip = 0;
1668        req->r_con_filling_msg = ceph_con_get(con);
1669        dout("get_reply tid %lld %p\n", tid, m);
1670
1671out:
1672        mutex_unlock(&osdc->request_mutex);
1673        return m;
1674
1675}
1676
1677static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1678                                  struct ceph_msg_header *hdr,
1679                                  int *skip)
1680{
1681        struct ceph_osd *osd = con->private;
1682        int type = le16_to_cpu(hdr->type);
1683        int front = le32_to_cpu(hdr->front_len);
1684
1685        switch (type) {
1686        case CEPH_MSG_OSD_MAP:
1687                return ceph_msg_new(type, front, GFP_NOFS);
1688        case CEPH_MSG_OSD_OPREPLY:
1689                return get_reply(con, hdr, skip);
1690        default:
1691                pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1692                        osd->o_osd);
1693                *skip = 1;
1694                return NULL;
1695        }
1696}
1697
1698/*
1699 * Wrappers to refcount containing ceph_osd struct
1700 */
1701static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1702{
1703        struct ceph_osd *osd = con->private;
1704        if (get_osd(osd))
1705                return con;
1706        return NULL;
1707}
1708
1709static void put_osd_con(struct ceph_connection *con)
1710{
1711        struct ceph_osd *osd = con->private;
1712        put_osd(osd);
1713}
1714
1715/*
1716 * authentication
1717 */
1718static int get_authorizer(struct ceph_connection *con,
1719                          void **buf, int *len, int *proto,
1720                          void **reply_buf, int *reply_len, int force_new)
1721{
1722        struct ceph_osd *o = con->private;
1723        struct ceph_osd_client *osdc = o->o_osdc;
1724        struct ceph_auth_client *ac = osdc->client->monc.auth;
1725        int ret = 0;
1726
1727        if (force_new && o->o_authorizer) {
1728                ac->ops->destroy_authorizer(ac, o->o_authorizer);
1729                o->o_authorizer = NULL;
1730        }
1731        if (o->o_authorizer == NULL) {
1732                ret = ac->ops->create_authorizer(
1733                        ac, CEPH_ENTITY_TYPE_OSD,
1734                        &o->o_authorizer,
1735                        &o->o_authorizer_buf,
1736                        &o->o_authorizer_buf_len,
1737                        &o->o_authorizer_reply_buf,
1738                        &o->o_authorizer_reply_buf_len);
1739                if (ret)
1740                        return ret;
1741        }
1742
1743        *proto = ac->protocol;
1744        *buf = o->o_authorizer_buf;
1745        *len = o->o_authorizer_buf_len;
1746        *reply_buf = o->o_authorizer_reply_buf;
1747        *reply_len = o->o_authorizer_reply_buf_len;
1748        return 0;
1749}
1750
1751
1752static int verify_authorizer_reply(struct ceph_connection *con, int len)
1753{
1754        struct ceph_osd *o = con->private;
1755        struct ceph_osd_client *osdc = o->o_osdc;
1756        struct ceph_auth_client *ac = osdc->client->monc.auth;
1757
1758        return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1759}
1760
1761static int invalidate_authorizer(struct ceph_connection *con)
1762{
1763        struct ceph_osd *o = con->private;
1764        struct ceph_osd_client *osdc = o->o_osdc;
1765        struct ceph_auth_client *ac = osdc->client->monc.auth;
1766
1767        if (ac->ops->invalidate_authorizer)
1768                ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1769
1770        return ceph_monc_validate_auth(&osdc->client->monc);
1771}
1772
1773static const struct ceph_connection_operations osd_con_ops = {
1774        .get = get_osd_con,
1775        .put = put_osd_con,
1776        .dispatch = dispatch,
1777        .get_authorizer = get_authorizer,
1778        .verify_authorizer_reply = verify_authorizer_reply,
1779        .invalidate_authorizer = invalidate_authorizer,
1780        .alloc_msg = alloc_msg,
1781        .fault = osd_reset,
1782};
1783