linux/net/ceph/osd_client.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2
   3#include <linux/ceph/ceph_debug.h>
   4
   5#include <linux/module.h>
   6#include <linux/err.h>
   7#include <linux/highmem.h>
   8#include <linux/mm.h>
   9#include <linux/pagemap.h>
  10#include <linux/slab.h>
  11#include <linux/uaccess.h>
  12#ifdef CONFIG_BLOCK
  13#include <linux/bio.h>
  14#endif
  15
  16#include <linux/ceph/ceph_features.h>
  17#include <linux/ceph/libceph.h>
  18#include <linux/ceph/osd_client.h>
  19#include <linux/ceph/messenger.h>
  20#include <linux/ceph/decode.h>
  21#include <linux/ceph/auth.h>
  22#include <linux/ceph/pagelist.h>
  23#include <linux/ceph/striper.h>
  24
  25#define OSD_OPREPLY_FRONT_LEN   512
  26
  27static struct kmem_cache        *ceph_osd_request_cache;
  28
  29static const struct ceph_connection_operations osd_con_ops;
  30
/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * changes, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */
  48
  49static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
  50static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
  51static void link_linger(struct ceph_osd *osd,
  52                        struct ceph_osd_linger_request *lreq);
  53static void unlink_linger(struct ceph_osd *osd,
  54                          struct ceph_osd_linger_request *lreq);
  55static void clear_backoffs(struct ceph_osd *osd);
  56
  57#if 1
  58static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
  59{
  60        bool wrlocked = true;
  61
  62        if (unlikely(down_read_trylock(sem))) {
  63                wrlocked = false;
  64                up_read(sem);
  65        }
  66
  67        return wrlocked;
  68}
  69static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
  70{
  71        WARN_ON(!rwsem_is_locked(&osdc->lock));
  72}
  73static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
  74{
  75        WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
  76}
  77static inline void verify_osd_locked(struct ceph_osd *osd)
  78{
  79        struct ceph_osd_client *osdc = osd->o_osdc;
  80
  81        WARN_ON(!(mutex_is_locked(&osd->lock) &&
  82                  rwsem_is_locked(&osdc->lock)) &&
  83                !rwsem_is_wrlocked(&osdc->lock));
  84}
  85static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
  86{
  87        WARN_ON(!mutex_is_locked(&lreq->lock));
  88}
  89#else
  90static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
  91static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
  92static inline void verify_osd_locked(struct ceph_osd *osd) { }
  93static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
  94#endif
  95
  96/*
  97 * calculate the mapping of a file extent onto an object, and fill out the
  98 * request accordingly.  shorten extent as necessary if it crosses an
  99 * object boundary.
 100 *
 101 * fill osd op in request message.
 102 */
 103static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
 104                        u64 *objnum, u64 *objoff, u64 *objlen)
 105{
 106        u64 orig_len = *plen;
 107        u32 xlen;
 108
 109        /* object extent? */
 110        ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
 111                                          objoff, &xlen);
 112        *objlen = xlen;
 113        if (*objlen < orig_len) {
 114                *plen = *objlen;
 115                dout(" skipping last %llu, final file extent %llu~%llu\n",
 116                     orig_len - *plen, off, *plen);
 117        }
 118
 119        dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
 120        return 0;
 121}
 122
 123static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
 124{
 125        memset(osd_data, 0, sizeof (*osd_data));
 126        osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
 127}
 128
 129/*
 130 * Consumes @pages if @own_pages is true.
 131 */
 132static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
 133                        struct page **pages, u64 length, u32 alignment,
 134                        bool pages_from_pool, bool own_pages)
 135{
 136        osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
 137        osd_data->pages = pages;
 138        osd_data->length = length;
 139        osd_data->alignment = alignment;
 140        osd_data->pages_from_pool = pages_from_pool;
 141        osd_data->own_pages = own_pages;
 142}
 143
 144/*
 145 * Consumes a ref on @pagelist.
 146 */
 147static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 148                        struct ceph_pagelist *pagelist)
 149{
 150        osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
 151        osd_data->pagelist = pagelist;
 152}
 153
 154#ifdef CONFIG_BLOCK
 155static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
 156                                   struct ceph_bio_iter *bio_pos,
 157                                   u32 bio_length)
 158{
 159        osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
 160        osd_data->bio_pos = *bio_pos;
 161        osd_data->bio_length = bio_length;
 162}
 163#endif /* CONFIG_BLOCK */
 164
 165static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
 166                                     struct ceph_bvec_iter *bvec_pos,
 167                                     u32 num_bvecs)
 168{
 169        osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
 170        osd_data->bvec_pos = *bvec_pos;
 171        osd_data->num_bvecs = num_bvecs;
 172}
 173
 174static struct ceph_osd_data *
 175osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
 176{
 177        BUG_ON(which >= osd_req->r_num_ops);
 178
 179        return &osd_req->r_ops[which].raw_data_in;
 180}
 181
 182struct ceph_osd_data *
 183osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
 184                        unsigned int which)
 185{
 186        return osd_req_op_data(osd_req, which, extent, osd_data);
 187}
 188EXPORT_SYMBOL(osd_req_op_extent_osd_data);
 189
 190void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
 191                        unsigned int which, struct page **pages,
 192                        u64 length, u32 alignment,
 193                        bool pages_from_pool, bool own_pages)
 194{
 195        struct ceph_osd_data *osd_data;
 196
 197        osd_data = osd_req_op_raw_data_in(osd_req, which);
 198        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 199                                pages_from_pool, own_pages);
 200}
 201EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
 202
 203void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
 204                        unsigned int which, struct page **pages,
 205                        u64 length, u32 alignment,
 206                        bool pages_from_pool, bool own_pages)
 207{
 208        struct ceph_osd_data *osd_data;
 209
 210        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 211        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 212                                pages_from_pool, own_pages);
 213}
 214EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
 215
 216void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
 217                        unsigned int which, struct ceph_pagelist *pagelist)
 218{
 219        struct ceph_osd_data *osd_data;
 220
 221        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 222        ceph_osd_data_pagelist_init(osd_data, pagelist);
 223}
 224EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
 225
 226#ifdef CONFIG_BLOCK
 227void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
 228                                    unsigned int which,
 229                                    struct ceph_bio_iter *bio_pos,
 230                                    u32 bio_length)
 231{
 232        struct ceph_osd_data *osd_data;
 233
 234        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 235        ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
 236}
 237EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 238#endif /* CONFIG_BLOCK */
 239
 240void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
 241                                      unsigned int which,
 242                                      struct bio_vec *bvecs, u32 num_bvecs,
 243                                      u32 bytes)
 244{
 245        struct ceph_osd_data *osd_data;
 246        struct ceph_bvec_iter it = {
 247                .bvecs = bvecs,
 248                .iter = { .bi_size = bytes },
 249        };
 250
 251        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 252        ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
 253}
 254EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);
 255
 256void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
 257                                         unsigned int which,
 258                                         struct ceph_bvec_iter *bvec_pos)
 259{
 260        struct ceph_osd_data *osd_data;
 261
 262        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 263        ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
 264}
 265EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
 266
 267static void osd_req_op_cls_request_info_pagelist(
 268                        struct ceph_osd_request *osd_req,
 269                        unsigned int which, struct ceph_pagelist *pagelist)
 270{
 271        struct ceph_osd_data *osd_data;
 272
 273        osd_data = osd_req_op_data(osd_req, which, cls, request_info);
 274        ceph_osd_data_pagelist_init(osd_data, pagelist);
 275}
 276
 277void osd_req_op_cls_request_data_pagelist(
 278                        struct ceph_osd_request *osd_req,
 279                        unsigned int which, struct ceph_pagelist *pagelist)
 280{
 281        struct ceph_osd_data *osd_data;
 282
 283        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 284        ceph_osd_data_pagelist_init(osd_data, pagelist);
 285        osd_req->r_ops[which].cls.indata_len += pagelist->length;
 286        osd_req->r_ops[which].indata_len += pagelist->length;
 287}
 288EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
 289
 290void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
 291                        unsigned int which, struct page **pages, u64 length,
 292                        u32 alignment, bool pages_from_pool, bool own_pages)
 293{
 294        struct ceph_osd_data *osd_data;
 295
 296        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 297        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 298                                pages_from_pool, own_pages);
 299        osd_req->r_ops[which].cls.indata_len += length;
 300        osd_req->r_ops[which].indata_len += length;
 301}
 302EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
 303
 304void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
 305                                       unsigned int which,
 306                                       struct bio_vec *bvecs, u32 num_bvecs,
 307                                       u32 bytes)
 308{
 309        struct ceph_osd_data *osd_data;
 310        struct ceph_bvec_iter it = {
 311                .bvecs = bvecs,
 312                .iter = { .bi_size = bytes },
 313        };
 314
 315        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 316        ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
 317        osd_req->r_ops[which].cls.indata_len += bytes;
 318        osd_req->r_ops[which].indata_len += bytes;
 319}
 320EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
 321
 322void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
 323                        unsigned int which, struct page **pages, u64 length,
 324                        u32 alignment, bool pages_from_pool, bool own_pages)
 325{
 326        struct ceph_osd_data *osd_data;
 327
 328        osd_data = osd_req_op_data(osd_req, which, cls, response_data);
 329        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 330                                pages_from_pool, own_pages);
 331}
 332EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
 333
 334static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 335{
 336        switch (osd_data->type) {
 337        case CEPH_OSD_DATA_TYPE_NONE:
 338                return 0;
 339        case CEPH_OSD_DATA_TYPE_PAGES:
 340                return osd_data->length;
 341        case CEPH_OSD_DATA_TYPE_PAGELIST:
 342                return (u64)osd_data->pagelist->length;
 343#ifdef CONFIG_BLOCK
 344        case CEPH_OSD_DATA_TYPE_BIO:
 345                return (u64)osd_data->bio_length;
 346#endif /* CONFIG_BLOCK */
 347        case CEPH_OSD_DATA_TYPE_BVECS:
 348                return osd_data->bvec_pos.iter.bi_size;
 349        default:
 350                WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
 351                return 0;
 352        }
 353}
 354
 355static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
 356{
 357        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
 358                int num_pages;
 359
 360                num_pages = calc_pages_for((u64)osd_data->alignment,
 361                                                (u64)osd_data->length);
 362                ceph_release_page_vector(osd_data->pages, num_pages);
 363        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
 364                ceph_pagelist_release(osd_data->pagelist);
 365        }
 366        ceph_osd_data_init(osd_data);
 367}
 368
 369static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 370                        unsigned int which)
 371{
 372        struct ceph_osd_req_op *op;
 373
 374        BUG_ON(which >= osd_req->r_num_ops);
 375        op = &osd_req->r_ops[which];
 376
 377        switch (op->op) {
 378        case CEPH_OSD_OP_READ:
 379        case CEPH_OSD_OP_WRITE:
 380        case CEPH_OSD_OP_WRITEFULL:
 381                ceph_osd_data_release(&op->extent.osd_data);
 382                break;
 383        case CEPH_OSD_OP_CALL:
 384                ceph_osd_data_release(&op->cls.request_info);
 385                ceph_osd_data_release(&op->cls.request_data);
 386                ceph_osd_data_release(&op->cls.response_data);
 387                break;
 388        case CEPH_OSD_OP_SETXATTR:
 389        case CEPH_OSD_OP_CMPXATTR:
 390                ceph_osd_data_release(&op->xattr.osd_data);
 391                break;
 392        case CEPH_OSD_OP_STAT:
 393                ceph_osd_data_release(&op->raw_data_in);
 394                break;
 395        case CEPH_OSD_OP_NOTIFY_ACK:
 396                ceph_osd_data_release(&op->notify_ack.request_data);
 397                break;
 398        case CEPH_OSD_OP_NOTIFY:
 399                ceph_osd_data_release(&op->notify.request_data);
 400                ceph_osd_data_release(&op->notify.response_data);
 401                break;
 402        case CEPH_OSD_OP_LIST_WATCHERS:
 403                ceph_osd_data_release(&op->list_watchers.response_data);
 404                break;
 405        case CEPH_OSD_OP_COPY_FROM2:
 406                ceph_osd_data_release(&op->copy_from.osd_data);
 407                break;
 408        default:
 409                break;
 410        }
 411}
 412
 413/*
 414 * Assumes @t is zero-initialized.
 415 */
 416static void target_init(struct ceph_osd_request_target *t)
 417{
 418        ceph_oid_init(&t->base_oid);
 419        ceph_oloc_init(&t->base_oloc);
 420        ceph_oid_init(&t->target_oid);
 421        ceph_oloc_init(&t->target_oloc);
 422
 423        ceph_osds_init(&t->acting);
 424        ceph_osds_init(&t->up);
 425        t->size = -1;
 426        t->min_size = -1;
 427
 428        t->osd = CEPH_HOMELESS_OSD;
 429}
 430
 431static void target_copy(struct ceph_osd_request_target *dest,
 432                        const struct ceph_osd_request_target *src)
 433{
 434        ceph_oid_copy(&dest->base_oid, &src->base_oid);
 435        ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
 436        ceph_oid_copy(&dest->target_oid, &src->target_oid);
 437        ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
 438
 439        dest->pgid = src->pgid; /* struct */
 440        dest->spgid = src->spgid; /* struct */
 441        dest->pg_num = src->pg_num;
 442        dest->pg_num_mask = src->pg_num_mask;
 443        ceph_osds_copy(&dest->acting, &src->acting);
 444        ceph_osds_copy(&dest->up, &src->up);
 445        dest->size = src->size;
 446        dest->min_size = src->min_size;
 447        dest->sort_bitwise = src->sort_bitwise;
 448        dest->recovery_deletes = src->recovery_deletes;
 449
 450        dest->flags = src->flags;
 451        dest->used_replica = src->used_replica;
 452        dest->paused = src->paused;
 453
 454        dest->epoch = src->epoch;
 455        dest->last_force_resend = src->last_force_resend;
 456
 457        dest->osd = src->osd;
 458}
 459
 460static void target_destroy(struct ceph_osd_request_target *t)
 461{
 462        ceph_oid_destroy(&t->base_oid);
 463        ceph_oloc_destroy(&t->base_oloc);
 464        ceph_oid_destroy(&t->target_oid);
 465        ceph_oloc_destroy(&t->target_oloc);
 466}
 467
 468/*
 469 * requests
 470 */
 471static void request_release_checks(struct ceph_osd_request *req)
 472{
 473        WARN_ON(!RB_EMPTY_NODE(&req->r_node));
 474        WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
 475        WARN_ON(!list_empty(&req->r_private_item));
 476        WARN_ON(req->r_osd);
 477}
 478
 479static void ceph_osdc_release_request(struct kref *kref)
 480{
 481        struct ceph_osd_request *req = container_of(kref,
 482                                            struct ceph_osd_request, r_kref);
 483        unsigned int which;
 484
 485        dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
 486             req->r_request, req->r_reply);
 487        request_release_checks(req);
 488
 489        if (req->r_request)
 490                ceph_msg_put(req->r_request);
 491        if (req->r_reply)
 492                ceph_msg_put(req->r_reply);
 493
 494        for (which = 0; which < req->r_num_ops; which++)
 495                osd_req_op_data_release(req, which);
 496
 497        target_destroy(&req->r_t);
 498        ceph_put_snap_context(req->r_snapc);
 499
 500        if (req->r_mempool)
 501                mempool_free(req, req->r_osdc->req_mempool);
 502        else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
 503                kmem_cache_free(ceph_osd_request_cache, req);
 504        else
 505                kfree(req);
 506}
 507
 508void ceph_osdc_get_request(struct ceph_osd_request *req)
 509{
 510        dout("%s %p (was %d)\n", __func__, req,
 511             kref_read(&req->r_kref));
 512        kref_get(&req->r_kref);
 513}
 514EXPORT_SYMBOL(ceph_osdc_get_request);
 515
 516void ceph_osdc_put_request(struct ceph_osd_request *req)
 517{
 518        if (req) {
 519                dout("%s %p (was %d)\n", __func__, req,
 520                     kref_read(&req->r_kref));
 521                kref_put(&req->r_kref, ceph_osdc_release_request);
 522        }
 523}
 524EXPORT_SYMBOL(ceph_osdc_put_request);
 525
 526static void request_init(struct ceph_osd_request *req)
 527{
 528        /* req only, each op is zeroed in osd_req_op_init() */
 529        memset(req, 0, sizeof(*req));
 530
 531        kref_init(&req->r_kref);
 532        init_completion(&req->r_completion);
 533        RB_CLEAR_NODE(&req->r_node);
 534        RB_CLEAR_NODE(&req->r_mc_node);
 535        INIT_LIST_HEAD(&req->r_private_item);
 536
 537        target_init(&req->r_t);
 538}
 539
 540/*
 541 * This is ugly, but it allows us to reuse linger registration and ping
 542 * requests, keeping the structure of the code around send_linger{_ping}()
 543 * reasonable.  Setting up a min_nr=2 mempool for each linger request
 544 * and dealing with copying ops (this blasts req only, watch op remains
 545 * intact) isn't any better.
 546 */
 547static void request_reinit(struct ceph_osd_request *req)
 548{
 549        struct ceph_osd_client *osdc = req->r_osdc;
 550        bool mempool = req->r_mempool;
 551        unsigned int num_ops = req->r_num_ops;
 552        u64 snapid = req->r_snapid;
 553        struct ceph_snap_context *snapc = req->r_snapc;
 554        bool linger = req->r_linger;
 555        struct ceph_msg *request_msg = req->r_request;
 556        struct ceph_msg *reply_msg = req->r_reply;
 557
 558        dout("%s req %p\n", __func__, req);
 559        WARN_ON(kref_read(&req->r_kref) != 1);
 560        request_release_checks(req);
 561
 562        WARN_ON(kref_read(&request_msg->kref) != 1);
 563        WARN_ON(kref_read(&reply_msg->kref) != 1);
 564        target_destroy(&req->r_t);
 565
 566        request_init(req);
 567        req->r_osdc = osdc;
 568        req->r_mempool = mempool;
 569        req->r_num_ops = num_ops;
 570        req->r_snapid = snapid;
 571        req->r_snapc = snapc;
 572        req->r_linger = linger;
 573        req->r_request = request_msg;
 574        req->r_reply = reply_msg;
 575}
 576
 577struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 578                                               struct ceph_snap_context *snapc,
 579                                               unsigned int num_ops,
 580                                               bool use_mempool,
 581                                               gfp_t gfp_flags)
 582{
 583        struct ceph_osd_request *req;
 584
 585        if (use_mempool) {
 586                BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
 587                req = mempool_alloc(osdc->req_mempool, gfp_flags);
 588        } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
 589                req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
 590        } else {
 591                BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
 592                req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
 593        }
 594        if (unlikely(!req))
 595                return NULL;
 596
 597        request_init(req);
 598        req->r_osdc = osdc;
 599        req->r_mempool = use_mempool;
 600        req->r_num_ops = num_ops;
 601        req->r_snapid = CEPH_NOSNAP;
 602        req->r_snapc = ceph_get_snap_context(snapc);
 603
 604        dout("%s req %p\n", __func__, req);
 605        return req;
 606}
 607EXPORT_SYMBOL(ceph_osdc_alloc_request);
 608
 609static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
 610{
 611        return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
 612}
 613
 614static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
 615                                      int num_request_data_items,
 616                                      int num_reply_data_items)
 617{
 618        struct ceph_osd_client *osdc = req->r_osdc;
 619        struct ceph_msg *msg;
 620        int msg_size;
 621
 622        WARN_ON(req->r_request || req->r_reply);
 623        WARN_ON(ceph_oid_empty(&req->r_base_oid));
 624        WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 625
 626        /* create request message */
 627        msg_size = CEPH_ENCODING_START_BLK_LEN +
 628                        CEPH_PGID_ENCODING_LEN + 1; /* spgid */
 629        msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
 630        msg_size += CEPH_ENCODING_START_BLK_LEN +
 631                        sizeof(struct ceph_osd_reqid); /* reqid */
 632        msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
 633        msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
 634        msg_size += CEPH_ENCODING_START_BLK_LEN +
 635                        ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
 636        msg_size += 4 + req->r_base_oid.name_len; /* oid */
 637        msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
 638        msg_size += 8; /* snapid */
 639        msg_size += 8; /* snap_seq */
 640        msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
 641        msg_size += 4 + 8; /* retry_attempt, features */
 642
 643        if (req->r_mempool)
 644                msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
 645                                       num_request_data_items);
 646        else
 647                msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
 648                                    num_request_data_items, gfp, true);
 649        if (!msg)
 650                return -ENOMEM;
 651
 652        memset(msg->front.iov_base, 0, msg->front.iov_len);
 653        req->r_request = msg;
 654
 655        /* create reply message */
 656        msg_size = OSD_OPREPLY_FRONT_LEN;
 657        msg_size += req->r_base_oid.name_len;
 658        msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
 659
 660        if (req->r_mempool)
 661                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
 662                                       num_reply_data_items);
 663        else
 664                msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
 665                                    num_reply_data_items, gfp, true);
 666        if (!msg)
 667                return -ENOMEM;
 668
 669        req->r_reply = msg;
 670
 671        return 0;
 672}
 673
 674static bool osd_req_opcode_valid(u16 opcode)
 675{
 676        switch (opcode) {
 677#define GENERATE_CASE(op, opcode, str)  case CEPH_OSD_OP_##op: return true;
 678__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
 679#undef GENERATE_CASE
 680        default:
 681                return false;
 682        }
 683}
 684
 685static void get_num_data_items(struct ceph_osd_request *req,
 686                               int *num_request_data_items,
 687                               int *num_reply_data_items)
 688{
 689        struct ceph_osd_req_op *op;
 690
 691        *num_request_data_items = 0;
 692        *num_reply_data_items = 0;
 693
 694        for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
 695                switch (op->op) {
 696                /* request */
 697                case CEPH_OSD_OP_WRITE:
 698                case CEPH_OSD_OP_WRITEFULL:
 699                case CEPH_OSD_OP_SETXATTR:
 700                case CEPH_OSD_OP_CMPXATTR:
 701                case CEPH_OSD_OP_NOTIFY_ACK:
 702                case CEPH_OSD_OP_COPY_FROM2:
 703                        *num_request_data_items += 1;
 704                        break;
 705
 706                /* reply */
 707                case CEPH_OSD_OP_STAT:
 708                case CEPH_OSD_OP_READ:
 709                case CEPH_OSD_OP_LIST_WATCHERS:
 710                        *num_reply_data_items += 1;
 711                        break;
 712
 713                /* both */
 714                case CEPH_OSD_OP_NOTIFY:
 715                        *num_request_data_items += 1;
 716                        *num_reply_data_items += 1;
 717                        break;
 718                case CEPH_OSD_OP_CALL:
 719                        *num_request_data_items += 2;
 720                        *num_reply_data_items += 1;
 721                        break;
 722
 723                default:
 724                        WARN_ON(!osd_req_opcode_valid(op->op));
 725                        break;
 726                }
 727        }
 728}
 729
 730/*
 731 * oid, oloc and OSD op opcode(s) must be filled in before this function
 732 * is called.
 733 */
 734int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 735{
 736        int num_request_data_items, num_reply_data_items;
 737
 738        get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
 739        return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
 740                                          num_reply_data_items);
 741}
 742EXPORT_SYMBOL(ceph_osdc_alloc_messages);
 743
 744/*
 745 * This is an osd op init function for opcodes that have no data or
 746 * other information associated with them.  It also serves as a
 747 * common init routine for all the other init functions, below.
 748 */
 749struct ceph_osd_req_op *
 750osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
 751                 u16 opcode, u32 flags)
 752{
 753        struct ceph_osd_req_op *op;
 754
 755        BUG_ON(which >= osd_req->r_num_ops);
 756        BUG_ON(!osd_req_opcode_valid(opcode));
 757
 758        op = &osd_req->r_ops[which];
 759        memset(op, 0, sizeof (*op));
 760        op->op = opcode;
 761        op->flags = flags;
 762
 763        return op;
 764}
 765EXPORT_SYMBOL(osd_req_op_init);
 766
 767void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 768                                unsigned int which, u16 opcode,
 769                                u64 offset, u64 length,
 770                                u64 truncate_size, u32 truncate_seq)
 771{
 772        struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
 773                                                     opcode, 0);
 774        size_t payload_len = 0;
 775
 776        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 777               opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
 778               opcode != CEPH_OSD_OP_TRUNCATE);
 779
 780        op->extent.offset = offset;
 781        op->extent.length = length;
 782        op->extent.truncate_size = truncate_size;
 783        op->extent.truncate_seq = truncate_seq;
 784        if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
 785                payload_len += length;
 786
 787        op->indata_len = payload_len;
 788}
 789EXPORT_SYMBOL(osd_req_op_extent_init);
 790
 791void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 792                                unsigned int which, u64 length)
 793{
 794        struct ceph_osd_req_op *op;
 795        u64 previous;
 796
 797        BUG_ON(which >= osd_req->r_num_ops);
 798        op = &osd_req->r_ops[which];
 799        previous = op->extent.length;
 800
 801        if (length == previous)
 802                return;         /* Nothing to do */
 803        BUG_ON(length > previous);
 804
 805        op->extent.length = length;
 806        if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
 807                op->indata_len -= previous - length;
 808}
 809EXPORT_SYMBOL(osd_req_op_extent_update);
 810
 811void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 812                                unsigned int which, u64 offset_inc)
 813{
 814        struct ceph_osd_req_op *op, *prev_op;
 815
 816        BUG_ON(which + 1 >= osd_req->r_num_ops);
 817
 818        prev_op = &osd_req->r_ops[which];
 819        op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
 820        /* dup previous one */
 821        op->indata_len = prev_op->indata_len;
 822        op->outdata_len = prev_op->outdata_len;
 823        op->extent = prev_op->extent;
 824        /* adjust offset */
 825        op->extent.offset += offset_inc;
 826        op->extent.length -= offset_inc;
 827
 828        if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
 829                op->indata_len -= offset_inc;
 830}
 831EXPORT_SYMBOL(osd_req_op_extent_dup_last);
 832
 833int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 834                        const char *class, const char *method)
 835{
 836        struct ceph_osd_req_op *op;
 837        struct ceph_pagelist *pagelist;
 838        size_t payload_len = 0;
 839        size_t size;
 840        int ret;
 841
 842        op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
 843
 844        pagelist = ceph_pagelist_alloc(GFP_NOFS);
 845        if (!pagelist)
 846                return -ENOMEM;
 847
 848        op->cls.class_name = class;
 849        size = strlen(class);
 850        BUG_ON(size > (size_t) U8_MAX);
 851        op->cls.class_len = size;
 852        ret = ceph_pagelist_append(pagelist, class, size);
 853        if (ret)
 854                goto err_pagelist_free;
 855        payload_len += size;
 856
 857        op->cls.method_name = method;
 858        size = strlen(method);
 859        BUG_ON(size > (size_t) U8_MAX);
 860        op->cls.method_len = size;
 861        ret = ceph_pagelist_append(pagelist, method, size);
 862        if (ret)
 863                goto err_pagelist_free;
 864        payload_len += size;
 865
 866        osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
 867        op->indata_len = payload_len;
 868        return 0;
 869
 870err_pagelist_free:
 871        ceph_pagelist_release(pagelist);
 872        return ret;
 873}
 874EXPORT_SYMBOL(osd_req_op_cls_init);
 875
 876int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 877                          u16 opcode, const char *name, const void *value,
 878                          size_t size, u8 cmp_op, u8 cmp_mode)
 879{
 880        struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
 881                                                     opcode, 0);
 882        struct ceph_pagelist *pagelist;
 883        size_t payload_len;
 884        int ret;
 885
 886        BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
 887
 888        pagelist = ceph_pagelist_alloc(GFP_NOFS);
 889        if (!pagelist)
 890                return -ENOMEM;
 891
 892        payload_len = strlen(name);
 893        op->xattr.name_len = payload_len;
 894        ret = ceph_pagelist_append(pagelist, name, payload_len);
 895        if (ret)
 896                goto err_pagelist_free;
 897
 898        op->xattr.value_len = size;
 899        ret = ceph_pagelist_append(pagelist, value, size);
 900        if (ret)
 901                goto err_pagelist_free;
 902        payload_len += size;
 903
 904        op->xattr.cmp_op = cmp_op;
 905        op->xattr.cmp_mode = cmp_mode;
 906
 907        ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
 908        op->indata_len = payload_len;
 909        return 0;
 910
 911err_pagelist_free:
 912        ceph_pagelist_release(pagelist);
 913        return ret;
 914}
 915EXPORT_SYMBOL(osd_req_op_xattr_init);
 916
 917/*
 918 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 919 */
 920static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
 921                                  u64 cookie, u8 watch_opcode)
 922{
 923        struct ceph_osd_req_op *op;
 924
 925        op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
 926        op->watch.cookie = cookie;
 927        op->watch.op = watch_opcode;
 928        op->watch.gen = 0;
 929}
 930
 931/*
 932 * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
 933 */
 934void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 935                                unsigned int which,
 936                                u64 expected_object_size,
 937                                u64 expected_write_size,
 938                                u32 flags)
 939{
 940        struct ceph_osd_req_op *op;
 941
 942        op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
 943        op->alloc_hint.expected_object_size = expected_object_size;
 944        op->alloc_hint.expected_write_size = expected_write_size;
 945        op->alloc_hint.flags = flags;
 946
 947        /*
 948         * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
 949         * not worth a feature bit.  Set FAILOK per-op flag to make
 950         * sure older osds don't trip over an unsupported opcode.
 951         */
 952        op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
 953}
 954EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
 955
 956static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 957                                struct ceph_osd_data *osd_data)
 958{
 959        u64 length = ceph_osd_data_length(osd_data);
 960
 961        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
 962                BUG_ON(length > (u64) SIZE_MAX);
 963                if (length)
 964                        ceph_msg_data_add_pages(msg, osd_data->pages,
 965                                        length, osd_data->alignment, false);
 966        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
 967                BUG_ON(!length);
 968                ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
 969#ifdef CONFIG_BLOCK
 970        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
 971                ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
 972#endif
 973        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
 974                ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
 975        } else {
 976                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 977        }
 978}
 979
/*
 * Encode the in-memory op @src into the wire representation @dst.
 *
 * Multi-byte fields are converted with cpu_to_le{16,32,64}(); single
 * byte fields are copied as is.  Opcodes with no op-specific fields
 * only get the common op/flags/payload_len header filled in.
 *
 * Returns src->indata_len.  For an opcode not listed here, an error is
 * logged, a warning is raised and 0 is returned with @dst left
 * untouched.
 */
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
                             const struct ceph_osd_req_op *src)
{
        switch (src->op) {
        case CEPH_OSD_OP_STAT:
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
        case CEPH_OSD_OP_WRITEFULL:
        case CEPH_OSD_OP_ZERO:
        case CEPH_OSD_OP_TRUNCATE:
                /* all extent-based ops share the same encoding */
                dst->extent.offset = cpu_to_le64(src->extent.offset);
                dst->extent.length = cpu_to_le64(src->extent.length);
                dst->extent.truncate_size =
                        cpu_to_le64(src->extent.truncate_size);
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
                break;
        case CEPH_OSD_OP_CALL:
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
                break;
        case CEPH_OSD_OP_WATCH:
                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
                /* ver is always sent as 0 */
                dst->watch.ver = cpu_to_le64(0);
                dst->watch.op = src->watch.op;
                dst->watch.gen = cpu_to_le32(src->watch.gen);
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
                break;
        case CEPH_OSD_OP_NOTIFY:
                dst->notify.cookie = cpu_to_le64(src->notify.cookie);
                break;
        case CEPH_OSD_OP_LIST_WATCHERS:
                break;
        case CEPH_OSD_OP_SETALLOCHINT:
                dst->alloc_hint.expected_object_size =
                    cpu_to_le64(src->alloc_hint.expected_object_size);
                dst->alloc_hint.expected_write_size =
                    cpu_to_le64(src->alloc_hint.expected_write_size);
                dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
                break;
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
                break;
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_DELETE:
                break;
        case CEPH_OSD_OP_COPY_FROM2:
                dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
                dst->copy_from.src_version =
                        cpu_to_le64(src->copy_from.src_version);
                dst->copy_from.flags = src->copy_from.flags;
                dst->copy_from.src_fadvise_flags =
                        cpu_to_le32(src->copy_from.src_fadvise_flags);
                break;
        default:
                /* unknown opcode: complain loudly and encode nothing */
                pr_err("unsupported osd opcode %s\n",
                        ceph_osd_op_name(src->op));
                WARN_ON(1);

                return 0;
        }

        /* common header, filled in for every supported opcode */
        dst->op = cpu_to_le16(src->op);
        dst->flags = cpu_to_le32(src->flags);
        dst->payload_len = cpu_to_le32(src->indata_len);

        return src->indata_len;
}
1055
1056/*
1057 * build new request AND message, calculate layout, and adjust file
1058 * extent as needed.
1059 *
1060 * if the file was recently truncated, we include information about its
1061 * old and new size so that the object can be updated appropriately.  (we
1062 * avoid synchronously deleting truncated objects because it's slow.)
1063 */
1064struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1065                                               struct ceph_file_layout *layout,
1066                                               struct ceph_vino vino,
1067                                               u64 off, u64 *plen,
1068                                               unsigned int which, int num_ops,
1069                                               int opcode, int flags,
1070                                               struct ceph_snap_context *snapc,
1071                                               u32 truncate_seq,
1072                                               u64 truncate_size,
1073                                               bool use_mempool)
1074{
1075        struct ceph_osd_request *req;
1076        u64 objnum = 0;
1077        u64 objoff = 0;
1078        u64 objlen = 0;
1079        int r;
1080
1081        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
1082               opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
1083               opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
1084
1085        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
1086                                        GFP_NOFS);
1087        if (!req) {
1088                r = -ENOMEM;
1089                goto fail;
1090        }
1091
1092        /* calculate max write size */
1093        r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
1094        if (r)
1095                goto fail;
1096
1097        if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
1098                osd_req_op_init(req, which, opcode, 0);
1099        } else {
1100                u32 object_size = layout->object_size;
1101                u32 object_base = off - objoff;
1102                if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
1103                        if (truncate_size <= object_base) {
1104                                truncate_size = 0;
1105                        } else {
1106                                truncate_size -= object_base;
1107                                if (truncate_size > object_size)
1108                                        truncate_size = object_size;
1109                        }
1110                }
1111                osd_req_op_extent_init(req, which, opcode, objoff, objlen,
1112                                       truncate_size, truncate_seq);
1113        }
1114
1115        req->r_base_oloc.pool = layout->pool_id;
1116        req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
1117        ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1118        req->r_flags = flags | osdc->client->options->read_from_replica;
1119
1120        req->r_snapid = vino.snap;
1121        if (flags & CEPH_OSD_FLAG_WRITE)
1122                req->r_data_offset = off;
1123
1124        if (num_ops > 1)
1125                /*
1126                 * This is a special case for ceph_writepages_start(), but it
1127                 * also covers ceph_uninline_data().  If more multi-op request
1128                 * use cases emerge, we will need a separate helper.
1129                 */
1130                r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
1131        else
1132                r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1133        if (r)
1134                goto fail;
1135
1136        return req;
1137
1138fail:
1139        ceph_osdc_put_request(req);
1140        return ERR_PTR(r);
1141}
1142EXPORT_SYMBOL(ceph_osdc_new_request);
1143
/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 *
 * Two sets of helpers are instantiated over struct ceph_osd_request,
 * both keyed on r_tid: "request" links through r_node and "request_mc"
 * links through r_mc_node.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
1149
1150/*
1151 * Call @fn on each OSD request as long as @fn returns 0.
1152 */
1153static void for_each_request(struct ceph_osd_client *osdc,
1154                        int (*fn)(struct ceph_osd_request *req, void *arg),
1155                        void *arg)
1156{
1157        struct rb_node *n, *p;
1158
1159        for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
1160                struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1161
1162                for (p = rb_first(&osd->o_requests); p; ) {
1163                        struct ceph_osd_request *req =
1164                            rb_entry(p, struct ceph_osd_request, r_node);
1165
1166                        p = rb_next(p);
1167                        if (fn(req, arg))
1168                                return;
1169                }
1170        }
1171
1172        for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
1173                struct ceph_osd_request *req =
1174                    rb_entry(p, struct ceph_osd_request, r_node);
1175
1176                p = rb_next(p);
1177                if (fn(req, arg))
1178                        return;
1179        }
1180}
1181
1182static bool osd_homeless(struct ceph_osd *osd)
1183{
1184        return osd->o_osd == CEPH_HOMELESS_OSD;
1185}
1186
1187static bool osd_registered(struct ceph_osd *osd)
1188{
1189        verify_osdc_locked(osd->o_osdc);
1190
1191        return !RB_EMPTY_NODE(&osd->o_node);
1192}
1193
1194/*
1195 * Assumes @osd is zero-initialized.
1196 */
1197static void osd_init(struct ceph_osd *osd)
1198{
1199        refcount_set(&osd->o_ref, 1);
1200        RB_CLEAR_NODE(&osd->o_node);
1201        osd->o_requests = RB_ROOT;
1202        osd->o_linger_requests = RB_ROOT;
1203        osd->o_backoff_mappings = RB_ROOT;
1204        osd->o_backoffs_by_id = RB_ROOT;
1205        INIT_LIST_HEAD(&osd->o_osd_lru);
1206        INIT_LIST_HEAD(&osd->o_keepalive_item);
1207        osd->o_incarnation = 1;
1208        mutex_init(&osd->lock);
1209}
1210
1211static void osd_cleanup(struct ceph_osd *osd)
1212{
1213        WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1214        WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1215        WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1216        WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1217        WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1218        WARN_ON(!list_empty(&osd->o_osd_lru));
1219        WARN_ON(!list_empty(&osd->o_keepalive_item));
1220
1221        if (osd->o_auth.authorizer) {
1222                WARN_ON(osd_homeless(osd));
1223                ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1224        }
1225}
1226
1227/*
1228 * Track open sessions with osds.
1229 */
1230static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1231{
1232        struct ceph_osd *osd;
1233
1234        WARN_ON(onum == CEPH_HOMELESS_OSD);
1235
1236        osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1237        osd_init(osd);
1238        osd->o_osdc = osdc;
1239        osd->o_osd = onum;
1240
1241        ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1242
1243        return osd;
1244}
1245
1246static struct ceph_osd *get_osd(struct ceph_osd *osd)
1247{
1248        if (refcount_inc_not_zero(&osd->o_ref)) {
1249                dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1250                     refcount_read(&osd->o_ref));
1251                return osd;
1252        } else {
1253                dout("get_osd %p FAIL\n", osd);
1254                return NULL;
1255        }
1256}
1257
1258static void put_osd(struct ceph_osd *osd)
1259{
1260        dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1261             refcount_read(&osd->o_ref) - 1);
1262        if (refcount_dec_and_test(&osd->o_ref)) {
1263                osd_cleanup(osd);
1264                kfree(osd);
1265        }
1266}
1267
/*
 * rbtree helpers for struct ceph_osd, keyed on ->o_osd and linked
 * through ->o_node.  Presumably the source of the lookup_osd(),
 * insert_osd() and erase_osd() helpers used below -- confirm against
 * the DEFINE_RB_FUNCS definition.
 */
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1269
1270static void __move_osd_to_lru(struct ceph_osd *osd)
1271{
1272        struct ceph_osd_client *osdc = osd->o_osdc;
1273
1274        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1275        BUG_ON(!list_empty(&osd->o_osd_lru));
1276
1277        spin_lock(&osdc->osd_lru_lock);
1278        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1279        spin_unlock(&osdc->osd_lru_lock);
1280
1281        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1282}
1283
1284static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1285{
1286        if (RB_EMPTY_ROOT(&osd->o_requests) &&
1287            RB_EMPTY_ROOT(&osd->o_linger_requests))
1288                __move_osd_to_lru(osd);
1289}
1290
1291static void __remove_osd_from_lru(struct ceph_osd *osd)
1292{
1293        struct ceph_osd_client *osdc = osd->o_osdc;
1294
1295        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1296
1297        spin_lock(&osdc->osd_lru_lock);
1298        if (!list_empty(&osd->o_osd_lru))
1299                list_del_init(&osd->o_osd_lru);
1300        spin_unlock(&osdc->osd_lru_lock);
1301}
1302
1303/*
1304 * Close the connection and assign any leftover requests to the
1305 * homeless session.
1306 */
1307static void close_osd(struct ceph_osd *osd)
1308{
1309        struct ceph_osd_client *osdc = osd->o_osdc;
1310        struct rb_node *n;
1311
1312        verify_osdc_wrlocked(osdc);
1313        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1314
1315        ceph_con_close(&osd->o_con);
1316
1317        for (n = rb_first(&osd->o_requests); n; ) {
1318                struct ceph_osd_request *req =
1319                    rb_entry(n, struct ceph_osd_request, r_node);
1320
1321                n = rb_next(n); /* unlink_request() */
1322
1323                dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1324                unlink_request(osd, req);
1325                link_request(&osdc->homeless_osd, req);
1326        }
1327        for (n = rb_first(&osd->o_linger_requests); n; ) {
1328                struct ceph_osd_linger_request *lreq =
1329                    rb_entry(n, struct ceph_osd_linger_request, node);
1330
1331                n = rb_next(n); /* unlink_linger() */
1332
1333                dout(" reassigning lreq %p linger_id %llu\n", lreq,
1334                     lreq->linger_id);
1335                unlink_linger(osd, lreq);
1336                link_linger(&osdc->homeless_osd, lreq);
1337        }
1338        clear_backoffs(osd);
1339
1340        __remove_osd_from_lru(osd);
1341        erase_osd(&osdc->osds, osd);
1342        put_osd(osd);
1343}
1344
1345/*
1346 * reset osd connect
1347 */
1348static int reopen_osd(struct ceph_osd *osd)
1349{
1350        struct ceph_entity_addr *peer_addr;
1351
1352        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1353
1354        if (RB_EMPTY_ROOT(&osd->o_requests) &&
1355            RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1356                close_osd(osd);
1357                return -ENODEV;
1358        }
1359
1360        peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1361        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1362                        !ceph_con_opened(&osd->o_con)) {
1363                struct rb_node *n;
1364
1365                dout("osd addr hasn't changed and connection never opened, "
1366                     "letting msgr retry\n");
1367                /* touch each r_stamp for handle_timeout()'s benfit */
1368                for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1369                        struct ceph_osd_request *req =
1370                            rb_entry(n, struct ceph_osd_request, r_node);
1371                        req->r_stamp = jiffies;
1372                }
1373
1374                return -EAGAIN;
1375        }
1376
1377        ceph_con_close(&osd->o_con);
1378        ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1379        osd->o_incarnation++;
1380
1381        return 0;
1382}
1383
1384static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1385                                          bool wrlocked)
1386{
1387        struct ceph_osd *osd;
1388
1389        if (wrlocked)
1390                verify_osdc_wrlocked(osdc);
1391        else
1392                verify_osdc_locked(osdc);
1393
1394        if (o != CEPH_HOMELESS_OSD)
1395                osd = lookup_osd(&osdc->osds, o);
1396        else
1397                osd = &osdc->homeless_osd;
1398        if (!osd) {
1399                if (!wrlocked)
1400                        return ERR_PTR(-EAGAIN);
1401
1402                osd = create_osd(osdc, o);
1403                insert_osd(&osdc->osds, osd);
1404                ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1405                              &osdc->osdmap->osd_addr[osd->o_osd]);
1406        }
1407
1408        dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1409        return osd;
1410}
1411
1412/*
1413 * Create request <-> OSD session relation.
1414 *
1415 * @req has to be assigned a tid, @osd may be homeless.
1416 */
1417static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1418{
1419        verify_osd_locked(osd);
1420        WARN_ON(!req->r_tid || req->r_osd);
1421        dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1422             req, req->r_tid);
1423
1424        if (!osd_homeless(osd))
1425                __remove_osd_from_lru(osd);
1426        else
1427                atomic_inc(&osd->o_osdc->num_homeless);
1428
1429        get_osd(osd);
1430        insert_request(&osd->o_requests, req);
1431        req->r_osd = osd;
1432}
1433
1434static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1435{
1436        verify_osd_locked(osd);
1437        WARN_ON(req->r_osd != osd);
1438        dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1439             req, req->r_tid);
1440
1441        req->r_osd = NULL;
1442        erase_request(&osd->o_requests, req);
1443        put_osd(osd);
1444
1445        if (!osd_homeless(osd))
1446                maybe_move_osd_to_lru(osd);
1447        else
1448                atomic_dec(&osd->o_osdc->num_homeless);
1449}
1450
1451static bool __pool_full(struct ceph_pg_pool_info *pi)
1452{
1453        return pi->flags & CEPH_POOL_FLAG_FULL;
1454}
1455
1456static bool have_pool_full(struct ceph_osd_client *osdc)
1457{
1458        struct rb_node *n;
1459
1460        for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1461                struct ceph_pg_pool_info *pi =
1462                    rb_entry(n, struct ceph_pg_pool_info, node);
1463
1464                if (__pool_full(pi))
1465                        return true;
1466        }
1467
1468        return false;
1469}
1470
1471static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1472{
1473        struct ceph_pg_pool_info *pi;
1474
1475        pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1476        if (!pi)
1477                return false;
1478
1479        return __pool_full(pi);
1480}
1481
1482/*
1483 * Returns whether a request should be blocked from being sent
1484 * based on the current osdmap and osd_client settings.
1485 */
1486static bool target_should_be_paused(struct ceph_osd_client *osdc,
1487                                    const struct ceph_osd_request_target *t,
1488                                    struct ceph_pg_pool_info *pi)
1489{
1490        bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1491        bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1492                       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1493                       __pool_full(pi);
1494
1495        WARN_ON(pi->id != t->target_oloc.pool);
1496        return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1497               ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1498               (osdc->osdmap->epoch < osdc->epoch_barrier);
1499}
1500
1501static int pick_random_replica(const struct ceph_osds *acting)
1502{
1503        int i = prandom_u32() % acting->size;
1504
1505        dout("%s picked osd%d, primary osd%d\n", __func__,
1506             acting->osds[i], acting->primary);
1507        return i;
1508}
1509
1510/*
1511 * Picks the closest replica based on client's location given by
1512 * crush_location option.  Prefers the primary if the locality is
1513 * the same.
1514 */
1515static int pick_closest_replica(struct ceph_osd_client *osdc,
1516                                const struct ceph_osds *acting)
1517{
1518        struct ceph_options *opt = osdc->client->options;
1519        int best_i, best_locality;
1520        int i = 0, locality;
1521
1522        do {
1523                locality = ceph_get_crush_locality(osdc->osdmap,
1524                                                   acting->osds[i],
1525                                                   &opt->crush_locs);
1526                if (i == 0 ||
1527                    (locality >= 0 && best_locality < 0) ||
1528                    (locality >= 0 && best_locality >= 0 &&
1529                     locality < best_locality)) {
1530                        best_i = i;
1531                        best_locality = locality;
1532                }
1533        } while (++i < acting->size);
1534
1535        dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1536             acting->osds[best_i], best_locality, acting->primary);
1537        return best_i;
1538}
1539
/*
 * Outcome of calc_target().
 */
enum calc_target_result {
        /* mapping unchanged, nothing to do */
        CALC_TARGET_NO_ACTION = 0,
        /* target was unpaused or its mapping changed */
        CALC_TARGET_NEED_RESEND,
        /* base or target pool isn't in the osdmap */
        CALC_TARGET_POOL_DNE,
};
1545
/*
 * Recompute where target @t maps to under the current osdmap.
 *
 * Records the current osdmap epoch in @t, applies tiering (unless
 * CEPH_OSD_FLAG_IGNORE_OVERLAY is set), maps the target object to a
 * PG and its up/acting sets and, if anything relevant changed, updates
 * the cached mapping in @t and picks the osd to send to.
 *
 * @any_change gates the ceph_is_new_interval() check and is passed on
 * to ceph_osds_changed().
 *
 * Returns:
 *   CALC_TARGET_POOL_DNE     the base or target pool isn't in the
 *                            osdmap; t->osd is set to
 *                            CEPH_HOMELESS_OSD
 *   CALC_TARGET_NEED_RESEND  the target got unpaused, its PG or acting
 *                            set changed, a resend is forced, or the
 *                            PG was split
 *   CALC_TARGET_NO_ACTION    otherwise
 */
static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
                                           struct ceph_osd_request_target *t,
                                           bool any_change)
{
        struct ceph_pg_pool_info *pi;
        struct ceph_pg pgid, last_pgid;
        struct ceph_osds up, acting;
        bool is_read = t->flags & CEPH_OSD_FLAG_READ;
        bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
        bool force_resend = false;
        bool unpaused = false;
        bool legacy_change = false;
        bool split = false;
        bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
        bool recovery_deletes = ceph_osdmap_flag(osdc,
                                                 CEPH_OSDMAP_RECOVERY_DELETES);
        enum calc_target_result ct_res;

        t->epoch = osdc->osdmap->epoch;
        pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
        if (!pi) {
                t->osd = CEPH_HOMELESS_OSD;
                ct_res = CALC_TARGET_POOL_DNE;
                goto out;
        }

        /*
         * The pool requests a forced resend in exactly this epoch:
         * honor it if the target hasn't seen this force-resend epoch
         * yet or has never recorded one.
         */
        if (osdc->osdmap->epoch == pi->last_force_request_resend) {
                if (t->last_force_resend < pi->last_force_request_resend) {
                        t->last_force_resend = pi->last_force_request_resend;
                        force_resend = true;
                } else if (t->last_force_resend == 0) {
                        force_resend = true;
                }
        }

        /* apply tiering */
        ceph_oid_copy(&t->target_oid, &t->base_oid);
        ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
        if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
                /* if both apply, the write tier wins */
                if (is_read && pi->read_tier >= 0)
                        t->target_oloc.pool = pi->read_tier;
                if (is_write && pi->write_tier >= 0)
                        t->target_oloc.pool = pi->write_tier;

                /* from here on pi refers to the target pool */
                pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
                if (!pi) {
                        t->osd = CEPH_HOMELESS_OSD;
                        ct_res = CALC_TARGET_POOL_DNE;
                        goto out;
                }
        }

        __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
        /* the same seed folded with the previously cached pg_num */
        last_pgid.pool = pgid.pool;
        last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

        ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
        /* compare the cached (t->...) state against the current one */
        if (any_change &&
            ceph_is_new_interval(&t->acting,
                                 &acting,
                                 &t->up,
                                 &up,
                                 t->size,
                                 pi->size,
                                 t->min_size,
                                 pi->min_size,
                                 t->pg_num,
                                 pi->pg_num,
                                 t->sort_bitwise,
                                 sort_bitwise,
                                 t->recovery_deletes,
                                 recovery_deletes,
                                 &last_pgid))
                force_resend = true;

        if (t->paused && !target_should_be_paused(osdc, t, pi)) {
                t->paused = false;
                unpaused = true;
        }
        legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
                        ceph_osds_changed(&t->acting, &acting,
                                          t->used_replica || any_change);
        /* a split check needs a previously cached pg_num */
        if (t->pg_num)
                split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);

        if (legacy_change || force_resend || split) {
                /* refresh the cached mapping */
                t->pgid = pgid; /* struct */
                ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
                ceph_osds_copy(&t->acting, &acting);
                ceph_osds_copy(&t->up, &up);
                t->size = pi->size;
                t->min_size = pi->min_size;
                t->pg_num = pi->pg_num;
                t->pg_num_mask = pi->pg_num_mask;
                t->sort_bitwise = sort_bitwise;
                t->recovery_deletes = recovery_deletes;

                /*
                 * Non-write requests with BALANCE_READS or
                 * LOCALIZE_READS set may go to a replica, but only in
                 * replicated pools with more than one acting osd.
                 * Everything else goes to the primary.
                 */
                if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
                                 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
                    !is_write && pi->type == CEPH_POOL_TYPE_REP &&
                    acting.size > 1) {
                        int pos;

                        WARN_ON(!is_read || acting.osds[0] != acting.primary);
                        if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
                                pos = pick_random_replica(&acting);
                        } else {
                                pos = pick_closest_replica(osdc, &acting);
                        }
                        t->osd = acting.osds[pos];
                        /* position 0 is the primary, see WARN_ON above */
                        t->used_replica = pos > 0;
                } else {
                        t->osd = acting.primary;
                        t->used_replica = false;
                }
        }

        if (unpaused || legacy_change || force_resend || split)
                ct_res = CALC_TARGET_NEED_RESEND;
        else
                ct_res = CALC_TARGET_NO_ACTION;

out:
        dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
             legacy_change, force_resend, split, ct_res, t->osd);
        return ct_res;
}
1673
1674static struct ceph_spg_mapping *alloc_spg_mapping(void)
1675{
1676        struct ceph_spg_mapping *spg;
1677
1678        spg = kmalloc(sizeof(*spg), GFP_NOIO);
1679        if (!spg)
1680                return NULL;
1681
1682        RB_CLEAR_NODE(&spg->node);
1683        spg->backoffs = RB_ROOT;
1684        return spg;
1685}
1686
1687static void free_spg_mapping(struct ceph_spg_mapping *spg)
1688{
1689        WARN_ON(!RB_EMPTY_NODE(&spg->node));
1690        WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1691
1692        kfree(spg);
1693}
1694
/*
 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
 * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
 * defined only within a specific spgid; it does not pass anything to
 * children on split, or to another primary.
 *
 * Entries are keyed by ->spgid and ordered with ceph_spg_compare().
 * This instantiation provides lookup_spg_mapping() and
 * erase_spg_mapping(), which are used further down in this file.
 */
DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
                 RB_BYPTR, const struct ceph_spg *, node)
1703
1704static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1705{
1706        return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1707}
1708
1709static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1710                                   void **pkey, size_t *pkey_len)
1711{
1712        if (hoid->key_len) {
1713                *pkey = hoid->key;
1714                *pkey_len = hoid->key_len;
1715        } else {
1716                *pkey = hoid->oid;
1717                *pkey_len = hoid->oid_len;
1718        }
1719}
1720
/*
 * Three-way comparison of two length-delimited names.
 *
 * The common prefix is compared bytewise with memcmp(); if it differs,
 * memcmp()'s result is returned as is.  If the common prefix is equal,
 * the shorter name sorts first (-1 / 1), and names of equal length
 * compare equal (0).
 */
static int compare_names(const void *name1, size_t name1_len,
                         const void *name2, size_t name2_len)
{
        const size_t common_len = name1_len < name2_len ? name1_len :
                                                          name2_len;
        int ret;

        ret = memcmp(name1, name2, common_len);
        if (ret)
                return ret;

        if (name1_len == name2_len)
                return 0;

        return name1_len < name2_len ? -1 : 1;
}
1735
1736static int hoid_compare(const struct ceph_hobject_id *lhs,
1737                        const struct ceph_hobject_id *rhs)
1738{
1739        void *effective_key1, *effective_key2;
1740        size_t effective_key1_len, effective_key2_len;
1741        int ret;
1742
1743        if (lhs->is_max < rhs->is_max)
1744                return -1;
1745        if (lhs->is_max > rhs->is_max)
1746                return 1;
1747
1748        if (lhs->pool < rhs->pool)
1749                return -1;
1750        if (lhs->pool > rhs->pool)
1751                return 1;
1752
1753        if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1754                return -1;
1755        if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1756                return 1;
1757
1758        ret = compare_names(lhs->nspace, lhs->nspace_len,
1759                            rhs->nspace, rhs->nspace_len);
1760        if (ret)
1761                return ret;
1762
1763        hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1764        hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1765        ret = compare_names(effective_key1, effective_key1_len,
1766                            effective_key2, effective_key2_len);
1767        if (ret)
1768                return ret;
1769
1770        ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1771        if (ret)
1772                return ret;
1773
1774        if (lhs->snapid < rhs->snapid)
1775                return -1;
1776        if (lhs->snapid > rhs->snapid)
1777                return 1;
1778
1779        return 0;
1780}
1781
1782/*
1783 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1784 * compat stuff here.
1785 *
1786 * Assumes @hoid is zero-initialized.
1787 */
1788static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1789{
1790        u8 struct_v;
1791        u32 struct_len;
1792        int ret;
1793
1794        ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1795                                  &struct_len);
1796        if (ret)
1797                return ret;
1798
1799        if (struct_v < 4) {
1800                pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1801                goto e_inval;
1802        }
1803
1804        hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1805                                                GFP_NOIO);
1806        if (IS_ERR(hoid->key)) {
1807                ret = PTR_ERR(hoid->key);
1808                hoid->key = NULL;
1809                return ret;
1810        }
1811
1812        hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1813                                                GFP_NOIO);
1814        if (IS_ERR(hoid->oid)) {
1815                ret = PTR_ERR(hoid->oid);
1816                hoid->oid = NULL;
1817                return ret;
1818        }
1819
1820        ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1821        ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1822        ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1823
1824        hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1825                                                   GFP_NOIO);
1826        if (IS_ERR(hoid->nspace)) {
1827                ret = PTR_ERR(hoid->nspace);
1828                hoid->nspace = NULL;
1829                return ret;
1830        }
1831
1832        ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1833
1834        ceph_hoid_build_hash_cache(hoid);
1835        return 0;
1836
1837e_inval:
1838        return -EINVAL;
1839}
1840
1841static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1842{
1843        return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1844               4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1845}
1846
1847static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1848{
1849        ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1850        ceph_encode_string(p, end, hoid->key, hoid->key_len);
1851        ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1852        ceph_encode_64(p, hoid->snapid);
1853        ceph_encode_32(p, hoid->hash);
1854        ceph_encode_8(p, hoid->is_max);
1855        ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1856        ceph_encode_64(p, hoid->pool);
1857}
1858
1859static void free_hoid(struct ceph_hobject_id *hoid)
1860{
1861        if (hoid) {
1862                kfree(hoid->key);
1863                kfree(hoid->oid);
1864                kfree(hoid->nspace);
1865                kfree(hoid);
1866        }
1867}
1868
1869static struct ceph_osd_backoff *alloc_backoff(void)
1870{
1871        struct ceph_osd_backoff *backoff;
1872
1873        backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1874        if (!backoff)
1875                return NULL;
1876
1877        RB_CLEAR_NODE(&backoff->spg_node);
1878        RB_CLEAR_NODE(&backoff->id_node);
1879        return backoff;
1880}
1881
1882static void free_backoff(struct ceph_osd_backoff *backoff)
1883{
1884        WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1885        WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1886
1887        free_hoid(backoff->begin);
1888        free_hoid(backoff->end);
1889        kfree(backoff);
1890}
1891
/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 *
 * Nodes are linked through ->spg_node and ordered with hoid_compare().
 * erase_backoff(), used by clear_backoffs(), comes from this
 * instantiation; finding the backoff that covers a given hoid is done
 * by lookup_containing_backoff() below.
 */
DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
                        RB_BYVAL, spg_node);
1897
1898static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1899                                            const struct ceph_hobject_id *hoid)
1900{
1901        struct rb_node *n = root->rb_node;
1902
1903        while (n) {
1904                struct ceph_osd_backoff *cur =
1905                    rb_entry(n, struct ceph_osd_backoff, spg_node);
1906                int cmp;
1907
1908                cmp = hoid_compare(hoid, cur->begin);
1909                if (cmp < 0) {
1910                        n = n->rb_left;
1911                } else if (cmp > 0) {
1912                        if (hoid_compare(hoid, cur->end) < 0)
1913                                return cur;
1914
1915                        n = n->rb_right;
1916                } else {
1917                        return cur;
1918                }
1919        }
1920
1921        return NULL;
1922}
1923
/*
 * Each backoff has a unique id within its OSD session.
 *
 * Nodes are linked through ->id_node and keyed by ->id.
 * erase_backoff_by_id(), used by clear_backoffs(), comes from this
 * instantiation.
 */
DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1928
1929static void clear_backoffs(struct ceph_osd *osd)
1930{
1931        while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1932                struct ceph_spg_mapping *spg =
1933                    rb_entry(rb_first(&osd->o_backoff_mappings),
1934                             struct ceph_spg_mapping, node);
1935
1936                while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1937                        struct ceph_osd_backoff *backoff =
1938                            rb_entry(rb_first(&spg->backoffs),
1939                                     struct ceph_osd_backoff, spg_node);
1940
1941                        erase_backoff(&spg->backoffs, backoff);
1942                        erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1943                        free_backoff(backoff);
1944                }
1945                erase_spg_mapping(&osd->o_backoff_mappings, spg);
1946                free_spg_mapping(spg);
1947        }
1948}
1949
1950/*
1951 * Set up a temporary, non-owning view into @t.
1952 */
1953static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1954                                  const struct ceph_osd_request_target *t)
1955{
1956        hoid->key = NULL;
1957        hoid->key_len = 0;
1958        hoid->oid = t->target_oid.name;
1959        hoid->oid_len = t->target_oid.name_len;
1960        hoid->snapid = CEPH_NOSNAP;
1961        hoid->hash = t->pgid.seed;
1962        hoid->is_max = false;
1963        if (t->target_oloc.pool_ns) {
1964                hoid->nspace = t->target_oloc.pool_ns->str;
1965                hoid->nspace_len = t->target_oloc.pool_ns->len;
1966        } else {
1967                hoid->nspace = NULL;
1968                hoid->nspace_len = 0;
1969        }
1970        hoid->pool = t->target_oloc.pool;
1971        ceph_hoid_build_hash_cache(hoid);
1972}
1973
1974static bool should_plug_request(struct ceph_osd_request *req)
1975{
1976        struct ceph_osd *osd = req->r_osd;
1977        struct ceph_spg_mapping *spg;
1978        struct ceph_osd_backoff *backoff;
1979        struct ceph_hobject_id hoid;
1980
1981        spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
1982        if (!spg)
1983                return false;
1984
1985        hoid_fill_from_target(&hoid, &req->r_t);
1986        backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
1987        if (!backoff)
1988                return false;
1989
1990        dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
1991             __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
1992             backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
1993        return true;
1994}
1995
1996/*
1997 * Keep get_num_data_items() in sync with this function.
1998 */
1999static void setup_request_data(struct ceph_osd_request *req)
2000{
2001        struct ceph_msg *request_msg = req->r_request;
2002        struct ceph_msg *reply_msg = req->r_reply;
2003        struct ceph_osd_req_op *op;
2004
2005        if (req->r_request->num_data_items || req->r_reply->num_data_items)
2006                return;
2007
2008        WARN_ON(request_msg->data_length || reply_msg->data_length);
2009        for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
2010                switch (op->op) {
2011                /* request */
2012                case CEPH_OSD_OP_WRITE:
2013                case CEPH_OSD_OP_WRITEFULL:
2014                        WARN_ON(op->indata_len != op->extent.length);
2015                        ceph_osdc_msg_data_add(request_msg,
2016                                               &op->extent.osd_data);
2017                        break;
2018                case CEPH_OSD_OP_SETXATTR:
2019                case CEPH_OSD_OP_CMPXATTR:
2020                        WARN_ON(op->indata_len != op->xattr.name_len +
2021                                                  op->xattr.value_len);
2022                        ceph_osdc_msg_data_add(request_msg,
2023                                               &op->xattr.osd_data);
2024                        break;
2025                case CEPH_OSD_OP_NOTIFY_ACK:
2026                        ceph_osdc_msg_data_add(request_msg,
2027                                               &op->notify_ack.request_data);
2028                        break;
2029                case CEPH_OSD_OP_COPY_FROM2:
2030                        ceph_osdc_msg_data_add(request_msg,
2031                                               &op->copy_from.osd_data);
2032                        break;
2033
2034                /* reply */
2035                case CEPH_OSD_OP_STAT:
2036                        ceph_osdc_msg_data_add(reply_msg,
2037                                               &op->raw_data_in);
2038                        break;
2039                case CEPH_OSD_OP_READ:
2040                        ceph_osdc_msg_data_add(reply_msg,
2041                                               &op->extent.osd_data);
2042                        break;
2043                case CEPH_OSD_OP_LIST_WATCHERS:
2044                        ceph_osdc_msg_data_add(reply_msg,
2045                                               &op->list_watchers.response_data);
2046                        break;
2047
2048                /* both */
2049                case CEPH_OSD_OP_CALL:
2050                        WARN_ON(op->indata_len != op->cls.class_len +
2051                                                  op->cls.method_len +
2052                                                  op->cls.indata_len);
2053                        ceph_osdc_msg_data_add(request_msg,
2054                                               &op->cls.request_info);
2055                        /* optional, can be NONE */
2056                        ceph_osdc_msg_data_add(request_msg,
2057                                               &op->cls.request_data);
2058                        /* optional, can be NONE */
2059                        ceph_osdc_msg_data_add(reply_msg,
2060                                               &op->cls.response_data);
2061                        break;
2062                case CEPH_OSD_OP_NOTIFY:
2063                        ceph_osdc_msg_data_add(request_msg,
2064                                               &op->notify.request_data);
2065                        ceph_osdc_msg_data_add(reply_msg,
2066                                               &op->notify.response_data);
2067                        break;
2068                }
2069        }
2070}
2071
2072static void encode_pgid(void **p, const struct ceph_pg *pgid)
2073{
2074        ceph_encode_8(p, 1);
2075        ceph_encode_64(p, pgid->pool);
2076        ceph_encode_32(p, pgid->seed);
2077        ceph_encode_32(p, -1); /* preferred */
2078}
2079
2080static void encode_spgid(void **p, const struct ceph_spg *spgid)
2081{
2082        ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
2083        encode_pgid(p, &spgid->pgid);
2084        ceph_encode_8(p, spgid->shard);
2085}
2086
2087static void encode_oloc(void **p, void *end,
2088                        const struct ceph_object_locator *oloc)
2089{
2090        ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
2091        ceph_encode_64(p, oloc->pool);
2092        ceph_encode_32(p, -1); /* preferred */
2093        ceph_encode_32(p, 0);  /* key len */
2094        if (oloc->pool_ns)
2095                ceph_encode_string(p, end, oloc->pool_ns->str,
2096                                   oloc->pool_ns->len);
2097        else
2098                ceph_encode_32(p, 0);
2099}
2100
2101static void encode_request_partial(struct ceph_osd_request *req,
2102                                   struct ceph_msg *msg)
2103{
2104        void *p = msg->front.iov_base;
2105        void *const end = p + msg->front_alloc_len;
2106        u32 data_len = 0;
2107        int i;
2108
2109        if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
2110                /* snapshots aren't writeable */
2111                WARN_ON(req->r_snapid != CEPH_NOSNAP);
2112        } else {
2113                WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
2114                        req->r_data_offset || req->r_snapc);
2115        }
2116
2117        setup_request_data(req);
2118
2119        encode_spgid(&p, &req->r_t.spgid); /* actual spg */
2120        ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
2121        ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
2122        ceph_encode_32(&p, req->r_flags);
2123
2124        /* reqid */
2125        ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
2126        memset(p, 0, sizeof(struct ceph_osd_reqid));
2127        p += sizeof(struct ceph_osd_reqid);
2128
2129        /* trace */
2130        memset(p, 0, sizeof(struct ceph_blkin_trace_info));
2131        p += sizeof(struct ceph_blkin_trace_info);
2132
2133        ceph_encode_32(&p, 0); /* client_inc, always 0 */
2134        ceph_encode_timespec64(p, &req->r_mtime);
2135        p += sizeof(struct ceph_timespec);
2136
2137        encode_oloc(&p, end, &req->r_t.target_oloc);
2138        ceph_encode_string(&p, end, req->r_t.target_oid.name,
2139                           req->r_t.target_oid.name_len);
2140
2141        /* ops, can imply data */
2142        ceph_encode_16(&p, req->r_num_ops);
2143        for (i = 0; i < req->r_num_ops; i++) {
2144                data_len += osd_req_encode_op(p, &req->r_ops[i]);
2145                p += sizeof(struct ceph_osd_op);
2146        }
2147
2148        ceph_encode_64(&p, req->r_snapid); /* snapid */
2149        if (req->r_snapc) {
2150                ceph_encode_64(&p, req->r_snapc->seq);
2151                ceph_encode_32(&p, req->r_snapc->num_snaps);
2152                for (i = 0; i < req->r_snapc->num_snaps; i++)
2153                        ceph_encode_64(&p, req->r_snapc->snaps[i]);
2154        } else {
2155                ceph_encode_64(&p, 0); /* snap_seq */
2156                ceph_encode_32(&p, 0); /* snaps len */
2157        }
2158
2159        ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
2160        BUG_ON(p > end - 8); /* space for features */
2161
2162        msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
2163        /* front_len is finalized in encode_request_finish() */
2164        msg->front.iov_len = p - msg->front.iov_base;
2165        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2166        msg->hdr.data_len = cpu_to_le32(data_len);
2167        /*
2168         * The header "data_off" is a hint to the receiver allowing it
2169         * to align received data into its buffers such that there's no
2170         * need to re-copy it before writing it to disk (direct I/O).
2171         */
2172        msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
2173
2174        dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
2175             req->r_t.target_oid.name, req->r_t.target_oid.name_len);
2176}
2177
2178static void encode_request_finish(struct ceph_msg *msg)
2179{
2180        void *p = msg->front.iov_base;
2181        void *const partial_end = p + msg->front.iov_len;
2182        void *const end = p + msg->front_alloc_len;
2183
2184        if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
2185                /* luminous OSD -- encode features and be done */
2186                p = partial_end;
2187                ceph_encode_64(&p, msg->con->peer_features);
2188        } else {
2189                struct {
2190                        char spgid[CEPH_ENCODING_START_BLK_LEN +
2191                                   CEPH_PGID_ENCODING_LEN + 1];
2192                        __le32 hash;
2193                        __le32 epoch;
2194                        __le32 flags;
2195                        char reqid[CEPH_ENCODING_START_BLK_LEN +
2196                                   sizeof(struct ceph_osd_reqid)];
2197                        char trace[sizeof(struct ceph_blkin_trace_info)];
2198                        __le32 client_inc;
2199                        struct ceph_timespec mtime;
2200                } __packed head;
2201                struct ceph_pg pgid;
2202                void *oloc, *oid, *tail;
2203                int oloc_len, oid_len, tail_len;
2204                int len;
2205
2206                /*
2207                 * Pre-luminous OSD -- reencode v8 into v4 using @head
2208                 * as a temporary buffer.  Encode the raw PG; the rest
2209                 * is just a matter of moving oloc, oid and tail blobs
2210                 * around.
2211                 */
2212                memcpy(&head, p, sizeof(head));
2213                p += sizeof(head);
2214
2215                oloc = p;
2216                p += CEPH_ENCODING_START_BLK_LEN;
2217                pgid.pool = ceph_decode_64(&p);
2218                p += 4 + 4; /* preferred, key len */
2219                len = ceph_decode_32(&p);
2220                p += len;   /* nspace */
2221                oloc_len = p - oloc;
2222
2223                oid = p;
2224                len = ceph_decode_32(&p);
2225                p += len;
2226                oid_len = p - oid;
2227
2228                tail = p;
2229                tail_len = partial_end - p;
2230
2231                p = msg->front.iov_base;
2232                ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
2233                ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
2234                ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
2235                ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
2236
2237                /* reassert_version */
2238                memset(p, 0, sizeof(struct ceph_eversion));
2239                p += sizeof(struct ceph_eversion);
2240
2241                BUG_ON(p >= oloc);
2242                memmove(p, oloc, oloc_len);
2243                p += oloc_len;
2244
2245                pgid.seed = le32_to_cpu(head.hash);
2246                encode_pgid(&p, &pgid); /* raw pg */
2247
2248                BUG_ON(p >= oid);
2249                memmove(p, oid, oid_len);
2250                p += oid_len;
2251
2252                /* tail -- ops, snapid, snapc, retry_attempt */
2253                BUG_ON(p >= tail);
2254                memmove(p, tail, tail_len);
2255                p += tail_len;
2256
2257                msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2258        }
2259
2260        BUG_ON(p > end);
2261        msg->front.iov_len = p - msg->front.iov_base;
2262        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2263
2264        dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2265             le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2266             le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2267             le16_to_cpu(msg->hdr.version));
2268}
2269
2270/*
2271 * @req has to be assigned a tid and registered.
2272 */
2273static void send_request(struct ceph_osd_request *req)
2274{
2275        struct ceph_osd *osd = req->r_osd;
2276
2277        verify_osd_locked(osd);
2278        WARN_ON(osd->o_osd != req->r_t.osd);
2279
2280        /* backoff? */
2281        if (should_plug_request(req))
2282                return;
2283
2284        /*
2285         * We may have a previously queued request message hanging
2286         * around.  Cancel it to avoid corrupting the msgr.
2287         */
2288        if (req->r_sent)
2289                ceph_msg_revoke(req->r_request);
2290
2291        req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2292        if (req->r_attempts)
2293                req->r_flags |= CEPH_OSD_FLAG_RETRY;
2294        else
2295                WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2296
2297        encode_request_partial(req, req->r_request);
2298
2299        dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2300             __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2301             req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2302             req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2303             req->r_attempts);
2304
2305        req->r_t.paused = false;
2306        req->r_stamp = jiffies;
2307        req->r_attempts++;
2308
2309        req->r_sent = osd->o_incarnation;
2310        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2311        ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2312}
2313
2314static void maybe_request_map(struct ceph_osd_client *osdc)
2315{
2316        bool continuous = false;
2317
2318        verify_osdc_locked(osdc);
2319        WARN_ON(!osdc->osdmap->epoch);
2320
2321        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2322            ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2323            ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2324                dout("%s osdc %p continuous\n", __func__, osdc);
2325                continuous = true;
2326        } else {
2327                dout("%s osdc %p onetime\n", __func__, osdc);
2328        }
2329
2330        if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2331                               osdc->osdmap->epoch + 1, continuous))
2332                ceph_monc_renew_subs(&osdc->client->monc);
2333}
2334
2335static void complete_request(struct ceph_osd_request *req, int err);
2336static void send_map_check(struct ceph_osd_request *req);
2337
2338static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2339{
2340        struct ceph_osd_client *osdc = req->r_osdc;
2341        struct ceph_osd *osd;
2342        enum calc_target_result ct_res;
2343        int err = 0;
2344        bool need_send = false;
2345        bool promoted = false;
2346
2347        WARN_ON(req->r_tid);
2348        dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2349
2350again:
2351        ct_res = calc_target(osdc, &req->r_t, false);
2352        if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2353                goto promote;
2354
2355        osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2356        if (IS_ERR(osd)) {
2357                WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2358                goto promote;
2359        }
2360
2361        if (osdc->abort_err) {
2362                dout("req %p abort_err %d\n", req, osdc->abort_err);
2363                err = osdc->abort_err;
2364        } else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2365                dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2366                     osdc->epoch_barrier);
2367                req->r_t.paused = true;
2368                maybe_request_map(osdc);
2369        } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2370                   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2371                dout("req %p pausewr\n", req);
2372                req->r_t.paused = true;
2373                maybe_request_map(osdc);
2374        } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2375                   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2376                dout("req %p pauserd\n", req);
2377                req->r_t.paused = true;
2378                maybe_request_map(osdc);
2379        } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2380                   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2381                                     CEPH_OSD_FLAG_FULL_FORCE)) &&
2382                   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2383                    pool_full(osdc, req->r_t.base_oloc.pool))) {
2384                dout("req %p full/pool_full\n", req);
2385                if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
2386                        err = -ENOSPC;
2387                } else {
2388                        pr_warn_ratelimited("FULL or reached pool quota\n");
2389                        req->r_t.paused = true;
2390                        maybe_request_map(osdc);
2391                }
2392        } else if (!osd_homeless(osd)) {
2393                need_send = true;
2394        } else {
2395                maybe_request_map(osdc);
2396        }
2397
2398        mutex_lock(&osd->lock);
2399        /*
2400         * Assign the tid atomically with send_request() to protect
2401         * multiple writes to the same object from racing with each
2402         * other, resulting in out of order ops on the OSDs.
2403         */
2404        req->r_tid = atomic64_inc_return(&osdc->last_tid);
2405        link_request(osd, req);
2406        if (need_send)
2407                send_request(req);
2408        else if (err)
2409                complete_request(req, err);
2410        mutex_unlock(&osd->lock);
2411
2412        if (!err && ct_res == CALC_TARGET_POOL_DNE)
2413                send_map_check(req);
2414
2415        if (promoted)
2416                downgrade_write(&osdc->lock);
2417        return;
2418
2419promote:
2420        up_read(&osdc->lock);
2421        down_write(&osdc->lock);
2422        wrlocked = true;
2423        promoted = true;
2424        goto again;
2425}
2426
2427static void account_request(struct ceph_osd_request *req)
2428{
2429        WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2430        WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2431
2432        req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2433        atomic_inc(&req->r_osdc->num_requests);
2434
2435        req->r_start_stamp = jiffies;
2436        req->r_start_latency = ktime_get();
2437}
2438
2439static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2440{
2441        ceph_osdc_get_request(req);
2442        account_request(req);
2443        __submit_request(req, wrlocked);
2444}
2445
2446static void finish_request(struct ceph_osd_request *req)
2447{
2448        struct ceph_osd_client *osdc = req->r_osdc;
2449
2450        WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2451        dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2452
2453        req->r_end_latency = ktime_get();
2454
2455        if (req->r_osd)
2456                unlink_request(req->r_osd, req);
2457        atomic_dec(&osdc->num_requests);
2458
2459        /*
2460         * If an OSD has failed or returned and a request has been sent
2461         * twice, it's possible to get a reply and end up here while the
2462         * request message is queued for delivery.  We will ignore the
2463         * reply, so not a big deal, but better to try and catch it.
2464         */
2465        ceph_msg_revoke(req->r_request);
2466        ceph_msg_revoke_incoming(req->r_reply);
2467}
2468
2469static void __complete_request(struct ceph_osd_request *req)
2470{
2471        dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
2472             req->r_tid, req->r_callback, req->r_result);
2473
2474        if (req->r_callback)
2475                req->r_callback(req);
2476        complete_all(&req->r_completion);
2477        ceph_osdc_put_request(req);
2478}
2479
2480static void complete_request_workfn(struct work_struct *work)
2481{
2482        struct ceph_osd_request *req =
2483            container_of(work, struct ceph_osd_request, r_complete_work);
2484
2485        __complete_request(req);
2486}
2487
2488/*
2489 * This is open-coded in handle_reply().
2490 */
2491static void complete_request(struct ceph_osd_request *req, int err)
2492{
2493        dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2494
2495        req->r_result = err;
2496        finish_request(req);
2497
2498        INIT_WORK(&req->r_complete_work, complete_request_workfn);
2499        queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
2500}
2501
2502static void cancel_map_check(struct ceph_osd_request *req)
2503{
2504        struct ceph_osd_client *osdc = req->r_osdc;
2505        struct ceph_osd_request *lookup_req;
2506
2507        verify_osdc_wrlocked(osdc);
2508
2509        lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2510        if (!lookup_req)
2511                return;
2512
2513        WARN_ON(lookup_req != req);
2514        erase_request_mc(&osdc->map_checks, req);
2515        ceph_osdc_put_request(req);
2516}
2517
2518static void cancel_request(struct ceph_osd_request *req)
2519{
2520        dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2521
2522        cancel_map_check(req);
2523        finish_request(req);
2524        complete_all(&req->r_completion);
2525        ceph_osdc_put_request(req);
2526}
2527
2528static void abort_request(struct ceph_osd_request *req, int err)
2529{
2530        dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2531
2532        cancel_map_check(req);
2533        complete_request(req, err);
2534}
2535
/*
 * for_each_request() callback: abort @req with the error code that @arg
 * points to.  Always returns 0 so that the iteration continues.
 */
static int abort_fn(struct ceph_osd_request *req, void *arg)
{
	int *errp = arg;

	abort_request(req, *errp);
	return 0; /* continue iteration */
}
2543
2544/*
2545 * Abort all in-flight requests with @err and arrange for all future
2546 * requests to be failed immediately.
2547 */
2548void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
2549{
2550        dout("%s osdc %p err %d\n", __func__, osdc, err);
2551        down_write(&osdc->lock);
2552        for_each_request(osdc, abort_fn, &err);
2553        osdc->abort_err = err;
2554        up_write(&osdc->lock);
2555}
2556EXPORT_SYMBOL(ceph_osdc_abort_requests);
2557
2558void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
2559{
2560        down_write(&osdc->lock);
2561        osdc->abort_err = 0;
2562        up_write(&osdc->lock);
2563}
2564EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
2565
2566static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2567{
2568        if (likely(eb > osdc->epoch_barrier)) {
2569                dout("updating epoch_barrier from %u to %u\n",
2570                                osdc->epoch_barrier, eb);
2571                osdc->epoch_barrier = eb;
2572                /* Request map if we're not to the barrier yet */
2573                if (eb > osdc->osdmap->epoch)
2574                        maybe_request_map(osdc);
2575        }
2576}
2577
2578void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2579{
2580        down_read(&osdc->lock);
2581        if (unlikely(eb > osdc->epoch_barrier)) {
2582                up_read(&osdc->lock);
2583                down_write(&osdc->lock);
2584                update_epoch_barrier(osdc, eb);
2585                up_write(&osdc->lock);
2586        } else {
2587                up_read(&osdc->lock);
2588        }
2589}
2590EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2591
2592/*
2593 * We can end up releasing caps as a result of abort_request().
2594 * In that case, we probably want to ensure that the cap release message
2595 * has an updated epoch barrier in it, so set the epoch barrier prior to
2596 * aborting the first request.
2597 */
2598static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
2599{
2600        struct ceph_osd_client *osdc = req->r_osdc;
2601        bool *victims = arg;
2602
2603        if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2604            (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2605             pool_full(osdc, req->r_t.base_oloc.pool))) {
2606                if (!*victims) {
2607                        update_epoch_barrier(osdc, osdc->osdmap->epoch);
2608                        *victims = true;
2609                }
2610                abort_request(req, -ENOSPC);
2611        }
2612
2613        return 0; /* continue iteration */
2614}
2615
2616/*
2617 * Drop all pending requests that are stalled waiting on a full condition to
2618 * clear, and complete them with ENOSPC as the return code. Set the
2619 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
2620 * cancelled.
2621 */
2622static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2623{
2624        bool victims = false;
2625
2626        if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
2627            (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
2628                for_each_request(osdc, abort_on_full_fn, &victims);
2629}
2630
2631static void check_pool_dne(struct ceph_osd_request *req)
2632{
2633        struct ceph_osd_client *osdc = req->r_osdc;
2634        struct ceph_osdmap *map = osdc->osdmap;
2635
2636        verify_osdc_wrlocked(osdc);
2637        WARN_ON(!map->epoch);
2638
2639        if (req->r_attempts) {
2640                /*
2641                 * We sent a request earlier, which means that
2642                 * previously the pool existed, and now it does not
2643                 * (i.e., it was deleted).
2644                 */
2645                req->r_map_dne_bound = map->epoch;
2646                dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2647                     req->r_tid);
2648        } else {
2649                dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2650                     req, req->r_tid, req->r_map_dne_bound, map->epoch);
2651        }
2652
2653        if (req->r_map_dne_bound) {
2654                if (map->epoch >= req->r_map_dne_bound) {
2655                        /* we had a new enough map */
2656                        pr_info_ratelimited("tid %llu pool does not exist\n",
2657                                            req->r_tid);
2658                        complete_request(req, -ENOENT);
2659                }
2660        } else {
2661                send_map_check(req);
2662        }
2663}
2664
2665static void map_check_cb(struct ceph_mon_generic_request *greq)
2666{
2667        struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2668        struct ceph_osd_request *req;
2669        u64 tid = greq->private_data;
2670
2671        WARN_ON(greq->result || !greq->u.newest);
2672
2673        down_write(&osdc->lock);
2674        req = lookup_request_mc(&osdc->map_checks, tid);
2675        if (!req) {
2676                dout("%s tid %llu dne\n", __func__, tid);
2677                goto out_unlock;
2678        }
2679
2680        dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2681             req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2682        if (!req->r_map_dne_bound)
2683                req->r_map_dne_bound = greq->u.newest;
2684        erase_request_mc(&osdc->map_checks, req);
2685        check_pool_dne(req);
2686
2687        ceph_osdc_put_request(req);
2688out_unlock:
2689        up_write(&osdc->lock);
2690}
2691
2692static void send_map_check(struct ceph_osd_request *req)
2693{
2694        struct ceph_osd_client *osdc = req->r_osdc;
2695        struct ceph_osd_request *lookup_req;
2696        int ret;
2697
2698        verify_osdc_wrlocked(osdc);
2699
2700        lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2701        if (lookup_req) {
2702                WARN_ON(lookup_req != req);
2703                return;
2704        }
2705
2706        ceph_osdc_get_request(req);
2707        insert_request_mc(&osdc->map_checks, req);
2708        ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2709                                          map_check_cb, req->r_tid);
2710        WARN_ON(ret);
2711}
2712
2713/*
2714 * lingering requests, watch/notify v2 infrastructure
2715 */
2716static void linger_release(struct kref *kref)
2717{
2718        struct ceph_osd_linger_request *lreq =
2719            container_of(kref, struct ceph_osd_linger_request, kref);
2720
2721        dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2722             lreq->reg_req, lreq->ping_req);
2723        WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2724        WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2725        WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2726        WARN_ON(!list_empty(&lreq->scan_item));
2727        WARN_ON(!list_empty(&lreq->pending_lworks));
2728        WARN_ON(lreq->osd);
2729
2730        if (lreq->reg_req)
2731                ceph_osdc_put_request(lreq->reg_req);
2732        if (lreq->ping_req)
2733                ceph_osdc_put_request(lreq->ping_req);
2734        target_destroy(&lreq->t);
2735        kfree(lreq);
2736}
2737
2738static void linger_put(struct ceph_osd_linger_request *lreq)
2739{
2740        if (lreq)
2741                kref_put(&lreq->kref, linger_release);
2742}
2743
2744static struct ceph_osd_linger_request *
2745linger_get(struct ceph_osd_linger_request *lreq)
2746{
2747        kref_get(&lreq->kref);
2748        return lreq;
2749}
2750
2751static struct ceph_osd_linger_request *
2752linger_alloc(struct ceph_osd_client *osdc)
2753{
2754        struct ceph_osd_linger_request *lreq;
2755
2756        lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2757        if (!lreq)
2758                return NULL;
2759
2760        kref_init(&lreq->kref);
2761        mutex_init(&lreq->lock);
2762        RB_CLEAR_NODE(&lreq->node);
2763        RB_CLEAR_NODE(&lreq->osdc_node);
2764        RB_CLEAR_NODE(&lreq->mc_node);
2765        INIT_LIST_HEAD(&lreq->scan_item);
2766        INIT_LIST_HEAD(&lreq->pending_lworks);
2767        init_completion(&lreq->reg_commit_wait);
2768        init_completion(&lreq->notify_finish_wait);
2769
2770        lreq->osdc = osdc;
2771        target_init(&lreq->t);
2772
2773        dout("%s lreq %p\n", __func__, lreq);
2774        return lreq;
2775}
2776
/*
 * rbtree helpers for linger requests, all keyed by ->linger_id:
 *   linger      - linked through ->node (insert/erase only)
 *   linger_osdc - linked through ->osdc_node
 *   linger_mc   - linked through ->mc_node
 *
 * NOTE(review): presumably these generate the insert_, erase_ and lookup_
 * functions named after the first macro argument (e.g. insert_linger(),
 * erase_linger_osdc(), lookup_linger_mc()) that are called in this file;
 * confirm against the DEFINE_RB_* macro definitions.
 */
DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2780
2781/*
2782 * Create linger request <-> OSD session relation.
2783 *
2784 * @lreq has to be registered, @osd may be homeless.
2785 */
2786static void link_linger(struct ceph_osd *osd,
2787                        struct ceph_osd_linger_request *lreq)
2788{
2789        verify_osd_locked(osd);
2790        WARN_ON(!lreq->linger_id || lreq->osd);
2791        dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2792             osd->o_osd, lreq, lreq->linger_id);
2793
2794        if (!osd_homeless(osd))
2795                __remove_osd_from_lru(osd);
2796        else
2797                atomic_inc(&osd->o_osdc->num_homeless);
2798
2799        get_osd(osd);
2800        insert_linger(&osd->o_linger_requests, lreq);
2801        lreq->osd = osd;
2802}
2803
2804static void unlink_linger(struct ceph_osd *osd,
2805                          struct ceph_osd_linger_request *lreq)
2806{
2807        verify_osd_locked(osd);
2808        WARN_ON(lreq->osd != osd);
2809        dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2810             osd->o_osd, lreq, lreq->linger_id);
2811
2812        lreq->osd = NULL;
2813        erase_linger(&osd->o_linger_requests, lreq);
2814        put_osd(osd);
2815
2816        if (!osd_homeless(osd))
2817                maybe_move_osd_to_lru(osd);
2818        else
2819                atomic_dec(&osd->o_osdc->num_homeless);
2820}
2821
2822static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2823{
2824        verify_osdc_locked(lreq->osdc);
2825
2826        return !RB_EMPTY_NODE(&lreq->osdc_node);
2827}
2828
2829static bool linger_registered(struct ceph_osd_linger_request *lreq)
2830{
2831        struct ceph_osd_client *osdc = lreq->osdc;
2832        bool registered;
2833
2834        down_read(&osdc->lock);
2835        registered = __linger_registered(lreq);
2836        up_read(&osdc->lock);
2837
2838        return registered;
2839}
2840
2841static void linger_register(struct ceph_osd_linger_request *lreq)
2842{
2843        struct ceph_osd_client *osdc = lreq->osdc;
2844
2845        verify_osdc_wrlocked(osdc);
2846        WARN_ON(lreq->linger_id);
2847
2848        linger_get(lreq);
2849        lreq->linger_id = ++osdc->last_linger_id;
2850        insert_linger_osdc(&osdc->linger_requests, lreq);
2851}
2852
2853static void linger_unregister(struct ceph_osd_linger_request *lreq)
2854{
2855        struct ceph_osd_client *osdc = lreq->osdc;
2856
2857        verify_osdc_wrlocked(osdc);
2858
2859        erase_linger_osdc(&osdc->linger_requests, lreq);
2860        linger_put(lreq);
2861}
2862
2863static void cancel_linger_request(struct ceph_osd_request *req)
2864{
2865        struct ceph_osd_linger_request *lreq = req->r_priv;
2866
2867        WARN_ON(!req->r_linger);
2868        cancel_request(req);
2869        linger_put(lreq);
2870}
2871
/*
 * A deferred watch/notify event for a linger request.  Allocated by
 * lwork_alloc(), queued on the client's notify_wq by lwork_queue() and
 * released by lwork_free().
 */
struct linger_work {
	struct work_struct work;	/* runs do_watch_notify() or do_watch_error() */
	struct ceph_osd_linger_request *lreq;	/* holds a linger reference */
	struct list_head pending_item;	/* entry in lreq->pending_lworks */
	unsigned long queued_stamp;	/* jiffies when lwork_queue() ran */

	/* which member is valid depends on the work function */
	union {
		/* consumed by do_watch_notify() */
		struct {
			u64 notify_id;
			u64 notifier_id;
			void *payload; /* points into @msg front */
			size_t payload_len;

			struct ceph_msg *msg; /* for ceph_msg_put() */
		} notify;
		/* consumed by do_watch_error() */
		struct {
			int err;
		} error;
	};
};
2892
2893static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2894                                       work_func_t workfn)
2895{
2896        struct linger_work *lwork;
2897
2898        lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2899        if (!lwork)
2900                return NULL;
2901
2902        INIT_WORK(&lwork->work, workfn);
2903        INIT_LIST_HEAD(&lwork->pending_item);
2904        lwork->lreq = linger_get(lreq);
2905
2906        return lwork;
2907}
2908
2909static void lwork_free(struct linger_work *lwork)
2910{
2911        struct ceph_osd_linger_request *lreq = lwork->lreq;
2912
2913        mutex_lock(&lreq->lock);
2914        list_del(&lwork->pending_item);
2915        mutex_unlock(&lreq->lock);
2916
2917        linger_put(lreq);
2918        kfree(lwork);
2919}
2920
2921static void lwork_queue(struct linger_work *lwork)
2922{
2923        struct ceph_osd_linger_request *lreq = lwork->lreq;
2924        struct ceph_osd_client *osdc = lreq->osdc;
2925
2926        verify_lreq_locked(lreq);
2927        WARN_ON(!list_empty(&lwork->pending_item));
2928
2929        lwork->queued_stamp = jiffies;
2930        list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2931        queue_work(osdc->notify_wq, &lwork->work);
2932}
2933
2934static void do_watch_notify(struct work_struct *w)
2935{
2936        struct linger_work *lwork = container_of(w, struct linger_work, work);
2937        struct ceph_osd_linger_request *lreq = lwork->lreq;
2938
2939        if (!linger_registered(lreq)) {
2940                dout("%s lreq %p not registered\n", __func__, lreq);
2941                goto out;
2942        }
2943
2944        WARN_ON(!lreq->is_watch);
2945        dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2946             __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
2947             lwork->notify.payload_len);
2948        lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
2949                  lwork->notify.notifier_id, lwork->notify.payload,
2950                  lwork->notify.payload_len);
2951
2952out:
2953        ceph_msg_put(lwork->notify.msg);
2954        lwork_free(lwork);
2955}
2956
2957static void do_watch_error(struct work_struct *w)
2958{
2959        struct linger_work *lwork = container_of(w, struct linger_work, work);
2960        struct ceph_osd_linger_request *lreq = lwork->lreq;
2961
2962        if (!linger_registered(lreq)) {
2963                dout("%s lreq %p not registered\n", __func__, lreq);
2964                goto out;
2965        }
2966
2967        dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
2968        lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
2969
2970out:
2971        lwork_free(lwork);
2972}
2973
2974static void queue_watch_error(struct ceph_osd_linger_request *lreq)
2975{
2976        struct linger_work *lwork;
2977
2978        lwork = lwork_alloc(lreq, do_watch_error);
2979        if (!lwork) {
2980                pr_err("failed to allocate error-lwork\n");
2981                return;
2982        }
2983
2984        lwork->error.err = lreq->last_error;
2985        lwork_queue(lwork);
2986}
2987
2988static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
2989                                       int result)
2990{
2991        if (!completion_done(&lreq->reg_commit_wait)) {
2992                lreq->reg_commit_error = (result <= 0 ? result : 0);
2993                complete_all(&lreq->reg_commit_wait);
2994        }
2995}
2996
2997static void linger_commit_cb(struct ceph_osd_request *req)
2998{
2999        struct ceph_osd_linger_request *lreq = req->r_priv;
3000
3001        mutex_lock(&lreq->lock);
3002        dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
3003             lreq->linger_id, req->r_result);
3004        linger_reg_commit_complete(lreq, req->r_result);
3005        lreq->committed = true;
3006
3007        if (!lreq->is_watch) {
3008                struct ceph_osd_data *osd_data =
3009                    osd_req_op_data(req, 0, notify, response_data);
3010                void *p = page_address(osd_data->pages[0]);
3011
3012                WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
3013                        osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
3014
3015                /* make note of the notify_id */
3016                if (req->r_ops[0].outdata_len >= sizeof(u64)) {
3017                        lreq->notify_id = ceph_decode_64(&p);
3018                        dout("lreq %p notify_id %llu\n", lreq,
3019                             lreq->notify_id);
3020                } else {
3021                        dout("lreq %p no notify_id\n", lreq);
3022                }
3023        }
3024
3025        mutex_unlock(&lreq->lock);
3026        linger_put(lreq);
3027}
3028
/*
 * Map a watch error code to what is reported to the user.
 *
 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
 * notification and a failure to reconnect because we raced with
 * the delete appear the same to the user.  Every other value is
 * returned unchanged.
 */
static int normalize_watch_error(int err)
{
	return err == -ENOENT ? -ENOTCONN : err;
}
3041
3042static void linger_reconnect_cb(struct ceph_osd_request *req)
3043{
3044        struct ceph_osd_linger_request *lreq = req->r_priv;
3045
3046        mutex_lock(&lreq->lock);
3047        dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
3048             lreq, lreq->linger_id, req->r_result, lreq->last_error);
3049        if (req->r_result < 0) {
3050                if (!lreq->last_error) {
3051                        lreq->last_error = normalize_watch_error(req->r_result);
3052                        queue_watch_error(lreq);
3053                }
3054        }
3055
3056        mutex_unlock(&lreq->lock);
3057        linger_put(lreq);
3058}
3059
3060static void send_linger(struct ceph_osd_linger_request *lreq)
3061{
3062        struct ceph_osd_request *req = lreq->reg_req;
3063        struct ceph_osd_req_op *op = &req->r_ops[0];
3064
3065        verify_osdc_wrlocked(req->r_osdc);
3066        dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3067
3068        if (req->r_osd)
3069                cancel_linger_request(req);
3070
3071        request_reinit(req);
3072        target_copy(&req->r_t, &lreq->t);
3073        req->r_mtime = lreq->mtime;
3074
3075        mutex_lock(&lreq->lock);
3076        if (lreq->is_watch && lreq->committed) {
3077                WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
3078                        op->watch.cookie != lreq->linger_id);
3079                op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
3080                op->watch.gen = ++lreq->register_gen;
3081                dout("lreq %p reconnect register_gen %u\n", lreq,
3082                     op->watch.gen);
3083                req->r_callback = linger_reconnect_cb;
3084        } else {
3085                if (!lreq->is_watch)
3086                        lreq->notify_id = 0;
3087                else
3088                        WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
3089                dout("lreq %p register\n", lreq);
3090                req->r_callback = linger_commit_cb;
3091        }
3092        mutex_unlock(&lreq->lock);
3093
3094        req->r_priv = linger_get(lreq);
3095        req->r_linger = true;
3096
3097        submit_request(req, true);
3098}
3099
3100static void linger_ping_cb(struct ceph_osd_request *req)
3101{
3102        struct ceph_osd_linger_request *lreq = req->r_priv;
3103
3104        mutex_lock(&lreq->lock);
3105        dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
3106             __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
3107             lreq->last_error);
3108        if (lreq->register_gen == req->r_ops[0].watch.gen) {
3109                if (!req->r_result) {
3110                        lreq->watch_valid_thru = lreq->ping_sent;
3111                } else if (!lreq->last_error) {
3112                        lreq->last_error = normalize_watch_error(req->r_result);
3113                        queue_watch_error(lreq);
3114                }
3115        } else {
3116                dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
3117                     lreq->register_gen, req->r_ops[0].watch.gen);
3118        }
3119
3120        mutex_unlock(&lreq->lock);
3121        linger_put(lreq);
3122}
3123
3124static void send_linger_ping(struct ceph_osd_linger_request *lreq)
3125{
3126        struct ceph_osd_client *osdc = lreq->osdc;
3127        struct ceph_osd_request *req = lreq->ping_req;
3128        struct ceph_osd_req_op *op = &req->r_ops[0];
3129
3130        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
3131                dout("%s PAUSERD\n", __func__);
3132                return;
3133        }
3134
3135        lreq->ping_sent = jiffies;
3136        dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
3137             __func__, lreq, lreq->linger_id, lreq->ping_sent,
3138             lreq->register_gen);
3139
3140        if (req->r_osd)
3141                cancel_linger_request(req);
3142
3143        request_reinit(req);
3144        target_copy(&req->r_t, &lreq->t);
3145
3146        WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
3147                op->watch.cookie != lreq->linger_id ||
3148                op->watch.op != CEPH_OSD_WATCH_OP_PING);
3149        op->watch.gen = lreq->register_gen;
3150        req->r_callback = linger_ping_cb;
3151        req->r_priv = linger_get(lreq);
3152        req->r_linger = true;
3153
3154        ceph_osdc_get_request(req);
3155        account_request(req);
3156        req->r_tid = atomic64_inc_return(&osdc->last_tid);
3157        link_request(lreq->osd, req);
3158        send_request(req);
3159}
3160
3161static void linger_submit(struct ceph_osd_linger_request *lreq)
3162{
3163        struct ceph_osd_client *osdc = lreq->osdc;
3164        struct ceph_osd *osd;
3165
3166        down_write(&osdc->lock);
3167        linger_register(lreq);
3168        if (lreq->is_watch) {
3169                lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
3170                lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
3171        } else {
3172                lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
3173        }
3174
3175        calc_target(osdc, &lreq->t, false);
3176        osd = lookup_create_osd(osdc, lreq->t.osd, true);
3177        link_linger(osd, lreq);
3178
3179        send_linger(lreq);
3180        up_write(&osdc->lock);
3181}
3182
3183static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
3184{
3185        struct ceph_osd_client *osdc = lreq->osdc;
3186        struct ceph_osd_linger_request *lookup_lreq;
3187
3188        verify_osdc_wrlocked(osdc);
3189
3190        lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3191                                       lreq->linger_id);
3192        if (!lookup_lreq)
3193                return;
3194
3195        WARN_ON(lookup_lreq != lreq);
3196        erase_linger_mc(&osdc->linger_map_checks, lreq);
3197        linger_put(lreq);
3198}
3199
3200/*
3201 * @lreq has to be both registered and linked.
3202 */
3203static void __linger_cancel(struct ceph_osd_linger_request *lreq)
3204{
3205        if (lreq->is_watch && lreq->ping_req->r_osd)
3206                cancel_linger_request(lreq->ping_req);
3207        if (lreq->reg_req->r_osd)
3208                cancel_linger_request(lreq->reg_req);
3209        cancel_linger_map_check(lreq);
3210        unlink_linger(lreq->osd, lreq);
3211        linger_unregister(lreq);
3212}
3213
3214static void linger_cancel(struct ceph_osd_linger_request *lreq)
3215{
3216        struct ceph_osd_client *osdc = lreq->osdc;
3217
3218        down_write(&osdc->lock);
3219        if (__linger_registered(lreq))
3220                __linger_cancel(lreq);
3221        up_write(&osdc->lock);
3222}
3223
3224static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
3225
3226static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3227{
3228        struct ceph_osd_client *osdc = lreq->osdc;
3229        struct ceph_osdmap *map = osdc->osdmap;
3230
3231        verify_osdc_wrlocked(osdc);
3232        WARN_ON(!map->epoch);
3233
3234        if (lreq->register_gen) {
3235                lreq->map_dne_bound = map->epoch;
3236                dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3237                     lreq, lreq->linger_id);
3238        } else {
3239                dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3240                     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3241                     map->epoch);
3242        }
3243
3244        if (lreq->map_dne_bound) {
3245                if (map->epoch >= lreq->map_dne_bound) {
3246                        /* we had a new enough map */
3247                        pr_info("linger_id %llu pool does not exist\n",
3248                                lreq->linger_id);
3249                        linger_reg_commit_complete(lreq, -ENOENT);
3250                        __linger_cancel(lreq);
3251                }
3252        } else {
3253                send_linger_map_check(lreq);
3254        }
3255}
3256
3257static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3258{
3259        struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3260        struct ceph_osd_linger_request *lreq;
3261        u64 linger_id = greq->private_data;
3262
3263        WARN_ON(greq->result || !greq->u.newest);
3264
3265        down_write(&osdc->lock);
3266        lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3267        if (!lreq) {
3268                dout("%s linger_id %llu dne\n", __func__, linger_id);
3269                goto out_unlock;
3270        }
3271
3272        dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3273             __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3274             greq->u.newest);
3275        if (!lreq->map_dne_bound)
3276                lreq->map_dne_bound = greq->u.newest;
3277        erase_linger_mc(&osdc->linger_map_checks, lreq);
3278        check_linger_pool_dne(lreq);
3279
3280        linger_put(lreq);
3281out_unlock:
3282        up_write(&osdc->lock);
3283}
3284
3285static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3286{
3287        struct ceph_osd_client *osdc = lreq->osdc;
3288        struct ceph_osd_linger_request *lookup_lreq;
3289        int ret;
3290
3291        verify_osdc_wrlocked(osdc);
3292
3293        lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3294                                       lreq->linger_id);
3295        if (lookup_lreq) {
3296                WARN_ON(lookup_lreq != lreq);
3297                return;
3298        }
3299
3300        linger_get(lreq);
3301        insert_linger_mc(&osdc->linger_map_checks, lreq);
3302        ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3303                                          linger_map_check_cb, lreq->linger_id);
3304        WARN_ON(ret);
3305}
3306
3307static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3308{
3309        int ret;
3310
3311        dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3312        ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
3313        return ret ?: lreq->reg_commit_error;
3314}
3315
3316static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
3317{
3318        int ret;
3319
3320        dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3321        ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
3322        return ret ?: lreq->notify_finish_error;
3323}
3324
/*
 * Timeout callback, called every N seconds.  When one or more OSD
 * requests have been active for more than N seconds, we send a keepalive
 * (tag + timestamp) to their OSD to ensure any communications channel
 * reset is detected.
 */
3331static void handle_timeout(struct work_struct *work)
3332{
3333        struct ceph_osd_client *osdc =
3334                container_of(work, struct ceph_osd_client, timeout_work.work);
3335        struct ceph_options *opts = osdc->client->options;
3336        unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3337        unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3338        LIST_HEAD(slow_osds);
3339        struct rb_node *n, *p;
3340
3341        dout("%s osdc %p\n", __func__, osdc);
3342        down_write(&osdc->lock);
3343
3344        /*
3345         * ping osds that are a bit slow.  this ensures that if there
3346         * is a break in the TCP connection we will notice, and reopen
3347         * a connection with that osd (from the fault callback).
3348         */
3349        for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3350                struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3351                bool found = false;
3352
3353                for (p = rb_first(&osd->o_requests); p; ) {
3354                        struct ceph_osd_request *req =
3355                            rb_entry(p, struct ceph_osd_request, r_node);
3356
3357                        p = rb_next(p); /* abort_request() */
3358
3359                        if (time_before(req->r_stamp, cutoff)) {
3360                                dout(" req %p tid %llu on osd%d is laggy\n",
3361                                     req, req->r_tid, osd->o_osd);
3362                                found = true;
3363                        }
3364                        if (opts->osd_request_timeout &&
3365                            time_before(req->r_start_stamp, expiry_cutoff)) {
3366                                pr_err_ratelimited("tid %llu on osd%d timeout\n",
3367                                       req->r_tid, osd->o_osd);
3368                                abort_request(req, -ETIMEDOUT);
3369                        }
3370                }
3371                for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3372                        struct ceph_osd_linger_request *lreq =
3373                            rb_entry(p, struct ceph_osd_linger_request, node);
3374
3375                        dout(" lreq %p linger_id %llu is served by osd%d\n",
3376                             lreq, lreq->linger_id, osd->o_osd);
3377                        found = true;
3378
3379                        mutex_lock(&lreq->lock);
3380                        if (lreq->is_watch && lreq->committed && !lreq->last_error)
3381                                send_linger_ping(lreq);
3382                        mutex_unlock(&lreq->lock);
3383                }
3384
3385                if (found)
3386                        list_move_tail(&osd->o_keepalive_item, &slow_osds);
3387        }
3388
3389        if (opts->osd_request_timeout) {
3390                for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3391                        struct ceph_osd_request *req =
3392                            rb_entry(p, struct ceph_osd_request, r_node);
3393
3394                        p = rb_next(p); /* abort_request() */
3395
3396                        if (time_before(req->r_start_stamp, expiry_cutoff)) {
3397                                pr_err_ratelimited("tid %llu on osd%d timeout\n",
3398                                       req->r_tid, osdc->homeless_osd.o_osd);
3399                                abort_request(req, -ETIMEDOUT);
3400                        }
3401                }
3402        }
3403
3404        if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3405                maybe_request_map(osdc);
3406
3407        while (!list_empty(&slow_osds)) {
3408                struct ceph_osd *osd = list_first_entry(&slow_osds,
3409                                                        struct ceph_osd,
3410                                                        o_keepalive_item);
3411                list_del_init(&osd->o_keepalive_item);
3412                ceph_con_keepalive(&osd->o_con);
3413        }
3414
3415        up_write(&osdc->lock);
3416        schedule_delayed_work(&osdc->timeout_work,
3417                              osdc->client->options->osd_keepalive_timeout);
3418}
3419
3420static void handle_osds_timeout(struct work_struct *work)
3421{
3422        struct ceph_osd_client *osdc =
3423                container_of(work, struct ceph_osd_client,
3424                             osds_timeout_work.work);
3425        unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3426        struct ceph_osd *osd, *nosd;
3427
3428        dout("%s osdc %p\n", __func__, osdc);
3429        down_write(&osdc->lock);
3430        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3431                if (time_before(jiffies, osd->lru_ttl))
3432                        break;
3433
3434                WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3435                WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3436                close_osd(osd);
3437        }
3438
3439        up_write(&osdc->lock);
3440        schedule_delayed_work(&osdc->osds_timeout_work,
3441                              round_jiffies_relative(delay));
3442}
3443
3444static int ceph_oloc_decode(void **p, void *end,
3445                            struct ceph_object_locator *oloc)
3446{
3447        u8 struct_v, struct_cv;
3448        u32 len;
3449        void *struct_end;
3450        int ret = 0;
3451
3452        ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3453        struct_v = ceph_decode_8(p);
3454        struct_cv = ceph_decode_8(p);
3455        if (struct_v < 3) {
3456                pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3457                        struct_v, struct_cv);
3458                goto e_inval;
3459        }
3460        if (struct_cv > 6) {
3461                pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3462                        struct_v, struct_cv);
3463                goto e_inval;
3464        }
3465        len = ceph_decode_32(p);
3466        ceph_decode_need(p, end, len, e_inval);
3467        struct_end = *p + len;
3468
3469        oloc->pool = ceph_decode_64(p);
3470        *p += 4; /* skip preferred */
3471
3472        len = ceph_decode_32(p);
3473        if (len > 0) {
3474                pr_warn("ceph_object_locator::key is set\n");
3475                goto e_inval;
3476        }
3477
3478        if (struct_v >= 5) {
3479                bool changed = false;
3480
3481                len = ceph_decode_32(p);
3482                if (len > 0) {
3483                        ceph_decode_need(p, end, len, e_inval);
3484                        if (!oloc->pool_ns ||
3485                            ceph_compare_string(oloc->pool_ns, *p, len))
3486                                changed = true;
3487                        *p += len;
3488                } else {
3489                        if (oloc->pool_ns)
3490                                changed = true;
3491                }
3492                if (changed) {
3493                        /* redirect changes namespace */
3494                        pr_warn("ceph_object_locator::nspace is changed\n");
3495                        goto e_inval;
3496                }
3497        }
3498
3499        if (struct_v >= 6) {
3500                s64 hash = ceph_decode_64(p);
3501                if (hash != -1) {
3502                        pr_warn("ceph_object_locator::hash is set\n");
3503                        goto e_inval;
3504                }
3505        }
3506
3507        /* skip the rest */
3508        *p = struct_end;
3509out:
3510        return ret;
3511
3512e_inval:
3513        ret = -EINVAL;
3514        goto out;
3515}
3516
3517static int ceph_redirect_decode(void **p, void *end,
3518                                struct ceph_request_redirect *redir)
3519{
3520        u8 struct_v, struct_cv;
3521        u32 len;
3522        void *struct_end;
3523        int ret;
3524
3525        ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3526        struct_v = ceph_decode_8(p);
3527        struct_cv = ceph_decode_8(p);
3528        if (struct_cv > 1) {
3529                pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3530                        struct_v, struct_cv);
3531                goto e_inval;
3532        }
3533        len = ceph_decode_32(p);
3534        ceph_decode_need(p, end, len, e_inval);
3535        struct_end = *p + len;
3536
3537        ret = ceph_oloc_decode(p, end, &redir->oloc);
3538        if (ret)
3539                goto out;
3540
3541        len = ceph_decode_32(p);
3542        if (len > 0) {
3543                pr_warn("ceph_request_redirect::object_name is set\n");
3544                goto e_inval;
3545        }
3546
3547        /* skip the rest */
3548        *p = struct_end;
3549out:
3550        return ret;
3551
3552e_inval:
3553        ret = -EINVAL;
3554        goto out;
3555}
3556
3557struct MOSDOpReply {
3558        struct ceph_pg pgid;
3559        u64 flags;
3560        int result;
3561        u32 epoch;
3562        int num_ops;
3563        u32 outdata_len[CEPH_OSD_MAX_OPS];
3564        s32 rval[CEPH_OSD_MAX_OPS];
3565        int retry_attempt;
3566        struct ceph_eversion replay_version;
3567        u64 user_version;
3568        struct ceph_request_redirect redirect;
3569};
3570
3571static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3572{
3573        void *p = msg->front.iov_base;
3574        void *const end = p + msg->front.iov_len;
3575        u16 version = le16_to_cpu(msg->hdr.version);
3576        struct ceph_eversion bad_replay_version;
3577        u8 decode_redir;
3578        u32 len;
3579        int ret;
3580        int i;
3581
3582        ceph_decode_32_safe(&p, end, len, e_inval);
3583        ceph_decode_need(&p, end, len, e_inval);
3584        p += len; /* skip oid */
3585
3586        ret = ceph_decode_pgid(&p, end, &m->pgid);
3587        if (ret)
3588                return ret;
3589
3590        ceph_decode_64_safe(&p, end, m->flags, e_inval);
3591        ceph_decode_32_safe(&p, end, m->result, e_inval);
3592        ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3593        memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3594        p += sizeof(bad_replay_version);
3595        ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3596
3597        ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3598        if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3599                goto e_inval;
3600
3601        ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3602                         e_inval);
3603        for (i = 0; i < m->num_ops; i++) {
3604                struct ceph_osd_op *op = p;
3605
3606                m->outdata_len[i] = le32_to_cpu(op->payload_len);
3607                p += sizeof(*op);
3608        }
3609
3610        ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3611        for (i = 0; i < m->num_ops; i++)
3612                ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3613
3614        if (version >= 5) {
3615                ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3616                memcpy(&m->replay_version, p, sizeof(m->replay_version));
3617                p += sizeof(m->replay_version);
3618                ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3619        } else {
3620                m->replay_version = bad_replay_version; /* struct */
3621                m->user_version = le64_to_cpu(m->replay_version.version);
3622        }
3623
3624        if (version >= 6) {
3625                if (version >= 7)
3626                        ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3627                else
3628                        decode_redir = 1;
3629        } else {
3630                decode_redir = 0;
3631        }
3632
3633        if (decode_redir) {
3634                ret = ceph_redirect_decode(&p, end, &m->redirect);
3635                if (ret)
3636                        return ret;
3637        } else {
3638                ceph_oloc_init(&m->redirect.oloc);
3639        }
3640
3641        return 0;
3642
3643e_inval:
3644        return -EINVAL;
3645}
3646
3647/*
3648 * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
3649 * specified.
3650 */
3651static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3652{
3653        struct ceph_osd_client *osdc = osd->o_osdc;
3654        struct ceph_osd_request *req;
3655        struct MOSDOpReply m;
3656        u64 tid = le64_to_cpu(msg->hdr.tid);
3657        u32 data_len = 0;
3658        int ret;
3659        int i;
3660
3661        dout("%s msg %p tid %llu\n", __func__, msg, tid);
3662
3663        down_read(&osdc->lock);
3664        if (!osd_registered(osd)) {
3665                dout("%s osd%d unknown\n", __func__, osd->o_osd);
3666                goto out_unlock_osdc;
3667        }
3668        WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3669
3670        mutex_lock(&osd->lock);
3671        req = lookup_request(&osd->o_requests, tid);
3672        if (!req) {
3673                dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3674                goto out_unlock_session;
3675        }
3676
3677        m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3678        ret = decode_MOSDOpReply(msg, &m);
3679        m.redirect.oloc.pool_ns = NULL;
3680        if (ret) {
3681                pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3682                       req->r_tid, ret);
3683                ceph_msg_dump(msg);
3684                goto fail_request;
3685        }
3686        dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3687             __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3688             m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3689             le64_to_cpu(m.replay_version.version), m.user_version);
3690
3691        if (m.retry_attempt >= 0) {
3692                if (m.retry_attempt != req->r_attempts - 1) {
3693                        dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3694                             req, req->r_tid, m.retry_attempt,
3695                             req->r_attempts - 1);
3696                        goto out_unlock_session;
3697                }
3698        } else {
3699                WARN_ON(1); /* MOSDOpReply v4 is assumed */
3700        }
3701
3702        if (!ceph_oloc_empty(&m.redirect.oloc)) {
3703                dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3704                     m.redirect.oloc.pool);
3705                unlink_request(osd, req);
3706                mutex_unlock(&osd->lock);
3707
3708                /*
3709                 * Not ceph_oloc_copy() - changing pool_ns is not
3710                 * supported.
3711                 */
3712                req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3713                req->r_flags |= CEPH_OSD_FLAG_REDIRECTED |
3714                                CEPH_OSD_FLAG_IGNORE_OVERLAY |
3715                                CEPH_OSD_FLAG_IGNORE_CACHE;
3716                req->r_tid = 0;
3717                __submit_request(req, false);
3718                goto out_unlock_osdc;
3719        }
3720
3721        if (m.result == -EAGAIN) {
3722                dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
3723                unlink_request(osd, req);
3724                mutex_unlock(&osd->lock);
3725
3726                /*
3727                 * The object is missing on the replica or not (yet)
3728                 * readable.  Clear pgid to force a resend to the primary
3729                 * via legacy_change.
3730                 */
3731                req->r_t.pgid.pool = 0;
3732                req->r_t.pgid.seed = 0;
3733                WARN_ON(!req->r_t.used_replica);
3734                req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3735                                  CEPH_OSD_FLAG_LOCALIZE_READS);
3736                req->r_tid = 0;
3737                __submit_request(req, false);
3738                goto out_unlock_osdc;
3739        }
3740
3741        if (m.num_ops != req->r_num_ops) {
3742                pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3743                       req->r_num_ops, req->r_tid);
3744                goto fail_request;
3745        }
3746        for (i = 0; i < req->r_num_ops; i++) {
3747                dout(" req %p tid %llu op %d rval %d len %u\n", req,
3748                     req->r_tid, i, m.rval[i], m.outdata_len[i]);
3749                req->r_ops[i].rval = m.rval[i];
3750                req->r_ops[i].outdata_len = m.outdata_len[i];
3751                data_len += m.outdata_len[i];
3752        }
3753        if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3754                pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3755                       le32_to_cpu(msg->hdr.data_len), req->r_tid);
3756                goto fail_request;
3757        }
3758        dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3759             req, req->r_tid, m.result, data_len);
3760
3761        /*
3762         * Since we only ever request ONDISK, we should only ever get
3763         * one (type of) reply back.
3764         */
3765        WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3766        req->r_result = m.result ?: data_len;
3767        finish_request(req);
3768        mutex_unlock(&osd->lock);
3769        up_read(&osdc->lock);
3770
3771        __complete_request(req);
3772        return;
3773
3774fail_request:
3775        complete_request(req, -EIO);
3776out_unlock_session:
3777        mutex_unlock(&osd->lock);
3778out_unlock_osdc:
3779        up_read(&osdc->lock);
3780}
3781
3782static void set_pool_was_full(struct ceph_osd_client *osdc)
3783{
3784        struct rb_node *n;
3785
3786        for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3787                struct ceph_pg_pool_info *pi =
3788                    rb_entry(n, struct ceph_pg_pool_info, node);
3789
3790                pi->was_full = __pool_full(pi);
3791        }
3792}
3793
3794static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3795{
3796        struct ceph_pg_pool_info *pi;
3797
3798        pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3799        if (!pi)
3800                return false;
3801
3802        return pi->was_full && !__pool_full(pi);
3803}
3804
3805static enum calc_target_result
3806recalc_linger_target(struct ceph_osd_linger_request *lreq)
3807{
3808        struct ceph_osd_client *osdc = lreq->osdc;
3809        enum calc_target_result ct_res;
3810
3811        ct_res = calc_target(osdc, &lreq->t, true);
3812        if (ct_res == CALC_TARGET_NEED_RESEND) {
3813                struct ceph_osd *osd;
3814
3815                osd = lookup_create_osd(osdc, lreq->t.osd, true);
3816                if (osd != lreq->osd) {
3817                        unlink_linger(lreq->osd, lreq);
3818                        link_linger(osd, lreq);
3819                }
3820        }
3821
3822        return ct_res;
3823}
3824
3825/*
3826 * Requeue requests whose mapping to an OSD has changed.
3827 */
3828static void scan_requests(struct ceph_osd *osd,
3829                          bool force_resend,
3830                          bool cleared_full,
3831                          bool check_pool_cleared_full,
3832                          struct rb_root *need_resend,
3833                          struct list_head *need_resend_linger)
3834{
3835        struct ceph_osd_client *osdc = osd->o_osdc;
3836        struct rb_node *n;
3837        bool force_resend_writes;
3838
3839        for (n = rb_first(&osd->o_linger_requests); n; ) {
3840                struct ceph_osd_linger_request *lreq =
3841                    rb_entry(n, struct ceph_osd_linger_request, node);
3842                enum calc_target_result ct_res;
3843
3844                n = rb_next(n); /* recalc_linger_target() */
3845
3846                dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3847                     lreq->linger_id);
3848                ct_res = recalc_linger_target(lreq);
3849                switch (ct_res) {
3850                case CALC_TARGET_NO_ACTION:
3851                        force_resend_writes = cleared_full ||
3852                            (check_pool_cleared_full &&
3853                             pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3854                        if (!force_resend && !force_resend_writes)
3855                                break;
3856
3857                        fallthrough;
3858                case CALC_TARGET_NEED_RESEND:
3859                        cancel_linger_map_check(lreq);
3860                        /*
3861                         * scan_requests() for the previous epoch(s)
3862                         * may have already added it to the list, since
3863                         * it's not unlinked here.
3864                         */
3865                        if (list_empty(&lreq->scan_item))
3866                                list_add_tail(&lreq->scan_item, need_resend_linger);
3867                        break;
3868                case CALC_TARGET_POOL_DNE:
3869                        list_del_init(&lreq->scan_item);
3870                        check_linger_pool_dne(lreq);
3871                        break;
3872                }
3873        }
3874
3875        for (n = rb_first(&osd->o_requests); n; ) {
3876                struct ceph_osd_request *req =
3877                    rb_entry(n, struct ceph_osd_request, r_node);
3878                enum calc_target_result ct_res;
3879
3880                n = rb_next(n); /* unlink_request(), check_pool_dne() */
3881
3882                dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3883                ct_res = calc_target(osdc, &req->r_t, false);
3884                switch (ct_res) {
3885                case CALC_TARGET_NO_ACTION:
3886                        force_resend_writes = cleared_full ||
3887                            (check_pool_cleared_full &&
3888                             pool_cleared_full(osdc, req->r_t.base_oloc.pool));
3889                        if (!force_resend &&
3890                            (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
3891                             !force_resend_writes))
3892                                break;
3893
3894                        fallthrough;
3895                case CALC_TARGET_NEED_RESEND:
3896                        cancel_map_check(req);
3897                        unlink_request(osd, req);
3898                        insert_request(need_resend, req);
3899                        break;
3900                case CALC_TARGET_POOL_DNE:
3901                        check_pool_dne(req);
3902                        break;
3903                }
3904        }
3905}
3906
3907static int handle_one_map(struct ceph_osd_client *osdc,
3908                          void *p, void *end, bool incremental,
3909                          struct rb_root *need_resend,
3910                          struct list_head *need_resend_linger)
3911{
3912        struct ceph_osdmap *newmap;
3913        struct rb_node *n;
3914        bool skipped_map = false;
3915        bool was_full;
3916
3917        was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3918        set_pool_was_full(osdc);
3919
3920        if (incremental)
3921                newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
3922        else
3923                newmap = ceph_osdmap_decode(&p, end);
3924        if (IS_ERR(newmap))
3925                return PTR_ERR(newmap);
3926
3927        if (newmap != osdc->osdmap) {
3928                /*
3929                 * Preserve ->was_full before destroying the old map.
3930                 * For pools that weren't in the old map, ->was_full
3931                 * should be false.
3932                 */
3933                for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
3934                        struct ceph_pg_pool_info *pi =
3935                            rb_entry(n, struct ceph_pg_pool_info, node);
3936                        struct ceph_pg_pool_info *old_pi;
3937
3938                        old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
3939                        if (old_pi)
3940                                pi->was_full = old_pi->was_full;
3941                        else
3942                                WARN_ON(pi->was_full);
3943                }
3944
3945                if (osdc->osdmap->epoch &&
3946                    osdc->osdmap->epoch + 1 < newmap->epoch) {
3947                        WARN_ON(incremental);
3948                        skipped_map = true;
3949                }
3950
3951                ceph_osdmap_destroy(osdc->osdmap);
3952                osdc->osdmap = newmap;
3953        }
3954
3955        was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3956        scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
3957                      need_resend, need_resend_linger);
3958
3959        for (n = rb_first(&osdc->osds); n; ) {
3960                struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3961
3962                n = rb_next(n); /* close_osd() */
3963
3964                scan_requests(osd, skipped_map, was_full, true, need_resend,
3965                              need_resend_linger);
3966                if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
3967                    memcmp(&osd->o_con.peer_addr,
3968                           ceph_osd_addr(osdc->osdmap, osd->o_osd),
3969                           sizeof(struct ceph_entity_addr)))
3970                        close_osd(osd);
3971        }
3972
3973        return 0;
3974}
3975
3976static void kick_requests(struct ceph_osd_client *osdc,
3977                          struct rb_root *need_resend,
3978                          struct list_head *need_resend_linger)
3979{
3980        struct ceph_osd_linger_request *lreq, *nlreq;
3981        enum calc_target_result ct_res;
3982        struct rb_node *n;
3983
3984        /* make sure need_resend targets reflect latest map */
3985        for (n = rb_first(need_resend); n; ) {
3986                struct ceph_osd_request *req =
3987                    rb_entry(n, struct ceph_osd_request, r_node);
3988
3989                n = rb_next(n);
3990
3991                if (req->r_t.epoch < osdc->osdmap->epoch) {
3992                        ct_res = calc_target(osdc, &req->r_t, false);
3993                        if (ct_res == CALC_TARGET_POOL_DNE) {
3994                                erase_request(need_resend, req);
3995                                check_pool_dne(req);
3996                        }
3997                }
3998        }
3999
4000        for (n = rb_first(need_resend); n; ) {
4001                struct ceph_osd_request *req =
4002                    rb_entry(n, struct ceph_osd_request, r_node);
4003                struct ceph_osd *osd;
4004
4005                n = rb_next(n);
4006                erase_request(need_resend, req); /* before link_request() */
4007
4008                osd = lookup_create_osd(osdc, req->r_t.osd, true);
4009                link_request(osd, req);
4010                if (!req->r_linger) {
4011                        if (!osd_homeless(osd) && !req->r_t.paused)
4012                                send_request(req);
4013                } else {
4014                        cancel_linger_request(req);
4015                }
4016        }
4017
4018        list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
4019                if (!osd_homeless(lreq->osd))
4020                        send_linger(lreq);
4021
4022                list_del_init(&lreq->scan_item);
4023        }
4024}
4025
4026/*
4027 * Process updated osd map.
4028 *
4029 * The message contains any number of incremental and full maps, normally
4030 * indicating some sort of topology change in the cluster.  Kick requests
4031 * off to different OSDs as needed.
4032 */
4033void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
4034{
4035        void *p = msg->front.iov_base;
4036        void *const end = p + msg->front.iov_len;
4037        u32 nr_maps, maplen;
4038        u32 epoch;
4039        struct ceph_fsid fsid;
4040        struct rb_root need_resend = RB_ROOT;
4041        LIST_HEAD(need_resend_linger);
4042        bool handled_incremental = false;
4043        bool was_pauserd, was_pausewr;
4044        bool pauserd, pausewr;
4045        int err;
4046
4047        dout("%s have %u\n", __func__, osdc->osdmap->epoch);
4048        down_write(&osdc->lock);
4049
4050        /* verify fsid */
4051        ceph_decode_need(&p, end, sizeof(fsid), bad);
4052        ceph_decode_copy(&p, &fsid, sizeof(fsid));
4053        if (ceph_check_fsid(osdc->client, &fsid) < 0)
4054                goto bad;
4055
4056        was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4057        was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4058                      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4059                      have_pool_full(osdc);
4060
4061        /* incremental maps */
4062        ceph_decode_32_safe(&p, end, nr_maps, bad);
4063        dout(" %d inc maps\n", nr_maps);
4064        while (nr_maps > 0) {
4065                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4066                epoch = ceph_decode_32(&p);
4067                maplen = ceph_decode_32(&p);
4068                ceph_decode_need(&p, end, maplen, bad);
4069                if (osdc->osdmap->epoch &&
4070                    osdc->osdmap->epoch + 1 == epoch) {
4071                        dout("applying incremental map %u len %d\n",
4072                             epoch, maplen);
4073                        err = handle_one_map(osdc, p, p + maplen, true,
4074                                             &need_resend, &need_resend_linger);
4075                        if (err)
4076                                goto bad;
4077                        handled_incremental = true;
4078                } else {
4079                        dout("ignoring incremental map %u len %d\n",
4080                             epoch, maplen);
4081                }
4082                p += maplen;
4083                nr_maps--;
4084        }
4085        if (handled_incremental)
4086                goto done;
4087
4088        /* full maps */
4089        ceph_decode_32_safe(&p, end, nr_maps, bad);
4090        dout(" %d full maps\n", nr_maps);
4091        while (nr_maps) {
4092                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4093                epoch = ceph_decode_32(&p);
4094                maplen = ceph_decode_32(&p);
4095                ceph_decode_need(&p, end, maplen, bad);
4096                if (nr_maps > 1) {
4097                        dout("skipping non-latest full map %u len %d\n",
4098                             epoch, maplen);
4099                } else if (osdc->osdmap->epoch >= epoch) {
4100                        dout("skipping full map %u len %d, "
4101                             "older than our %u\n", epoch, maplen,
4102                             osdc->osdmap->epoch);
4103                } else {
4104                        dout("taking full map %u len %d\n", epoch, maplen);
4105                        err = handle_one_map(osdc, p, p + maplen, false,
4106                                             &need_resend, &need_resend_linger);
4107                        if (err)
4108                                goto bad;
4109                }
4110                p += maplen;
4111                nr_maps--;
4112        }
4113
4114done:
4115        /*
4116         * subscribe to subsequent osdmap updates if full to ensure
4117         * we find out when we are no longer full and stop returning
4118         * ENOSPC.
4119         */
4120        pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4121        pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4122                  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4123                  have_pool_full(osdc);
4124        if (was_pauserd || was_pausewr || pauserd || pausewr ||
4125            osdc->osdmap->epoch < osdc->epoch_barrier)
4126                maybe_request_map(osdc);
4127
4128        kick_requests(osdc, &need_resend, &need_resend_linger);
4129
4130        ceph_osdc_abort_on_full(osdc);
4131        ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
4132                          osdc->osdmap->epoch);
4133        up_write(&osdc->lock);
4134        wake_up_all(&osdc->client->auth_wq);
4135        return;
4136
4137bad:
4138        pr_err("osdc handle_map corrupt msg\n");
4139        ceph_msg_dump(msg);
4140        up_write(&osdc->lock);
4141}
4142
4143/*
4144 * Resubmit requests pending on the given osd.
4145 */
4146static void kick_osd_requests(struct ceph_osd *osd)
4147{
4148        struct rb_node *n;
4149
4150        clear_backoffs(osd);
4151
4152        for (n = rb_first(&osd->o_requests); n; ) {
4153                struct ceph_osd_request *req =
4154                    rb_entry(n, struct ceph_osd_request, r_node);
4155
4156                n = rb_next(n); /* cancel_linger_request() */
4157
4158                if (!req->r_linger) {
4159                        if (!req->r_t.paused)
4160                                send_request(req);
4161                } else {
4162                        cancel_linger_request(req);
4163                }
4164        }
4165        for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4166                struct ceph_osd_linger_request *lreq =
4167                    rb_entry(n, struct ceph_osd_linger_request, node);
4168
4169                send_linger(lreq);
4170        }
4171}
4172
4173/*
4174 * If the osd connection drops, we need to resubmit all requests.
4175 */
4176static void osd_fault(struct ceph_connection *con)
4177{
4178        struct ceph_osd *osd = con->private;
4179        struct ceph_osd_client *osdc = osd->o_osdc;
4180
4181        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4182
4183        down_write(&osdc->lock);
4184        if (!osd_registered(osd)) {
4185                dout("%s osd%d unknown\n", __func__, osd->o_osd);
4186                goto out_unlock;
4187        }
4188
4189        if (!reopen_osd(osd))
4190                kick_osd_requests(osd);
4191        maybe_request_map(osdc);
4192
4193out_unlock:
4194        up_write(&osdc->lock);
4195}
4196
4197struct MOSDBackoff {
4198        struct ceph_spg spgid;
4199        u32 map_epoch;
4200        u8 op;
4201        u64 id;
4202        struct ceph_hobject_id *begin;
4203        struct ceph_hobject_id *end;
4204};
4205
4206static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
4207{
4208        void *p = msg->front.iov_base;
4209        void *const end = p + msg->front.iov_len;
4210        u8 struct_v;
4211        u32 struct_len;
4212        int ret;
4213
4214        ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
4215        if (ret)
4216                return ret;
4217
4218        ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
4219        if (ret)
4220                return ret;
4221
4222        ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
4223        ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
4224        ceph_decode_8_safe(&p, end, m->op, e_inval);
4225        ceph_decode_64_safe(&p, end, m->id, e_inval);
4226
4227        m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
4228        if (!m->begin)
4229                return -ENOMEM;
4230
4231        ret = decode_hoid(&p, end, m->begin);
4232        if (ret) {
4233                free_hoid(m->begin);
4234                return ret;
4235        }
4236
4237        m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
4238        if (!m->end) {
4239                free_hoid(m->begin);
4240                return -ENOMEM;
4241        }
4242
4243        ret = decode_hoid(&p, end, m->end);
4244        if (ret) {
4245                free_hoid(m->begin);
4246                free_hoid(m->end);
4247                return ret;
4248        }
4249
4250        return 0;
4251
4252e_inval:
4253        return -EINVAL;
4254}
4255
4256static struct ceph_msg *create_backoff_message(
4257                                const struct ceph_osd_backoff *backoff,
4258                                u32 map_epoch)
4259{
4260        struct ceph_msg *msg;
4261        void *p, *end;
4262        int msg_size;
4263
4264        msg_size = CEPH_ENCODING_START_BLK_LEN +
4265                        CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4266        msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4267        msg_size += CEPH_ENCODING_START_BLK_LEN +
4268                        hoid_encoding_size(backoff->begin);
4269        msg_size += CEPH_ENCODING_START_BLK_LEN +
4270                        hoid_encoding_size(backoff->end);
4271
4272        msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4273        if (!msg)
4274                return NULL;
4275
4276        p = msg->front.iov_base;
4277        end = p + msg->front_alloc_len;
4278
4279        encode_spgid(&p, &backoff->spgid);
4280        ceph_encode_32(&p, map_epoch);
4281        ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4282        ceph_encode_64(&p, backoff->id);
4283        encode_hoid(&p, end, backoff->begin);
4284        encode_hoid(&p, end, backoff->end);
4285        BUG_ON(p != end);
4286
4287        msg->front.iov_len = p - msg->front.iov_base;
4288        msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4289        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4290
4291        return msg;
4292}
4293
4294static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4295{
4296        struct ceph_spg_mapping *spg;
4297        struct ceph_osd_backoff *backoff;
4298        struct ceph_msg *msg;
4299
4300        dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4301             m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4302
4303        spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4304        if (!spg) {
4305                spg = alloc_spg_mapping();
4306                if (!spg) {
4307                        pr_err("%s failed to allocate spg\n", __func__);
4308                        return;
4309                }
4310                spg->spgid = m->spgid; /* struct */
4311                insert_spg_mapping(&osd->o_backoff_mappings, spg);
4312        }
4313
4314        backoff = alloc_backoff();
4315        if (!backoff) {
4316                pr_err("%s failed to allocate backoff\n", __func__);
4317                return;
4318        }
4319        backoff->spgid = m->spgid; /* struct */
4320        backoff->id = m->id;
4321        backoff->begin = m->begin;
4322        m->begin = NULL; /* backoff now owns this */
4323        backoff->end = m->end;
4324        m->end = NULL;   /* ditto */
4325
4326        insert_backoff(&spg->backoffs, backoff);
4327        insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4328
4329        /*
4330         * Ack with original backoff's epoch so that the OSD can
4331         * discard this if there was a PG split.
4332         */
4333        msg = create_backoff_message(backoff, m->map_epoch);
4334        if (!msg) {
4335                pr_err("%s failed to allocate msg\n", __func__);
4336                return;
4337        }
4338        ceph_con_send(&osd->o_con, msg);
4339}
4340
4341static bool target_contained_by(const struct ceph_osd_request_target *t,
4342                                const struct ceph_hobject_id *begin,
4343                                const struct ceph_hobject_id *end)
4344{
4345        struct ceph_hobject_id hoid;
4346        int cmp;
4347
4348        hoid_fill_from_target(&hoid, t);
4349        cmp = hoid_compare(&hoid, begin);
4350        return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4351}
4352
4353static void handle_backoff_unblock(struct ceph_osd *osd,
4354                                   const struct MOSDBackoff *m)
4355{
4356        struct ceph_spg_mapping *spg;
4357        struct ceph_osd_backoff *backoff;
4358        struct rb_node *n;
4359
4360        dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4361             m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4362
4363        backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4364        if (!backoff) {
4365                pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4366                       __func__, osd->o_osd, m->spgid.pgid.pool,
4367                       m->spgid.pgid.seed, m->spgid.shard, m->id);
4368                return;
4369        }
4370
4371        if (hoid_compare(backoff->begin, m->begin) &&
4372            hoid_compare(backoff->end, m->end)) {
4373                pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4374                       __func__, osd->o_osd, m->spgid.pgid.pool,
4375                       m->spgid.pgid.seed, m->spgid.shard, m->id);
4376                /* unblock it anyway... */
4377        }
4378
4379        spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4380        BUG_ON(!spg);
4381
4382        erase_backoff(&spg->backoffs, backoff);
4383        erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4384        free_backoff(backoff);
4385
4386        if (RB_EMPTY_ROOT(&spg->backoffs)) {
4387                erase_spg_mapping(&osd->o_backoff_mappings, spg);
4388                free_spg_mapping(spg);
4389        }
4390
4391        for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4392                struct ceph_osd_request *req =
4393                    rb_entry(n, struct ceph_osd_request, r_node);
4394
4395                if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4396                        /*
4397                         * Match against @m, not @backoff -- the PG may
4398                         * have split on the OSD.
4399                         */
4400                        if (target_contained_by(&req->r_t, m->begin, m->end)) {
4401                                /*
4402                                 * If no other installed backoff applies,
4403                                 * resend.
4404                                 */
4405                                send_request(req);
4406                        }
4407                }
4408        }
4409}
4410
4411static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4412{
4413        struct ceph_osd_client *osdc = osd->o_osdc;
4414        struct MOSDBackoff m;
4415        int ret;
4416
4417        down_read(&osdc->lock);
4418        if (!osd_registered(osd)) {
4419                dout("%s osd%d unknown\n", __func__, osd->o_osd);
4420                up_read(&osdc->lock);
4421                return;
4422        }
4423        WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4424
4425        mutex_lock(&osd->lock);
4426        ret = decode_MOSDBackoff(msg, &m);
4427        if (ret) {
4428                pr_err("failed to decode MOSDBackoff: %d\n", ret);
4429                ceph_msg_dump(msg);
4430                goto out_unlock;
4431        }
4432
4433        switch (m.op) {
4434        case CEPH_OSD_BACKOFF_OP_BLOCK:
4435                handle_backoff_block(osd, &m);
4436                break;
4437        case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4438                handle_backoff_unblock(osd, &m);
4439                break;
4440        default:
4441                pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4442        }
4443
4444        free_hoid(m.begin);
4445        free_hoid(m.end);
4446
4447out_unlock:
4448        mutex_unlock(&osd->lock);
4449        up_read(&osdc->lock);
4450}
4451
4452/*
4453 * Process osd watch notifications
4454 */
4455static void handle_watch_notify(struct ceph_osd_client *osdc,
4456                                struct ceph_msg *msg)
4457{
4458        void *p = msg->front.iov_base;
4459        void *const end = p + msg->front.iov_len;
4460        struct ceph_osd_linger_request *lreq;
4461        struct linger_work *lwork;
4462        u8 proto_ver, opcode;
4463        u64 cookie, notify_id;
4464        u64 notifier_id = 0;
4465        s32 return_code = 0;
4466        void *payload = NULL;
4467        u32 payload_len = 0;
4468
4469        ceph_decode_8_safe(&p, end, proto_ver, bad);
4470        ceph_decode_8_safe(&p, end, opcode, bad);
4471        ceph_decode_64_safe(&p, end, cookie, bad);
4472        p += 8; /* skip ver */
4473        ceph_decode_64_safe(&p, end, notify_id, bad);
4474
4475        if (proto_ver >= 1) {
4476                ceph_decode_32_safe(&p, end, payload_len, bad);
4477                ceph_decode_need(&p, end, payload_len, bad);
4478                payload = p;
4479                p += payload_len;
4480        }
4481
4482        if (le16_to_cpu(msg->hdr.version) >= 2)
4483                ceph_decode_32_safe(&p, end, return_code, bad);
4484
4485        if (le16_to_cpu(msg->hdr.version) >= 3)
4486                ceph_decode_64_safe(&p, end, notifier_id, bad);
4487
4488        down_read(&osdc->lock);
4489        lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4490        if (!lreq) {
4491                dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4492                     cookie);
4493                goto out_unlock_osdc;
4494        }
4495
4496        mutex_lock(&lreq->lock);
4497        dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4498             opcode, cookie, lreq, lreq->is_watch);
4499        if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4500                if (!lreq->last_error) {
4501                        lreq->last_error = -ENOTCONN;
4502                        queue_watch_error(lreq);
4503                }
4504        } else if (!lreq->is_watch) {
4505                /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4506                if (lreq->notify_id && lreq->notify_id != notify_id) {
4507                        dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4508                             lreq->notify_id, notify_id);
4509                } else if (!completion_done(&lreq->notify_finish_wait)) {
4510                        struct ceph_msg_data *data =
4511                            msg->num_data_items ? &msg->data[0] : NULL;
4512
4513                        if (data) {
4514                                if (lreq->preply_pages) {
4515                                        WARN_ON(data->type !=
4516                                                        CEPH_MSG_DATA_PAGES);
4517                                        *lreq->preply_pages = data->pages;
4518                                        *lreq->preply_len = data->length;
4519                                        data->own_pages = false;
4520                                }
4521                        }
4522                        lreq->notify_finish_error = return_code;
4523                        complete_all(&lreq->notify_finish_wait);
4524                }
4525        } else {
4526                /* CEPH_WATCH_EVENT_NOTIFY */
4527                lwork = lwork_alloc(lreq, do_watch_notify);
4528                if (!lwork) {
4529                        pr_err("failed to allocate notify-lwork\n");
4530                        goto out_unlock_lreq;
4531                }
4532
4533                lwork->notify.notify_id = notify_id;
4534                lwork->notify.notifier_id = notifier_id;
4535                lwork->notify.payload = payload;
4536                lwork->notify.payload_len = payload_len;
4537                lwork->notify.msg = ceph_msg_get(msg);
4538                lwork_queue(lwork);
4539        }
4540
4541out_unlock_lreq:
4542        mutex_unlock(&lreq->lock);
4543out_unlock_osdc:
4544        up_read(&osdc->lock);
4545        return;
4546
4547bad:
4548        pr_err("osdc handle_watch_notify corrupt msg\n");
4549}
4550
4551/*
4552 * Register request, send initial attempt.
4553 */
4554int ceph_osdc_start_request(struct ceph_osd_client *osdc,
4555                            struct ceph_osd_request *req,
4556                            bool nofail)
4557{
4558        down_read(&osdc->lock);
4559        submit_request(req, false);
4560        up_read(&osdc->lock);
4561
4562        return 0;
4563}
4564EXPORT_SYMBOL(ceph_osdc_start_request);
4565
4566/*
4567 * Unregister a registered request.  The request is not completed:
4568 * ->r_result isn't set and __complete_request() isn't called.
4569 */
4570void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4571{
4572        struct ceph_osd_client *osdc = req->r_osdc;
4573
4574        down_write(&osdc->lock);
4575        if (req->r_osd)
4576                cancel_request(req);
4577        up_write(&osdc->lock);
4578}
4579EXPORT_SYMBOL(ceph_osdc_cancel_request);
4580
4581/*
4582 * @timeout: in jiffies, 0 means "wait forever"
4583 */
4584static int wait_request_timeout(struct ceph_osd_request *req,
4585                                unsigned long timeout)
4586{
4587        long left;
4588
4589        dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4590        left = wait_for_completion_killable_timeout(&req->r_completion,
4591                                                ceph_timeout_jiffies(timeout));
4592        if (left <= 0) {
4593                left = left ?: -ETIMEDOUT;
4594                ceph_osdc_cancel_request(req);
4595        } else {
4596                left = req->r_result; /* completed */
4597        }
4598
4599        return left;
4600}
4601
/*
 * wait for a request to complete
 *
 * Same as wait_request_timeout() with a timeout of 0, i.e. without
 * a time limit.  @osdc is not used.
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	const unsigned long no_timeout = 0;

	return wait_request_timeout(req, no_timeout);
}
EXPORT_SYMBOL(ceph_osdc_wait_request);
4611
4612/*
4613 * sync - wait for all in-flight requests to flush.  avoid starvation.
4614 */
4615void ceph_osdc_sync(struct ceph_osd_client *osdc)
4616{
4617        struct rb_node *n, *p;
4618        u64 last_tid = atomic64_read(&osdc->last_tid);
4619
4620again:
4621        down_read(&osdc->lock);
4622        for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4623                struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4624
4625                mutex_lock(&osd->lock);
4626                for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4627                        struct ceph_osd_request *req =
4628                            rb_entry(p, struct ceph_osd_request, r_node);
4629
4630                        if (req->r_tid > last_tid)
4631                                break;
4632
4633                        if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4634                                continue;
4635
4636                        ceph_osdc_get_request(req);
4637                        mutex_unlock(&osd->lock);
4638                        up_read(&osdc->lock);
4639                        dout("%s waiting on req %p tid %llu last_tid %llu\n",
4640                             __func__, req, req->r_tid, last_tid);
4641                        wait_for_completion(&req->r_completion);
4642                        ceph_osdc_put_request(req);
4643                        goto again;
4644                }
4645
4646                mutex_unlock(&osd->lock);
4647        }
4648
4649        up_read(&osdc->lock);
4650        dout("%s done last_tid %llu\n", __func__, last_tid);
4651}
4652EXPORT_SYMBOL(ceph_osdc_sync);
4653
4654static struct ceph_osd_request *
4655alloc_linger_request(struct ceph_osd_linger_request *lreq)
4656{
4657        struct ceph_osd_request *req;
4658
4659        req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
4660        if (!req)
4661                return NULL;
4662
4663        ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4664        ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4665        return req;
4666}
4667
4668static struct ceph_osd_request *
4669alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
4670{
4671        struct ceph_osd_request *req;
4672
4673        req = alloc_linger_request(lreq);
4674        if (!req)
4675                return NULL;
4676
4677        /*
4678         * Pass 0 for cookie because we don't know it yet, it will be
4679         * filled in by linger_submit().
4680         */
4681        osd_req_op_watch_init(req, 0, 0, watch_opcode);
4682
4683        if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
4684                ceph_osdc_put_request(req);
4685                return NULL;
4686        }
4687
4688        return req;
4689}
4690
4691/*
4692 * Returns a handle, caller owns a ref.
4693 */
4694struct ceph_osd_linger_request *
4695ceph_osdc_watch(struct ceph_osd_client *osdc,
4696                struct ceph_object_id *oid,
4697                struct ceph_object_locator *oloc,
4698                rados_watchcb2_t wcb,
4699                rados_watcherrcb_t errcb,
4700                void *data)
4701{
4702        struct ceph_osd_linger_request *lreq;
4703        int ret;
4704
4705        lreq = linger_alloc(osdc);
4706        if (!lreq)
4707                return ERR_PTR(-ENOMEM);
4708
4709        lreq->is_watch = true;
4710        lreq->wcb = wcb;
4711        lreq->errcb = errcb;
4712        lreq->data = data;
4713        lreq->watch_valid_thru = jiffies;
4714
4715        ceph_oid_copy(&lreq->t.base_oid, oid);
4716        ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4717        lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4718        ktime_get_real_ts64(&lreq->mtime);
4719
4720        lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
4721        if (!lreq->reg_req) {
4722                ret = -ENOMEM;
4723                goto err_put_lreq;
4724        }
4725
4726        lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
4727        if (!lreq->ping_req) {
4728                ret = -ENOMEM;
4729                goto err_put_lreq;
4730        }
4731
4732        linger_submit(lreq);
4733        ret = linger_reg_commit_wait(lreq);
4734        if (ret) {
4735                linger_cancel(lreq);
4736                goto err_put_lreq;
4737        }
4738
4739        return lreq;
4740
4741err_put_lreq:
4742        linger_put(lreq);
4743        return ERR_PTR(ret);
4744}
4745EXPORT_SYMBOL(ceph_osdc_watch);
4746
4747/*
4748 * Releases a ref.
4749 *
4750 * Times out after mount_timeout to preserve rbd unmap behaviour
4751 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4752 * with mount_timeout").
4753 */
4754int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4755                      struct ceph_osd_linger_request *lreq)
4756{
4757        struct ceph_options *opts = osdc->client->options;
4758        struct ceph_osd_request *req;
4759        int ret;
4760
4761        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4762        if (!req)
4763                return -ENOMEM;
4764
4765        ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4766        ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4767        req->r_flags = CEPH_OSD_FLAG_WRITE;
4768        ktime_get_real_ts64(&req->r_mtime);
4769        osd_req_op_watch_init(req, 0, lreq->linger_id,
4770                              CEPH_OSD_WATCH_OP_UNWATCH);
4771
4772        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4773        if (ret)
4774                goto out_put_req;
4775
4776        ceph_osdc_start_request(osdc, req, false);
4777        linger_cancel(lreq);
4778        linger_put(lreq);
4779        ret = wait_request_timeout(req, opts->mount_timeout);
4780
4781out_put_req:
4782        ceph_osdc_put_request(req);
4783        return ret;
4784}
4785EXPORT_SYMBOL(ceph_osdc_unwatch);
4786
4787static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4788                                      u64 notify_id, u64 cookie, void *payload,
4789                                      u32 payload_len)
4790{
4791        struct ceph_osd_req_op *op;
4792        struct ceph_pagelist *pl;
4793        int ret;
4794
4795        op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4796
4797        pl = ceph_pagelist_alloc(GFP_NOIO);
4798        if (!pl)
4799                return -ENOMEM;
4800
4801        ret = ceph_pagelist_encode_64(pl, notify_id);
4802        ret |= ceph_pagelist_encode_64(pl, cookie);
4803        if (payload) {
4804                ret |= ceph_pagelist_encode_32(pl, payload_len);
4805                ret |= ceph_pagelist_append(pl, payload, payload_len);
4806        } else {
4807                ret |= ceph_pagelist_encode_32(pl, 0);
4808        }
4809        if (ret) {
4810                ceph_pagelist_release(pl);
4811                return -ENOMEM;
4812        }
4813
4814        ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4815        op->indata_len = pl->length;
4816        return 0;
4817}
4818
4819int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4820                         struct ceph_object_id *oid,
4821                         struct ceph_object_locator *oloc,
4822                         u64 notify_id,
4823                         u64 cookie,
4824                         void *payload,
4825                         u32 payload_len)
4826{
4827        struct ceph_osd_request *req;
4828        int ret;
4829
4830        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4831        if (!req)
4832                return -ENOMEM;
4833
4834        ceph_oid_copy(&req->r_base_oid, oid);
4835        ceph_oloc_copy(&req->r_base_oloc, oloc);
4836        req->r_flags = CEPH_OSD_FLAG_READ;
4837
4838        ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4839                                         payload_len);
4840        if (ret)
4841                goto out_put_req;
4842
4843        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4844        if (ret)
4845                goto out_put_req;
4846
4847        ceph_osdc_start_request(osdc, req, false);
4848        ret = ceph_osdc_wait_request(osdc, req);
4849
4850out_put_req:
4851        ceph_osdc_put_request(req);
4852        return ret;
4853}
4854EXPORT_SYMBOL(ceph_osdc_notify_ack);
4855
4856static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
4857                                  u64 cookie, u32 prot_ver, u32 timeout,
4858                                  void *payload, u32 payload_len)
4859{
4860        struct ceph_osd_req_op *op;
4861        struct ceph_pagelist *pl;
4862        int ret;
4863
4864        op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
4865        op->notify.cookie = cookie;
4866
4867        pl = ceph_pagelist_alloc(GFP_NOIO);
4868        if (!pl)
4869                return -ENOMEM;
4870
4871        ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
4872        ret |= ceph_pagelist_encode_32(pl, timeout);
4873        ret |= ceph_pagelist_encode_32(pl, payload_len);
4874        ret |= ceph_pagelist_append(pl, payload, payload_len);
4875        if (ret) {
4876                ceph_pagelist_release(pl);
4877                return -ENOMEM;
4878        }
4879
4880        ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
4881        op->indata_len = pl->length;
4882        return 0;
4883}
4884
4885/*
4886 * @timeout: in seconds
4887 *
4888 * @preply_{pages,len} are initialized both on success and error.
4889 * The caller is responsible for:
4890 *
4891 *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4892 */
4893int ceph_osdc_notify(struct ceph_osd_client *osdc,
4894                     struct ceph_object_id *oid,
4895                     struct ceph_object_locator *oloc,
4896                     void *payload,
4897                     u32 payload_len,
4898                     u32 timeout,
4899                     struct page ***preply_pages,
4900                     size_t *preply_len)
4901{
4902        struct ceph_osd_linger_request *lreq;
4903        struct page **pages;
4904        int ret;
4905
4906        WARN_ON(!timeout);
4907        if (preply_pages) {
4908                *preply_pages = NULL;
4909                *preply_len = 0;
4910        }
4911
4912        lreq = linger_alloc(osdc);
4913        if (!lreq)
4914                return -ENOMEM;
4915
4916        lreq->preply_pages = preply_pages;
4917        lreq->preply_len = preply_len;
4918
4919        ceph_oid_copy(&lreq->t.base_oid, oid);
4920        ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4921        lreq->t.flags = CEPH_OSD_FLAG_READ;
4922
4923        lreq->reg_req = alloc_linger_request(lreq);
4924        if (!lreq->reg_req) {
4925                ret = -ENOMEM;
4926                goto out_put_lreq;
4927        }
4928
4929        /*
4930         * Pass 0 for cookie because we don't know it yet, it will be
4931         * filled in by linger_submit().
4932         */
4933        ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
4934                                     payload, payload_len);
4935        if (ret)
4936                goto out_put_lreq;
4937
4938        /* for notify_id */
4939        pages = ceph_alloc_page_vector(1, GFP_NOIO);
4940        if (IS_ERR(pages)) {
4941                ret = PTR_ERR(pages);
4942                goto out_put_lreq;
4943        }
4944        ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4945                                                 response_data),
4946                                 pages, PAGE_SIZE, 0, false, true);
4947
4948        ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
4949        if (ret)
4950                goto out_put_lreq;
4951
4952        linger_submit(lreq);
4953        ret = linger_reg_commit_wait(lreq);
4954        if (!ret)
4955                ret = linger_notify_finish_wait(lreq);
4956        else
4957                dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4958
4959        linger_cancel(lreq);
4960out_put_lreq:
4961        linger_put(lreq);
4962        return ret;
4963}
4964EXPORT_SYMBOL(ceph_osdc_notify);
4965
4966/*
4967 * Return the number of milliseconds since the watch was last
4968 * confirmed, or an error.  If there is an error, the watch is no
4969 * longer valid, and should be destroyed with ceph_osdc_unwatch().
4970 */
4971int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
4972                          struct ceph_osd_linger_request *lreq)
4973{
4974        unsigned long stamp, age;
4975        int ret;
4976
4977        down_read(&osdc->lock);
4978        mutex_lock(&lreq->lock);
4979        stamp = lreq->watch_valid_thru;
4980        if (!list_empty(&lreq->pending_lworks)) {
4981                struct linger_work *lwork =
4982                    list_first_entry(&lreq->pending_lworks,
4983                                     struct linger_work,
4984                                     pending_item);
4985
4986                if (time_before(lwork->queued_stamp, stamp))
4987                        stamp = lwork->queued_stamp;
4988        }
4989        age = jiffies - stamp;
4990        dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
4991             lreq, lreq->linger_id, age, lreq->last_error);
4992        /* we are truncating to msecs, so return a safe upper bound */
4993        ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
4994
4995        mutex_unlock(&lreq->lock);
4996        up_read(&osdc->lock);
4997        return ret;
4998}
4999
5000static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
5001{
5002        u8 struct_v;
5003        u32 struct_len;
5004        int ret;
5005
5006        ret = ceph_start_decoding(p, end, 2, "watch_item_t",
5007                                  &struct_v, &struct_len);
5008        if (ret)
5009                goto bad;
5010
5011        ret = -EINVAL;
5012        ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
5013        ceph_decode_64_safe(p, end, item->cookie, bad);
5014        ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
5015
5016        if (struct_v >= 2) {
5017                ret = ceph_decode_entity_addr(p, end, &item->addr);
5018                if (ret)
5019                        goto bad;
5020        } else {
5021                ret = 0;
5022        }
5023
5024        dout("%s %s%llu cookie %llu addr %s\n", __func__,
5025             ENTITY_NAME(item->name), item->cookie,
5026             ceph_pr_addr(&item->addr));
5027bad:
5028        return ret;
5029}
5030
5031static int decode_watchers(void **p, void *end,
5032                           struct ceph_watch_item **watchers,
5033                           u32 *num_watchers)
5034{
5035        u8 struct_v;
5036        u32 struct_len;
5037        int i;
5038        int ret;
5039
5040        ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
5041                                  &struct_v, &struct_len);
5042        if (ret)
5043                return ret;
5044
5045        *num_watchers = ceph_decode_32(p);
5046        *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
5047        if (!*watchers)
5048                return -ENOMEM;
5049
5050        for (i = 0; i < *num_watchers; i++) {
5051                ret = decode_watcher(p, end, *watchers + i);
5052                if (ret) {
5053                        kfree(*watchers);
5054                        return ret;
5055                }
5056        }
5057
5058        return 0;
5059}
5060
5061/*
5062 * On success, the caller is responsible for:
5063 *
5064 *     kfree(watchers);
5065 */
5066int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
5067                            struct ceph_object_id *oid,
5068                            struct ceph_object_locator *oloc,
5069                            struct ceph_watch_item **watchers,
5070                            u32 *num_watchers)
5071{
5072        struct ceph_osd_request *req;
5073        struct page **pages;
5074        int ret;
5075
5076        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5077        if (!req)
5078                return -ENOMEM;
5079
5080        ceph_oid_copy(&req->r_base_oid, oid);
5081        ceph_oloc_copy(&req->r_base_oloc, oloc);
5082        req->r_flags = CEPH_OSD_FLAG_READ;
5083
5084        pages = ceph_alloc_page_vector(1, GFP_NOIO);
5085        if (IS_ERR(pages)) {
5086                ret = PTR_ERR(pages);
5087                goto out_put_req;
5088        }
5089
5090        osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
5091        ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
5092                                                 response_data),
5093                                 pages, PAGE_SIZE, 0, false, true);
5094
5095        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5096        if (ret)
5097                goto out_put_req;
5098
5099        ceph_osdc_start_request(osdc, req, false);
5100        ret = ceph_osdc_wait_request(osdc, req);
5101        if (ret >= 0) {
5102                void *p = page_address(pages[0]);
5103                void *const end = p + req->r_ops[0].outdata_len;
5104
5105                ret = decode_watchers(&p, end, watchers, num_watchers);
5106        }
5107
5108out_put_req:
5109        ceph_osdc_put_request(req);
5110        return ret;
5111}
5112EXPORT_SYMBOL(ceph_osdc_list_watchers);
5113
5114/*
5115 * Call all pending notify callbacks - for use after a watch is
5116 * unregistered, to make sure no more callbacks for it will be invoked
5117 */
5118void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
5119{
5120        dout("%s osdc %p\n", __func__, osdc);
5121        flush_workqueue(osdc->notify_wq);
5122}
5123EXPORT_SYMBOL(ceph_osdc_flush_notifies);
5124
5125void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
5126{
5127        down_read(&osdc->lock);
5128        maybe_request_map(osdc);
5129        up_read(&osdc->lock);
5130}
5131EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
5132
5133/*
5134 * Execute an OSD class method on an object.
5135 *
5136 * @flags: CEPH_OSD_FLAG_*
5137 * @resp_len: in/out param for reply length
5138 */
5139int ceph_osdc_call(struct ceph_osd_client *osdc,
5140                   struct ceph_object_id *oid,
5141                   struct ceph_object_locator *oloc,
5142                   const char *class, const char *method,
5143                   unsigned int flags,
5144                   struct page *req_page, size_t req_len,
5145                   struct page **resp_pages, size_t *resp_len)
5146{
5147        struct ceph_osd_request *req;
5148        int ret;
5149
5150        if (req_len > PAGE_SIZE)
5151                return -E2BIG;
5152
5153        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5154        if (!req)
5155                return -ENOMEM;
5156
5157        ceph_oid_copy(&req->r_base_oid, oid);
5158        ceph_oloc_copy(&req->r_base_oloc, oloc);
5159        req->r_flags = flags;
5160
5161        ret = osd_req_op_cls_init(req, 0, class, method);
5162        if (ret)
5163                goto out_put_req;
5164
5165        if (req_page)
5166                osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
5167                                                  0, false, false);
5168        if (resp_pages)
5169                osd_req_op_cls_response_data_pages(req, 0, resp_pages,
5170                                                   *resp_len, 0, false, false);
5171
5172        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5173        if (ret)
5174                goto out_put_req;
5175
5176        ceph_osdc_start_request(osdc, req, false);
5177        ret = ceph_osdc_wait_request(osdc, req);
5178        if (ret >= 0) {
5179                ret = req->r_ops[0].rval;
5180                if (resp_pages)
5181                        *resp_len = req->r_ops[0].outdata_len;
5182        }
5183
5184out_put_req:
5185        ceph_osdc_put_request(req);
5186        return ret;
5187}
5188EXPORT_SYMBOL(ceph_osdc_call);
5189
5190/*
5191 * reset all osd connections
5192 */
5193void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
5194{
5195        struct rb_node *n;
5196
5197        down_write(&osdc->lock);
5198        for (n = rb_first(&osdc->osds); n; ) {
5199                struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5200
5201                n = rb_next(n);
5202                if (!reopen_osd(osd))
5203                        kick_osd_requests(osd);
5204        }
5205        up_write(&osdc->lock);
5206}
5207
5208/*
5209 * init, shutdown
5210 */
5211int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5212{
5213        int err;
5214
5215        dout("init\n");
5216        osdc->client = client;
5217        init_rwsem(&osdc->lock);
5218        osdc->osds = RB_ROOT;
5219        INIT_LIST_HEAD(&osdc->osd_lru);
5220        spin_lock_init(&osdc->osd_lru_lock);
5221        osd_init(&osdc->homeless_osd);
5222        osdc->homeless_osd.o_osdc = osdc;
5223        osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
5224        osdc->last_linger_id = CEPH_LINGER_ID_START;
5225        osdc->linger_requests = RB_ROOT;
5226        osdc->map_checks = RB_ROOT;
5227        osdc->linger_map_checks = RB_ROOT;
5228        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
5229        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
5230
5231        err = -ENOMEM;
5232        osdc->osdmap = ceph_osdmap_alloc();
5233        if (!osdc->osdmap)
5234                goto out;
5235
5236        osdc->req_mempool = mempool_create_slab_pool(10,
5237                                                     ceph_osd_request_cache);
5238        if (!osdc->req_mempool)
5239                goto out_map;
5240
5241        err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5242                                PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5243        if (err < 0)
5244                goto out_mempool;
5245        err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5246                                PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5247                                "osd_op_reply");
5248        if (err < 0)
5249                goto out_msgpool;
5250
5251        err = -ENOMEM;
5252        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
5253        if (!osdc->notify_wq)
5254                goto out_msgpool_reply;
5255
5256        osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
5257        if (!osdc->completion_wq)
5258                goto out_notify_wq;
5259
5260        schedule_delayed_work(&osdc->timeout_work,
5261                              osdc->client->options->osd_keepalive_timeout);
5262        schedule_delayed_work(&osdc->osds_timeout_work,
5263            round_jiffies_relative(osdc->client->options->osd_idle_ttl));
5264
5265        return 0;
5266
5267out_notify_wq:
5268        destroy_workqueue(osdc->notify_wq);
5269out_msgpool_reply:
5270        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5271out_msgpool:
5272        ceph_msgpool_destroy(&osdc->msgpool_op);
5273out_mempool:
5274        mempool_destroy(osdc->req_mempool);
5275out_map:
5276        ceph_osdmap_destroy(osdc->osdmap);
5277out:
5278        return err;
5279}
5280
5281void ceph_osdc_stop(struct ceph_osd_client *osdc)
5282{
5283        destroy_workqueue(osdc->completion_wq);
5284        destroy_workqueue(osdc->notify_wq);
5285        cancel_delayed_work_sync(&osdc->timeout_work);
5286        cancel_delayed_work_sync(&osdc->osds_timeout_work);
5287
5288        down_write(&osdc->lock);
5289        while (!RB_EMPTY_ROOT(&osdc->osds)) {
5290                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5291                                                struct ceph_osd, o_node);
5292                close_osd(osd);
5293        }
5294        up_write(&osdc->lock);
5295        WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5296        osd_cleanup(&osdc->homeless_osd);
5297
5298        WARN_ON(!list_empty(&osdc->osd_lru));
5299        WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5300        WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5301        WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5302        WARN_ON(atomic_read(&osdc->num_requests));
5303        WARN_ON(atomic_read(&osdc->num_homeless));
5304
5305        ceph_osdmap_destroy(osdc->osdmap);
5306        mempool_destroy(osdc->req_mempool);
5307        ceph_msgpool_destroy(&osdc->msgpool_op);
5308        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5309}
5310
5311static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5312                                     u64 src_snapid, u64 src_version,
5313                                     struct ceph_object_id *src_oid,
5314                                     struct ceph_object_locator *src_oloc,
5315                                     u32 src_fadvise_flags,
5316                                     u32 dst_fadvise_flags,
5317                                     u32 truncate_seq, u64 truncate_size,
5318                                     u8 copy_from_flags)
5319{
5320        struct ceph_osd_req_op *op;
5321        struct page **pages;
5322        void *p, *end;
5323
5324        pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5325        if (IS_ERR(pages))
5326                return PTR_ERR(pages);
5327
5328        op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
5329                             dst_fadvise_flags);
5330        op->copy_from.snapid = src_snapid;
5331        op->copy_from.src_version = src_version;
5332        op->copy_from.flags = copy_from_flags;
5333        op->copy_from.src_fadvise_flags = src_fadvise_flags;
5334
5335        p = page_address(pages[0]);
5336        end = p + PAGE_SIZE;
5337        ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5338        encode_oloc(&p, end, src_oloc);
5339        ceph_encode_32(&p, truncate_seq);
5340        ceph_encode_64(&p, truncate_size);
5341        op->indata_len = PAGE_SIZE - (end - p);
5342
5343        ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5344                                 op->indata_len, 0, false, true);
5345        return 0;
5346}
5347
5348int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
5349                        u64 src_snapid, u64 src_version,
5350                        struct ceph_object_id *src_oid,
5351                        struct ceph_object_locator *src_oloc,
5352                        u32 src_fadvise_flags,
5353                        struct ceph_object_id *dst_oid,
5354                        struct ceph_object_locator *dst_oloc,
5355                        u32 dst_fadvise_flags,
5356                        u32 truncate_seq, u64 truncate_size,
5357                        u8 copy_from_flags)
5358{
5359        struct ceph_osd_request *req;
5360        int ret;
5361
5362        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
5363        if (!req)
5364                return -ENOMEM;
5365
5366        req->r_flags = CEPH_OSD_FLAG_WRITE;
5367
5368        ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
5369        ceph_oid_copy(&req->r_t.base_oid, dst_oid);
5370
5371        ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
5372                                        src_oloc, src_fadvise_flags,
5373                                        dst_fadvise_flags, truncate_seq,
5374                                        truncate_size, copy_from_flags);
5375        if (ret)
5376                goto out;
5377
5378        ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
5379        if (ret)
5380                goto out;
5381
5382        ceph_osdc_start_request(osdc, req, false);
5383        ret = ceph_osdc_wait_request(osdc, req);
5384
5385out:
5386        ceph_osdc_put_request(req);
5387        return ret;
5388}
5389EXPORT_SYMBOL(ceph_osdc_copy_from);
5390
5391int __init ceph_osdc_setup(void)
5392{
5393        size_t size = sizeof(struct ceph_osd_request) +
5394            CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5395
5396        BUG_ON(ceph_osd_request_cache);
5397        ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5398                                                   0, 0, NULL);
5399
5400        return ceph_osd_request_cache ? 0 : -ENOMEM;
5401}
5402
5403void ceph_osdc_cleanup(void)
5404{
5405        BUG_ON(!ceph_osd_request_cache);
5406        kmem_cache_destroy(ceph_osd_request_cache);
5407        ceph_osd_request_cache = NULL;
5408}
5409
5410/*
5411 * handle incoming message
5412 */
5413static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5414{
5415        struct ceph_osd *osd = con->private;
5416        struct ceph_osd_client *osdc = osd->o_osdc;
5417        int type = le16_to_cpu(msg->hdr.type);
5418
5419        switch (type) {
5420        case CEPH_MSG_OSD_MAP:
5421                ceph_osdc_handle_map(osdc, msg);
5422                break;
5423        case CEPH_MSG_OSD_OPREPLY:
5424                handle_reply(osd, msg);
5425                break;
5426        case CEPH_MSG_OSD_BACKOFF:
5427                handle_backoff(osd, msg);
5428                break;
5429        case CEPH_MSG_WATCH_NOTIFY:
5430                handle_watch_notify(osdc, msg);
5431                break;
5432
5433        default:
5434                pr_err("received unknown message type %d %s\n", type,
5435                       ceph_msg_type_name(type));
5436        }
5437
5438        ceph_msg_put(msg);
5439}
5440
5441/*
5442 * Lookup and return message for incoming reply.  Don't try to do
5443 * anything about a larger than preallocated data portion of the
5444 * message at the moment - for now, just skip the message.
5445 */
5446static struct ceph_msg *get_reply(struct ceph_connection *con,
5447                                  struct ceph_msg_header *hdr,
5448                                  int *skip)
5449{
5450        struct ceph_osd *osd = con->private;
5451        struct ceph_osd_client *osdc = osd->o_osdc;
5452        struct ceph_msg *m = NULL;
5453        struct ceph_osd_request *req;
5454        int front_len = le32_to_cpu(hdr->front_len);
5455        int data_len = le32_to_cpu(hdr->data_len);
5456        u64 tid = le64_to_cpu(hdr->tid);
5457
5458        down_read(&osdc->lock);
5459        if (!osd_registered(osd)) {
5460                dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5461                *skip = 1;
5462                goto out_unlock_osdc;
5463        }
5464        WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5465
5466        mutex_lock(&osd->lock);
5467        req = lookup_request(&osd->o_requests, tid);
5468        if (!req) {
5469                dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5470                     osd->o_osd, tid);
5471                *skip = 1;
5472                goto out_unlock_session;
5473        }
5474
5475        ceph_msg_revoke_incoming(req->r_reply);
5476
5477        if (front_len > req->r_reply->front_alloc_len) {
5478                pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5479                        __func__, osd->o_osd, req->r_tid, front_len,
5480                        req->r_reply->front_alloc_len);
5481                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5482                                 false);
5483                if (!m)
5484                        goto out_unlock_session;
5485                ceph_msg_put(req->r_reply);
5486                req->r_reply = m;
5487        }
5488
5489        if (data_len > req->r_reply->data_length) {
5490                pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5491                        __func__, osd->o_osd, req->r_tid, data_len,
5492                        req->r_reply->data_length);
5493                m = NULL;
5494                *skip = 1;
5495                goto out_unlock_session;
5496        }
5497
5498        m = ceph_msg_get(req->r_reply);
5499        dout("get_reply tid %lld %p\n", tid, m);
5500
5501out_unlock_session:
5502        mutex_unlock(&osd->lock);
5503out_unlock_osdc:
5504        up_read(&osdc->lock);
5505        return m;
5506}
5507
5508static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5509{
5510        struct ceph_msg *m;
5511        int type = le16_to_cpu(hdr->type);
5512        u32 front_len = le32_to_cpu(hdr->front_len);
5513        u32 data_len = le32_to_cpu(hdr->data_len);
5514
5515        m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
5516        if (!m)
5517                return NULL;
5518
5519        if (data_len) {
5520                struct page **pages;
5521
5522                pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5523                                               GFP_NOIO);
5524                if (IS_ERR(pages)) {
5525                        ceph_msg_put(m);
5526                        return NULL;
5527                }
5528
5529                ceph_msg_data_add_pages(m, pages, data_len, 0, true);
5530        }
5531
5532        return m;
5533}
5534
5535static struct ceph_msg *alloc_msg(struct ceph_connection *con,
5536                                  struct ceph_msg_header *hdr,
5537                                  int *skip)
5538{
5539        struct ceph_osd *osd = con->private;
5540        int type = le16_to_cpu(hdr->type);
5541
5542        *skip = 0;
5543        switch (type) {
5544        case CEPH_MSG_OSD_MAP:
5545        case CEPH_MSG_OSD_BACKOFF:
5546        case CEPH_MSG_WATCH_NOTIFY:
5547                return alloc_msg_with_page_vector(hdr);
5548        case CEPH_MSG_OSD_OPREPLY:
5549                return get_reply(con, hdr, skip);
5550        default:
5551                pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5552                        osd->o_osd, type);
5553                *skip = 1;
5554                return NULL;
5555        }
5556}
5557
5558/*
5559 * Wrappers to refcount containing ceph_osd struct
5560 */
5561static struct ceph_connection *get_osd_con(struct ceph_connection *con)
5562{
5563        struct ceph_osd *osd = con->private;
5564        if (get_osd(osd))
5565                return con;
5566        return NULL;
5567}
5568
5569static void put_osd_con(struct ceph_connection *con)
5570{
5571        struct ceph_osd *osd = con->private;
5572        put_osd(osd);
5573}
5574
5575/*
5576 * authentication
5577 */
5578/*
5579 * Note: returned pointer is the address of a structure that's
5580 * managed separately.  Caller must *not* attempt to free it.
5581 */
5582static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5583                                        int *proto, int force_new)
5584{
5585        struct ceph_osd *o = con->private;
5586        struct ceph_osd_client *osdc = o->o_osdc;
5587        struct ceph_auth_client *ac = osdc->client->monc.auth;
5588        struct ceph_auth_handshake *auth = &o->o_auth;
5589
5590        if (force_new && auth->authorizer) {
5591                ceph_auth_destroy_authorizer(auth->authorizer);
5592                auth->authorizer = NULL;
5593        }
5594        if (!auth->authorizer) {
5595                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5596                                                      auth);
5597                if (ret)
5598                        return ERR_PTR(ret);
5599        } else {
5600                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5601                                                     auth);
5602                if (ret)
5603                        return ERR_PTR(ret);
5604        }
5605        *proto = ac->protocol;
5606
5607        return auth;
5608}
5609
5610static int add_authorizer_challenge(struct ceph_connection *con,
5611                                    void *challenge_buf, int challenge_buf_len)
5612{
5613        struct ceph_osd *o = con->private;
5614        struct ceph_osd_client *osdc = o->o_osdc;
5615        struct ceph_auth_client *ac = osdc->client->monc.auth;
5616
5617        return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer,
5618                                            challenge_buf, challenge_buf_len);
5619}
5620
5621static int verify_authorizer_reply(struct ceph_connection *con)
5622{
5623        struct ceph_osd *o = con->private;
5624        struct ceph_osd_client *osdc = o->o_osdc;
5625        struct ceph_auth_client *ac = osdc->client->monc.auth;
5626
5627        return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
5628}
5629
5630static int invalidate_authorizer(struct ceph_connection *con)
5631{
5632        struct ceph_osd *o = con->private;
5633        struct ceph_osd_client *osdc = o->o_osdc;
5634        struct ceph_auth_client *ac = osdc->client->monc.auth;
5635
5636        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5637        return ceph_monc_validate_auth(&osdc->client->monc);
5638}
5639
5640static void osd_reencode_message(struct ceph_msg *msg)
5641{
5642        int type = le16_to_cpu(msg->hdr.type);
5643
5644        if (type == CEPH_MSG_OSD_OP)
5645                encode_request_finish(msg);
5646}
5647
5648static int osd_sign_message(struct ceph_msg *msg)
5649{
5650        struct ceph_osd *o = msg->con->private;
5651        struct ceph_auth_handshake *auth = &o->o_auth;
5652
5653        return ceph_auth_sign_message(auth, msg);
5654}
5655
5656static int osd_check_message_signature(struct ceph_msg *msg)
5657{
5658        struct ceph_osd *o = msg->con->private;
5659        struct ceph_auth_handshake *auth = &o->o_auth;
5660
5661        return ceph_auth_check_message_signature(auth, msg);
5662}
5663
5664static const struct ceph_connection_operations osd_con_ops = {
5665        .get = get_osd_con,
5666        .put = put_osd_con,
5667        .dispatch = dispatch,
5668        .get_authorizer = get_authorizer,
5669        .add_authorizer_challenge = add_authorizer_challenge,
5670        .verify_authorizer_reply = verify_authorizer_reply,
5671        .invalidate_authorizer = invalidate_authorizer,
5672        .alloc_msg = alloc_msg,
5673        .reencode_message = osd_reencode_message,
5674        .sign_message = osd_sign_message,
5675        .check_message_signature = osd_check_message_signature,
5676        .fault = osd_fault,
5677};
5678